Compare commits

...

8 Commits

Author SHA1 Message Date
RPRX
8deb953aec v1.8.20 2024-07-20 06:10:40 +00:00
ll11l1lIllIl1lll
a0040f13dd SplitHTTP: Server supports HTTP/3 (#3554)
Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-19 17:53:47 +00:00
ll11l1lIllIl1lll
d8994b7603 Fix SplitHTTP H3 crash on v2rayNG (#3559)
Fixes https://github.com/XTLS/Xray-core/issues/3556
2024-07-19 17:52:34 +00:00
RPRX
b277bacdf6 v1.8.19 2024-07-17 13:51:21 +00:00
RPRX
9288a7c0dc Upgrade dependencies 2024-07-17 13:45:16 +00:00
ll11l1lIllIl1lll
c40fc44a34 SplitHTTP: Client supports HTTP/3 (#3543)
Closes https://github.com/XTLS/Xray-core/issues/3456

Co-authored-by: Fangliding <Fangliding.fshxy@outlook.com>
Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-17 07:55:28 -05:00
yuhan6665
02cd3b8c74 Fix SplitHTTP race condition when creating new sessions (#3533)
Co-authored-by: nobody <nobody@nowhere.mars>
Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-17 11:41:17 +00:00
风扇滑翔翼
a7e198e1e2 Fix WS reading X-Forwarded-For & Add tests (#3546)
Fixes https://github.com/XTLS/Xray-core/issues/3545

---------

Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-17 10:40:25 +00:00
10 changed files with 201 additions and 56 deletions

View File

@@ -21,7 +21,7 @@ import (
var (
Version_x byte = 1
Version_y byte = 8
Version_z byte = 18
Version_z byte = 20
)
var (

3
go.mod
View File

@@ -16,7 +16,7 @@ require (
github.com/refraction-networking/utls v1.6.7
github.com/sagernet/sing v0.4.1
github.com/sagernet/sing-shadowsocks v0.2.7
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771
github.com/stretchr/testify v1.9.0
github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e
github.com/vishvananda/netlink v1.2.1-beta.2.0.20230316163032-ced5aaba43e3
@@ -46,6 +46,7 @@ require (
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/onsi/ginkgo/v2 v2.19.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
go.uber.org/mock v0.4.0 // indirect

6
go.sum
View File

@@ -110,6 +110,8 @@ github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/quic-go v0.45.1 h1:tPfeYCk+uZHjmDRwHHQmvHRYL2t44ROTujLeFVBmjCA=
github.com/quic-go/quic-go v0.45.1/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
github.com/refraction-networking/utls v1.6.7 h1:zVJ7sP1dJx/WtVuITug3qYUq034cDq9B2MR1K67ULZM=
@@ -121,8 +123,8 @@ github.com/sagernet/sing v0.4.1 h1:zVlpE+7k7AFoC2pv6ReqLf0PIHjihL/jsBl5k05PQFk=
github.com/sagernet/sing v0.4.1/go.mod h1:ieZHA/+Y9YZfXs2I3WtuwgyCZ6GPsIR7HdKb1SdEnls=
github.com/sagernet/sing-shadowsocks v0.2.7 h1:zaopR1tbHEw5Nk6FAkM05wCslV6ahVegEZaKMv9ipx8=
github.com/sagernet/sing-shadowsocks v0.2.7/go.mod h1:0rIKJZBR65Qi0zwdKezt4s57y/Tl1ofkaq6NlkzVuyE=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY=
github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM=

View File

@@ -151,7 +151,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
return
}
_, err = c.Write([]byte("Response"))
_, err = c.Write([]byte(c.RemoteAddr().String()))
common.Must(err)
}(conn)
})
@@ -169,7 +169,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
var b [1024]byte
n, err := conn.Read(b[:])
common.Must(err)
if string(b[:n]) != "Response" {
if string(b[:n]) != "1.1.1.1:0" {
t.Error("response: ", string(b[:n]))
}

View File

@@ -32,6 +32,7 @@ type DefaultDialerClient struct {
download *http.Client
upload *http.Client
isH2 bool
isH3 bool
// pool of net.Conn, created using dialUploadConn
uploadRawPool *sync.Pool
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
@@ -118,7 +119,7 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
}
req.Header = c.transportConfig.GetRequestHeader()
if c.isH2 {
if c.isH2 || c.isH3 {
resp, err := c.upload.Do(req)
if err != nil {
return err

View File

@@ -10,6 +10,8 @@ import (
"sync"
"time"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
@@ -50,12 +52,9 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
return client
}
if browser_dialer.HasBrowserDialer() {
return &BrowserDialerClient{}
}
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1")
isH3 := tlsConfig != nil && (len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3")
var gotlsConfig *gotls.Config
@@ -83,10 +82,56 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
return conn, nil
}
var uploadTransport http.RoundTripper
var downloadTransport http.RoundTripper
var uploadTransport http.RoundTripper
if isH2 {
if isH3 {
dest.Network = net.Network_UDP
quicConfig := &quic.Config{
HandshakeIdleTimeout: 10 * time.Second,
MaxIdleTimeout: 90 * time.Second,
KeepAlivePeriod: 3 * time.Second,
Allow0RTT: true,
}
roundTripper := &http3.RoundTripper{
TLSClientConfig: gotlsConfig,
QUICConfig: quicConfig,
Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (quic.EarlyConnection, error) {
conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
if err != nil {
return nil, err
}
var udpConn *net.UDPConn
var udpAddr *net.UDPAddr
switch c := conn.(type) {
case *internet.PacketConnWrapper:
var ok bool
udpConn, ok = c.Conn.(*net.UDPConn)
if !ok {
return nil, errors.New("PacketConnWrapper does not contain a UDP connection")
}
udpAddr, err = net.ResolveUDPAddr("udp", c.Dest.String())
if err != nil {
return nil, err
}
case *net.UDPConn:
udpConn = c
udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported connection type: %T", conn)
}
return quic.DialEarly(ctx, udpConn, udpAddr, tlsCfg, cfg)
},
}
downloadTransport = roundTripper
uploadTransport = roundTripper
} else if isH2 {
downloadTransport = &http2.Transport{
DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) {
return dialContext(ctxInner)
@@ -107,7 +152,6 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
// http.Client and our custom dial context.
DisableKeepAlives: true,
}
// we use uploadRawPool for that
uploadTransport = nil
}
@@ -121,6 +165,7 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
Transport: uploadTransport,
},
isH2: isH2,
isH3: isH3,
uploadRawPool: &sync.Pool{},
dialUploadConn: dialContext,
}

View File

@@ -11,6 +11,8 @@ import (
"sync"
"time"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
@@ -27,6 +29,7 @@ type requestHandler struct {
host string
path string
ln *Listener
sessionMu *sync.Mutex
sessions sync.Map
localAddr gonet.TCPAddr
}
@@ -56,11 +59,21 @@ func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessi
}
func (h *requestHandler) upsertSession(sessionId string) *httpSession {
// fast path
currentSessionAny, ok := h.sessions.Load(sessionId)
if ok {
return currentSessionAny.(*httpSession)
}
// slow path
h.sessionMu.Lock()
defer h.sessionMu.Unlock()
currentSessionAny, ok = h.sessions.Load(sessionId)
if ok {
return currentSessionAny.(*httpSession)
}
s := &httpSession{
uploadQueue: NewUploadQueue(int(2 * h.ln.config.GetNormalizedMaxConcurrentUploads())),
isFullyConnected: done.New(),
@@ -223,9 +236,12 @@ func (c *httpResponseBodyWriter) Close() error {
type Listener struct {
sync.Mutex
server http.Server
h3server *http3.Server
listener net.Listener
h3listener *quic.EarlyListener
config *Config
addConn internet.ConnHandler
isH3 bool
}
func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
@@ -242,6 +258,17 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
var listener net.Listener
var err error
var localAddr = gonet.TCPAddr{}
handler := &requestHandler{
host: shSettings.Host,
path: shSettings.GetNormalizedPath(),
ln: l,
sessionMu: &sync.Mutex{},
sessions: sync.Map{},
localAddr: localAddr,
}
tlsConfig := getTLSConfig(streamSettings)
l.isH3 = len(tlsConfig.NextProtos) == 1 && tlsConfig.NextProtos[0] == "h3"
if port == net.Port(0) { // unix
listener, err = internet.ListenSystem(ctx, &net.UnixAddr{
@@ -252,6 +279,29 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
return nil, errors.New("failed to listen unix domain socket(for SH) on ", address).Base(err)
}
errors.LogInfo(ctx, "listening unix domain socket(for SH) on ", address)
} else if l.isH3 { // quic
Conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
IP: address.IP(),
Port: int(port),
}, streamSettings.SocketSettings)
if err != nil {
return nil, errors.New("failed to listen UDP(for SH3) on ", address, ":", port).Base(err)
}
h3listener, err := quic.ListenEarly(Conn,tlsConfig, nil)
if err != nil {
return nil, errors.New("failed to listen QUIC(for SH3) on ", address, ":", port).Base(err)
}
l.h3listener = h3listener
errors.LogInfo(ctx, "listening QUIC(for SH3) on ", address, ":", port)
l.h3server = &http3.Server{
Handler: handler,
}
go func() {
if err := l.h3server.ServeListener(l.h3listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http3 for splithttp")
}
}()
} else { // tcp
localAddr = gonet.TCPAddr{
IP: address.IP(),
@@ -264,39 +314,28 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
if err != nil {
return nil, errors.New("failed to listen TCP(for SH) on ", address, ":", port).Base(err)
}
l.listener = listener
errors.LogInfo(ctx, "listening TCP(for SH) on ", address, ":", port)
}
if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil {
if tlsConfig := config.GetTLSConfig(); tlsConfig != nil {
listener = tls.NewListener(listener, tlsConfig)
}
}
handler := &requestHandler{
host: shSettings.Host,
path: shSettings.GetNormalizedPath(),
ln: l,
sessions: sync.Map{},
localAddr: localAddr,
}
// h2cHandler can handle both plaintext HTTP/1.1 and h2c
h2cHandler := h2c.NewHandler(handler, &http2.Server{})
l.listener = listener
l.server = http.Server{
Handler: h2cHandler,
ReadHeaderTimeout: time.Second * 4,
MaxHeaderBytes: 8192,
}
go func() {
if err := l.server.Serve(l.listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http for splithttp")
}
}()
}
l.listener = listener
if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil {
if tlsConfig := config.GetTLSConfig(); tlsConfig != nil {
listener = tls.NewListener(listener, tlsConfig)
}
}
return l, err
}
@@ -308,9 +347,22 @@ func (ln *Listener) Addr() net.Addr {
// Close implements net.Listener.Close().
func (ln *Listener) Close() error {
if ln.h3server != nil {
if err := ln.h3server.Close(); err != nil {
return err
}
} else if ln.listener != nil {
return ln.listener.Close()
}
return errors.New("listener does not have an HTTP/3 server or a net.listener")
}
func getTLSConfig(streamSettings *internet.MemoryStreamConfig) *tls.Config {
config := v2tls.ConfigFromStreamSettings(streamSettings)
if config == nil {
return &tls.Config{}
}
return config.GetTLSConfig()
}
func init() {
common.Must(internet.RegisterTransportListener(protocolName, ListenSH))
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol/tls/cert"
"github.com/xtls/xray-core/testing/servers/tcp"
"github.com/xtls/xray-core/testing/servers/udp"
"github.com/xtls/xray-core/transport/internet"
. "github.com/xtls/xray-core/transport/internet/splithttp"
"github.com/xtls/xray-core/transport/internet/stat"
@@ -63,8 +64,8 @@ func Test_listenSHAndDial(t *testing.T) {
}
common.Must(conn.Close())
<-time.After(time.Second * 5)
conn, err = Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
common.Must(err)
_, err = conn.Write([]byte("Test connection 2"))
common.Must(err)
@@ -96,7 +97,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
return
}
_, err = c.Write([]byte("Response"))
_, err = c.Write([]byte(c.RemoteAddr().String()))
common.Must(err)
}(conn)
})
@@ -113,7 +114,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
var b [1024]byte
n, _ := conn.Read(b[:])
if string(b[:n]) != "Response" {
if string(b[:n]) != "1.1.1.1:0" {
t.Error("response: ", string(b[:n]))
}
@@ -204,3 +205,42 @@ func Test_listenSHAndDial_H2C(t *testing.T) {
t.Error("Expected h2 but got:", resp.ProtoMajor)
}
}
func Test_listenSHAndDial_QUIC(t *testing.T) {
if runtime.GOARCH == "arm64" {
return
}
listenPort := udp.PickPort()
start := time.Now()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{
Path: "shs",
},
SecurityType: "tls",
SecuritySettings: &tls.Config{
AllowInsecure: true,
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.CommonName("localhost")))},
NextProtocol: []string{"h3"},
},
}
listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
go func() {
_ = conn.Close()
}()
})
common.Must(err)
defer listen.Close()
conn, err := Dial(context.Background(), net.UDPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
common.Must(err)
_ = conn.Close()
end := time.Now()
if !end.Before(start.Add(time.Second * 5)) {
t.Error("end: ", end, " start: ", start)
}
}

View File

@@ -14,14 +14,18 @@ import (
var _ buf.Writer = (*connection)(nil)
// connection is a wrapper for net.Conn over WebSocket connection.
// remoteAddr is used to pass "virtual" remote IP addresses in X-Forwarded-For.
// so we shouldn't directly read it form conn.
type connection struct {
conn *websocket.Conn
reader io.Reader
remoteAddr net.Addr
}
func NewConnection(conn *websocket.Conn, remoteAddr net.Addr, extraReader io.Reader) *connection {
return &connection{
conn: conn,
remoteAddr: remoteAddr,
reader: extraReader,
}
}
@@ -90,7 +94,7 @@ func (c *connection) LocalAddr() net.Addr {
}
func (c *connection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
return c.remoteAddr
}
func (c *connection) SetDeadline(t time.Time) error {

View File

@@ -91,7 +91,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
return
}
_, err = c.Write([]byte("Response"))
_, err = c.Write([]byte(c.RemoteAddr().String()))
common.Must(err)
}(conn)
})
@@ -109,7 +109,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
var b [1024]byte
n, err := conn.Read(b[:])
common.Must(err)
if string(b[:n]) != "Response" {
if string(b[:n]) != "1.1.1.1:0" {
t.Error("response: ", string(b[:n]))
}