From 9ba7375f9c57967a973b86999c3c9d3d147f47c3 Mon Sep 17 00:00:00 2001
From: tbphp
Date: Sat, 7 Jun 2025 00:04:02 +0800
Subject: [PATCH] feat: optimize configuration
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .env.example            |  4 +--
 docker-compose.yml      | 39 ++++----------------
 internal/proxy/proxy.go | 62 +++++++++++++++++++++++++++++++++--------
 3 files changed, 59 insertions(+), 46 deletions(-)

diff --git a/.env.example b/.env.example
index 1b1667e..992c627 100644
--- a/.env.example
+++ b/.env.example
@@ -50,8 +50,8 @@ ENABLE_KEEP_ALIVE=true
 # Disable compression (reduces CPU overhead)
 DISABLE_COMPRESSION=true
 
-# Buffer size (bytes)
-BUFFER_SIZE=32768
+# Buffer size (bytes; 64KB or larger is recommended for streaming responses)
+BUFFER_SIZE=65536
 
 # ===========================================
 # Logging configuration
diff --git a/docker-compose.yml b/docker-compose.yml
index 555ddde..94b6fd5 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,9 +1,9 @@
-version: '3.8'
+version: "3.8"
 
 services:
-  openai-proxy:
+  gpt-load:
     build: .
-    container_name: openai-proxy-go
+    container_name: gpt-load
     ports:
       - "3000:3000"
     volumes:
@@ -12,7 +12,7 @@ services:
       # Mount the config file (read-only)
       - ./.env:/app/.env:ro
     restart: unless-stopped
-    
+
     # Health check
     healthcheck:
       test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
@@ -20,33 +20,6 @@ services:
       timeout: 10s
       retries: 3
       start_period: 40s
-
-    # Environment variables
-    environment:
-      - GO_ENV=production
-
-    # Logging configuration
-    logging:
-      driver: "json-file"
-      options:
-        max-size: "10m"
-        max-file: "3"
-
-    # Resource limits
-    deploy:
-      resources:
-        limits:
-          memory: 512M
-          cpus: '1.0'
-        reservations:
-          memory: 128M
-          cpus: '0.25'
-
-    # Network configuration
-    networks:
-      - proxy-network
-
-# Network definitions
-networks:
-  proxy-network:
-    driver: bridge
+    env_file:
+      - .env
diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go
index bc7130a..60cbeec 100644
--- a/internal/proxy/proxy.go
+++ b/internal/proxy/proxy.go
@@ -309,8 +309,15 @@ func (ps *ProxyServer) handleProxy(c *gin.Context) {
 	// Increment the request counter
 	atomic.AddInt64(&ps.requestCount, 1)
 
-	// Read the request body for retries
+	// Check whether this is a streaming request
+	isStreamRequest := strings.Contains(c.GetHeader("Accept"), "text/event-stream") ||
+		strings.Contains(c.Request.URL.RawQuery, "stream=true") ||
+		strings.Contains(c.Request.Header.Get("Content-Type"), "text/event-stream")
+
 	var bodyBytes []byte
+	var requestBody io.Reader = c.Request.Body
+
+	// Cache the request body so retries (including streaming requests) can replay it
 	if c.Request.Body != nil {
 		var err error
 		bodyBytes, err = io.ReadAll(c.Request.Body)
@@ -326,14 +333,15 @@
 			})
 			return
 		}
+		requestBody = strings.NewReader(string(bodyBytes))
 	}
 
 	// Execute the request with retries
-	ps.executeRequestWithRetry(c, startTime, bodyBytes, 0)
+	ps.executeRequestWithRetry(c, startTime, bodyBytes, requestBody, isStreamRequest, 0)
 }
 
 // executeRequestWithRetry executes the request with retry support
-func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Time, bodyBytes []byte, retryCount int) {
+func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Time, bodyBytes []byte, requestBody io.Reader, isStreamRequest bool, retryCount int) {
 	// Get the next key
 	keyInfo, err := ps.keyManager.GetNextKey()
 	if err != nil {
@@ -358,10 +366,10 @@ func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Ti
 		c.Set("retryCount", retryCount)
 	}
 
-	// Use the cached request body
-	var requestBody io.Reader
+	// Prepare the request body and content length
 	var contentLength int64
 	if len(bodyBytes) > 0 {
+		// Use the cached request body (supports retries)
 		requestBody = strings.NewReader(string(bodyBytes))
 		contentLength = int64(len(bodyBytes))
 	}
@@ -429,7 +437,7 @@ func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Ti
 	// Check whether the request can be retried
 	if retryCount < config.AppConfig.Keys.MaxRetries {
 		logrus.Infof("Preparing to retry request (attempt %d/%d)", retryCount+1, config.AppConfig.Keys.MaxRetries)
-		ps.executeRequestWithRetry(c, startTime, bodyBytes, retryCount+1)
+		ps.executeRequestWithRetry(c, startTime, bodyBytes, nil, isStreamRequest, retryCount+1)
 		return
 	}
 
@@ -465,7 +473,7 @@ func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Ti
 		resp.Body.Close()
 
 		logrus.Infof("Preparing to retry request (attempt %d/%d)", retryCount+1, config.AppConfig.Keys.MaxRetries)
-		ps.executeRequestWithRetry(c, startTime, bodyBytes, retryCount+1)
+		ps.executeRequestWithRetry(c, startTime, bodyBytes, nil, isStreamRequest, retryCount+1)
 		return
 	}
 
@@ -488,10 +496,42 @@ func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Ti
 	// Set the status code
 	c.Status(resp.StatusCode)
 
-	// Stream-copy the response body (zero-copy)
-	_, err = io.Copy(c.Writer, resp.Body)
-	if err != nil {
-		logrus.Errorf("Failed to copy response body: %v (response time: %v)", err, responseTime)
+	// Optimize streaming response transfer
+	if isStreamRequest {
+		// Streaming response: enable real-time flushing
+		if flusher, ok := c.Writer.(http.Flusher); ok {
+			// Stream-copy using the tuned buffer size
+			buf := make([]byte, config.AppConfig.Performance.BufferSize)
+			for {
+				n, err := resp.Body.Read(buf)
+				if n > 0 {
+					_, writeErr := c.Writer.Write(buf[:n])
+					if writeErr != nil {
+						logrus.Errorf("Failed to write streaming response: %v", writeErr)
+						break
+					}
+					flusher.Flush() // flush to the client immediately
+				}
+				if err != nil {
+					if err != io.EOF {
+						logrus.Errorf("Failed to read streaming response: %v", err)
+					}
+					break
+				}
+			}
+		} else {
+			// Fall back to a standard copy
+			_, err = io.Copy(c.Writer, resp.Body)
+			if err != nil {
+				logrus.Errorf("Failed to copy streaming response: %v", err)
+			}
+		}
+	} else {
+		// Non-streaming response: use a standard zero-copy
+		_, err = io.Copy(c.Writer, resp.Body)
+		if err != nil {
+			logrus.Errorf("Failed to copy response body: %v (response time: %v)", err, responseTime)
+		}
 	}
 }
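
The flush-per-chunk loop added in the last hunk is easier to read outside the diff. Below is a minimal, self-contained Go sketch of the same pattern; it is not part of the patch, copyWithFlush and the /events handler are illustrative names only, and the 64*1024 buffer simply mirrors the BUFFER_SIZE=65536 suggested in .env.example.

// streaming_sketch.go -- hypothetical standalone illustration of flush-per-chunk copying.
package main

import (
	"io"
	"log"
	"net/http"
	"strings"
)

// copyWithFlush copies src to w in bufSize-byte chunks and flushes after each
// write when the ResponseWriter supports it, so SSE chunks reach the client
// immediately instead of sitting in the response buffer.
func copyWithFlush(w http.ResponseWriter, src io.Reader, bufSize int) error {
	flusher, canFlush := w.(http.Flusher)
	buf := make([]byte, bufSize)
	for {
		n, err := src.Read(buf)
		if n > 0 {
			if _, werr := w.Write(buf[:n]); werr != nil {
				return werr
			}
			if canFlush {
				flusher.Flush() // push the chunk downstream right away
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		// Stand-in for an upstream SSE body; 64*1024 mirrors BUFFER_SIZE=65536.
		upstream := strings.NewReader("data: hello\n\ndata: world\n\n")
		if err := copyWithFlush(w, upstream, 64*1024); err != nil {
			log.Printf("stream copy failed: %v", err)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}

In the proxy itself the same loop runs over resp.Body from the upstream API (with gin's c.Writer as the flusher), so each upstream SSE chunk is forwarded to the client as soon as it is read rather than after the response completes.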