Compare commits

...

5 Commits

Author SHA1 Message Date
RPRX
be43f66b63 v25.2.21
Announcement of NFTs by Project X: https://github.com/XTLS/Xray-core/discussions/3633
Project X NFT: https://opensea.io/assets/ethereum/0x5ee362866001613093361eb8569d59c4141b76d1/1

XHTTP: Beyond REALITY: https://github.com/XTLS/Xray-core/discussions/4113
REALITY NFT: https://opensea.io/assets/ethereum/0x5ee362866001613093361eb8569d59c4141b76d1/2
2025-02-21 07:58:31 +00:00
dependabot[bot]
71a6d89c23 Bump github.com/quic-go/quic-go from 0.49.0 to 0.50.0 (#4420)
Bumps [github.com/quic-go/quic-go](https://github.com/quic-go/quic-go) from 0.49.0 to 0.50.0.
- [Release notes](https://github.com/quic-go/quic-go/releases)
- [Changelog](https://github.com/quic-go/quic-go/blob/master/Changelog.md)
- [Commits](https://github.com/quic-go/quic-go/compare/v0.49.0...v0.50.0)

---
updated-dependencies:
- dependency-name: github.com/quic-go/quic-go
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-21 06:34:33 +00:00
lastrise
89792aee9d Outbound: Add outbound sendThrough origin behavior (#4349)
* added support of sending through origin for outbounds

* added strings package import

* usage of net.SplitHostPort instead of manual splitting

---------

Co-authored-by: poly <poly@>
2025-02-20 15:15:59 -05:00
RPRX
b786a50aee XHTTP server: Fix stream-up "single POST problem", Use united httpServerConn instead of recover()
https://github.com/XTLS/Xray-core/issues/4373#issuecomment-2671795675

https://github.com/XTLS/Xray-core/issues/4406#issuecomment-2668041926
2025-02-20 16:28:06 +00:00
风扇滑翔翼
b38a53e629 UDS: Use UnixListenerWrapper & UnixConnWrapper (#4413)
Fixes https://github.com/XTLS/Xray-core/issues/4411

---------

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2025-02-19 11:31:29 +00:00
9 changed files with 93 additions and 100 deletions

View File

@@ -273,7 +273,16 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (stat.Connecti
outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds)-1]
if h.senderSettings.ViaCidr == "" {
ob.Gateway = h.senderSettings.Via.AsAddress()
if h.senderSettings.Via.AsAddress().Family().IsDomain() && h.senderSettings.Via.AsAddress().Domain() == "origin" {
if inbound := session.InboundFromContext(ctx); inbound != nil {
origin, _, err := net.SplitHostPort(inbound.Conn.LocalAddr().String())
if err == nil {
ob.Gateway = net.ParseAddress(origin)
}
}
} else {
ob.Gateway = h.senderSettings.Via.AsAddress()
}
} else { //Get a random address.
ob.Gateway = ParseRandomIPv6(h.senderSettings.Via.AsAddress(), h.senderSettings.ViaCidr)
}

View File

@@ -19,7 +19,7 @@ import (
var (
Version_x byte = 25
Version_y byte = 2
Version_z byte = 18
Version_z byte = 21
)
var (

2
go.mod
View File

@@ -12,7 +12,7 @@ require (
github.com/miekg/dns v1.1.63
github.com/pelletier/go-toml v1.9.5
github.com/pires/go-proxyproto v0.8.0
github.com/quic-go/quic-go v0.49.0
github.com/quic-go/quic-go v0.50.0
github.com/refraction-networking/utls v1.6.7
github.com/sagernet/sing v0.5.1
github.com/sagernet/sing-shadowsocks v0.2.7

4
go.sum
View File

@@ -54,8 +54,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.49.0 h1:w5iJHXwHxs1QxyBv1EHKuC50GX5to8mJAxvtnttJp94=
github.com/quic-go/quic-go v0.49.0/go.mod h1:s2wDnmCdooUQBmQfpUSTCYBl1/D4FcqbULMMkASvR6s=
github.com/quic-go/quic-go v0.50.0 h1:3H/ld1pa3CYhkcc20TPIyG1bNsdhn9qZBGN3b9/UyUo=
github.com/quic-go/quic-go v0.50.0/go.mod h1:Vim6OmUvlYdwBhXP9ZVrtGmCMWa3wEqhq3NgYrI8b4E=
github.com/refraction-networking/utls v1.6.7 h1:zVJ7sP1dJx/WtVuITug3qYUq034cDq9B2MR1K67ULZM=
github.com/refraction-networking/utls v1.6.7/go.mod h1:BC3O4vQzye5hqpmDTWUqi4P5DDhzJfkV1tdqtawQIH0=
github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 h1:f/FNXud6gA3MNr8meMVVGxhp+QBTqY91tM8HjEuMjGg=

View File

@@ -292,7 +292,9 @@ func (c *OutboundDetourConfig) Build() (*core.OutboundHandlerConfig, error) {
senderSettings.ViaCidr = strings.Split(*c.SendThrough, "/")[1]
} else {
if address.Family().IsDomain() {
return nil, errors.New("unable to send through: " + address.String())
if address.Address.Domain() != "origin" {
return nil, errors.New("unable to send through: " + address.String())
}
}
}
senderSettings.Via = address.Build()

View File

@@ -547,8 +547,8 @@ func UnwrapRawConn(conn net.Conn) (net.Conn, stats.Counter, stats.Counter) {
conn = pc.Raw()
// 8192 > 4096, there is no need to process pc's bufReader
}
if uc, ok := conn.(*internet.UDSWrapperConn); ok {
conn = uc.Conn
if uc, ok := conn.(*internet.UnixConnWrapper); ok {
conn = uc.UnixConn
}
}
return conn, readCounter, writerCounter

View File

@@ -47,21 +47,6 @@ type httpSession struct {
isFullyConnected *done.Instance
}
func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessionId string) {
shouldReap := done.New()
go func() {
time.Sleep(30 * time.Second)
shouldReap.Close()
}()
select {
case <-isFullyConnected.Wait():
return
case <-shouldReap.Wait():
h.sessions.Delete(sessionId)
}
}
func (h *requestHandler) upsertSession(sessionId string) *httpSession {
// fast path
currentSessionAny, ok := h.sessions.Load(sessionId)
@@ -84,7 +69,21 @@ func (h *requestHandler) upsertSession(sessionId string) *httpSession {
}
h.sessions.Store(sessionId, s)
go h.maybeReapSession(s.isFullyConnected, sessionId)
shouldReap := done.New()
go func() {
time.Sleep(30 * time.Second)
shouldReap.Close()
}()
go func() {
select {
case <-shouldReap.Wait():
h.sessions.Delete(sessionId)
s.uploadQueue.Close()
case <-s.isFullyConnected.Wait():
}
}()
return s
}
@@ -183,12 +182,13 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
writer.WriteHeader(http.StatusBadRequest)
return
}
uploadDone := done.New()
httpSC := &httpServerConn{
Instance: done.New(),
Reader: request.Body,
ResponseWriter: writer,
}
err = currentSession.uploadQueue.Push(Packet{
Reader: &httpRequestBodyReader{
requestReader: request.Body,
uploadDone: uploadDone,
},
Reader: httpSC,
})
if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)")
@@ -200,25 +200,21 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
scStreamUpServerSecs := h.config.GetNormalizedScStreamUpServerSecs()
if referrer != "" && scStreamUpServerSecs.To > 0 {
go func() {
defer func() {
recover()
}()
for {
_, err := writer.Write(bytes.Repeat([]byte{'X'}, int(h.config.GetNormalizedXPaddingBytes().rand())))
_, err := httpSC.Write(bytes.Repeat([]byte{'X'}, int(h.config.GetNormalizedXPaddingBytes().rand())))
if err != nil {
break
}
writer.(http.Flusher).Flush()
time.Sleep(time.Duration(scStreamUpServerSecs.rand()) * time.Second)
}
}()
}
select {
case <-request.Context().Done():
case <-uploadDone.Wait():
case <-httpSC.Wait():
}
}
uploadDone.Close()
httpSC.Close()
return
}
@@ -262,11 +258,6 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
writer.WriteHeader(http.StatusOK)
} else if request.Method == "GET" || sessionId == "" { // stream-down, stream-one
responseFlusher, ok := writer.(http.Flusher)
if !ok {
panic("expected http.ResponseWriter to be an http.Flusher")
}
if sessionId != "" {
// after GET is done, the connection is finished. disable automatic
// session reaping, and handle it in defer
@@ -287,20 +278,18 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}
writer.WriteHeader(http.StatusOK)
writer.(http.Flusher).Flush()
responseFlusher.Flush()
downloadDone := done.New()
httpSC := &httpServerConn{
Instance: done.New(),
Reader: request.Body,
ResponseWriter: writer,
}
conn := splitConn{
writer: &httpResponseBodyWriter{
responseWriter: writer,
downloadDone: downloadDone,
responseFlusher: responseFlusher,
},
reader: request.Body,
localAddr: h.localAddr,
writer: httpSC,
reader: httpSC,
remoteAddr: remoteAddr,
localAddr: h.localAddr,
}
if sessionId != "" { // if not stream-one
conn.reader = currentSession.uploadQueue
@@ -311,7 +300,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
// "A ResponseWriter may not be used after [Handler.ServeHTTP] has returned."
select {
case <-request.Context().Done():
case <-downloadDone.Wait():
case <-httpSC.Wait():
}
conn.Close()
@@ -321,45 +310,30 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}
}
type httpRequestBodyReader struct {
requestReader io.ReadCloser
uploadDone *done.Instance
}
func (c *httpRequestBodyReader) Read(b []byte) (int, error) {
return c.requestReader.Read(b)
}
func (c *httpRequestBodyReader) Close() error {
defer c.uploadDone.Close()
return c.requestReader.Close()
}
type httpResponseBodyWriter struct {
type httpServerConn struct {
sync.Mutex
responseWriter http.ResponseWriter
responseFlusher http.Flusher
downloadDone *done.Instance
*done.Instance
io.Reader // no need to Close request.Body
http.ResponseWriter
}
func (c *httpResponseBodyWriter) Write(b []byte) (int, error) {
func (c *httpServerConn) Write(b []byte) (int, error) {
c.Lock()
defer c.Unlock()
if c.downloadDone.Done() {
if c.Done() {
return 0, io.ErrClosedPipe
}
n, err := c.responseWriter.Write(b)
n, err := c.ResponseWriter.Write(b)
if err == nil {
c.responseFlusher.Flush()
c.ResponseWriter.(http.Flusher).Flush()
}
return n, err
}
func (c *httpResponseBodyWriter) Close() error {
func (c *httpServerConn) Close() error {
c.Lock()
defer c.Unlock()
c.downloadDone.Close()
return nil
return c.Instance.Close()
}
type Listener struct {

View File

@@ -20,6 +20,7 @@ type Packet struct {
type uploadQueue struct {
reader io.ReadCloser
nomore bool
pushedPackets chan Packet
writeCloseMutex sync.Mutex
heap uploadHeap
@@ -42,19 +43,15 @@ func (h *uploadQueue) Push(p Packet) error {
h.writeCloseMutex.Lock()
defer h.writeCloseMutex.Unlock()
runtime.Gosched()
if h.reader != nil && p.Reader != nil {
p.Reader.Close()
return errors.New("h.reader already exists")
}
if h.closed {
if p.Reader != nil {
p.Reader.Close()
}
return errors.New("packet queue closed")
}
if h.nomore {
return errors.New("h.reader already exists")
}
if p.Reader != nil {
h.nomore = true
}
h.pushedPackets <- p
return nil
}
@@ -65,9 +62,20 @@ func (h *uploadQueue) Close() error {
if !h.closed {
h.closed = true
runtime.Gosched() // hope Read() gets the packet
f:
for {
select {
case p := <-h.pushedPackets:
if p.Reader != nil {
h.reader = p.Reader
}
default:
break f
}
}
close(h.pushedPackets)
}
runtime.Gosched()
if h.reader != nil {
return h.reader.Close()
}

View File

@@ -44,32 +44,32 @@ func getControlFunc(ctx context.Context, sockopt *SocketConfig, controllers []co
// For some reason, other component of ray will assume the listener is a TCP listener and have valid remote address.
// But in fact it doesn't. So we need to wrap the listener to make it return 0.0.0.0(unspecified) as remote address.
// If other issues encountered, we should able to fix it here.
type listenUDSWrapper struct {
net.Listener
type UnixListenerWrapper struct {
*net.UnixListener
locker *FileLocker
}
func (l *listenUDSWrapper) Accept() (net.Conn, error) {
conn, err := l.Listener.Accept()
func (l *UnixListenerWrapper) Accept() (net.Conn, error) {
conn, err := l.UnixListener.Accept()
if err != nil {
return nil, err
}
return &UDSWrapperConn{Conn: conn}, nil
return &UnixConnWrapper{UnixConn: conn.(*net.UnixConn)}, nil
}
func (l *listenUDSWrapper) Close() error {
func (l *UnixListenerWrapper) Close() error {
if l.locker != nil {
l.locker.Release()
l.locker = nil
}
return l.Listener.Close()
return l.UnixListener.Close()
}
type UDSWrapperConn struct {
net.Conn
type UnixConnWrapper struct {
*net.UnixConn
}
func (conn *UDSWrapperConn) RemoteAddr() net.Addr {
func (conn *UnixConnWrapper) RemoteAddr() net.Addr {
return &net.TCPAddr{
IP: []byte{0, 0, 0, 0},
}
@@ -136,7 +136,7 @@ func (dl *DefaultListener) Listen(ctx context.Context, addr net.Addr, sockopt *S
locker.Release()
return nil, err
}
l = &listenUDSWrapper{Listener: l, locker: locker}
l = &UnixListenerWrapper{UnixListener: l.(*net.UnixListener), locker: locker}
if filePerm == nil {
return l, nil
}