Compare commits

..

13 Commits

Author SHA1 Message Date
RPRX
c27d652d80 v1.8.21 2024-07-21 21:32:26 +00:00
RPRX
0f65aa8ed8 Fix SplitHTTP H3 waited for downResponse before uploading
https://github.com/XTLS/Xray-core/issues/3560#issuecomment-2241750579
2024-07-21 20:45:05 +00:00
RPRX
22535d8643 Fix SplitHTTP H3 didn't always reuse QUIC connection
https://github.com/XTLS/Xray-core/issues/3560#issuecomment-2241531502
2024-07-21 08:55:03 +00:00
mmmray
529f206d33 Fix serverside TLS support of SplitHTTP H1/H2 (#3567)
Fix #3566

Also update testsuite so that all tests read and write some data. Opening a connection is not enough to trigger connection errors, because the connection is so lazy.
2024-07-20 19:35:24 -05:00
チセ
964859b4bc SplitHTTP: Remove unnecessary keepalives (#3565)
Remove keep alive since quic-go/http3 doesn't support stream reuse
Discussion see https://t.me/projectXray/3782492

Co-authored-by: Fangliding <Fangliding.fshxy@outlook.com>
Co-authored-by: xqzr <34030394+xqzr@users.noreply.github.com>
Co-authored-by: ll11l1lIllIl1lll <88377095+ll11l1lIllIl1lll@users.noreply.github.com>
2024-07-20 19:34:57 -05:00
RPRX
8deb953aec v1.8.20 2024-07-20 06:10:40 +00:00
ll11l1lIllIl1lll
a0040f13dd SplitHTTP: Server supports HTTP/3 (#3554)
Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-19 17:53:47 +00:00
ll11l1lIllIl1lll
d8994b7603 Fix SplitHTTP H3 crash on v2rayNG (#3559)
Fixes https://github.com/XTLS/Xray-core/issues/3556
2024-07-19 17:52:34 +00:00
RPRX
b277bacdf6 v1.8.19 2024-07-17 13:51:21 +00:00
RPRX
9288a7c0dc Upgrade dependencies 2024-07-17 13:45:16 +00:00
ll11l1lIllIl1lll
c40fc44a34 SplitHTTP: Client supports HTTP/3 (#3543)
Closes https://github.com/XTLS/Xray-core/issues/3456

Co-authored-by: Fangliding <Fangliding.fshxy@outlook.com>
Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-17 07:55:28 -05:00
yuhan6665
02cd3b8c74 Fix SplitHTTP race condition when creating new sessions (#3533)
Co-authored-by: nobody <nobody@nowhere.mars>
Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-17 11:41:17 +00:00
风扇滑翔翼
a7e198e1e2 Fix WS reading X-Forwarded-For & Add tests (#3546)
Fixes https://github.com/XTLS/Xray-core/issues/3545

---------

Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-17 10:40:25 +00:00
10 changed files with 264 additions and 62 deletions

View File

@@ -21,7 +21,7 @@ import (
var ( var (
Version_x byte = 1 Version_x byte = 1
Version_y byte = 8 Version_y byte = 8
Version_z byte = 18 Version_z byte = 21
) )
var ( var (

3
go.mod
View File

@@ -16,7 +16,7 @@ require (
github.com/refraction-networking/utls v1.6.7 github.com/refraction-networking/utls v1.6.7
github.com/sagernet/sing v0.4.1 github.com/sagernet/sing v0.4.1
github.com/sagernet/sing-shadowsocks v0.2.7 github.com/sagernet/sing-shadowsocks v0.2.7
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0
github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e
github.com/vishvananda/netlink v1.2.1-beta.2.0.20230316163032-ced5aaba43e3 github.com/vishvananda/netlink v1.2.1-beta.2.0.20230316163032-ced5aaba43e3
@@ -46,6 +46,7 @@ require (
github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/onsi/ginkgo/v2 v2.19.0 // indirect github.com/onsi/ginkgo/v2 v2.19.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 // indirect github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 // indirect
github.com/vishvananda/netns v0.0.4 // indirect github.com/vishvananda/netns v0.0.4 // indirect
go.uber.org/mock v0.4.0 // indirect go.uber.org/mock v0.4.0 // indirect

6
go.sum
View File

@@ -110,6 +110,8 @@ github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/quic-go v0.45.1 h1:tPfeYCk+uZHjmDRwHHQmvHRYL2t44ROTujLeFVBmjCA= github.com/quic-go/quic-go v0.45.1 h1:tPfeYCk+uZHjmDRwHHQmvHRYL2t44ROTujLeFVBmjCA=
github.com/quic-go/quic-go v0.45.1/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI= github.com/quic-go/quic-go v0.45.1/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
github.com/refraction-networking/utls v1.6.7 h1:zVJ7sP1dJx/WtVuITug3qYUq034cDq9B2MR1K67ULZM= github.com/refraction-networking/utls v1.6.7 h1:zVJ7sP1dJx/WtVuITug3qYUq034cDq9B2MR1K67ULZM=
@@ -121,8 +123,8 @@ github.com/sagernet/sing v0.4.1 h1:zVlpE+7k7AFoC2pv6ReqLf0PIHjihL/jsBl5k05PQFk=
github.com/sagernet/sing v0.4.1/go.mod h1:ieZHA/+Y9YZfXs2I3WtuwgyCZ6GPsIR7HdKb1SdEnls= github.com/sagernet/sing v0.4.1/go.mod h1:ieZHA/+Y9YZfXs2I3WtuwgyCZ6GPsIR7HdKb1SdEnls=
github.com/sagernet/sing-shadowsocks v0.2.7 h1:zaopR1tbHEw5Nk6FAkM05wCslV6ahVegEZaKMv9ipx8= github.com/sagernet/sing-shadowsocks v0.2.7 h1:zaopR1tbHEw5Nk6FAkM05wCslV6ahVegEZaKMv9ipx8=
github.com/sagernet/sing-shadowsocks v0.2.7/go.mod h1:0rIKJZBR65Qi0zwdKezt4s57y/Tl1ofkaq6NlkzVuyE= github.com/sagernet/sing-shadowsocks v0.2.7/go.mod h1:0rIKJZBR65Qi0zwdKezt4s57y/Tl1ofkaq6NlkzVuyE=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U= github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY=
github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM= github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM=

View File

@@ -151,7 +151,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
return return
} }
_, err = c.Write([]byte("Response")) _, err = c.Write([]byte(c.RemoteAddr().String()))
common.Must(err) common.Must(err)
}(conn) }(conn)
}) })
@@ -169,7 +169,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
var b [1024]byte var b [1024]byte
n, err := conn.Read(b[:]) n, err := conn.Read(b[:])
common.Must(err) common.Must(err)
if string(b[:n]) != "Response" { if string(b[:n]) != "1.1.1.1:0" {
t.Error("response: ", string(b[:n])) t.Error("response: ", string(b[:n]))
} }

View File

@@ -32,6 +32,7 @@ type DefaultDialerClient struct {
download *http.Client download *http.Client
upload *http.Client upload *http.Client
isH2 bool isH2 bool
isH3 bool
// pool of net.Conn, created using dialUploadConn // pool of net.Conn, created using dialUploadConn
uploadRawPool *sync.Pool uploadRawPool *sync.Pool
dialUploadConn func(ctxInner context.Context) (net.Conn, error) dialUploadConn func(ctxInner context.Context) (net.Conn, error)
@@ -93,6 +94,10 @@ func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string)
gotDownResponse.Close() gotDownResponse.Close()
}() }()
if c.isH3 {
gotConn.Close()
}
// we want to block Dial until we know the remote address of the server, // we want to block Dial until we know the remote address of the server,
// for logging purposes // for logging purposes
<-gotConn.Wait() <-gotConn.Wait()
@@ -118,7 +123,7 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
} }
req.Header = c.transportConfig.GetRequestHeader() req.Header = c.transportConfig.GetRequestHeader()
if c.isH2 { if c.isH2 || c.isH3 {
resp, err := c.upload.Do(req) resp, err := c.upload.Do(req)
if err != nil { if err != nil {
return err return err

View File

@@ -10,6 +10,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
@@ -39,6 +41,10 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
return &BrowserDialerClient{} return &BrowserDialerClient{}
} }
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1")
isH3 := tlsConfig != nil && (len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3")
globalDialerAccess.Lock() globalDialerAccess.Lock()
defer globalDialerAccess.Unlock() defer globalDialerAccess.Unlock()
@@ -46,17 +52,13 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
globalDialerMap = make(map[dialerConf]DialerClient) globalDialerMap = make(map[dialerConf]DialerClient)
} }
if isH3 {
dest.Network = net.Network_UDP
}
if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found { if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found {
return client return client
} }
if browser_dialer.HasBrowserDialer() {
return &BrowserDialerClient{}
}
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1")
var gotlsConfig *gotls.Config var gotlsConfig *gotls.Config
if tlsConfig != nil { if tlsConfig != nil {
@@ -83,10 +85,48 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
return conn, nil return conn, nil
} }
var uploadTransport http.RoundTripper
var downloadTransport http.RoundTripper var downloadTransport http.RoundTripper
var uploadTransport http.RoundTripper
if isH2 { if isH3 {
roundTripper := &http3.RoundTripper{
TLSClientConfig: gotlsConfig,
Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (quic.EarlyConnection, error) {
conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
if err != nil {
return nil, err
}
var udpConn *net.UDPConn
var udpAddr *net.UDPAddr
switch c := conn.(type) {
case *internet.PacketConnWrapper:
var ok bool
udpConn, ok = c.Conn.(*net.UDPConn)
if !ok {
return nil, errors.New("PacketConnWrapper does not contain a UDP connection")
}
udpAddr, err = net.ResolveUDPAddr("udp", c.Dest.String())
if err != nil {
return nil, err
}
case *net.UDPConn:
udpConn = c
udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported connection type: %T", conn)
}
return quic.DialEarly(ctx, udpConn, udpAddr, tlsCfg, cfg)
},
}
downloadTransport = roundTripper
uploadTransport = roundTripper
} else if isH2 {
downloadTransport = &http2.Transport{ downloadTransport = &http2.Transport{
DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) { DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) {
return dialContext(ctxInner) return dialContext(ctxInner)
@@ -107,7 +147,6 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
// http.Client and our custom dial context. // http.Client and our custom dial context.
DisableKeepAlives: true, DisableKeepAlives: true,
} }
// we use uploadRawPool for that // we use uploadRawPool for that
uploadTransport = nil uploadTransport = nil
} }
@@ -121,6 +160,7 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
Transport: uploadTransport, Transport: uploadTransport,
}, },
isH2: isH2, isH2: isH2,
isH3: isH3,
uploadRawPool: &sync.Pool{}, uploadRawPool: &sync.Pool{},
dialUploadConn: dialContext, dialUploadConn: dialContext,
} }

