From 37bdcf2e7277f4a6690580e759a4de13ef54990a Mon Sep 17 00:00:00 2001 From: tbphp Date: Sun, 13 Jul 2025 19:27:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E9=87=8D=E6=9E=84=E8=B0=83=E6=95=B4key=E9=AA=8C=E8=AF=81?= =?UTF-8?q?=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 3 +- internal/app/app.go | 5 - internal/config/manager.go | 2 - internal/config/system_settings.go | 2 +- internal/container/container.go | 3 - internal/handler/key_handler.go | 2 +- internal/keypool/provider.go | 4 +- internal/keypool/validator.go | 13 +-- internal/services/key_cron_service.go | 82 +++++++--------- .../services/key_manual_validation_service.go | 11 +-- internal/services/key_service.go | 5 +- .../services/key_validation_concurrency.go | 93 ------------------- internal/types/types.go | 1 - 13 files changed, 51 insertions(+), 175 deletions(-) delete mode 100644 internal/services/key_validation_concurrency.go diff --git a/.env.example b/.env.example index a6aae13..ed3294b 100644 --- a/.env.example +++ b/.env.example @@ -18,9 +18,8 @@ ALLOWED_METHODS=GET,POST,PUT,DELETE,OPTIONS ALLOWED_HEADERS=* ALLOW_CREDENTIALS=false -# 性能配置 +# 并发数量 MAX_CONCURRENT_REQUESTS=100 -KEY_VALIDATION_POOL_SIZE=50 # 数据库配置 # 示例 DSN: user:password@tcp(localhost:3306)/gpt_load?charset=utf8mb4&parseTime=True&loc=Local diff --git a/internal/app/app.go b/internal/app/app.go index 51d67fe..d3e5708 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -31,7 +31,6 @@ type App struct { logCleanupService *services.LogCleanupService requestLogService *services.RequestLogService keyCronService *services.KeyCronService - keyValidationPool *services.KeyValidationPool keyPoolProvider *keypool.KeyProvider leaderService *services.LeaderService proxyServer *proxy.ProxyServer @@ -50,7 +49,6 @@ type AppParams struct { LogCleanupService *services.LogCleanupService RequestLogService *services.RequestLogService KeyCronService *services.KeyCronService - KeyValidationPool *services.KeyValidationPool KeyPoolProvider *keypool.KeyProvider LeaderService *services.LeaderService ProxyServer *proxy.ProxyServer @@ -68,7 +66,6 @@ func NewApp(params AppParams) *App { logCleanupService: params.LogCleanupService, requestLogService: params.RequestLogService, keyCronService: params.KeyCronService, - keyValidationPool: params.KeyValidationPool, keyPoolProvider: params.KeyPoolProvider, leaderService: params.LeaderService, proxyServer: params.ProxyServer, @@ -141,7 +138,6 @@ func (a *App) Start() error { a.requestLogService.Start() a.logCleanupService.Start() - a.keyValidationPool.Start() a.keyCronService.Start() // Create HTTP server @@ -179,7 +175,6 @@ func (a *App) Stop(ctx context.Context) { // Stop background services a.keyCronService.Stop() - a.keyValidationPool.Stop() a.leaderService.Stop() a.logCleanupService.Stop() a.requestLogService.Stop() diff --git a/internal/config/manager.go b/internal/config/manager.go index ea7ceec..f187561 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -89,7 +89,6 @@ func (m *Manager) ReloadConfig() error { }, Performance: types.PerformanceConfig{ MaxConcurrentRequests: utils.ParseInteger(os.Getenv("MAX_CONCURRENT_REQUESTS"), 100), - KeyValidationPoolSize: utils.ParseInteger(os.Getenv("KEY_VALIDATION_POOL_SIZE"), 10), }, Log: types.LogConfig{ Level: utils.GetEnvOrDefault("LOG_LEVEL", "info"), @@ -193,7 +192,6 @@ func (m *Manager) DisplayServerConfig() { logrus.Info("--- Performance ---") logrus.Infof(" Max Concurrent Requests: %d", perfConfig.MaxConcurrentRequests) - logrus.Infof(" Key Validation Pool Size: %d", perfConfig.KeyValidationPoolSize) logrus.Info("--- Security ---") logrus.Infof(" Authentication: enabled (key loaded)") diff --git a/internal/config/system_settings.go b/internal/config/system_settings.go index 79450cf..b8853ec 100644 --- a/internal/config/system_settings.go +++ b/internal/config/system_settings.go @@ -227,7 +227,7 @@ func (sm *SystemSettingsManager) GetEffectiveConfig(groupConfigJSON datatypes.JS gcv := reflect.ValueOf(groupConfig) ecv := reflect.ValueOf(&effectiveConfig).Elem() - for i := 0; i < gcv.NumField(); i++ { + for i := range gcv.NumField() { groupField := gcv.Field(i) if groupField.Kind() == reflect.Ptr && !groupField.IsNil() { groupFieldValue := groupField.Elem() diff --git a/internal/container/container.go b/internal/container/container.go index 855a756..a976be1 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -48,9 +48,6 @@ func BuildContainer() (*dig.Container, error) { if err := container.Provide(services.NewTaskService); err != nil { return nil, err } - if err := container.Provide(services.NewKeyValidationPool); err != nil { - return nil, err - } if err := container.Provide(services.NewKeyManualValidationService); err != nil { return nil, err } diff --git a/internal/handler/key_handler.go b/internal/handler/key_handler.go index c9500db..25cbb2d 100644 --- a/internal/handler/key_handler.go +++ b/internal/handler/key_handler.go @@ -211,7 +211,7 @@ func (s *Server) TestMultipleKeys(c *gin.Context) { return } - results, err := s.KeyService.TestMultipleKeys(c.Request.Context(), group, req.KeysText) + results, err := s.KeyService.TestMultipleKeys(group, req.KeysText) if err != nil { if strings.Contains(err.Error(), "batch size exceeds the limit") { response.Error(c, app_errors.NewAPIError(app_errors.ErrValidation, err.Error())) diff --git a/internal/keypool/provider.go b/internal/keypool/provider.go index 11b40d0..5a98368 100644 --- a/internal/keypool/provider.go +++ b/internal/keypool/provider.go @@ -140,12 +140,12 @@ func (p *KeyProvider) handleFailure(apiKey *models.APIKey, group *models.Group, return fmt.Errorf("failed to get key details from store: %w", err) } - failureCount, _ := strconv.ParseInt(keyDetails["failure_count"], 10, 64) - if keyDetails["status"] == models.KeyStatusInvalid { return nil } + failureCount, _ := strconv.ParseInt(keyDetails["failure_count"], 10, 64) + // 获取该分组的有效配置 blacklistThreshold := group.EffectiveConfig.BlacklistThreshold diff --git a/internal/keypool/validator.go b/internal/keypool/validator.go index bd57ae2..ccc2d3d 100644 --- a/internal/keypool/validator.go +++ b/internal/keypool/validator.go @@ -6,6 +6,7 @@ import ( "gpt-load/internal/channel" "gpt-load/internal/config" "gpt-load/internal/models" + "time" "github.com/sirupsen/logrus" "go.uber.org/dig" @@ -46,10 +47,9 @@ func NewKeyValidator(params KeyValidatorParams) *KeyValidator { } // ValidateSingleKey performs a validation check on a single API key. -func (s *KeyValidator) ValidateSingleKey(ctx context.Context, key *models.APIKey, group *models.Group) (bool, error) { - if ctx.Err() != nil { - return false, fmt.Errorf("context cancelled or timed out: %w", ctx.Err()) - } +func (s *KeyValidator) ValidateSingleKey(key *models.APIKey, group *models.Group) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() ch, err := s.channelFactory.GetChannel(group) if err != nil { @@ -79,7 +79,7 @@ func (s *KeyValidator) ValidateSingleKey(ctx context.Context, key *models.APIKey } // TestMultipleKeys performs a synchronous validation for a list of key values within a specific group. -func (s *KeyValidator) TestMultipleKeys(ctx context.Context, group *models.Group, keyValues []string) ([]KeyTestResult, error) { +func (s *KeyValidator) TestMultipleKeys(group *models.Group, keyValues []string) ([]KeyTestResult, error) { results := make([]KeyTestResult, len(keyValues)) // Find which of the provided keys actually exist in the database for this group @@ -103,7 +103,8 @@ func (s *KeyValidator) TestMultipleKeys(ctx context.Context, group *models.Group continue } - isValid, validationErr := s.ValidateSingleKey(ctx, &apiKey, group) + isValid, validationErr := s.ValidateSingleKey(&apiKey, group) + results[i] = KeyTestResult{ KeyValue: kv, IsValid: isValid, diff --git a/internal/services/key_cron_service.go b/internal/services/key_cron_service.go index 69c1574..786f9d7 100644 --- a/internal/services/key_cron_service.go +++ b/internal/services/key_cron_service.go @@ -1,8 +1,8 @@ package services import ( - "context" "gpt-load/internal/config" + "gpt-load/internal/keypool" "gpt-load/internal/models" "sync" "time" @@ -11,11 +11,11 @@ import ( "gorm.io/gorm" ) -// KeyCronService is responsible for periodically submitting keys for validation. +// KeyCronService is responsible for periodically validating invalid keys. type KeyCronService struct { DB *gorm.DB SettingsManager *config.SystemSettingsManager - Pool *KeyValidationPool + Validator *keypool.KeyValidator LeaderService *LeaderService stopChan chan struct{} wg sync.WaitGroup @@ -25,13 +25,13 @@ type KeyCronService struct { func NewKeyCronService( db *gorm.DB, settingsManager *config.SystemSettingsManager, - pool *KeyValidationPool, + validator *keypool.KeyValidator, leaderService *LeaderService, ) *KeyCronService { return &KeyCronService{ DB: db, SettingsManager: settingsManager, - Pool: pool, + Validator: validator, LeaderService: leaderService, stopChan: make(chan struct{}), } @@ -77,7 +77,7 @@ func (s *KeyCronService) runLoop() { } } -// submitValidationJobs finds groups and keys that need validation and submits them to the pool. +// submitValidationJobs finds groups whose keys need validation and validates them. func (s *KeyCronService) submitValidationJobs() { var groups []models.Group if err := s.DB.Find(&groups).Error; err != nil { @@ -86,61 +86,47 @@ func (s *KeyCronService) submitValidationJobs() { } validationStartTime := time.Now() - groupsToUpdateTimestamp := make(map[uint]*models.Group) - total := 0 for i := range groups { group := &groups[i] effectiveSettings := s.SettingsManager.GetEffectiveConfig(group.Config) interval := time.Duration(effectiveSettings.KeyValidationIntervalMinutes) * time.Minute if group.LastValidatedAt == nil || validationStartTime.Sub(*group.LastValidatedAt) > interval { - groupsToUpdateTimestamp[group.ID] = group - var keys []models.APIKey - if err := s.DB.Where("group_id = ?", group.ID).Find(&keys).Error; err != nil { - logrus.Errorf("KeyCronService: Failed to get keys for group %s: %v", group.Name, err) + groupProcessStart := time.Now() + var invalidKeys []models.APIKey + err := s.DB.Where("group_id = ? AND status = ?", group.ID, models.KeyStatusInvalid).Find(&invalidKeys).Error + if err != nil { + logrus.Errorf("KeyCronService: Failed to get invalid keys for group %s: %v", group.Name, err) continue } - lenKey := len(keys) + validatedCount := len(invalidKeys) + becameValidCount := 0 + if validatedCount > 0 { + logrus.Debugf("KeyCronService: Found %d invalid keys to validate for group %s.", validatedCount, group.Name) + for j := range invalidKeys { + key := &invalidKeys[j] + isValid, _ := s.Validator.ValidateSingleKey(key, group) - if lenKey == 0 { - continue - } - - total += lenKey - - if lenKey > 0 { - logrus.Infof("KeyCronService: Submitting %d keys for group %s for validation.", lenKey, group.Name) - } - - for _, key := range keys { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - - job := ValidationJob{ - Key: key, - Group: group, - Ctx: ctx, - CancelFunc: cancel, + if isValid { + becameValidCount++ + } } - - s.Pool.SubmitJob(job) } + + if err := s.DB.Model(group).Update("last_validated_at", time.Now()).Error; err != nil { + logrus.Errorf("KeyCronService: Failed to update last_validated_at for group %s: %v", group.Name, err) + } + + duration := time.Since(groupProcessStart) + logrus.Infof( + "KeyCronService: Group '%s' validation finished. Total checked: %d, became valid: %d. Duration: %s.", + group.Name, + validatedCount, + becameValidCount, + duration.String(), + ) } } - - // Update timestamps for all groups that were due for validation - if len(groupsToUpdateTimestamp) > 0 { - s.updateGroupTimestamps(groupsToUpdateTimestamp, validationStartTime) - } -} - -func (s *KeyCronService) updateGroupTimestamps(groups map[uint]*models.Group, validationStartTime time.Time) { - var groupIDs []uint - for id := range groups { - groupIDs = append(groupIDs, id) - } - if err := s.DB.Model(&models.Group{}).Where("id IN ?", groupIDs).Update("last_validated_at", validationStartTime).Error; err != nil { - logrus.Errorf("KeyCronService: Failed to batch update last_validated_at for groups: %v", err) - } } diff --git a/internal/services/key_manual_validation_service.go b/internal/services/key_manual_validation_service.go index cefa37b..319ef4e 100644 --- a/internal/services/key_manual_validation_service.go +++ b/internal/services/key_manual_validation_service.go @@ -1,7 +1,6 @@ package services import ( - "context" "fmt" "gpt-load/internal/config" "gpt-load/internal/keypool" @@ -71,12 +70,8 @@ func (s *KeyManualValidationService) runValidation(group *models.Group, keys []m jobs := make(chan models.APIKey, len(keys)) results := make(chan bool, len(keys)) - performanceConfig := s.ConfigManager.GetPerformanceConfig() - concurrency := performanceConfig.KeyValidationPoolSize - - if concurrency <= 0 { - concurrency = 10 - } + // 固定10并发,避免超频 + concurrency := 10 var wg sync.WaitGroup for range concurrency { @@ -136,7 +131,7 @@ func (s *KeyManualValidationService) runValidation(group *models.Group, keys []m func (s *KeyManualValidationService) validationWorker(wg *sync.WaitGroup, group *models.Group, jobs <-chan models.APIKey, results chan<- bool) { defer wg.Done() for key := range jobs { - isValid, _ := s.Validator.ValidateSingleKey(context.Background(), &key, group) + isValid, _ := s.Validator.ValidateSingleKey(&key, group) results <- isValid } } diff --git a/internal/services/key_service.go b/internal/services/key_service.go index b9ecf99..8c4dadd 100644 --- a/internal/services/key_service.go +++ b/internal/services/key_service.go @@ -1,7 +1,6 @@ package services import ( - "context" "encoding/json" "fmt" "gpt-load/internal/keypool" @@ -282,7 +281,7 @@ func (s *KeyService) ListKeysInGroupQuery(groupID uint, statusFilter string, sea } // TestMultipleKeys handles a one-off validation test for multiple keys. -func (s *KeyService) TestMultipleKeys(ctx context.Context, group *models.Group, keysText string) ([]keypool.KeyTestResult, error) { +func (s *KeyService) TestMultipleKeys(group *models.Group, keysText string) ([]keypool.KeyTestResult, error) { keysToTest := s.ParseKeysFromText(keysText) if len(keysToTest) > maxRequestKeys { return nil, fmt.Errorf("batch size exceeds the limit of %d keys, got %d", maxRequestKeys, len(keysToTest)) @@ -298,7 +297,7 @@ func (s *KeyService) TestMultipleKeys(ctx context.Context, group *models.Group, end = len(keysToTest) } chunk := keysToTest[i:end] - results, err := s.KeyValidator.TestMultipleKeys(ctx, group, chunk) + results, err := s.KeyValidator.TestMultipleKeys(group, chunk) if err != nil { return nil, err } diff --git a/internal/services/key_validation_concurrency.go b/internal/services/key_validation_concurrency.go deleted file mode 100644 index 8598372..0000000 --- a/internal/services/key_validation_concurrency.go +++ /dev/null @@ -1,93 +0,0 @@ -package services - -import ( - "context" - "gpt-load/internal/keypool" - "gpt-load/internal/models" - "gpt-load/internal/types" - "sync" - - "github.com/sirupsen/logrus" -) - -// ValidationJob represents a single key validation task for the worker pool. -type ValidationJob struct { - TaskID string - Key models.APIKey - Group *models.Group - Ctx context.Context - CancelFunc context.CancelFunc -} - -// KeyValidationPool manages a global worker pool for key validation. -type KeyValidationPool struct { - validator *keypool.KeyValidator - configManager types.ConfigManager - jobs chan ValidationJob - stopChan chan struct{} - wg sync.WaitGroup -} - -// NewKeyValidationPool creates a new KeyValidationPool. -func NewKeyValidationPool(validator *keypool.KeyValidator, configManager types.ConfigManager) *KeyValidationPool { - return &KeyValidationPool{ - validator: validator, - configManager: configManager, - jobs: make(chan ValidationJob, 1024), - stopChan: make(chan struct{}), - } -} - -// Start initializes and runs the worker pool. -func (p *KeyValidationPool) Start() { - performanceConfig := p.configManager.GetPerformanceConfig() - concurrency := performanceConfig.KeyValidationPoolSize - if concurrency <= 0 { - concurrency = 10 - } - - logrus.Infof("Starting KeyValidationPool with %d workers...", concurrency) - - p.wg.Add(concurrency) - for range concurrency { - go p.worker() - } -} - -// Stop gracefully stops the worker pool. -func (p *KeyValidationPool) Stop() { - logrus.Info("Stopping KeyValidationPool...") - close(p.stopChan) - close(p.jobs) - p.wg.Wait() - - logrus.Info("KeyValidationPool stopped.") -} - -// worker is a single goroutine that processes jobs. -func (p *KeyValidationPool) worker() { - defer p.wg.Done() - for { - select { - case job, ok := <-p.jobs: - if !ok { - return - } - ctx := job.Ctx - if ctx == nil { - ctx = context.Background() - } - p.validator.ValidateSingleKey(ctx, &job.Key, job.Group) - if job.CancelFunc != nil { - job.CancelFunc() - } - case <-p.stopChan: - return - } - } -} - -// SubmitJob adds a new validation job to the pool. -func (p *KeyValidationPool) SubmitJob(job ValidationJob) { - p.jobs <- job -} diff --git a/internal/types/types.go b/internal/types/types.go index fcd90d2..4a34ce1 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -62,7 +62,6 @@ type CORSConfig struct { // PerformanceConfig represents performance configuration type PerformanceConfig struct { MaxConcurrentRequests int `json:"max_concurrent_requests"` - KeyValidationPoolSize int `json:"key_validation_pool_size"` } // LogConfig represents logging configuration