feat: 分布式锁-故障转移机制

This commit is contained in:
tbphp
2025-07-08 22:02:39 +08:00
parent a159debd77
commit 2d669b7ece
2 changed files with 48 additions and 39 deletions

View File

@@ -17,7 +17,6 @@ const (
leaderLockKey = "cluster:leader"
leaderLockTTL = 30 * time.Second
leaderRenewalInterval = 10 * time.Second
electionTimeout = 15 * time.Second
)
const renewLockScript = `
@@ -54,51 +53,45 @@ func NewLeaderService(s store.Store) *LeaderService {
isSingleNode: !isDistributed,
}
if service.isSingleNode {
logrus.Info("Running in single-node mode.")
logrus.Debug("Running in single-node mode. Assuming leadership.")
service.isLeader.Store(true)
} else {
logrus.Info("Running in distributed mode.")
logrus.Debug("Running in distributed mode.")
}
return service
}
// ElectLeader attempts to become the cluster leader. This is a blocking call.
func (s *LeaderService) ElectLeader() error {
// Start performs an initial leader election and starts the background leadership maintenance loop.
func (s *LeaderService) Start() error {
if s.isSingleNode {
logrus.Info("In single-node mode, leadership is assumed. Skipping election.")
logrus.Info("In single-node mode, leadership is assumed. Skipping election process.")
return nil
}
logrus.WithField("nodeID", s.nodeID).Debug("Attempting to acquire leadership...")
acquired, err := s.acquireLock()
if err != nil {
return fmt.Errorf("failed to acquire leader lock: %w", err)
logrus.WithField("nodeID", s.nodeID).Info("Performing initial leader election...")
if err := s.tryToBeLeader(); err != nil {
return fmt.Errorf("initial leader election failed: %w", err)
}
if acquired {
logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leadership. Starting renewal process.")
s.isLeader.Store(true)
s.wg.Add(1)
go s.renewalLoop()
} else {
logrus.WithField("nodeID", s.nodeID).Info("Another node is already the leader.")
s.isLeader.Store(false)
}
s.wg.Add(1)
go s.maintainLeadershipLoop()
return nil
}
// Stop gracefully stops the leader renewal process if this node is the leader.
// Stop gracefully stops the leadership maintenance process.
func (s *LeaderService) Stop() {
if s.isSingleNode || !s.isLeader.Load() {
if s.isSingleNode {
return
}
logrus.Info("Stopping leader renewal process...")
logrus.Info("Stopping leadership maintenance process...")
close(s.stopChan)
s.wg.Wait()
s.releaseLock()
logrus.Info("Leader renewal process stopped.")
if s.isLeader.Load() {
s.releaseLock()
}
logrus.Info("Leadership maintenance process stopped.")
}
// IsLeader returns true if the current node is the leader.
@@ -106,28 +99,48 @@ func (s *LeaderService) IsLeader() bool {
return s.isLeader.Load()
}
// renewalLoop is the background process that keeps the leader lock alive.
func (s *LeaderService) renewalLoop() {
// maintainLeadershipLoop is the background process that keeps trying to acquire or renew the lock.
func (s *LeaderService) maintainLeadershipLoop() {
defer s.wg.Done()
ticker := time.NewTicker(leaderRenewalInterval)
defer ticker.Stop()
logrus.Info("Leadership maintenance loop started.")
for {
select {
case <-ticker.C:
if err := s.renewLock(); err != nil {
logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.")
s.isLeader.Store(false)
return
if err := s.tryToBeLeader(); err != nil {
logrus.WithError(err).Warn("Error during leadership maintenance cycle.")
}
logrus.Debug("Successfully renewed leader lock.")
case <-s.stopChan:
logrus.Info("Leader renewal loop stopping.")
logrus.Info("Leadership maintenance loop stopping.")
return
}
}
}
// tryToBeLeader is an idempotent function that attempts to acquire or renew the lock.
func (s *LeaderService) tryToBeLeader() error {
if s.isLeader.Load() {
err := s.renewLock()
if err != nil {
logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.")
s.isLeader.Store(false)
}
return nil
}
acquired, err := s.acquireLock()
if err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}
if acquired {
logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leadership.")
s.isLeader.Store(true)
}
return nil
}
func (s *LeaderService) acquireLock() (bool, error) {
return s.store.SetNX(leaderLockKey, []byte(s.nodeID), leaderLockTTL)
}
@@ -146,9 +159,6 @@ func (s *LeaderService) renewLock() error {
}
func (s *LeaderService) releaseLock() {
if !s.isLeader.Load() {
return
}
luaStore := s.store.(store.LuaScripter)
if _, err := luaStore.Eval(releaseLockScript, []string{leaderLockKey}, s.nodeID); err != nil {
logrus.WithError(err).Error("Failed to release leader lock on shutdown.")