diff --git a/internal/app/app.go b/internal/app/app.go index 0adb4ae..e0a8af8 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net/http" + "sync" "time" "gpt-load/internal/config" @@ -168,19 +169,47 @@ func (a *App) Start() error { func (a *App) Stop(ctx context.Context) { logrus.Info("Shutting down server...") - // Shutdown http server if err := a.httpServer.Shutdown(ctx); err != nil { logrus.Errorf("Server forced to shutdown: %v", err) } - // Stop background services - a.cronChecker.Stop() - a.leaderLock.Stop() - a.logCleanupService.Stop() - a.requestLogService.Stop() - a.groupManager.Stop() - a.settingsManager.Stop() - a.storage.Close() + stoppableServices := []func(context.Context){ + a.cronChecker.Stop, + a.leaderLock.Stop, + a.logCleanupService.Stop, + a.requestLogService.Stop, + a.groupManager.Stop, + a.settingsManager.Stop, + } + + var wg sync.WaitGroup + wg.Add(len(stoppableServices)) + + for _, stopFunc := range stoppableServices { + go func(stop func(context.Context)) { + defer wg.Done() + stop(ctx) + }(stopFunc) + } + + // Wait for all services to stop, or for the context to be done. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + logrus.Info("All background services stopped.") + case <-ctx.Done(): + logrus.Warn("Shutdown timed out, some services may not have stopped gracefully.") + } + + // Close storage connection last. NOTE(review): on the timeout path above, background services may still be running and using storage — confirm Close is safe to call concurrently with them.
+ if a.storage != nil { + a.storage.Close() + } logrus.Info("Server exited gracefully") } diff --git a/internal/config/system_settings.go b/internal/config/system_settings.go index 34307a4..1762c34 100644 --- a/internal/config/system_settings.go +++ b/internal/config/system_settings.go @@ -1,6 +1,7 @@ package config import ( + "context" "encoding/json" "fmt" "gpt-load/internal/db" @@ -106,7 +107,7 @@ func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager, } // Stop gracefully stops the SystemSettingsManager's background syncer. -func (sm *SystemSettingsManager) Stop() { +func (sm *SystemSettingsManager) Stop(ctx context.Context) { if sm.syncer != nil { sm.syncer.Stop() } diff --git a/internal/keypool/cron_checker.go b/internal/keypool/cron_checker.go index 735601f..64c1fd5 100644 --- a/internal/keypool/cron_checker.go +++ b/internal/keypool/cron_checker.go @@ -1,6 +1,7 @@ package keypool import ( + "context" "gpt-load/internal/config" "gpt-load/internal/models" "gpt-load/internal/store" @@ -44,12 +45,23 @@ func (s *CronChecker) Start() { go s.runLoop() } -// Stop stops the cron job. -func (s *CronChecker) Stop() { - logrus.Info("Stopping CronChecker...") +// Stop stops the cron job, respecting the context for shutdown timeout. +func (s *CronChecker) Stop(ctx context.Context) { close(s.stopChan) - s.wg.Wait() - logrus.Info("CronChecker stopped.") + + // Wait for the goroutine to finish, or for the shutdown to time out. 
+ done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + logrus.Info("CronChecker stopped gracefully.") + case <-ctx.Done(): + logrus.Warn("CronChecker stop timed out.") + } } func (s *CronChecker) runLoop() { @@ -104,8 +116,12 @@ func (s *CronChecker) submitValidationJobs() { validatedCount := len(invalidKeys) becameValidCount := 0 if validatedCount > 0 { - logrus.Debugf("CronChecker: Found %d invalid keys to validate for group %s.", validatedCount, group.Name) for j := range invalidKeys { + select { + case <-s.stopChan: + return + default: + } key := &invalidKeys[j] isValid, _ := s.Validator.ValidateSingleKey(key, group) diff --git a/internal/services/group_manager.go b/internal/services/group_manager.go index bdf5f0b..840ac8a 100644 --- a/internal/services/group_manager.go +++ b/internal/services/group_manager.go @@ -1,6 +1,7 @@ package services import ( + "context" "fmt" "gpt-load/internal/config" "gpt-load/internal/models" @@ -93,7 +94,7 @@ func (gm *GroupManager) Invalidate() error { } // Stop gracefully stops the GroupManager's background syncer. 
-func (gm *GroupManager) Stop() { +func (gm *GroupManager) Stop(ctx context.Context) { if gm.syncer != nil { gm.syncer.Stop() } diff --git a/internal/services/log_cleanup_service.go b/internal/services/log_cleanup_service.go index 6c0bb6c..1504b26 100644 --- a/internal/services/log_cleanup_service.go +++ b/internal/services/log_cleanup_service.go @@ -1,9 +1,11 @@ package services import ( + "context" "gpt-load/internal/config" "gpt-load/internal/models" "gpt-load/internal/store" + "sync" "time" "github.com/sirupsen/logrus" @@ -16,6 +18,7 @@ type LogCleanupService struct { settingsManager *config.SystemSettingsManager leaderLock *store.LeaderLock stopCh chan struct{} + wg sync.WaitGroup } // NewLogCleanupService 创建新的日志清理服务 @@ -30,18 +33,32 @@ func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsMan // Start 启动日志清理服务 func (s *LogCleanupService) Start() { + s.wg.Add(1) go s.run() logrus.Debug("Log cleanup service started") } // Stop 停止日志清理服务 -func (s *LogCleanupService) Stop() { +func (s *LogCleanupService) Stop(ctx context.Context) { close(s.stopCh) - logrus.Info("Log cleanup service stopped") + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + logrus.Info("LogCleanupService stopped gracefully.") + case <-ctx.Done(): + logrus.Warn("LogCleanupService stop timed out.") + } } // run 运行日志清理的主循环 func (s *LogCleanupService) run() { + defer s.wg.Done() ticker := time.NewTicker(2 * time.Hour) defer ticker.Stop() diff --git a/internal/services/request_log_service.go b/internal/services/request_log_service.go index a026372..f49d041 100644 --- a/internal/services/request_log_service.go +++ b/internal/services/request_log_service.go @@ -8,6 +8,7 @@ import ( "gpt-load/internal/models" "gpt-load/internal/store" "strings" + "sync" "time" "github.com/google/uuid" @@ -28,61 +29,77 @@ type RequestLogService struct { store store.Store settingsManager *config.SystemSettingsManager leaderLock 
*store.LeaderLock - ctx context.Context - cancel context.CancelFunc + stopChan chan struct{} + wg sync.WaitGroup ticker *time.Ticker } // NewRequestLogService creates a new RequestLogService instance func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *store.LeaderLock) *RequestLogService { - ctx, cancel := context.WithCancel(context.Background()) return &RequestLogService{ db: db, store: store, settingsManager: sm, leaderLock: ls, - ctx: ctx, - cancel: cancel, + stopChan: make(chan struct{}), } } // Start initializes the service and starts the periodic flush routine func (s *RequestLogService) Start() { - go s.flush() + s.wg.Add(1) + go s.runLoop() +} + +func (s *RequestLogService) runLoop() { + defer s.wg.Done() + + // Initial flush on start + s.flush() interval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute if interval <= 0 { interval = time.Minute } s.ticker = time.NewTicker(interval) + defer s.ticker.Stop() - go func() { - for { - select { - case <-s.ticker.C: - newInterval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute - if newInterval <= 0 { - newInterval = time.Minute - } - if newInterval != interval { - s.ticker.Reset(newInterval) - interval = newInterval - logrus.Debugf("Request log write interval updated to: %v", interval) - } - s.flush() - case <-s.ctx.Done(): - s.ticker.Stop() - logrus.Info("RequestLogService stopped.") - return + for { + select { + case <-s.ticker.C: + newInterval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute + if newInterval <= 0 { + newInterval = time.Minute } + if newInterval != interval { + s.ticker.Reset(newInterval) + interval = newInterval + logrus.Debugf("Request log write interval updated to: %v", interval) + } + s.flush() + case <-s.stopChan: + return } - }() + } } // Stop gracefully stops the RequestLogService -func (s *RequestLogService) 
Stop() { - s.flush() - s.cancel() +func (s *RequestLogService) Stop(ctx context.Context) { + close(s.stopChan) + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + s.flush() + logrus.Info("RequestLogService stopped gracefully.") + case <-ctx.Done(): + logrus.Warn("RequestLogService stop timed out.") + } } // Record logs a request to the database and cache diff --git a/internal/store/leader_lock.go b/internal/store/leader_lock.go index f98d120..427a7d8 100644 --- a/internal/store/leader_lock.go +++ b/internal/store/leader_lock.go @@ -79,18 +79,28 @@ func (s *LeaderLock) Start() error { } // Stop gracefully stops the leadership maintenance process. -func (s *LeaderLock) Stop() { +func (s *LeaderLock) Stop(ctx context.Context) { if s.isSingleNode { return } - logrus.Info("Stopping leadership maintenance process...") close(s.stopChan) - s.wg.Wait() + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + logrus.Info("Leadership maintenance process stopped gracefully.") + case <-ctx.Done(): + logrus.Warn("Leadership maintenance process stop timed out.") + } if s.isLeader.Load() { s.releaseLock() } - logrus.Info("Leadership maintenance process stopped.") } // IsLeader returns true if the current node is the leader. @@ -176,7 +186,6 @@ func (s *LeaderLock) maintainLeadershipLoop() { logrus.WithError(err).Warn("Error during leadership maintenance cycle.") } case <-s.stopChan: - logrus.Info("Leadership maintenance loop stopping.") return } } diff --git a/internal/syncer/cache_syncer.go b/internal/syncer/cache_syncer.go index 28cee7b..67aca80 100644 --- a/internal/syncer/cache_syncer.go +++ b/internal/syncer/cache_syncer.go @@ -68,7 +68,6 @@ func (s *CacheSyncer[T]) Invalidate() error { // Stop gracefully shuts down the syncer's background goroutine. 
func (s *CacheSyncer[T]) Stop() { - s.logger.Debug("stopping cache syncer...") close(s.stopChan) s.wg.Wait() s.logger.Info("cache syncer stopped.") @@ -139,7 +138,6 @@ func (s *CacheSyncer[T]) listenForUpdates() { s.logger.Errorf("failed to reload cache after notification: %v", err) } case <-s.stopChan: - s.logger.Info("received stop signal, exiting subscriber loop.") if err := subscription.Close(); err != nil { s.logger.Errorf("failed to close subscription: %v", err) }