feat: 领导者服务改为同步锁
This commit is contained in:
@@ -3,6 +3,7 @@ package services
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -16,10 +17,9 @@ const (
|
||||
leaderLockKey = "cluster:leader"
|
||||
leaderLockTTL = 30 * time.Second
|
||||
leaderRenewalInterval = 10 * time.Second
|
||||
electionTimeout = 15 * time.Second
|
||||
)
|
||||
|
||||
// Lua script for atomic lock renewal.
|
||||
// KEYS[1]: lock key, ARGV[1]: node ID, ARGV[2]: TTL in seconds.
|
||||
const renewLockScript = `
|
||||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("expire", KEYS[1], ARGV[2])
|
||||
@@ -27,8 +27,6 @@ else
|
||||
return 0
|
||||
end`
|
||||
|
||||
// Lua script for atomic lock release.
|
||||
// KEYS[1]: lock key, ARGV[1]: node ID.
|
||||
const releaseLockScript = `
|
||||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("del", KEYS[1])
|
||||
@@ -38,136 +36,120 @@ end`
|
||||
|
||||
// LeaderService provides a mechanism for electing a single leader in a cluster.
|
||||
type LeaderService struct {
|
||||
store store.Store
|
||||
nodeID string
|
||||
isLeader atomic.Bool
|
||||
stopChan chan struct{}
|
||||
wg sync.WaitGroup
|
||||
isSingleNode bool
|
||||
firstElectionDone chan struct{}
|
||||
firstElectionOnce sync.Once
|
||||
store store.Store
|
||||
nodeID string
|
||||
isLeader atomic.Bool
|
||||
stopChan chan struct{}
|
||||
wg sync.WaitGroup
|
||||
isSingleNode bool
|
||||
}
|
||||
|
||||
// NewLeaderService creates a new LeaderService.
|
||||
func NewLeaderService(s store.Store) *LeaderService {
|
||||
// Check if the store supports Lua scripting to determine if we are in a distributed environment.
|
||||
_, isDistributed := s.(store.LuaScripter)
|
||||
|
||||
service := &LeaderService{
|
||||
store: s,
|
||||
nodeID: generateNodeID(),
|
||||
stopChan: make(chan struct{}),
|
||||
isSingleNode: !isDistributed,
|
||||
firstElectionDone: make(chan struct{}),
|
||||
store: s,
|
||||
nodeID: generateNodeID(),
|
||||
stopChan: make(chan struct{}),
|
||||
isSingleNode: !isDistributed,
|
||||
}
|
||||
|
||||
if service.isSingleNode {
|
||||
logrus.Info("Store does not support Lua, running in single-node mode. Assuming leadership.")
|
||||
logrus.Info("Running in single-node mode.")
|
||||
service.isLeader.Store(true)
|
||||
close(service.firstElectionDone)
|
||||
} else {
|
||||
logrus.Info("Store supports Lua, running in distributed mode.")
|
||||
logrus.Info("Running in distributed mode.")
|
||||
}
|
||||
|
||||
return service
|
||||
}
|
||||
|
||||
// Start begins the leader election process.
|
||||
func (s *LeaderService) Start() {
|
||||
// ElectLeader attempts to become the cluster leader. This is a blocking call.
|
||||
func (s *LeaderService) ElectLeader() error {
|
||||
if s.isSingleNode {
|
||||
return
|
||||
logrus.Info("In single-node mode, leadership is assumed. Skipping election.")
|
||||
return nil
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go s.electionLoop()
|
||||
|
||||
logrus.WithField("nodeID", s.nodeID).Debug("Attempting to acquire leadership...")
|
||||
|
||||
acquired, err := s.acquireLock()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to acquire leader lock: %w", err)
|
||||
}
|
||||
|
||||
if acquired {
|
||||
logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leadership. Starting renewal process.")
|
||||
s.isLeader.Store(true)
|
||||
s.wg.Add(1)
|
||||
go s.renewalLoop()
|
||||
} else {
|
||||
logrus.WithField("nodeID", s.nodeID).Info("Another node is already the leader.")
|
||||
s.isLeader.Store(false)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop gracefully stops the leader election process.
|
||||
// Stop gracefully stops the leader renewal process if this node is the leader.
|
||||
func (s *LeaderService) Stop() {
|
||||
if s.isSingleNode {
|
||||
if s.isSingleNode || !s.isLeader.Load() {
|
||||
return
|
||||
}
|
||||
logrus.Info("Stopping leader renewal process...")
|
||||
close(s.stopChan)
|
||||
s.wg.Wait()
|
||||
s.releaseLock()
|
||||
logrus.Info("Leader renewal process stopped.")
|
||||
}
|
||||
|
||||
// IsLeader returns true if the current node is the leader.
|
||||
// In distributed mode, this call will block until the first election attempt is complete.
|
||||
func (s *LeaderService) IsLeader() bool {
|
||||
<-s.firstElectionDone
|
||||
return s.isLeader.Load()
|
||||
}
|
||||
|
||||
func (s *LeaderService) electionLoop() {
|
||||
// renewalLoop is the background process that keeps the leader lock alive.
|
||||
func (s *LeaderService) renewalLoop() {
|
||||
defer s.wg.Done()
|
||||
logrus.WithField("nodeID", s.nodeID).Info("Starting leader election loop...")
|
||||
|
||||
// Attempt to acquire leadership immediately on start.
|
||||
s.tryToBeLeader()
|
||||
|
||||
ticker := time.NewTicker(leaderRenewalInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.tryToBeLeader()
|
||||
case <-s.stopChan:
|
||||
logrus.Info("Stopping leader election loop...")
|
||||
if s.isLeader.Load() {
|
||||
s.releaseLock()
|
||||
if err := s.renewLock(); err != nil {
|
||||
logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.")
|
||||
s.isLeader.Store(false)
|
||||
return
|
||||
}
|
||||
logrus.Debug("Successfully renewed leader lock.")
|
||||
case <-s.stopChan:
|
||||
logrus.Info("Leader renewal loop stopping.")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// tryToBeLeader performs one election step. When isLeader is already set it
// renews the lock and clears isLeader if the renewal fails; otherwise it tries
// to acquire the lock, clearing isLeader on a store error and setting it when
// the lock is acquired. The deferred sync.Once closes firstElectionDone the
// first time the function returns.
// NOTE(review): this looks like the pre-change helper that this commit deletes
// (it depends on firstElectionOnce/firstElectionDone) — confirm before keeping.
func (s *LeaderService) tryToBeLeader() {
|
||||
defer s.firstElectionOnce.Do(func() {
|
||||
close(s.firstElectionDone)
|
||||
})
|
||||
|
||||
if s.isLeader.Load() {
|
||||
if err := s.renewLock(); err != nil {
|
||||
logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.")
|
||||
s.isLeader.Store(false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
acquired, err := s.acquireLock()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Error trying to acquire leader lock.")
|
||||
s.isLeader.Store(false)
|
||||
return
|
||||
}
|
||||
|
||||
if acquired {
|
||||
logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leader lock.")
|
||||
s.isLeader.Store(true)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LeaderService) acquireLock() (bool, error) {
|
||||
return s.store.SetNX(leaderLockKey, []byte(s.nodeID), leaderLockTTL)
|
||||
}
|
||||
|
||||
func (s *LeaderService) renewLock() error {
|
||||
luaStore := s.store.(store.LuaScripter) // Already checked in NewLeaderService
|
||||
luaStore := s.store.(store.LuaScripter)
|
||||
ttlSeconds := int(leaderLockTTL.Seconds())
|
||||
|
||||
res, err := luaStore.Eval(renewLockScript, []string{leaderLockKey}, s.nodeID, ttlSeconds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if i, ok := res.(int64); !ok || i == 0 {
|
||||
return store.ErrNotFound // Not our lock anymore
|
||||
return fmt.Errorf("failed to renew lock, another node may have taken over")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *LeaderService) releaseLock() {
|
||||
luaStore := s.store.(store.LuaScripter) // Already checked in NewLeaderService
|
||||
if !s.isLeader.Load() {
|
||||
return
|
||||
}
|
||||
luaStore := s.store.(store.LuaScripter)
|
||||
if _, err := luaStore.Eval(releaseLockScript, []string{leaderLockKey}, s.nodeID); err != nil {
|
||||
logrus.WithError(err).Error("Failed to release leader lock on shutdown.")
|
||||
} else {
|
||||
|
Reference in New Issue
Block a user