fix: 修复redis pub/sub模式竞态问题

This commit is contained in:
tbphp
2025-07-09 15:52:00 +08:00
parent 0fa92758d8
commit 9bd32d5885
5 changed files with 23 additions and 22 deletions

View File

@@ -132,7 +132,6 @@ func (a *App) Start() error {
}
// 显示配置并启动所有后台服务
a.settingsManager.DisplayCurrentSettings()
a.configManager.DisplayConfig()
a.startRequestLogger()

View File

@@ -156,6 +156,9 @@ func (sm *SystemSettingsManager) Initialize(store store.Store) error {
}
}
}
sm.DisplayCurrentSettings(settings)
return settings, nil
}
@@ -375,8 +378,7 @@ func (sm *SystemSettingsManager) ValidateSettings(settingsMap map[string]any) er
}
// DisplayCurrentSettings 显示当前系统配置信息
func (sm *SystemSettingsManager) DisplayCurrentSettings() {
settings := sm.GetSettings()
func (sm *SystemSettingsManager) DisplayCurrentSettings(settings SystemSettings) {
logrus.Info("Current System Settings:")
logrus.Infof(" App URL: %s", settings.AppUrl)
logrus.Infof(" Blacklist threshold: %d", settings.BlacklistThreshold)

View File

@@ -5,9 +5,9 @@ import (
app_errors "gpt-load/internal/errors"
"gpt-load/internal/models"
"gpt-load/internal/response"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)
// GetSettings handles the GET /api/settings request.
@@ -39,8 +39,6 @@ func (s *Server) GetSettings(c *gin.Context) {
}
// UpdateSettings handles the PUT /api/settings request.
// It receives a key-value JSON object and updates system settings.
// After updating, it triggers a configuration reload.
func (s *Server) UpdateSettings(c *gin.Context) {
var settingsMap map[string]any
if err := c.ShouldBindJSON(&settingsMap); err != nil {
@@ -59,9 +57,8 @@ func (s *Server) UpdateSettings(c *gin.Context) {
return
}
s.SettingsManager.DisplayCurrentSettings()
time.Sleep(100 * time.Millisecond) // 等待异步更新配置
logrus.Info("Settings update request processed. Invalidation notification sent.")
response.Success(c, gin.H{
"message": "Settings updated successfully. Configuration will be reloaded in the background across all instances.",
})

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/redis/go-redis/v9"
@@ -128,23 +129,26 @@ func (s *RedisStore) Eval(script string, keys []string, args ...interface{}) (in
// redisSubscription wraps the redis.PubSub to implement the Subscription interface.
type redisSubscription struct {
pubsub *redis.PubSub
pubsub *redis.PubSub
msgChan chan *Message
once sync.Once
}
// Channel returns a channel that receives messages from the subscription.
// It handles the conversion from redis.Message to our internal Message type.
func (rs *redisSubscription) Channel() <-chan *Message {
ch := make(chan *Message)
go func() {
defer close(ch)
for redisMsg := range rs.pubsub.Channel() {
ch <- &Message{
Channel: redisMsg.Channel,
Payload: []byte(redisMsg.Payload),
rs.once.Do(func() {
rs.msgChan = make(chan *Message, 10)
go func() {
defer close(rs.msgChan)
for redisMsg := range rs.pubsub.Channel() {
rs.msgChan <- &Message{
Channel: redisMsg.Channel,
Payload: []byte(redisMsg.Payload),
}
}
}
}()
return ch
}()
})
return rs.msgChan
}
// Close closes the subscription.

View File

@@ -60,7 +60,6 @@ func (s *CacheSyncer[T]) Get() T {
// Invalidate publishes a notification to all instances to reload their cache.
func (s *CacheSyncer[T]) Invalidate() error {
s.logger.Debug("publishing invalidation notification")
s.reload()
return s.store.Publish(s.channelName, []byte("reload"))
}
@@ -125,7 +124,7 @@ func (s *CacheSyncer[T]) listenForUpdates() {
case msg, ok := <-subscription.Channel():
if !ok {
s.logger.Warn("subscription channel closed, attempting to re-subscribe...")
break subscriberLoop // This will lead to closing the current subscription and retrying.
break subscriberLoop
}
s.logger.Debugf("received invalidation notification, payload: %s", string(msg.Payload))
if err := s.reload(); err != nil {