Files
gpt-load/internal/services/key_validation_concurrency.go
2025-07-07 21:03:51 +08:00

94 lines
2.1 KiB
Go

package services
import (
"context"
"gpt-load/internal/keypool"
"gpt-load/internal/models"
"gpt-load/internal/types"
"sync"
"github.com/sirupsen/logrus"
)
// ValidationJob represents a single key validation task for the worker pool.
type ValidationJob struct {
TaskID string
Key models.APIKey
Group *models.Group
Ctx context.Context
CancelFunc context.CancelFunc
}
// KeyValidationPool manages a global worker pool for key validation.
type KeyValidationPool struct {
validator *keypool.KeyValidator
configManager types.ConfigManager
jobs chan ValidationJob
stopChan chan struct{}
wg sync.WaitGroup
}
// NewKeyValidationPool creates a new KeyValidationPool.
func NewKeyValidationPool(validator *keypool.KeyValidator, configManager types.ConfigManager) *KeyValidationPool {
return &KeyValidationPool{
validator: validator,
configManager: configManager,
jobs: make(chan ValidationJob, 1024),
stopChan: make(chan struct{}),
}
}
// Start initializes and runs the worker pool.
func (p *KeyValidationPool) Start() {
performanceConfig := p.configManager.GetPerformanceConfig()
concurrency := performanceConfig.KeyValidationPoolSize
if concurrency <= 0 {
concurrency = 10
}
logrus.Infof("Starting KeyValidationPool with %d workers...", concurrency)
p.wg.Add(concurrency)
for range concurrency {
go p.worker()
}
}
// Stop gracefully stops the worker pool.
func (p *KeyValidationPool) Stop() {
logrus.Info("Stopping KeyValidationPool...")
close(p.stopChan)
close(p.jobs)
p.wg.Wait()
logrus.Info("KeyValidationPool stopped.")
}
// worker is a single goroutine that processes jobs.
func (p *KeyValidationPool) worker() {
defer p.wg.Done()
for {
select {
case job, ok := <-p.jobs:
if !ok {
return
}
ctx := job.Ctx
if ctx == nil {
ctx = context.Background()
}
p.validator.ValidateSingleKey(ctx, &job.Key, job.Group)
if job.CancelFunc != nil {
job.CancelFunc()
}
case <-p.stopChan:
return
}
}
}
// SubmitJob adds a new validation job to the pool.
func (p *KeyValidationPool) SubmitJob(job ValidationJob) {
p.jobs <- job
}