Compare commits

..

2 Commits

Author SHA1 Message Date
patterniha
c56e1650a1 remove unused 2025-08-21 12:57:52 +03:30
patterniha
6be9c4a429 remove UDP cache 2025-08-21 11:39:15 +03:30
9 changed files with 48 additions and 107 deletions

View File

@@ -90,9 +90,7 @@ func (s *ClassicNameServer) RequestsCleanup() error {
// HandleResponse handles udp response packet from remote DNS server.
func (s *ClassicNameServer) HandleResponse(ctx context.Context, packet *udp_proto.Packet) {
payload := packet.Payload
ipRec, err := parseResponse(payload.Bytes())
payload.Release()
ipRec, err := parseResponse(packet.Payload.Bytes())
if err != nil {
errors.LogError(ctx, s.Name(), " fail to parse responded DNS udp")
return

View File

@@ -382,7 +382,7 @@ func (w *udpWorker) clean() error {
}
for addr, conn := range w.activeConn {
if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 60 {
if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 2*60 {
if !conn.inactive {
conn.setInactive()
delete(w.activeConn, addr)
@@ -409,7 +409,7 @@ func (w *udpWorker) Start() error {
w.cone = w.ctx.Value("cone").(bool)
w.checker = &task.Periodic{
Interval: 30 * time.Second,
Interval: time.Minute,
Execute: w.clean,
}

2
go.mod
View File

@@ -27,7 +27,7 @@ require (
golang.org/x/sys v0.35.0
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173
google.golang.org/grpc v1.75.0
google.golang.org/protobuf v1.36.8
google.golang.org/protobuf v1.36.7
gvisor.dev/gvisor v0.0.0-20250428193742-2d800c3129d5
h12.io/socks v1.0.3
lukechampine.com/blake3 v1.4.1

4
go.sum
View File

@@ -145,8 +145,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A=
google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -18,7 +18,6 @@ import (
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/common/utils"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/stats"
@@ -190,7 +189,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
writer = buf.NewWriter(conn)
}
} else {
writer = NewPacketWriter(conn, h, UDPOverride, destination)
writer = NewPacketWriter(conn, UDPOverride, destination)
if h.config.Noises != nil {
errors.LogDebug(ctx, "NOISE", h.config.Noises)
writer = &NoisePacketWriter{
@@ -262,7 +261,7 @@ func isTLSConn(conn stat.Connection) bool {
return false
}
func NewPacketReader(conn net.Conn, UDPOverride net.Destination, DialDest net.Destination) buf.Reader {
func NewPacketReader(conn net.Conn, UDPOverride net.Destination, destination net.Destination) buf.Reader {
iConn := conn
statConn, ok := iConn.(*stat.CounterConnection)
if ok {
@@ -282,7 +281,7 @@ func NewPacketReader(conn net.Conn, UDPOverride net.Destination, DialDest net.De
PacketConnWrapper: c,
Counter: counter,
IsOverridden: isOverridden,
InitUnchangedAddr: DialDest.Address,
InitUnchangedAddr: destination.Address,
InitChangedAddr: net.DestinationFromAddr(conn.RemoteAddr()).Address,
}
}
@@ -326,7 +325,7 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
}
// DialDest means the dial target used in the dialer when creating conn
func NewPacketWriter(conn net.Conn, h *Handler, UDPOverride net.Destination, DialDest net.Destination) buf.Writer {
func NewPacketWriter(conn net.Conn, UDPOverride net.Destination, destination net.Destination) buf.Writer {
iConn := conn
statConn, ok := iConn.(*stat.CounterConnection)
if ok {
@@ -337,19 +336,12 @@ func NewPacketWriter(conn net.Conn, h *Handler, UDPOverride net.Destination, Dia
counter = statConn.WriteCounter
}
if c, ok := iConn.(*internet.PacketConnWrapper); ok {
// If DialDest is a domain, it will be resolved in dialer
// check this behavior and add it to map
resolvedUDPAddr := utils.NewTypedSyncMap[string, net.Address]()
if DialDest.Address.Family().IsDomain() {
resolvedUDPAddr.Store(DialDest.Address.Domain(), net.DestinationFromAddr(conn.RemoteAddr()).Address)
}
return &PacketWriter{
PacketConnWrapper: c,
Counter: counter,
Handler: h,
UDPOverride: UDPOverride,
ResolvedUDPAddr: resolvedUDPAddr,
LocalAddr: net.DestinationFromAddr(conn.LocalAddr()).Address,
InitUnchangedAddr: destination.Address,
InitChangedAddr: net.DestinationFromAddr(conn.RemoteAddr()).Address,
}
}
@@ -359,15 +351,10 @@ func NewPacketWriter(conn net.Conn, h *Handler, UDPOverride net.Destination, Dia
type PacketWriter struct {
*internet.PacketConnWrapper
stats.Counter
*Handler
UDPOverride net.Destination
// Dest of udp packets might be a domain, we will resolve them to IP
// But resolver will return a random one if the domain has many IPs
// Resulting in these packets being sent to many different IPs randomly
// So, cache and keep the resolve result
ResolvedUDPAddr *utils.TypedSyncMap[string, net.Address]
LocalAddr net.Address
InitUnchangedAddr net.Address
InitChangedAddr net.Address
}
func (w *PacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
@@ -386,44 +373,15 @@ func (w *PacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
if w.UDPOverride.Port != 0 {
b.UDP.Port = w.UDPOverride.Port
}
if b.UDP.Address == w.InitUnchangedAddr {
b.UDP.Address = w.InitChangedAddr
}
if b.UDP.Address.Family().IsDomain() {
if ip, ok := w.ResolvedUDPAddr.Load(b.UDP.Address.Domain()); ok {
b.UDP.Address = ip
} else {
ShouldUseSystemResolver := true
if w.Handler.config.DomainStrategy.HasStrategy() {
ips, err := internet.LookupForIP(b.UDP.Address.Domain(), w.Handler.config.DomainStrategy, w.LocalAddr)
if err != nil {
// drop packet if resolve failed when forceIP
if w.Handler.config.DomainStrategy.ForceIP() {
b.Release()
continue
}
} else {
ip = net.IPAddress(ips[dice.Roll(len(ips))])
ShouldUseSystemResolver = false
}
}
if ShouldUseSystemResolver {
udpAddr, err := net.ResolveUDPAddr("udp", b.UDP.NetAddr())
if err != nil {
b.Release()
continue
} else {
ip = net.IPAddress(udpAddr.IP)
}
}
if ip != nil {
b.UDP.Address, _ = w.ResolvedUDPAddr.LoadOrStore(b.UDP.Address.Domain(), ip)
}
}
}
destAddr := b.UDP.RawNetAddr()
if destAddr == nil {
b.Release()
continue
buf.ReleaseMulti(mb)
return errors.New("multiple domains cone is not supported")
}
n, err = w.PacketConnWrapper.WriteTo(b.Bytes(), destAddr)
n, err = w.PacketConnWrapper.WriteTo(b.Bytes(), b.UDP.RawNetAddr())
} else {
n, err = w.PacketConnWrapper.Write(b.Bytes())
}

View File

@@ -104,12 +104,12 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Con
func (s *Server) handleUDPPayload(ctx context.Context, conn stat.Connection, dispatcher routing.Dispatcher) error {
udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
request := protocol.RequestHeaderFromContext(ctx)
payload := packet.Payload
if request == nil {
payload.Release()
return
}
payload := packet.Payload
if payload.UDP != nil {
request = &protocol.RequestHeader{
User: request.User,
@@ -124,9 +124,9 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn stat.Connection, dis
errors.LogWarningInner(ctx, err, "failed to encode UDP packet")
return
}
defer data.Release()
conn.Write(data.Bytes())
data.Release()
})
defer udpServer.RemoveRay()

View File

@@ -231,7 +231,6 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn stat.Connection, dis
request := protocol.RequestHeaderFromContext(ctx)
if request == nil {
payload.Release()
return
}
@@ -250,9 +249,9 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn stat.Connection, dis
errors.LogWarningInner(ctx, err, "failed to write UDP response")
return
}
defer udpMessage.Release()
conn.Write(udpMessage.Bytes())
udpMessage.Release()
})
defer udpServer.RemoveRay()

View File

@@ -113,11 +113,9 @@ func (w *PacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
target = b.UDP
}
if _, err := w.writePacket(b.Bytes(), *target); err != nil {
b.Release()
buf.ReleaseMulti(mb)
return err
}
b.Release()
}
return nil
}

View File

@@ -22,24 +22,8 @@ type ResponseCallback func(ctx context.Context, packet *udp.Packet)
type connEntry struct {
link *transport.Link
timer *signal.ActivityTimer
timer signal.ActivityUpdater
cancel context.CancelFunc
closed bool
}
func (c *connEntry) Close() error {
c.timer.SetTimeout(0)
return nil
}
func (c *connEntry) terminate() {
if c.closed {
panic("terminate called more than once")
}
c.closed = true
c.cancel()
common.Interrupt(c.link.Reader)
common.Interrupt(c.link.Writer)
}
type Dispatcher struct {
@@ -48,7 +32,6 @@ type Dispatcher struct {
dispatcher routing.Dispatcher
callback ResponseCallback
callClose func() error
closed bool
}
func NewDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) *Dispatcher {
@@ -61,9 +44,13 @@ func NewDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) *Di
func (v *Dispatcher) RemoveRay() {
v.Lock()
defer v.Unlock()
v.closed = true
v.removeRay()
}
func (v *Dispatcher) removeRay() {
if v.conn != nil {
v.conn.Close()
common.Interrupt(v.conn.link.Reader)
common.Close(v.conn.link.Writer)
v.conn = nil
}
}
@@ -72,34 +59,35 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) (*
v.Lock()
defer v.Unlock()
if v.closed {
return nil, errors.New("dispatcher is closed")
}
if v.conn != nil {
if v.conn.closed {
v.conn = nil
} else {
return v.conn, nil
}
return v.conn, nil
}
errors.LogInfo(ctx, "establishing new connection for ", dest)
ctx, cancel := context.WithCancel(ctx)
entry := &connEntry{}
removeRay := func() {
v.Lock()
defer v.Unlock()
// sometimes the entry is already removed by others, don't close again
if entry == v.conn {
cancel()
v.removeRay()
}
}
timer := signal.CancelAfterInactivity(ctx, removeRay, time.Minute)
link, err := v.dispatcher.Dispatch(ctx, dest)
if err != nil {
cancel()
return nil, errors.New("failed to dispatch request to ", dest).Base(err)
}
entry := &connEntry{
*entry = connEntry{
link: link,
cancel: cancel,
timer: timer,
cancel: removeRay,
}
entry.timer = signal.CancelAfterInactivity(ctx, entry.terminate, 30*time.Second) // The UDP timeout is set to 30 seconds in most NAT configurations
v.conn = entry
go handleInput(ctx, entry, dest, v.callback, v.callClose)
return entry, nil
@@ -118,7 +106,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination,
if outputStream != nil {
if err := outputStream.WriteMultiBuffer(buf.MultiBuffer{payload}); err != nil {
errors.LogInfoInner(ctx, err, "failed to write first UDP payload")
conn.Close()
conn.cancel()
return
}
}
@@ -126,7 +114,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination,
func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback, callClose func() error) {
defer func() {
conn.Close()
conn.cancel()
if callClose != nil {
callClose()
}