feat: 优雅退出

This commit is contained in:
tbphp
2025-07-13 20:55:32 +08:00
parent 99fd68d08f
commit ae751eb48a
8 changed files with 142 additions and 54 deletions

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time" "time"
"gpt-load/internal/config" "gpt-load/internal/config"
@@ -168,19 +169,47 @@ func (a *App) Start() error {
func (a *App) Stop(ctx context.Context) { func (a *App) Stop(ctx context.Context) {
logrus.Info("Shutting down server...") logrus.Info("Shutting down server...")
// Shutdown http server
if err := a.httpServer.Shutdown(ctx); err != nil { if err := a.httpServer.Shutdown(ctx); err != nil {
logrus.Errorf("Server forced to shutdown: %v", err) logrus.Errorf("Server forced to shutdown: %v", err)
} }
// Stop background services stoppableServices := []func(context.Context){
a.cronChecker.Stop() a.cronChecker.Stop,
a.leaderLock.Stop() a.leaderLock.Stop,
a.logCleanupService.Stop() a.logCleanupService.Stop,
a.requestLogService.Stop() a.requestLogService.Stop,
a.groupManager.Stop() a.groupManager.Stop,
a.settingsManager.Stop() a.settingsManager.Stop,
}
var wg sync.WaitGroup
wg.Add(len(stoppableServices))
for _, stopFunc := range stoppableServices {
go func(stop func(context.Context)) {
defer wg.Done()
stop(ctx)
}(stopFunc)
}
// Wait for all services to stop, or for the context to be done.
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
logrus.Info("All background services stopped.")
case <-ctx.Done():
logrus.Warn("Shutdown timed out, some services may not have stopped gracefully.")
}
// Step 3: Close storage connection last.
if a.storage != nil {
a.storage.Close() a.storage.Close()
}
logrus.Info("Server exited gracefully") logrus.Info("Server exited gracefully")
} }

View File

