From 6e7042a97d930a5cd055976b97a5bc2eaa56fb1c Mon Sep 17 00:00:00 2001
From: tbphp
Date: Wed, 9 Jul 2025 10:18:17 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E9=A2=86=E5=AF=BC?=
 =?UTF-8?q?=E8=80=85=E6=A8=A1=E5=BC=8F-=E4=BB=8E=E8=8A=82=E7=82=B9?=
 =?UTF-8?q?=E7=AD=89=E5=BE=85=E5=88=9D=E5=A7=8B=E5=8C=96=E5=AE=8C=E6=88=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 internal/app/app.go                 | 71 ++++++++++++++++++-----------
 internal/services/leader_service.go | 67 +++++++++++++++++++++++++++
 2 files changed, 112 insertions(+), 26 deletions(-)

diff --git a/internal/app/app.go b/internal/app/app.go
index 64ecdde..4938dd9 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -82,45 +82,64 @@ func (a *App) Start() error {
 		return fmt.Errorf("leader service failed to start: %w", err)
 	}
 
-	// 2. Leader 节点执行不依赖配置的“写”操作
+	// 2. Leader 节点执行初始化,Follower 节点等待
 	if a.leaderService.IsLeader() {
 		logrus.Info("Leader mode. Performing initial one-time tasks...")
-
-		// 2.1. 数据库迁移
-		if err := a.db.AutoMigrate(
-			&models.RequestLog{},
-			&models.APIKey{},
-			&models.SystemSetting{},
-			&models.Group{},
-		); err != nil {
-			return fmt.Errorf("database auto-migration failed: %w", err)
+		acquired, err := a.leaderService.AcquireInitializingLock()
+		if err != nil {
+			return fmt.Errorf("failed to acquire initializing lock: %w", err)
 		}
-		logrus.Info("Database auto-migration completed.")
+		if !acquired {
+			logrus.Warn("Could not acquire initializing lock, another leader might be active. Switching to follower mode for initialization.")
+			if err := a.leaderService.WaitForInitializationToComplete(); err != nil {
+				return fmt.Errorf("failed to wait for initialization as a fallback follower: %w", err)
+			}
+		} else {
+			// Release the lock when the initialization is done.
+			defer a.leaderService.ReleaseInitializingLock()
 
-		// 2.2. 初始化系统设置
-		if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
-			return fmt.Errorf("failed to initialize system settings: %w", err)
+			// 2.1. 数据库迁移
+			if err := a.db.AutoMigrate(
+				&models.RequestLog{},
+				&models.APIKey{},
+				&models.SystemSetting{},
+				&models.Group{},
+			); err != nil {
+				return fmt.Errorf("database auto-migration failed: %w", err)
+			}
+			logrus.Info("Database auto-migration completed.")
+
+			// 2.2. 初始化系统设置
+			if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
+				return fmt.Errorf("failed to initialize system settings: %w", err)
+			}
+			logrus.Info("System settings initialized in DB.")
+
+			// 2.3. 加载配置到内存 (Leader 先行)
+			if err := a.settingsManager.LoadFromDatabase(); err != nil {
+				return fmt.Errorf("leader failed to load system settings from database: %w", err)
+			}
+			logrus.Info("System settings loaded into memory by leader.")
+
+			// 2.4. 从数据库加载密钥到 Redis
+			if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
+				return fmt.Errorf("failed to load keys into key pool: %w", err)
+			}
+			logrus.Info("API keys loaded into Redis cache by leader.")
 		}
-		logrus.Info("System settings initialized in DB.")
 	} else {
-		logrus.Info("Follower Mode. Skipping initial one-time tasks.")
+		logrus.Info("Follower Mode. Waiting for leader to complete initialization.")
+		if err := a.leaderService.WaitForInitializationToComplete(); err != nil {
+			return fmt.Errorf("follower failed to start: %w", err)
+		}
 	}
 
-	// 3. 所有节点从数据库加载配置到内存
+	// 3. 所有节点加载或重新加载配置以确保一致性
 	if err := a.settingsManager.LoadFromDatabase(); err != nil {
 		return fmt.Errorf("failed to load system settings from database: %w", err)
 	}
 	logrus.Info("System settings loaded into memory.")
 
