diff --git a/internal/app/app.go b/internal/app/app.go index d3e5708..0adb4ae 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -30,9 +30,9 @@ type App struct { groupManager *services.GroupManager logCleanupService *services.LogCleanupService requestLogService *services.RequestLogService - keyCronService *services.KeyCronService + cronChecker *keypool.CronChecker keyPoolProvider *keypool.KeyProvider - leaderService *services.LeaderService + leaderLock *store.LeaderLock proxyServer *proxy.ProxyServer storage store.Store db *gorm.DB @@ -48,9 +48,9 @@ type AppParams struct { GroupManager *services.GroupManager LogCleanupService *services.LogCleanupService RequestLogService *services.RequestLogService - KeyCronService *services.KeyCronService + CronChecker *keypool.CronChecker KeyPoolProvider *keypool.KeyProvider - LeaderService *services.LeaderService + LeaderLock *store.LeaderLock ProxyServer *proxy.ProxyServer Storage store.Store DB *gorm.DB @@ -65,9 +65,9 @@ func NewApp(params AppParams) *App { groupManager: params.GroupManager, logCleanupService: params.LogCleanupService, requestLogService: params.RequestLogService, - keyCronService: params.KeyCronService, + cronChecker: params.CronChecker, keyPoolProvider: params.KeyPoolProvider, - leaderService: params.LeaderService, + leaderLock: params.LeaderLock, proxyServer: params.ProxyServer, storage: params.Storage, db: params.DB, @@ -77,25 +77,25 @@ func NewApp(params AppParams) *App { // Start runs the application, it is a non-blocking call. func (a *App) Start() error { - // 启动 Leader Service 并等待选举结果 - if err := a.leaderService.Start(); err != nil { + // 启动 Leader Lock 服务并等待选举结果 + if err := a.leaderLock.Start(); err != nil { return fmt.Errorf("leader service failed to start: %w", err) } // Leader 节点执行初始化,Follower 节点等待 - if a.leaderService.IsLeader() { + if a.leaderLock.IsLeader() { logrus.Info("Leader mode. 
Performing initial one-time tasks...") - acquired, err := a.leaderService.AcquireInitializingLock() + acquired, err := a.leaderLock.AcquireInitializingLock() if err != nil { return fmt.Errorf("failed to acquire initializing lock: %w", err) } if !acquired { logrus.Warn("Could not acquire initializing lock, another leader might be active. Switching to follower mode for initialization.") - if err := a.leaderService.WaitForInitializationToComplete(); err != nil { + if err := a.leaderLock.WaitForInitializationToComplete(); err != nil { return fmt.Errorf("failed to wait for initialization as a fallback follower: %w", err) } } else { - defer a.leaderService.ReleaseInitializingLock() + defer a.leaderLock.ReleaseInitializingLock() // 数据库迁移 if err := a.db.AutoMigrate( @@ -115,7 +115,7 @@ func (a *App) Start() error { } logrus.Info("System settings initialized in DB.") - a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderService) + a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderLock) // 从数据库加载密钥到 Redis if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil { @@ -125,10 +125,10 @@ func (a *App) Start() error { } } else { logrus.Info("Follower Mode. 
Waiting for leader to complete initialization.") - if err := a.leaderService.WaitForInitializationToComplete(); err != nil { + if err := a.leaderLock.WaitForInitializationToComplete(); err != nil { return fmt.Errorf("follower failed to start: %w", err) } - a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderService) + a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderLock) } // 显示配置并启动所有后台服务 @@ -138,7 +138,7 @@ func (a *App) Start() error { a.requestLogService.Start() a.logCleanupService.Start() - a.keyCronService.Start() + a.cronChecker.Start() // Create HTTP server serverConfig := a.configManager.GetEffectiveServerConfig() @@ -174,8 +174,8 @@ func (a *App) Stop(ctx context.Context) { } // Stop background services - a.keyCronService.Stop() - a.leaderService.Stop() + a.cronChecker.Stop() + a.leaderLock.Stop() a.logCleanupService.Stop() a.requestLogService.Stop() a.groupManager.Stop() diff --git a/internal/config/system_settings.go b/internal/config/system_settings.go index b8853ec..34307a4 100644 --- a/internal/config/system_settings.go +++ b/internal/config/system_settings.go @@ -35,12 +35,12 @@ type groupManager interface { Invalidate() error } -type leaderService interface { +type leaderLock interface { IsLeader() bool } // Initialize initializes the SystemSettingsManager with database and store dependencies. 
-func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager, leader leaderService) error { +func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager, leaderLock leaderLock) error { settingsLoader := func() (types.SystemSettings, error) { var dbSettings []models.SystemSetting if err := db.DB.Find(&dbSettings).Error; err != nil { @@ -82,7 +82,7 @@ func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager, } afterLoader := func(newData types.SystemSettings) { - if !leader.IsLeader() { + if !leaderLock.IsLeader() { return } if err := gm.Invalidate(); err != nil { diff --git a/internal/container/container.go b/internal/container/container.go index a976be1..f8d2567 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -25,9 +25,6 @@ func BuildContainer() (*dig.Container, error) { if err := container.Provide(config.NewManager); err != nil { return nil, err } - if err := container.Provide(services.NewLeaderService); err != nil { - return nil, err - } if err := container.Provide(db.NewDB); err != nil { return nil, err } @@ -37,6 +34,9 @@ func BuildContainer() (*dig.Container, error) { if err := container.Provide(store.NewStore); err != nil { return nil, err } + if err := container.Provide(store.NewLeaderLock); err != nil { + return nil, err + } if err := container.Provide(httpclient.NewHTTPClientManager); err != nil { return nil, err } @@ -51,9 +51,6 @@ func BuildContainer() (*dig.Container, error) { if err := container.Provide(services.NewKeyManualValidationService); err != nil { return nil, err } - if err := container.Provide(services.NewKeyCronService); err != nil { - return nil, err - } if err := container.Provide(services.NewKeyService); err != nil { return nil, err } @@ -72,6 +69,9 @@ func BuildContainer() (*dig.Container, error) { if err := container.Provide(keypool.NewKeyValidator); err != nil { return nil, err } + if err := container.Provide(keypool.NewCronChecker); err 
!= nil { + return nil, err + } // Handlers if err := container.Provide(handler.NewServer); err != nil { diff --git a/internal/services/key_cron_service.go b/internal/keypool/cron_checker.go similarity index 59% rename from internal/services/key_cron_service.go rename to internal/keypool/cron_checker.go index 786f9d7..735601f 100644 --- a/internal/services/key_cron_service.go +++ b/internal/keypool/cron_checker.go @@ -1,9 +1,9 @@ -package services +package keypool import ( "gpt-load/internal/config" - "gpt-load/internal/keypool" "gpt-load/internal/models" + "gpt-load/internal/store" "sync" "time" @@ -11,51 +11,51 @@ import ( "gorm.io/gorm" ) -// KeyCronService is responsible for periodically validating invalid keys. -type KeyCronService struct { +// CronChecker is responsible for periodically validating invalid keys. +type CronChecker struct { DB *gorm.DB SettingsManager *config.SystemSettingsManager - Validator *keypool.KeyValidator - LeaderService *LeaderService + Validator *KeyValidator + LeaderLock *store.LeaderLock stopChan chan struct{} wg sync.WaitGroup } -// NewKeyCronService creates a new KeyCronService. -func NewKeyCronService( +// NewCronChecker creates a new CronChecker. +func NewCronChecker( db *gorm.DB, settingsManager *config.SystemSettingsManager, - validator *keypool.KeyValidator, - leaderService *LeaderService, -) *KeyCronService { - return &KeyCronService{ + validator *KeyValidator, + leaderLock *store.LeaderLock, +) *CronChecker { + return &CronChecker{ DB: db, SettingsManager: settingsManager, Validator: validator, - LeaderService: leaderService, + LeaderLock: leaderLock, stopChan: make(chan struct{}), } } // Start begins the cron job execution. -func (s *KeyCronService) Start() { - logrus.Debug("Starting KeyCronService...") +func (s *CronChecker) Start() { + logrus.Debug("Starting CronChecker...") s.wg.Add(1) go s.runLoop() } // Stop stops the cron job. 
-func (s *KeyCronService) Stop() { - logrus.Info("Stopping KeyCronService...") +func (s *CronChecker) Stop() { + logrus.Info("Stopping CronChecker...") close(s.stopChan) s.wg.Wait() - logrus.Info("KeyCronService stopped.") + logrus.Info("CronChecker stopped.") } -func (s *KeyCronService) runLoop() { +func (s *CronChecker) runLoop() { defer s.wg.Done() - if s.LeaderService.IsLeader() { + if s.LeaderLock.IsLeader() { s.submitValidationJobs() } @@ -65,11 +65,11 @@ func (s *KeyCronService) runLoop() { for { select { case <-ticker.C: - if s.LeaderService.IsLeader() { - logrus.Debug("KeyCronService: Running as leader, submitting validation jobs.") + if s.LeaderLock.IsLeader() { + logrus.Debug("CronChecker: Running as leader, submitting validation jobs.") s.submitValidationJobs() } else { - logrus.Debug("KeyCronService: Not the leader. Standing by.") + logrus.Debug("CronChecker: Not the leader. Standing by.") } case <-s.stopChan: return @@ -78,10 +78,10 @@ func (s *KeyCronService) runLoop() { } // submitValidationJobs finds groups whose keys need validation and validates them. -func (s *KeyCronService) submitValidationJobs() { +func (s *CronChecker) submitValidationJobs() { var groups []models.Group if err := s.DB.Find(&groups).Error; err != nil { - logrus.Errorf("KeyCronService: Failed to get groups: %v", err) + logrus.Errorf("CronChecker: Failed to get groups: %v", err) return } @@ -97,14 +97,14 @@ func (s *KeyCronService) submitValidationJobs() { var invalidKeys []models.APIKey err := s.DB.Where("group_id = ? 
AND status = ?", group.ID, models.KeyStatusInvalid).Find(&invalidKeys).Error if err != nil { - logrus.Errorf("KeyCronService: Failed to get invalid keys for group %s: %v", group.Name, err) + logrus.Errorf("CronChecker: Failed to get invalid keys for group %s: %v", group.Name, err) continue } validatedCount := len(invalidKeys) becameValidCount := 0 if validatedCount > 0 { - logrus.Debugf("KeyCronService: Found %d invalid keys to validate for group %s.", validatedCount, group.Name) + logrus.Debugf("CronChecker: Found %d invalid keys to validate for group %s.", validatedCount, group.Name) for j := range invalidKeys { key := &invalidKeys[j] isValid, _ := s.Validator.ValidateSingleKey(key, group) @@ -116,12 +116,12 @@ func (s *KeyCronService) submitValidationJobs() { } if err := s.DB.Model(group).Update("last_validated_at", time.Now()).Error; err != nil { - logrus.Errorf("KeyCronService: Failed to update last_validated_at for group %s: %v", group.Name, err) + logrus.Errorf("CronChecker: Failed to update last_validated_at for group %s: %v", group.Name, err) } duration := time.Since(groupProcessStart) logrus.Infof( - "KeyCronService: Group '%s' validation finished. Total checked: %d, became valid: %d. Duration: %s.", + "CronChecker: Group '%s' validation finished. Total checked: %d, became valid: %d. 
Duration: %s.", group.Name, validatedCount, becameValidCount, diff --git a/internal/services/log_cleanup.go b/internal/services/log_cleanup_service.go similarity index 92% rename from internal/services/log_cleanup.go rename to internal/services/log_cleanup_service.go index 1bf7776..6c0bb6c 100644 --- a/internal/services/log_cleanup.go +++ b/internal/services/log_cleanup_service.go @@ -3,6 +3,7 @@ package services import ( "gpt-load/internal/config" "gpt-load/internal/models" + "gpt-load/internal/store" "time" "github.com/sirupsen/logrus" @@ -13,16 +14,16 @@ import ( type LogCleanupService struct { db *gorm.DB settingsManager *config.SystemSettingsManager - leaderService *LeaderService + leaderLock *store.LeaderLock stopCh chan struct{} } // NewLogCleanupService 创建新的日志清理服务 -func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsManager, leaderService *LeaderService) *LogCleanupService { +func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsManager, leaderLock *store.LeaderLock) *LogCleanupService { return &LogCleanupService{ db: db, settingsManager: settingsManager, - leaderService: leaderService, + leaderLock: leaderLock, stopCh: make(chan struct{}), } } @@ -59,7 +60,7 @@ func (s *LogCleanupService) run() { // cleanupExpiredLogs 清理过期的请求日志 func (s *LogCleanupService) cleanupExpiredLogs() { - if !s.leaderService.IsLeader() { + if !s.leaderLock.IsLeader() { logrus.Debug("Not the leader, skipping log cleanup.") return } diff --git a/internal/services/request_log_service.go b/internal/services/request_log_service.go index a26d965..a026372 100644 --- a/internal/services/request_log_service.go +++ b/internal/services/request_log_service.go @@ -27,20 +27,20 @@ type RequestLogService struct { db *gorm.DB store store.Store settingsManager *config.SystemSettingsManager - leaderService *LeaderService + leaderLock *store.LeaderLock ctx context.Context cancel context.CancelFunc ticker *time.Ticker } // NewRequestLogService creates a 
new RequestLogService instance -func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *LeaderService) *RequestLogService { +func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *store.LeaderLock) *RequestLogService { ctx, cancel := context.WithCancel(context.Background()) return &RequestLogService{ db: db, store: store, settingsManager: sm, - leaderService: ls, + leaderLock: ls, ctx: ctx, cancel: cancel, } @@ -116,7 +116,7 @@ func (s *RequestLogService) flush() { return } - if !s.leaderService.IsLeader() { + if !s.leaderLock.IsLeader() { logrus.Debug("Not a leader, skipping log flush.") return } diff --git a/internal/services/leader_service.go b/internal/store/leader_lock.go similarity index 84% rename from internal/services/leader_service.go rename to internal/store/leader_lock.go index 28b1d9a..f98d120 100644 --- a/internal/services/leader_service.go +++ b/internal/store/leader_lock.go @@ -1,4 +1,4 @@ -package services +package store import ( "context" @@ -9,8 +9,6 @@ import ( "sync/atomic" "time" - "gpt-load/internal/store" - "github.com/sirupsen/logrus" ) @@ -36,9 +34,9 @@ else return 0 end` -// LeaderService provides a mechanism for electing a single leader in a cluster. -type LeaderService struct { - store store.Store +// LeaderLock provides a mechanism for electing a single leader in a cluster. +type LeaderLock struct { + store Store nodeID string isLeader atomic.Bool stopChan chan struct{} @@ -46,10 +44,10 @@ type LeaderService struct { isSingleNode bool } -// NewLeaderService creates a new LeaderService. -func NewLeaderService(s store.Store) *LeaderService { - _, isDistributed := s.(store.LuaScripter) - service := &LeaderService{ +// NewLeaderLock creates a new LeaderLock. 
+func NewLeaderLock(s Store) *LeaderLock { + _, isDistributed := s.(LuaScripter) + service := &LeaderLock{ store: s, nodeID: generateNodeID(), stopChan: make(chan struct{}), @@ -65,7 +63,7 @@ func NewLeaderService(s store.Store) *LeaderService { } // Start performs an initial leader election and starts the background leadership maintenance loop. -func (s *LeaderService) Start() error { +func (s *LeaderLock) Start() error { if s.isSingleNode { return nil } @@ -81,7 +79,7 @@ func (s *LeaderService) Start() error { } // Stop gracefully stops the leadership maintenance process. -func (s *LeaderService) Stop() { +func (s *LeaderLock) Stop() { if s.isSingleNode { return } @@ -96,12 +94,12 @@ func (s *LeaderService) Stop() { } // IsLeader returns true if the current node is the leader. -func (s *LeaderService) IsLeader() bool { +func (s *LeaderLock) IsLeader() bool { return s.isLeader.Load() } // AcquireInitializingLock sets a temporary lock to indicate that initialization is in progress. -func (s *LeaderService) AcquireInitializingLock() (bool, error) { +func (s *LeaderLock) AcquireInitializingLock() (bool, error) { if !s.IsLeader() { return false, nil } @@ -110,7 +108,7 @@ func (s *LeaderService) AcquireInitializingLock() (bool, error) { } // ReleaseInitializingLock removes the initialization lock. -func (s *LeaderService) ReleaseInitializingLock() { +func (s *LeaderLock) ReleaseInitializingLock() { if !s.IsLeader() { return } @@ -121,7 +119,7 @@ func (s *LeaderService) ReleaseInitializingLock() { } // WaitForInitializationToComplete waits until the initialization lock is released. -func (s *LeaderService) WaitForInitializationToComplete() error { +func (s *LeaderLock) WaitForInitializationToComplete() error { if s.isSingleNode || s.IsLeader() { return nil } @@ -165,7 +163,7 @@ func (s *LeaderService) WaitForInitializationToComplete() error { } // maintainLeadershipLoop is the background process that keeps trying to acquire or renew the lock. 
-func (s *LeaderService) maintainLeadershipLoop() { +func (s *LeaderLock) maintainLeadershipLoop() { defer s.wg.Done() ticker := time.NewTicker(leaderRenewalInterval) defer ticker.Stop() @@ -185,7 +183,7 @@ func (s *LeaderService) maintainLeadershipLoop() { } // tryToBeLeader is an idempotent function that attempts to acquire or renew the lock. -func (s *LeaderService) tryToBeLeader() error { +func (s *LeaderLock) tryToBeLeader() error { if s.isLeader.Load() { err := s.renewLock() if err != nil { @@ -206,12 +204,12 @@ func (s *LeaderService) tryToBeLeader() error { return nil } -func (s *LeaderService) acquireLock() (bool, error) { +func (s *LeaderLock) acquireLock() (bool, error) { return s.store.SetNX(leaderLockKey, []byte(s.nodeID), leaderLockTTL) } -func (s *LeaderService) renewLock() error { - luaStore := s.store.(store.LuaScripter) +func (s *LeaderLock) renewLock() error { + luaStore := s.store.(LuaScripter) ttlSeconds := int(leaderLockTTL.Seconds()) res, err := luaStore.Eval(renewLockScript, []string{leaderLockKey}, s.nodeID, ttlSeconds) if err != nil { @@ -223,8 +221,8 @@ func (s *LeaderService) renewLock() error { return nil } -func (s *LeaderService) releaseLock() { - luaStore := s.store.(store.LuaScripter) +func (s *LeaderLock) releaseLock() { + luaStore := s.store.(LuaScripter) if _, err := luaStore.Eval(releaseLockScript, []string{leaderLockKey}, s.nodeID); err != nil { logrus.WithError(err).Error("Failed to release leader lock on shutdown.") } else {