diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 48e389e..a648f59 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -583,31 +583,98 @@ func (ps *ProxyServer) handleStreamResponse(c *gin.Context, body io.ReadCloser) // 实现零缓存、实时转发 copyDone := make(chan bool) + // 检查客户端连接状态 + ctx := c.Request.Context() + // 在一个独立的goroutine中定期flush,确保数据被立即发送 go func() { + defer func() { + // 防止panic + if r := recover(); r != nil { + logrus.Errorf("Flush goroutine panic: %v", r) + } + }() + ticker := time.NewTicker(50 * time.Millisecond) defer ticker.Stop() + for { select { case <-copyDone: // io.Copy完成后,执行最后一次flush并退出 - flusher.Flush() + ps.safeFlush(flusher) + return + case <-ctx.Done(): + // 客户端断开连接,停止flush return case <-ticker.C: - flusher.Flush() + ps.safeFlush(flusher) } } }() // 使用io.Copy进行高效的数据复制 _, err := io.Copy(c.Writer, body) - close(copyDone) // 通知flush的goroutine可以停止了 + + // 安全地关闭channel + select { + case <-copyDone: + // channel已经关闭 + default: + close(copyDone) // 通知flush的goroutine可以停止了 + } if err != nil && err != io.EOF { - logrus.Errorf("复制流式响应失败: %v", err) + // 检查是否是连接断开导致的错误 + if ps.isConnectionError(err) { + logrus.Debugf("客户端连接断开: %v", err) + } else { + logrus.Errorf("复制流式响应失败: %v", err) + } } } +// safeFlush 安全地执行flush操作 +func (ps *ProxyServer) safeFlush(flusher http.Flusher) { + defer func() { + if r := recover(); r != nil { + // 忽略flush时的panic,通常是因为连接已断开 + logrus.Debugf("Flush panic (connection likely closed): %v", r) + } + }() + + if flusher != nil { + flusher.Flush() + } +} + +// isConnectionError 检查是否是连接相关的错误 +func (ps *ProxyServer) isConnectionError(err error) bool { + if err == nil { + return false + } + + errStr := err.Error() + // 常见的连接断开错误 + connectionErrors := []string{ + "broken pipe", + "connection reset by peer", + "connection aborted", + "client disconnected", + "write: broken pipe", + "use of closed network connection", + "context canceled", + } + + for _, connErr := range connectionErrors { + if strings.Contains(errStr, connErr) { + return true + } + } + + return false +} + // Close 关闭代理服务器 func (ps *ProxyServer) Close() { if ps.keyManager != nil {