mirror of
https://github.com/XTLS/Xray-core.git
synced 2025-08-22 09:36:49 +08:00
Compare commits
11 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
93f72db9fd | ||
![]() |
ff4331a7a8 | ||
![]() |
a8559a1b46 | ||
![]() |
42aea01fb5 | ||
![]() |
a7909f8671 | ||
![]() |
b287d6419b | ||
![]() |
d54d20abea | ||
![]() |
868799ef04 | ||
![]() |
db934f0832 | ||
![]() |
53b04d560b | ||
![]() |
1410b6335b |
@@ -106,7 +106,7 @@ func init() {
|
||||
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
|
||||
d := new(DefaultDispatcher)
|
||||
if err := core.RequireFeatures(ctx, func(om outbound.Manager, router routing.Router, pm policy.Manager, sm stats.Manager, dc dns.Client) error {
|
||||
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) { // FakeDNSEngine is optional
|
||||
core.OptionalFeatures(ctx, func(fdns dns.FakeDNSEngine) {
|
||||
d.fdns = fdns
|
||||
})
|
||||
return d.Init(config.(*Config), om, router, pm, sm, dc)
|
||||
|
@@ -56,7 +56,7 @@ func NewServer(ctx context.Context, dest net.Destination, dispatcher routing.Dis
|
||||
return NewTCPLocalNameServer(u, queryStrategy)
|
||||
case strings.EqualFold(u.String(), "fakedns"):
|
||||
var fd dns.FakeDNSEngine
|
||||
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) { // FakeDNSEngine is optional
|
||||
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) {
|
||||
fd = fdns
|
||||
})
|
||||
return NewFakeDNSServer(fd), nil
|
||||
|
@@ -8,7 +8,6 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/xtls/xray-core/common"
|
||||
@@ -35,7 +34,6 @@ type DoHNameServer struct {
|
||||
ips map[string]*record
|
||||
pub *pubsub.Service
|
||||
cleanup *task.Periodic
|
||||
reqID uint32
|
||||
httpClient *http.Client
|
||||
dohURL string
|
||||
name string
|
||||
@@ -222,7 +220,7 @@ func (s *DoHNameServer) updateIP(req *dnsRequest, ipRec *IPRecord) {
|
||||
}
|
||||
|
||||
func (s *DoHNameServer) newReqID() uint16 {
|
||||
return uint16(atomic.AddUint32(&s.reqID, 1))
|
||||
return 0
|
||||
}
|
||||
|
||||
func (s *DoHNameServer) sendQuery(ctx context.Context, domain string, clientIP net.IP, option dns_feature.IPOption) {
|
||||
|
@@ -6,10 +6,9 @@ import (
|
||||
"encoding/binary"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/quic-go/quic-go"
|
||||
"github.com/xtls/quic-go"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
@@ -37,7 +36,6 @@ type QUICNameServer struct {
|
||||
ips map[string]*record
|
||||
pub *pubsub.Service
|
||||
cleanup *task.Periodic
|
||||
reqID uint32
|
||||
name string
|
||||
destination *net.Destination
|
||||
connection quic.Connection
|
||||
@@ -156,7 +154,7 @@ func (s *QUICNameServer) updateIP(req *dnsRequest, ipRec *IPRecord) {
|
||||
}
|
||||
|
||||
func (s *QUICNameServer) newReqID() uint16 {
|
||||
return uint16(atomic.AddUint32(&s.reqID, 1))
|
||||
return 0
|
||||
}
|
||||
|
||||
func (s *QUICNameServer) sendQuery(ctx context.Context, domain string, clientIP net.IP, option dns_feature.IPOption) {
|
||||
|
@@ -38,7 +38,7 @@ func init() {
|
||||
sv := &service{v: s}
|
||||
err := s.RequireFeatures(func(Observatory extension.Observatory) {
|
||||
sv.observatory = Observatory
|
||||
})
|
||||
}, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -177,7 +177,7 @@ func (s *service) Register(server *grpc.Server) {
|
||||
common.Must(s.v.RequireFeatures(func(im inbound.Manager, om outbound.Manager) {
|
||||
hs.ihm = im
|
||||
hs.ohm = om
|
||||
}))
|
||||
}, false))
|
||||
RegisterHandlerServiceServer(server, hs)
|
||||
|
||||
// For compatibility purposes
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
sync "sync"
|
||||
|
||||
"github.com/xtls/xray-core/app/observatory"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
"github.com/xtls/xray-core/core"
|
||||
"github.com/xtls/xray-core/features/extension"
|
||||
@@ -31,9 +32,10 @@ type RoundRobinStrategy struct {
|
||||
func (s *RoundRobinStrategy) InjectContext(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
if len(s.FallbackTag) > 0 {
|
||||
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) {
|
||||
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
|
||||
s.observatory = observatory
|
||||
})
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -135,7 +135,7 @@ func (s *service) Register(server *grpc.Server) {
|
||||
vCoreDesc := RoutingService_ServiceDesc
|
||||
vCoreDesc.ServiceName = "v2ray.core.app.router.command.RoutingService"
|
||||
server.RegisterService(&vCoreDesc, rs)
|
||||
}))
|
||||
}, false))
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/xtls/xray-core/app/observatory"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/dice"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
"github.com/xtls/xray-core/core"
|
||||
@@ -59,9 +60,10 @@ type node struct {
|
||||
|
||||
func (s *LeastLoadStrategy) InjectContext(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) {
|
||||
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
|
||||
s.observer = observatory
|
||||
})
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
func (s *LeastLoadStrategy) PickOutbound(candidates []string) string {
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/xtls/xray-core/app/observatory"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
"github.com/xtls/xray-core/core"
|
||||
"github.com/xtls/xray-core/features/extension"
|
||||
@@ -20,9 +21,10 @@ func (l *LeastPingStrategy) GetPrincipleTarget(strings []string) []string {
|
||||
|
||||
func (l *LeastPingStrategy) InjectContext(ctx context.Context) {
|
||||
l.ctx = ctx
|
||||
core.RequireFeaturesAsync(l.ctx, func(observatory extension.Observatory) {
|
||||
common.Must(core.RequireFeatures(l.ctx, func(observatory extension.Observatory) error {
|
||||
l.observatory = observatory
|
||||
})
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
func (l *LeastPingStrategy) PickOutbound(strings []string) string {
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/xtls/xray-core/app/observatory"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/dice"
|
||||
"github.com/xtls/xray-core/core"
|
||||
"github.com/xtls/xray-core/features/extension"
|
||||
@@ -20,9 +21,10 @@ type RandomStrategy struct {
|
||||
func (s *RandomStrategy) InjectContext(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
if len(s.FallbackTag) > 0 {
|
||||
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) {
|
||||
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
|
||||
s.observatory = observatory
|
||||
})
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -38,7 +38,7 @@ func Error2(v interface{}, err error) error {
|
||||
func envFile() (string, error) {
|
||||
if file := os.Getenv("GOENV"); file != "" {
|
||||
if file == "off" {
|
||||
return "", fmt.Errorf("GOENV=off")
|
||||
return "", errors.New("GOENV=off")
|
||||
}
|
||||
return file, nil
|
||||
}
|
||||
@@ -47,7 +47,7 @@ func envFile() (string, error) {
|
||||
return "", err
|
||||
}
|
||||
if dir == "" {
|
||||
return "", fmt.Errorf("missing user-config dir")
|
||||
return "", errors.New("missing user-config dir")
|
||||
}
|
||||
return filepath.Join(dir, "go", "env"), nil
|
||||
}
|
||||
@@ -60,7 +60,7 @@ func GetRuntimeEnv(key string) (string, error) {
|
||||
return "", err
|
||||
}
|
||||
if file == "" {
|
||||
return "", fmt.Errorf("missing runtime env file")
|
||||
return "", errors.New("missing runtime env file")
|
||||
}
|
||||
var data []byte
|
||||
var runtimeEnv string
|
||||
|
@@ -8,7 +8,7 @@ import (
|
||||
"encoding/binary"
|
||||
"io"
|
||||
|
||||
"github.com/quic-go/quic-go/quicvarint"
|
||||
"github.com/xtls/quic-go/quicvarint"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
"github.com/xtls/xray-core/common/bytespool"
|
||||
|
@@ -19,7 +19,7 @@ import (
|
||||
var (
|
||||
Version_x byte = 24
|
||||
Version_y byte = 12
|
||||
Version_z byte = 18
|
||||
Version_z byte = 28
|
||||
)
|
||||
|
||||
var (
|
||||
|
162
core/xray.go
162
core/xray.go
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
@@ -45,22 +44,13 @@ func getFeature(allFeatures []features.Feature, t reflect.Type) features.Feature
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *resolution) resolve(allFeatures []features.Feature) (bool, error) {
|
||||
var fs []features.Feature
|
||||
for _, d := range r.deps {
|
||||
f := getFeature(allFeatures, d)
|
||||
if f == nil {
|
||||
return false, nil
|
||||
}
|
||||
fs = append(fs, f)
|
||||
}
|
||||
|
||||
func (r *resolution) callbackResolution(allFeatures []features.Feature) error {
|
||||
callback := reflect.ValueOf(r.callback)
|
||||
var input []reflect.Value
|
||||
callbackType := callback.Type()
|
||||
for i := 0; i < callbackType.NumIn(); i++ {
|
||||
pt := callbackType.In(i)
|
||||
for _, f := range fs {
|
||||
for _, f := range allFeatures {
|
||||
if reflect.TypeOf(f).AssignableTo(pt) {
|
||||
input = append(input, reflect.ValueOf(f))
|
||||
break
|
||||
@@ -85,15 +75,17 @@ func (r *resolution) resolve(allFeatures []features.Feature) (bool, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return true, err
|
||||
return err
|
||||
}
|
||||
|
||||
// Instance combines all Xray features.
|
||||
type Instance struct {
|
||||
access sync.Mutex
|
||||
features []features.Feature
|
||||
featureResolutions []resolution
|
||||
running bool
|
||||
statusLock sync.Mutex
|
||||
features []features.Feature
|
||||
pendingResolutions []resolution
|
||||
pendingOptionalResolutions []resolution
|
||||
running bool
|
||||
resolveLock sync.Mutex
|
||||
|
||||
ctx context.Context
|
||||
}
|
||||
@@ -154,13 +146,14 @@ func addOutboundHandlers(server *Instance, configs []*OutboundHandlerConfig) err
|
||||
// See Instance.RequireFeatures for more information.
|
||||
func RequireFeatures(ctx context.Context, callback interface{}) error {
|
||||
v := MustFromContext(ctx)
|
||||
return v.RequireFeatures(callback)
|
||||
return v.RequireFeatures(callback, false)
|
||||
}
|
||||
|
||||
// RequireFeaturesAsync registers a callback, which will be called when all dependent features are registered. The order of app init doesn't matter
|
||||
func RequireFeaturesAsync(ctx context.Context, callback interface{}) {
|
||||
// OptionalFeatures is a helper function to aquire features from Instance in context.
|
||||
// See Instance.RequireFeatures for more information.
|
||||
func OptionalFeatures(ctx context.Context, callback interface{}) error {
|
||||
v := MustFromContext(ctx)
|
||||
v.RequireFeaturesAsync(callback)
|
||||
return v.RequireFeatures(callback, true)
|
||||
}
|
||||
|
||||
// New returns a new Xray instance based on given configuration.
|
||||
@@ -234,9 +227,12 @@ func initInstanceWithConfig(config *Config, server *Instance) (bool, error) {
|
||||
}(),
|
||||
)
|
||||
|
||||
if server.featureResolutions != nil {
|
||||
server.resolveLock.Lock()
|
||||
if server.pendingResolutions != nil {
|
||||
server.resolveLock.Unlock()
|
||||
return true, errors.New("not all dependencies are resolved.")
|
||||
}
|
||||
server.resolveLock.Unlock()
|
||||
|
||||
if err := addInboundHandlers(server, config.Inbound); err != nil {
|
||||
return true, err
|
||||
@@ -255,8 +251,8 @@ func (s *Instance) Type() interface{} {
|
||||
|
||||
// Close shutdown the Xray instance.
|
||||
func (s *Instance) Close() error {
|
||||
s.access.Lock()
|
||||
defer s.access.Unlock()
|
||||
s.statusLock.Lock()
|
||||
defer s.statusLock.Unlock()
|
||||
|
||||
s.running = false
|
||||
|
||||
@@ -275,7 +271,7 @@ func (s *Instance) Close() error {
|
||||
|
||||
// RequireFeatures registers a callback, which will be called when all dependent features are registered.
|
||||
// The callback must be a func(). All its parameters must be features.Feature.
|
||||
func (s *Instance) RequireFeatures(callback interface{}) error {
|
||||
func (s *Instance) RequireFeatures(callback interface{}, optional bool) error {
|
||||
callbackType := reflect.TypeOf(callback)
|
||||
if callbackType.Kind() != reflect.Func {
|
||||
panic("not a function")
|
||||
@@ -290,47 +286,32 @@ func (s *Instance) RequireFeatures(callback interface{}) error {
|
||||
deps: featureTypes,
|
||||
callback: callback,
|
||||
}
|
||||
if finished, err := r.resolve(s.features); finished {
|
||||
return err
|
||||
}
|
||||
s.featureResolutions = append(s.featureResolutions, r)
|
||||
return nil
|
||||
}
|
||||
|
||||
// RequireFeaturesAsync registers a callback, which will be called when all dependent features are registered. The order of app init doesn't matter
|
||||
func (s *Instance) RequireFeaturesAsync(callback interface{}) {
|
||||
callbackType := reflect.TypeOf(callback)
|
||||
if callbackType.Kind() != reflect.Func {
|
||||
panic("not a function")
|
||||
}
|
||||
|
||||
var featureTypes []reflect.Type
|
||||
for i := 0; i < callbackType.NumIn(); i++ {
|
||||
featureTypes = append(featureTypes, reflect.PtrTo(callbackType.In(i)))
|
||||
}
|
||||
|
||||
r := resolution{
|
||||
deps: featureTypes,
|
||||
callback: callback,
|
||||
}
|
||||
go func() {
|
||||
var finished = false
|
||||
for i := 0; !finished; i++ {
|
||||
if i > 100000 {
|
||||
errors.LogError(s.ctx, "RequireFeaturesAsync failed after count ", i)
|
||||
break;
|
||||
}
|
||||
finished, _ = r.resolve(s.features)
|
||||
time.Sleep(time.Millisecond)
|
||||
s.resolveLock.Lock()
|
||||
foundAll := true
|
||||
for _, d := range r.deps {
|
||||
f := getFeature(s.features, d)
|
||||
if f == nil {
|
||||
foundAll = false
|
||||
break
|
||||
}
|
||||
s.featureResolutions = append(s.featureResolutions, r)
|
||||
}()
|
||||
}
|
||||
if foundAll {
|
||||
s.resolveLock.Unlock()
|
||||
return r.callbackResolution(s.features)
|
||||
} else {
|
||||
if optional {
|
||||
s.pendingOptionalResolutions = append(s.pendingOptionalResolutions, r)
|
||||
} else {
|
||||
s.pendingResolutions = append(s.pendingResolutions, r)
|
||||
}
|
||||
s.resolveLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// AddFeature registers a feature into current Instance.
|
||||
func (s *Instance) AddFeature(feature features.Feature) error {
|
||||
s.features = append(s.features, feature)
|
||||
|
||||
if s.running {
|
||||
if err := feature.Start(); err != nil {
|
||||
errors.LogInfoInner(s.ctx, err, "failed to start feature")
|
||||
@@ -338,27 +319,52 @@ func (s *Instance) AddFeature(feature features.Feature) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.featureResolutions == nil {
|
||||
return nil
|
||||
}
|
||||
s.resolveLock.Lock()
|
||||
s.features = append(s.features, feature)
|
||||
|
||||
var pendingResolutions []resolution
|
||||
for _, r := range s.featureResolutions {
|
||||
finished, err := r.resolve(s.features)
|
||||
if finished && err != nil {
|
||||
return err
|
||||
var availableResolution []resolution
|
||||
var pending []resolution
|
||||
for _, r := range s.pendingResolutions {
|
||||
foundAll := true
|
||||
for _, d := range r.deps {
|
||||
f := getFeature(s.features, d)
|
||||
if f == nil {
|
||||
foundAll = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !finished {
|
||||
pendingResolutions = append(pendingResolutions, r)
|
||||
if foundAll {
|
||||
availableResolution = append(availableResolution, r)
|
||||
} else {
|
||||
pending = append(pending, r)
|
||||
}
|
||||
}
|
||||
if len(pendingResolutions) == 0 {
|
||||
s.featureResolutions = nil
|
||||
} else if len(pendingResolutions) < len(s.featureResolutions) {
|
||||
s.featureResolutions = pendingResolutions
|
||||
}
|
||||
s.pendingResolutions = pending
|
||||
|
||||
return nil
|
||||
var pendingOptional []resolution
|
||||
for _, r := range s.pendingOptionalResolutions {
|
||||
foundAll := true
|
||||
for _, d := range r.deps {
|
||||
f := getFeature(s.features, d)
|
||||
if f == nil {
|
||||
foundAll = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if foundAll {
|
||||
availableResolution = append(availableResolution, r)
|
||||
} else {
|
||||
pendingOptional = append(pendingOptional, r)
|
||||
}
|
||||
}
|
||||
s.pendingOptionalResolutions = pendingOptional
|
||||
s.resolveLock.Unlock()
|
||||
|
||||
var err error
|
||||
for _, r := range availableResolution {
|
||||
err = r.callbackResolution(s.features) // only return the last error for now
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// GetFeature returns a feature of the given type, or nil if such feature is not registered.
|
||||
@@ -371,8 +377,8 @@ func (s *Instance) GetFeature(featureType interface{}) features.Feature {
|
||||
//
|
||||
// xray:api:stable
|
||||
func (s *Instance) Start() error {
|
||||
s.access.Lock()
|
||||
defer s.access.Unlock()
|
||||
s.statusLock.Lock()
|
||||
defer s.statusLock.Unlock()
|
||||
|
||||
s.running = true
|
||||
for _, f := range s.features {
|
||||
|
@@ -30,7 +30,7 @@ func TestXrayDependency(t *testing.T) {
|
||||
t.Error("expected dns client fulfilled, but actually nil")
|
||||
}
|
||||
wait <- true
|
||||
})
|
||||
}, false)
|
||||
instance.AddFeature(localdns.New())
|
||||
<-wait
|
||||
}
|
||||
|
8
go.mod
8
go.mod
@@ -2,8 +2,6 @@ module github.com/xtls/xray-core
|
||||
|
||||
go 1.21.4
|
||||
|
||||
replace github.com/quic-go/quic-go v0.46.0 => github.com/xtls/quic-go v0.46.2
|
||||
|
||||
require (
|
||||
github.com/OmarTariq612/goech v0.0.0-20240405204721-8e2e1dafd3a0
|
||||
github.com/cloudflare/circl v1.4.0
|
||||
@@ -14,7 +12,6 @@ require (
|
||||
github.com/miekg/dns v1.1.62
|
||||
github.com/pelletier/go-toml v1.9.5
|
||||
github.com/pires/go-proxyproto v0.8.0
|
||||
github.com/quic-go/quic-go v0.46.0
|
||||
github.com/refraction-networking/utls v1.6.7
|
||||
github.com/sagernet/sing v0.5.1
|
||||
github.com/sagernet/sing-shadowsocks v0.2.7
|
||||
@@ -22,15 +19,16 @@ require (
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e
|
||||
github.com/vishvananda/netlink v1.3.0
|
||||
github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087
|
||||
github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d
|
||||
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba
|
||||
golang.org/x/crypto v0.31.0
|
||||
golang.org/x/net v0.32.0
|
||||
golang.org/x/net v0.33.0
|
||||
golang.org/x/sync v0.10.0
|
||||
golang.org/x/sys v0.28.0
|
||||
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173
|
||||
google.golang.org/grpc v1.67.1
|
||||
google.golang.org/protobuf v1.36.0
|
||||
google.golang.org/protobuf v1.36.1
|
||||
gvisor.dev/gvisor v0.0.0-20231202080848-1f7806d17489
|
||||
h12.io/socks v1.0.3
|
||||
lukechampine.com/blake3 v1.3.0
|
||||
|
12
go.sum
12
go.sum
@@ -68,8 +68,8 @@ github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQ
|
||||
github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
|
||||
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
|
||||
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
|
||||
github.com/xtls/quic-go v0.46.2 h1:bzUnZIQIH8SyqYGR6fAXzEvZauA+32J/2w2AtnEFa1o=
|
||||
github.com/xtls/quic-go v0.46.2/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
|
||||
github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087 h1:kKPg/cJPSKnE50VXVBskDYYSBkl4X3sMCIbTy+XKNGk=
|
||||
github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087/go.mod h1:mN9lAuc8Vt7eHvnQkDIH5+uHh+DcLmTBma9rLqk/rPY=
|
||||
github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d h1:+B97uD9uHLgAAulhigmys4BVwZZypzK7gPN3WtpgRJg=
|
||||
github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d/go.mod h1:dm4y/1QwzjGaK17ofi0Vs6NpKAHegZky8qk6J2JJZAE=
|
||||
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
@@ -89,8 +89,8 @@ golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
|
||||
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
|
||||
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
|
||||
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||
@@ -129,8 +129,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
|
||||
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
|
||||
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
|
||||
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
|
||||
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
|
||||
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
@@ -287,8 +287,8 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
|
||||
if c.Xmux == (XmuxConfig{}) {
|
||||
c.Xmux.MaxConcurrency.From = 16
|
||||
c.Xmux.MaxConcurrency.To = 32
|
||||
c.Xmux.CMaxReuseTimes.From = 64
|
||||
c.Xmux.CMaxReuseTimes.To = 128
|
||||
c.Xmux.CMaxReuseTimes.From = 256
|
||||
c.Xmux.CMaxReuseTimes.To = 512
|
||||
c.Xmux.HMaxRequestTimes.From = 800
|
||||
c.Xmux.HMaxRequestTimes.To = 900
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"go/build"
|
||||
@@ -18,7 +19,7 @@ var directory = flag.String("pwd", "", "Working directory of Xray vformat.")
|
||||
func envFile() (string, error) {
|
||||
if file := os.Getenv("GOENV"); file != "" {
|
||||
if file == "off" {
|
||||
return "", fmt.Errorf("GOENV=off")
|
||||
return "", errors.New("GOENV=off")
|
||||
}
|
||||
return file, nil
|
||||
}
|
||||
@@ -27,7 +28,7 @@ func envFile() (string, error) {
|
||||
return "", err
|
||||
}
|
||||
if dir == "" {
|
||||
return "", fmt.Errorf("missing user-config dir")
|
||||
return "", errors.New("missing user-config dir")
|
||||
}
|
||||
return filepath.Join(dir, "go", "env"), nil
|
||||
}
|
||||
@@ -40,7 +41,7 @@ func GetRuntimeEnv(key string) (string, error) {
|
||||
return "", err
|
||||
}
|
||||
if file == "" {
|
||||
return "", fmt.Errorf("missing runtime env file")
|
||||
return "", errors.New("missing runtime env file")
|
||||
}
|
||||
var data []byte
|
||||
var runtimeEnv string
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"go/build"
|
||||
@@ -22,7 +23,7 @@ var directory = flag.String("pwd", "", "Working directory of Xray vprotogen.")
|
||||
func envFile() (string, error) {
|
||||
if file := os.Getenv("GOENV"); file != "" {
|
||||
if file == "off" {
|
||||
return "", fmt.Errorf("GOENV=off")
|
||||
return "", errors.New("GOENV=off")
|
||||
}
|
||||
return file, nil
|
||||
}
|
||||
@@ -31,7 +32,7 @@ func envFile() (string, error) {
|
||||
return "", err
|
||||
}
|
||||
if dir == "" {
|
||||
return "", fmt.Errorf("missing user-config dir")
|
||||
return "", errors.New("missing user-config dir")
|
||||
}
|
||||
return filepath.Join(dir, "go", "env"), nil
|
||||
}
|
||||
@@ -44,7 +45,7 @@ func GetRuntimeEnv(key string) (string, error) {
|
||||
return "", err
|
||||
}
|
||||
if file == "" {
|
||||
return "", fmt.Errorf("missing runtime env file")
|
||||
return "", errors.New("missing runtime env file")
|
||||
}
|
||||
var data []byte
|
||||
var runtimeEnv string
|
||||
@@ -101,12 +102,12 @@ Download %s v%s or later from https://github.com/protocolbuffers/protobuf/releas
|
||||
func getProjectProtocVersion(url string) (string, error) {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("can not get the version of protobuf used in xray project")
|
||||
return "", errors.New("can not get the version of protobuf used in xray project")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("can not read from body")
|
||||
return "", errors.New("can not read from body")
|
||||
}
|
||||
versionRegexp := regexp.MustCompile(`\/\/\s*protoc\s*v\d+\.(\d+\.\d+)`)
|
||||
matched := versionRegexp.FindStringSubmatch(string(body))
|
||||
|
@@ -3,6 +3,7 @@ package api
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -101,7 +102,7 @@ func fetchHTTPContent(target string) ([]byte, error) {
|
||||
|
||||
content, err := buf.ReadAllToBytes(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read HTTP response")
|
||||
return nil, errors.New("failed to read HTTP response")
|
||||
}
|
||||
|
||||
return content, nil
|
||||
|
@@ -27,7 +27,7 @@ func init() {
|
||||
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
|
||||
h := new(Handler)
|
||||
if err := core.RequireFeatures(ctx, func(dnsClient dns.Client, policyManager policy.Manager) error {
|
||||
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) { // FakeDNSEngine is optional
|
||||
core.OptionalFeatures(ctx, func(fdns dns.FakeDNSEngine) {
|
||||
h.fdns = fdns
|
||||
})
|
||||
return h.Init(config.(*Config), dnsClient, policyManager)
|
||||
|
@@ -2,6 +2,7 @@ package dokodemo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/xtls/xray-core/common"
|
||||
@@ -147,10 +148,6 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn st
|
||||
return nil
|
||||
}
|
||||
|
||||
tproxyRequest := func() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var writer buf.Writer
|
||||
if network == net.Network_TCP {
|
||||
writer = buf.NewWriter(conn)
|
||||
@@ -180,7 +177,12 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn st
|
||||
return err
|
||||
}
|
||||
writer = NewPacketWriter(pConn, &dest, mark, back)
|
||||
defer writer.(*PacketWriter).Close()
|
||||
defer func() {
|
||||
runtime.Gosched()
|
||||
common.Interrupt(link.Reader) // maybe duplicated
|
||||
runtime.Gosched()
|
||||
writer.(*PacketWriter).Close() // close fake UDP conns
|
||||
}()
|
||||
/*
|
||||
sockopt := &internet.SocketConfig{
|
||||
Tproxy: internet.SocketConfig_TProxy,
|
||||
@@ -219,17 +221,24 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn st
|
||||
responseDone := func() error {
|
||||
defer timer.SetTimeout(plcy.Timeouts.UplinkOnly)
|
||||
|
||||
if network == net.Network_UDP && destinationOverridden {
|
||||
buf.Copy(link.Reader, writer) // respect upload's timeout
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := buf.Copy(link.Reader, writer, buf.UpdateActivity(timer)); err != nil {
|
||||
return errors.New("failed to transport response").Base(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := task.Run(ctx, task.OnSuccess(func() error {
|
||||
return task.Run(ctx, requestDone, tproxyRequest)
|
||||
}, task.Close(link.Writer)), responseDone); err != nil {
|
||||
common.Interrupt(link.Reader)
|
||||
if err := task.Run(ctx,
|
||||
task.OnSuccess(func() error { return task.Run(ctx, requestDone) }, task.Close(link.Writer)),
|
||||
responseDone); err != nil {
|
||||
runtime.Gosched()
|
||||
common.Interrupt(link.Writer)
|
||||
runtime.Gosched()
|
||||
common.Interrupt(link.Reader)
|
||||
return errors.New("connection ends").Base(err)
|
||||
}
|
||||
|
||||
|
@@ -17,16 +17,12 @@ func (c *BrowserDialerClient) IsClosed() bool {
|
||||
panic("not implemented yet")
|
||||
}
|
||||
|
||||
func (c *BrowserDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
|
||||
panic("not implemented yet")
|
||||
}
|
||||
func (c *BrowserDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
|
||||
if body != nil {
|
||||
panic("not implemented yet")
|
||||
}
|
||||
|
||||
func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
|
||||
panic("not implemented yet")
|
||||
}
|
||||
|
||||
func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
|
||||
conn, err := browser_dialer.DialGet(baseURL)
|
||||
conn, err := browser_dialer.DialGet(url)
|
||||
dummyAddr := &gonet.IPAddr{}
|
||||
if err != nil {
|
||||
return nil, dummyAddr, dummyAddr, err
|
||||
@@ -35,8 +31,8 @@ func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string)
|
||||
return websocket.NewConnection(conn, dummyAddr, nil, 0), conn.RemoteAddr(), conn.LocalAddr(), nil
|
||||
}
|
||||
|
||||
func (c *BrowserDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
|
||||
bytes, err := io.ReadAll(payload)
|
||||
func (c *BrowserDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error {
|
||||
bytes, err := io.ReadAll(body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -20,21 +20,11 @@ import (
|
||||
type DialerClient interface {
|
||||
IsClosed() bool
|
||||
|
||||
// (ctx, baseURL, payload) -> err
|
||||
// baseURL already contains sessionId and seq
|
||||
SendUploadRequest(context.Context, string, io.ReadWriteCloser, int64) error
|
||||
// ctx, url, body, uploadOnly
|
||||
OpenStream(context.Context, string, io.Reader, bool) (io.ReadCloser, net.Addr, net.Addr, error)
|
||||
|
||||
// (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr)
|
||||
// baseURL already contains sessionId
|
||||
OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error)
|
||||
|
||||
// (ctx, baseURL) -> uploadWriter
|
||||
// baseURL already contains sessionId
|
||||
OpenUpload(context.Context, string) io.WriteCloser
|
||||
|
||||
// (ctx, pureURL) -> (uploadWriter, downloadReader)
|
||||
// pureURL can not contain sessionId
|
||||
Open(context.Context, string) (io.WriteCloser, io.ReadCloser)
|
||||
// ctx, url, body, contentLength
|
||||
PostPacket(context.Context, string, io.Reader, int64) error
|
||||
}
|
||||
|
||||
// implements splithttp.DialerClient in terms of direct network connections
|
||||
@@ -52,136 +42,56 @@ func (c *DefaultDialerClient) IsClosed() bool {
|
||||
return c.closed
|
||||
}
|
||||
|
||||
func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
|
||||
reader, writer := io.Pipe()
|
||||
req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader)
|
||||
req.Header = c.transportConfig.GetRequestHeader()
|
||||
if !c.transportConfig.NoGRPCHeader {
|
||||
req.Header.Set("Content-Type", "application/grpc")
|
||||
}
|
||||
wrc := &WaitReadCloser{Wait: make(chan struct{})}
|
||||
go func() {
|
||||
response, err := c.client.Do(req)
|
||||
if err != nil || response.StatusCode != 200 {
|
||||
if err != nil {
|
||||
errors.LogInfoInner(ctx, err, "failed to open ", pureURL)
|
||||
} else {
|
||||
// c.closed = true
|
||||
response.Body.Close()
|
||||
errors.LogInfo(ctx, "unexpected status ", response.StatusCode)
|
||||
}
|
||||
wrc.Close()
|
||||
return
|
||||
}
|
||||
wrc.Set(response.Body)
|
||||
}()
|
||||
return writer, wrc
|
||||
}
|
||||
|
||||
func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
|
||||
reader, writer := io.Pipe()
|
||||
req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader)
|
||||
req.Header = c.transportConfig.GetRequestHeader()
|
||||
if !c.transportConfig.NoGRPCHeader {
|
||||
req.Header.Set("Content-Type", "application/grpc")
|
||||
}
|
||||
go func() {
|
||||
if resp, err := c.client.Do(req); err == nil {
|
||||
if resp.StatusCode != 200 {
|
||||
// c.closed = true
|
||||
}
|
||||
resp.Body.Close()
|
||||
}
|
||||
}()
|
||||
return writer
|
||||
}
|
||||
|
||||
func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
|
||||
var remoteAddr gonet.Addr
|
||||
var localAddr gonet.Addr
|
||||
func (c *DefaultDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (wrc io.ReadCloser, remoteAddr, localAddr gonet.Addr, err error) {
|
||||
// this is done when the TCP/UDP connection to the server was established,
|
||||
// and we can unblock the Dial function and print correct net addresses in
|
||||
// logs
|
||||
gotConn := done.New()
|
||||
ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
|
||||
GotConn: func(connInfo httptrace.GotConnInfo) {
|
||||
remoteAddr = connInfo.Conn.RemoteAddr()
|
||||
localAddr = connInfo.Conn.LocalAddr()
|
||||
gotConn.Close()
|
||||
},
|
||||
})
|
||||
|
||||
var downResponse io.ReadCloser
|
||||
gotDownResponse := done.New()
|
||||
|
||||
ctx, ctxCancel := context.WithCancel(ctx)
|
||||
method := "GET"
|
||||
if body != nil {
|
||||
method = "POST"
|
||||
}
|
||||
req, _ := http.NewRequestWithContext(ctx, method, url, body)
|
||||
req.Header = c.transportConfig.GetRequestHeader()
|
||||
if method == "POST" && !c.transportConfig.NoGRPCHeader {
|
||||
req.Header.Set("Content-Type", "application/grpc")
|
||||
}
|
||||
|
||||
wrc = &WaitReadCloser{Wait: make(chan struct{})}
|
||||
go func() {
|
||||
trace := &httptrace.ClientTrace{
|
||||
GotConn: func(connInfo httptrace.GotConnInfo) {
|
||||
remoteAddr = connInfo.Conn.RemoteAddr()
|
||||
localAddr = connInfo.Conn.LocalAddr()
|
||||
gotConn.Close()
|
||||
},
|
||||
}
|
||||
|
||||
// in case we hit an error, we want to unblock this part
|
||||
defer gotConn.Close()
|
||||
|
||||
ctx = httptrace.WithClientTrace(ctx, trace)
|
||||
|
||||
req, err := http.NewRequestWithContext(
|
||||
ctx,
|
||||
"GET",
|
||||
baseURL,
|
||||
nil,
|
||||
)
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
errors.LogInfoInner(ctx, err, "failed to construct download http request")
|
||||
gotDownResponse.Close()
|
||||
errors.LogInfoInner(ctx, err, "failed to "+method+" "+url)
|
||||
gotConn.Close()
|
||||
wrc.Close()
|
||||
return
|
||||
}
|
||||
|
||||
req.Header = c.transportConfig.GetRequestHeader()
|
||||
|
||||
response, err := c.client.Do(req)
|
||||
gotConn.Close()
|
||||
if err != nil {
|
||||
errors.LogInfoInner(ctx, err, "failed to send download http request")
|
||||
gotDownResponse.Close()
|
||||
return
|
||||
}
|
||||
|
||||
if response.StatusCode != 200 {
|
||||
if resp.StatusCode != 200 && !uploadOnly {
|
||||
// c.closed = true
|
||||
response.Body.Close()
|
||||
errors.LogInfo(ctx, "invalid status code on download:", response.Status)
|
||||
gotDownResponse.Close()
|
||||
errors.LogInfo(ctx, "unexpected status ", resp.StatusCode)
|
||||
}
|
||||
if resp.StatusCode != 200 || uploadOnly {
|
||||
resp.Body.Close()
|
||||
wrc.Close()
|
||||
return
|
||||
}
|
||||
|
||||
downResponse = response.Body
|
||||
gotDownResponse.Close()
|
||||
wrc.(*WaitReadCloser).Set(resp.Body)
|
||||
}()
|
||||
|
||||
<-gotConn.Wait()
|
||||
|
||||
lazyDownload := &LazyReader{
|
||||
CreateReader: func() (io.Reader, error) {
|
||||
<-gotDownResponse.Wait()
|
||||
if downResponse == nil {
|
||||
return nil, errors.New("downResponse failed")
|
||||
}
|
||||
return downResponse, nil
|
||||
},
|
||||
}
|
||||
|
||||
// workaround for https://github.com/quic-go/quic-go/issues/2143 --
|
||||
// always cancel request context so that Close cancels any Read.
|
||||
// Should then match the behavior of http2 and http1.
|
||||
reader := downloadBody{
|
||||
lazyDownload,
|
||||
ctxCancel,
|
||||
}
|
||||
|
||||
return reader, remoteAddr, localAddr, nil
|
||||
return
|
||||
}
|
||||
|
||||
func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, payload)
|
||||
func (c *DefaultDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error {
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -257,16 +167,6 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
|
||||
return nil
|
||||
}
|
||||
|
||||
type downloadBody struct {
|
||||
io.Reader
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (c downloadBody) Close() error {
|
||||
c.cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
type WaitReadCloser struct {
|
||||
Wait chan struct{}
|
||||
io.ReadCloser
|
||||
|
@@ -13,8 +13,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/quic-go/quic-go"
|
||||
"github.com/quic-go/quic-go/http3"
|
||||
"github.com/xtls/quic-go"
|
||||
"github.com/xtls/quic-go/http3"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
@@ -343,44 +343,19 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
|
||||
errors.LogInfo(ctx, fmt.Sprintf("XHTTP is downloading from %s, mode %s, HTTP version %s, host %s", dest2, "stream-down", httpVersion2, requestURL2.Host))
|
||||
}
|
||||
|
||||
var writer io.WriteCloser
|
||||
var reader io.ReadCloser
|
||||
var remoteAddr, localAddr net.Addr
|
||||
var err error
|
||||
|
||||
if mode == "stream-one" {
|
||||
requestURL.Path = transportConfiguration.GetNormalizedPath()
|
||||
if xmuxClient != nil {
|
||||
xmuxClient.LeftRequests.Add(-1)
|
||||
}
|
||||
writer, reader = httpClient.Open(context.WithoutCancel(ctx), requestURL.String())
|
||||
remoteAddr = &net.TCPAddr{}
|
||||
localAddr = &net.TCPAddr{}
|
||||
} else {
|
||||
if xmuxClient2 != nil {
|
||||
xmuxClient2.LeftRequests.Add(-1)
|
||||
}
|
||||
reader, remoteAddr, localAddr, err = httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if xmuxClient != nil {
|
||||
xmuxClient.OpenUsage.Add(1)
|
||||
}
|
||||
if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
|
||||
xmuxClient2.OpenUsage.Add(1)
|
||||
}
|
||||
var once atomic.Int32
|
||||
var closed atomic.Int32
|
||||
|
||||
reader, writer := io.Pipe()
|
||||
conn := splitConn{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
remoteAddr: remoteAddr,
|
||||
localAddr: localAddr,
|
||||
writer: writer,
|
||||
onClose: func() {
|
||||
if once.Add(-1) < 0 {
|
||||
if closed.Add(1) > 1 {
|
||||
return
|
||||
}
|
||||
if xmuxClient != nil {
|
||||
@@ -393,16 +368,27 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
|
||||
}
|
||||
|
||||
if mode == "stream-one" {
|
||||
requestURL.Path = transportConfiguration.GetNormalizedPath()
|
||||
if xmuxClient != nil {
|
||||
xmuxClient.LeftRequests.Add(-1)
|
||||
}
|
||||
conn.reader, conn.remoteAddr, conn.localAddr, _ = httpClient.OpenStream(context.WithoutCancel(ctx), requestURL.String(), reader, false)
|
||||
return stat.Connection(&conn), nil
|
||||
} else { // stream-down
|
||||
var err error
|
||||
if xmuxClient2 != nil {
|
||||
xmuxClient2.LeftRequests.Add(-1)
|
||||
}
|
||||
conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(context.WithoutCancel(ctx), requestURL2.String(), nil, false)
|
||||
if err != nil { // browser dialer only
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if mode == "stream-up" {
|
||||
if xmuxClient != nil {
|
||||
xmuxClient.LeftRequests.Add(-1)
|
||||
}
|
||||
conn.writer = httpClient.OpenUpload(ctx, requestURL.String())
|
||||
httpClient.OpenStream(ctx, requestURL.String(), reader, true)
|
||||
return stat.Connection(&conn), nil
|
||||
}
|
||||
|
||||
@@ -466,7 +452,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
|
||||
}
|
||||
|
||||
go func() {
|
||||
err := httpClient.SendUploadRequest(
|
||||
err := httpClient.PostPacket(
|
||||
context.WithoutCancel(ctx),
|
||||
url.String(),
|
||||
&buf.MultiBufferContainer{MultiBuffer: chunk},
|
||||
|
@@ -11,8 +11,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/quic-go/quic-go"
|
||||
"github.com/quic-go/quic-go/http3"
|
||||
"github.com/xtls/quic-go"
|
||||
"github.com/xtls/quic-go/http3"
|
||||
goreality "github.com/xtls/reality"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
|
@@ -1,47 +0,0 @@
|
||||
package splithttp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Close is intentionally not supported by LazyReader because it's not clear
|
||||
// how CreateReader should be aborted in case of Close. It's best to wrap
|
||||
// LazyReader in another struct that handles Close correctly, or better, stop
|
||||
// using LazyReader entirely.
|
||||
type LazyReader struct {
|
||||
readerSync sync.Mutex
|
||||
CreateReader func() (io.Reader, error)
|
||||
reader io.Reader
|
||||
readerError error
|
||||
}
|
||||
|
||||
func (r *LazyReader) getReader() (io.Reader, error) {
|
||||
r.readerSync.Lock()
|
||||
defer r.readerSync.Unlock()
|
||||
if r.reader != nil {
|
||||
return r.reader, nil
|
||||
}
|
||||
|
||||
if r.readerError != nil {
|
||||
return nil, r.readerError
|
||||
}
|
||||
|
||||
reader, err := r.CreateReader()
|
||||
if err != nil {
|
||||
r.readerError = err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.reader = reader
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
func (r *LazyReader) Read(b []byte) (int, error) {
|
||||
reader, err := r.getReader()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err := reader.Read(b)
|
||||
return n, err
|
||||
}
|
Reference in New Issue
Block a user