From 5daf1a8112a4cb4d6da2b80643896f91b6406e95 Mon Sep 17 00:00:00 2001 From: tbphp Date: Mon, 7 Jul 2025 21:03:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/key_cron_service.go | 103 +----------------- .../services/key_manual_validation_service.go | 2 +- .../services/key_validation_concurrency.go | 37 +------ 3 files changed, 6 insertions(+), 136 deletions(-) diff --git a/internal/services/key_cron_service.go b/internal/services/key_cron_service.go index 50e8c40..79e9c6c 100644 --- a/internal/services/key_cron_service.go +++ b/internal/services/key_cron_service.go @@ -44,9 +44,6 @@ func (s *KeyCronService) Start() { s.wg.Add(1) go s.leaderElectionLoop() - // processResults still needs to run independently as it handles results from the Pool - s.wg.Add(1) - go s.processResults() } // Stop stops the cron job. @@ -66,21 +63,17 @@ func (s *KeyCronService) leaderElectionLoop() { case <-s.stopChan: return default: - // Attempt to acquire the leader lock isLeader, err := s.tryAcquireLock() if err != nil { logrus.Errorf("KeyCronService: Error trying to acquire leader lock: %v. Retrying in 1 minute.", err) - time.Sleep(1 * time.Minute) // Wait for a while before retrying on error + time.Sleep(1 * time.Minute) continue } if isLeader { - // Successfully became the leader, start executing the cron job s.runAsLeader() } else { - // Failed to become the leader, enter standby mode logrus.Debug("KeyCronService: Not the leader. Standing by.") - // Wait for a lock TTL duration before trying again to avoid frequent contention time.Sleep(leaderLockTTL) } } @@ -88,10 +81,7 @@ func (s *KeyCronService) leaderElectionLoop() { } // tryAcquireLock attempts to set a key in the store, effectively acquiring a lock. -// This relies on an atomic operation if the underlying store supports it (like Redis SET NX). func (s *KeyCronService) tryAcquireLock() (bool, error) { - // A simple implementation for the generic store interface. - // The RedisStore implementation should use SET NX for atomicity. exists, err := s.Store.Exists(leaderLockKey) if err != nil { return false, err @@ -100,12 +90,9 @@ func (s *KeyCronService) tryAcquireLock() (bool, error) { return false, nil // Lock is held by another node } - // Attempt to set the lock. This is not atomic here but works in low-contention scenarios. - // The robustness relies on the underlying store's implementation. lockValue := []byte(time.Now().String()) err = s.Store.Set(leaderLockKey, lockValue, leaderLockTTL) if err != nil { - // It's possible the lock was acquired by another node between the Exists and Set calls return false, err } @@ -113,10 +100,8 @@ func (s *KeyCronService) tryAcquireLock() (bool, error) { return true, nil } -// runAsLeader contains the original logic that should only be run by the leader node. func (s *KeyCronService) runAsLeader() { logrus.Info("KeyCronService: Running as leader.") - // Defer releasing the lock defer func() { if err := s.Store.Delete(leaderLockKey); err != nil { logrus.Errorf("KeyCronService: Failed to release leader lock: %v", err) @@ -138,74 +123,19 @@ func (s *KeyCronService) runAsLeader() { case <-ticker.C: s.submitValidationJobs() case <-heartbeat.C: - // Renew the lock to prevent it from expiring during long-running tasks logrus.Debug("KeyCronService: Renewing leader lock.") err := s.Store.Set(leaderLockKey, []byte(time.Now().String()), leaderLockTTL) if err != nil { logrus.Errorf("KeyCronService: Failed to renew leader lock: %v. Relinquishing leadership.", err) - return // Relinquish leadership on renewal failure - } - case <-s.stopChan: - return // Service stopping - } - } -} - -// processResults consumes results from the validation pool and updates the database. -func (s *KeyCronService) processResults() { - defer s.wg.Done() - keysToUpdate := make(map[uint]models.APIKey) - - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case result, ok := <-s.Pool.ResultsChannel(): - if !ok { - s.batchUpdateKeyStatus(keysToUpdate) return } - - key := result.Job.Key - var newStatus string - var newErrorReason string - - if result.Error != nil { - newStatus = models.KeyStatusInvalid - newErrorReason = result.Error.Error() - } else { - if result.IsValid { - newStatus = models.KeyStatusActive - newErrorReason = "" - } else { - newStatus = models.KeyStatusInvalid - newErrorReason = "Validation returned false without a specific error." - } - } - - if key.Status != newStatus || key.ErrorReason != newErrorReason { - key.Status = newStatus - key.ErrorReason = newErrorReason - keysToUpdate[key.ID] = key - } - - case <-ticker.C: - // Process batch on ticker interval - if len(keysToUpdate) > 0 { - s.batchUpdateKeyStatus(keysToUpdate) - keysToUpdate = make(map[uint]models.APIKey) - } case <-s.stopChan: - // Process any remaining keys before stopping - if len(keysToUpdate) > 0 { - s.batchUpdateKeyStatus(keysToUpdate) - } return } } } + // submitValidationJobs finds groups and keys that need validation and submits them to the pool. func (s *KeyCronService) submitValidationJobs() { logrus.Info("KeyCronService: Starting validation submission cycle.") @@ -272,32 +202,3 @@ func (s *KeyCronService) updateGroupTimestamps(groups map[uint]*models.Group, va logrus.Errorf("KeyCronService: Failed to batch update last_validated_at for groups: %v", err) } } - -func (s *KeyCronService) batchUpdateKeyStatus(keysToUpdate map[uint]models.APIKey) { - if len(keysToUpdate) == 0 { - return - } - logrus.Infof("KeyCronService: Batch updating status for %d keys.", len(keysToUpdate)) - - var keys []models.APIKey - for _, key := range keysToUpdate { - keys = append(keys, key) - } - - err := s.DB.Transaction(func(tx *gorm.DB) error { - for _, key := range keys { - updates := map[string]any{ - "status": key.Status, - "error_reason": key.ErrorReason, - } - if err := tx.Model(&models.APIKey{}).Where("id = ?", key.ID).Updates(updates).Error; err != nil { - logrus.Errorf("KeyCronService: Failed to update key ID %d: %v", key.ID, err) - } - } - return nil - }) - - if err != nil { - logrus.Errorf("KeyCronService: Transaction failed during batch update of key statuses: %v", err) - } -} diff --git a/internal/services/key_manual_validation_service.go b/internal/services/key_manual_validation_service.go index 20cc5ad..acaf091 100644 --- a/internal/services/key_manual_validation_service.go +++ b/internal/services/key_manual_validation_service.go @@ -80,7 +80,7 @@ func (s *KeyManualValidationService) runValidation(group *models.Group, keys []m } var wg sync.WaitGroup - for i := 0; i < concurrency; i++ { + for range concurrency { wg.Add(1) go s.validationWorker(&wg, group, jobs, results) } diff --git a/internal/services/key_validation_concurrency.go b/internal/services/key_validation_concurrency.go index 83bffd6..8598372 100644 --- a/internal/services/key_validation_concurrency.go +++ b/internal/services/key_validation_concurrency.go @@ -19,19 +19,11 @@ type ValidationJob struct { CancelFunc context.CancelFunc } -// ValidationResult holds the outcome of a validation job. -type ValidationResult struct { - Job ValidationJob - IsValid bool - Error error -} - // KeyValidationPool manages a global worker pool for key validation. type KeyValidationPool struct { validator *keypool.KeyValidator configManager types.ConfigManager jobs chan ValidationJob - results chan ValidationResult // 定时任务结果 stopChan chan struct{} wg sync.WaitGroup } @@ -41,9 +33,8 @@ func NewKeyValidationPool(validator *keypool.KeyValidator, configManager types.C return &KeyValidationPool{ validator: validator, configManager: configManager, - jobs: make(chan ValidationJob, 1024), - results: make(chan ValidationResult, 1024), - stopChan: make(chan struct{}), + jobs: make(chan ValidationJob, 1024), + stopChan: make(chan struct{}), } } @@ -70,9 +61,6 @@ func (p *KeyValidationPool) Stop() { close(p.jobs) p.wg.Wait() - // 关闭结果通道 - close(p.results) - logrus.Info("KeyValidationPool stopped.") } @@ -89,24 +77,10 @@ func (p *KeyValidationPool) worker() { if ctx == nil { ctx = context.Background() } - isValid, err := p.validator.ValidateSingleKey(ctx, &job.Key, job.Group) + p.validator.ValidateSingleKey(ctx, &job.Key, job.Group) if job.CancelFunc != nil { job.CancelFunc() } - result := ValidationResult{ - Job: job, - IsValid: isValid, - Error: err, - } - - // Block until the result can be sent or the pool is stopped. - // This provides back-pressure and prevents result loss. - select { - case p.results <- result: - case <-p.stopChan: - logrus.Infof("Worker stopping, discarding result for key %d", job.Key.ID) - return - } case <-p.stopChan: return } @@ -117,8 +91,3 @@ func (p *KeyValidationPool) worker() { func (p *KeyValidationPool) SubmitJob(job ValidationJob) { p.jobs <- job } - -// ResultsChannel returns the channel for reading validation results. -func (p *KeyValidationPool) ResultsChannel() <-chan ValidationResult { - return p.results -}