-	// 4. Leader 节点执行依赖配置的“写”操作
-	if a.leaderService.IsLeader() {
-		// 4.1. 从数据库加载密钥到 Redis
-		if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
-			return fmt.Errorf("failed to load keys into key pool: %w", err)
-		}
-		logrus.Info("API keys loaded into Redis cache by leader.")
-	}
-
 	// 5. 显示配置并启动所有后台服务
 	a.settingsManager.DisplayCurrentSettings()
 	a.configManager.DisplayConfig()
diff --git a/internal/services/leader_service.go b/internal/services/leader_service.go
index e51be58..0b0df5c 100644
--- a/internal/services/leader_service.go
+++ b/internal/services/leader_service.go
@@ -1,6 +1,7 @@
 package services
 
 import (
+	"context"
 	"crypto/rand"
 	"encoding/hex"
 	"fmt"
@@ -17,6 +18,8 @@ const (
 	leaderLockKey         = "cluster:leader"
 	leaderLockTTL         = 30 * time.Second
 	leaderRenewalInterval = 10 * time.Second
+	initializingLockKey   = "cluster:initializing"
+	initializingLockTTL   = 5 * time.Minute
 )
 
 const renewLockScript = `
@@ -97,6 +100,70 @@ func (s *LeaderService) IsLeader() bool {
 	return s.isLeader.Load()
 }
 
+// AcquireInitializingLock claims the initialization lock via SetNX (value: node ID, TTL: initializingLockTTL); a non-leader gets (false, nil) without touching the store.
+func (s *LeaderService) AcquireInitializingLock() (bool, error) {
+	if !s.IsLeader() {
+		return false, nil
+	}
+	logrus.Debug("Leader acquiring initialization lock...")
+	return s.store.SetNX(initializingLockKey, []byte(s.nodeID), initializingLockTTL)
+}
+
+// ReleaseInitializingLock deletes the initialization lock key; it is a no-op on a non-leader, and a failed delete is logged rather than returned.
+func (s *LeaderService) ReleaseInitializingLock() {
+	if !s.IsLeader() {
+		return
+	}
+	logrus.Debug("Leader releasing initialization lock...")
+	if err := s.store.Delete(initializingLockKey); err != nil {
+		logrus.WithError(err).Error("Failed to release initialization lock.")
+	}
+}
+
+// WaitForInitializationToComplete polls every 2s until the initialization lock is gone; it returns nil immediately on single-node setups and on the leader (a leader never waits here).
+func (s *LeaderService) WaitForInitializationToComplete() error {
+	if s.isSingleNode || s.IsLeader() {
+		return nil
+	}
+
+	logrus.Debug("Follower waiting for leader to complete initialization...")
+
+	time.Sleep(2 * time.Second) // NOTE(review): presumably gives the leader time to set the lock before the first check - confirm
+
+	waitTimeout := initializingLockTTL + time.Minute // upper bound for the wait; also the value reported on timeout
+	ctx, cancel := context.WithTimeout(context.Background(), waitTimeout)
+	defer cancel()
+
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	// Check once up front so a follower does not wait a full tick when the lock is already gone.
+	exists, err := s.store.Exists(initializingLockKey)
+	if err != nil {
+		logrus.WithError(err).Warn("Initial check for initialization lock failed, will proceed to loop.")
+	} else if !exists {
+		logrus.Debug("Initialization lock not found on initial check. Assuming initialization is complete.")
+		return nil
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timed out waiting for leader initialization after %v", waitTimeout)
+		case <-ticker.C:
+			exists, err := s.store.Exists(initializingLockKey)
+			if err != nil {
+				logrus.WithError(err).Warn("Error checking initialization lock, will retry...")
+				continue
+			}
+			if !exists {
+				logrus.Debug("Initialization lock released. Follower proceeding with startup.")
+				return nil
+			}
+		}
+	}
+}
+
 // maintainLeadershipLoop is the background process that keeps trying to acquire or renew the lock.
 func (s *LeaderService) maintainLeadershipLoop() {
 	defer s.wg.Done()