From fe3fd9c14afd13abe2b4e0a869f177fec9e8c931 Mon Sep 17 00:00:00 2001 From: tbphp Date: Tue, 8 Jul 2025 21:10:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E9=A2=86=E5=AF=BC?= =?UTF-8?q?=E8=80=85=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/key_cron_service.go | 4 + internal/services/leader_service.go | 110 ++++++++++++++++++-------- internal/store/redis.go | 5 ++ internal/store/store.go | 5 ++ 4 files changed, 90 insertions(+), 34 deletions(-) diff --git a/internal/services/key_cron_service.go b/internal/services/key_cron_service.go index efd69dd..38b81aa 100644 --- a/internal/services/key_cron_service.go +++ b/internal/services/key_cron_service.go @@ -55,6 +55,10 @@ func (s *KeyCronService) Stop() { func (s *KeyCronService) runLoop() { defer s.wg.Done() + if s.LeaderService.IsLeader() { + s.submitValidationJobs() + } + ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() diff --git a/internal/services/leader_service.go b/internal/services/leader_service.go index b915973..e43824b 100644 --- a/internal/services/leader_service.go +++ b/internal/services/leader_service.go @@ -16,50 +16,90 @@ const ( leaderLockKey = "cluster:leader" leaderLockTTL = 30 * time.Second leaderRenewalInterval = 10 * time.Second - leaderElectionTimeout = 5 * time.Second ) +// Lua script for atomic lock renewal. +// KEYS[1]: lock key, ARGV[1]: node ID, ARGV[2]: TTL in seconds. +const renewLockScript = ` +if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("expire", KEYS[1], ARGV[2]) +else + return 0 +end` + +// Lua script for atomic lock release. +// KEYS[1]: lock key, ARGV[1]: node ID. +const releaseLockScript = ` +if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) +else + return 0 +end` + // LeaderService provides a mechanism for electing a single leader in a cluster. type LeaderService struct { - store store.Store - nodeID string - isLeader atomic.Bool - stopChan chan struct{} - wg sync.WaitGroup + store store.Store + nodeID string + isLeader atomic.Bool + stopChan chan struct{} + wg sync.WaitGroup + isSingleNode bool + firstElectionDone chan struct{} + firstElectionOnce sync.Once } // NewLeaderService creates a new LeaderService. -func NewLeaderService(store store.Store) *LeaderService { - return &LeaderService{ - store: store, - nodeID: generateNodeID(), - stopChan: make(chan struct{}), +func NewLeaderService(s store.Store) *LeaderService { + // Check if the store supports Lua scripting to determine if we are in a distributed environment. + _, isDistributed := s.(store.LuaScripter) + + service := &LeaderService{ + store: s, + nodeID: generateNodeID(), + stopChan: make(chan struct{}), + isSingleNode: !isDistributed, + firstElectionDone: make(chan struct{}), } + + if service.isSingleNode { + logrus.Info("Store does not support Lua, running in single-node mode. Assuming leadership.") + service.isLeader.Store(true) + close(service.firstElectionDone) + } else { + logrus.Info("Store supports Lua, running in distributed mode.") + } + + return service } // Start begins the leader election process. func (s *LeaderService) Start() { - logrus.WithField("nodeID", s.nodeID).Info("Starting LeaderService...") + if s.isSingleNode { + return + } s.wg.Add(1) go s.electionLoop() } // Stop gracefully stops the leader election process. func (s *LeaderService) Stop() { - logrus.Info("Stopping LeaderService...") + if s.isSingleNode { + return + } close(s.stopChan) s.wg.Wait() - logrus.Info("LeaderService stopped.") } // IsLeader returns true if the current node is the leader. -// This is a fast, local check against an atomic boolean. +// In distributed mode, this call will block until the first election attempt is complete. func (s *LeaderService) IsLeader() bool { + <-s.firstElectionDone return s.isLeader.Load() } func (s *LeaderService) electionLoop() { defer s.wg.Done() + logrus.WithField("nodeID", s.nodeID).Info("Starting leader election loop...") // Attempt to acquire leadership immediately on start. s.tryToBeLeader() @@ -72,7 +112,8 @@ func (s *LeaderService) electionLoop() { case <-ticker.C: s.tryToBeLeader() case <-s.stopChan: - if s.IsLeader() { + logrus.Info("Stopping leader election loop...") + if s.isLeader.Load() { s.releaseLock() } return @@ -81,8 +122,11 @@ func (s *LeaderService) electionLoop() { } func (s *LeaderService) tryToBeLeader() { - if s.IsLeader() { - // Already the leader, just renew the lock. + defer s.firstElectionOnce.Do(func() { + close(s.firstElectionDone) + }) + + if s.isLeader.Load() { if err := s.renewLock(); err != nil { logrus.WithError(err).Error("Failed to renew leader lock, relinquishing leadership.") s.isLeader.Store(false) @@ -90,7 +134,6 @@ func (s *LeaderService) tryToBeLeader() { return } - // Not the leader, try to acquire the lock. acquired, err := s.acquireLock() if err != nil { logrus.WithError(err).Error("Error trying to acquire leader lock.") @@ -101,41 +144,40 @@ func (s *LeaderService) tryToBeLeader() { if acquired { logrus.WithField("nodeID", s.nodeID).Info("Successfully acquired leader lock.") s.isLeader.Store(true) - } else { - logrus.Debug("Could not acquire leader lock, another node is likely the leader.") - s.isLeader.Store(false) } } func (s *LeaderService) acquireLock() (bool, error) { - // SetNX is an atomic operation. If the key already exists, it does nothing. - // This is the core of our distributed lock. return s.store.SetNX(leaderLockKey, []byte(s.nodeID), leaderLockTTL) } func (s *LeaderService) renewLock() error { - // To renew, we must ensure we are still the lock holder. - // A LUA script is the safest way to do this atomically. - // For simplicity here, we get and set, but this is not truly atomic without LUA. - // A simple SET can also work if we are confident in our election loop timing. - return s.store.Set(leaderLockKey, []byte(s.nodeID), leaderLockTTL) + luaStore := s.store.(store.LuaScripter) // Already checked in NewLeaderService + ttlSeconds := int(leaderLockTTL.Seconds()) + + res, err := luaStore.Eval(renewLockScript, []string{leaderLockKey}, s.nodeID, ttlSeconds) + if err != nil { + return err + } + + if i, ok := res.(int64); !ok || i == 0 { + return store.ErrNotFound // Not our lock anymore + } + return nil } func (s *LeaderService) releaseLock() { - // Best-effort attempt to release the lock on shutdown. - // The TTL will handle cases where this fails. - if err := s.store.Delete(leaderLockKey); err != nil { + luaStore := s.store.(store.LuaScripter) // Already checked in NewLeaderService + if _, err := luaStore.Eval(releaseLockScript, []string{leaderLockKey}, s.nodeID); err != nil { logrus.WithError(err).Error("Failed to release leader lock on shutdown.") } else { logrus.Info("Successfully released leader lock.") } - s.isLeader.Store(false) } func generateNodeID() string { bytes := make([]byte, 16) if _, err := rand.Read(bytes); err != nil { - // Fallback to a timestamp-based ID if crypto/rand fails return "node-" + time.Now().Format(time.RFC3339Nano) } return hex.EncodeToString(bytes) diff --git a/internal/store/redis.go b/internal/store/redis.go index d941874..9c22975 100644 --- a/internal/store/redis.go +++ b/internal/store/redis.go @@ -117,3 +117,8 @@ func (s *RedisStore) Pipeline() Pipeliner { pipe: s.client.Pipeline(), } } + +// Eval executes a Lua script on Redis. +func (s *RedisStore) Eval(script string, keys []string, args ...interface{}) (interface{}, error) { + return s.client.Eval(context.Background(), script, keys, args...).Result() +} diff --git a/internal/store/store.go b/internal/store/store.go index 57b42f3..e7b4b5a 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -56,3 +56,8 @@ type Pipeliner interface { type RedisPipeliner interface { Pipeline() Pipeliner } + +// LuaScripter is an optional interface that a Store can implement to provide Lua script execution. +type LuaScripter interface { + Eval(script string, keys []string, args ...interface{}) (interface{}, error) +}