Files
gpt-load/internal/app/app.go
2025-07-14 11:13:44 +08:00

231 lines
7.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package app provides the main application logic and lifecycle management.
package app
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"gpt-load/internal/config"
"gpt-load/internal/keypool"
"gpt-load/internal/models"
"gpt-load/internal/proxy"
"gpt-load/internal/services"
"gpt-load/internal/store"
"gpt-load/internal/types"
"gpt-load/internal/version"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"go.uber.org/dig"
"gorm.io/gorm"
)
// App holds all services and manages the application lifecycle.
type App struct {
engine *gin.Engine
configManager types.ConfigManager
settingsManager *config.SystemSettingsManager
groupManager *services.GroupManager
logCleanupService *services.LogCleanupService
requestLogService *services.RequestLogService
cronChecker *keypool.CronChecker
keyPoolProvider *keypool.KeyProvider
leaderLock *store.LeaderLock
proxyServer *proxy.ProxyServer
storage store.Store
db *gorm.DB
httpServer *http.Server
}
// AppParams defines the dependencies for the App.
type AppParams struct {
dig.In
Engine *gin.Engine
ConfigManager types.ConfigManager
SettingsManager *config.SystemSettingsManager
GroupManager *services.GroupManager
LogCleanupService *services.LogCleanupService
RequestLogService *services.RequestLogService
CronChecker *keypool.CronChecker
KeyPoolProvider *keypool.KeyProvider
LeaderLock *store.LeaderLock
ProxyServer *proxy.ProxyServer
Storage store.Store
DB *gorm.DB
}
// NewApp is the constructor for App, with dependencies injected by dig.
func NewApp(params AppParams) *App {
return &App{
engine: params.Engine,
configManager: params.ConfigManager,
settingsManager: params.SettingsManager,
groupManager: params.GroupManager,
logCleanupService: params.LogCleanupService,
requestLogService: params.RequestLogService,
cronChecker: params.CronChecker,
keyPoolProvider: params.KeyPoolProvider,
leaderLock: params.LeaderLock,
proxyServer: params.ProxyServer,
storage: params.Storage,
db: params.DB,
}
}
// Start runs the application, it is a non-blocking call.
func (a *App) Start() error {
// 启动 Leader Lock 服务并等待选举结果
if err := a.leaderLock.Start(); err != nil {
return fmt.Errorf("leader service failed to start: %w", err)
}
// Leader 节点执行初始化Follower 节点等待
if a.leaderLock.IsLeader() {
logrus.Info("Leader mode. Performing initial one-time tasks...")
acquired, err := a.leaderLock.AcquireInitializingLock()
if err != nil {
return fmt.Errorf("failed to acquire initializing lock: %w", err)
}
if !acquired {
logrus.Warn("Could not acquire initializing lock, another leader might be active. Switching to follower mode for initialization.")
if err := a.leaderLock.WaitForInitializationToComplete(); err != nil {
return fmt.Errorf("failed to wait for initialization as a fallback follower: %w", err)
}
} else {
defer a.leaderLock.ReleaseInitializingLock()
// 数据库迁移
if err := a.db.AutoMigrate(
&models.SystemSetting{},
&models.Group{},
&models.APIKey{},
&models.RequestLog{},
&models.GroupHourlyStat{},
); err != nil {
return fmt.Errorf("database auto-migration failed: %w", err)
}
logrus.Info("Database auto-migration completed.")
// 初始化系统设置
if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
return fmt.Errorf("failed to initialize system settings: %w", err)
}
logrus.Info("System settings initialized in DB.")
a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderLock)
// 从数据库加载密钥到 Redis
if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
return fmt.Errorf("failed to load keys into key pool: %w", err)
}
logrus.Debug("API keys loaded into Redis cache by leader.")
}
} else {
logrus.Info("Follower Mode. Waiting for leader to complete initialization.")
if err := a.leaderLock.WaitForInitializationToComplete(); err != nil {
return fmt.Errorf("follower failed to start: %w", err)
}
a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderLock)
}
// 显示配置并启动所有后台服务
a.configManager.DisplayServerConfig()
a.groupManager.Initialize()
a.requestLogService.Start()
a.logCleanupService.Start()
a.cronChecker.Start()
// Create HTTP server
serverConfig := a.configManager.GetEffectiveServerConfig()
a.httpServer = &http.Server{
Addr: fmt.Sprintf("%s:%d", serverConfig.Host, serverConfig.Port),
Handler: a.engine,
ReadTimeout: time.Duration(serverConfig.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(serverConfig.WriteTimeout) * time.Second,
IdleTimeout: time.Duration(serverConfig.IdleTimeout) * time.Second,
MaxHeaderBytes: 1 << 20,
}
// Start HTTP server in a new goroutine
go func() {
logrus.Infof("GPT-Load proxy server started successfully on Version: %s", version.Version)
logrus.Infof("Server address: http://%s:%d", serverConfig.Host, serverConfig.Port)
logrus.Info("")
if err := a.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logrus.Fatalf("Server startup failed: %v", err)
}
}()
return nil
}
// Stop gracefully shuts down the application.
func (a *App) Stop(ctx context.Context) {
logrus.Info("Shutting down server...")
serverConfig := a.configManager.GetEffectiveServerConfig()
totalTimeout := time.Duration(serverConfig.GracefulShutdownTimeout) * time.Second
// 动态计算 HTTP 关机超时时间,为后台服务固定预留 5 秒
httpShutdownTimeout := totalTimeout - 5*time.Second
// 为 HTTP 服务器的优雅关闭创建一个独立的 context
httpShutdownCtx, cancelHttpShutdown := context.WithTimeout(context.Background(), httpShutdownTimeout)
defer cancelHttpShutdown()
logrus.Debugf("Attempting to gracefully shut down HTTP server (max %v)...", httpShutdownTimeout)
if err := a.httpServer.Shutdown(httpShutdownCtx); err != nil {
logrus.Debugf("HTTP server graceful shutdown timed out as expected, forcing remaining connections to close.")
if closeErr := a.httpServer.Close(); closeErr != nil {
logrus.Errorf("Error forcing HTTP server to close: %v", closeErr)
}
}
logrus.Info("HTTP server has been shut down.")
// 使用原始的总超时 context 继续关闭其他后台服务
stoppableServices := []func(context.Context){
a.cronChecker.Stop,
a.leaderLock.Stop,
a.logCleanupService.Stop,
a.requestLogService.Stop,
a.groupManager.Stop,
a.settingsManager.Stop,
}
var wg sync.WaitGroup
wg.Add(len(stoppableServices))
for _, stopFunc := range stoppableServices {
go func(stop func(context.Context)) {
defer wg.Done()
stop(ctx)
}(stopFunc)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
logrus.Info("All background services stopped.")
case <-ctx.Done():
logrus.Warn("Shutdown timed out, some services may not have stopped gracefully.")
}
// Step 3: Close storage connection last.
if a.storage != nil {
a.storage.Close()
}
logrus.Info("Server exited gracefully")
}