edge_udp.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. from __future__ import annotations
  2. import asyncio
  3. import contextlib
  4. import itertools
  5. import json
  6. import socket
  7. import struct
  8. from collections import deque
  9. from dataclasses import dataclass, field
  10. from typing import Dict
  11. from .config_udp import UdpConfig, UdpRelayNode
  12. from .logging_utils import log_print as print
  13. from .scheduler_udp import UdpScheduler
  14. from .protocol import AUTH, STATUS_ERR, STATUS_OK, UDP_RECV, UDP_SEND, Frame, read_frame, write_frame, encode_json
  15. SOCKS_VERSION = 5
  16. UDP_WARMUP_BROADCAST_PACKETS = 6
  17. UDP_SHADOW_PROBE_INTERVAL_SEC = 0.25
  18. UDP_FAST_FAILOVER_MISSES = 3
  19. UDP_FLOW_IDLE_CLEANUP_SEC = 30.0
  20. UDP_PACKET_CLIENT_MAP_LIMIT = 4096
  21. UDP_DIRECT_PENDING_LIMIT = 128
  22. UDP_SOCKET_BUFFER_BYTES = 1 << 20
  23. UDP_WARMUP_WINDOW_PACKETS = 8
  24. UDP_STABLE_WINNER_SWITCH_MISSES = 4
  25. UDP_WINNER_SWITCH_GRACE_SEC = 1.0
  26. UDP_WINNER_STALE_SEC = 1.5
  27. async def read_exact(reader: asyncio.StreamReader, size: int) -> bytes:
  28. return await reader.readexactly(size)
  29. @dataclass(eq=False)
  30. class RelayLink:
  31. node: UdpRelayNode
  32. reader: asyncio.StreamReader
  33. writer: asyncio.StreamWriter
  34. pump: asyncio.Task | None = None
  35. closed_event: asyncio.Event = field(default_factory=asyncio.Event)
  36. maintain_task: asyncio.Task | None = None
  37. udp_server: "UdpAssociateServer | None" = None
  38. closed: bool = False
  39. supports_udp: bool = True
  40. async def start(self) -> None:
  41. await write_frame(self.writer, Frame(AUTH, 0, 0, 0, 0, encode_json({"token": self.node.token})))
  42. frame = await read_frame(self.reader)
  43. if frame.kind != AUTH or frame.packet_id != STATUS_OK:
  44. raise ConnectionError(f"relay auth failed: {self.node.name}")
  45. ack = {}
  46. if frame.payload:
  47. try:
  48. ack = json.loads(frame.payload.decode("utf-8"))
  49. except Exception:
  50. ack = {}
  51. self.supports_udp = True
  52. self.closed = False
  53. self.closed_event.clear()
  54. self.pump = asyncio.create_task(self._pump())
  55. if ack:
  56. print(f"[edge] relay connected name={self.node.name} addr={self.node.host}:{self.node.port} udp_only={ack.get('udp_only', True)}")
  57. async def _pump(self) -> None:
  58. try:
  59. while True:
  60. try:
  61. frame = await read_frame(self.reader)
  62. except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError, OSError):
  63. break
  64. if frame.kind == UDP_RECV and self.udp_server:
  65. await self.udp_server.handle_from_relay(frame, self)
  66. finally:
  67. await self.close()
  68. async def send(self, frame: Frame) -> None:
  69. if self.closed:
  70. raise ConnectionError(f"relay closed: {self.node.name}")
  71. try:
  72. await write_frame(self.writer, frame)
  73. except (BrokenPipeError, ConnectionResetError, RuntimeError, OSError, asyncio.CancelledError) as exc:
  74. await self.close()
  75. raise ConnectionError(f"relay closed: {self.node.name}") from exc
  76. async def close(self) -> None:
  77. if self.closed:
  78. return
  79. self.closed = True
  80. self.closed_event.set()
  81. if self.pump and self.pump is not asyncio.current_task():
  82. self.pump.cancel()
  83. with contextlib.suppress(Exception):
  84. await self.pump
  85. self.writer.close()
  86. with contextlib.suppress(Exception):
  87. await self.writer.wait_closed()
  88. @dataclass
  89. class UdpFlowState:
  90. flow_id: int
  91. client_addr: tuple[str, int]
  92. target_host: str
  93. target_port: int
  94. created_at: float
  95. last_activity: float
  96. packets_sent: int = 0
  97. packets_received: int = 0
  98. duplicate_responses: int = 0
  99. winner_name: str | None = None
  100. candidate_names: tuple[str, ...] = ()
  101. link_streams: dict[str, int] = field(default_factory=dict)
  102. initialized_links: set[str] = field(default_factory=set)
  103. direct_sockets: dict[str, socket.socket] = field(default_factory=dict)
  104. direct_tasks: dict[str, asyncio.Task] = field(default_factory=dict)
  105. direct_failures: set[str] = field(default_factory=set)
  106. relay_failures: dict[str, int] = field(default_factory=dict)
  107. relay_error_seen: set[str] = field(default_factory=set)
  108. path_last_seen: dict[str, float] = field(default_factory=dict)
  109. packet_client_addrs: dict[int, tuple[str, int]] = field(default_factory=dict)
  110. direct_pending_clients: dict[str, deque[tuple[int, tuple[str, int]]]] = field(default_factory=dict)
  111. last_probe_at: float = 0.0
  112. winner_miss_streak: int = 0
  113. winner_stable_since: float = 0.0
  114. target_family: int = 0
  115. last_cleanup_at: float = 0.0
  116. def touch(self, now: float) -> None:
  117. self.last_activity = now
  118. class UdpAssociateServer(asyncio.DatagramProtocol):
  119. def __init__(self, edge: "UdpEdge") -> None:
  120. self.edge = edge
  121. self.transport: asyncio.DatagramTransport | None = None
  122. self.client_addr = None
  123. self.associate_peer = None
  124. self.packet_counter = itertools.count(1)
  125. self.last_packet_id = 0
  126. self.client_flows: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {}
  127. self.flow_counter = itertools.count(1)
  128. self.last_summary_at = 0.0
  129. self.win_counts: Dict[str, int] = {}
  130. self._last_client_port_log_at = 0.0
  131. self._last_flow_cleanup_at = 0.0
  132. def connection_made(self, transport) -> None:
  133. self.transport = transport
  134. def register_associate(self, peer) -> None:
  135. peer_text = f"{peer[0]}:{peer[1]}" if isinstance(peer, tuple) and len(peer) >= 2 else str(peer)
  136. if self.associate_peer != peer_text:
  137. print(f"[edge] udp associate peer={peer_text}")
  138. self.associate_peer = peer_text
  139. def _client_flow_key(self, addr, host: str, port: int) -> tuple[tuple[str, int], str, int]:
  140. return ((addr[0], 0), host, port)
  141. def datagram_received(self, data: bytes, addr) -> None:
  142. if len(data) < 10:
  143. return
  144. if self.client_addr is None:
  145. self.client_addr = addr
  146. print(f"[edge] udp client bound addr={addr[0]}:{addr[1]}")
  147. elif addr != self.client_addr:
  148. if addr[0] == self.client_addr[0]:
  149. now = asyncio.get_running_loop().time()
  150. if now - self._last_client_port_log_at >= 30:
  151. self._last_client_port_log_at = now
  152. print(f"[edge] udp client port update host={addr[0]} old_port={self.client_addr[1]} new_port={addr[1]}")
  153. self.client_addr = addr
  154. else:
  155. print(f"[edge] udp client rebound old={self.client_addr[0]}:{self.client_addr[1]} new={addr[0]}:{addr[1]}")
  156. self._reset_client_state(addr)
  157. host, port, payload = self._parse_socks_udp(data)
  158. now = asyncio.get_running_loop().time()
  159. flow_key = self._client_flow_key(addr, host, port)
  160. flow = self.client_flows.get(flow_key)
  161. if flow is None:
  162. family = socket.AF_INET6 if ":" in host else socket.AF_INET
  163. flow = UdpFlowState(next(self.flow_counter), (addr[0], addr[1]), host, port, now, now, target_family=family)
  164. self.client_flows[flow_key] = flow
  165. flow.touch(now)
  166. flow.client_addr = (addr[0], addr[1])
  167. flow.packets_sent += 1
  168. packet_id = next(self.packet_counter)
  169. self.last_packet_id = packet_id
  170. flow.packet_client_addrs[packet_id] = (addr[0], addr[1])
  171. self._cleanup_packet_state(flow, now)
  172. asyncio.create_task(self.edge.forward_udp(flow, payload, packet_id, (addr[0], addr[1]), self))
  173. self._cleanup_inactive_flows(now)
  174. self._log_udp_summary()
  175. def _reset_client_state(self, addr) -> None:
  176. remapped: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {}
  177. for flow in list(self.client_flows.values()):
  178. flow.client_addr = (addr[0], addr[1])
  179. remapped[self._client_flow_key(addr, flow.target_host, flow.target_port)] = flow
  180. self.client_flows = remapped
  181. self.client_addr = addr
  182. async def handle_from_relay(self, frame: Frame, link: RelayLink) -> None:
  183. if self.transport is None or self.client_addr is None:
  184. return
  185. flow = self.edge.udp_flow_sessions.get((frame.session_id, frame.stream_id))
  186. if flow is None:
  187. return
  188. if frame.packet_id == STATUS_ERR:
  189. flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1
  190. if link.node.name not in flow.relay_error_seen:
  191. flow.relay_error_seen.add(link.node.name)
  192. detail = frame.payload.decode("utf-8", errors="replace")
  193. print(f"[edge] udp relay error flow={flow.flow_id} relay={link.node.name} error={detail}")
  194. return
  195. await self._deliver_flow_packet(flow, frame.packet_id, frame.payload, link.node.name)
  196. async def handle_from_direct(self, flow: UdpFlowState, path_name: str, payload: bytes, packet_id: int = 0, client_addr: tuple[str, int] | None = None) -> None:
  197. if self.transport is None or self.client_addr is None:
  198. return
  199. await self._deliver_flow_packet(flow, packet_id, payload, path_name, client_addr)
  200. async def _deliver_flow_packet(self, flow: UdpFlowState, packet_id: int, payload: bytes, source_name: str, client_addr: tuple[str, int] | None = None) -> None:
  201. if self.transport is None or self.client_addr is None:
  202. return
  203. packet = self._build_socks_udp(flow.target_host, flow.target_port, payload)
  204. now = asyncio.get_running_loop().time()
  205. flow.touch(now)
  206. flow.path_last_seen[source_name] = now
  207. flow.packets_received += 1
  208. target_addr = client_addr or flow.packet_client_addrs.pop(packet_id, None) or flow.client_addr
  209. if flow.winner_name is None:
  210. flow.winner_name = source_name
  211. flow.winner_miss_streak = 0
  212. flow.winner_stable_since = now
  213. self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1
  214. self._log_udp_summary(force=True)
  215. elif flow.winner_name != source_name:
  216. flow.duplicate_responses += 1
  217. winner_last_seen = flow.path_last_seen.get(flow.winner_name, 0.0)
  218. if winner_last_seen and now - winner_last_seen >= max(
  219. self.edge.config.udp_failover_idle_ms / 1000,
  220. UDP_WINNER_SWITCH_GRACE_SEC,
  221. ):
  222. flow.winner_name = source_name
  223. flow.winner_miss_streak = 0
  224. flow.winner_stable_since = now
  225. self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1
  226. self._log_udp_summary(force=True)
  227. else:
  228. flow.winner_miss_streak = 0
  229. flow.winner_stable_since = flow.winner_stable_since or now
  230. if flow.winner_name == source_name and target_addr is not None:
  231. if flow.packets_received == 1:
  232. print(
  233. f"[edge] udp relay reply flow={flow.flow_id} relay={source_name} "
  234. f"target={flow.target_host}:{flow.target_port} bytes={len(payload)}"
  235. )
  236. self.transport.sendto(packet, target_addr)
  237. def _cleanup_packet_state(self, flow: UdpFlowState, now: float) -> None:
  238. if flow.last_cleanup_at and now - flow.last_cleanup_at < 1.0:
  239. return
  240. flow.last_cleanup_at = now
  241. expired_packet_ids = [
  242. packet_id
  243. for packet_id in flow.packet_client_addrs
  244. if packet_id <= (self.last_packet_id - UDP_PACKET_CLIENT_MAP_LIMIT)
  245. ]
  246. for packet_id in expired_packet_ids:
  247. flow.packet_client_addrs.pop(packet_id, None)
  248. for path_name, pending in list(flow.direct_pending_clients.items()):
  249. while len(pending) > UDP_DIRECT_PENDING_LIMIT:
  250. pending.popleft()
  251. if not pending:
  252. flow.direct_pending_clients.pop(path_name, None)
  253. def _cleanup_inactive_flows(self, now: float) -> None:
  254. if self._last_flow_cleanup_at and now - self._last_flow_cleanup_at < 5.0:
  255. return
  256. self._last_flow_cleanup_at = now
  257. expired_keys = [
  258. key
  259. for key, flow in self.client_flows.items()
  260. if now - flow.last_activity >= UDP_FLOW_IDLE_CLEANUP_SEC
  261. ]
  262. for key in expired_keys:
  263. flow = self.client_flows.pop(key, None)
  264. if flow is None:
  265. continue
  266. self.edge.release_udp_flow(flow)
  267. def set_flow_candidates(self, flow: UdpFlowState, candidate_names: tuple[str, ...]) -> None:
  268. if not flow.candidate_names:
  269. flow.candidate_names = candidate_names
  270. def note_unsent(self, flow: UdpFlowState, _packet_id: int) -> None:
  271. flow.touch(asyncio.get_running_loop().time())
  272. flow.relay_failures["unsent"] = flow.relay_failures.get("unsent", 0) + 1
  273. self._log_udp_summary(force=True)
  274. def _log_udp_summary(self, force: bool = False) -> None:
  275. now = asyncio.get_running_loop().time()
  276. if not force and now - self.last_summary_at < 10:
  277. return
  278. self.last_summary_at = now
  279. active_flows = len(self.client_flows)
  280. winners = sum(1 for flow in self.client_flows.values() if flow.winner_name)
  281. packets_sent = sum(flow.packets_sent for flow in self.client_flows.values())
  282. packets_received = sum(flow.packets_received for flow in self.client_flows.values())
  283. duplicates = sum(flow.duplicate_responses for flow in self.client_flows.values())
  284. direct_paths = sum(len(flow.direct_sockets) for flow in self.client_flows.values())
  285. relay_candidates = sum(len(flow.link_streams) for flow in self.client_flows.values())
  286. candidate_names: list[str] = []
  287. seen_candidates: set[str] = set()
  288. for flow in sorted(self.client_flows.values(), key=lambda item: item.flow_id):
  289. for name in flow.candidate_names:
  290. if name in seen_candidates:
  291. continue
  292. seen_candidates.add(name)
  293. candidate_names.append(name)
  294. direct_wins = sum(1 for flow in self.client_flows.values() if flow.winner_name and flow.winner_name.startswith("direct"))
  295. relay_wins = winners - direct_wins
  296. sample_flows = [f"{flow.flow_id}:{flow.winner_name or 'pending'}" for flow in sorted(self.client_flows.values(), key=lambda item: item.flow_id) if flow.winner_name][:5]
  297. relay_errors: list[str] = []
  298. for flow in self.client_flows.values():
  299. for name, count in flow.relay_failures.items():
  300. relay_errors.append(f"{name}={count}")
  301. bind = f"{self.client_addr[0]}:{self.client_addr[1]}" if self.client_addr else "unbound"
  302. print(
  303. f"[edge] udp summary bind={bind} flows={active_flows} winners={winners} "
  304. f"winner_breakdown=direct={direct_wins},relay={relay_wins} sample={', '.join(sample_flows) or 'none'} "
  305. f"candidates={candidate_names or ['none']} sent={packets_sent} recv={packets_received} dup={duplicates} "
  306. f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={', '.join(sorted(relay_errors)) or 'none'}"
  307. )
  308. def _parse_socks_udp(self, packet: bytes) -> tuple[str, int, bytes]:
  309. atyp = packet[3]
  310. offset = 4
  311. if atyp == 1:
  312. host = socket.inet_ntoa(packet[offset:offset + 4])
  313. offset += 4
  314. elif atyp == 3:
  315. size = packet[offset]
  316. offset += 1
  317. host = packet[offset:offset + size].decode()
  318. offset += size
  319. else:
  320. raise ValueError("unsupported udp atyp")
  321. port = struct.unpack("!H", packet[offset:offset + 2])[0]
  322. offset += 2
  323. return host, port, packet[offset:]
  324. def _build_socks_udp(self, host: str, port: int, payload: bytes) -> bytes:
  325. try:
  326. addr = socket.inet_aton(host)
  327. header = b"\x00\x00\x00\x01" + addr + struct.pack("!H", port)
  328. except OSError:
  329. raw = host.encode()
  330. header = b"\x00\x00\x00\x03" + bytes([len(raw)]) + raw + struct.pack("!H", port)
  331. return header + payload
  332. class UdpEdge:
  333. def __init__(self, listen_host: str, listen_port: int, config: UdpConfig) -> None:
  334. self.listen_host = listen_host
  335. self.listen_port = listen_port
  336. self.config = config
  337. self.scheduler = UdpScheduler(config)
  338. self.links: list[RelayLink] = []
  339. self.udp_stream_ids = itertools.count(1)
  340. self.udp_flow_sessions: dict[tuple[int, int], UdpFlowState] = {}
  341. self.udp_server: UdpAssociateServer | None = None
  342. def _udp_direct_copies(self) -> int:
  343. if self.config.udp_direct_copies is not None:
  344. return max(1, self.config.udp_direct_copies)
  345. return max(1, self.config.udp_redundancy + 1)
  346. def _udp_relay_copies(self) -> int:
  347. if self.config.udp_relay_copies is not None:
  348. return max(1, self.config.udp_relay_copies)
  349. return max(1, self.config.udp_redundancy + 1)
  350. def release_udp_flow(self, flow: UdpFlowState) -> None:
  351. for stream_id in list(flow.link_streams.values()):
  352. self.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
  353. flow.link_streams.clear()
  354. flow.initialized_links.clear()
  355. flow.packet_client_addrs.clear()
  356. for task in list(flow.direct_tasks.values()):
  357. task.cancel()
  358. flow.direct_tasks.clear()
  359. for sock in list(flow.direct_sockets.values()):
  360. with contextlib.suppress(Exception):
  361. sock.close()
  362. flow.direct_sockets.clear()
  363. flow.direct_pending_clients.clear()
  364. async def start(self) -> None:
  365. await self.scheduler.start()
  366. await self._connect_relays()
  367. server = await asyncio.start_server(self._accept, self.listen_host, self.listen_port)
  368. sockets = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
  369. relay_mode = "direct-only" if not self.config.relays else "direct+relay"
  370. print(f"[edge] socks5 listening on {sockets} relay_mode={relay_mode}")
  371. async with server:
  372. await server.serve_forever()
  373. async def _connect_relays(self) -> None:
  374. loop = asyncio.get_running_loop()
  375. transport, protocol = await loop.create_datagram_endpoint(lambda: UdpAssociateServer(self), local_addr=(self.listen_host, 0))
  376. self.udp_server = protocol
  377. self.udp_transport = transport
  378. for node in self.config.relays:
  379. link = RelayLink(node=node, reader=None, writer=None) # type: ignore[arg-type]
  380. link.udp_server = protocol
  381. self.links.append(link)
  382. link.maintain_task = asyncio.create_task(self._maintain_link(link))
  383. async def _maintain_link(self, link: RelayLink) -> None:
  384. backoff = 1.0
  385. while True:
  386. try:
  387. reader, writer = await asyncio.open_connection(link.node.host, link.node.port)
  388. sock = writer.get_extra_info("socket")
  389. if sock is not None:
  390. with contextlib.suppress(OSError):
  391. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  392. link.reader = reader
  393. link.writer = writer
  394. await link.start()
  395. backoff = 1.0
  396. await link.closed_event.wait()
  397. except asyncio.CancelledError:
  398. raise
  399. except Exception:
  400. await asyncio.sleep(backoff)
  401. backoff = min(10.0, backoff * 2)
  402. async def _accept(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
  403. try:
  404. peer = writer.get_extra_info("peername")
  405. _host, _port, udp_mode = await self._handshake(reader, writer, peer)
  406. if udp_mode:
  407. return
  408. except Exception:
  409. writer.close()
  410. with contextlib.suppress(Exception):
  411. await writer.wait_closed()
  412. def _selected_udp_links(self) -> list[RelayLink]:
  413. online = [link for link in self.links if not link.closed and link.writer is not None and link.supports_udp]
  414. if not online:
  415. return []
  416. return sorted(online, key=lambda link: self.scheduler.scores.get(link.node.name).score if link.node.name in self.scheduler.scores else 999999.0)
  417. def _udp_direct_redundancy_for_target(self, target_host: str) -> int:
  418. base = self.config.udp_direct_redundancy
  419. if ":" in target_host and self.config.udp_direct_redundancy_v6 is not None:
  420. base = self.config.udp_direct_redundancy_v6
  421. elif ":" not in target_host and self.config.udp_direct_redundancy_v4 is not None:
  422. base = self.config.udp_direct_redundancy_v4
  423. return max(1, base)
  424. async def _ensure_udp_direct_paths(self, flow: UdpFlowState, udp_server: UdpAssociateServer) -> None:
  425. target_count = self._udp_direct_redundancy_for_target(flow.target_host)
  426. for index in range(target_count):
  427. name = f"direct-{index + 1}" if target_count > 1 else "direct"
  428. if name in flow.direct_sockets or name in flow.direct_failures:
  429. continue
  430. try:
  431. family = socket.AF_INET6 if ":" in flow.target_host else socket.AF_INET
  432. sock = socket.socket(family, socket.SOCK_DGRAM)
  433. sock.setblocking(False)
  434. with contextlib.suppress(OSError):
  435. sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, UDP_SOCKET_BUFFER_BYTES)
  436. with contextlib.suppress(OSError):
  437. sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, UDP_SOCKET_BUFFER_BYTES)
  438. await asyncio.get_running_loop().sock_connect(sock, (flow.target_host, flow.target_port))
  439. flow.direct_sockets[name] = sock
  440. flow.direct_tasks[name] = asyncio.create_task(self._pump_udp_direct(flow, name, sock, udp_server))
  441. except Exception as exc:
  442. flow.direct_failures.add(name)
  443. print(f"[edge] udp direct open error flow={flow.flow_id} path={name} target={flow.target_host}:{flow.target_port} error={exc!r}")
  444. async def _pump_udp_direct(self, flow: UdpFlowState, path_name: str, sock: socket.socket, udp_server: UdpAssociateServer) -> None:
  445. loop = asyncio.get_running_loop()
  446. try:
  447. while True:
  448. data = await loop.sock_recv(sock, 65535)
  449. if not data:
  450. break
  451. pending = flow.direct_pending_clients.get(path_name)
  452. packet_id = 0
  453. client_addr = flow.client_addr
  454. if pending:
  455. packet_id, client_addr = pending.popleft()
  456. await udp_server.handle_from_direct(flow, path_name, data, packet_id, client_addr)
  457. finally:
  458. flow.direct_tasks.pop(path_name, None)
  459. flow.direct_sockets.pop(path_name, None)
  460. with contextlib.suppress(Exception):
  461. sock.close()
  462. async def forward_udp(self, flow: UdpFlowState, payload: bytes, packet_id: int, client_addr: tuple[str, int], udp_server: UdpAssociateServer) -> None:
  463. await self._ensure_udp_direct_paths(flow, udp_server)
  464. meta = encode_json({"host": flow.target_host, "port": flow.target_port, "family": flow.target_family})
  465. links = self._selected_udp_links()
  466. direct_names = tuple(name for name in sorted(flow.direct_sockets))
  467. relay_names = tuple(link.node.name for link in links)
  468. candidate_names = direct_names + relay_names
  469. udp_server.set_flow_candidates(flow, candidate_names)
  470. if not candidate_names:
  471. udp_server.note_unsent(flow, packet_id)
  472. return
  473. active_direct_names = list(direct_names)
  474. active_links = links
  475. now = asyncio.get_running_loop().time()
  476. warmup_mode = flow.packets_sent <= UDP_WARMUP_WINDOW_PACKETS
  477. shadow_probe = flow.winner_name is not None and now - flow.last_probe_at >= UDP_SHADOW_PROBE_INTERVAL_SEC
  478. if shadow_probe:
  479. flow.last_probe_at = now
  480. broadcast_mode = self.config.udp_always_broadcast or flow.winner_name is None or warmup_mode or shadow_probe
  481. if not broadcast_mode:
  482. winner_last_seen = flow.path_last_seen.get(flow.winner_name, 0.0) if flow.winner_name else 0.0
  483. winner_stale = bool(
  484. winner_last_seen
  485. and now - winner_last_seen >= max(self.config.udp_failover_idle_ms / 1000, UDP_WINNER_STALE_SEC)
  486. )
  487. if winner_stale:
  488. flow.winner_miss_streak += 1
  489. else:
  490. flow.winner_miss_streak = 0
  491. if winner_stale and flow.winner_miss_streak >= UDP_STABLE_WINNER_SWITCH_MISSES:
  492. flow.winner_name = None
  493. flow.winner_miss_streak = 0
  494. broadcast_mode = True
  495. else:
  496. active_direct_names = [name for name in active_direct_names if name == flow.winner_name]
  497. active_links = [link for link in active_links if link.node.name == flow.winner_name]
  498. if not active_direct_names and not active_links:
  499. if direct_names:
  500. active_direct_names = [direct_names[0]]
  501. elif links:
  502. active_links = links[:1]
  503. direct_copies = self._udp_direct_copies()
  504. relay_copies = self._udp_relay_copies()
  505. sent_any = False
  506. for attempt in range(max(direct_copies, relay_copies)):
  507. for path_name in active_direct_names if attempt < direct_copies else ():
  508. sock = flow.direct_sockets.get(path_name)
  509. if sock is None:
  510. continue
  511. try:
  512. flow.direct_pending_clients.setdefault(path_name, deque()).append((packet_id, client_addr))
  513. await asyncio.get_running_loop().sock_sendall(sock, payload)
  514. sent_any = True
  515. except Exception as exc:
  516. pending = flow.direct_pending_clients.get(path_name)
  517. if pending:
  518. with contextlib.suppress(Exception):
  519. pending.pop()
  520. flow.direct_failures.add(path_name)
  521. flow.direct_sockets.pop(path_name, None)
  522. task = flow.direct_tasks.pop(path_name, None)
  523. if task is not None:
  524. task.cancel()
  525. with contextlib.suppress(Exception):
  526. sock.close()
  527. flow.relay_failures[path_name] = flow.relay_failures.get(path_name, 0) + 1
  528. if path_name not in flow.relay_error_seen:
  529. flow.relay_error_seen.add(path_name)
  530. print(f"[edge] udp relay error flow={flow.flow_id} relay={path_name} error={exc!r}")
  531. for link in active_links if attempt < relay_copies else ():
  532. stream_id = flow.link_streams.get(link.node.name)
  533. if stream_id is None:
  534. stream_id = next(self.udp_stream_ids)
  535. flow.link_streams[link.node.name] = stream_id
  536. self.udp_flow_sessions[(flow.flow_id, stream_id)] = flow
  537. include_meta = link.node.name not in flow.initialized_links
  538. body = (meta + payload) if include_meta else payload
  539. meta_len = len(meta) if include_meta else 0
  540. try:
  541. await link.send(Frame(UDP_SEND, flow.flow_id, stream_id, 0, meta_len, body))
  542. flow.initialized_links.add(link.node.name)
  543. sent_any = True
  544. except Exception as exc:
  545. flow.link_streams.pop(link.node.name, None)
  546. self.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
  547. flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1
  548. if link.node.name not in flow.relay_error_seen:
  549. flow.relay_error_seen.add(link.node.name)
  550. print(f"[edge] udp relay error flow={flow.flow_id} relay={link.node.name} error={exc!r}")
  551. if attempt + 1 < max(direct_copies, relay_copies) and self.config.udp_copy_interval_ms > 0:
  552. await asyncio.sleep(self.config.udp_copy_interval_ms / 1000)
  553. if not sent_any:
  554. udp_server.note_unsent(flow, packet_id)
  555. async def _handshake(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, peer) -> tuple[str, int, bool]:
  556. version, methods_len = (await read_exact(reader, 2))
  557. if version != SOCKS_VERSION:
  558. raise ValueError("unsupported socks version")
  559. await read_exact(reader, methods_len)
  560. writer.write(b"\x05\x00")
  561. await writer.drain()
  562. version, command, _, atyp = await read_exact(reader, 4)
  563. if version != SOCKS_VERSION:
  564. raise ValueError("unsupported socks version")
  565. if atyp == 1:
  566. host = socket.inet_ntoa(await read_exact(reader, 4))
  567. elif atyp == 3:
  568. size = (await read_exact(reader, 1))[0]
  569. host = (await read_exact(reader, size)).decode()
  570. else:
  571. raise ValueError("unsupported atyp")
  572. port = struct.unpack("!H", await read_exact(reader, 2))[0]
  573. peer_text = f"{peer[0]}:{peer[1]}" if isinstance(peer, tuple) and len(peer) >= 2 else str(peer)
  574. if command == 3 and self.udp_server and self.udp_server.transport:
  575. bind_host, bind_port = self.udp_server.transport.get_extra_info("sockname")[:2]
  576. self.udp_server.register_associate(peer)
  577. print(f"[edge] socks handshake peer={peer_text} command=udp_associate target={host}:{port} bind={bind_host}:{bind_port}")
  578. writer.write(b"\x05\x00\x00\x01" + socket.inet_aton(bind_host) + struct.pack("!H", bind_port))
  579. await writer.drain()
  580. return host, port, True
  581. raise ValueError("unsupported socks command")