feat: 分组管理内存集群同步缓存
This commit is contained in:
@@ -27,6 +27,7 @@ type App struct {
|
||||
engine *gin.Engine
|
||||
configManager types.ConfigManager
|
||||
settingsManager *config.SystemSettingsManager
|
||||
groupManager *services.GroupManager
|
||||
logCleanupService *services.LogCleanupService
|
||||
keyCronService *services.KeyCronService
|
||||
keyValidationPool *services.KeyValidationPool
|
||||
@@ -46,6 +47,7 @@ type AppParams struct {
|
||||
Engine *gin.Engine
|
||||
ConfigManager types.ConfigManager
|
||||
SettingsManager *config.SystemSettingsManager
|
||||
GroupManager *services.GroupManager
|
||||
LogCleanupService *services.LogCleanupService
|
||||
KeyCronService *services.KeyCronService
|
||||
KeyValidationPool *services.KeyValidationPool
|
||||
@@ -63,6 +65,7 @@ func NewApp(params AppParams) *App {
|
||||
engine: params.Engine,
|
||||
configManager: params.ConfigManager,
|
||||
settingsManager: params.SettingsManager,
|
||||
groupManager: params.GroupManager,
|
||||
logCleanupService: params.LogCleanupService,
|
||||
keyCronService: params.KeyCronService,
|
||||
keyValidationPool: params.KeyValidationPool,
|
||||
@@ -131,6 +134,8 @@ func (a *App) Start() error {
|
||||
a.settingsManager.Initialize(a.storage)
|
||||
}
|
||||
|
||||
a.groupManager.Initialize()
|
||||
|
||||
// 显示配置并启动所有后台服务
|
||||
a.configManager.DisplayConfig()
|
||||
|
||||
|
@@ -59,6 +59,9 @@ func BuildContainer() (*dig.Container, error) {
|
||||
if err := container.Provide(services.NewLogCleanupService); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := container.Provide(services.NewGroupManager); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := container.Provide(keypool.NewProvider); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -210,6 +210,9 @@ func (s *Server) CreateGroup(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.GroupManager.Invalidate(); err != nil {
|
||||
logrus.WithContext(c.Request.Context()).WithError(err).Error("failed to invalidate group cache")
|
||||
}
|
||||
response.Success(c, s.newGroupResponse(&group))
|
||||
}
|
||||
|
||||
@@ -341,6 +344,9 @@ func (s *Server) UpdateGroup(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.GroupManager.Invalidate(); err != nil {
|
||||
logrus.WithContext(c.Request.Context()).WithError(err).Error("failed to invalidate group cache")
|
||||
}
|
||||
response.Success(c, s.newGroupResponse(&group))
|
||||
}
|
||||
|
||||
@@ -471,6 +477,9 @@ func (s *Server) DeleteGroup(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.GroupManager.Invalidate(); err != nil {
|
||||
logrus.WithContext(c.Request.Context()).WithError(err).Error("failed to invalidate group cache")
|
||||
}
|
||||
response.Success(c, gin.H{"message": "Group and associated keys deleted successfully"})
|
||||
}
|
||||
|
||||
|
@@ -20,6 +20,7 @@ type Server struct {
|
||||
DB *gorm.DB
|
||||
config types.ConfigManager
|
||||
SettingsManager *config.SystemSettingsManager
|
||||
GroupManager *services.GroupManager
|
||||
KeyManualValidationService *services.KeyManualValidationService
|
||||
TaskService *services.TaskService
|
||||
KeyService *services.KeyService
|
||||
@@ -32,6 +33,7 @@ type NewServerParams struct {
|
||||
DB *gorm.DB
|
||||
Config types.ConfigManager
|
||||
SettingsManager *config.SystemSettingsManager
|
||||
GroupManager *services.GroupManager
|
||||
KeyManualValidationService *services.KeyManualValidationService
|
||||
TaskService *services.TaskService
|
||||
KeyService *services.KeyService
|
||||
@@ -44,6 +46,7 @@ func NewServer(params NewServerParams) *Server {
|
||||
DB: params.DB,
|
||||
config: params.Config,
|
||||
SettingsManager: params.SettingsManager,
|
||||
GroupManager: params.GroupManager,
|
||||
KeyManualValidationService: params.KeyManualValidationService,
|
||||
TaskService: params.TaskService,
|
||||
KeyService: params.KeyService,
|
||||
|
87
internal/services/group_manager.go
Normal file
87
internal/services/group_manager.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gpt-load/internal/errors"
|
||||
"gpt-load/internal/models"
|
||||
"gpt-load/internal/store"
|
||||
"gpt-load/internal/syncer"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
const GroupUpdateChannel = "groups:updated"
|
||||
|
||||
// GroupManager manages the caching of group data.
|
||||
type GroupManager struct {
|
||||
syncer *syncer.CacheSyncer[map[string]*models.Group]
|
||||
db *gorm.DB
|
||||
store store.Store
|
||||
}
|
||||
|
||||
// NewGroupManager creates a new, uninitialized GroupManager.
|
||||
func NewGroupManager(db *gorm.DB, store store.Store) *GroupManager {
|
||||
return &GroupManager{
|
||||
db: db,
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize sets up the CacheSyncer. This is called separately to handle potential
|
||||
func (gm *GroupManager) Initialize() error {
|
||||
loader := func() (map[string]*models.Group, error) {
|
||||
var groups []*models.Group
|
||||
if err := gm.db.Find(&groups).Error; err != nil {
|
||||
return nil, fmt.Errorf("failed to load groups from db: %w", err)
|
||||
}
|
||||
|
||||
groupMap := make(map[string]*models.Group, len(groups))
|
||||
for _, group := range groups {
|
||||
g := *group
|
||||
groupMap[g.Name] = &g
|
||||
}
|
||||
return groupMap, nil
|
||||
}
|
||||
|
||||
syncer, err := syncer.NewCacheSyncer(
|
||||
loader,
|
||||
gm.store,
|
||||
GroupUpdateChannel,
|
||||
logrus.WithField("syncer", "groups"),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create group syncer: %w", err)
|
||||
}
|
||||
gm.syncer = syncer
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGroupByName retrieves a single group by its name from the cache.
|
||||
func (gm *GroupManager) GetGroupByName(name string) (*models.Group, error) {
|
||||
if gm.syncer == nil {
|
||||
return nil, fmt.Errorf("GroupManager is not initialized")
|
||||
}
|
||||
|
||||
groups := gm.syncer.Get()
|
||||
group, ok := groups[name]
|
||||
if !ok {
|
||||
return nil, errors.ErrResourceNotFound
|
||||
}
|
||||
return group, nil
|
||||
}
|
||||
|
||||
// Invalidate triggers a cache reload across all instances.
|
||||
func (gm *GroupManager) Invalidate() error {
|
||||
if gm.syncer == nil {
|
||||
return fmt.Errorf("GroupManager is not initialized")
|
||||
}
|
||||
return gm.syncer.Invalidate()
|
||||
}
|
||||
|
||||
// Stop gracefully stops the GroupManager's background syncer.
|
||||
func (gm *GroupManager) Stop() {
|
||||
if gm.syncer != nil {
|
||||
gm.syncer.Stop()
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user