| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627 |
- from __future__ import annotations
- import asyncio
- import contextlib
- import itertools
- import json
- import socket
- import struct
- from collections import deque
- from dataclasses import dataclass, field
- from typing import Dict
- from .config_udp import UdpConfig, UdpRelayNode
- from .logging_utils import log_print as print
- from .scheduler_udp import UdpScheduler
- from .protocol import AUTH, STATUS_ERR, STATUS_OK, UDP_RECV, UDP_SEND, Frame, read_frame, write_frame, encode_json
- SOCKS_VERSION = 5
- UDP_WARMUP_BROADCAST_PACKETS = 6
- UDP_SHADOW_PROBE_INTERVAL_SEC = 0.25
- UDP_FAST_FAILOVER_MISSES = 3
- UDP_FLOW_IDLE_CLEANUP_SEC = 30.0
- UDP_PACKET_CLIENT_MAP_LIMIT = 4096
- UDP_DIRECT_PENDING_LIMIT = 128
- UDP_SOCKET_BUFFER_BYTES = 1 << 20
- UDP_WARMUP_WINDOW_PACKETS = 8
- UDP_STABLE_WINNER_SWITCH_MISSES = 4
- UDP_WINNER_SWITCH_GRACE_SEC = 1.0
- UDP_WINNER_STALE_SEC = 1.5
- async def read_exact(reader: asyncio.StreamReader, size: int) -> bytes:
- return await reader.readexactly(size)
- @dataclass(eq=False)
- class RelayLink:
- node: UdpRelayNode
- reader: asyncio.StreamReader
- writer: asyncio.StreamWriter
- pump: asyncio.Task | None = None
- closed_event: asyncio.Event = field(default_factory=asyncio.Event)
- maintain_task: asyncio.Task | None = None
- udp_server: "UdpAssociateServer | None" = None
- closed: bool = False
- supports_udp: bool = True
- async def start(self) -> None:
- await write_frame(self.writer, Frame(AUTH, 0, 0, 0, 0, encode_json({"token": self.node.token})))
- frame = await read_frame(self.reader)
- if frame.kind != AUTH or frame.packet_id != STATUS_OK:
- raise ConnectionError(f"relay auth failed: {self.node.name}")
- ack = {}
- if frame.payload:
- try:
- ack = json.loads(frame.payload.decode("utf-8"))
- except Exception:
- ack = {}
- self.supports_udp = True
- self.closed = False
- self.closed_event.clear()
- self.pump = asyncio.create_task(self._pump())
- if ack:
- print(f"[edge] relay connected name={self.node.name} addr={self.node.host}:{self.node.port} udp_only={ack.get('udp_only', True)}")
- async def _pump(self) -> None:
- try:
- while True:
- try:
- frame = await read_frame(self.reader)
- except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError, OSError):
- break
- if frame.kind == UDP_RECV and self.udp_server:
- await self.udp_server.handle_from_relay(frame, self)
- finally:
- await self.close()
- async def send(self, frame: Frame) -> None:
- if self.closed:
- raise ConnectionError(f"relay closed: {self.node.name}")
- try:
- await write_frame(self.writer, frame)
- except (BrokenPipeError, ConnectionResetError, RuntimeError, OSError, asyncio.CancelledError) as exc:
- await self.close()
- raise ConnectionError(f"relay closed: {self.node.name}") from exc
- async def close(self) -> None:
- if self.closed:
- return
- self.closed = True
- self.closed_event.set()
- if self.pump and self.pump is not asyncio.current_task():
- self.pump.cancel()
- with contextlib.suppress(Exception):
- await self.pump
- self.writer.close()
- with contextlib.suppress(Exception):
- await self.writer.wait_closed()
- @dataclass
- class UdpFlowState:
- flow_id: int
- client_addr: tuple[str, int]
- target_host: str
- target_port: int
- created_at: float
- last_activity: float
- packets_sent: int = 0
- packets_received: int = 0
- duplicate_responses: int = 0
- winner_name: str | None = None
- candidate_names: tuple[str, ...] = ()
- link_streams: dict[str, int] = field(default_factory=dict)
- initialized_links: set[str] = field(default_factory=set)
- direct_sockets: dict[str, socket.socket] = field(default_factory=dict)
- direct_tasks: dict[str, asyncio.Task] = field(default_factory=dict)
- direct_failures: set[str] = field(default_factory=set)
- relay_failures: dict[str, int] = field(default_factory=dict)
- relay_error_seen: set[str] = field(default_factory=set)
- path_last_seen: dict[str, float] = field(default_factory=dict)
- packet_client_addrs: dict[int, tuple[str, int]] = field(default_factory=dict)
- direct_pending_clients: dict[str, deque[tuple[int, tuple[str, int]]]] = field(default_factory=dict)
- last_probe_at: float = 0.0
- winner_miss_streak: int = 0
- winner_stable_since: float = 0.0
- target_family: int = 0
- last_cleanup_at: float = 0.0
- def touch(self, now: float) -> None:
- self.last_activity = now
- class UdpAssociateServer(asyncio.DatagramProtocol):
- def __init__(self, edge: "UdpEdge") -> None:
- self.edge = edge
- self.transport: asyncio.DatagramTransport | None = None
- self.client_addr = None
- self.associate_peer = None
- self.packet_counter = itertools.count(1)
- self.last_packet_id = 0
- self.client_flows: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {}
- self.flow_counter = itertools.count(1)
- self.last_summary_at = 0.0
- self.win_counts: Dict[str, int] = {}
- self._last_client_port_log_at = 0.0
- self._last_flow_cleanup_at = 0.0
- def connection_made(self, transport) -> None:
- self.transport = transport
- def register_associate(self, peer) -> None:
- peer_text = f"{peer[0]}:{peer[1]}" if isinstance(peer, tuple) and len(peer) >= 2 else str(peer)
- if self.associate_peer != peer_text:
- print(f"[edge] udp associate peer={peer_text}")
- self.associate_peer = peer_text
- def _client_flow_key(self, addr, host: str, port: int) -> tuple[tuple[str, int], str, int]:
- return ((addr[0], 0), host, port)
- def datagram_received(self, data: bytes, addr) -> None:
- if len(data) < 10:
- return
- if self.client_addr is None:
- self.client_addr = addr
- print(f"[edge] udp client bound addr={addr[0]}:{addr[1]}")
- elif addr != self.client_addr:
- if addr[0] == self.client_addr[0]:
- now = asyncio.get_running_loop().time()
- if now - self._last_client_port_log_at >= 30:
- self._last_client_port_log_at = now
- print(f"[edge] udp client port update host={addr[0]} old_port={self.client_addr[1]} new_port={addr[1]}")
- self.client_addr = addr
- else:
- print(f"[edge] udp client rebound old={self.client_addr[0]}:{self.client_addr[1]} new={addr[0]}:{addr[1]}")
- self._reset_client_state(addr)
- host, port, payload = self._parse_socks_udp(data)
- now = asyncio.get_running_loop().time()
- flow_key = self._client_flow_key(addr, host, port)
- flow = self.client_flows.get(flow_key)
- if flow is None:
- family = socket.AF_INET6 if ":" in host else socket.AF_INET
- flow = UdpFlowState(next(self.flow_counter), (addr[0], addr[1]), host, port, now, now, target_family=family)
- self.client_flows[flow_key] = flow
- flow.touch(now)
- flow.client_addr = (addr[0], addr[1])
- flow.packets_sent += 1
- packet_id = next(self.packet_counter)
- self.last_packet_id = packet_id
- flow.packet_client_addrs[packet_id] = (addr[0], addr[1])
- self._cleanup_packet_state(flow, now)
- asyncio.create_task(self.edge.forward_udp(flow, payload, packet_id, (addr[0], addr[1]), self))
- self._cleanup_inactive_flows(now)
- self._log_udp_summary()
- def _reset_client_state(self, addr) -> None:
- remapped: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {}
- for flow in list(self.client_flows.values()):
- flow.client_addr = (addr[0], addr[1])
- remapped[self._client_flow_key(addr, flow.target_host, flow.target_port)] = flow
- self.client_flows = remapped
- self.client_addr = addr
- async def handle_from_relay(self, frame: Frame, link: RelayLink) -> None:
- if self.transport is None or self.client_addr is None:
- return
- flow = self.edge.udp_flow_sessions.get((frame.session_id, frame.stream_id))
- if flow is None:
- return
- if frame.packet_id == STATUS_ERR:
- flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1
- if link.node.name not in flow.relay_error_seen:
- flow.relay_error_seen.add(link.node.name)
- detail = frame.payload.decode("utf-8", errors="replace")
- print(f"[edge] udp relay error flow={flow.flow_id} relay={link.node.name} error={detail}")
- return
- await self._deliver_flow_packet(flow, frame.packet_id, frame.payload, link.node.name)
- async def handle_from_direct(self, flow: UdpFlowState, path_name: str, payload: bytes, packet_id: int = 0, client_addr: tuple[str, int] | None = None) -> None:
- if self.transport is None or self.client_addr is None:
- return
- await self._deliver_flow_packet(flow, packet_id, payload, path_name, client_addr)
- async def _deliver_flow_packet(self, flow: UdpFlowState, packet_id: int, payload: bytes, source_name: str, client_addr: tuple[str, int] | None = None) -> None:
- if self.transport is None or self.client_addr is None:
- return
- packet = self._build_socks_udp(flow.target_host, flow.target_port, payload)
- now = asyncio.get_running_loop().time()
- flow.touch(now)
- flow.path_last_seen[source_name] = now
- flow.packets_received += 1
- target_addr = client_addr or flow.packet_client_addrs.pop(packet_id, None) or flow.client_addr
- if flow.winner_name is None:
- flow.winner_name = source_name
- flow.winner_miss_streak = 0
- flow.winner_stable_since = now
- self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1
- self._log_udp_summary(force=True)
- elif flow.winner_name != source_name:
- flow.duplicate_responses += 1
- winner_last_seen = flow.path_last_seen.get(flow.winner_name, 0.0)
- if winner_last_seen and now - winner_last_seen >= max(
- self.edge.config.udp_failover_idle_ms / 1000,
- UDP_WINNER_SWITCH_GRACE_SEC,
- ):
- flow.winner_name = source_name
- flow.winner_miss_streak = 0
- flow.winner_stable_since = now
- self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1
- self._log_udp_summary(force=True)
- else:
- flow.winner_miss_streak = 0
- flow.winner_stable_since = flow.winner_stable_since or now
- if flow.winner_name == source_name and target_addr is not None:
- if flow.packets_received == 1:
- print(
- f"[edge] udp relay reply flow={flow.flow_id} relay={source_name} "
- f"target={flow.target_host}:{flow.target_port} bytes={len(payload)}"
- )
- self.transport.sendto(packet, target_addr)
- def _cleanup_packet_state(self, flow: UdpFlowState, now: float) -> None:
- if flow.last_cleanup_at and now - flow.last_cleanup_at < 1.0:
- return
- flow.last_cleanup_at = now
- expired_packet_ids = [
- packet_id
- for packet_id in flow.packet_client_addrs
- if packet_id <= (self.last_packet_id - UDP_PACKET_CLIENT_MAP_LIMIT)
- ]
- for packet_id in expired_packet_ids:
- flow.packet_client_addrs.pop(packet_id, None)
- for path_name, pending in list(flow.direct_pending_clients.items()):
- while len(pending) > UDP_DIRECT_PENDING_LIMIT:
- pending.popleft()
- if not pending:
- flow.direct_pending_clients.pop(path_name, None)
- def _cleanup_inactive_flows(self, now: float) -> None:
- if self._last_flow_cleanup_at and now - self._last_flow_cleanup_at < 5.0:
- return
- self._last_flow_cleanup_at = now
- expired_keys = [
- key
- for key, flow in self.client_flows.items()
- if now - flow.last_activity >= UDP_FLOW_IDLE_CLEANUP_SEC
- ]
- for key in expired_keys:
- flow = self.client_flows.pop(key, None)
- if flow is None:
- continue
- self.edge.release_udp_flow(flow)
- def set_flow_candidates(self, flow: UdpFlowState, candidate_names: tuple[str, ...]) -> None:
- if not flow.candidate_names:
- flow.candidate_names = candidate_names
- def note_unsent(self, flow: UdpFlowState, _packet_id: int) -> None:
- flow.touch(asyncio.get_running_loop().time())
- flow.relay_failures["unsent"] = flow.relay_failures.get("unsent", 0) + 1
- self._log_udp_summary(force=True)
- def _log_udp_summary(self, force: bool = False) -> None:
- now = asyncio.get_running_loop().time()
- if not force and now - self.last_summary_at < 10:
- return
- self.last_summary_at = now
- active_flows = len(self.client_flows)
- winners = sum(1 for flow in self.client_flows.values() if flow.winner_name)
- packets_sent = sum(flow.packets_sent for flow in self.client_flows.values())
- packets_received = sum(flow.packets_received for flow in self.client_flows.values())
- duplicates = sum(flow.duplicate_responses for flow in self.client_flows.values())
- direct_paths = sum(len(flow.direct_sockets) for flow in self.client_flows.values())
- relay_candidates = sum(len(flow.link_streams) for flow in self.client_flows.values())
- candidate_names: list[str] = []
- seen_candidates: set[str] = set()
- for flow in sorted(self.client_flows.values(), key=lambda item: item.flow_id):
- for name in flow.candidate_names:
- if name in seen_candidates:
- continue
- seen_candidates.add(name)
- candidate_names.append(name)
- direct_wins = sum(1 for flow in self.client_flows.values() if flow.winner_name and flow.winner_name.startswith("direct"))
- relay_wins = winners - direct_wins
- sample_flows = [f"{flow.flow_id}:{flow.winner_name or 'pending'}" for flow in sorted(self.client_flows.values(), key=lambda item: item.flow_id) if flow.winner_name][:5]
- relay_errors: list[str] = []
- for flow in self.client_flows.values():
- for name, count in flow.relay_failures.items():
- relay_errors.append(f"{name}={count}")
- bind = f"{self.client_addr[0]}:{self.client_addr[1]}" if self.client_addr else "unbound"
- print(
- f"[edge] udp summary bind={bind} flows={active_flows} winners={winners} "
- f"winner_breakdown=direct={direct_wins},relay={relay_wins} sample={', '.join(sample_flows) or 'none'} "
- f"candidates={candidate_names or ['none']} sent={packets_sent} recv={packets_received} dup={duplicates} "
- f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={', '.join(sorted(relay_errors)) or 'none'}"
- )
- def _parse_socks_udp(self, packet: bytes) -> tuple[str, int, bytes]:
- atyp = packet[3]
- offset = 4
- if atyp == 1:
- host = socket.inet_ntoa(packet[offset:offset + 4])
- offset += 4
- elif atyp == 3:
- size = packet[offset]
- offset += 1
- host = packet[offset:offset + size].decode()
- offset += size
- else:
- raise ValueError("unsupported udp atyp")
- port = struct.unpack("!H", packet[offset:offset + 2])[0]
- offset += 2
- return host, port, packet[offset:]
- def _build_socks_udp(self, host: str, port: int, payload: bytes) -> bytes:
- try:
- addr = socket.inet_aton(host)
- header = b"\x00\x00\x00\x01" + addr + struct.pack("!H", port)
- except OSError:
- raw = host.encode()
- header = b"\x00\x00\x00\x03" + bytes([len(raw)]) + raw + struct.pack("!H", port)
- return header + payload
- class UdpEdge:
- def __init__(self, listen_host: str, listen_port: int, config: UdpConfig) -> None:
- self.listen_host = listen_host
- self.listen_port = listen_port
- self.config = config
- self.scheduler = UdpScheduler(config)
- self.links: list[RelayLink] = []
- self.udp_stream_ids = itertools.count(1)
- self.udp_flow_sessions: dict[tuple[int, int], UdpFlowState] = {}
- self.udp_server: UdpAssociateServer | None = None
- def _udp_direct_copies(self) -> int:
- if self.config.udp_direct_copies is not None:
- return max(1, self.config.udp_direct_copies)
- return max(1, self.config.udp_redundancy + 1)
- def _udp_relay_copies(self) -> int:
- if self.config.udp_relay_copies is not None:
- return max(1, self.config.udp_relay_copies)
- return max(1, self.config.udp_redundancy + 1)
- def release_udp_flow(self, flow: UdpFlowState) -> None:
- for stream_id in list(flow.link_streams.values()):
- self.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
- flow.link_streams.clear()
- flow.initialized_links.clear()
- flow.packet_client_addrs.clear()
- for task in list(flow.direct_tasks.values()):
- task.cancel()
- flow.direct_tasks.clear()
- for sock in list(flow.direct_sockets.values()):
- with contextlib.suppress(Exception):
- sock.close()
- flow.direct_sockets.clear()
- flow.direct_pending_clients.clear()
- async def start(self) -> None:
- await self.scheduler.start()
- await self._connect_relays()
- server = await asyncio.start_server(self._accept, self.listen_host, self.listen_port)
- sockets = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
- relay_mode = "direct-only" if not self.config.relays else "direct+relay"
- print(f"[edge] socks5 listening on {sockets} relay_mode={relay_mode}")
- async with server:
- await server.serve_forever()
- async def _connect_relays(self) -> None:
- loop = asyncio.get_running_loop()
- transport, protocol = await loop.create_datagram_endpoint(lambda: UdpAssociateServer(self), local_addr=(self.listen_host, 0))
- self.udp_server = protocol
- self.udp_transport = transport
- for node in self.config.relays:
- link = RelayLink(node=node, reader=None, writer=None) # type: ignore[arg-type]
- link.udp_server = protocol
- self.links.append(link)
- link.maintain_task = asyncio.create_task(self._maintain_link(link))
- async def _maintain_link(self, link: RelayLink) -> None:
- backoff = 1.0
- while True:
- try:
- reader, writer = await asyncio.open_connection(link.node.host, link.node.port)
- sock = writer.get_extra_info("socket")
- if sock is not None:
- with contextlib.suppress(OSError):
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- link.reader = reader
- link.writer = writer
- await link.start()
- backoff = 1.0
- await link.closed_event.wait()
- except asyncio.CancelledError:
- raise
- except Exception:
- await asyncio.sleep(backoff)
- backoff = min(10.0, backoff * 2)
- async def _accept(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
- try:
- peer = writer.get_extra_info("peername")
- _host, _port, udp_mode = await self._handshake(reader, writer, peer)
- if udp_mode:
- return
- except Exception:
- writer.close()
- with contextlib.suppress(Exception):
- await writer.wait_closed()
- def _selected_udp_links(self) -> list[RelayLink]:
- online = [link for link in self.links if not link.closed and link.writer is not None and link.supports_udp]
- if not online:
- return []
- return sorted(online, key=lambda link: self.scheduler.scores.get(link.node.name).score if link.node.name in self.scheduler.scores else 999999.0)
- def _udp_direct_redundancy_for_target(self, target_host: str) -> int:
- base = self.config.udp_direct_redundancy
- if ":" in target_host and self.config.udp_direct_redundancy_v6 is not None:
- base = self.config.udp_direct_redundancy_v6
- elif ":" not in target_host and self.config.udp_direct_redundancy_v4 is not None:
- base = self.config.udp_direct_redundancy_v4
- return max(1, base)
- async def _ensure_udp_direct_paths(self, flow: UdpFlowState, udp_server: UdpAssociateServer) -> None:
- target_count = self._udp_direct_redundancy_for_target(flow.target_host)
- for index in range(target_count):
- name = f"direct-{index + 1}" if target_count > 1 else "direct"
- if name in flow.direct_sockets or name in flow.direct_failures:
- continue
- try:
- family = socket.AF_INET6 if ":" in flow.target_host else socket.AF_INET
- sock = socket.socket(family, socket.SOCK_DGRAM)
- sock.setblocking(False)
- with contextlib.suppress(OSError):
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, UDP_SOCKET_BUFFER_BYTES)
- with contextlib.suppress(OSError):
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, UDP_SOCKET_BUFFER_BYTES)
- await asyncio.get_running_loop().sock_connect(sock, (flow.target_host, flow.target_port))
- flow.direct_sockets[name] = sock
- flow.direct_tasks[name] = asyncio.create_task(self._pump_udp_direct(flow, name, sock, udp_server))
- except Exception as exc:
- flow.direct_failures.add(name)
- print(f"[edge] udp direct open error flow={flow.flow_id} path={name} target={flow.target_host}:{flow.target_port} error={exc!r}")
- async def _pump_udp_direct(self, flow: UdpFlowState, path_name: str, sock: socket.socket, udp_server: UdpAssociateServer) -> None:
- loop = asyncio.get_running_loop()
- try:
- while True:
- data = await loop.sock_recv(sock, 65535)
- if not data:
- break
- pending = flow.direct_pending_clients.get(path_name)
- packet_id = 0
- client_addr = flow.client_addr
- if pending:
- packet_id, client_addr = pending.popleft()
- await udp_server.handle_from_direct(flow, path_name, data, packet_id, client_addr)
- finally:
- flow.direct_tasks.pop(path_name, None)
- flow.direct_sockets.pop(path_name, None)
- with contextlib.suppress(Exception):
- sock.close()
- async def forward_udp(self, flow: UdpFlowState, payload: bytes, packet_id: int, client_addr: tuple[str, int], udp_server: UdpAssociateServer) -> None:
- await self._ensure_udp_direct_paths(flow, udp_server)
- meta = encode_json({"host": flow.target_host, "port": flow.target_port, "family": flow.target_family})
- links = self._selected_udp_links()
- direct_names = tuple(name for name in sorted(flow.direct_sockets))
- relay_names = tuple(link.node.name for link in links)
- candidate_names = direct_names + relay_names
- udp_server.set_flow_candidates(flow, candidate_names)
- if not candidate_names:
- udp_server.note_unsent(flow, packet_id)
- return
- active_direct_names = list(direct_names)
- active_links = links
- now = asyncio.get_running_loop().time()
- warmup_mode = flow.packets_sent <= UDP_WARMUP_WINDOW_PACKETS
- shadow_probe = flow.winner_name is not None and now - flow.last_probe_at >= UDP_SHADOW_PROBE_INTERVAL_SEC
- if shadow_probe:
- flow.last_probe_at = now
- broadcast_mode = self.config.udp_always_broadcast or flow.winner_name is None or warmup_mode or shadow_probe
- if not broadcast_mode:
- winner_last_seen = flow.path_last_seen.get(flow.winner_name, 0.0) if flow.winner_name else 0.0
- winner_stale = bool(
- winner_last_seen
- and now - winner_last_seen >= max(self.config.udp_failover_idle_ms / 1000, UDP_WINNER_STALE_SEC)
- )
- if winner_stale:
- flow.winner_miss_streak += 1
- else:
- flow.winner_miss_streak = 0
- if winner_stale and flow.winner_miss_streak >= UDP_STABLE_WINNER_SWITCH_MISSES:
- flow.winner_name = None
- flow.winner_miss_streak = 0
- broadcast_mode = True
- else:
- active_direct_names = [name for name in active_direct_names if name == flow.winner_name]
- active_links = [link for link in active_links if link.node.name == flow.winner_name]
- if not active_direct_names and not active_links:
- if direct_names:
- active_direct_names = [direct_names[0]]
- elif links:
- active_links = links[:1]
- direct_copies = self._udp_direct_copies()
- relay_copies = self._udp_relay_copies()
- sent_any = False
- for attempt in range(max(direct_copies, relay_copies)):
- for path_name in active_direct_names if attempt < direct_copies else ():
- sock = flow.direct_sockets.get(path_name)
- if sock is None:
- continue
- try:
- flow.direct_pending_clients.setdefault(path_name, deque()).append((packet_id, client_addr))
- await asyncio.get_running_loop().sock_sendall(sock, payload)
- sent_any = True
- except Exception as exc:
- pending = flow.direct_pending_clients.get(path_name)
- if pending:
- with contextlib.suppress(Exception):
- pending.pop()
- flow.direct_failures.add(path_name)
- flow.direct_sockets.pop(path_name, None)
- task = flow.direct_tasks.pop(path_name, None)
- if task is not None:
- task.cancel()
- with contextlib.suppress(Exception):
- sock.close()
- flow.relay_failures[path_name] = flow.relay_failures.get(path_name, 0) + 1
- if path_name not in flow.relay_error_seen:
- flow.relay_error_seen.add(path_name)
- print(f"[edge] udp relay error flow={flow.flow_id} relay={path_name} error={exc!r}")
- for link in active_links if attempt < relay_copies else ():
- stream_id = flow.link_streams.get(link.node.name)
- if stream_id is None:
- stream_id = next(self.udp_stream_ids)
- flow.link_streams[link.node.name] = stream_id
- self.udp_flow_sessions[(flow.flow_id, stream_id)] = flow
- include_meta = link.node.name not in flow.initialized_links
- body = (meta + payload) if include_meta else payload
- meta_len = len(meta) if include_meta else 0
- try:
- await link.send(Frame(UDP_SEND, flow.flow_id, stream_id, 0, meta_len, body))
- flow.initialized_links.add(link.node.name)
- sent_any = True
- except Exception as exc:
- flow.link_streams.pop(link.node.name, None)
- self.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
- flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1
- if link.node.name not in flow.relay_error_seen:
- flow.relay_error_seen.add(link.node.name)
- print(f"[edge] udp relay error flow={flow.flow_id} relay={link.node.name} error={exc!r}")
- if attempt + 1 < max(direct_copies, relay_copies) and self.config.udp_copy_interval_ms > 0:
- await asyncio.sleep(self.config.udp_copy_interval_ms / 1000)
- if not sent_any:
- udp_server.note_unsent(flow, packet_id)
- async def _handshake(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, peer) -> tuple[str, int, bool]:
- version, methods_len = (await read_exact(reader, 2))
- if version != SOCKS_VERSION:
- raise ValueError("unsupported socks version")
- await read_exact(reader, methods_len)
- writer.write(b"\x05\x00")
- await writer.drain()
- version, command, _, atyp = await read_exact(reader, 4)
- if version != SOCKS_VERSION:
- raise ValueError("unsupported socks version")
- if atyp == 1:
- host = socket.inet_ntoa(await read_exact(reader, 4))
- elif atyp == 3:
- size = (await read_exact(reader, 1))[0]
- host = (await read_exact(reader, size)).decode()
- else:
- raise ValueError("unsupported atyp")
- port = struct.unpack("!H", await read_exact(reader, 2))[0]
- peer_text = f"{peer[0]}:{peer[1]}" if isinstance(peer, tuple) and len(peer) >= 2 else str(peer)
- if command == 3 and self.udp_server and self.udp_server.transport:
- bind_host, bind_port = self.udp_server.transport.get_extra_info("sockname")[:2]
- self.udp_server.register_associate(peer)
- print(f"[edge] socks handshake peer={peer_text} command=udp_associate target={host}:{port} bind={bind_host}:{bind_port}")
- writer.write(b"\x05\x00\x00\x01" + socket.inet_aton(bind_host) + struct.pack("!H", bind_port))
- await writer.drain()
- return host, port, True
- raise ValueError("unsupported socks command")
|