fix: 优化配置

This commit is contained in:
tbphp
2025-06-07 11:45:19 +08:00
parent c94ed14357
commit e8ba441173
3 changed files with 20 additions and 40 deletions

View File

@@ -56,9 +56,6 @@ BUFFER_SIZE=65536
# 流式传输缓冲区大小字节默认64KB # 流式传输缓冲区大小字节默认64KB
STREAM_BUFFER_SIZE=65536 STREAM_BUFFER_SIZE=65536
# 流式传输flush间隔毫秒默认100ms
STREAM_FLUSH_INTERVAL=100
# 流式请求响应头超时毫秒默认10秒 # 流式请求响应头超时毫秒默认10秒
STREAM_HEADER_TIMEOUT=10000 STREAM_HEADER_TIMEOUT=10000

View File

@@ -74,7 +74,6 @@ type PerformanceConfig struct {
DisableCompression bool `json:"disableCompression"` DisableCompression bool `json:"disableCompression"`
BufferSize int `json:"bufferSize"` BufferSize int `json:"bufferSize"`
StreamBufferSize int `json:"streamBufferSize"` // 流式传输缓冲区大小 StreamBufferSize int `json:"streamBufferSize"` // 流式传输缓冲区大小
StreamFlushInterval int `json:"streamFlushInterval"` // 流式传输flush间隔毫秒
StreamHeaderTimeout int `json:"streamHeaderTimeout"` // 流式请求响应头超时(毫秒) StreamHeaderTimeout int `json:"streamHeaderTimeout"` // 流式请求响应头超时(毫秒)
} }
@@ -138,7 +137,6 @@ func LoadConfig() (*Config, error) {
DisableCompression: parseBoolean(os.Getenv("DISABLE_COMPRESSION"), true), DisableCompression: parseBoolean(os.Getenv("DISABLE_COMPRESSION"), true),
BufferSize: parseInteger(os.Getenv("BUFFER_SIZE"), 32*1024), BufferSize: parseInteger(os.Getenv("BUFFER_SIZE"), 32*1024),
StreamBufferSize: parseInteger(os.Getenv("STREAM_BUFFER_SIZE"), 64*1024), // 默认64KB StreamBufferSize: parseInteger(os.Getenv("STREAM_BUFFER_SIZE"), 64*1024), // 默认64KB
StreamFlushInterval: parseInteger(os.Getenv("STREAM_FLUSH_INTERVAL"), 100), // 默认100ms
StreamHeaderTimeout: parseInteger(os.Getenv("STREAM_HEADER_TIMEOUT"), 10000), // 默认10秒 StreamHeaderTimeout: parseInteger(os.Getenv("STREAM_HEADER_TIMEOUT"), 10000), // 默认10秒
}, },
Log: LogConfig{ Log: LogConfig{
@@ -201,10 +199,6 @@ func validateConfig(config *Config) error {
errors = append(errors, "流式缓冲区大小不能小于 1KB") errors = append(errors, "流式缓冲区大小不能小于 1KB")
} }
if config.Performance.StreamFlushInterval < 10 {
errors = append(errors, "流式flush间隔不能小于 10ms")
}
if config.Performance.StreamHeaderTimeout < 1000 { if config.Performance.StreamHeaderTimeout < 1000 {
errors = append(errors, "流式响应头超时不能小于 1秒") errors = append(errors, "流式响应头超时不能小于 1秒")
} }
@@ -257,7 +251,6 @@ func DisplayConfig(config *Config) {
logrus.Infof(" 压缩: %s", compressionStatus) logrus.Infof(" 压缩: %s", compressionStatus)
logrus.Infof(" 缓冲区大小: %d bytes", config.Performance.BufferSize) logrus.Infof(" 缓冲区大小: %d bytes", config.Performance.BufferSize)
logrus.Infof(" 流式缓冲区: %d bytes", config.Performance.StreamBufferSize) logrus.Infof(" 流式缓冲区: %d bytes", config.Performance.StreamBufferSize)
logrus.Infof(" 流式Flush间隔: %dms", config.Performance.StreamFlushInterval)
logrus.Infof(" 流式响应头超时: %dms", config.Performance.StreamHeaderTimeout) logrus.Infof(" 流式响应头超时: %dms", config.Performance.StreamHeaderTimeout)
// 显示日志配置 // 显示日志配置

View File

@@ -29,7 +29,6 @@ type ProxyServer struct {
upstreamURL *url.URL upstreamURL *url.URL
requestCount int64 requestCount int64
startTime time.Time startTime time.Time
flushTicker *time.Ticker // 流式响应的定时flush
} }
// NewProxyServer 创建新的代理服务器 // NewProxyServer 创建新的代理服务器
@@ -581,37 +580,31 @@ func (ps *ProxyServer) handleStreamResponse(c *gin.Context, body io.ReadCloser)
return return
} }
// 使用智能flush策略 // 实现零缓存、实时转发
flushInterval := time.Duration(config.AppConfig.Performance.StreamFlushInterval) * time.Millisecond copyDone := make(chan bool)
lastFlush := time.Now()
// 使用更大的缓冲区以减少系统调用 // 在一个独立的goroutine中定期flush确保数据被立即发送
buf := make([]byte, config.AppConfig.Performance.StreamBufferSize) go func() {
ticker := time.NewTicker(50 * time.Millisecond)
for { defer ticker.Stop()
n, err := body.Read(buf) for {
if n > 0 { select {
_, writeErr := c.Writer.Write(buf[:n]) case <-copyDone:
if writeErr != nil { // io.Copy完成后执行最后一次flush并退出
logrus.Errorf("写入流式响应失败: %v", writeErr) flusher.Flush()
break return
} case <-ticker.C:
// 智能flush基于时间间隔或数据量
if time.Since(lastFlush) >= flushInterval || n >= config.AppConfig.Performance.StreamBufferSize/2 {
flusher.Flush() flusher.Flush()
lastFlush = time.Now()
} }
} }
}()
if err != nil { // 使用io.Copy进行高效的数据复制
// 最后一次flush确保所有数据都发送出去 _, err := io.Copy(c.Writer, body)
flusher.Flush() close(copyDone) // 通知flush的goroutine可以停止了
if err != io.EOF {
logrus.Errorf("读取流式响应失败: %v", err) if err != nil && err != io.EOF {
} logrus.Errorf("复制流式响应失败: %v", err)
break
}
} }
} }
@@ -620,7 +613,4 @@ func (ps *ProxyServer) Close() {
if ps.keyManager != nil { if ps.keyManager != nil {
ps.keyManager.Close() ps.keyManager.Close()
} }
if ps.flushTicker != nil {
ps.flushTicker.Stop()
}
} }