feat: 优化领导者模式-从节点等待初始化完成

This commit is contained in:
tbphp
2025-07-09 10:18:17 +08:00
parent 234731d826
commit 6e7042a97d
2 changed files with 112 additions and 26 deletions

View File

@@ -82,45 +82,64 @@ func (a *App) Start() error {
return fmt.Errorf("leader service failed to start: %w", err)
}
// 2. Leader 节点执行不依赖配置的“写”操作
// 2. Leader 节点执行初始化Follower 节点等待
if a.leaderService.IsLeader() {
logrus.Info("Leader mode. Performing initial one-time tasks...")
// 2.1. 数据库迁移
if err := a.db.AutoMigrate(
&models.RequestLog{},
&models.APIKey{},
&models.SystemSetting{},
&models.Group{},
); err != nil {
return fmt.Errorf("database auto-migration failed: %w", err)
acquired, err := a.leaderService.AcquireInitializingLock()
if err != nil {
return fmt.Errorf("failed to acquire initializing lock: %w", err)
}
logrus.Info("Database auto-migration completed.")
if !acquired {
logrus.Warn("Could not acquire initializing lock, another leader might be active. Switching to follower mode for initialization.")
if err := a.leaderService.WaitForInitializationToComplete(); err != nil {
return fmt.Errorf("failed to wait for initialization as a fallback follower: %w", err)
}
} else {
// Release the lock when the initialization is done.
defer a.leaderService.ReleaseInitializingLock()
// 2.2. 初始化系统设置
if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
return fmt.Errorf("failed to initialize system settings: %w", err)
// 2.1. 数据库迁移
if err := a.db.AutoMigrate(
&models.RequestLog{},
&models.APIKey{},
&models.SystemSetting{},
&models.Group{},
); err != nil {
return fmt.Errorf("database auto-migration failed: %w", err)
}
logrus.Info("Database auto-migration completed.")
// 2.2. 初始化系统设置
if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
return fmt.Errorf("failed to initialize system settings: %w", err)
}
logrus.Info("System settings initialized in DB.")
// 2.3. 加载配置到内存 (Leader 先行)
if err := a.settingsManager.LoadFromDatabase(); err != nil {
return fmt.Errorf("leader failed to load system settings from database: %w", err)
}
logrus.Info("System settings loaded into memory by leader.")
// 2.4. 从数据库加载密钥到 Redis
if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
return fmt.Errorf("failed to load keys into key pool: %w", err)
}
logrus.Info("API keys loaded into Redis cache by leader.")
}
logrus.Info("System settings initialized in DB.")
} else {
logrus.Info("Follower Mode. Skipping initial one-time tasks.")
logrus.Info("Follower Mode. Waiting for leader to complete initialization.")
if err := a.leaderService.WaitForInitializationToComplete(); err != nil {
return fmt.Errorf("follower failed to start: %w", err)
}
}
// 3. 所有节点从数据库加载配置到内存
// 3. 所有节点加载或重新加载配置以确保一致性
if err := a.settingsManager.LoadFromDatabase(); err != nil {
return fmt.Errorf("failed to load system settings from database: %w", err)
}
logrus.Info("System settings loaded into memory.")
// 4. Leader 节点执行依赖配置的“写”操作
if a.leaderService.IsLeader() {
// 4.1. 从数据库加载密钥到 Redis
if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
return fmt.Errorf("failed to load keys into key pool: %w", err)
}
logrus.Info("API keys loaded into Redis cache by leader.")
}
// 5. 显示配置并启动所有后台服务
a.settingsManager.DisplayCurrentSettings()
a.configManager.DisplayConfig()

View File

@@ -1,6 +1,7 @@
package services
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
@@ -17,6 +18,8 @@ const (
leaderLockKey = "cluster:leader"
leaderLockTTL = 30 * time.Second
leaderRenewalInterval = 10 * time.Second
initializingLockKey = "cluster:initializing"
initializingLockTTL = 5 * time.Minute
)
const renewLockScript = `
@@ -97,6 +100,70 @@ func (s *LeaderService) IsLeader() bool {
return s.isLeader.Load()
}
// AcquireInitializingLock sets a temporary lock to indicate that initialization is in progress.
func (s *LeaderService) AcquireInitializingLock() (bool, error) {
if !s.IsLeader() {
return false, nil
}
logrus.Debug("Leader acquiring initialization lock...")
return s.store.SetNX(initializingLockKey, []byte(s.nodeID), initializingLockTTL)
}
// ReleaseInitializingLock removes the initialization lock.
func (s *LeaderService) ReleaseInitializingLock() {
if !s.IsLeader() {
return
}
logrus.Debug("Leader releasing initialization lock...")
if err := s.store.Delete(initializingLockKey); err != nil {
logrus.WithError(err).Error("Failed to release initialization lock.")
}
}
// WaitForInitializationToComplete waits until the initialization lock is released.
func (s *LeaderService) WaitForInitializationToComplete() error {
if s.isSingleNode || s.IsLeader() {
return nil
}
logrus.Debug("Follower waiting for leader to complete initialization...")
time.Sleep(2 * time.Second)
// Use a context with timeout to prevent indefinite waiting.
ctx, cancel := context.WithTimeout(context.Background(), initializingLockTTL+1*time.Minute)
defer cancel()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
// Initial check before starting the loop
exists, err := s.store.Exists(initializingLockKey)
if err != nil {
logrus.WithError(err).Warn("Initial check for initialization lock failed, will proceed to loop.")
} else if !exists {
logrus.Debug("Initialization lock not found on initial check. Assuming initialization is complete.")
return nil
}
for {
select {
case <-ctx.Done():
return fmt.Errorf("timed out waiting for leader initialization after %v", initializingLockTTL)
case <-ticker.C:
exists, err := s.store.Exists(initializingLockKey)
if err != nil {
logrus.WithError(err).Warn("Error checking initialization lock, will retry...")
continue
}
if !exists {
logrus.Debug("Initialization lock released. Follower proceeding with startup.")
return nil
}
}
}
}
// maintainLeadershipLoop is the background process that keeps trying to acquire or renew the lock.
func (s *LeaderService) maintainLeadershipLoop() {
defer s.wg.Done()