From e55889354fbcee6e586f17a2665a6123790c4a4f Mon Sep 17 00:00:00 2001 From: tbphp Date: Sun, 29 Jun 2025 17:41:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B5=81=E5=BC=8F=E8=BE=93=E5=87=BA?= =?UTF-8?q?=E6=80=A7=E8=83=BD=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/proxy/server.go | 75 ++++++++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 26 deletions(-) diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 6344eb6..8e20066 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -2,6 +2,7 @@ package proxy import ( + "bufio" "bytes" "context" "encoding/json" @@ -24,6 +25,8 @@ import ( var ignorableStreamErrors = []string{ "context canceled", "connection reset by peer", + "broken pipe", + "use of closed network connection", } // isIgnorableStreamError checks if the error is a common, non-critical error that can occur @@ -59,7 +62,7 @@ func NewProxyServer(keyManager types.KeyManager, configManager types.ConfigManag MaxIdleConnsPerHost: 20, MaxConnsPerHost: 0, IdleConnTimeout: time.Duration(openaiConfig.IdleConnTimeout) * time.Second, - TLSHandshakeTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second, + TLSHandshakeTimeout: 15 * time.Second, ExpectContinueTimeout: 1 * time.Second, DisableCompression: !perfConfig.EnableGzip, ForceAttemptHTTP2: true, @@ -73,12 +76,12 @@ func NewProxyServer(keyManager types.KeyManager, configManager types.ConfigManag MaxIdleConnsPerHost: 40, MaxConnsPerHost: 0, IdleConnTimeout: time.Duration(openaiConfig.IdleConnTimeout) * time.Second, - TLSHandshakeTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second, + TLSHandshakeTimeout: 15 * time.Second, ExpectContinueTimeout: 1 * time.Second, - DisableCompression: true, // Always disable compression for streaming + DisableCompression: true, ForceAttemptHTTP2: true, - WriteBufferSize: 64 * 1024, - ReadBufferSize: 64 * 1024, + WriteBufferSize: 0, + ReadBufferSize: 0, ResponseHeaderTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second, } @@ -132,6 +135,11 @@ func (ps *ProxyServer) HandleProxy(c *gin.Context) { // isStreamRequest determines if this is a streaming request func (ps *ProxyServer) isStreamRequest(bodyBytes []byte, c *gin.Context) bool { + // Check for Gemini streaming endpoint + if strings.HasSuffix(c.Request.URL.Path, ":streamGenerateContent") { + return true + } + // Check Accept header if strings.Contains(c.GetHeader("Accept"), "text/event-stream") { return true @@ -144,9 +152,11 @@ func (ps *ProxyServer) isStreamRequest(bodyBytes []byte, c *gin.Context) bool { // Check stream parameter in request body if len(bodyBytes) > 0 { - if strings.Contains(string(bodyBytes), `"stream":true`) || - strings.Contains(string(bodyBytes), `"stream": true`) { - return true + var bodyJSON map[string]interface{} + if err := json.Unmarshal(bodyBytes, &bodyJSON); err == nil { + if stream, ok := bodyJSON["stream"].(bool); ok && stream { + return true + } } } @@ -398,13 +408,16 @@ func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Ti } } +var newline = []byte("\n") + // handleStreamingResponse handles streaming responses func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Response) { // Set headers for streaming c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") + c.Header("Content-Type", "text/event-stream") + c.Header("X-Accel-Buffering", "no") - // Stream response directly flusher, ok := c.Writer.(http.Flusher) if !ok { logrus.Error("Streaming unsupported") @@ -415,26 +428,36 @@ func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Respon return } - // Copy streaming data with optimized buffer size - buffer := make([]byte, 32*1024) // 32KB buffer for better performance - for { - n, err := resp.Body.Read(buffer) - if n > 0 { - if _, writeErr := c.Writer.Write(buffer[:n]); writeErr != nil { - logrus.Errorf("Failed to write streaming data: %v", writeErr) - break + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + + for scanner.Scan() { + lineBytes := scanner.Bytes() + if _, err := c.Writer.Write(lineBytes); err != nil { + if isIgnorableStreamError(err) { + logrus.Debugf("Stream closed by client: %v", err) + } else { + logrus.Errorf("Failed to write streaming data: %v", err) } - flusher.Flush() + return } - if err != nil { - if err != io.EOF { - if isIgnorableStreamError(err) { - logrus.Debugf("Stream closed by client or network: %v", err) - } else { - logrus.Errorf("Error reading streaming response: %v", err) - } + if _, err := c.Writer.Write(newline); err != nil { + if isIgnorableStreamError(err) { + logrus.Debugf("Stream closed by client: %v", err) + } else { + logrus.Errorf("Failed to write streaming data: %v", err) } - break + return + } + + flusher.Flush() + } + + if err := scanner.Err(); err != nil { + if isIgnorableStreamError(err) { + logrus.Debugf("Stream closed by client or network: %v", err) + } else { + logrus.Errorf("Error reading streaming response: %v", err) } } }