From 234731d826c363cc66c2c2da10d9b5e71f333865 Mon Sep 17 00:00:00 2001 From: tbphp Date: Wed, 9 Jul 2025 09:46:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=A2=B3=E7=90=86=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E8=BF=90=E8=A1=8C=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 1 - internal/app/app.go | 41 +++++++++++++++++++++++----- internal/config/manager.go | 4 +-- internal/config/system_settings.go | 7 +++-- internal/container/container.go | 6 ++--- internal/db/database.go | 36 +++++++++---------------- internal/keypool/provider.go | 43 ++++-------------------------- internal/services/log_cleanup.go | 9 ++++++- internal/types/types.go | 3 +-- 9 files changed, 68 insertions(+), 82 deletions(-) diff --git a/.env.example b/.env.example index 3c8adc3..a9e0520 100644 --- a/.env.example +++ b/.env.example @@ -22,7 +22,6 @@ ENABLE_GZIP=true # 数据库配置 DATABASE_DSN=user:password@tcp(localhost:3306)/gpt_load?charset=utf8mb4&parseTime=True&loc=Local -DB_AUTO_MIGRATE=true # Redis配置 REDIS_DSN=redis://:password@localhost:6379/1 diff --git a/internal/app/app.go b/internal/app/app.go index 17042d7..64ecdde 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -77,27 +77,54 @@ func NewApp(params AppParams) *App { // Start runs the application, it is a non-blocking call. func (a *App) Start() error { + // 1. 启动 Leader Service 并等待选举结果 if err := a.leaderService.Start(); err != nil { return fmt.Errorf("leader service failed to start: %w", err) } + // 2. Leader 节点执行不依赖配置的“写”操作 if a.leaderService.IsLeader() { - if err := a.settingsManager.InitializeSystemSettings(); err != nil { + logrus.Info("Leader mode. Performing initial one-time tasks...") + + // 2.1. 数据库迁移 + if err := a.db.AutoMigrate( + &models.RequestLog{}, + &models.APIKey{}, + &models.SystemSetting{}, + &models.Group{}, + ); err != nil { + return fmt.Errorf("database auto-migration failed: %w", err) + } + logrus.Info("Database auto-migration completed.") + + // 2.2. 初始化系统设置 + if err := a.settingsManager.EnsureSettingsInitialized(); err != nil { return fmt.Errorf("failed to initialize system settings: %w", err) } - logrus.Info("System settings initialized by leader.") + logrus.Info("System settings initialized in DB.") } else { - logrus.Info("This node is not the leader. Skipping leader-only initialization tasks.") + logrus.Info("Follower Mode. Skipping initial one-time tasks.") } - logrus.Debug("Loading API keys into the key pool...") - if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil { - return fmt.Errorf("failed to load keys into key pool: %w", err) + // 3. 所有节点从数据库加载配置到内存 + if err := a.settingsManager.LoadFromDatabase(); err != nil { + return fmt.Errorf("failed to load system settings from database: %w", err) } + logrus.Info("System settings loaded into memory.") + + // 4. Leader 节点执行依赖配置的“写”操作 + if a.leaderService.IsLeader() { + // 4.1. 从数据库加载密钥到 Redis + if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil { + return fmt.Errorf("failed to load keys into key pool: %w", err) + } + logrus.Info("API keys loaded into Redis cache by leader.") + } + + // 5. 显示配置并启动所有后台服务 a.settingsManager.DisplayCurrentSettings() a.configManager.DisplayConfig() - // Start background services a.startRequestLogger() a.logCleanupService.Start() a.keyValidationPool.Start() diff --git a/internal/config/manager.go b/internal/config/manager.go index f57cbd6..aebb101 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -118,8 +118,7 @@ func (m *Manager) ReloadConfig() error { EnableRequest: parseBoolean(os.Getenv("LOG_ENABLE_REQUEST"), true), }, Database: types.DatabaseConfig{ - DSN: os.Getenv("DATABASE_DSN"), - AutoMigrate: parseBoolean(os.Getenv("DB_AUTO_MIGRATE"), true), + DSN: os.Getenv("DATABASE_DSN"), }, RedisDSN: os.Getenv("REDIS_DSN"), } @@ -318,7 +317,6 @@ func (s *SystemSettingsManager) GetInt(key string, defaultValue int) int { return defaultValue } - // SetupLogger configures the logging system based on the provided configuration. func SetupLogger(configManager types.ConfigManager) { logConfig := configManager.GetLogConfig() diff --git a/internal/config/system_settings.go b/internal/config/system_settings.go index 7a377e7..7c7ad91 100644 --- a/internal/config/system_settings.go +++ b/internal/config/system_settings.go @@ -125,8 +125,8 @@ func NewSystemSettingsManager() *SystemSettingsManager { return globalSystemSettings } -// InitializeSystemSettings 初始化系统配置到数据库 -func (sm *SystemSettingsManager) InitializeSystemSettings() error { +// EnsureSettingsInitialized 确保数据库中存在所有系统设置的记录。 +func (sm *SystemSettingsManager) EnsureSettingsInitialized() error { if db.DB == nil { return fmt.Errorf("database not initialized") } @@ -168,8 +168,7 @@ func (sm *SystemSettingsManager) InitializeSystemSettings() error { } } - // 加载配置到内存 - return sm.LoadFromDatabase() + return nil } // LoadFromDatabase 从数据库加载系统配置到内存 diff --git a/internal/container/container.go b/internal/container/container.go index 2c66910..2c2a342 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -24,6 +24,9 @@ func BuildContainer() (*dig.Container, error) { if err := container.Provide(config.NewManager); err != nil { return nil, err } + if err := container.Provide(services.NewLeaderService); err != nil { + return nil, err + } if err := container.Provide(db.NewDB); err != nil { return nil, err } @@ -56,9 +59,6 @@ func BuildContainer() (*dig.Container, error) { if err := container.Provide(services.NewLogCleanupService); err != nil { return nil, err } - if err := container.Provide(services.NewLeaderService); err != nil { - return nil, err - } if err := container.Provide(keypool.NewProvider); err != nil { return nil, err } diff --git a/internal/db/database.go b/internal/db/database.go index 6747960..909d030 100644 --- a/internal/db/database.go +++ b/internal/db/database.go @@ -2,7 +2,6 @@ package db import ( "fmt" - "gpt-load/internal/models" "gpt-load/internal/types" "log" "os" @@ -21,15 +20,18 @@ func NewDB(configManager types.ConfigManager) (*gorm.DB, error) { return nil, fmt.Errorf("DATABASE_DSN is not configured") } - newLogger := logger.New( - log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer - logger.Config{ - SlowThreshold: time.Second, // Slow SQL threshold - LogLevel: logger.Info, // Log level - IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger - Colorful: true, // Disable color - }, - ) + var newLogger logger.Interface + if configManager.GetLogConfig().Level == "debug" { + newLogger = logger.New( + log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer + logger.Config{ + SlowThreshold: time.Second, // Slow SQL threshold + LogLevel: logger.Info, // Log level + IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger + Colorful: true, // Disable color + }, + ) + } var err error DB, err = gorm.Open(mysql.Open(dbConfig.DSN), &gorm.Config{ @@ -49,18 +51,6 @@ func NewDB(configManager types.ConfigManager) (*gorm.DB, error) { sqlDB.SetMaxOpenConns(100) sqlDB.SetConnMaxLifetime(time.Hour) - if dbConfig.AutoMigrate { - err = DB.AutoMigrate( - &models.SystemSetting{}, - &models.Group{}, - &models.APIKey{}, - &models.RequestLog{}, - ) - if err != nil { - return nil, fmt.Errorf("failed to auto-migrate database: %w", err) - } - } - - fmt.Println("Database connection initialized and models migrated.") + fmt.Println("Database connection initialized.") return DB, nil } diff --git a/internal/keypool/provider.go b/internal/keypool/provider.go index d3ac256..ff2381c 100644 --- a/internal/keypool/provider.go +++ b/internal/keypool/provider.go @@ -14,11 +14,6 @@ import ( "gorm.io/gorm" ) -const ( - keypoolInitializedKey = "keypool:initialized" - keypoolLoadingKey = "keypool:loading" -) - type KeyProvider struct { db *gorm.DB store store.Store @@ -192,36 +187,14 @@ func (p *KeyProvider) handleFailure(keyID uint, keyHashKey, activeKeysListKey st // LoadKeysFromDB 从数据库加载所有分组和密钥,并填充到 Store 中。 func (p *KeyProvider) LoadKeysFromDB() error { - // 1. 检查是否已初始化 - initialized, err := p.store.Exists(keypoolInitializedKey) - if err != nil { - return fmt.Errorf("failed to check for keypool initialization flag: %w", err) - } - if initialized { - logrus.Info("Key pool already initialized, skipping database load.") - return nil - } - // 2. 设置加载锁,防止集群中多个节点同时加载 - lockAcquired, err := p.store.SetNX(keypoolLoadingKey, []byte("1"), 10*time.Minute) - if err != nil { - return fmt.Errorf("failed to acquire loading lock: %w", err) - } - if !lockAcquired { - logrus.Info("Another instance is already loading the key pool. Skipping.") - return nil - } - defer p.store.Delete(keypoolLoadingKey) - - logrus.Info("Acquired loading lock. Starting first-time initialization of key pool...") - - // 3. 分批从数据库加载并使用 Pipeline 写入 Redis + // 1. 分批从数据库加载并使用 Pipeline 写入 Redis allActiveKeyIDs := make(map[uint][]any) batchSize := 1000 var batchKeys []*models.APIKey - err = p.db.Model(&models.APIKey{}).FindInBatches(&batchKeys, batchSize, func(tx *gorm.DB, batch int) error { - logrus.Infof("Processing batch %d with %d keys...", batch, len(batchKeys)) + err := p.db.Model(&models.APIKey{}).FindInBatches(&batchKeys, batchSize, func(tx *gorm.DB, batch int) error { + logrus.Debugf("Processing batch %d with %d keys...", batch, len(batchKeys)) var pipeline store.Pipeliner if redisStore, ok := p.store.(store.RedisPipeliner); ok { @@ -257,24 +230,18 @@ func (p *KeyProvider) LoadKeysFromDB() error { return fmt.Errorf("failed during batch processing of keys: %w", err) } - // 4. 更新所有分组的 active_keys 列表 + // 2. 更新所有分组的 active_keys 列表 logrus.Info("Updating active key lists for all groups...") for groupID, activeIDs := range allActiveKeyIDs { if len(activeIDs) > 0 { activeKeysListKey := fmt.Sprintf("group:%d:active_keys", groupID) - p.store.Delete(activeKeysListKey) // Clean slate + p.store.Delete(activeKeysListKey) if err := p.store.LPush(activeKeysListKey, activeIDs...); err != nil { logrus.WithFields(logrus.Fields{"groupID": groupID, "error": err}).Error("Failed to LPush active keys for group") } } } - // 5. 设置最终的初始化成功标志 - logrus.Info("Key pool loaded successfully. Setting initialization flag.") - if err := p.store.Set(keypoolInitializedKey, []byte("1"), 0); err != nil { - logrus.WithError(err).Error("Critical: Failed to set final initialization flag. Next startup might re-run initialization.") - } - return nil } diff --git a/internal/services/log_cleanup.go b/internal/services/log_cleanup.go index 2550dda..0e9a754 100644 --- a/internal/services/log_cleanup.go +++ b/internal/services/log_cleanup.go @@ -13,14 +13,16 @@ import ( type LogCleanupService struct { db *gorm.DB settingsManager *config.SystemSettingsManager + leaderService *LeaderService stopCh chan struct{} } // NewLogCleanupService 创建新的日志清理服务 -func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsManager) *LogCleanupService { +func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsManager, leaderService *LeaderService) *LogCleanupService { return &LogCleanupService{ db: db, settingsManager: settingsManager, + leaderService: leaderService, stopCh: make(chan struct{}), } } @@ -58,6 +60,11 @@ func (s *LogCleanupService) run() { // cleanupExpiredLogs 清理过期的请求日志 func (s *LogCleanupService) cleanupExpiredLogs() { + if !s.leaderService.IsLeader() { + logrus.Debug("Not the leader, skipping log cleanup.") + return + } + // 获取日志保留天数配置 settings := s.settingsManager.GetSettings() retentionDays := settings.RequestLogRetentionDays diff --git a/internal/types/types.go b/internal/types/types.go index 76057a9..c06798e 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -77,6 +77,5 @@ type LogConfig struct { // DatabaseConfig represents database configuration type DatabaseConfig struct { - DSN string `json:"dsn"` - AutoMigrate bool `json:"autoMigrate"` + DSN string `json:"dsn"` }