Files
gpt-load/internal/services/request_log_service.go
2025-07-15 10:25:55 +08:00

282 lines
7.0 KiB
Go

package services
import (
"context"
"encoding/json"
"fmt"
"gpt-load/internal/config"
"gpt-load/internal/models"
"gpt-load/internal/store"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
const (
RequestLogCachePrefix = "request_log:"
PendingLogKeysSet = "pending_log_keys"
DefaultLogFlushBatchSize = 200
)
// RequestLogService is responsible for managing request logs.
type RequestLogService struct {
db *gorm.DB
store store.Store
settingsManager *config.SystemSettingsManager
stopChan chan struct{}
wg sync.WaitGroup
ticker *time.Ticker
}
// NewRequestLogService creates a new RequestLogService instance
func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager) *RequestLogService {
return &RequestLogService{
db: db,
store: store,
settingsManager: sm,
stopChan: make(chan struct{}),
}
}
// Start initializes the service and starts the periodic flush routine
func (s *RequestLogService) Start() {
s.wg.Add(1)
go s.runLoop()
}
func (s *RequestLogService) runLoop() {
defer s.wg.Done()
// Initial flush on start
s.flush()
interval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute
if interval <= 0 {
interval = time.Minute
}
s.ticker = time.NewTicker(interval)
defer s.ticker.Stop()
for {
select {
case <-s.ticker.C:
newInterval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute
if newInterval <= 0 {
newInterval = time.Minute
}
if newInterval != interval {
s.ticker.Reset(newInterval)
interval = newInterval
logrus.Debugf("Request log write interval updated to: %v", interval)
}
s.flush()
case <-s.stopChan:
return
}
}
}
// Stop gracefully stops the RequestLogService
func (s *RequestLogService) Stop(ctx context.Context) {
close(s.stopChan)
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
s.flush()
logrus.Info("RequestLogService stopped gracefully.")
case <-ctx.Done():
logrus.Warn("RequestLogService stop timed out.")
}
}
// Record logs a request to the database and cache
func (s *RequestLogService) Record(log *models.RequestLog) error {
log.ID = uuid.NewString()
log.Timestamp = time.Now()
if s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes == 0 {
return s.writeLogsToDB([]*models.RequestLog{log})
}
cacheKey := RequestLogCachePrefix + log.ID
logBytes, err := json.Marshal(log)
if err != nil {
return fmt.Errorf("failed to marshal request log: %w", err)
}
ttl := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes*5) * time.Minute
if err := s.store.Set(cacheKey, logBytes, ttl); err != nil {
return err
}
return s.store.SAdd(PendingLogKeysSet, cacheKey)
}
// flush data from cache to database
func (s *RequestLogService) flush() {
if s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes == 0 {
logrus.Debug("Sync mode enabled, skipping scheduled log flush.")
return
}
logrus.Debug("Master starting to flush request logs...")
for {
keys, err := s.store.SPopN(PendingLogKeysSet, DefaultLogFlushBatchSize)
if err != nil {
logrus.Errorf("Failed to pop pending log keys from store: %v", err)
return
}
if len(keys) == 0 {
return
}
logrus.Debugf("Popped %d request logs to flush.", len(keys))
var logs []*models.RequestLog
var processedKeys []string
for _, key := range keys {
logBytes, err := s.store.Get(key)
if err != nil {
if err == store.ErrNotFound {
logrus.Warnf("Log key %s found in set but not in store, skipping.", key)
} else {
logrus.Warnf("Failed to get log for key %s: %v", key, err)
}
continue
}
var log models.RequestLog
if err := json.Unmarshal(logBytes, &log); err != nil {
logrus.Warnf("Failed to unmarshal log for key %s: %v", key, err)
continue
}
logs = append(logs, &log)
processedKeys = append(processedKeys, key)
}
if len(logs) == 0 {
continue
}
err = s.writeLogsToDB(logs)
if err != nil {
logrus.Errorf("Failed to flush request logs batch, will retry next time. Error: %v", err)
if len(keys) > 0 {
keysToRetry := make([]any, len(keys))
for i, k := range keys {
keysToRetry[i] = k
}
if saddErr := s.store.SAdd(PendingLogKeysSet, keysToRetry...); saddErr != nil {
logrus.Errorf("CRITICAL: Failed to re-add failed log keys to set: %v", saddErr)
}
}
return
}
if len(processedKeys) > 0 {
if err := s.store.Del(processedKeys...); err != nil {
logrus.Errorf("Failed to delete flushed log bodies from store: %v", err)
}
}
logrus.Infof("Successfully flushed %d request logs.", len(logs))
}
}
// writeLogsToDB writes a batch of request logs to the database
func (s *RequestLogService) writeLogsToDB(logs []*models.RequestLog) error {
if len(logs) == 0 {
return nil
}
return s.db.Transaction(func(tx *gorm.DB) error {
if err := tx.CreateInBatches(logs, len(logs)).Error; err != nil {
return fmt.Errorf("failed to batch insert request logs: %w", err)
}
keyStats := make(map[uint]int64)
for _, log := range logs {
if log.IsSuccess {
keyStats[log.KeyID]++
}
}
if len(keyStats) > 0 {
var caseStmt strings.Builder
var keyIDs []uint
caseStmt.WriteString("CASE id ")
for keyID, count := range keyStats {
caseStmt.WriteString(fmt.Sprintf("WHEN %d THEN request_count + %d ", keyID, count))
keyIDs = append(keyIDs, keyID)
}
caseStmt.WriteString("END")
if err := tx.Model(&models.APIKey{}).Where("id IN ?", keyIDs).
Updates(map[string]any{
"request_count": gorm.Expr(caseStmt.String()),
"last_used_at": time.Now(),
}).Error; err != nil {
return fmt.Errorf("failed to batch update api_key stats: %w", err)
}
}
// 更新统计表
hourlyStats := make(map[struct {
Time time.Time
GroupID uint
}]struct{ Success, Failure int64 })
for _, log := range logs {
hourlyTime := log.Timestamp.Truncate(time.Hour)
key := struct {
Time time.Time
GroupID uint
}{Time: hourlyTime, GroupID: log.GroupID}
counts := hourlyStats[key]
if log.IsSuccess {
counts.Success++
} else {
counts.Failure++
}
hourlyStats[key] = counts
}
if len(hourlyStats) > 0 {
for key, counts := range hourlyStats {
err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "time"}, {Name: "group_id"}},
DoUpdates: clause.Assignments(map[string]any{
"success_count": gorm.Expr("success_count + ?", counts.Success),
"failure_count": gorm.Expr("failure_count + ?", counts.Failure),
"updated_at": time.Now(),
}),
}).Create(&models.GroupHourlyStat{
Time: key.Time,
GroupID: key.GroupID,
SuccessCount: counts.Success,
FailureCount: counts.Failure,
}).Error
if err != nil {
return fmt.Errorf("failed to upsert group hourly stat: %w", err)
}
}
}
return nil
})
}