Merge pull request #17 from tbphp/fix-log-apikey-error

Fix log apikey error
This commit is contained in:
tbphp
2025-07-17 18:36:08 +08:00
committed by GitHub
9 changed files with 196 additions and 121 deletions

View File

@@ -9,6 +9,7 @@ import (
"time"
"gpt-load/internal/config"
db "gpt-load/internal/db/migrations"
"gpt-load/internal/keypool"
"gpt-load/internal/models"
"gpt-load/internal/proxy"
@@ -88,6 +89,8 @@ func (a *App) Start() error {
); err != nil {
return fmt.Errorf("database auto-migration failed: %w", err)
}
// 数据修复
db.MigrateDatabase(a.db)
logrus.Info("Database auto-migration completed.")
// 初始化系统设置

View File

@@ -0,0 +1,10 @@
package db
import (
"gorm.io/gorm"
)
func MigrateDatabase(db *gorm.DB) error {
// v1.0.13 修复请求日志数据
return V1_0_13_FixRequestLogs(db)
}

View File

@@ -0,0 +1,130 @@
package db
import (
"gpt-load/internal/models"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)
func V1_0_13_FixRequestLogs(db *gorm.DB) error {
return db.Transaction(func(tx *gorm.DB) error {
// 如果有key_id就执行修复
if !tx.Migrator().HasColumn(&models.RequestLog{}, "key_id") {
return nil
}
logrus.Info("Old schema detected. Starting data migration for request_logs...")
if !tx.Migrator().HasColumn(&models.RequestLog{}, "group_name") {
logrus.Info("Adding 'group_name' column to request_logs table...")
if err := tx.Migrator().AddColumn(&models.RequestLog{}, "group_name"); err != nil {
return err
}
}
if !tx.Migrator().HasColumn(&models.RequestLog{}, "key_value") {
logrus.Info("Adding 'key_value' column to request_logs table...")
if err := tx.Migrator().AddColumn(&models.RequestLog{}, "key_value"); err != nil {
return err
}
}
type OldRequestLog struct {
ID string
KeyID uint `gorm:"column:key_id"`
GroupID uint
}
batchSize := 1000
for i := 0; ; i++ {
logrus.Infof("Processing batch %d...", i+1)
var oldLogs []OldRequestLog
result := tx.Model(&models.RequestLog{}).
Select("id", "key_id", "group_id").
Where("key_value IS NULL OR group_name IS NULL").
Limit(batchSize).
Find(&oldLogs)
if result.Error != nil {
return result.Error
}
if len(oldLogs) == 0 {
logrus.Info("All batches processed.")
break
}
keyIDMap := make(map[uint]bool)
groupIDMap := make(map[uint]bool)
for _, logEntry := range oldLogs {
if logEntry.KeyID > 0 {
keyIDMap[logEntry.KeyID] = true
}
if logEntry.GroupID > 0 {
groupIDMap[logEntry.GroupID] = true
}
}
var apiKeys []models.APIKey
if len(keyIDMap) > 0 {
var keyIDs []uint
for id := range keyIDMap {
keyIDs = append(keyIDs, id)
}
if err := tx.Model(&models.APIKey{}).Where("id IN ?", keyIDs).Find(&apiKeys).Error; err != nil {
return err
}
}
keyValueMapping := make(map[uint]string)
for _, key := range apiKeys {
keyValueMapping[key.ID] = key.KeyValue
}
var groups []models.Group
if len(groupIDMap) > 0 {
var groupIDs []uint
for id := range groupIDMap {
groupIDs = append(groupIDs, id)
}
if err := tx.Model(&models.Group{}).Where("id IN ?", groupIDs).Find(&groups).Error; err != nil {
return err
}
}
groupNameMapping := make(map[uint]string)
for _, group := range groups {
groupNameMapping[group.ID] = group.Name
}
for _, logEntry := range oldLogs {
groupName, gExists := groupNameMapping[logEntry.GroupID]
if !gExists {
logrus.Warnf("Log ID %s: Could not find Group for group_id %d. Setting group_name to empty string.", logEntry.ID, logEntry.GroupID)
}
keyValue, kExists := keyValueMapping[logEntry.KeyID]
if !kExists {
logrus.Warnf("Log ID %s: Could not find APIKey for key_id %d. Setting key_value to empty string.", logEntry.ID, logEntry.KeyID)
}
updates := map[string]any{
"group_name": groupName,
"key_value": keyValue,
}
if err := tx.Model(&models.RequestLog{}).Where("id = ?", logEntry.ID).UpdateColumns(updates).Error; err != nil {
logrus.WithError(err).Errorf("Failed to update log entry with ID: %s", logEntry.ID)
continue
}
}
logrus.Infof("Successfully updated %d log entries in batch %d.", len(oldLogs), i+1)
}
logrus.Info("Data migration complete. Dropping 'key_id' column from request_logs table...")
if err := tx.Migrator().DropColumn(&models.RequestLog{}, "key_id"); err != nil {
logrus.WithError(err).Warn("Failed to drop 'key_id' column. This can be done manually.")
}
logrus.Info("Database migration successful!")
return nil
})
}

View File

@@ -4,50 +4,29 @@ import (
"strconv"
"time"
"github.com/gin-gonic/gin"
"gpt-load/internal/db"
app_errors "gpt-load/internal/errors"
"gpt-load/internal/models"
"gpt-load/internal/response"
"github.com/gin-gonic/gin"
)
// LogResponse defines the structure for log entries in the API response,
// enriching the base log with related data.
// LogResponse defines the structure for log entries in the API response
type LogResponse struct {
models.RequestLog
GroupName string `json:"group_name"`
KeyValue string `json:"key_value"`
}
// GetLogs Get request logs
func GetLogs(c *gin.Context) {
// --- 1. Build WHERE conditions ---
query := db.DB.Model(&models.RequestLog{})
if groupName := c.Query("group_name"); groupName != "" {
var groupIDs []uint
db.DB.Model(&models.Group{}).Where("name LIKE ? OR display_name LIKE ?", "%"+groupName+"%", "%"+groupName+"%").Pluck("id", &groupIDs)
if len(groupIDs) == 0 {
response.Success(c, &response.PaginatedResponse{
Items: []LogResponse{},
Pagination: response.Pagination{TotalItems: 0, Page: 1, PageSize: response.DefaultPageSize},
})
return
}
query = query.Where("group_id IN ?", groupIDs)
query = query.Where("group_name LIKE ?", "%"+groupName+"%")
}
if keyValue := c.Query("key_value"); keyValue != "" {
var keyIDs []uint
likePattern := "%" + keyValue[1:len(keyValue)-1] + "%"
db.DB.Model(&models.APIKey{}).Where("key_value LIKE ?", likePattern).Pluck("id", &keyIDs)
if len(keyIDs) == 0 {
response.Success(c, &response.PaginatedResponse{
Items: []LogResponse{},
Pagination: response.Pagination{TotalItems: 0, Page: 1, PageSize: response.DefaultPageSize},
})
return
}
query = query.Where("key_id IN ?", keyIDs)
query = query.Where("key_value LIKE ?", likePattern)
}
if isSuccessStr := c.Query("is_success"); isSuccessStr != "" {
if isSuccess, err := strconv.ParseBool(isSuccessStr); err == nil {
@@ -76,71 +55,14 @@ func GetLogs(c *gin.Context) {
}
}
// --- 2. Get Paginated Logs ---
var logs []models.RequestLog
query = query.Order("timestamp desc") // Apply ordering before pagination
query = query.Order("timestamp desc")
pagination, err := response.Paginate(c, query, &logs)
if err != nil {
response.Error(c, app_errors.ParseDBError(err))
return
}
// --- 3. Enrich Logs with GroupName and KeyValue ---
if len(logs) == 0 {
response.Success(c, pagination) // Return empty pagination response
return
}
// Collect IDs for enrichment
groupIds := make(map[uint]bool)
keyIds := make(map[uint]bool)
for _, log := range logs {
if log.GroupID != 0 {
groupIds[log.GroupID] = true
}
if log.KeyID != 0 {
keyIds[log.KeyID] = true
}
}
// Fetch enrichment data
groupMap := make(map[uint]string)
if len(groupIds) > 0 {
var groups []models.Group
var ids []uint
for id := range groupIds {
ids = append(ids, id)
}
db.DB.Where("id IN ?", ids).Find(&groups)
for _, group := range groups {
groupMap[group.ID] = group.Name
}
}
keyMap := make(map[uint]string)
if len(keyIds) > 0 {
var keys []models.APIKey
var ids []uint
for id := range keyIds {
ids = append(ids, id)
}
db.DB.Where("id IN ?", ids).Find(&keys)
for _, key := range keys {
keyMap[key.ID] = key.KeyValue
}
}
// Build final response
logResponses := make([]LogResponse, len(logs))
for i, log := range logs {
logResponses[i] = LogResponse{
RequestLog: log,
GroupName: groupMap[log.GroupID],
KeyValue: keyMap[log.KeyID],
}
}
// --- 4. Send Response ---
pagination.Items = logResponses
pagination.Items = logs
response.Success(c, pagination)
}

View File

@@ -76,7 +76,8 @@ type RequestLog struct {
ID string `gorm:"type:varchar(36);primaryKey" json:"id"`
Timestamp time.Time `gorm:"type:datetime(3);not null;index" json:"timestamp"`
GroupID uint `gorm:"not null;index" json:"group_id"`
KeyID uint `gorm:"not null;index" json:"key_id"`
GroupName string `gorm:"type:varchar(255);index" json:"group_name"`
KeyValue string `gorm:"type:varchar(512)" json:"key_value"`
IsSuccess bool `gorm:"not null" json:"is_success"`
SourceIP string `gorm:"type:varchar(45)" json:"source_ip"`
StatusCode int `gorm:"not null" json:"status_code"`

View File

@@ -9,7 +9,6 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"time"
"gpt-load/internal/channel"
@@ -115,12 +114,11 @@ func (ps *ProxyServer) executeRequestWithRetry(
}
logrus.Debugf("Max retries exceeded for group %s after %d attempts. Parsed Error: %s", group.Name, retryCount, logMessage)
keyID, _ := strconv.ParseUint(lastError.KeyID, 10, 64)
ps.logRequest(c, group, uint(keyID), startTime, lastError.StatusCode, retryCount, errors.New(logMessage), isStream, lastError.UpstreamAddr)
ps.logRequest(c, group, &models.APIKey{KeyValue: lastError.KeyValue}, startTime, lastError.StatusCode, retryCount, errors.New(logMessage), isStream, lastError.UpstreamAddr)
} else {
response.Error(c, app_errors.ErrMaxRetriesExceeded)
logrus.Debugf("Max retries exceeded for group %s after %d attempts.", group.Name, retryCount)
ps.logRequest(c, group, 0, startTime, http.StatusServiceUnavailable, retryCount, app_errors.ErrMaxRetriesExceeded, isStream, "")
ps.logRequest(c, group, nil, startTime, http.StatusServiceUnavailable, retryCount, app_errors.ErrMaxRetriesExceeded, isStream, "")
}
return
}
@@ -129,7 +127,7 @@ func (ps *ProxyServer) executeRequestWithRetry(
if err != nil {
logrus.Errorf("Failed to select a key for group %s on attempt %d: %v", group.Name, retryCount+1, err)
response.Error(c, app_errors.NewAPIError(app_errors.ErrNoKeysAvailable, err.Error()))
ps.logRequest(c, group, 0, startTime, http.StatusServiceUnavailable, retryCount, err, isStream, "")
ps.logRequest(c, group, nil, startTime, http.StatusServiceUnavailable, retryCount, err, isStream, "")
return
}
@@ -177,7 +175,7 @@ func (ps *ProxyServer) executeRequestWithRetry(
if err != nil || (resp != nil && resp.StatusCode >= 400) {
if err != nil && app_errors.IsIgnorableError(err) {
logrus.Debugf("Client-side ignorable error for key %s, aborting retries: %v", utils.MaskAPIKey(apiKey.KeyValue), err)
ps.logRequest(c, group, apiKey.ID, startTime, 499, retryCount+1, err, isStream, upstreamURL)
ps.logRequest(c, group, apiKey, startTime, 499, retryCount+1, err, isStream, upstreamURL)
return
}
@@ -210,7 +208,7 @@ func (ps *ProxyServer) executeRequestWithRetry(
StatusCode: statusCode,
ErrorMessage: errorMessage,
ParsedErrorMessage: parsedError,
KeyID: fmt.Sprintf("%d", apiKey.ID),
KeyValue: apiKey.KeyValue,
Attempt: retryCount + 1,
UpstreamAddr: upstreamURL,
})
@@ -220,7 +218,7 @@ func (ps *ProxyServer) executeRequestWithRetry(
// ps.keyProvider.UpdateStatus(apiKey, group, true) // 请求成功不再重置成功次数减少IO消耗
logrus.Debugf("Request for group %s succeeded on attempt %d with key %s", group.Name, retryCount+1, utils.MaskAPIKey(apiKey.KeyValue))
ps.logRequest(c, group, apiKey.ID, startTime, resp.StatusCode, retryCount+1, nil, isStream, upstreamURL)
ps.logRequest(c, group, apiKey, startTime, resp.StatusCode, retryCount+1, nil, isStream, upstreamURL)
for key, values := range resp.Header {
for _, value := range values {
@@ -240,7 +238,7 @@ func (ps *ProxyServer) executeRequestWithRetry(
func (ps *ProxyServer) logRequest(
c *gin.Context,
group *models.Group,
keyID uint,
apiKey *models.APIKey,
startTime time.Time,
statusCode int,
retries int,
@@ -256,7 +254,7 @@ func (ps *ProxyServer) logRequest(
logEntry := &models.RequestLog{
GroupID: group.ID,
KeyID: keyID,
GroupName: group.Name,
IsSuccess: finalError == nil && statusCode < 400,
SourceIP: c.ClientIP(),
StatusCode: statusCode,
@@ -267,6 +265,9 @@ func (ps *ProxyServer) logRequest(
IsStream: isStream,
UpstreamAddr: utils.TruncateString(upstreamAddr, 500),
}
if apiKey != nil {
logEntry.KeyValue = apiKey.KeyValue
}
if finalError != nil {
logEntry.ErrorMessage = finalError.Error()

View File

@@ -207,24 +207,24 @@ func (s *RequestLogService) writeLogsToDB(logs []*models.RequestLog) error {
return fmt.Errorf("failed to batch insert request logs: %w", err)
}
keyStats := make(map[uint]int64)
keyStats := make(map[string]int64)
for _, log := range logs {
if log.IsSuccess {
keyStats[log.KeyID]++
if log.IsSuccess && log.KeyValue != "" {
keyStats[log.KeyValue]++
}
}
if len(keyStats) > 0 {
var caseStmt strings.Builder
var keyIDs []uint
caseStmt.WriteString("CASE id ")
for keyID, count := range keyStats {
caseStmt.WriteString(fmt.Sprintf("WHEN %d THEN request_count + %d ", keyID, count))
keyIDs = append(keyIDs, keyID)
var keyValues []string
caseStmt.WriteString("CASE key_value ")
for keyValue, count := range keyStats {
caseStmt.WriteString(fmt.Sprintf("WHEN '%s' THEN request_count + %d ", keyValue, count))
keyValues = append(keyValues, keyValue)
}
caseStmt.WriteString("END")
if err := tx.Model(&models.APIKey{}).Where("id IN ?", keyIDs).
if err := tx.Model(&models.APIKey{}).Where("key_value IN ?", keyValues).
Updates(map[string]any{
"request_count": gorm.Expr(caseStmt.String()),
"last_used_at": time.Now(),

View File

@@ -85,7 +85,7 @@ type RetryError struct {
StatusCode int `json:"status_code"`
ErrorMessage string `json:"error_message"`
ParsedErrorMessage string `json:"-"`
KeyID string `json:"key_id"`
KeyValue string `json:"key_value"`
Attempt int `json:"attempt"`
UpstreamAddr string `json:"-"`
}