@@ -16,50 +16,90 @@ const (
 	leaderLockKey         = "cluster:leader"
 	leaderLockTTL         = 30 * time.Second
 	leaderRenewalInterval = 10 * time.Second
 	leaderElectionTimeout = 5 * time.Second
 )

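+// Both scripts compare the value stored at the lock key against the caller's
+// node ID, so only the node that currently holds the lock can renew or delete it.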
+// Lua script for atomic lock renewal.
+// KEYS[1]: lock key, ARGV[1]: node ID, ARGV[2]: TTL in seconds.
+const renewLockScript = `
+if redis.call("get", KEYS[1]) == ARGV[1] then
+	return redis.call("expire", KEYS[1], ARGV[2])
+else
+	return 0
+end`
+
+// Lua script for atomic lock release.
+// KEYS[1]: lock key, ARGV[1]: node ID.
+const releaseLockScript = `
+if redis.call("get", KEYS[1]) == ARGV[1] then
+	return redis.call("del", KEYS[1])
+else
+	return 0
+end`
+
 // LeaderService provides a mechanism for electing a single leader in a cluster.
 type LeaderService struct {
-	store    store.Store
-	nodeID   string
-	isLeader atomic.Bool
-	stopChan chan struct{}
-	wg       sync.WaitGroup
+	store             store.Store
+	nodeID            string
+	isLeader          atomic.Bool
+	stopChan          chan struct{}
+	wg                sync.WaitGroup
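+	// The fields below support single-node fallback and the first-election
+	// barrier that IsLeader waits on.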
+	isSingleNode      bool
+	firstElectionDone chan struct{}
+	firstElectionOnce sync.Once
 }

 // NewLeaderService creates a new LeaderService.
-func NewLeaderService(store store.Store) *LeaderService {
-	return &LeaderService{
-		store:    store,
-		nodeID:   generateNodeID(),
-		stopChan: make(chan struct{}),
+func NewLeaderService(s store.Store) *LeaderService {
+	// Check if the store supports Lua scripting to determine if we are in a distributed environment.
+	_, isDistributed := s.(store.LuaScripter)
+
+	service := &LeaderService{
+		store:             s,
+		nodeID:            generateNodeID(),
+		stopChan:          make(chan struct{}),
+		isSingleNode:      !isDistributed,
+		firstElectionDone: make(chan struct{}),
 	}
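+
+	// A store without Lua support is treated as a local, single-node deployment:
+	// this node assumes leadership immediately and the first election is already done.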
+	if service.isSingleNode {
+		logrus.Info("Store does not support Lua, running in single-node mode. Assuming leadership.")
+		service.isLeader.Store(true)
+		close(service.firstElectionDone)
+	} else {
+		logrus.Info("Store supports Lua, running in distributed mode.")
+	}
+
+	return service
 }

 // Start begins the leader election process.
 func (s *LeaderService) Start() {
 	logrus.WithField("nodeID", s.nodeID).Info("Starting LeaderService...")
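+	// Single-node mode assumed leadership in NewLeaderService; there is no
+	// election loop to run.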
+	if s.isSingleNode {
+		return
+	}
 	s.wg.Add(1)
 	go s.electionLoop()
 }

 // Stop gracefully stops the leader election process.
 func (s *LeaderService) Stop() {
 	logrus.Info("Stopping LeaderService...")
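+	// Nothing was started in single-node mode, so there is nothing to stop.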
+	if s.isSingleNode {
+		return
+	}
 	close(s.stopChan)
 	s.wg.Wait()
 	logrus.Info("LeaderService stopped.")
 }

 // IsLeader returns true if the current node is the leader.
 // This is a fast, local check against an atomic boolean.
+// In distributed mode, this call will block until the first election attempt is complete.
 func (s *LeaderService) IsLeader() bool {
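+	// firstElectionDone is closed either in NewLeaderService (single-node mode)
+	// or by the first tryToBeLeader attempt (distributed mode).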
+	<-s.firstElectionDone
 	return s.isLeader.Load()
 }

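+// electionLoop runs until Stop is called, periodically trying to acquire the
+// leader lock or to renew it while this node holds leadership.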
 func (s *LeaderService) electionLoop() {
 	defer s.wg.Done()
 	logrus.WithField("nodeID", s.nodeID).Info("Starting leader election loop...")

 	// Attempt to acquire leadership immediately on start.
 	s.tryToBeLeader()
@@ -72,7 +112,8 @@ func (s *LeaderService) electionLoop() {
 		case <-ticker.C:
 			s.tryToBeLeader()
 		case <-s.stopChan:
-			if s.IsLeader() {
+			logrus.Info("Stopping leader election loop...")
+			if s.isLeader.Load() {
 				s.releaseLock()
 			}
 			return
@@ -81,8 +122,11 @@ func (s *LeaderService) electionLoop() {
 }

 func (s *LeaderService) tryToBeLeader() {
-	if s.IsLeader() {
-		// Already the leader, just renew the lock.
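+	// Mark the first election attempt as complete exactly once, whether or not
+	// leadership was acquired; IsLeader blocks until this happens.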
+	defer s.firstElectionOnce.Do(func() {
+		close(s.firstElectionDone)
+	})
+
+	if s.isLeader.Load() {
 		if err := s.renewLock(); err != nil {
 			logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.")
 			s.isLeader.Store(false)
@@ -90,7 +134,6 @@ func (s *LeaderService) tryToBeLeader() {
 		return
 	}

 	// Not the leader, try to acquire the lock.
 	acquired, err := s.acquireLock()
 	if err != nil {
 		logrus.WithError(err).Error("Error trying to acquire leader lock.")
@@ -101,41 +144,40 @@ func (s *LeaderService) tryToBeLeader() {
 	if acquired {
 		logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leader lock.")
 		s.isLeader.Store(true)
 	} else {
 		logrus.Debug("Could not acquire leader lock, another node is likely the leader.")
 		s.isLeader.Store(false)
 	}
 }

 func (s *LeaderService) acquireLock() (bool, error) {
 	// SetNX is an atomic operation. If the key already exists, it does nothing.
 	// This is the core of our distributed lock.
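+	// The TTL means the lock expires on its own if a leader dies without
+	// releasing it, letting another node take over.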
 	return s.store.SetNX(leaderLockKey, []byte(s.nodeID), leaderLockTTL)
 }

 func (s *LeaderService) renewLock() error {
-	// To renew, we must ensure we are still the lock holder.
-	// A LUA script is the safest way to do this atomically.
-	// For simplicity here, we get and set, but this is not truly atomic without LUA.
-	// A simple SET can also work if we are confident in our election loop timing.
-	return s.store.Set(leaderLockKey, []byte(s.nodeID), leaderLockTTL)
+	luaStore := s.store.(store.LuaScripter) // Already checked in NewLeaderService
+	ttlSeconds := int(leaderLockTTL.Seconds())
+
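+	// The script returns 1 when this node still holds the lock and its TTL was
+	// extended, and 0 when the lock now belongs to someone else or is gone.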
+	res, err := luaStore.Eval(renewLockScript, []string{leaderLockKey}, s.nodeID, ttlSeconds)
+	if err != nil {
+		return err
+	}
+
+	if i, ok := res.(int64); !ok || i == 0 {
+		return store.ErrNotFound // Not our lock anymore
+	}
+	return nil
 }

 func (s *LeaderService) releaseLock() {
 	// Best-effort attempt to release the lock on shutdown.
 	// The TTL will handle cases where this fails.
-	if err := s.store.Delete(leaderLockKey); err != nil {
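+	// Deleting through the script ensures we never remove a lock that another
+	// node has acquired in the meantime.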
+	luaStore := s.store.(store.LuaScripter) // Already checked in NewLeaderService
+	if _, err := luaStore.Eval(releaseLockScript, []string{leaderLockKey}, s.nodeID); err != nil {
 		logrus.WithError(err).Error("Failed to release leader lock on shutdown.")
 	} else {
 		logrus.Info("Successfully released leader lock.")
 	}
 	s.isLeader.Store(false)
 }

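+// generateNodeID returns a random 128-bit, hex-encoded identifier that this
+// node stores as the value of the leader lock.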
 func generateNodeID() string {
 	bytes := make([]byte, 16)
 	if _, err := rand.Read(bytes); err != nil {
 		// Fallback to a timestamp-based ID if crypto/rand fails
 		return "node-" + time.Now().Format(time.RFC3339Nano)
 	}
 	return hex.EncodeToString(bytes)
 }