From fc63189246355b3d5d903d90a5f406cc60d57211 Mon Sep 17 00:00:00 2001
From: tbphp
Date: Tue, 15 Jul 2025 10:25:55 +0800
Subject: [PATCH] refactor: rework the cluster master/slave node mode
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .env.example                             |   1 +
 internal/app/app.go                      | 102 ++++------
 internal/config/manager.go               |   6 +
 internal/config/system_settings.go       |  12 +-
 internal/container/container.go          |   3 -
 internal/keypool/cron_checker.go         |  16 +-
 internal/services/log_cleanup_service.go |  10 +-
 internal/services/request_log_service.go |  11 +-
 internal/store/leader_lock.go            | 248 -----------------------
 internal/store/redis.go                  |   5 -
 internal/store/store.go                  |   5 -
 internal/types/types.go                  |   2 +
 12 files changed, 58 insertions(+), 363 deletions(-)
 delete mode 100644 internal/store/leader_lock.go

diff --git a/.env.example b/.env.example
index a87520a..e3917db 100644
--- a/.env.example
+++ b/.env.example
@@ -1,6 +1,7 @@
 # Server configuration
 PORT=3001
 HOST=0.0.0.0
+IS_SLAVE=false
 
 # Timeouts (seconds) for server read, write, and idle connections
 SERVER_READ_TIMEOUT=120
diff --git a/internal/app/app.go b/internal/app/app.go
index 1abda8d..13d32e1 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -33,7 +33,6 @@ type App struct {
     requestLogService *services.RequestLogService
     cronChecker       *keypool.CronChecker
     keyPoolProvider   *keypool.KeyProvider
-    leaderLock        *store.LeaderLock
     proxyServer       *proxy.ProxyServer
     storage           store.Store
     db                *gorm.DB
@@ -51,7 +50,6 @@ type AppParams struct {
     RequestLogService *services.RequestLogService
     CronChecker       *keypool.CronChecker
     KeyPoolProvider   *keypool.KeyProvider
-    LeaderLock        *store.LeaderLock
     ProxyServer       *proxy.ProxyServer
     Storage           store.Store
     DB                *gorm.DB
@@ -68,7 +66,6 @@ func NewApp(params AppParams) *App {
         requestLogService: params.RequestLogService,
         cronChecker:       params.CronChecker,
         keyPoolProvider:   params.KeyPoolProvider,
-        leaderLock:        params.LeaderLock,
         proxyServer:       params.ProxyServer,
         storage:           params.Storage,
         db:                params.DB,
@@ -77,59 +74,43 @@ func NewApp(params AppParams) *App {
 
 // Start runs the application; it is a non-blocking call.
 func (a *App) Start() error {
+    // The master node performs the one-time initialization
+    if a.configManager.IsMaster() {
+        logrus.Info("Starting as Master Node.")
+
-
-    // Start the Leader Lock service and wait for the election result
-    if err := a.leaderLock.Start(); err != nil {
-        return fmt.Errorf("leader service failed to start: %w", err)
-    }
-
-    // The leader node performs initialization; follower nodes wait
-    if a.leaderLock.IsLeader() {
-        logrus.Info("Leader mode. Performing initial one-time tasks...")
-        acquired, err := a.leaderLock.AcquireInitializingLock()
-        if err != nil {
-            return fmt.Errorf("failed to acquire initializing lock: %w", err)
+        // Database migration
+        if err := a.db.AutoMigrate(
+            &models.SystemSetting{},
+            &models.Group{},
+            &models.APIKey{},
+            &models.RequestLog{},
+            &models.GroupHourlyStat{},
+        ); err != nil {
+            return fmt.Errorf("database auto-migration failed: %w", err)
         }
-        if !acquired {
-            logrus.Warn("Could not acquire initializing lock, another leader might be active. Switching to follower mode for initialization.")
-            if err := a.leaderLock.WaitForInitializationToComplete(); err != nil {
-                return fmt.Errorf("failed to wait for initialization as a fallback follower: %w", err)
-            }
-        } else {
-            defer a.leaderLock.ReleaseInitializingLock()
+        logrus.Info("Database auto-migration completed.")
+
-
-            // Database migration
-            if err := a.db.AutoMigrate(
-                &models.SystemSetting{},
-                &models.Group{},
-                &models.APIKey{},
-                &models.RequestLog{},
-                &models.GroupHourlyStat{},
-            ); err != nil {
-                return fmt.Errorf("database auto-migration failed: %w", err)
-            }
-            logrus.Info("Database auto-migration completed.")
-
-            // Initialize system settings
-            if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
-                return fmt.Errorf("failed to initialize system settings: %w", err)
-            }
-            logrus.Info("System settings initialized in DB.")
-
-            a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderLock)
-
-            // Load keys from the database into Redis
-            if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
-                return fmt.Errorf("failed to load keys into key pool: %w", err)
-            }
-            logrus.Debug("API keys loaded into Redis cache by leader.")
+        // Initialize system settings
+        if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
+            return fmt.Errorf("failed to initialize system settings: %w", err)
         }
+        logrus.Info("System settings initialized in DB.")
+
+        a.settingsManager.Initialize(a.storage, a.groupManager, a.configManager.IsMaster())
+
+        // Load keys from the database into Redis
+        if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
+            return fmt.Errorf("failed to load keys into key pool: %w", err)
+        }
+        logrus.Debug("API keys loaded into Redis cache by master.")
+
+        // Services started only on the master node
+        a.requestLogService.Start()
+        a.logCleanupService.Start()
+        a.cronChecker.Start()
     } else {
-        logrus.Info("Follower Mode. Waiting for leader to complete initialization.")
-        if err := a.leaderLock.WaitForInitializationToComplete(); err != nil {
-            return fmt.Errorf("follower failed to start: %w", err)
-        }
-        a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderLock)
+        logrus.Info("Starting as Slave Node.")
+        a.settingsManager.Initialize(a.storage, a.groupManager, a.configManager.IsMaster())
     }
 
     // Display the configuration and start all background services
@@ -137,10 +118,6 @@ func (a *App) Start() error {
 
     a.groupManager.Initialize()
 
-    a.requestLogService.Start()
-    a.logCleanupService.Start()
-    a.cronChecker.Start()
-
     // Create HTTP server
     serverConfig := a.configManager.GetEffectiveServerConfig()
     a.httpServer = &http.Server{
@@ -174,8 +151,6 @@ func (a *App) Stop(ctx context.Context) {
 
     // Dynamically calculate the HTTP shutdown timeout, reserving a fixed 5 seconds for background services
    httpShutdownTimeout := totalTimeout - 5*time.Second
-
-    // Create a dedicated context for the HTTP server's graceful shutdown
     httpShutdownCtx, cancelHttpShutdown := context.WithTimeout(context.Background(), httpShutdownTimeout)
     defer cancelHttpShutdown()
 
@@ -190,14 +165,18 @@ func (a *App) Stop(ctx context.Context) {
 
     // Continue stopping the remaining background services under the original total-timeout context
     stoppableServices := []func(context.Context){
-        a.cronChecker.Stop,
-        a.leaderLock.Stop,
-        a.logCleanupService.Stop,
-        a.requestLogService.Stop,
         a.groupManager.Stop,
         a.settingsManager.Stop,
     }
 
+    if serverConfig.IsMaster {
+        stoppableServices = append(stoppableServices,
+            a.cronChecker.Stop,
+            a.logCleanupService.Stop,
+            a.requestLogService.Stop,
+        )
+    }
+
     var wg sync.WaitGroup
     wg.Add(len(stoppableServices))
 
@@ -221,7 +200,6 @@ func (a *App) Stop(ctx context.Context) {
         logrus.Warn("Shutdown timed out, some services may not have stopped gracefully.")
     }
 
-    // Step 3: Close storage connection last.
     if a.storage != nil {
         a.storage.Close()
     }
diff --git a/internal/config/manager.go b/internal/config/manager.go
index 76eedfb..d32da74 100644
--- a/internal/config/manager.go
+++ b/internal/config/manager.go
@@ -70,6 +70,7 @@ func (m *Manager) ReloadConfig() error {
 
     config := &Config{
         Server: types.ServerConfig{
+            IsMaster:     !utils.ParseBoolean(os.Getenv("IS_SLAVE"), false),
             Port:         utils.ParseInteger(os.Getenv("PORT"), 3001),
             Host:         utils.GetEnvOrDefault("HOST", "0.0.0.0"),
             ReadTimeout:  utils.ParseInteger(os.Getenv("SERVER_READ_TIMEOUT"), 120),
@@ -111,6 +112,11 @@ func (m *Manager) ReloadConfig() error {
     return nil
 }
 
+// IsMaster reports whether this node runs in master mode.
+func (m *Manager) IsMaster() bool {
+    return m.config.Server.IsMaster
+}
+
 // GetAuthConfig returns authentication configuration
 func (m *Manager) GetAuthConfig() types.AuthConfig {
     return m.config.Auth
diff --git a/internal/config/system_settings.go b/internal/config/system_settings.go
index 35e7b4d..7c76760 100644
--- a/internal/config/system_settings.go
+++ b/internal/config/system_settings.go
@@ -36,12 +36,8 @@ type groupManager interface {
     Invalidate() error
 }
 
-type leaderLock interface {
-    IsLeader() bool
-}
-
 // Initialize initializes the SystemSettingsManager with database and store dependencies.
-func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager, leaderLock leaderLock) error {
+func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager, isMaster bool) error {
     settingsLoader := func() (types.SystemSettings, error) {
         var dbSettings []models.SystemSetting
         if err := db.DB.Find(&dbSettings).Error; err != nil {
@@ -83,12 +79,10 @@ func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager,
     }
 
     afterLoader := func(newData types.SystemSettings) {
-        if !leaderLock.IsLeader() {
+        if !isMaster {
             return
         }
-        if err := gm.Invalidate(); err != nil {
-            logrus.Debugf("Failed to invalidate group manager cache after settings update: %v", err)
-        }
+        gm.Invalidate()
     }
 
     syncer, err := syncer.NewCacheSyncer(
diff --git a/internal/container/container.go b/internal/container/container.go
index f8d2567..010fe1c 100644
--- a/internal/container/container.go
+++ b/internal/container/container.go
@@ -34,9 +34,6 @@ func BuildContainer() (*dig.Container, error) {
     if err := container.Provide(store.NewStore); err != nil {
         return nil, err
     }
-    if err := container.Provide(store.NewLeaderLock); err != nil {
-        return nil, err
-    }
     if err := container.Provide(httpclient.NewHTTPClientManager); err != nil {
         return nil, err
     }
diff --git a/internal/keypool/cron_checker.go b/internal/keypool/cron_checker.go
index d3e643c..62b11d4 100644
--- a/internal/keypool/cron_checker.go
+++ b/internal/keypool/cron_checker.go
@@ -4,7 +4,6 @@ import (
     "context"
     "gpt-load/internal/config"
     "gpt-load/internal/models"
-    "gpt-load/internal/store"
     "sync"
     "sync/atomic"
     "time"
@@ -18,7 +17,6 @@ type CronChecker struct {
     DB              *gorm.DB
     SettingsManager *config.SystemSettingsManager
     Validator       *KeyValidator
-    LeaderLock      *store.LeaderLock
     stopChan        chan struct{}
     wg              sync.WaitGroup
 }
@@ -28,13 +26,11 @@ func NewCronChecker(
     db *gorm.DB,
     settingsManager *config.SystemSettingsManager,
     validator *KeyValidator,
-    leaderLock *store.LeaderLock,
 ) *CronChecker {
     return &CronChecker{
         DB:              db,
         SettingsManager: settingsManager,
         Validator:       validator,
-        LeaderLock:      leaderLock,
         stopChan:        make(chan struct{}),
     }
 }
@@ -68,9 +64,7 @@ func (s *CronChecker) Stop(ctx context.Context) {
 func (s *CronChecker) runLoop() {
     defer s.wg.Done()
 
-    if s.LeaderLock.IsLeader() {
-        s.submitValidationJobs()
-    }
+    s.submitValidationJobs()
 
     ticker := time.NewTicker(5 * time.Minute)
     defer ticker.Stop()
@@ -78,12 +72,8 @@ func (s *CronChecker) runLoop() {
     for {
         select {
         case <-ticker.C:
-            if s.LeaderLock.IsLeader() {
-                logrus.Debug("CronChecker: Running as leader, submitting validation jobs.")
-                s.submitValidationJobs()
-            } else {
-                logrus.Debug("CronChecker: Not the leader. Standing by.")
-            }
+            logrus.Debug("CronChecker: Running as Master, submitting validation jobs.")
+            s.submitValidationJobs()
         case <-s.stopChan:
             return
         }
diff --git a/internal/services/log_cleanup_service.go b/internal/services/log_cleanup_service.go
index 1504b26..193691c 100644
--- a/internal/services/log_cleanup_service.go
+++ b/internal/services/log_cleanup_service.go
@@ -4,7 +4,6 @@ import (
     "context"
     "gpt-load/internal/config"
     "gpt-load/internal/models"
-    "gpt-load/internal/store"
     "sync"
     "time"
 
@@ -16,17 +15,15 @@
 type LogCleanupService struct {
     db              *gorm.DB
     settingsManager *config.SystemSettingsManager
-    leaderLock      *store.LeaderLock
     stopCh          chan struct{}
     wg              sync.WaitGroup
 }
 
 // NewLogCleanupService creates a new log cleanup service
-func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsManager, leaderLock *store.LeaderLock) *LogCleanupService {
+func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsManager) *LogCleanupService {
     return &LogCleanupService{
         db:              db,
         settingsManager: settingsManager,
-        leaderLock:      leaderLock,
         stopCh:          make(chan struct{}),
     }
 }
@@ -77,11 +74,6 @@ func (s *LogCleanupService) run() {
 
 // cleanupExpiredLogs removes expired request logs
 func (s *LogCleanupService) cleanupExpiredLogs() {
-    if !s.leaderLock.IsLeader() {
-        logrus.Debug("Not the leader, skipping log cleanup.")
-        return
-    }
-
     // Read the configured log retention period in days
     settings := s.settingsManager.GetSettings()
     retentionDays := settings.RequestLogRetentionDays
diff --git a/internal/services/request_log_service.go b/internal/services/request_log_service.go
index f49d041..dd232f0 100644
--- a/internal/services/request_log_service.go
+++ b/internal/services/request_log_service.go
@@ -28,19 +28,17 @@ type RequestLogService struct {
     db              *gorm.DB
     store           store.Store
     settingsManager *config.SystemSettingsManager
-    leaderLock      *store.LeaderLock
     stopChan        chan struct{}
     wg              sync.WaitGroup
     ticker          *time.Ticker
 }
 
 // NewRequestLogService creates a new RequestLogService instance
-func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *store.LeaderLock) *RequestLogService {
+func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager) *RequestLogService {
     return &RequestLogService{
         db:              db,
         store:           store,
         settingsManager: sm,
-        leaderLock:      ls,
         stopChan:        make(chan struct{}),
     }
 }
@@ -133,12 +131,7 @@ func (s *RequestLogService) flush() {
         return
     }
 
-    if !s.leaderLock.IsLeader() {
-        logrus.Debug("Not a leader, skipping log flush.")
-        return
-    }
-
-    logrus.Debug("Leader starting to flush request logs...")
+    logrus.Debug("Master starting to flush request logs...")
 
     for {
         keys, err := s.store.SPopN(PendingLogKeysSet, DefaultLogFlushBatchSize)
diff --git a/internal/store/leader_lock.go b/internal/store/leader_lock.go
deleted file mode 100644
index 427a7d8..0000000
--- a/internal/store/leader_lock.go
+++ /dev/null
@@ -1,248 +0,0 @@
-package store
-
-import (
-    "context"
-    "crypto/rand"
-    "encoding/hex"
-    "fmt"
-    "sync"
-    "sync/atomic"
-    "time"
-
-    "github.com/sirupsen/logrus"
-)
-
-const (
-    leaderLockKey         = "cluster:leader"
-    leaderLockTTL         = 30 * time.Second
-    leaderRenewalInterval = 10 * time.Second
-    initializingLockKey   = "cluster:initializing"
-    initializingLockTTL   = 5 * time.Minute
-)
-
-const renewLockScript = `
-if redis.call("get", KEYS[1]) == ARGV[1] then
-    return redis.call("expire", KEYS[1], ARGV[2])
-else
-    return 0
-end`
-
-const releaseLockScript = `
-if redis.call("get", KEYS[1]) == ARGV[1] then
-    return redis.call("del", KEYS[1])
-else
-    return 0
-end`
-
-// LeaderLock provides a mechanism for electing a single leader in a cluster.
-type LeaderLock struct {
-    store        Store
-    nodeID       string
-    isLeader     atomic.Bool
-    stopChan     chan struct{}
-    wg           sync.WaitGroup
-    isSingleNode bool
-}
-
-// NewLeaderLock creates a new LeaderLock.
-func NewLeaderLock(s Store) *LeaderLock {
-    _, isDistributed := s.(LuaScripter)
-    service := &LeaderLock{
-        store:        s,
-        nodeID:       generateNodeID(),
-        stopChan:     make(chan struct{}),
-        isSingleNode: !isDistributed,
-    }
-    if service.isSingleNode {
-        logrus.Debug("Running in single-node mode. Assuming leadership.")
-        service.isLeader.Store(true)
-    } else {
-        logrus.Debug("Running in distributed mode.")
-    }
-    return service
-}
-
-// Start performs an initial leader election and starts the background leadership maintenance loop.
-func (s *LeaderLock) Start() error {
-    if s.isSingleNode {
-        return nil
-    }
-
-    if err := s.tryToBeLeader(); err != nil {
-        return fmt.Errorf("initial leader election failed: %w", err)
-    }
-
-    s.wg.Add(1)
-    go s.maintainLeadershipLoop()
-
-    return nil
-}
-
-// Stop gracefully stops the leadership maintenance process.
-func (s *LeaderLock) Stop(ctx context.Context) {
-    if s.isSingleNode {
-        return
-    }
-    close(s.stopChan)
-
-    done := make(chan struct{})
-    go func() {
-        s.wg.Wait()
-        close(done)
-    }()
-
-    select {
-    case <-done:
-        logrus.Info("Leadership maintenance process stopped gracefully.")
-    case <-ctx.Done():
-        logrus.Warn("Leadership maintenance process stop timed out.")
-    }
-
-    if s.isLeader.Load() {
-        s.releaseLock()
-    }
-}
-
-// IsLeader returns true if the current node is the leader.
-func (s *LeaderLock) IsLeader() bool {
-    return s.isLeader.Load()
-}
-
-// AcquireInitializingLock sets a temporary lock to indicate that initialization is in progress.
-func (s *LeaderLock) AcquireInitializingLock() (bool, error) {
-    if !s.IsLeader() {
-        return false, nil
-    }
-    logrus.Debug("Leader acquiring initialization lock...")
-    return s.store.SetNX(initializingLockKey, []byte(s.nodeID), initializingLockTTL)
-}
-
-// ReleaseInitializingLock removes the initialization lock.
-func (s *LeaderLock) ReleaseInitializingLock() {
-    if !s.IsLeader() {
-        return
-    }
-    logrus.Debug("Leader releasing initialization lock...")
-    if err := s.store.Delete(initializingLockKey); err != nil {
-        logrus.WithError(err).Error("Failed to release initialization lock.")
-    }
-}
-
-// WaitForInitializationToComplete waits until the initialization lock is released.
-func (s *LeaderLock) WaitForInitializationToComplete() error {
-    if s.isSingleNode || s.IsLeader() {
-        return nil
-    }
-
-    logrus.Debug("Follower waiting for leader to complete initialization...")
-
-    time.Sleep(2 * time.Second)
-
-    // Use a context with timeout to prevent indefinite waiting.
-    ctx, cancel := context.WithTimeout(context.Background(), initializingLockTTL+1*time.Minute)
-    defer cancel()
-
-    ticker := time.NewTicker(2 * time.Second)
-    defer ticker.Stop()
-
-    // Initial check before starting the loop
-    exists, err := s.store.Exists(initializingLockKey)
-    if err != nil {
-        logrus.WithError(err).Warn("Initial check for initialization lock failed, will proceed to loop.")
-    } else if !exists {
-        logrus.Debug("Initialization lock not found on initial check. Assuming initialization is complete.")
-        return nil
-    }
-
-    for {
-        select {
-        case <-ctx.Done():
-            return fmt.Errorf("timed out waiting for leader initialization after %v", initializingLockTTL)
-        case <-ticker.C:
-            exists, err := s.store.Exists(initializingLockKey)
-            if err != nil {
-                logrus.WithError(err).Warn("Error checking initialization lock, will retry...")
-                continue
-            }
-            if !exists {
-                logrus.Debug("Initialization lock released. Follower proceeding with startup.")
-                return nil
-            }
-        }
-    }
-}
-
-// maintainLeadershipLoop is the background process that keeps trying to acquire or renew the lock.
-func (s *LeaderLock) maintainLeadershipLoop() {
-    defer s.wg.Done()
-    ticker := time.NewTicker(leaderRenewalInterval)
-    defer ticker.Stop()
-
-    logrus.Debug("Leadership maintenance loop started.")
-    for {
-        select {
-        case <-ticker.C:
-            if err := s.tryToBeLeader(); err != nil {
-                logrus.WithError(err).Warn("Error during leadership maintenance cycle.")
-            }
-        case <-s.stopChan:
-            return
-        }
-    }
-}
-
-// tryToBeLeader is an idempotent function that attempts to acquire or renew the lock.
-func (s *LeaderLock) tryToBeLeader() error {
-    if s.isLeader.Load() {
-        err := s.renewLock()
-        if err != nil {
-            logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.")
-            s.isLeader.Store(false)
-        }
-        return nil
-    }
-
-    acquired, err := s.acquireLock()
-    if err != nil {
-        return fmt.Errorf("failed to acquire lock: %w", err)
-    }
-    if acquired {
-        logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leadership.")
-        s.isLeader.Store(true)
-    }
-    return nil
-}
-
-func (s *LeaderLock) acquireLock() (bool, error) {
-    return s.store.SetNX(leaderLockKey, []byte(s.nodeID), leaderLockTTL)
-}
-
-func (s *LeaderLock) renewLock() error {
-    luaStore := s.store.(LuaScripter)
-    ttlSeconds := int(leaderLockTTL.Seconds())
-    res, err := luaStore.Eval(renewLockScript, []string{leaderLockKey}, s.nodeID, ttlSeconds)
-    if err != nil {
-        return err
-    }
-    if i, ok := res.(int64); !ok || i == 0 {
-        return fmt.Errorf("failed to renew lock, another node may have taken over")
-    }
-    return nil
-}
-
-func (s *LeaderLock) releaseLock() {
-    luaStore := s.store.(LuaScripter)
-    if _, err := luaStore.Eval(releaseLockScript, []string{leaderLockKey}, s.nodeID); err != nil {
-        logrus.WithError(err).Error("Failed to release leader lock on shutdown.")
-    } else {
-        logrus.Info("Successfully released leader lock.")
-    }
-}
-
-func generateNodeID() string {
-    bytes := make([]byte, 16)
-    if _, err := rand.Read(bytes); err != nil {
-        return "node-" + time.Now().Format(time.RFC3339Nano)
-    }
-    return hex.EncodeToString(bytes)
-}
diff --git a/internal/store/redis.go b/internal/store/redis.go
index 36813c2..2e82a12 100644
--- a/internal/store/redis.go
+++ b/internal/store/redis.go
@@ -138,11 +138,6 @@ func (s *RedisStore) Pipeline() Pipeliner {
     }
 }
 
-// Eval executes a Lua script on Redis.
-func (s *RedisStore) Eval(script string, keys []string, args ...any) (any, error) {
-    return s.client.Eval(context.Background(), script, keys, args...).Result()
-}
-
 // --- Pub/Sub operations ---
 
 // redisSubscription wraps the redis.PubSub to implement the Subscription interface.
diff --git a/internal/store/store.go b/internal/store/store.go
index 9a045e6..cbb8d60 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -74,8 +74,3 @@ type Pipeliner interface {
 type RedisPipeliner interface {
     Pipeline() Pipeliner
 }
-
-// LuaScripter is an optional interface that a Store can implement to provide Lua script execution.
-type LuaScripter interface {
-    Eval(script string, keys []string, args ...any) (any, error)
-}
diff --git a/internal/types/types.go b/internal/types/types.go
index f86483a..556e5f2 100644
--- a/internal/types/types.go
+++ b/internal/types/types.go
@@ -2,6 +2,7 @@ package types
 
 // ConfigManager defines the interface for configuration management
 type ConfigManager interface {
+    IsMaster() bool
     GetAuthConfig() AuthConfig
     GetCORSConfig() CORSConfig
     GetPerformanceConfig() PerformanceConfig
@@ -41,6 +42,7 @@ type SystemSettings struct {
 type ServerConfig struct {
     Port         int    `json:"port"`
     Host         string `json:"host"`
+    IsMaster     bool   `json:"is_master"`
    ReadTimeout  int    `json:"read_timeout"`
    WriteTimeout int    `json:"write_timeout"`
    IdleTimeout  int    `json:"idle_timeout"`
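
---
Note (not part of the commit): the refactor replaces Redis-based leader election with a
static master/slave split driven by the IS_SLAVE environment variable. The minimal,
self-contained Go program below sketches that gating. The parseBoolean helper here is a
hypothetical stand-in for gpt-load's utils.ParseBoolean (assumed to return the default
when the variable is unset or unparsable), and the two branches merely summarize what
app.go does on each path.

    package main

    import (
        "fmt"
        "os"
        "strconv"
    )

    // parseBoolean returns def when raw is empty or unparsable
    // (assumed utils.ParseBoolean semantics).
    func parseBoolean(raw string, def bool) bool {
        if raw == "" {
            return def
        }
        v, err := strconv.ParseBool(raw)
        if err != nil {
            return def
        }
        return v
    }

    func main() {
        // Mirrors manager.go: a node is the master unless IS_SLAVE is set to true,
        // so a bare single-node deployment defaults to master.
        isMaster := !parseBoolean(os.Getenv("IS_SLAVE"), false)

        if isMaster {
            // Master-only path: run migrations, initialize settings, load keys
            // into Redis, and start the cron/log background services.
            fmt.Println("master: one-time initialization + background services")
        } else {
            // Slave path: skip initialization and only serve traffic.
            fmt.Println("slave: serve requests only")
        }
    }

Compared with the deleted LeaderLock, this trades automatic failover for a much simpler
deployment model: the operator designates exactly one master, and slaves never compete
for initialization or background work.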