package services import ( "context" "encoding/json" "fmt" "gpt-load/internal/config" "gpt-load/internal/models" "gpt-load/internal/store" "strings" "time" "github.com/google/uuid" "github.com/sirupsen/logrus" "gorm.io/gorm" ) const ( RequestLogCachePrefix = "request_log:" PendingLogKeysSet = "pending_log_keys" DefaultLogFlushBatchSize = 500 ) // RequestLogService is responsible for managing request logs. type RequestLogService struct { db *gorm.DB store store.Store settingsManager *config.SystemSettingsManager leaderService *LeaderService ctx context.Context cancel context.CancelFunc ticker *time.Ticker } // NewRequestLogService creates a new RequestLogService instance func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *LeaderService) *RequestLogService { ctx, cancel := context.WithCancel(context.Background()) return &RequestLogService{ db: db, store: store, settingsManager: sm, leaderService: ls, ctx: ctx, cancel: cancel, } } // Start initializes the service and starts the periodic flush routine func (s *RequestLogService) Start() { go s.flush() interval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute if interval <= 0 { interval = time.Minute } s.ticker = time.NewTicker(interval) go func() { for { select { case <-s.ticker.C: newInterval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute if newInterval <= 0 { newInterval = time.Minute } if newInterval != interval { s.ticker.Reset(newInterval) interval = newInterval logrus.Debugf("Request log write interval updated to: %v", interval) } s.flush() case <-s.ctx.Done(): s.ticker.Stop() logrus.Info("RequestLogService stopped.") return } } }() } // Stop gracefully stops the RequestLogService func (s *RequestLogService) Stop() { s.flush() s.cancel() } // Record logs a request to the database and cache func (s *RequestLogService) Record(log *models.RequestLog) error { log.ID = uuid.NewString() log.Timestamp = time.Now() if s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes == 0 { return s.writeLogsToDB([]*models.RequestLog{log}) } cacheKey := RequestLogCachePrefix + log.ID logBytes, err := json.Marshal(log) if err != nil { return fmt.Errorf("failed to marshal request log: %w", err) } ttl := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes*5) * time.Minute if err := s.store.Set(cacheKey, logBytes, ttl); err != nil { return err } return s.store.SAdd(PendingLogKeysSet, cacheKey) } // flush data from cache to database func (s *RequestLogService) flush() { if s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes == 0 { logrus.Debug("Sync mode enabled, skipping scheduled log flush.") return } if !s.leaderService.IsLeader() { logrus.Debug("Not a leader, skipping log flush.") return } logrus.Debug("Leader starting to flush request logs...") for { keys, err := s.store.SPopN(PendingLogKeysSet, DefaultLogFlushBatchSize) if err != nil { logrus.Errorf("Failed to pop pending log keys from store: %v", err) return } if len(keys) == 0 { logrus.Debug("No more request logs to flush in this cycle.") return } logrus.Debugf("Popped %d request logs to flush.", len(keys)) var logs []*models.RequestLog var processedKeys []string for _, key := range keys { logBytes, err := s.store.Get(key) if err != nil { if err == store.ErrNotFound { logrus.Warnf("Log key %s found in set but not in store, skipping.", key) } else { logrus.Warnf("Failed to get log for key %s: %v", key, err) } continue } var log models.RequestLog if err := json.Unmarshal(logBytes, &log); err != nil { logrus.Warnf("Failed to unmarshal log for key %s: %v", key, err) continue } logs = append(logs, &log) processedKeys = append(processedKeys, key) } if len(logs) == 0 { continue } err = s.writeLogsToDB(logs) if err != nil { logrus.Errorf("Failed to flush request logs batch, will retry next time. Error: %v", err) if len(keys) > 0 { keysToRetry := make([]any, len(keys)) for i, k := range keys { keysToRetry[i] = k } if saddErr := s.store.SAdd(PendingLogKeysSet, keysToRetry...); saddErr != nil { logrus.Errorf("CRITICAL: Failed to re-add failed log keys to set: %v", saddErr) } } return } if len(processedKeys) > 0 { if err := s.store.Del(processedKeys...); err != nil { logrus.Errorf("Failed to delete flushed log bodies from store: %v", err) } } logrus.Infof("Successfully flushed %d request logs.", len(logs)) } } // writeLogsToDB writes a batch of request logs to the database func (s *RequestLogService) writeLogsToDB(logs []*models.RequestLog) error { if len(logs) == 0 { return nil } return s.db.Transaction(func(tx *gorm.DB) error { if err := tx.CreateInBatches(logs, len(logs)).Error; err != nil { return fmt.Errorf("failed to batch insert request logs: %w", err) } keyStats := make(map[uint]int64) for _, log := range logs { if log.IsSuccess { keyStats[log.KeyID]++ } } if len(keyStats) > 0 { var caseStmt strings.Builder var keyIDs []uint caseStmt.WriteString("CASE id ") for keyID, count := range keyStats { caseStmt.WriteString(fmt.Sprintf("WHEN %d THEN request_count + %d ", keyID, count)) keyIDs = append(keyIDs, keyID) } caseStmt.WriteString("END") if err := tx.Model(&models.APIKey{}).Where("id IN ?", keyIDs). Updates(map[string]any{ "request_count": gorm.Expr(caseStmt.String()), "last_used_at": time.Now(), }).Error; err != nil { return fmt.Errorf("failed to batch update api_key stats: %w", err) } } return nil }) }