feat: optimize proxy service

Author: tbphp
Date: 2025-07-11 17:11:00 +08:00
parent 6ffbb7e9a1
commit 1368792aa0
16 changed files with 249 additions and 262 deletions

View File

@@ -0,0 +1,63 @@
package proxy

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	app_errors "gpt-load/internal/errors"
	"gpt-load/internal/models"
	"io"
	"net/http"

	"github.com/sirupsen/logrus"
)

func (ps *ProxyServer) applyParamOverrides(bodyBytes []byte, group *models.Group) ([]byte, error) {
	if len(group.ParamOverrides) == 0 || len(bodyBytes) == 0 {
		return bodyBytes, nil
	}

	var requestData map[string]any
	if err := json.Unmarshal(bodyBytes, &requestData); err != nil {
		logrus.Warnf("failed to unmarshal request body for param override, passing through: %v", err)
		return bodyBytes, nil
	}

	for key, value := range group.ParamOverrides {
		requestData[key] = value
	}

	return json.Marshal(requestData)
}

// logUpstreamError provides a centralized way to log errors from upstream interactions.
func logUpstreamError(context string, err error) {
	if err == nil {
		return
	}
	if app_errors.IsIgnorableError(err) {
		logrus.Debugf("Ignorable upstream error in %s: %v", context, err)
	} else {
		logrus.Errorf("Upstream error in %s: %v", context, err)
	}
}

// handleGzipCompression checks for gzip encoding and decompresses the body if necessary.
func handleGzipCompression(resp *http.Response, bodyBytes []byte) []byte {
	if resp.Header.Get("Content-Encoding") == "gzip" {
		reader, gzipErr := gzip.NewReader(bytes.NewReader(bodyBytes))
		if gzipErr != nil {
			logrus.Warnf("Failed to create gzip reader for error body: %v", gzipErr)
			return bodyBytes
		}
		defer reader.Close()

		decompressedBody, readAllErr := io.ReadAll(reader)
		if readAllErr != nil {
			logrus.Warnf("Failed to decompress gzip error body: %v", readAllErr)
			return bodyBytes
		}
		return decompressedBody
	}
	return bodyBytes
}
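
Usage note: applyParamOverrides merges the group's ParamOverrides into the top level of the request JSON and passes empty or non-JSON bodies through unchanged. A minimal standalone sketch of that merge behaviour (illustrative only, not part of this commit; the function and package names are placeholders):

package main

import (
	"encoding/json"
	"fmt"
)

// mergeOverrides mirrors the pass-through semantics of applyParamOverrides:
// override keys replace matching top-level keys, everything else is kept.
func mergeOverrides(body []byte, overrides map[string]any) ([]byte, error) {
	if len(overrides) == 0 || len(body) == 0 {
		return body, nil
	}
	var data map[string]any
	if err := json.Unmarshal(body, &data); err != nil {
		return body, nil // non-JSON bodies pass through unchanged
	}
	for k, v := range overrides {
		data[k] = v
	}
	return json.Marshal(data)
}

func main() {
	body := []byte(`{"model":"gpt-4o","temperature":1.0,"stream":true}`)
	out, _ := mergeOverrides(body, map[string]any{"temperature": 0.2})
	fmt.Println(string(out)) // temperature is forced to 0.2, the other fields are kept
}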

View File

@@ -0,0 +1,55 @@
package proxy

import (
	"bufio"
	"io"
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/sirupsen/logrus"
)

func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Response) {
	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Header("X-Accel-Buffering", "no")

	flusher, ok := c.Writer.(http.Flusher)
	if !ok {
		logrus.Error("Streaming unsupported by the writer, falling back to normal response")
		ps.handleNormalResponse(c, resp)
		return
	}

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		select {
		case <-c.Request.Context().Done():
			logrus.Debugf("Client disconnected, closing stream.")
			return
		default:
		}

		if _, err := c.Writer.Write(scanner.Bytes()); err != nil {
			logUpstreamError("writing stream to client", err)
			return
		}
		if _, err := c.Writer.Write([]byte("\n\n")); err != nil {
			logUpstreamError("writing stream newline to client", err)
			return
		}
		flusher.Flush()
	}

	if err := scanner.Err(); err != nil {
		logUpstreamError("reading from upstream scanner", err)
	}
}

func (ps *ProxyServer) handleNormalResponse(c *gin.Context, resp *http.Response) {
	if _, err := io.Copy(c.Writer, resp.Body); err != nil {
		logUpstreamError("copying response body", err)
	}
}
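
Usage note: handleStreamingResponse relays the upstream body line by line with bufio.Scanner and flushes after every event, stopping as soon as the client's request context is cancelled. Below is a standalone sketch of the same pattern against plain net/http (illustrative only, not part of this commit). It also makes one stdlib caveat explicit: bufio.Scanner rejects lines longer than 64 KiB unless its buffer is enlarged, so the sketch raises the limit before scanning.

package sketch

import (
	"bufio"
	"net/http"
)

// relaySSE streams an upstream response to the client event by event.
func relaySSE(w http.ResponseWriter, r *http.Request, upstream *http.Response) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	scanner := bufio.NewScanner(upstream.Body)
	scanner.Buffer(make([]byte, 64*1024), 1024*1024) // allow upstream lines up to 1 MiB

	for scanner.Scan() {
		select {
		case <-r.Context().Done(): // client went away; stop relaying
			return
		default:
		}
		if _, err := w.Write(scanner.Bytes()); err != nil {
			return
		}
		if _, err := w.Write([]byte("\n\n")); err != nil {
			return
		}
		flusher.Flush()
	}
}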

View File

@@ -2,15 +2,12 @@
package proxy
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
"gpt-load/internal/channel"
@@ -21,34 +18,12 @@ import (
"gpt-load/internal/response"
"gpt-load/internal/services"
"gpt-load/internal/types"
"gpt-load/internal/utils"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)
// A list of errors that are considered normal during streaming when a client disconnects.
var ignorableStreamErrors = []string{
"context canceled",
"connection reset by peer",
"broken pipe",
"use of closed network connection",
}
// isIgnorableStreamError checks if the error is a common, non-critical error that can occur
// when a client disconnects during a streaming response.
func isIgnorableStreamError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
for _, ignorableError := range ignorableStreamErrors {
if strings.Contains(errStr, ignorableError) {
return true
}
}
return false
}
// ProxyServer represents the proxy server
type ProxyServer struct {
keyProvider *keypool.KeyProvider
@@ -97,17 +72,14 @@ func (ps *ProxyServer) HandleProxy(c *gin.Context) {
}
c.Request.Body.Close()
// 4. Apply parameter overrides if any.
finalBodyBytes, err := ps.applyParamOverrides(bodyBytes, group)
if err != nil {
response.Error(c, app_errors.NewAPIError(app_errors.ErrInternalServer, fmt.Sprintf("Failed to apply parameter overrides: %v", err)))
return
}
// 5. Determine if this is a streaming request.
isStream := channelHandler.IsStreamRequest(c, bodyBytes)
// 6. Execute the request using the recursive retry logic.
ps.executeRequestWithRetry(c, channelHandler, group, finalBodyBytes, isStream, startTime, 0, nil)
}
@@ -124,7 +96,6 @@ func (ps *ProxyServer) executeRequestWithRetry(
) {
cfg := group.EffectiveConfig
if retryCount > cfg.MaxRetries {
logrus.Errorf("Max retries exceeded for group %s after %d attempts.", group.Name, retryCount)
if len(retryErrors) > 0 {
lastError := retryErrors[len(retryErrors)-1]
var errorJSON map[string]any
@@ -133,8 +104,10 @@ func (ps *ProxyServer) executeRequestWithRetry(
} else {
response.Error(c, app_errors.NewAPIErrorWithUpstream(lastError.StatusCode, "UPSTREAM_ERROR", lastError.ErrorMessage))
}
logrus.Debugf("Max retries exceeded for group %s after %d attempts. Parsed Error: %s", group.Name, retryCount, lastError.ErrorMessage)
} else {
response.Error(c, app_errors.ErrMaxRetriesExceeded)
logrus.Debugf("Max retries exceeded for group %s after %d attempts.", group.Name, retryCount)
}
return
}
@@ -173,58 +146,54 @@ func (ps *ProxyServer) executeRequestWithRetry(
req.Header = c.Request.Header.Clone()
channelHandler.ModifyRequest(req, apiKey, group)
client := channelHandler.GetHTTPClient()
var client *http.Client
if isStream {
client = channelHandler.GetStreamClient()
req.Header.Set("X-Accel-Buffering", "no")
} else {
client = channelHandler.GetHTTPClient()
}
resp, err := client.Do(req)
if err != nil {
ps.keyProvider.UpdateStatus(apiKey.ID, group.ID, false)
logrus.Warnf("Request failed (attempt %d/%d) for key %s: %v", retryCount+1, cfg.MaxRetries, apiKey.KeyValue[:8], err)
newRetryErrors := append(retryErrors, types.RetryError{
StatusCode: 0,
ErrorMessage: err.Error(),
KeyID: fmt.Sprintf("%d", apiKey.ID),
Attempt: retryCount + 1,
})
ps.executeRequestWithRetry(c, channelHandler, group, bodyBytes, isStream, startTime, retryCount+1, newRetryErrors)
return
if resp != nil {
defer resp.Body.Close()
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
// Unified error handling for retries.
if err != nil || (resp != nil && resp.StatusCode >= 400) {
if err != nil && app_errors.IsIgnorableError(err) {
logrus.Debugf("Client-side ignorable error for key %s, aborting retries: %v", utils.MaskAPIKey(apiKey.KeyValue), err)
return
}
ps.keyProvider.UpdateStatus(apiKey.ID, group.ID, false)
errorBody, readErr := io.ReadAll(resp.Body)
if readErr != nil {
logrus.Errorf("Failed to read error body: %v", readErr)
// Even if reading fails, we should proceed with retry logic
errorBody = []byte("Failed to read error body")
}
// Check for gzip encoding and decompress if necessary.
if resp.Header.Get("Content-Encoding") == "gzip" {
reader, err := gzip.NewReader(bytes.NewReader(errorBody))
if err == nil {
decompressedBody, err := io.ReadAll(reader)
if err == nil {
errorBody = decompressedBody
} else {
logrus.Warnf("Failed to decompress gzip error body: %v", err)
}
reader.Close()
} else {
logrus.Warnf("Failed to create gzip reader for error body: %v", err)
var statusCode int
var errorMessage string
var parsedError string
if err != nil {
statusCode = 0
errorMessage = err.Error()
logrus.Debugf("Request failed (attempt %d/%d) for key %s: %v", retryCount+1, cfg.MaxRetries, utils.MaskAPIKey(apiKey.KeyValue), err)
} else {
// HTTP-level error (status >= 400)
statusCode = resp.StatusCode
errorBody, readErr := io.ReadAll(resp.Body)
if readErr != nil {
logrus.Errorf("Failed to read error body: %v", readErr)
errorBody = []byte("Failed to read error body")
}
}
logrus.Warnf("Request failed with status %d (attempt %d/%d) for key %s. Body: %s", resp.StatusCode, retryCount+1, cfg.MaxRetries, apiKey.KeyValue[:8], string(errorBody))
errorBody = handleGzipCompression(resp, errorBody)
errorMessage = string(errorBody)
parsedError = app_errors.ParseUpstreamError(errorBody)
logrus.Debugf("Request failed with status %d (attempt %d/%d) for key %s. Parsed Error: %s", statusCode, retryCount+1, cfg.MaxRetries, utils.MaskAPIKey(apiKey.KeyValue), parsedError)
}
newRetryErrors := append(retryErrors, types.RetryError{
StatusCode: resp.StatusCode,
ErrorMessage: string(errorBody),
StatusCode: statusCode,
ErrorMessage: errorMessage,
KeyID: fmt.Sprintf("%d", apiKey.ID),
Attempt: retryCount + 1,
})
@@ -233,7 +202,7 @@ func (ps *ProxyServer) executeRequestWithRetry(
}
ps.keyProvider.UpdateStatus(apiKey.ID, group.ID, true)
logrus.Debugf("Request for group %s succeeded on attempt %d with key %s", group.Name, retryCount+1, apiKey.KeyValue[:8])
logrus.Debugf("Request for group %s succeeded on attempt %d with key %s", group.Name, retryCount+1, utils.MaskAPIKey(apiKey.KeyValue))
for key, values := range resp.Header {
for _, value := range values {
@@ -248,65 +217,3 @@ func (ps *ProxyServer) executeRequestWithRetry(
ps.handleNormalResponse(c, resp)
}
}
func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Response) {
flusher, ok := c.Writer.(http.Flusher)
if !ok {
logrus.Error("Streaming unsupported by the writer")
ps.handleNormalResponse(c, resp)
return
}
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
if _, err := c.Writer.Write(scanner.Bytes()); err != nil {
if !isIgnorableStreamError(err) {
logrus.Errorf("Error writing to client: %v", err)
}
return
}
if _, err := c.Writer.Write([]byte("\n")); err != nil {
if !isIgnorableStreamError(err) {
logrus.Errorf("Error writing newline to client: %v", err)
}
return
}
flusher.Flush()
}
if err := scanner.Err(); err != nil && !isIgnorableStreamError(err) {
logrus.Errorf("Error reading from upstream: %v", err)
}
}
func (ps *ProxyServer) handleNormalResponse(c *gin.Context, resp *http.Response) {
if _, err := io.Copy(c.Writer, resp.Body); err != nil {
if !isIgnorableStreamError(err) {
logrus.Errorf("Failed to copy response body to client: %v", err)
}
}
}
func (ps *ProxyServer) applyParamOverrides(bodyBytes []byte, group *models.Group) ([]byte, error) {
if len(group.ParamOverrides) == 0 || len(bodyBytes) == 0 {
return bodyBytes, nil
}
var requestData map[string]any
if err := json.Unmarshal(bodyBytes, &requestData); err != nil {
logrus.Warnf("failed to unmarshal request body for param override, passing through: %v", err)
return bodyBytes, nil
}
for key, value := range group.ParamOverrides {
requestData[key] = value
}
return json.Marshal(requestData)
}
func (ps *ProxyServer) Close() {
// The HTTP clients are now managed by the channel factory and httpclient manager,
// so the proxy server itself doesn't need to close them.
// The httpclient manager will handle closing idle connections for all its clients.
}
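
Usage note: the retry path above now funnels transport errors and HTTP errors (status >= 400) through a single branch, recording transport failures with status code 0 and HTTP failures with the upstream status plus the (gzip-aware) response body. A standalone sketch of that classification (illustrative only, not part of this commit; RetryError here is a stand-in for the project's types.RetryError, which also carries KeyID):

package sketch

import (
	"io"
	"net/http"
)

// RetryError is a simplified stand-in for types.RetryError.
type RetryError struct {
	StatusCode   int
	ErrorMessage string
	Attempt      int
}

// classifyFailure records one failed attempt for the retry chain.
func classifyFailure(resp *http.Response, err error, attempt int) RetryError {
	if err != nil {
		// Transport-level failure: no HTTP status is available, so record 0.
		return RetryError{StatusCode: 0, ErrorMessage: err.Error(), Attempt: attempt}
	}

	body, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		body = []byte("Failed to read error body")
	}
	// The commit additionally runs the body through handleGzipCompression
	// before recording the message, so gzip-encoded errors stay readable.
	return RetryError{StatusCode: resp.StatusCode, ErrorMessage: string(body), Attempt: attempt}
}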