feat: group api

This commit is contained in:
tbphp
2025-07-03 16:29:12 +08:00
parent a94412353a
commit 80662af9de
8 changed files with 288 additions and 156 deletions

View File

@@ -1,122 +1,151 @@
package channel
import (
"bytes"
"compress/gzip"
"fmt"
"gpt-load/internal/models"
"gpt-load/internal/response"
"io"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"sync/atomic"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/sirupsen/logrus"
)
// RequestModifier defines a function that can modify the upstream request,
// for example, by adding authentication headers.
type RequestModifier func(req *http.Request, apiKey *models.APIKey)
// BaseChannel provides a foundation for specific channel implementations.
// BaseChannel provides common functionality for channel proxies.
type BaseChannel struct {
Name string
BaseURL *url.URL
HTTPClient *http.Client
Name string
Upstreams []*url.URL
HTTPClient *http.Client
roundRobin uint64
}
// ProcessRequest handles the generic logic of creating, sending, and handling an upstream request.
func (ch *BaseChannel) ProcessRequest(c *gin.Context, apiKey *models.APIKey, modifier RequestModifier) error {
// 1. Create the upstream request
req, err := ch.createUpstreamRequest(c, apiKey, modifier)
if err != nil {
response.Error(c, http.StatusInternalServerError, "Failed to create upstream request")
return fmt.Errorf("create upstream request failed: %w", err)
// RequestModifier is a function that can modify the request before it's sent.
type RequestModifier func(req *http.Request, key *models.APIKey)
// getUpstreamURL selects an upstream URL using round-robin.
func (b *BaseChannel) getUpstreamURL() *url.URL {
if len(b.Upstreams) == 0 {
return nil
}
if len(b.Upstreams) == 1 {
return b.Upstreams[0]
}
index := atomic.AddUint64(&b.roundRobin, 1) - 1
return b.Upstreams[index%uint64(len(b.Upstreams))]
}
// ProcessRequest handles the common logic of processing and forwarding a request.
func (b *BaseChannel) ProcessRequest(c *gin.Context, apiKey *models.APIKey, modifier RequestModifier) error {
upstreamURL := b.getUpstreamURL()
if upstreamURL == nil {
return fmt.Errorf("no upstream URL configured for channel %s", b.Name)
}
// 2. Send the request
resp, err := ch.HTTPClient.Do(req)
if err != nil {
response.Error(c, http.StatusServiceUnavailable, "Upstream service unavailable")
return fmt.Errorf("upstream request failed: %w", err)
}
defer resp.Body.Close()
director := func(req *http.Request) {
req.URL.Scheme = upstreamURL.Scheme
req.URL.Host = upstreamURL.Host
req.URL.Path = singleJoiningSlash(upstreamURL.Path, req.URL.Path)
req.Host = upstreamURL.Host
// 3. Handle non-200 status codes
if resp.StatusCode != http.StatusOK {
errorMsg := ch.getErrorMessage(resp)
response.Error(c, resp.StatusCode, errorMsg)
return fmt.Errorf("upstream returned status %d: %s", resp.StatusCode, errorMsg)
}
// 4. Stream the successful response back to the client
for key, values := range resp.Header {
for _, value := range values {
c.Header(key, value)
// Apply the channel-specific modifications
if modifier != nil {
modifier(req, apiKey)
}
// Remove headers that should not be forwarded
req.Header.Del("Cookie")
req.Header.Del("X-Real-Ip")
req.Header.Del("X-Forwarded-For")
}
c.Status(http.StatusOK)
_, err = io.Copy(c.Writer, resp.Body)
errorHandler := func(rw http.ResponseWriter, req *http.Request, err error) {
logrus.WithFields(logrus.Fields{
"channel": b.Name,
"key_id": apiKey.ID,
"error": err,
}).Error("HTTP proxy error")
rw.WriteHeader(http.StatusBadGateway)
}
proxy := &httputil.ReverseProxy{
Director: director,
ErrorHandler: errorHandler,
Transport: b.HTTPClient.Transport,
}
// Check if the client request is for a streaming endpoint
if isStreamingRequest(c) {
return b.handleStreaming(c, proxy)
}
proxy.ServeHTTP(c.Writer, c.Request)
return nil
}
func (b *BaseChannel) handleStreaming(c *gin.Context, proxy *httputil.ReverseProxy) error {
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
// Use a pipe to avoid buffering the entire response
pr, pw := io.Pipe()
defer pr.Close()
// Create a new request with the pipe reader as the body
// This is a bit of a hack to get ReverseProxy to stream
req := c.Request.Clone(c.Request.Context())
req.Body = pr
// Start the proxy in a goroutine
go func() {
defer pw.Close()
proxy.ServeHTTP(c.Writer, req)
}()
// Copy the original request body to the pipe writer
_, err := io.Copy(pw, c.Request.Body)
if err != nil {
logrus.Errorf("Failed to copy response body to client: %v", err)
return fmt.Errorf("copy response body failed: %w", err)
logrus.Errorf("Error copying request body to pipe: %v", err)
return err
}
return nil
}
func (ch *BaseChannel) createUpstreamRequest(c *gin.Context, apiKey *models.APIKey, modifier RequestModifier) (*http.Request, error) {
targetURL := *ch.BaseURL
targetURL.Path = c.Param("path")
body, err := io.ReadAll(c.Request.Body)
if err != nil {
return nil, fmt.Errorf("failed to read request body: %w", err)
}
c.Request.Body = io.NopCloser(bytes.NewBuffer(body))
req, err := http.NewRequestWithContext(c.Request.Context(), c.Request.Method, targetURL.String(), bytes.NewBuffer(body))
if err != nil {
return nil, fmt.Errorf("failed to create new request: %w", err)
// isStreamingRequest checks if the request is for a streaming response.
func isStreamingRequest(c *gin.Context) bool {
// For Gemini, streaming is indicated by the path.
if strings.Contains(c.Request.URL.Path, ":streamGenerateContent") {
return true
}
req.Header = c.Request.Header.Clone()
req.Host = ch.BaseURL.Host
// Apply the channel-specific modifications
if modifier != nil {
modifier(req, apiKey)
// For OpenAI, streaming is indicated by a "stream": true field in the JSON body.
// We use ShouldBindBodyWith to check the body without consuming it, so it can be read again by the proxy.
type streamPayload struct {
Stream bool `json:"stream"`
}
var p streamPayload
if err := c.ShouldBindBodyWith(&p, binding.JSON); err == nil {
return p.Stream
}
return req, nil
return false
}
func (ch *BaseChannel) getErrorMessage(resp *http.Response) string {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Sprintf("HTTP %d (failed to read error body: %v)", resp.StatusCode, err)
// singleJoiningSlash joins two URL paths with a single slash.
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
var errorMessage string
if resp.Header.Get("Content-Encoding") == "gzip" {
reader, gErr := gzip.NewReader(bytes.NewReader(bodyBytes))
if gErr != nil {
return string(bodyBytes)
}
defer reader.Close()
uncompressedBytes, rErr := io.ReadAll(reader)
if rErr != nil {
return fmt.Sprintf("gzip read error: %v", rErr)
}
errorMessage = string(uncompressedBytes)
} else {
errorMessage = string(bodyBytes)
}
if strings.TrimSpace(errorMessage) == "" {
return fmt.Sprintf("HTTP %d: %s", resp.StatusCode, http.StatusText(resp.StatusCode))
}
return errorMessage
}
return a + b
}