From 7340cdded1df4ee57cbd0f70d85480204d8d079e Mon Sep 17 00:00:00 2001
From: tbphp
Date: Tue, 8 Jul 2025 18:33:39 +0800
Subject: [PATCH] feat: refactor the distributed lock into a global service
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 internal/app/app.go                   |  29 +++---
 internal/container/container.go       |   3 +
 internal/services/key_cron_service.go |  96 +++-------------
 internal/services/leader_service.go   | 142 ++++++++++++++++++++++++++
 4 files changed, 179 insertions(+), 91 deletions(-)
 create mode 100644 internal/services/leader_service.go

diff --git a/internal/app/app.go b/internal/app/app.go
index 7ae538b..ece94b5 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -28,9 +28,10 @@ type App struct {
 	configManager     types.ConfigManager
 	settingsManager   *config.SystemSettingsManager
 	logCleanupService *services.LogCleanupService
-	keyCronService    *services.KeyCronService
-	keyValidationPool *services.KeyValidationPool
-	keyPoolProvider   *keypool.KeyProvider
+	keyCronService    *services.KeyCronService
+	keyValidationPool *services.KeyValidationPool
+	keyPoolProvider   *keypool.KeyProvider
+	leaderService     *services.LeaderService
 	proxyServer       *proxy.ProxyServer
 	storage           store.Store
 	db                *gorm.DB
@@ -45,10 +46,11 @@ type AppParams struct {
 	Engine            *gin.Engine
 	ConfigManager     types.ConfigManager
 	SettingsManager   *config.SystemSettingsManager
-	LogCleanupService *services.LogCleanupService
-	KeyCronService    *services.KeyCronService
-	KeyValidationPool *services.KeyValidationPool
-	KeyPoolProvider   *keypool.KeyProvider
+	LogCleanupService *services.LogCleanupService
+	KeyCronService    *services.KeyCronService
+	KeyValidationPool *services.KeyValidationPool
+	KeyPoolProvider   *keypool.KeyProvider
+	LeaderService     *services.LeaderService
 	ProxyServer       *proxy.ProxyServer
 	Storage           store.Store
 	DB                *gorm.DB
@@ -61,11 +63,12 @@ func NewApp(params AppParams) *App {
 		engine:            params.Engine,
 		configManager:     params.ConfigManager,
 		settingsManager:   params.SettingsManager,
-		logCleanupService: params.LogCleanupService,
-		keyCronService:    params.KeyCronService,
-		keyValidationPool: params.KeyValidationPool,
-		keyPoolProvider:   params.KeyPoolProvider,
-		proxyServer:       params.ProxyServer,
+		logCleanupService: params.LogCleanupService,
+		keyCronService:    params.KeyCronService,
+		keyValidationPool: params.KeyValidationPool,
+		keyPoolProvider:   params.KeyPoolProvider,
+		leaderService:     params.LeaderService,
+		proxyServer:       params.ProxyServer,
 		storage:           params.Storage,
 		db:                params.DB,
 		requestLogChan:    params.RequestLogChan,
@@ -90,6 +93,7 @@ func (a *App) Start() error {
 	// Start background services
 	a.startRequestLogger()
 	a.logCleanupService.Start()
+	a.leaderService.Start()
 	a.keyValidationPool.Start()
 	a.keyCronService.Start()
 
@@ -131,6 +135,7 @@ func (a *App) Stop(ctx context.Context) {
 	// Stop background services
 	a.keyCronService.Stop()
 	a.keyValidationPool.Stop()
+	a.leaderService.Stop()
 	a.logCleanupService.Stop()
 
 	// Close resources
diff --git a/internal/container/container.go b/internal/container/container.go
index 8f6503e..2c66910 100644
--- a/internal/container/container.go
+++ b/internal/container/container.go
@@ -56,6 +56,9 @@ func BuildContainer() (*dig.Container, error) {
 	if err := container.Provide(services.NewLogCleanupService); err != nil {
 		return nil, err
 	}
+	if err := container.Provide(services.NewLeaderService); err != nil {
+		return nil, err
+	}
 	if err := container.Provide(keypool.NewProvider); err != nil {
 		return nil, err
 	}
diff --git a/internal/services/key_cron_service.go b/internal/services/key_cron_service.go
index f6d44cc..efd69dd 100644
--- a/internal/services/key_cron_service.go
+++ b/internal/services/key_cron_service.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"gpt-load/internal/config"
 	"gpt-load/internal/models"
-	"gpt-load/internal/store"
 	"sync"
 	"time"
 
@@ -12,38 +11,37 @@ import (
 	"gorm.io/gorm"
 )
 
-const (
-	leaderLockKey = "cron:leader:key_validation"
-	leaderLockTTL = 10 * time.Minute
-)
-
 // KeyCronService is responsible for periodically submitting keys for validation.
 type KeyCronService struct {
 	DB              *gorm.DB
 	SettingsManager *config.SystemSettingsManager
 	Pool            *KeyValidationPool
-	Store           store.Store
+	LeaderService   *LeaderService
 	stopChan        chan struct{}
 	wg              sync.WaitGroup
 }
 
 // NewKeyCronService creates a new KeyCronService.
-func NewKeyCronService(db *gorm.DB, settingsManager *config.SystemSettingsManager, pool *KeyValidationPool, store store.Store) *KeyCronService {
+func NewKeyCronService(
+	db *gorm.DB,
+	settingsManager *config.SystemSettingsManager,
+	pool *KeyValidationPool,
+	leaderService *LeaderService,
+) *KeyCronService {
 	return &KeyCronService{
 		DB:              db,
 		SettingsManager: settingsManager,
 		Pool:            pool,
-		Store:           store,
+		LeaderService:   leaderService,
 		stopChan:        make(chan struct{}),
 	}
 }
 
-// Start begins the leader election and cron job execution.
+// Start begins the cron job execution.
 func (s *KeyCronService) Start() {
-	logrus.Info("Starting KeyCronService with leader election...")
+	logrus.Info("Starting KeyCronService...")
 	s.wg.Add(1)
-	go s.leaderElectionLoop()
-
+	go s.runLoop()
 }
 
 // Stop stops the cron job.
@@ -54,80 +52,20 @@ func (s *KeyCronService) Stop() {
 	logrus.Info("KeyCronService stopped.")
 }
 
-// leaderElectionLoop is the main loop that attempts to acquire leadership.
-func (s *KeyCronService) leaderElectionLoop() {
+func (s *KeyCronService) runLoop() {
 	defer s.wg.Done()
 
-	for {
-		select {
-		case <-s.stopChan:
-			return
-		default:
-			isLeader, err := s.tryAcquireLock()
-			if err != nil {
-				logrus.Errorf("KeyCronService: Error trying to acquire leader lock: %v. Retrying in 1 minute.", err)
-				time.Sleep(1 * time.Minute)
-				continue
-			}
-
-			if isLeader {
-				s.runAsLeader()
-			} else {
-				logrus.Debug("KeyCronService: Not the leader. Standing by.")
-				time.Sleep(leaderLockTTL)
-			}
-		}
-	}
-}
-
-// tryAcquireLock attempts to set a key in the store, effectively acquiring a lock.
-func (s *KeyCronService) tryAcquireLock() (bool, error) {
-	exists, err := s.Store.Exists(leaderLockKey)
-	if err != nil {
-		return false, err
-	}
-	if exists {
-		return false, nil // Lock is held by another node
-	}
-
-	lockValue := []byte(time.Now().String())
-	err = s.Store.Set(leaderLockKey, lockValue, leaderLockTTL)
-	if err != nil {
-		return false, err
-	}
-
-	logrus.Info("KeyCronService: Successfully acquired leader lock.")
-	return true, nil
-}
-
-func (s *KeyCronService) runAsLeader() {
-	logrus.Info("KeyCronService: Running as leader.")
-	defer func() {
-		if err := s.Store.Delete(leaderLockKey); err != nil {
-			logrus.Errorf("KeyCronService: Failed to release leader lock: %v", err)
-		}
-		logrus.Info("KeyCronService: Released leader lock.")
-	}()
-
-	// Run once on start
-	s.submitValidationJobs()
-
 	ticker := time.NewTicker(5 * time.Minute)
 	defer ticker.Stop()
 
-	heartbeat := time.NewTicker(leaderLockTTL / 2)
-	defer heartbeat.Stop()
-
 	for {
 		select {
 		case <-ticker.C:
-			s.submitValidationJobs()
-		case <-heartbeat.C:
-			logrus.Debug("KeyCronService: Renewing leader lock.")
-			err := s.Store.Set(leaderLockKey, []byte(time.Now().String()), leaderLockTTL)
-			if err != nil {
-				logrus.Errorf("KeyCronService: Failed to renew leader lock: %v. Relinquishing leadership.", err)
-				return
+			if s.LeaderService.IsLeader() {
+				logrus.Info("KeyCronService: Running as leader, submitting validation jobs.")
+				s.submitValidationJobs()
+			} else {
+				logrus.Debug("KeyCronService: Not the leader. Standing by.")
 			}
 		case <-s.stopChan:
 			return
diff --git a/internal/services/leader_service.go b/internal/services/leader_service.go
new file mode 100644
index 0000000..b915973
--- /dev/null
+++ b/internal/services/leader_service.go
@@ -0,0 +1,142 @@
+package services
+
+import (
+	"crypto/rand"
+	"encoding/hex"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"gpt-load/internal/store"
+
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	leaderLockKey         = "cluster:leader"
+	leaderLockTTL         = 30 * time.Second
+	leaderRenewalInterval = 10 * time.Second
+	leaderElectionTimeout = 5 * time.Second
+)
+
+// LeaderService provides a mechanism for electing a single leader in a cluster.
+type LeaderService struct {
+	store    store.Store
+	nodeID   string
+	isLeader atomic.Bool
+	stopChan chan struct{}
+	wg       sync.WaitGroup
+}
+
+// NewLeaderService creates a new LeaderService.
+func NewLeaderService(store store.Store) *LeaderService {
+	return &LeaderService{
+		store:    store,
+		nodeID:   generateNodeID(),
+		stopChan: make(chan struct{}),
+	}
+}
+
+// Start begins the leader election process.
+func (s *LeaderService) Start() {
+	logrus.WithField("nodeID", s.nodeID).Info("Starting LeaderService...")
+	s.wg.Add(1)
+	go s.electionLoop()
+}
+
+// Stop gracefully stops the leader election process.
+func (s *LeaderService) Stop() {
+	logrus.Info("Stopping LeaderService...")
+	close(s.stopChan)
+	s.wg.Wait()
+	logrus.Info("LeaderService stopped.")
+}
+
+// IsLeader returns true if the current node is the leader.
+// This is a fast, local check against an atomic boolean.
+func (s *LeaderService) IsLeader() bool {
+	return s.isLeader.Load()
+}
+
+func (s *LeaderService) electionLoop() {
+	defer s.wg.Done()
+
+	// Attempt to acquire leadership immediately on start.
+	s.tryToBeLeader()
+
+	ticker := time.NewTicker(leaderRenewalInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			s.tryToBeLeader()
+		case <-s.stopChan:
+			if s.IsLeader() {
+				s.releaseLock()
+			}
+			return
+		}
+	}
+}
+
+func (s *LeaderService) tryToBeLeader() {
+	if s.IsLeader() {
+		// Already the leader, just renew the lock.
+		if err := s.renewLock(); err != nil {
+			logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.")
+			s.isLeader.Store(false)
+		}
+		return
+	}
+
+	// Not the leader, try to acquire the lock.
+	acquired, err := s.acquireLock()
+	if err != nil {
+		logrus.WithError(err).Error("Error trying to acquire leader lock.")
+		s.isLeader.Store(false)
+		return
+	}
+
+	if acquired {
+		logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leader lock.")
+		s.isLeader.Store(true)
+	} else {
+		logrus.Debug("Could not acquire leader lock, another node is likely the leader.")
+		s.isLeader.Store(false)
+	}
+}
+
+func (s *LeaderService) acquireLock() (bool, error) {
+	// SetNX is an atomic set-if-not-exists operation: if the key already
+	// exists, it does nothing. This is the core of our distributed lock.
+	return s.store.SetNX(leaderLockKey, []byte(s.nodeID), leaderLockTTL)
+}
+
+func (s *LeaderService) renewLock() error {
+	// To renew safely we would verify that we still hold the lock before
+	// overwriting it; a Lua script is the safest way to do that atomically.
+	// For simplicity we issue an unconditional SET, which is not atomic but
+	// is acceptable because the renewal interval is well below the lock TTL.
+	return s.store.Set(leaderLockKey, []byte(s.nodeID), leaderLockTTL)
+}
+
+func (s *LeaderService) releaseLock() {
+	// Best-effort attempt to release the lock on shutdown.
+	// The TTL will handle cases where this fails.
+	if err := s.store.Delete(leaderLockKey); err != nil {
+		logrus.WithError(err).Error("Failed to release leader lock on shutdown.")
+	} else {
+		logrus.Info("Successfully released leader lock.")
+	}
+	s.isLeader.Store(false)
+}
+
+func generateNodeID() string {
+	bytes := make([]byte, 16)
+	if _, err := rand.Read(bytes); err != nil {
+		// Fall back to a timestamp-based ID if crypto/rand fails.
+		return "node-" + time.Now().Format(time.RFC3339Nano)
+	}
+	return hex.EncodeToString(bytes)
+}
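
A note on renewLock: the comment above points at a Lua script as the safe way to renew or release the lock only while we still hold it (acquisition needs no script, since SetNX is already atomic). Below is a minimal sketch of that check-and-set pattern, assuming a Redis-backed store accessed directly through github.com/redis/go-redis/v9; the store.Store interface in this patch does not expose script evaluation, so the redis.Client handle and the renewLeaderLock/releaseLeaderLock helpers here are hypothetical illustrations, not part of this change.

package leaderlock

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// renewScript extends the lock TTL only if the lock value still matches our
// node ID, so a node whose lock has already expired cannot steal the key back.
// KEYS[1] = lock key, ARGV[1] = node ID, ARGV[2] = TTL in milliseconds.
var renewScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("PEXPIRE", KEYS[1], ARGV[2])
end
return 0
`)

// releaseScript deletes the lock only if we still hold it, so shutdown on a
// deposed node cannot delete the current leader's lock.
var releaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`)

// renewLeaderLock reports whether the lock was still ours and its TTL was extended.
func renewLeaderLock(ctx context.Context, rdb *redis.Client, key, nodeID string, ttl time.Duration) (bool, error) {
	n, err := renewScript.Run(ctx, rdb, []string{key}, nodeID, ttl.Milliseconds()).Int()
	if err != nil {
		return false, err
	}
	return n == 1, nil
}

// releaseLeaderLock reports whether the lock was still ours and has been deleted.
func releaseLeaderLock(ctx context.Context, rdb *redis.Client, key, nodeID string) (bool, error) {
	n, err := releaseScript.Run(ctx, rdb, []string{key}, nodeID).Int()
	if err != nil {
		return false, err
	}
	return n == 1, nil
}

With a store method shaped like these helpers, tryToBeLeader could treat a renewal that returns false the same as a renewal error and relinquish leadership immediately, and releaseLock on a deposed node could no longer delete a newer leader's lock.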