From e8ba4411738e7de6ecd294e2a78b63360b55708b Mon Sep 17 00:00:00 2001 From: tbphp Date: Sat, 7 Jun 2025 11:45:19 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 3 --- internal/config/config.go | 7 ------ internal/proxy/proxy.go | 50 ++++++++++++++++----------------------- 3 files changed, 20 insertions(+), 40 deletions(-) diff --git a/.env.example b/.env.example index c6de4b2..050be0e 100644 --- a/.env.example +++ b/.env.example @@ -56,9 +56,6 @@ BUFFER_SIZE=65536 # 流式传输缓冲区大小(字节,默认64KB) STREAM_BUFFER_SIZE=65536 -# 流式传输flush间隔(毫秒,默认100ms) -STREAM_FLUSH_INTERVAL=100 - # 流式请求响应头超时(毫秒,默认10秒) STREAM_HEADER_TIMEOUT=10000 diff --git a/internal/config/config.go b/internal/config/config.go index f776383..b9c4f85 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -74,7 +74,6 @@ type PerformanceConfig struct { DisableCompression bool `json:"disableCompression"` BufferSize int `json:"bufferSize"` StreamBufferSize int `json:"streamBufferSize"` // 流式传输缓冲区大小 - StreamFlushInterval int `json:"streamFlushInterval"` // 流式传输flush间隔(毫秒) StreamHeaderTimeout int `json:"streamHeaderTimeout"` // 流式请求响应头超时(毫秒) } @@ -138,7 +137,6 @@ func LoadConfig() (*Config, error) { DisableCompression: parseBoolean(os.Getenv("DISABLE_COMPRESSION"), true), BufferSize: parseInteger(os.Getenv("BUFFER_SIZE"), 32*1024), StreamBufferSize: parseInteger(os.Getenv("STREAM_BUFFER_SIZE"), 64*1024), // 默认64KB - StreamFlushInterval: parseInteger(os.Getenv("STREAM_FLUSH_INTERVAL"), 100), // 默认100ms StreamHeaderTimeout: parseInteger(os.Getenv("STREAM_HEADER_TIMEOUT"), 10000), // 默认10秒 }, Log: LogConfig{ @@ -201,10 +199,6 @@ func validateConfig(config *Config) error { errors = append(errors, "流式缓冲区大小不能小于 1KB") } - if config.Performance.StreamFlushInterval < 10 { - errors = append(errors, "流式flush间隔不能小于 10ms") - } - if config.Performance.StreamHeaderTimeout < 1000 { errors = append(errors, "流式响应头超时不能小于 1秒") } @@ -257,7 +251,6 @@ func DisplayConfig(config *Config) { logrus.Infof(" 压缩: %s", compressionStatus) logrus.Infof(" 缓冲区大小: %d bytes", config.Performance.BufferSize) logrus.Infof(" 流式缓冲区: %d bytes", config.Performance.StreamBufferSize) - logrus.Infof(" 流式Flush间隔: %dms", config.Performance.StreamFlushInterval) logrus.Infof(" 流式响应头超时: %dms", config.Performance.StreamHeaderTimeout) // 显示日志配置 diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 8beb6fc..48e389e 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -29,7 +29,6 @@ type ProxyServer struct { upstreamURL *url.URL requestCount int64 startTime time.Time - flushTicker *time.Ticker // 流式响应的定时flush } // NewProxyServer 创建新的代理服务器 @@ -581,37 +580,31 @@ func (ps *ProxyServer) handleStreamResponse(c *gin.Context, body io.ReadCloser) return } - // 使用智能flush策略 - flushInterval := time.Duration(config.AppConfig.Performance.StreamFlushInterval) * time.Millisecond - lastFlush := time.Now() + // 实现零缓存、实时转发 + copyDone := make(chan bool) - // 使用更大的缓冲区以减少系统调用 - buf := make([]byte, config.AppConfig.Performance.StreamBufferSize) - - for { - n, err := body.Read(buf) - if n > 0 { - _, writeErr := c.Writer.Write(buf[:n]) - if writeErr != nil { - logrus.Errorf("写入流式响应失败: %v", writeErr) - break - } - - // 智能flush:基于时间间隔或数据量 - if time.Since(lastFlush) >= flushInterval || n >= config.AppConfig.Performance.StreamBufferSize/2 { + // 在一个独立的goroutine中定期flush,确保数据被立即发送 + go func() { + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-copyDone: + // io.Copy完成后,执行最后一次flush并退出 + flusher.Flush() + return + case <-ticker.C: flusher.Flush() - lastFlush = time.Now() } } + }() - if err != nil { - // 最后一次flush确保所有数据都发送出去 - flusher.Flush() - if err != io.EOF { - logrus.Errorf("读取流式响应失败: %v", err) - } - break - } + // 使用io.Copy进行高效的数据复制 + _, err := io.Copy(c.Writer, body) + close(copyDone) // 通知flush的goroutine可以停止了 + + if err != nil && err != io.EOF { + logrus.Errorf("复制流式响应失败: %v", err) } } @@ -620,7 +613,4 @@ func (ps *ProxyServer) Close() { if ps.keyManager != nil { ps.keyManager.Close() } - if ps.flushTicker != nil { - ps.flushTicker.Stop() - } }