Compare commits

...

8 Commits

Author SHA1 Message Date
RPRX
c27d652d80 v1.8.21 2024-07-21 21:32:26 +00:00
RPRX
0f65aa8ed8 Fix SplitHTTP H3 waited for downResponse before uploading
https://github.com/XTLS/Xray-core/issues/3560#issuecomment-2241750579
2024-07-21 20:45:05 +00:00
RPRX
22535d8643 Fix SplitHTTP H3 didn't always reuse QUIC connection
https://github.com/XTLS/Xray-core/issues/3560#issuecomment-2241531502
2024-07-21 08:55:03 +00:00
mmmray
529f206d33 Fix serverside TLS support of SplitHTTP H1/H2 (#3567)
Fix #3566

Also update testsuite so that all tests read and write some data. Opening a connection is not enough to trigger connection errors, because the connection is so lazy.
2024-07-20 19:35:24 -05:00
チセ
964859b4bc SplitHTTP: Remove unnecessary keepalives (#3565)
Remove keep alive since quic-go/http3 doesn't support stream reuse
Discussion see https://t.me/projectXray/3782492

Co-authored-by: Fangliding <Fangliding.fshxy@outlook.com>
Co-authored-by: xqzr <34030394+xqzr@users.noreply.github.com>
Co-authored-by: ll11l1lIllIl1lll <88377095+ll11l1lIllIl1lll@users.noreply.github.com>
2024-07-20 19:34:57 -05:00
RPRX
8deb953aec v1.8.20 2024-07-20 06:10:40 +00:00
ll11l1lIllIl1lll
a0040f13dd SplitHTTP: Server supports HTTP/3 (#3554)
Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com>
2024-07-19 17:53:47 +00:00
ll11l1lIllIl1lll
d8994b7603 Fix SplitHTTP H3 crash on v2rayNG (#3559)
Fixes https://github.com/XTLS/Xray-core/issues/3556
2024-07-19 17:52:34 +00:00
5 changed files with 211 additions and 53 deletions

View File

@@ -21,7 +21,7 @@ import (
var ( var (
Version_x byte = 1 Version_x byte = 1
Version_y byte = 8 Version_y byte = 8
Version_z byte = 19 Version_z byte = 21
) )
var ( var (

View File

@@ -94,6 +94,10 @@ func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string)
gotDownResponse.Close() gotDownResponse.Close()
}() }()
if c.isH3 {
gotConn.Close()
}
// we want to block Dial until we know the remote address of the server, // we want to block Dial until we know the remote address of the server,
// for logging purposes // for logging purposes
<-gotConn.Wait() <-gotConn.Wait()

View File

@@ -41,6 +41,10 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
return &BrowserDialerClient{} return &BrowserDialerClient{}
} }
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1")
isH3 := tlsConfig != nil && (len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3")
globalDialerAccess.Lock() globalDialerAccess.Lock()
defer globalDialerAccess.Unlock() defer globalDialerAccess.Unlock()
@@ -48,14 +52,13 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
globalDialerMap = make(map[dialerConf]DialerClient) globalDialerMap = make(map[dialerConf]DialerClient)
} }
if isH3 {
dest.Network = net.Network_UDP
}
if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found { if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found {
return client return client
} }
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1")
isH3 := tlsConfig != nil && (len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3")
var gotlsConfig *gotls.Config var gotlsConfig *gotls.Config
if tlsConfig != nil { if tlsConfig != nil {
@@ -86,26 +89,39 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
var uploadTransport http.RoundTripper var uploadTransport http.RoundTripper
if isH3 { if isH3 {
dest.Network = net.Network_UDP
quicConfig := &quic.Config{
HandshakeIdleTimeout: 10 * time.Second,
MaxIdleTimeout: 90 * time.Second,
KeepAlivePeriod: 3 * time.Second,
Allow0RTT: true,
}
roundTripper := &http3.RoundTripper{ roundTripper := &http3.RoundTripper{
TLSClientConfig: gotlsConfig, TLSClientConfig: gotlsConfig,
QUICConfig: quicConfig,
Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (quic.EarlyConnection, error) { Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (quic.EarlyConnection, error) {
conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings) conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
if err != nil { if err != nil {
return nil, err return nil, err
} }
udpAddr, err := net.ResolveUDPAddr("udp", conn.RemoteAddr().String())
var udpConn *net.UDPConn
var udpAddr *net.UDPAddr
switch c := conn.(type) {
case *internet.PacketConnWrapper:
var ok bool
udpConn, ok = c.Conn.(*net.UDPConn)
if !ok {
return nil, errors.New("PacketConnWrapper does not contain a UDP connection")
}
udpAddr, err = net.ResolveUDPAddr("udp", c.Dest.String())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return quic.DialEarly(ctx, conn.(*internet.PacketConnWrapper).Conn.(*net.UDPConn), udpAddr, tlsCfg, cfg) case *net.UDPConn:
udpConn = c
udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported connection type: %T", conn)
}
return quic.DialEarly(ctx, udpConn, udpAddr, tlsCfg, cfg)
}, },
} }
downloadTransport = roundTripper downloadTransport = roundTripper

View File

@@ -11,6 +11,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
@@ -234,9 +236,12 @@ func (c *httpResponseBodyWriter) Close() error {
type Listener struct { type Listener struct {
sync.Mutex sync.Mutex
server http.Server server http.Server
h3server *http3.Server
listener net.Listener listener net.Listener
h3listener *quic.EarlyListener
config *Config config *Config
addConn internet.ConnHandler addConn internet.ConnHandler
isH3 bool
} }
func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) { func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
@@ -253,6 +258,16 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
var listener net.Listener var listener net.Listener
var err error var err error
var localAddr = gonet.TCPAddr{} var localAddr = gonet.TCPAddr{}
handler := &requestHandler{
host: shSettings.Host,
path: shSettings.GetNormalizedPath(),
ln: l,
sessionMu: &sync.Mutex{},
sessions: sync.Map{},
localAddr: localAddr,
}
tlsConfig := getTLSConfig(streamSettings)
l.isH3 = len(tlsConfig.NextProtos) == 1 && tlsConfig.NextProtos[0] == "h3"
if port == net.Port(0) { // unix if port == net.Port(0) { // unix
listener, err = internet.ListenSystem(ctx, &net.UnixAddr{ listener, err = internet.ListenSystem(ctx, &net.UnixAddr{
@@ -263,6 +278,29 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
return nil, errors.New("failed to listen unix domain socket(for SH) on ", address).Base(err) return nil, errors.New("failed to listen unix domain socket(for SH) on ", address).Base(err)
} }
errors.LogInfo(ctx, "listening unix domain socket(for SH) on ", address) errors.LogInfo(ctx, "listening unix domain socket(for SH) on ", address)
} else if l.isH3 { // quic
Conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
IP: address.IP(),
Port: int(port),
}, streamSettings.SocketSettings)
if err != nil {
return nil, errors.New("failed to listen UDP(for SH3) on ", address, ":", port).Base(err)
}
h3listener, err := quic.ListenEarly(Conn, tlsConfig, nil)
if err != nil {
return nil, errors.New("failed to listen QUIC(for SH3) on ", address, ":", port).Base(err)
}
l.h3listener = h3listener
errors.LogInfo(ctx, "listening QUIC(for SH3) on ", address, ":", port)
l.h3server = &http3.Server{
Handler: handler,
}
go func() {
if err := l.h3server.ServeListener(l.h3listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http3 for splithttp")
}
}()
} else { // tcp } else { // tcp
localAddr = gonet.TCPAddr{ localAddr = gonet.TCPAddr{
IP: address.IP(), IP: address.IP(),
@@ -276,39 +314,32 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet
return nil, errors.New("failed to listen TCP(for SH) on ", address, ":", port).Base(err) return nil, errors.New("failed to listen TCP(for SH) on ", address, ":", port).Base(err)
} }
errors.LogInfo(ctx, "listening TCP(for SH) on ", address, ":", port) errors.LogInfo(ctx, "listening TCP(for SH) on ", address, ":", port)
// h2cHandler can handle both plaintext HTTP/1.1 and h2c
h2cHandler := h2c.NewHandler(handler, &http2.Server{})
l.server = http.Server{
Handler: h2cHandler,
ReadHeaderTimeout: time.Second * 4,
MaxHeaderBytes: 8192,
}
} }
// tcp/unix (h1/h2)
if listener != nil {
if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil { if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil {
if tlsConfig := config.GetTLSConfig(); tlsConfig != nil { if tlsConfig := config.GetTLSConfig(); tlsConfig != nil {
listener = tls.NewListener(listener, tlsConfig) listener = tls.NewListener(listener, tlsConfig)
} }
} }
handler := &requestHandler{
host: shSettings.Host,
path: shSettings.GetNormalizedPath(),
ln: l,
sessionMu: &sync.Mutex{},
sessions: sync.Map{},
localAddr: localAddr,
}
// h2cHandler can handle both plaintext HTTP/1.1 and h2c
h2cHandler := h2c.NewHandler(handler, &http2.Server{})
l.listener = listener l.listener = listener
l.server = http.Server{
Handler: h2cHandler,
ReadHeaderTimeout: time.Second * 4,
MaxHeaderBytes: 8192,
}
go func() { go func() {
if err := l.server.Serve(l.listener); err != nil { if err := l.server.Serve(l.listener); err != nil {
errors.LogWarningInner(ctx, err, "failed to serve http for splithttp") errors.LogWarningInner(ctx, err, "failed to serve http for splithttp")
} }
}() }()
}
return l, err return l, err
} }
@@ -320,9 +351,22 @@ func (ln *Listener) Addr() net.Addr {
// Close implements net.Listener.Close(). // Close implements net.Listener.Close().
func (ln *Listener) Close() error { func (ln *Listener) Close() error {
if ln.h3server != nil {
if err := ln.h3server.Close(); err != nil {
return err
}
} else if ln.listener != nil {
return ln.listener.Close() return ln.listener.Close()
} }
return errors.New("listener does not have an HTTP/3 server or a net.listener")
}
func getTLSConfig(streamSettings *internet.MemoryStreamConfig) *tls.Config {
config := v2tls.ConfigFromStreamSettings(streamSettings)
if config == nil {
return &tls.Config{}
}
return config.GetTLSConfig()
}
func init() { func init() {
common.Must(internet.RegisterTransportListener(protocolName, ListenSH)) common.Must(internet.RegisterTransportListener(protocolName, ListenSH))
} }

View File

@@ -2,6 +2,7 @@ package splithttp_test
import ( import (
"context" "context"
"crypto/rand"
gotls "crypto/tls" gotls "crypto/tls"
"fmt" "fmt"
gonet "net" gonet "net"
@@ -10,10 +11,13 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol/tls/cert" "github.com/xtls/xray-core/common/protocol/tls/cert"
"github.com/xtls/xray-core/testing/servers/tcp" "github.com/xtls/xray-core/testing/servers/tcp"
"github.com/xtls/xray-core/testing/servers/udp"
"github.com/xtls/xray-core/transport/internet" "github.com/xtls/xray-core/transport/internet"
. "github.com/xtls/xray-core/transport/internet/splithttp" . "github.com/xtls/xray-core/transport/internet/splithttp"
"github.com/xtls/xray-core/transport/internet/stat" "github.com/xtls/xray-core/transport/internet/stat"
@@ -142,7 +146,16 @@ func Test_listenSHAndDial_TLS(t *testing.T) {
} }
listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) { listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
go func() { go func() {
_ = conn.Close() defer conn.Close()
var b [1024]byte
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
_, err := conn.Read(b[:])
if err != nil {
return
}
common.Must2(conn.Write([]byte("Response")))
}() }()
}) })
common.Must(err) common.Must(err)
@@ -150,7 +163,15 @@ func Test_listenSHAndDial_TLS(t *testing.T) {
conn, err := Dial(context.Background(), net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) conn, err := Dial(context.Background(), net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
common.Must(err) common.Must(err)
_ = conn.Close()
_, err = conn.Write([]byte("Test connection 1"))
common.Must(err)
var b [1024]byte
n, _ := conn.Read(b[:])
if string(b[:n]) != "Response" {
t.Error("response: ", string(b[:n]))
}
end := time.Now() end := time.Now()
if !end.Before(start.Add(time.Second * 5)) { if !end.Before(start.Add(time.Second * 5)) {
@@ -204,3 +225,76 @@ func Test_listenSHAndDial_H2C(t *testing.T) {
t.Error("Expected h2 but got:", resp.ProtoMajor) t.Error("Expected h2 but got:", resp.ProtoMajor)
} }
} }
func Test_listenSHAndDial_QUIC(t *testing.T) {
if runtime.GOARCH == "arm64" {
return
}
listenPort := udp.PickPort()
start := time.Now()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{
Path: "shs",
},
SecurityType: "tls",
SecuritySettings: &tls.Config{
AllowInsecure: true,
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.CommonName("localhost")))},
NextProtocol: []string{"h3"},
},
}
listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
go func() {
defer conn.Close()
b := buf.New()
defer b.Release()
for {
b.Clear()
if _, err := b.ReadFrom(conn); err != nil {
return
}
common.Must2(conn.Write(b.Bytes()))
}
}()
})
common.Must(err)
defer listen.Close()
time.Sleep(time.Second)
conn, err := Dial(context.Background(), net.UDPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
common.Must(err)
defer conn.Close()
const N = 1024
b1 := make([]byte, N)
common.Must2(rand.Read(b1))
b2 := buf.New()
common.Must2(conn.Write(b1))
b2.Clear()
common.Must2(b2.ReadFullFrom(conn, N))
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
t.Error(r)
}
common.Must2(conn.Write(b1))
b2.Clear()
common.Must2(b2.ReadFullFrom(conn, N))
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
t.Error(r)
}
end := time.Now()
if !end.Before(start.Add(time.Second * 5)) {
t.Error("end: ", end, " start: ", start)
}
}