From 9bd32d5885da899a7f3837d298297fc81cbb93b5 Mon Sep 17 00:00:00 2001 From: tbphp Date: Wed, 9 Jul 2025 15:52:00 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8Dredis=20pub/sub?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E7=AB=9E=E6=80=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/app.go | 1 - internal/config/system_settings.go | 6 ++++-- internal/handler/settings_handler.go | 7 ++----- internal/store/redis.go | 28 ++++++++++++++++------------ internal/syncer/cache_syncer.go | 3 +-- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index bfa000c..083f2fc 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -132,7 +132,6 @@ func (a *App) Start() error { } // 显示配置并启动所有后台服务 - a.settingsManager.DisplayCurrentSettings() a.configManager.DisplayConfig() a.startRequestLogger() diff --git a/internal/config/system_settings.go b/internal/config/system_settings.go index 315d3cc..a09e2ed 100644 --- a/internal/config/system_settings.go +++ b/internal/config/system_settings.go @@ -156,6 +156,9 @@ func (sm *SystemSettingsManager) Initialize(store store.Store) error { } } } + + sm.DisplayCurrentSettings(settings) + return settings, nil } @@ -375,8 +378,7 @@ func (sm *SystemSettingsManager) ValidateSettings(settingsMap map[string]any) er } // DisplayCurrentSettings 显示当前系统配置信息 -func (sm *SystemSettingsManager) DisplayCurrentSettings() { - settings := sm.GetSettings() +func (sm *SystemSettingsManager) DisplayCurrentSettings(settings SystemSettings) { logrus.Info("Current System Settings:") logrus.Infof(" App URL: %s", settings.AppUrl) logrus.Infof(" Blacklist threshold: %d", settings.BlacklistThreshold) diff --git a/internal/handler/settings_handler.go b/internal/handler/settings_handler.go index ba2672b..27ae494 100644 --- a/internal/handler/settings_handler.go +++ b/internal/handler/settings_handler.go @@ -5,9 +5,9 @@ import ( app_errors "gpt-load/internal/errors" "gpt-load/internal/models" "gpt-load/internal/response" + "time" "github.com/gin-gonic/gin" - "github.com/sirupsen/logrus" ) // GetSettings handles the GET /api/settings request. @@ -39,8 +39,6 @@ func (s *Server) GetSettings(c *gin.Context) { } // UpdateSettings handles the PUT /api/settings request. -// It receives a key-value JSON object and updates system settings. -// After updating, it triggers a configuration reload. func (s *Server) UpdateSettings(c *gin.Context) { var settingsMap map[string]any if err := c.ShouldBindJSON(&settingsMap); err != nil { @@ -59,9 +57,8 @@ func (s *Server) UpdateSettings(c *gin.Context) { return } - s.SettingsManager.DisplayCurrentSettings() + time.Sleep(100 * time.Millisecond) // 等待异步更新配置 - logrus.Info("Settings update request processed. Invalidation notification sent.") response.Success(c, gin.H{ "message": "Settings updated successfully. Configuration will be reloaded in the background across all instances.", }) diff --git a/internal/store/redis.go b/internal/store/redis.go index b807d53..1903256 100644 --- a/internal/store/redis.go +++ b/internal/store/redis.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/redis/go-redis/v9" @@ -128,23 +129,26 @@ func (s *RedisStore) Eval(script string, keys []string, args ...interface{}) (in // redisSubscription wraps the redis.PubSub to implement the Subscription interface. type redisSubscription struct { - pubsub *redis.PubSub + pubsub *redis.PubSub + msgChan chan *Message + once sync.Once } // Channel returns a channel that receives messages from the subscription. -// It handles the conversion from redis.Message to our internal Message type. func (rs *redisSubscription) Channel() <-chan *Message { - ch := make(chan *Message) - go func() { - defer close(ch) - for redisMsg := range rs.pubsub.Channel() { - ch <- &Message{ - Channel: redisMsg.Channel, - Payload: []byte(redisMsg.Payload), + rs.once.Do(func() { + rs.msgChan = make(chan *Message, 10) + go func() { + defer close(rs.msgChan) + for redisMsg := range rs.pubsub.Channel() { + rs.msgChan <- &Message{ + Channel: redisMsg.Channel, + Payload: []byte(redisMsg.Payload), + } } - } - }() - return ch + }() + }) + return rs.msgChan } // Close closes the subscription. diff --git a/internal/syncer/cache_syncer.go b/internal/syncer/cache_syncer.go index 7f7470b..1afa173 100644 --- a/internal/syncer/cache_syncer.go +++ b/internal/syncer/cache_syncer.go @@ -60,7 +60,6 @@ func (s *CacheSyncer[T]) Get() T { // Invalidate publishes a notification to all instances to reload their cache. func (s *CacheSyncer[T]) Invalidate() error { s.logger.Debug("publishing invalidation notification") - s.reload() return s.store.Publish(s.channelName, []byte("reload")) } @@ -125,7 +124,7 @@ func (s *CacheSyncer[T]) listenForUpdates() { case msg, ok := <-subscription.Channel(): if !ok { s.logger.Warn("subscription channel closed, attempting to re-subscribe...") - break subscriberLoop // This will lead to closing the current subscription and retrying. + break subscriberLoop } s.logger.Debugf("received invalidation notification, payload: %s", string(msg.Payload)) if err := s.reload(); err != nil {