@@ -1,6 +1,7 @@
package config package config
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"gpt-load/internal/db" "gpt-load/internal/db"
@@ -106,7 +107,7 @@ func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager,
} }
// Stop gracefully stops the SystemSettingsManager's background syncer. // Stop gracefully stops the SystemSettingsManager's background syncer.
func (sm *SystemSettingsManager) Stop() { func (sm *SystemSettingsManager) Stop(ctx context.Context) {
if sm.syncer != nil { if sm.syncer != nil {
sm.syncer.Stop() sm.syncer.Stop()
} }

View File

@@ -1,6 +1,7 @@
package keypool package keypool
import ( import (
"context"
"gpt-load/internal/config" "gpt-load/internal/config"
"gpt-load/internal/models" "gpt-load/internal/models"
"gpt-load/internal/store" "gpt-load/internal/store"
@@ -44,12 +45,23 @@ func (s *CronChecker) Start() {
go s.runLoop() go s.runLoop()
} }
// Stop stops the cron job. // Stop stops the cron job, respecting the context for shutdown timeout.
func (s *CronChecker) Stop() { func (s *CronChecker) Stop(ctx context.Context) {
logrus.Info("Stopping CronChecker...")
close(s.stopChan) close(s.stopChan)
// Wait for the goroutine to finish, or for the shutdown to time out.
done := make(chan struct{})
go func() {
s.wg.Wait() s.wg.Wait()
logrus.Info("CronChecker stopped.") close(done)
}()
select {
case <-done:
logrus.Info("CronChecker stopped gracefully.")
case <-ctx.Done():
logrus.Warn("CronChecker stop timed out.")
}
} }
func (s *CronChecker) runLoop() { func (s *CronChecker) runLoop() {
@@ -104,8 +116,12 @@ func (s *CronChecker) submitValidationJobs() {
validatedCount := len(invalidKeys) validatedCount := len(invalidKeys)
becameValidCount := 0 becameValidCount := 0
if validatedCount > 0 { if validatedCount > 0 {
logrus.Debugf("CronChecker: Found %d invalid keys to validate for group %s.", validatedCount, group.Name)
for j := range invalidKeys { for j := range invalidKeys {
select {
case <-s.stopChan:
return
default:
}
key := &invalidKeys[j] key := &invalidKeys[j]
isValid, _ := s.Validator.ValidateSingleKey(key, group) isValid, _ := s.Validator.ValidateSingleKey(key, group)

View File

@@ -1,6 +1,7 @@
package services package services
import ( import (
"context"
"fmt" "fmt"
"gpt-load/internal/config" "gpt-load/internal/config"
"gpt-load/internal/models" "gpt-load/internal/models"
@@ -93,7 +94,7 @@ func (gm *GroupManager) Invalidate() error {
} }
// Stop gracefully stops the GroupManager's background syncer. // Stop gracefully stops the GroupManager's background syncer.
func (gm *GroupManager) Stop() { func (gm *GroupManager) Stop(ctx context.Context) {
if gm.syncer != nil { if gm.syncer != nil {
gm.syncer.Stop() gm.syncer.Stop()
} }

View File

@@ -1,9 +1,11 @@
package services package services
import ( import (
"context"
"gpt-load/internal/config" "gpt-load/internal/config"
"gpt-load/internal/models" "gpt-load/internal/models"
"gpt-load/internal/store" "gpt-load/internal/store"
"sync"
"time" "time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@@ -16,6 +18,7 @@ type LogCleanupService struct {
settingsManager *config.SystemSettingsManager settingsManager *config.SystemSettingsManager
leaderLock *store.LeaderLock leaderLock *store.LeaderLock
stopCh chan struct{} stopCh chan struct{}
wg sync.WaitGroup
} }
// NewLogCleanupService 创建新的日志清理服务 // NewLogCleanupService 创建新的日志清理服务
@@ -30,18 +33,32 @@ func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsMan
// Start 启动日志清理服务 // Start 启动日志清理服务
func (s *LogCleanupService) Start() { func (s *LogCleanupService) Start() {
s.wg.Add(1)
go s.run() go s.run()
logrus.Debug("Log cleanup service started") logrus.Debug("Log cleanup service started")
} }
// Stop 停止日志清理服务 // Stop 停止日志清理服务
func (s *LogCleanupService) Stop() { func (s *LogCleanupService) Stop(ctx context.Context) {
close(s.stopCh) close(s.stopCh)
logrus.Info("Log cleanup service stopped")
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
logrus.Info("LogCleanupService stopped gracefully.")
case <-ctx.Done():
logrus.Warn("LogCleanupService stop timed out.")
}
} }
// run 运行日志清理的主循环 // run 运行日志清理的主循环
func (s *LogCleanupService) run() { func (s *LogCleanupService) run() {
defer s.wg.Done()
ticker := time.NewTicker(2 * time.Hour) ticker := time.NewTicker(2 * time.Hour)
defer ticker.Stop() defer ticker.Stop()

View File

@@ -8,6 +8,7 @@ import (
"gpt-load/internal/models" "gpt-load/internal/models"
"gpt-load/internal/store" "gpt-load/internal/store"
"strings" "strings"
"sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@@ -28,35 +29,41 @@ type RequestLogService struct {
store store.Store store store.Store
settingsManager *config.SystemSettingsManager settingsManager *config.SystemSettingsManager
leaderLock *store.LeaderLock leaderLock *store.LeaderLock
ctx context.Context stopChan chan struct{}
cancel context.CancelFunc wg sync.WaitGroup
ticker *time.Ticker ticker *time.Ticker
} }
// NewRequestLogService creates a new RequestLogService instance // NewRequestLogService creates a new RequestLogService instance
func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *store.LeaderLock) *RequestLogService { func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *store.LeaderLock) *RequestLogService {
ctx, cancel := context.WithCancel(context.Background())
return &RequestLogService{ return &RequestLogService{
db: db, db: db,
store: store, store: store,
settingsManager: sm, settingsManager: sm,
leaderLock: ls, leaderLock: ls,
ctx: ctx, stopChan: make(chan struct{}),
cancel: cancel,
} }
} }
// Start initializes the service and starts the periodic flush routine // Start initializes the service and starts the periodic flush routine
func (s *RequestLogService) Start() { func (s *RequestLogService) Start() {
go s.flush() s.wg.Add(1)
go s.runLoop()
}
func (s *RequestLogService) runLoop() {
defer s.wg.Done()
// Initial flush on start
s.flush()
interval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute interval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute
if interval <= 0 { if interval <= 0 {
interval = time.Minute interval = time.Minute
} }
s.ticker = time.NewTicker(interval) s.ticker = time.NewTicker(interval)
defer s.ticker.Stop()
go func() {
for { for {
select { select {
case <-s.ticker.C: case <-s.ticker.C:
@@ -70,19 +77,29 @@ func (s *RequestLogService) Start() {
logrus.Debugf("Request log write interval updated to: %v", interval) logrus.Debugf("Request log write interval updated to: %v", interval)
} }
s.flush() s.flush()
case <-s.ctx.Done(): case <-s.stopChan:
s.ticker.Stop()
logrus.Info("RequestLogService stopped.")
return return
} }
} }
}()
} }
// Stop gracefully stops the RequestLogService // Stop gracefully stops the RequestLogService
func (s *RequestLogService) Stop() { func (s *RequestLogService) Stop(ctx context.Context) {
close(s.stopChan)
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
s.flush() s.flush()
s.cancel() logrus.Info("RequestLogService stopped gracefully.")
case <-ctx.Done():
logrus.Warn("RequestLogService stop timed out.")
}
} }
// Record logs a request to the database and cache // Record logs a request to the database and cache

View File

@@ -79,18 +79,28 @@ func (s *LeaderLock) Start() error {
} }
// Stop gracefully stops the leadership maintenance process. // Stop gracefully stops the leadership maintenance process.
func (s *LeaderLock) Stop() { func (s *LeaderLock) Stop(ctx context.Context) {
if s.isSingleNode { if s.isSingleNode {
return return
} }
logrus.Info("Stopping leadership maintenance process...")
close(s.stopChan) close(s.stopChan)
done := make(chan struct{})
go func() {
s.wg.Wait() s.wg.Wait()
close(done)
}()
select {
case <-done:
logrus.Info("Leadership maintenance process stopped gracefully.")
case <-ctx.Done():
logrus.Warn("Leadership maintenance process stop timed out.")
}
if s.isLeader.Load() { if s.isLeader.Load() {
s.releaseLock() s.releaseLock()
} }
logrus.Info("Leadership maintenance process stopped.")
} }
// IsLeader returns true if the current node is the leader. // IsLeader returns true if the current node is the leader.
@@ -176,7 +186,6 @@ func (s *LeaderLock) maintainLeadershipLoop() {
logrus.WithError(err).Warn("Error during leadership maintenance cycle.") logrus.WithError(err).Warn("Error during leadership maintenance cycle.")
} }
case <-s.stopChan: case <-s.stopChan:
logrus.Info("Leadership maintenance loop stopping.")
return return
} }
} }

View File

@@ -68,7 +68,6 @@ func (s *CacheSyncer[T]) Invalidate() error {
// Stop gracefully shuts down the syncer's background goroutine. // Stop gracefully shuts down the syncer's background goroutine.
func (s *CacheSyncer[T]) Stop() { func (s *CacheSyncer[T]) Stop() {
s.logger.Debug("stopping cache syncer...")
close(s.stopChan) close(s.stopChan)
s.wg.Wait() s.wg.Wait()
s.logger.Info("cache syncer stopped.") s.logger.Info("cache syncer stopped.")
@@ -139,7 +138,6 @@ func (s *CacheSyncer[T]) listenForUpdates() {
s.logger.Errorf("failed to reload cache after notification: %v", err) s.logger.Errorf("failed to reload cache after notification: %v", err)
} }
case <-s.stopChan: case <-s.stopChan:
s.logger.Info("received stop signal, exiting subscriber loop.")
if err := subscription.Close(); err != nil { if err := subscription.Close(); err != nil {
s.logger.Errorf("failed to close subscription: %v", err) s.logger.Errorf("failed to close subscription: %v", err)
} }