package services import ( "context" "encoding/json" "fmt" "gpt-load/internal/config" "gpt-load/internal/models" "gpt-load/internal/store" "strings" "sync" "time" "github.com/google/uuid" "github.com/sirupsen/logrus" "gorm.io/gorm" "gorm.io/gorm/clause" ) const ( RequestLogCachePrefix = "request_log:" PendingLogKeysSet = "pending_log_keys" DefaultLogFlushBatchSize = 200 ) // RequestLogService is responsible for managing request logs. type RequestLogService struct { db *gorm.DB store store.Store settingsManager *config.SystemSettingsManager leaderLock *store.LeaderLock stopChan chan struct{} wg sync.WaitGroup ticker *time.Ticker } // NewRequestLogService creates a new RequestLogService instance func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *store.LeaderLock) *RequestLogService { return &RequestLogService{ db: db, store: store, settingsManager: sm, leaderLock: ls, stopChan: make(chan struct{}), } } // Start initializes the service and starts the periodic flush routine func (s *RequestLogService) Start() { s.wg.Add(1) go s.runLoop() } func (s *RequestLogService) runLoop() { defer s.wg.Done() // Initial flush on start s.flush() interval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute if interval <= 0 { interval = time.Minute } s.ticker = time.NewTicker(interval) defer s.ticker.Stop() for { select { case <-s.ticker.C: newInterval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute if newInterval <= 0 { newInterval = time.Minute } if newInterval != interval { s.ticker.Reset(newInterval) interval = newInterval logrus.Debugf("Request log write interval updated to: %v", interval) } s.flush() case <-s.stopChan: return } } } // Stop gracefully stops the RequestLogService func (s *RequestLogService) Stop(ctx context.Context) { close(s.stopChan) done := make(chan struct{}) go func() { s.wg.Wait() close(done) }() select { case <-done: s.flush() logrus.Info("RequestLogService stopped gracefully.") case <-ctx.Done(): logrus.Warn("RequestLogService stop timed out.") } } // Record logs a request to the database and cache func (s *RequestLogService) Record(log *models.RequestLog) error { log.ID = uuid.NewString() log.Timestamp = time.Now() if s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes == 0 { return s.writeLogsToDB([]*models.RequestLog{log}) } cacheKey := RequestLogCachePrefix + log.ID logBytes, err := json.Marshal(log) if err != nil { return fmt.Errorf("failed to marshal request log: %w", err) } ttl := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes*5) * time.Minute if err := s.store.Set(cacheKey, logBytes, ttl); err != nil { return err } return s.store.SAdd(PendingLogKeysSet, cacheKey) } // flush data from cache to database func (s *RequestLogService) flush() { if s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes == 0 { logrus.Debug("Sync mode enabled, skipping scheduled log flush.") return } if !s.leaderLock.IsLeader() { logrus.Debug("Not a leader, skipping log flush.") return } logrus.Debug("Leader starting to flush request logs...") for { keys, err := s.store.SPopN(PendingLogKeysSet, DefaultLogFlushBatchSize) if err != nil { logrus.Errorf("Failed to pop pending log keys from store: %v", err) return } if len(keys) == 0 { return } logrus.Debugf("Popped %d request logs to flush.", len(keys)) var logs []*models.RequestLog var processedKeys []string for _, key := range keys { logBytes, err := s.store.Get(key) if err != nil { if err == store.ErrNotFound { logrus.Warnf("Log key %s found in set but not in store, skipping.", key) } else { logrus.Warnf("Failed to get log for key %s: %v", key, err) } continue } var log models.RequestLog if err := json.Unmarshal(logBytes, &log); err != nil { logrus.Warnf("Failed to unmarshal log for key %s: %v", key, err) continue } logs = append(logs, &log) processedKeys = append(processedKeys, key) } if len(logs) == 0 { continue } err = s.writeLogsToDB(logs) if err != nil { logrus.Errorf("Failed to flush request logs batch, will retry next time. Error: %v", err) if len(keys) > 0 { keysToRetry := make([]any, len(keys)) for i, k := range keys { keysToRetry[i] = k } if saddErr := s.store.SAdd(PendingLogKeysSet, keysToRetry...); saddErr != nil { logrus.Errorf("CRITICAL: Failed to re-add failed log keys to set: %v", saddErr) } } return } if len(processedKeys) > 0 { if err := s.store.Del(processedKeys...); err != nil { logrus.Errorf("Failed to delete flushed log bodies from store: %v", err) } } logrus.Infof("Successfully flushed %d request logs.", len(logs)) } } // writeLogsToDB writes a batch of request logs to the database func (s *RequestLogService) writeLogsToDB(logs []*models.RequestLog) error { if len(logs) == 0 { return nil } return s.db.Transaction(func(tx *gorm.DB) error { if err := tx.CreateInBatches(logs, len(logs)).Error; err != nil { return fmt.Errorf("failed to batch insert request logs: %w", err) } keyStats := make(map[uint]int64) for _, log := range logs { if log.IsSuccess { keyStats[log.KeyID]++ } } if len(keyStats) > 0 { var caseStmt strings.Builder var keyIDs []uint caseStmt.WriteString("CASE id ") for keyID, count := range keyStats { caseStmt.WriteString(fmt.Sprintf("WHEN %d THEN request_count + %d ", keyID, count)) keyIDs = append(keyIDs, keyID) } caseStmt.WriteString("END") if err := tx.Model(&models.APIKey{}).Where("id IN ?", keyIDs). Updates(map[string]any{ "request_count": gorm.Expr(caseStmt.String()), "last_used_at": time.Now(), }).Error; err != nil { return fmt.Errorf("failed to batch update api_key stats: %w", err) } } // 更新统计表 hourlyStats := make(map[struct { Time time.Time GroupID uint }]struct{ Success, Failure int64 }) for _, log := range logs { hourlyTime := log.Timestamp.Truncate(time.Hour) key := struct { Time time.Time GroupID uint }{Time: hourlyTime, GroupID: log.GroupID} counts := hourlyStats[key] if log.IsSuccess { counts.Success++ } else { counts.Failure++ } hourlyStats[key] = counts } if len(hourlyStats) > 0 { for key, counts := range hourlyStats { err := tx.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "time"}, {Name: "group_id"}}, DoUpdates: clause.Assignments(map[string]any{ "success_count": gorm.Expr("success_count + ?", counts.Success), "failure_count": gorm.Expr("failure_count + ?", counts.Failure), "updated_at": time.Now(), }), }).Create(&models.GroupHourlyStat{ Time: key.Time, GroupID: key.GroupID, SuccessCount: counts.Success, FailureCount: counts.Failure, }).Error if err != nil { return fmt.Errorf("failed to upsert group hourly stat: %w", err) } } } return nil }) }