View File

@@ -11,6 +11,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
@@ -27,6 +29,7 @@ type requestHandler struct {
host string host string
path string path string
ln *Listener ln *Listener
sessionMu *sync.Mutex
sessions sync.Map sessions sync.Map
localAddr gonet.TCPAddr localAddr gonet.TCPAddr
} }
@@ -56,11 +59,21 @@ func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessi
} }
func (h *requestHandler) upsertSession(sessionId string) *httpSession { func (h *requestHandler) upsertSession(sessionId string) *httpSession {
// fast path
currentSessionAny, ok := h.sessions.Load(sessionId) currentSessionAny, ok := h.sessions.Load(sessionId)
if ok { if ok {
return currentSessionAny.(*httpSession) return currentSessionAny.(*httpSession)
} }
// slow path
h.sessionMu.Lock()
defer h.sessionMu.Unlock()
currentSessionAny, ok = h.sessions.Load(sessionId)
if ok {
return currentSessionAny.(*httpSession)
}
s := &httpSession{ s := &httpSession{
uploadQueue: NewUploadQueue(int(2 * h.ln.config.GetNormalizedMaxConcurrentUploads())), uploadQueue: NewUploadQueue(int(2 * h.ln.config.GetNormalizedMaxConcurrentUploads())),
isFullyConnected: done.New(), isFullyConnected: done.New(),
@@ -222,10 +235,13 @@ func (c *httpResponseBodyWriter) Close() error {
type Listener struct { type Listener struct {
sync.Mutex sync.Mutex
server http.Server server http.Server
listener net.Listener h3server *http3.Server
config *Config listener net.Listener
addConn internet.ConnHandler h3listener *quic.EarlyListener
config *Config
addConn internet.ConnHandler
isH3 bool
} }
func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) { func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
@@ -242,6 +258,16 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
var listener net.Listener var listener net.Listener
var err error var err error
var localAddr = gonet.TCPAddr{} var localAddr = gonet.TCPAddr{}
handler := &requestHandler{
host: shSettings.Host,
path: shSettings.GetNormalizedPath(),
ln: l,
sessionMu: &sync.Mutex{},
sessions: sync.Map{},
localAddr: localAddr,
}
tlsConfig := getTLSConfig(streamSettings)
l.isH3 = len(tlsConfig.NextProtos) == 1 && tlsConfig.NextProtos[0] == "h3"
if port == net.Port(0) { // unix if port == net.Port(0) { // unix
listener, err = internet.ListenSystem(ctx, &net.UnixAddr{ listener, err = internet.ListenSystem(ctx, &net.UnixAddr{
@@ -252,6 +278,29 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
return nil, errors.New("failed to listen unix domain socket(for SH) on ", address).Base(err) return nil, errors.New("failed to listen unix domain socket(for SH) on ", address).Base(err)
} }
errors.LogInfo(ctx, "listening unix domain socket(for SH) on ", address) errors.LogInfo(ctx, "listening unix domain socket(for SH) on ", address)
} else if l.isH3 { // quic
Conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
IP: address.IP(),
Port: int(port),
}, streamSettings.SocketSettings)
if err != nil {
return nil, errors.New("failed to listen UDP(for SH3) on ", address, ":", port).Base(err)
}
h3listener, err := quic.ListenEarly(Conn, tlsConfig, nil)
if err != nil {
return nil, errors.New("failed to listen QUIC(for SH3) on ", address, ":", port).Base(err)
}
l.h3listener = h3listener
errors.LogInfo(ctx, "listening QUIC(for SH3) on ", address, ":", port)
l.h3server = &http3.Server{
Handler: handler,
}
go func() {
if err := l.h3server.ServeListener(l.h3listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http3 for splithttp")
}
}()
} else { // tcp } else { // tcp
localAddr = gonet.TCPAddr{ localAddr = gonet.TCPAddr{
IP: address.IP(), IP: address.IP(),
@@ -265,38 +314,32 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
return nil, errors.New("failed to listen TCP(for SH) on ", address, ":", port).Base(err) return nil, errors.New("failed to listen TCP(for SH) on ", address, ":", port).Base(err)
} }
errors.LogInfo(ctx, "listening TCP(for SH) on ", address, ":", port) errors.LogInfo(ctx, "listening TCP(for SH) on ", address, ":", port)
}
if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil { // h2cHandler can handle both plaintext HTTP/1.1 and h2c
if tlsConfig := config.GetTLSConfig(); tlsConfig != nil { h2cHandler := h2c.NewHandler(handler, &http2.Server{})
listener = tls.NewListener(listener, tlsConfig) l.server = http.Server{
Handler: h2cHandler,
ReadHeaderTimeout: time.Second * 4,
MaxHeaderBytes: 8192,
} }
} }
handler := &requestHandler{ // tcp/unix (h1/h2)
host: shSettings.Host, if listener != nil {
path: shSettings.GetNormalizedPath(), if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil {
ln: l, if tlsConfig := config.GetTLSConfig(); tlsConfig != nil {
sessions: sync.Map{}, listener = tls.NewListener(listener, tlsConfig)
localAddr: localAddr, }
}
// h2cHandler can handle both plaintext HTTP/1.1 and h2c
h2cHandler := h2c.NewHandler(handler, &http2.Server{})
l.listener = listener
l.server = http.Server{
Handler: h2cHandler,
ReadHeaderTimeout: time.Second * 4,
MaxHeaderBytes: 8192,
}
go func() {
if err := l.server.Serve(l.listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http for splithttp")
} }
}()
l.listener = listener
go func() {
if err := l.server.Serve(l.listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http for splithttp")
}
}()
}
return l, err return l, err
} }
@@ -308,9 +351,22 @@ func (ln *Listener) Addr() net.Addr {
// Close implements net.Listener.Close(). // Close implements net.Listener.Close().
func (ln *Listener) Close() error { func (ln *Listener) Close() error {
return ln.listener.Close() if ln.h3server != nil {
if err := ln.h3server.Close(); err != nil {
return err
}
} else if ln.listener != nil {
return ln.listener.Close()
}
return errors.New("listener does not have an HTTP/3 server or a net.listener")
}
func getTLSConfig(streamSettings *internet.MemoryStreamConfig) *tls.Config {
config := v2tls.ConfigFromStreamSettings(streamSettings)
if config == nil {
return &tls.Config{}
}
return config.GetTLSConfig()
} }
func init() { func init() {
common.Must(internet.RegisterTransportListener(protocolName, ListenSH)) common.Must(internet.RegisterTransportListener(protocolName, ListenSH))
} }

View File

@@ -2,6 +2,7 @@ package splithttp_test
import ( import (
"context" "context"
"crypto/rand"
gotls "crypto/tls" gotls "crypto/tls"
"fmt" "fmt"
gonet "net" gonet "net"
@@ -10,10 +11,13 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol/tls/cert" "github.com/xtls/xray-core/common/protocol/tls/cert"
"github.com/xtls/xray-core/testing/servers/tcp" "github.com/xtls/xray-core/testing/servers/tcp"
"github.com/xtls/xray-core/testing/servers/udp"
"github.com/xtls/xray-core/transport/internet" "github.com/xtls/xray-core/transport/internet"
. "github.com/xtls/xray-core/transport/internet/splithttp" . "github.com/xtls/xray-core/transport/internet/splithttp"
"github.com/xtls/xray-core/transport/internet/stat" "github.com/xtls/xray-core/transport/internet/stat"
@@ -63,8 +67,8 @@ func Test_listenSHAndDial(t *testing.T) {
} }
common.Must(conn.Close()) common.Must(conn.Close())
<-time.After(time.Second * 5)
conn, err = Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) conn, err = Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
common.Must(err) common.Must(err)
_, err = conn.Write([]byte("Test connection 2")) _, err = conn.Write([]byte("Test connection 2"))
common.Must(err) common.Must(err)
@@ -96,7 +100,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
return return
} }
_, err = c.Write([]byte("Response")) _, err = c.Write([]byte(c.RemoteAddr().String()))
common.Must(err) common.Must(err)
}(conn) }(conn)
}) })
@@ -113,7 +117,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
var b [1024]byte var b [1024]byte
n, _ := conn.Read(b[:]) n, _ := conn.Read(b[:])
if string(b[:n]) != "Response" { if string(b[:n]) != "1.1.1.1:0" {
t.Error("response: ", string(b[:n])) t.Error("response: ", string(b[:n]))
} }
@@ -142,7 +146,16 @@ func Test_listenSHAndDial_TLS(t *testing.T) {
} }
listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) { listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
go func() { go func() {
_ = conn.Close() defer conn.Close()
var b [1024]byte
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
_, err := conn.Read(b[:])
if err != nil {
return
}
common.Must2(conn.Write([]byte("Response")))
}() }()
}) })
common.Must(err) common.Must(err)
@@ -150,7 +163,15 @@ func Test_listenSHAndDial_TLS(t *testing.T) {
conn, err := Dial(context.Background(), net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) conn, err := Dial(context.Background(), net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
common.Must(err) common.Must(err)
_ = conn.Close()
_, err = conn.Write([]byte("Test connection 1"))
common.Must(err)
var b [1024]byte
n, _ := conn.Read(b[:])
if string(b[:n]) != "Response" {
t.Error("response: ", string(b[:n]))
}
end := time.Now() end := time.Now()
if !end.Before(start.Add(time.Second * 5)) { if !end.Before(start.Add(time.Second * 5)) {
@@ -204,3 +225,76 @@ func Test_listenSHAndDial_H2C(t *testing.T) {
t.Error("Expected h2 but got:", resp.ProtoMajor) t.Error("Expected h2 but got:", resp.ProtoMajor)
} }
} }
func Test_listenSHAndDial_QUIC(t *testing.T) {
if runtime.GOARCH == "arm64" {
return
}
listenPort := udp.PickPort()
start := time.Now()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{
Path: "shs",
},
SecurityType: "tls",
SecuritySettings: &tls.Config{
AllowInsecure: true,
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.CommonName("localhost")))},
NextProtocol: []string{"h3"},
},
}
listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
go func() {
defer conn.Close()
b := buf.New()
defer b.Release()
for {
b.Clear()
if _, err := b.ReadFrom(conn); err != nil {
return
}
common.Must2(conn.Write(b.Bytes()))
}
}()
})
common.Must(err)
defer listen.Close()
time.Sleep(time.Second)
conn, err := Dial(context.Background(), net.UDPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
common.Must(err)
defer conn.Close()
const N = 1024
b1 := make([]byte, N)
common.Must2(rand.Read(b1))
b2 := buf.New()
common.Must2(conn.Write(b1))
b2.Clear()
common.Must2(b2.ReadFullFrom(conn, N))
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
t.Error(r)
}
common.Must2(conn.Write(b1))
b2.Clear()
common.Must2(b2.ReadFullFrom(conn, N))
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
t.Error(r)
}
end := time.Now()
if !end.Before(start.Add(time.Second * 5)) {
t.Error("end: ", end, " start: ", start)
}
}

