From 2d669b7ece0798119a10e02df9c283f92535cd2b Mon Sep 17 00:00:00 2001 From: tbphp Date: Tue, 8 Jul 2025 22:02:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81-?= =?UTF-8?q?=E6=95=85=E9=9A=9C=E8=BD=AC=E7=A7=BB=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/app.go | 5 +- internal/services/leader_service.go | 82 ++++++++++++++++------------- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 6b14611..17042d7 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -77,9 +77,8 @@ func NewApp(params AppParams) *App { // Start runs the application, it is a non-blocking call. func (a *App) Start() error { - // Perform leader election first. This is a blocking call. - if err := a.leaderService.ElectLeader(); err != nil { - return fmt.Errorf("leader election failed: %w", err) + if err := a.leaderService.Start(); err != nil { + return fmt.Errorf("leader service failed to start: %w", err) } if a.leaderService.IsLeader() { diff --git a/internal/services/leader_service.go b/internal/services/leader_service.go index 00f6230..e0bed94 100644 --- a/internal/services/leader_service.go +++ b/internal/services/leader_service.go @@ -17,7 +17,6 @@ const ( leaderLockKey = "cluster:leader" leaderLockTTL = 30 * time.Second leaderRenewalInterval = 10 * time.Second - electionTimeout = 15 * time.Second ) const renewLockScript = ` @@ -54,51 +53,45 @@ func NewLeaderService(s store.Store) *LeaderService { isSingleNode: !isDistributed, } if service.isSingleNode { - logrus.Info("Running in single-node mode.") + logrus.Debug("Running in single-node mode. Assuming leadership.") service.isLeader.Store(true) } else { - logrus.Info("Running in distributed mode.") + logrus.Debug("Running in distributed mode.") } return service } -// ElectLeader attempts to become the cluster leader. This is a blocking call. -func (s *LeaderService) ElectLeader() error { +// Start performs an initial leader election and starts the background leadership maintenance loop. +func (s *LeaderService) Start() error { if s.isSingleNode { - logrus.Info("In single-node mode, leadership is assumed. Skipping election.") + logrus.Info("In single-node mode, leadership is assumed. Skipping election process.") return nil } - logrus.WithField("nodeID", s.nodeID).Debug("Attempting to acquire leadership...") - - acquired, err := s.acquireLock() - if err != nil { - return fmt.Errorf("failed to acquire leader lock: %w", err) + logrus.WithField("nodeID", s.nodeID).Info("Performing initial leader election...") + if err := s.tryToBeLeader(); err != nil { + return fmt.Errorf("initial leader election failed: %w", err) } - if acquired { - logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leadership. Starting renewal process.") - s.isLeader.Store(true) - s.wg.Add(1) - go s.renewalLoop() - } else { - logrus.WithField("nodeID", s.nodeID).Info("Another node is already the leader.") - s.isLeader.Store(false) - } + s.wg.Add(1) + go s.maintainLeadershipLoop() return nil } -// Stop gracefully stops the leader renewal process if this node is the leader. +// Stop gracefully stops the leadership maintenance process. func (s *LeaderService) Stop() { - if s.isSingleNode || !s.isLeader.Load() { + if s.isSingleNode { return } - logrus.Info("Stopping leader renewal process...") + logrus.Info("Stopping leadership maintenance process...") close(s.stopChan) s.wg.Wait() - s.releaseLock() - logrus.Info("Leader renewal process stopped.") + + if s.isLeader.Load() { + s.releaseLock() + } + logrus.Info("Leadership maintenance process stopped.") } // IsLeader returns true if the current node is the leader. @@ -106,28 +99,48 @@ func (s *LeaderService) IsLeader() bool { return s.isLeader.Load() } -// renewalLoop is the background process that keeps the leader lock alive. -func (s *LeaderService) renewalLoop() { +// maintainLeadershipLoop is the background process that keeps trying to acquire or renew the lock. +func (s *LeaderService) maintainLeadershipLoop() { defer s.wg.Done() ticker := time.NewTicker(leaderRenewalInterval) defer ticker.Stop() + logrus.Info("Leadership maintenance loop started.") for { select { case <-ticker.C: - if err := s.renewLock(); err != nil { - logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.") - s.isLeader.Store(false) - return + if err := s.tryToBeLeader(); err != nil { + logrus.WithError(err).Warn("Error during leadership maintenance cycle.") } - logrus.Debug("Successfully renewed leader lock.") case <-s.stopChan: - logrus.Info("Leader renewal loop stopping.") + logrus.Info("Leadership maintenance loop stopping.") return } } } +// tryToBeLeader is an idempotent function that attempts to acquire or renew the lock. +func (s *LeaderService) tryToBeLeader() error { + if s.isLeader.Load() { + err := s.renewLock() + if err != nil { + logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.") + s.isLeader.Store(false) + } + return nil + } + + acquired, err := s.acquireLock() + if err != nil { + return fmt.Errorf("failed to acquire lock: %w", err) + } + if acquired { + logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leadership.") + s.isLeader.Store(true) + } + return nil +} + func (s *LeaderService) acquireLock() (bool, error) { return s.store.SetNX(leaderLockKey, []byte(s.nodeID), leaderLockTTL) } @@ -146,9 +159,6 @@ func (s *LeaderService) renewLock() error { } func (s *LeaderService) releaseLock() { - if !s.isLeader.Load() { - return - } luaStore := s.store.(store.LuaScripter) if _, err := luaStore.Eval(releaseLockScript, []string{leaderLockKey}, s.nodeID); err != nil { logrus.WithError(err).Error("Failed to release leader lock on shutdown.")