From 0fa92758d8b75ca1315223bb2927a212f9e8bbbb Mon Sep 17 00:00:00 2001 From: tbphp Date: Wed, 9 Jul 2025 13:28:40 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=B3=BB=E7=BB=9F=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E7=BC=93=E5=AD=98=E5=90=8C=E6=AD=A5=E5=99=A8?= =?UTF-8?q?=E4=BF=9D=E6=8C=81=E9=9B=86=E7=BE=A4=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/app.go | 28 ++-- internal/config/manager.go | 19 ++- internal/config/system_settings.go | 192 +++++++++++++-------------- internal/handler/settings_handler.go | 18 +-- 4 files changed, 115 insertions(+), 142 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 4938dd9..bfa000c 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -77,12 +77,13 @@ func NewApp(params AppParams) *App { // Start runs the application, it is a non-blocking call. func (a *App) Start() error { - // 1. 启动 Leader Service 并等待选举结果 + + // 启动 Leader Service 并等待选举结果 if err := a.leaderService.Start(); err != nil { return fmt.Errorf("leader service failed to start: %w", err) } - // 2. Leader 节点执行初始化,Follower 节点等待 + // Leader 节点执行初始化,Follower 节点等待 if a.leaderService.IsLeader() { logrus.Info("Leader mode. Performing initial one-time tasks...") acquired, err := a.leaderService.AcquireInitializingLock() @@ -95,10 +96,9 @@ func (a *App) Start() error { return fmt.Errorf("failed to wait for initialization as a fallback follower: %w", err) } } else { - // Release the lock when the initialization is done. defer a.leaderService.ReleaseInitializingLock() - // 2.1. 数据库迁移 + // 数据库迁移 if err := a.db.AutoMigrate( &models.RequestLog{}, &models.APIKey{}, @@ -109,19 +109,15 @@ func (a *App) Start() error { } logrus.Info("Database auto-migration completed.") - // 2.2. 初始化系统设置 + // 初始化系统设置 if err := a.settingsManager.EnsureSettingsInitialized(); err != nil { return fmt.Errorf("failed to initialize system settings: %w", err) } logrus.Info("System settings initialized in DB.") - // 2.3. 加载配置到内存 (Leader 先行) - if err := a.settingsManager.LoadFromDatabase(); err != nil { - return fmt.Errorf("leader failed to load system settings from database: %w", err) - } - logrus.Info("System settings loaded into memory by leader.") + a.settingsManager.Initialize(a.storage) - // 2.4. 从数据库加载密钥到 Redis + // 从数据库加载密钥到 Redis if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil { return fmt.Errorf("failed to load keys into key pool: %w", err) } @@ -132,15 +128,10 @@ func (a *App) Start() error { if err := a.leaderService.WaitForInitializationToComplete(); err != nil { return fmt.Errorf("follower failed to start: %w", err) } + a.settingsManager.Initialize(a.storage) } - // 3. 所有节点加载或重新加载配置以确保一致性 - if err := a.settingsManager.LoadFromDatabase(); err != nil { - return fmt.Errorf("failed to load system settings from database: %w", err) - } - logrus.Info("System settings loaded into memory.") - - // 5. 显示配置并启动所有后台服务 + // 显示配置并启动所有后台服务 a.settingsManager.DisplayCurrentSettings() a.configManager.DisplayConfig() @@ -189,6 +180,7 @@ func (a *App) Stop(ctx context.Context) { a.keyValidationPool.Stop() a.leaderService.Stop() a.logCleanupService.Stop() + a.settingsManager.Stop() // Close resources a.proxyServer.Close() diff --git a/internal/config/manager.go b/internal/config/manager.go index aebb101..c381891 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "reflect" "strconv" "strings" @@ -306,14 +307,22 @@ func getEnvOrDefault(key, defaultValue string) string { // GetInt is a helper function for SystemSettingsManager to get an integer value with a default. func (s *SystemSettingsManager) GetInt(key string, defaultValue int) int { - s.mu.RLock() - defer s.mu.RUnlock() + settings := s.GetSettings() + v := reflect.ValueOf(settings) + t := v.Type() - if valStr, ok := s.settingsCache[key]; ok { - if valInt, err := strconv.Atoi(valStr); err == nil { - return valInt + for i := 0; i < t.NumField(); i++ { + structField := t.Field(i) + jsonTag := strings.Split(structField.Tag.Get("json"), ",")[0] + if jsonTag == key { + valueField := v.Field(i) + if valueField.Kind() == reflect.Int { + return int(valueField.Int()) + } + break } } + return defaultValue } diff --git a/internal/config/system_settings.go b/internal/config/system_settings.go index 7c7ad91..315d3cc 100644 --- a/internal/config/system_settings.go +++ b/internal/config/system_settings.go @@ -4,11 +4,12 @@ import ( "fmt" "gpt-load/internal/db" "gpt-load/internal/models" + "gpt-load/internal/store" + "gpt-load/internal/syncer" "os" "reflect" "strconv" "strings" - "sync" "github.com/sirupsen/logrus" "gorm.io/datatypes" @@ -109,35 +110,85 @@ func DefaultSystemSettings() SystemSettings { // SystemSettingsManager 管理系统配置 type SystemSettingsManager struct { - settings SystemSettings - settingsCache map[string]string // Cache for raw string values - mu sync.RWMutex + syncer *syncer.CacheSyncer[SystemSettings] } -var globalSystemSettings *SystemSettingsManager -var once sync.Once +const SettingsUpdateChannel = "system_settings:updated" -// NewSystemSettingsManager 获取全局系统配置管理器单例 -func NewSystemSettingsManager() *SystemSettingsManager { - once.Do(func() { - globalSystemSettings = &SystemSettingsManager{} - }) - return globalSystemSettings +// NewSystemSettingsManager creates a new, uninitialized SystemSettingsManager. +func NewSystemSettingsManager() (*SystemSettingsManager, error) { + return &SystemSettingsManager{}, nil +} + +// Initialize initializes the SystemSettingsManager with database and store dependencies. +func (sm *SystemSettingsManager) Initialize(store store.Store) error { + settingsLoader := func() (SystemSettings, error) { + var dbSettings []models.SystemSetting + if err := db.DB.Find(&dbSettings).Error; err != nil { + return SystemSettings{}, fmt.Errorf("failed to load system settings from db: %w", err) + } + + settingsMap := make(map[string]string) + for _, setting := range dbSettings { + settingsMap[setting.SettingKey] = setting.SettingValue + } + + // Start with default settings, then override with values from the database. + settings := DefaultSystemSettings() + v := reflect.ValueOf(&settings).Elem() + t := v.Type() + jsonToField := make(map[string]string) + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + jsonTag := strings.Split(field.Tag.Get("json"), ",")[0] + if jsonTag != "" { + jsonToField[jsonTag] = field.Name + } + } + + for key, valStr := range settingsMap { + if fieldName, ok := jsonToField[key]; ok { + fieldValue := v.FieldByName(fieldName) + if fieldValue.IsValid() && fieldValue.CanSet() { + if err := setFieldFromString(fieldValue, valStr); err != nil { + logrus.Warnf("Failed to set value from map for field %s: %v", fieldName, err) + } + } + } + } + return settings, nil + } + + syncer, err := syncer.NewCacheSyncer( + settingsLoader, + store, + SettingsUpdateChannel, + logrus.WithField("syncer", "system_settings"), + ) + if err != nil { + return fmt.Errorf("failed to create system settings syncer: %w", err) + } + + sm.syncer = syncer + return nil +} + +// Stop gracefully stops the SystemSettingsManager's background syncer. +func (sm *SystemSettingsManager) Stop() { + if sm.syncer != nil { + sm.syncer.Stop() + } } // EnsureSettingsInitialized 确保数据库中存在所有系统设置的记录。 func (sm *SystemSettingsManager) EnsureSettingsInitialized() error { - if db.DB == nil { - return fmt.Errorf("database not initialized") - } - defaultSettings := DefaultSystemSettings() metadata := GenerateSettingsMetadata(&defaultSettings) for _, meta := range metadata { var existing models.SystemSetting err := db.DB.Where("setting_key = ?", meta.Key).First(&existing).Error - if err != nil { // Not found + if err != nil { value := fmt.Sprintf("%v", meta.DefaultValue) if meta.Key == "app_url" { // Special handling for app_url initialization @@ -171,51 +222,23 @@ func (sm *SystemSettingsManager) EnsureSettingsInitialized() error { return nil } -// LoadFromDatabase 从数据库加载系统配置到内存 -func (sm *SystemSettingsManager) LoadFromDatabase() error { - if db.DB == nil { - return fmt.Errorf("database not initialized") - } - - var settings []models.SystemSetting - if err := db.DB.Find(&settings).Error; err != nil { - return fmt.Errorf("failed to load system settings: %w", err) - } - - settingsMap := make(map[string]string) - for _, setting := range settings { - settingsMap[setting.SettingKey] = setting.SettingValue - } - - sm.mu.Lock() - defer sm.mu.Unlock() - - sm.settingsCache = settingsMap - - // 使用默认值,然后用数据库中的值覆盖 - sm.settings = DefaultSystemSettings() - sm.mapToStruct(settingsMap, &sm.settings) - - logrus.Debug("System settings loaded from database") - return nil -} - // GetSettings 获取当前系统配置 +// If the syncer is not initialized, it returns default settings. func (sm *SystemSettingsManager) GetSettings() SystemSettings { - sm.mu.RLock() - defer sm.mu.RUnlock() - return sm.settings + if sm.syncer == nil { + logrus.Warn("SystemSettingsManager is not initialized, returning default settings.") + return DefaultSystemSettings() + } + return sm.syncer.Get() } // GetAppUrl returns the effective App URL. // It prioritizes the value from system settings (database) over the APP_URL environment variable. func (sm *SystemSettingsManager) GetAppUrl() string { - sm.mu.RLock() - defer sm.mu.RUnlock() - // 1. 优先级: 数据库中的系统配置 - if sm.settings.AppUrl != "" { - return sm.settings.AppUrl + settings := sm.GetSettings() + if settings.AppUrl != "" { + return settings.AppUrl } // 2. 回退: 环境变量 @@ -224,10 +247,6 @@ func (sm *SystemSettingsManager) GetAppUrl() string { // UpdateSettings 更新系统配置 func (sm *SystemSettingsManager) UpdateSettings(settingsMap map[string]any) error { - if db.DB == nil { - return fmt.Errorf("database not initialized") - } - // 验证配置项 if err := sm.ValidateSettings(settingsMap); err != nil { return err @@ -251,17 +270,14 @@ func (sm *SystemSettingsManager) UpdateSettings(settingsMap map[string]any) erro } } - // 重新加载配置到内存 - return sm.LoadFromDatabase() + // 触发所有实例重新加载 + return sm.syncer.Invalidate() } // GetEffectiveConfig 获取有效配置 (系统配置 + 分组覆盖) func (sm *SystemSettingsManager) GetEffectiveConfig(groupConfig datatypes.JSONMap) SystemSettings { - sm.mu.RLock() - defer sm.mu.RUnlock() - // 从系统配置开始 - effectiveConfig := sm.settings + effectiveConfig := sm.GetSettings() v := reflect.ValueOf(&effectiveConfig).Elem() t := v.Type() @@ -360,49 +376,19 @@ func (sm *SystemSettingsManager) ValidateSettings(settingsMap map[string]any) er // DisplayCurrentSettings 显示当前系统配置信息 func (sm *SystemSettingsManager) DisplayCurrentSettings() { - sm.mu.RLock() - defer sm.mu.RUnlock() - + settings := sm.GetSettings() logrus.Info("Current System Settings:") - logrus.Infof(" App URL: %s", sm.settings.AppUrl) - logrus.Infof(" Blacklist threshold: %d", sm.settings.BlacklistThreshold) - logrus.Infof(" Max retries: %d", sm.settings.MaxRetries) + logrus.Infof(" App URL: %s", settings.AppUrl) + logrus.Infof(" Blacklist threshold: %d", settings.BlacklistThreshold) + logrus.Infof(" Max retries: %d", settings.MaxRetries) logrus.Infof(" Server timeouts: read=%ds, write=%ds, idle=%ds, shutdown=%ds", - sm.settings.ServerReadTimeout, sm.settings.ServerWriteTimeout, - sm.settings.ServerIdleTimeout, sm.settings.ServerGracefulShutdownTimeout) + settings.ServerReadTimeout, settings.ServerWriteTimeout, + settings.ServerIdleTimeout, settings.ServerGracefulShutdownTimeout) logrus.Infof(" Request timeouts: request=%ds, response=%ds, idle_conn=%ds", - sm.settings.RequestTimeout, sm.settings.ResponseTimeout, sm.settings.IdleConnTimeout) - logrus.Infof(" Request log retention: %d days", sm.settings.RequestLogRetentionDays) + settings.RequestTimeout, settings.ResponseTimeout, settings.IdleConnTimeout) + logrus.Infof(" Request log retention: %d days", settings.RequestLogRetentionDays) logrus.Infof(" Key validation: interval=%dmin, task_timeout=%dmin", - sm.settings.KeyValidationIntervalMinutes, sm.settings.KeyValidationTaskTimeoutMinutes) -} - -// 辅助方法 - -func (sm *SystemSettingsManager) mapToStruct(m map[string]string, s *SystemSettings) { - v := reflect.ValueOf(s).Elem() - t := v.Type() - - // 创建一个从 json 标签到字段名的映射 - jsonToField := make(map[string]string) - for i := 0; i < t.NumField(); i++ { - field := t.Field(i) - jsonTag := strings.Split(field.Tag.Get("json"), ",")[0] - if jsonTag != "" { - jsonToField[jsonTag] = field.Name - } - } - - for key, valStr := range m { - if fieldName, ok := jsonToField[key]; ok { - fieldValue := v.FieldByName(fieldName) - if fieldValue.IsValid() && fieldValue.CanSet() { - if err := setFieldFromString(fieldValue, valStr); err != nil { - logrus.Warnf("Failed to set value from map for field %s: %v", fieldName, err) - } - } - } - } + settings.KeyValidationIntervalMinutes, settings.KeyValidationTaskTimeoutMinutes) } // setFieldFromString sets a struct field's value from a string, based on the field's kind. diff --git a/internal/handler/settings_handler.go b/internal/handler/settings_handler.go index a58b6c5..ba2672b 100644 --- a/internal/handler/settings_handler.go +++ b/internal/handler/settings_handler.go @@ -59,24 +59,10 @@ func (s *Server) UpdateSettings(c *gin.Context) { return } - // 重载系统配置 - if err := s.SettingsManager.LoadFromDatabase(); err != nil { - logrus.Errorf("Failed to reload system settings: %v", err) - response.Error(c, app_errors.NewAPIError(app_errors.ErrInternalServer, "Failed to reload system settings after update")) - return - } - s.SettingsManager.DisplayCurrentSettings() - logrus.Info("Configuration reloaded successfully via API") + logrus.Info("Settings update request processed. Invalidation notification sent.") response.Success(c, gin.H{ - "message": "Configuration reloaded successfully", - "timestamp": gin.H{ - "reloaded_at": "now", - }, - }) - - response.Success(c, gin.H{ - "message": "Settings updated successfully. Configuration reloaded.", + "message": "Settings updated successfully. Configuration will be reloaded in the background across all instances.", }) }