diff --git a/internal/handler/key_handler.go b/internal/handler/key_handler.go index 25cbb2d..3a82ab6 100644 --- a/internal/handler/key_handler.go +++ b/internal/handler/key_handler.go @@ -201,11 +201,17 @@ func (s *Server) TestMultipleKeys(c *gin.Context) { return } - group, ok := s.findGroupByID(c, req.GroupID) + groupDB, ok := s.findGroupByID(c, req.GroupID) if !ok { return } + group, err := s.GroupManager.GetGroupByName(groupDB.Name) + if err != nil { + response.Error(c, app_errors.NewAPIError(app_errors.ErrResourceNotFound, fmt.Sprintf("Group '%s' not found", groupDB.Name))) + return + } + if err := validateKeysText(req.KeysText); err != nil { response.Error(c, app_errors.NewAPIError(app_errors.ErrValidation, err.Error())) return @@ -234,11 +240,17 @@ func (s *Server) ValidateGroupKeys(c *gin.Context) { return } - group, ok := s.findGroupByID(c, req.GroupID) + groupDB, ok := s.findGroupByID(c, req.GroupID) if !ok { return } + group, err := s.GroupManager.GetGroupByName(groupDB.Name) + if err != nil { + response.Error(c, app_errors.NewAPIError(app_errors.ErrResourceNotFound, fmt.Sprintf("Group '%s' not found", groupDB.Name))) + return + } + taskStatus, err := s.KeyManualValidationService.StartValidationTask(group) if err != nil { response.Error(c, app_errors.NewAPIError(app_errors.ErrTaskInProgress, err.Error())) diff --git a/internal/keypool/cron_checker.go b/internal/keypool/cron_checker.go index 64c1fd5..d3e643c 100644 --- a/internal/keypool/cron_checker.go +++ b/internal/keypool/cron_checker.go @@ -6,6 +6,7 @@ import ( "gpt-load/internal/models" "gpt-load/internal/store" "sync" + "sync/atomic" "time" "github.com/sirupsen/logrus" @@ -89,7 +90,7 @@ func (s *CronChecker) runLoop() { } } -// submitValidationJobs finds groups whose keys need validation and validates them. +// submitValidationJobs finds groups whose keys need validation and validates them concurrently. func (s *CronChecker) submitValidationJobs() { var groups []models.Group if err := s.DB.Find(&groups).Error; err != nil { @@ -98,51 +99,93 @@ func (s *CronChecker) submitValidationJobs() { } validationStartTime := time.Now() + var wg sync.WaitGroup for i := range groups { group := &groups[i] - effectiveSettings := s.SettingsManager.GetEffectiveConfig(group.Config) - interval := time.Duration(effectiveSettings.KeyValidationIntervalMinutes) * time.Minute + group.EffectiveConfig = s.SettingsManager.GetEffectiveConfig(group.Config) + interval := time.Duration(group.EffectiveConfig.KeyValidationIntervalMinutes) * time.Minute if group.LastValidatedAt == nil || validationStartTime.Sub(*group.LastValidatedAt) > interval { - groupProcessStart := time.Now() - var invalidKeys []models.APIKey - err := s.DB.Where("group_id = ? AND status = ?", group.ID, models.KeyStatusInvalid).Find(&invalidKeys).Error - if err != nil { - logrus.Errorf("CronChecker: Failed to get invalid keys for group %s: %v", group.Name, err) - continue - } - - validatedCount := len(invalidKeys) - becameValidCount := 0 - if validatedCount > 0 { - for j := range invalidKeys { - select { - case <-s.stopChan: - return - default: - } - key := &invalidKeys[j] - isValid, _ := s.Validator.ValidateSingleKey(key, group) - - if isValid { - becameValidCount++ - } - } - } - - if err := s.DB.Model(group).Update("last_validated_at", time.Now()).Error; err != nil { - logrus.Errorf("CronChecker: Failed to update last_validated_at for group %s: %v", group.Name, err) - } - - duration := time.Since(groupProcessStart) - logrus.Infof( - "CronChecker: Group '%s' validation finished. Total checked: %d, became valid: %d. Duration: %s.", - group.Name, - validatedCount, - becameValidCount, - duration.String(), - ) + wg.Add(1) + g := group + go func() { + defer wg.Done() + s.validateGroupKeys(g) + }() } } + + wg.Wait() +} + +// validateGroupKeys validates all invalid keys for a single group concurrently. +func (s *CronChecker) validateGroupKeys(group *models.Group) { + groupProcessStart := time.Now() + + var invalidKeys []models.APIKey + err := s.DB.Where("group_id = ? AND status = ?", group.ID, models.KeyStatusInvalid).Find(&invalidKeys).Error + if err != nil { + logrus.Errorf("CronChecker: Failed to get invalid keys for group %s: %v", group.Name, err) + return + } + + if len(invalidKeys) == 0 { + if err := s.DB.Model(group).Update("last_validated_at", time.Now()).Error; err != nil { + logrus.Errorf("CronChecker: Failed to update last_validated_at for group %s: %v", group.Name, err) + } + logrus.Infof("CronChecker: Group '%s' has no invalid keys to check.", group.Name) + return + } + + var becameValidCount int32 + var keyWg sync.WaitGroup + jobs := make(chan *models.APIKey, len(invalidKeys)) + + concurrency := group.EffectiveConfig.KeyValidationConcurrency + for range concurrency { + keyWg.Add(1) + go func() { + defer keyWg.Done() + for { + select { + case key, ok := <-jobs: + if !ok { + return + } + isValid, _ := s.Validator.ValidateSingleKey(key, group) + if isValid { + atomic.AddInt32(&becameValidCount, 1) + } + case <-s.stopChan: + return + } + } + }() + } + +DistributeLoop: + for i := range invalidKeys { + select { + case jobs <- &invalidKeys[i]: + case <-s.stopChan: + break DistributeLoop + } + } + close(jobs) + + keyWg.Wait() + + if err := s.DB.Model(group).Update("last_validated_at", time.Now()).Error; err != nil { + logrus.Errorf("CronChecker: Failed to update last_validated_at for group %s: %v", group.Name, err) + } + + duration := time.Since(groupProcessStart) + logrus.Infof( + "CronChecker: Group '%s' validation finished. Total checked: %d, became valid: %d. Duration: %s.", + group.Name, + len(invalidKeys), + becameValidCount, + duration.String(), + ) } diff --git a/internal/keypool/validator.go b/internal/keypool/validator.go index ccc2d3d..71109ec 100644 --- a/internal/keypool/validator.go +++ b/internal/keypool/validator.go @@ -48,7 +48,10 @@ func NewKeyValidator(params KeyValidatorParams) *KeyValidator { // ValidateSingleKey performs a validation check on a single API key. func (s *KeyValidator) ValidateSingleKey(key *models.APIKey, group *models.Group) (bool, error) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if group.EffectiveConfig.AppUrl == "" { + group.EffectiveConfig = s.SettingsManager.GetEffectiveConfig(group.Config) + } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(group.EffectiveConfig.KeyValidationTimeoutSeconds)*time.Second) defer cancel() ch, err := s.channelFactory.GetChannel(group) @@ -58,7 +61,6 @@ func (s *KeyValidator) ValidateSingleKey(key *models.APIKey, group *models.Group isValid, validationErr := ch.ValidateKey(ctx, key.KeyValue) - group.EffectiveConfig = s.SettingsManager.GetEffectiveConfig(group.Config) s.keypoolProvider.UpdateStatus(key, group, isValid) if !isValid { diff --git a/internal/models/types.go b/internal/models/types.go index 35b7221..6c59680 100644 --- a/internal/models/types.go +++ b/internal/models/types.go @@ -34,6 +34,8 @@ type GroupConfig struct { MaxRetries *int `json:"max_retries,omitempty"` BlacklistThreshold *int `json:"blacklist_threshold,omitempty"` KeyValidationIntervalMinutes *int `json:"key_validation_interval_minutes,omitempty"` + KeyValidationConcurrency *int `json:"key_validation_concurrency,omitempty"` + KeyValidationTimeoutSeconds *int `json:"key_validation_timeout_seconds,omitempty"` } // Group 对应 groups 表 @@ -96,10 +98,10 @@ type StatCard struct { // DashboardStatsResponse 用于仪表盘基础统计的API响应 type DashboardStatsResponse struct { - KeyCount StatCard `json:"key_count"` - GroupCount StatCard `json:"group_count"` - RequestCount StatCard `json:"request_count"` - ErrorRate StatCard `json:"error_rate"` + KeyCount StatCard `json:"key_count"` + GroupCount StatCard `json:"group_count"` + RequestCount StatCard `json:"request_count"` + ErrorRate StatCard `json:"error_rate"` } // ChartDataset 用于图表的数据集 diff --git a/internal/services/key_manual_validation_service.go b/internal/services/key_manual_validation_service.go index 319ef4e..811f29f 100644 --- a/internal/services/key_manual_validation_service.go +++ b/internal/services/key_manual_validation_service.go @@ -70,8 +70,7 @@ func (s *KeyManualValidationService) runValidation(group *models.Group, keys []m jobs := make(chan models.APIKey, len(keys)) results := make(chan bool, len(keys)) - // 固定10并发,避免超频 - concurrency := 10 + concurrency := group.EffectiveConfig.KeyValidationConcurrency var wg sync.WaitGroup for range concurrency { @@ -84,7 +83,6 @@ func (s *KeyManualValidationService) runValidation(group *models.Group, keys []m } close(jobs) - // Wait for all workers to complete in a separate goroutine go func() { wg.Wait() close(results) diff --git a/internal/types/types.go b/internal/types/types.go index 4a34ce1..d166c8e 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -32,7 +32,9 @@ type SystemSettings struct { // 密钥配置 MaxRetries int `json:"max_retries" default:"3" name:"最大重试次数" category:"密钥配置" desc:"单个请求使用不同 Key 的最大重试次数,0为不重试。" validate:"min=0"` BlacklistThreshold int `json:"blacklist_threshold" default:"3" name:"黑名单阈值" category:"密钥配置" desc:"一个 Key 连续失败多少次后进入黑名单,0为不拉黑。" validate:"min=0"` - KeyValidationIntervalMinutes int `json:"key_validation_interval_minutes" default:"60" name:"定时验证周期(分钟)" category:"密钥配置" desc:"后台定时验证密钥的默认周期(分钟)。" validate:"min=30"` + KeyValidationIntervalMinutes int `json:"key_validation_interval_minutes" default:"60" name:"验证间隔(分钟)" category:"密钥配置" desc:"后台验证密钥的默认间隔(分钟)。" validate:"min=30"` + KeyValidationConcurrency int `json:"key_validation_concurrency" default:"10" name:"验证并发数" category:"密钥配置" desc:"后台定时验证无效 Key 时的并发数。" validate:"min=1"` + KeyValidationTimeoutSeconds int `json:"key_validation_timeout_seconds" default:"20" name:"验证超时(秒)" category:"密钥配置" desc:"后台定时验证单个 Key 时的 API 请求超时时间(秒)。" validate:"min=5"` } // ServerConfig represents server configuration