diff --git a/internal/app/app.go b/internal/app/app.go
index ece94b5..6b14611 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -28,10 +28,10 @@ type App struct {
 	configManager     types.ConfigManager
 	settingsManager   *config.SystemSettingsManager
 	logCleanupService *services.LogCleanupService
-	keyCronService      *services.KeyCronService
-	keyValidationPool   *services.KeyValidationPool
-	keyPoolProvider     *keypool.KeyProvider
-	leaderService       *services.LeaderService
+	keyCronService    *services.KeyCronService
+	keyValidationPool *services.KeyValidationPool
+	keyPoolProvider   *keypool.KeyProvider
+	leaderService     *services.LeaderService
 	proxyServer       *proxy.ProxyServer
 	storage           store.Store
 	db                *gorm.DB
@@ -46,11 +46,11 @@ type AppParams struct {
 	Engine            *gin.Engine
 	ConfigManager     types.ConfigManager
 	SettingsManager   *config.SystemSettingsManager
-	LogCleanupService   *services.LogCleanupService
-	KeyCronService      *services.KeyCronService
-	KeyValidationPool   *services.KeyValidationPool
-	KeyPoolProvider     *keypool.KeyProvider
-	LeaderService       *services.LeaderService
+	LogCleanupService *services.LogCleanupService
+	KeyCronService    *services.KeyCronService
+	KeyValidationPool *services.KeyValidationPool
+	KeyPoolProvider   *keypool.KeyProvider
+	LeaderService     *services.LeaderService
 	ProxyServer       *proxy.ProxyServer
 	Storage           store.Store
 	DB                *gorm.DB
@@ -63,12 +63,12 @@ func NewApp(params AppParams) *App {
 		engine:            params.Engine,
 		configManager:     params.ConfigManager,
 		settingsManager:   params.SettingsManager,
-		logCleanupService:   params.LogCleanupService,
-		keyCronService:      params.KeyCronService,
-		keyValidationPool:   params.KeyValidationPool,
-		keyPoolProvider:     params.KeyPoolProvider,
-		leaderService:       params.LeaderService,
-		proxyServer:         params.ProxyServer,
+		logCleanupService: params.LogCleanupService,
+		keyCronService:    params.KeyCronService,
+		keyValidationPool: params.KeyValidationPool,
+		keyPoolProvider:   params.KeyPoolProvider,
+		leaderService:     params.LeaderService,
+		proxyServer:       params.ProxyServer,
 		storage:           params.Storage,
 		db:                params.DB,
 		requestLogChan:    params.RequestLogChan,
@@ -77,13 +77,21 @@ func NewApp(params AppParams) *App {
 
 // Start runs the application, it is a non-blocking call.
 func (a *App) Start() error {
-	// Initialize system settings
-	if err := a.settingsManager.InitializeSystemSettings(); err != nil {
-		return fmt.Errorf("failed to initialize system settings: %w", err)
+	// Perform leader election first. This is a blocking call.
+	if err := a.leaderService.ElectLeader(); err != nil {
+		return fmt.Errorf("leader election failed: %w", err)
 	}
-	logrus.Info("System settings initialized")
 
-	logrus.Info("Loading API keys into the key pool...")
+	if a.leaderService.IsLeader() {
+		if err := a.settingsManager.InitializeSystemSettings(); err != nil {
+			return fmt.Errorf("failed to initialize system settings: %w", err)
+		}
+		logrus.Info("System settings initialized by leader.")
+	} else {
+		logrus.Info("This node is not the leader. Skipping leader-only initialization tasks.")
+	}
+
+	logrus.Debug("Loading API keys into the key pool...")
 	if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
 		return fmt.Errorf("failed to load keys into key pool: %w", err)
 	}
@@ -93,7 +101,6 @@ func (a *App) Start() error {
 	// Start background services
 	a.startRequestLogger()
 	a.logCleanupService.Start()
-	a.leaderService.Start()
 	a.keyValidationPool.Start()
 	a.keyCronService.Start()
 
@@ -105,7 +112,7 @@ func (a *App) Start() error {
 		ReadTimeout:    time.Duration(serverConfig.ReadTimeout) * time.Second,
 		WriteTimeout:   time.Duration(serverConfig.WriteTimeout) * time.Second,
 		IdleTimeout:    time.Duration(serverConfig.IdleTimeout) * time.Second,
-		MaxHeaderBytes: 1 << 20, // 1MB header limit
+		MaxHeaderBytes: 1 << 20,
 	}
 
 	// Start HTTP server in a new goroutine
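
For reference, the server built in the last hunk maps directly onto a stdlib http.Server. A minimal sketch, assuming placeholder values for the address and timeouts (serverConfig supplies the real ones, and App.Start launches the server in a goroutine rather than blocking):

package main

import (
	"net/http"
	"time"
)

func main() {
	// Mirrors the http.Server literal in App.Start; ":8080" and the
	// durations are illustrative stand-ins, not the project's defaults.
	srv := &http.Server{
		Addr:           ":8080",
		ReadTimeout:    60 * time.Second,
		WriteTimeout:   60 * time.Second,
		IdleTimeout:    120 * time.Second,
		MaxHeaderBytes: 1 << 20, // 1 MiB cap, kept by the hunk above
	}
	// App.Start runs this in a goroutine; a standalone sketch can block.
	if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		panic(err)
	}
}

Note that dropping the inline comment above does not change behavior: 1 << 20 still caps request headers at 1 MiB.
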
Skipping leader-only initialization tasks.") + } + + logrus.Debug("Loading API keys into the key pool...") if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil { return fmt.Errorf("failed to load keys into key pool: %w", err) } @@ -93,7 +101,6 @@ func (a *App) Start() error { // Start background services a.startRequestLogger() a.logCleanupService.Start() - a.leaderService.Start() a.keyValidationPool.Start() a.keyCronService.Start() @@ -105,7 +112,7 @@ func (a *App) Start() error { ReadTimeout: time.Duration(serverConfig.ReadTimeout) * time.Second, WriteTimeout: time.Duration(serverConfig.WriteTimeout) * time.Second, IdleTimeout: time.Duration(serverConfig.IdleTimeout) * time.Second, - MaxHeaderBytes: 1 << 20, // 1MB header limit + MaxHeaderBytes: 1 << 20, } // Start HTTP server in a new goroutine diff --git a/internal/config/system_settings.go b/internal/config/system_settings.go index 94a0a53..7a377e7 100644 --- a/internal/config/system_settings.go +++ b/internal/config/system_settings.go @@ -197,7 +197,7 @@ func (sm *SystemSettingsManager) LoadFromDatabase() error { sm.settings = DefaultSystemSettings() sm.mapToStruct(settingsMap, &sm.settings) - logrus.Info("System settings loaded from database") + logrus.Debug("System settings loaded from database") return nil } diff --git a/internal/services/leader_service.go b/internal/services/leader_service.go index e43824b..00f6230 100644 --- a/internal/services/leader_service.go +++ b/internal/services/leader_service.go @@ -3,6 +3,7 @@ package services import ( "crypto/rand" "encoding/hex" + "fmt" "sync" "sync/atomic" "time" @@ -16,10 +17,9 @@ const ( leaderLockKey = "cluster:leader" leaderLockTTL = 30 * time.Second leaderRenewalInterval = 10 * time.Second + electionTimeout = 15 * time.Second ) -// Lua script for atomic lock renewal. -// KEYS[1]: lock key, ARGV[1]: node ID, ARGV[2]: TTL in seconds. const renewLockScript = ` if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("expire", KEYS[1], ARGV[2]) @@ -27,8 +27,6 @@ else return 0 end` -// Lua script for atomic lock release. -// KEYS[1]: lock key, ARGV[1]: node ID. const releaseLockScript = ` if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) @@ -38,136 +36,120 @@ end` // LeaderService provides a mechanism for electing a single leader in a cluster. type LeaderService struct { - store store.Store - nodeID string - isLeader atomic.Bool - stopChan chan struct{} - wg sync.WaitGroup - isSingleNode bool - firstElectionDone chan struct{} - firstElectionOnce sync.Once + store store.Store + nodeID string + isLeader atomic.Bool + stopChan chan struct{} + wg sync.WaitGroup + isSingleNode bool } // NewLeaderService creates a new LeaderService. func NewLeaderService(s store.Store) *LeaderService { - // Check if the store supports Lua scripting to determine if we are in a distributed environment. _, isDistributed := s.(store.LuaScripter) - service := &LeaderService{ - store: s, - nodeID: generateNodeID(), - stopChan: make(chan struct{}), - isSingleNode: !isDistributed, - firstElectionDone: make(chan struct{}), + store: s, + nodeID: generateNodeID(), + stopChan: make(chan struct{}), + isSingleNode: !isDistributed, } - if service.isSingleNode { - logrus.Info("Store does not support Lua, running in single-node mode. 
Assuming leadership.") + logrus.Info("Running in single-node mode.") service.isLeader.Store(true) - close(service.firstElectionDone) } else { - logrus.Info("Store supports Lua, running in distributed mode.") + logrus.Info("Running in distributed mode.") } - return service } -// Start begins the leader election process. -func (s *LeaderService) Start() { +// ElectLeader attempts to become the cluster leader. This is a blocking call. +func (s *LeaderService) ElectLeader() error { if s.isSingleNode { - return + logrus.Info("In single-node mode, leadership is assumed. Skipping election.") + return nil } - s.wg.Add(1) - go s.electionLoop() + + logrus.WithField("nodeID", s.nodeID).Debug("Attempting to acquire leadership...") + + acquired, err := s.acquireLock() + if err != nil { + return fmt.Errorf("failed to acquire leader lock: %w", err) + } + + if acquired { + logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leadership. Starting renewal process.") + s.isLeader.Store(true) + s.wg.Add(1) + go s.renewalLoop() + } else { + logrus.WithField("nodeID", s.nodeID).Info("Another node is already the leader.") + s.isLeader.Store(false) + } + + return nil } -// Stop gracefully stops the leader election process. +// Stop gracefully stops the leader renewal process if this node is the leader. func (s *LeaderService) Stop() { - if s.isSingleNode { + if s.isSingleNode || !s.isLeader.Load() { return } + logrus.Info("Stopping leader renewal process...") close(s.stopChan) s.wg.Wait() + s.releaseLock() + logrus.Info("Leader renewal process stopped.") } // IsLeader returns true if the current node is the leader. -// In distributed mode, this call will block until the first election attempt is complete. func (s *LeaderService) IsLeader() bool { - <-s.firstElectionDone return s.isLeader.Load() } -func (s *LeaderService) electionLoop() { +// renewalLoop is the background process that keeps the leader lock alive. +func (s *LeaderService) renewalLoop() { defer s.wg.Done() - logrus.WithField("nodeID", s.nodeID).Info("Starting leader election loop...") - - // Attempt to acquire leadership immediately on start. 
diff --git a/internal/store/factory.go b/internal/store/factory.go
index 63bdf66..9980b56 100644
--- a/internal/store/factory.go
+++ b/internal/store/factory.go
@@ -10,19 +10,15 @@ import (
 )
 
 // NewStore creates a new store based on the application configuration.
-// It prioritizes Redis if a DSN is provided, otherwise it falls back to an in-memory store.
 func NewStore(cfg types.ConfigManager) (Store, error) {
 	redisDSN := cfg.GetRedisDSN()
 
-	// Prioritize Redis if configured
 	if redisDSN != "" {
-		logrus.Info("Redis DSN found, initializing Redis store...")
 		opts, err := redis.ParseURL(redisDSN)
 		if err != nil {
 			return nil, fmt.Errorf("failed to parse redis DSN: %w", err)
 		}
 		client := redis.NewClient(opts)
-		// Ping the server to ensure a connection is established.
 		if err := client.Ping(context.Background()).Err(); err != nil {
 			return nil, fmt.Errorf("failed to connect to redis: %w", err)
 		}
@@ -31,7 +27,6 @@ func NewStore(cfg types.ConfigManager) (Store, error) {
 		return NewRedisStore(client), nil
 	}
 
-	// Fallback to in-memory store
 	logrus.Info("Redis DSN not configured, falling back to in-memory store.")
 	return NewMemoryStore(), nil
 }
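
The DSN handed to redis.ParseURL is what ultimately selects the mode for everything above: a non-empty DSN produces the Redis store, which satisfies store.LuaScripter and enables distributed leader election, while an empty DSN yields the in-memory store and single-node mode. A standalone sketch of the parse-and-ping path that NewStore performs (the go-redis import path and the example DSN are assumptions):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	// Example DSN; the real value comes from cfg.GetRedisDSN().
	opts, err := redis.ParseURL("redis://:secret@localhost:6379/0")
	if err != nil {
		panic(err) // NewStore wraps this as "failed to parse redis DSN"
	}
	client := redis.NewClient(opts)

	// The same connectivity check NewStore performs before returning the store.
	if err := client.Ping(context.Background()).Err(); err != nil {
		panic(err) // NewStore wraps this as "failed to connect to redis"
	}
	fmt.Println("redis reachable; distributed mode available")
}
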