View File

@@ -14,15 +14,19 @@ import (
var _ buf.Writer = (*connection)(nil) var _ buf.Writer = (*connection)(nil)
// connection is a wrapper for net.Conn over WebSocket connection. // connection is a wrapper for net.Conn over WebSocket connection.
// remoteAddr is used to pass "virtual" remote IP addresses in X-Forwarded-For.
// so we shouldn't directly read it form conn.
type connection struct { type connection struct {
conn *websocket.Conn conn *websocket.Conn
reader io.Reader reader io.Reader
remoteAddr net.Addr
} }
func NewConnection(conn *websocket.Conn, remoteAddr net.Addr, extraReader io.Reader) *connection { func NewConnection(conn *websocket.Conn, remoteAddr net.Addr, extraReader io.Reader) *connection {
return &connection{ return &connection{
conn: conn, conn: conn,
reader: extraReader, remoteAddr: remoteAddr,
reader: extraReader,
} }
} }
@@ -90,7 +94,7 @@ func (c *connection) LocalAddr() net.Addr {
} }
func (c *connection) RemoteAddr() net.Addr { func (c *connection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr() return c.remoteAddr
} }
func (c *connection) SetDeadline(t time.Time) error { func (c *connection) SetDeadline(t time.Time) error {

View File

@@ -91,7 +91,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
return return
} }
_, err = c.Write([]byte("Response")) _, err = c.Write([]byte(c.RemoteAddr().String()))
common.Must(err) common.Must(err)
}(conn) }(conn)
}) })
@@ -109,7 +109,7 @@ func TestDialWithRemoteAddr(t *testing.T) {
var b [1024]byte var b [1024]byte
n, err := conn.Read(b[:]) n, err := conn.Read(b[:])
common.Must(err) common.Must(err)
if string(b[:n]) != "Response" { if string(b[:n]) != "1.1.1.1:0" {
t.Error("response: ", string(b[:n])) t.Error("response: ", string(b[:n]))
} }