feat: 流式输出性能优化

This commit is contained in:
tbphp
2025-06-29 17:41:19 +08:00
parent f83cb83846
commit e55889354f

View File

@@ -2,6 +2,7 @@
package proxy package proxy
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
@@ -24,6 +25,8 @@ import (
var ignorableStreamErrors = []string{ var ignorableStreamErrors = []string{
"context canceled", "context canceled",
"connection reset by peer", "connection reset by peer",
"broken pipe",
"use of closed network connection",
} }
// isIgnorableStreamError checks if the error is a common, non-critical error that can occur // isIgnorableStreamError checks if the error is a common, non-critical error that can occur
@@ -59,7 +62,7 @@ func NewProxyServer(keyManager types.KeyManager, configManager types.ConfigManag
MaxIdleConnsPerHost: 20, MaxIdleConnsPerHost: 20,
MaxConnsPerHost: 0, MaxConnsPerHost: 0,
IdleConnTimeout: time.Duration(openaiConfig.IdleConnTimeout) * time.Second, IdleConnTimeout: time.Duration(openaiConfig.IdleConnTimeout) * time.Second,
TLSHandshakeTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second, TLSHandshakeTimeout: 15 * time.Second,
ExpectContinueTimeout: 1 * time.Second, ExpectContinueTimeout: 1 * time.Second,
DisableCompression: !perfConfig.EnableGzip, DisableCompression: !perfConfig.EnableGzip,
ForceAttemptHTTP2: true, ForceAttemptHTTP2: true,
@@ -73,12 +76,12 @@ func NewProxyServer(keyManager types.KeyManager, configManager types.ConfigManag
MaxIdleConnsPerHost: 40, MaxIdleConnsPerHost: 40,
MaxConnsPerHost: 0, MaxConnsPerHost: 0,
IdleConnTimeout: time.Duration(openaiConfig.IdleConnTimeout) * time.Second, IdleConnTimeout: time.Duration(openaiConfig.IdleConnTimeout) * time.Second,
TLSHandshakeTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second, TLSHandshakeTimeout: 15 * time.Second,
ExpectContinueTimeout: 1 * time.Second, ExpectContinueTimeout: 1 * time.Second,
DisableCompression: true, // Always disable compression for streaming DisableCompression: true,
ForceAttemptHTTP2: true, ForceAttemptHTTP2: true,
WriteBufferSize: 64 * 1024, WriteBufferSize: 0,
ReadBufferSize: 64 * 1024, ReadBufferSize: 0,
ResponseHeaderTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second, ResponseHeaderTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second,
} }
@@ -132,6 +135,11 @@ func (ps *ProxyServer) HandleProxy(c *gin.Context) {
// isStreamRequest determines if this is a streaming request // isStreamRequest determines if this is a streaming request
func (ps *ProxyServer) isStreamRequest(bodyBytes []byte, c *gin.Context) bool { func (ps *ProxyServer) isStreamRequest(bodyBytes []byte, c *gin.Context) bool {
// Check for Gemini streaming endpoint
if strings.HasSuffix(c.Request.URL.Path, ":streamGenerateContent") {
return true
}
// Check Accept header // Check Accept header
if strings.Contains(c.GetHeader("Accept"), "text/event-stream") { if strings.Contains(c.GetHeader("Accept"), "text/event-stream") {
return true return true
@@ -144,9 +152,11 @@ func (ps *ProxyServer) isStreamRequest(bodyBytes []byte, c *gin.Context) bool {
// Check stream parameter in request body // Check stream parameter in request body
if len(bodyBytes) > 0 { if len(bodyBytes) > 0 {
if strings.Contains(string(bodyBytes), `"stream":true`) || var bodyJSON map[string]interface{}
strings.Contains(string(bodyBytes), `"stream": true`) { if err := json.Unmarshal(bodyBytes, &bodyJSON); err == nil {
return true if stream, ok := bodyJSON["stream"].(bool); ok && stream {
return true
}
} }
} }
@@ -398,13 +408,16 @@ func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Ti
} }
} }
var newline = []byte("\n")
// handleStreamingResponse handles streaming responses // handleStreamingResponse handles streaming responses
func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Response) { func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Response) {
// Set headers for streaming // Set headers for streaming
c.Header("Cache-Control", "no-cache") c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive") c.Header("Connection", "keep-alive")
c.Header("Content-Type", "text/event-stream")
c.Header("X-Accel-Buffering", "no")
// Stream response directly
flusher, ok := c.Writer.(http.Flusher) flusher, ok := c.Writer.(http.Flusher)
if !ok { if !ok {
logrus.Error("Streaming unsupported") logrus.Error("Streaming unsupported")
@@ -415,26 +428,36 @@ func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Respon
return return
} }
// Copy streaming data with optimized buffer size scanner := bufio.NewScanner(resp.Body)
buffer := make([]byte, 32*1024) // 32KB buffer for better performance scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for {
n, err := resp.Body.Read(buffer) for scanner.Scan() {
if n > 0 { lineBytes := scanner.Bytes()
if _, writeErr := c.Writer.Write(buffer[:n]); writeErr != nil { if _, err := c.Writer.Write(lineBytes); err != nil {
logrus.Errorf("Failed to write streaming data: %v", writeErr) if isIgnorableStreamError(err) {
break logrus.Debugf("Stream closed by client: %v", err)
} else {
logrus.Errorf("Failed to write streaming data: %v", err)
} }
flusher.Flush() return
} }
if err != nil { if _, err := c.Writer.Write(newline); err != nil {
if err != io.EOF { if isIgnorableStreamError(err) {
if isIgnorableStreamError(err) { logrus.Debugf("Stream closed by client: %v", err)
logrus.Debugf("Stream closed by client or network: %v", err) } else {
} else { logrus.Errorf("Failed to write streaming data: %v", err)
logrus.Errorf("Error reading streaming response: %v", err)
}
} }
break return
}
flusher.Flush()
}
if err := scanner.Err(); err != nil {
if isIgnorableStreamError(err) {
logrus.Debugf("Stream closed by client or network: %v", err)
} else {
logrus.Errorf("Error reading streaming response: %v", err)
} }
} }
} }