diff --git a/cmd/gpt-load/main.go b/cmd/gpt-load/main.go
index 854e4c7..b4108dc 100644
--- a/cmd/gpt-load/main.go
+++ b/cmd/gpt-load/main.go
@@ -95,7 +95,7 @@ func main() {
 	// ---
 	keyManualValidationService := services.NewKeyManualValidationService(database, keyValidatorService, taskService, settingsManager, configManager)
 
-	keyCronService := services.NewKeyCronService(database, settingsManager, KeyValidationPool)
+	keyCronService := services.NewKeyCronService(database, settingsManager, KeyValidationPool, storage)
 	keyCronService.Start()
 	defer keyCronService.Stop()
 
diff --git a/internal/services/key_cron_service.go b/internal/services/key_cron_service.go
index 9280546..9ea2f4f 100644
--- a/internal/services/key_cron_service.go
+++ b/internal/services/key_cron_service.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"gpt-load/internal/config"
 	"gpt-load/internal/models"
+	"gpt-load/internal/store"
 	"sync"
 	"time"
 
@@ -11,30 +12,40 @@ import (
 	"gorm.io/gorm"
 )
 
+const (
+	leaderLockKey = "cron:leader:key_validation"
+	leaderLockTTL = 10 * time.Minute
+)
+
 // KeyCronService is responsible for periodically submitting keys for validation.
 type KeyCronService struct {
 	DB              *gorm.DB
 	SettingsManager *config.SystemSettingsManager
 	Pool            *KeyValidationPool
+	Store           store.Store
 	stopChan        chan struct{}
 	wg              sync.WaitGroup
 }
 
 // NewKeyCronService creates a new KeyCronService.
-func NewKeyCronService(db *gorm.DB, settingsManager *config.SystemSettingsManager, pool *KeyValidationPool) *KeyCronService {
+func NewKeyCronService(db *gorm.DB, settingsManager *config.SystemSettingsManager, pool *KeyValidationPool, store store.Store) *KeyCronService {
 	return &KeyCronService{
 		DB:              db,
 		SettingsManager: settingsManager,
 		Pool:            pool,
+		Store:           store,
 		stopChan:        make(chan struct{}),
 	}
 }
 
-// Start begins the cron job and the results processor.
+// Start begins the leader election and cron job execution.
 func (s *KeyCronService) Start() {
-	logrus.Info("Starting KeyCronService...")
-	s.wg.Add(2)
-	go s.run()
+	logrus.Info("Starting KeyCronService with leader election...")
+	s.wg.Add(1)
+	go s.leaderElectionLoop()
+
+	// processResults still needs to run independently, as it handles results from the Pool.
+	s.wg.Add(1)
 	go s.processResults()
 }
 
@@ -46,21 +57,96 @@ func (s *KeyCronService) Stop() {
 	logrus.Info("KeyCronService stopped.")
 }
 
-// run is the main ticker loop that triggers validation cycles.
-func (s *KeyCronService) run() {
+// leaderElectionLoop is the main loop that attempts to acquire leadership.
+func (s *KeyCronService) leaderElectionLoop() {
 	defer s.wg.Done()
-	// Run once on start, then start the ticker.
+
+	for {
+		select {
+		case <-s.stopChan:
+			return
+		default:
+			// Attempt to acquire the leader lock.
+			isLeader, err := s.tryAcquireLock()
+			if err != nil {
+				logrus.Errorf("KeyCronService: Error trying to acquire leader lock: %v. Retrying in 1 minute.", err)
+				time.Sleep(1 * time.Minute) // Wait a while before retrying on error.
+				continue
+			}
+
+			if isLeader {
+				// Successfully became the leader; start executing the cron job.
+				s.runAsLeader()
+			} else {
+				// Failed to become the leader; enter standby mode.
+				logrus.Debug("KeyCronService: Not the leader. Standing by.")
+				// Wait one lock TTL before trying again to avoid frequent contention.
+				time.Sleep(leaderLockTTL)
+			}
+		}
+	}
+}
+
+// tryAcquireLock attempts to set a key in the store, effectively acquiring a lock.
+// This relies on an atomic operation if the underlying store supports one (like Redis SET NX).
+func (s *KeyCronService) tryAcquireLock() (bool, error) {
+	// A simple implementation for the generic store interface.
+	// The RedisStore implementation should use SET NX for atomicity.
+	exists, err := s.Store.Exists(leaderLockKey)
+	if err != nil {
+		return false, err
+	}
+	if exists {
+		return false, nil // Lock is held by another node.
+	}
+
+	// Attempt to set the lock. The Exists/Set pair is not atomic, so another node
+	// may acquire the lock between the two calls; this is acceptable only in
+	// low-contention scenarios, and robustness relies on the store's implementation.
+	lockValue := []byte(time.Now().String())
+	err = s.Store.Set(leaderLockKey, lockValue, leaderLockTTL)
+	if err != nil {
+		return false, err
+	}
+
+	logrus.Info("KeyCronService: Successfully acquired leader lock.")
+	return true, nil
+}
+
+// runAsLeader contains the original cron logic, which must only run on the leader node.
+func (s *KeyCronService) runAsLeader() {
+	logrus.Info("KeyCronService: Running as leader.")
+	// Release the lock when leadership ends.
+	defer func() {
+		if err := s.Store.Delete(leaderLockKey); err != nil {
+			logrus.Errorf("KeyCronService: Failed to release leader lock: %v", err)
+		}
+		logrus.Info("KeyCronService: Released leader lock.")
+	}()
+
+	// Run once on start.
 	s.submitValidationJobs()
 
 	ticker := time.NewTicker(5 * time.Minute)
 	defer ticker.Stop()
 
+	heartbeat := time.NewTicker(leaderLockTTL / 2)
+	defer heartbeat.Stop()
+
 	for {
 		select {
 		case <-ticker.C:
 			s.submitValidationJobs()
+		case <-heartbeat.C:
+			// Renew the lock so it does not expire during long-running tasks.
+			logrus.Debug("KeyCronService: Renewing leader lock.")
+			err := s.Store.Set(leaderLockKey, []byte(time.Now().String()), leaderLockTTL)
+			if err != nil {
+				logrus.Errorf("KeyCronService: Failed to renew leader lock: %v. Relinquishing leadership.", err)
+				return // Relinquish leadership on renewal failure.
+			}
 		case <-s.stopChan:
-			return
+			return // Service stopping.
 		}
 	}
 }
 
@@ -70,8 +156,6 @@ func (s *KeyCronService) processResults() {
 	defer s.wg.Done()
 	keysToUpdate := make(map[uint]models.APIKey)
 
-	// Process results in batches to avoid constant DB writes.
-	// This ticker defines the maximum delay for a batch update.
 	ticker := time.NewTicker(30 * time.Second)
 	defer ticker.Stop()
 
@@ -110,7 +194,7 @@ func (s *KeyCronService) processResults() {
 			// Process batch on ticker interval
 			if len(keysToUpdate) > 0 {
 				s.batchUpdateKeyStatus(keysToUpdate)
-				keysToUpdate = make(map[uint]models.APIKey)
+				keysToUpdate = make(map[uint]models.APIKey)
 			}
 		case <-s.stopChan:
 			// Process any remaining keys before stopping
@@ -134,6 +218,7 @@ func (s *KeyCronService) submitValidationJobs() {
 	validationStartTime := time.Now()
 	groupsToUpdateTimestamp := make(map[uint]*models.Group)
 
+	total := 0
 	for i := range groups {
 		group := &groups[i]
 		effectiveSettings := s.SettingsManager.GetEffectiveConfig(group.Config)
@@ -151,10 +236,11 @@
 			continue
 		}
 
+		total += len(keys)
+
 		logrus.Infof("KeyCronService: Submitting %d keys for group %s for validation.", len(keys), group.Name)
 
 		for _, key := range keys {
-			// Create a new context with timeout for each job
 			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 
 			job := ValidationJob{
@@ -173,7 +259,8 @@
 	if len(groupsToUpdateTimestamp) > 0 {
 		s.updateGroupTimestamps(groupsToUpdateTimestamp, validationStartTime)
 	}
-	logrus.Info("KeyCronService: Validation submission cycle finished.")
+
+	logrus.Infof("KeyCronService: Submitted %d keys for validation across %d groups.", total, len(groupsToUpdateTimestamp))
 }
 
 func (s *KeyCronService) updateGroupTimestamps(groups map[uint]*models.Group, validationStartTime time.Time) {
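
Note on the locking scheme: the comments in tryAcquireLock acknowledge that the generic Exists/Set path is not atomic, and runAsLeader renews and deletes the lock without verifying ownership, so a node whose lock expired mid-task could extend or delete a lock that another node now holds. A Redis-backed store can close both gaps with SET NX for acquisition and token-guarded Lua scripts for renewal and release. The following is a minimal sketch, assuming github.com/redis/go-redis/v9 and github.com/google/uuid; the redisLock type, its token field, and the scripts are illustrative and are not part of the gpt-load codebase or its store.Store interface.

package lock

import (
	"context"
	"time"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

// renewScript extends the TTL only while this node still owns the lock.
var renewScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("PEXPIRE", KEYS[1], ARGV[2])
end
return 0
`)

// releaseScript deletes the lock only if the stored token still matches,
// so an expired leader cannot delete a lock another node now holds.
var releaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`)

// redisLock is a hypothetical leader lock built directly on go-redis.
type redisLock struct {
	client *redis.Client
	key    string
	token  string // unique per node so ownership can be verified
	ttl    time.Duration
}

func newRedisLock(client *redis.Client, key string, ttl time.Duration) *redisLock {
	return &redisLock{client: client, key: key, token: uuid.NewString(), ttl: ttl}
}

// TryAcquire performs the existence check and the write as a single atomic
// SET NX operation, closing the race in the generic Exists/Set path.
func (l *redisLock) TryAcquire(ctx context.Context) (bool, error) {
	return l.client.SetNX(ctx, l.key, l.token, l.ttl).Result()
}

// Renew extends the TTL; it returns false if this node no longer owns the lock.
func (l *redisLock) Renew(ctx context.Context) (bool, error) {
	res, err := renewScript.Run(ctx, l.client, []string{l.key}, l.token, l.ttl.Milliseconds()).Int()
	return res == 1, err
}

// Release deletes the lock only if this node still owns it.
func (l *redisLock) Release(ctx context.Context) error {
	return releaseScript.Run(ctx, l.client, []string{l.key}, l.token).Err()
}

With something like this in place, the heartbeat case in runAsLeader could call Renew and relinquish leadership when it returns false, and the deferred unconditional Delete could become a token-guarded Release.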