from __future__ import annotations

from pathlib import Path
import asyncio
import contextlib
import itertools
import os
import socket
import struct
from dataclasses import dataclass, field
from typing import Awaitable, Callable

from .config import Config
from .protocol import STATUS_OK, TCP_CLOSE, TCP_DATA, TCP_OPEN, TCP_STATUS, UDP_RECV, UDP_SEND, Frame, encode_json
from .relay_client import RelayConnection, RelayManager

# Socket option numbers passed to getsockopt()/setsockopt() below.
# NOTE(review): presumably the Linux netfilter / ip(7) values -- confirm
# against the kernel headers of the deployment target.
SO_ORIGINAL_DST = 80
IP6T_SO_ORIGINAL_DST = 80
IP_RECVORIGDSTADDR = 20
IPV6_RECVORIGDSTADDR = 74


@dataclass(frozen=True)
class TargetAddress:
    """Original destination (host, port, address family) of a flow."""

    host: str
    port: int
    family: int


@dataclass(frozen=True)
class PeerAddress:
    """Source (host, port, address family) of a local client."""

    host: str
    port: int
    family: int


def parse_sockaddr(raw: bytes) -> TargetAddress:
    """Decode a raw ``sockaddr_in`` / ``sockaddr_in6`` buffer.

    The family is read in native byte order from offset 0, the port in
    network byte order from offset 2.

    Raises:
        ValueError: if the buffer is too short or the family is neither
            ``AF_INET`` nor ``AF_INET6``.
    """
    if len(raw) < 8:
        raise ValueError("invalid transparent destination payload")
    family = struct.unpack_from("=H", raw, 0)[0]
    port = struct.unpack_from("!H", raw, 2)[0]
    if family == socket.AF_INET:
        host = socket.inet_ntoa(raw[4:8])
        return TargetAddress(host=host, port=port, family=family)
    if family == socket.AF_INET6:
        if len(raw) < 28:
            raise ValueError("invalid IPv6 transparent destination payload")
        # Bytes 4..8 are skipped; the 16-byte address starts at offset 8.
        host = socket.inet_ntop(socket.AF_INET6, raw[8:24])
        return TargetAddress(host=host, port=port, family=family)
    raise ValueError(f"unsupported family={family}")


def winner_group(name: str) -> str:
    """Map a path name to its group: every ``direct*`` name is "direct"."""
    return "direct" if name.startswith("direct") else name


def grouped_total(stats: dict[str, int], group: str) -> int:
    """Sum the win counts of all names whose group equals *group*."""
    return sum(count for name, count in stats.items() if winner_group(name) == group)


def _relay_total(stats: dict[str, int]) -> int:
    """Sum the win counts of every name that is not in the "direct" group."""
    return sum(count for name, count in stats.items() if winner_group(name) != "direct")


def _relay_breakdown(stats: dict[str, int]) -> str:
    """Render the non-direct win counts as ``name=count, ...`` or "none"."""
    parts = [f"{name}={count}" for name, count in sorted(stats.items()) if winner_group(name) != "direct"]
    return ", ".join(parts) or "none"


def _describe_error(exc: BaseException) -> str:
    """Return a non-empty description (``str(TimeoutError())`` is empty)."""
    return str(exc) or repr(exc)


async def _cancel_and_wait(task: asyncio.Task | None) -> None:
    """Cancel *task* and wait until it has finished.

    Does nothing when *task* is ``None`` or is the task currently running
    (a task cannot wait for itself).  ``CancelledError`` derives from
    ``BaseException`` since Python 3.8, so it has to be suppressed
    explicitly; otherwise the caller's remaining cleanup would be skipped.
    """
    if task is None or task is asyncio.current_task():
        return
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError, Exception):
        await task


class BasePath:
    """One candidate route (direct or via a relay) for a session or flow.

    Events are reported through ``on_frame(path, event, payload)`` where
    *event* is "status", "data" or "close".

    ``closed`` means "no longer usable" and may be set by the owner when the
    remote side closes; ``_released`` records that ``close()`` has already
    freed the local resources, so a remote close never prevents cleanup.
    """

    def __init__(self, name: str, on_frame: Callable[["BasePath", str, bytes | None], Awaitable[None]]) -> None:
        self.name = name
        self.on_frame = on_frame
        self.opened = False
        self.closed = False
        self._released = False

    async def open(self, target: TargetAddress) -> None:
        raise NotImplementedError

    async def send(self, data: bytes) -> None:
        raise NotImplementedError

    async def close(self) -> None:
        raise NotImplementedError


class DirectTcpPath(BasePath):
    """TCP path that connects straight to the target."""

    def __init__(
        self,
        name: str,
        on_frame: Callable[[BasePath, str, bytes | None], Awaitable[None]],
        open_timeout: float,
        happy_eyeballs_delay: float | None,
        tcp_nodelay: bool = True,
    ) -> None:
        super().__init__(name, on_frame)
        self.reader: asyncio.StreamReader | None = None
        self.writer: asyncio.StreamWriter | None = None
        self.pump_task: asyncio.Task | None = None
        self.open_timeout = open_timeout
        self.happy_eyeballs_delay = happy_eyeballs_delay
        self.tcp_nodelay = tcp_nodelay

    async def open(self, target: TargetAddress) -> None:
        """Connect to *target* and report "status" ``b"ok"`` or an error text."""
        try:
            family = socket.AF_INET6 if target.family == socket.AF_INET6 else socket.AF_INET
            kwargs = {"host": target.host, "port": target.port, "family": family}
            if self.happy_eyeballs_delay is not None:
                kwargs["happy_eyeballs_delay"] = self.happy_eyeballs_delay
            self.reader, self.writer = await asyncio.wait_for(
                asyncio.open_connection(**kwargs), timeout=self.open_timeout
            )
            sock = self.writer.get_extra_info("socket")
            if sock is not None and self.tcp_nodelay:
                with contextlib.suppress(OSError):
                    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            self.opened = True
            self.pump_task = asyncio.create_task(self._pump())
            await self.on_frame(self, "status", b"ok")
        except Exception as exc:
            await self.on_frame(self, "status", _describe_error(exc).encode())

    async def _pump(self) -> None:
        """Forward everything read from the target as "data", then "close"."""
        assert self.reader is not None
        try:
            while True:
                chunk = await self.reader.read(65536)
                if not chunk:
                    break
                await self.on_frame(self, "data", chunk)
        except Exception:
            # Best effort: any read/forward error simply ends the path.
            pass
        finally:
            await self.on_frame(self, "close", None)

    async def send(self, data: bytes) -> None:
        if self.closed or self.writer is None:
            return
        self.writer.write(data)
        await self.writer.drain()

    async def close(self) -> None:
        """Stop the pump and close the connection (idempotent)."""
        if self._released:
            return
        self._released = True
        self.closed = True
        await _cancel_and_wait(self.pump_task)
        if self.writer:
            self.writer.close()
            with contextlib.suppress(Exception):
                await self.writer.wait_closed()


class RelayTcpPath(BasePath):
    """TCP path multiplexed over a relay connection."""

    def __init__(
        self,
        name: str,
        on_frame: Callable[[BasePath, str, bytes | None], Awaitable[None]],
        connection: RelayConnection,
        session_id: int,
        stream_id: int,
    ) -> None:
        super().__init__(name, on_frame)
        self.connection = connection
        self.session_id = session_id
        self.stream_id = stream_id
        self.unbind_task: asyncio.Task | None = None

    async def open(self, target: TargetAddress) -> None:
        """Bind the stream and send TCP_OPEN; the status arrives as a frame."""
        if self.connection.closed:
            await self.on_frame(self, "status", b"relay unavailable")
            return
        self.connection.bind(self.session_id, self.stream_id, self._handle_frame)
        try:
            meta = encode_json({"host": target.host, "port": target.port, "family": target.family})
            await self.connection.send(Frame(TCP_OPEN, self.session_id, self.stream_id, 0, 0, meta))
        except Exception as exc:
            self.connection.unbind(self.session_id, self.stream_id)
            await self.on_frame(self, "status", _describe_error(exc).encode())

    async def _handle_frame(self, _conn: RelayConnection, frame: Frame) -> None:
        """Translate relay frames into "status" / "data" / "close" events."""
        if frame.kind == TCP_STATUS:
            if frame.packet_id == STATUS_OK:
                self.opened = True
                await self.on_frame(self, "status", b"ok")
            else:
                await self.on_frame(self, "status", frame.payload)
            return
        if frame.kind == TCP_DATA:
            await self.on_frame(self, "data", frame.payload)
            return
        if frame.kind == TCP_CLOSE:
            await self.on_frame(self, "close", None)

    async def send(self, data: bytes) -> None:
        if self.closed or self.connection.closed:
            return
        await self.connection.send(Frame(TCP_DATA, self.session_id, self.stream_id, 0, 0, data))

    async def close(self) -> None:
        """Schedule the unbind and, unless the remote closed first, send TCP_CLOSE."""
        if self._released:
            return
        self._released = True
        remote_closed = self.closed  # set by the owner on a "close" event
        self.closed = True
        if self.unbind_task is None or self.unbind_task.done():
            self.unbind_task = asyncio.create_task(self._delayed_unbind())
        if not remote_closed and not self.connection.closed:
            with contextlib.suppress(Exception):
                await self.connection.send(Frame(TCP_CLOSE, self.session_id, self.stream_id, 0, 0, b""))

    async def _delayed_unbind(self) -> None:
        # NOTE(review): the delay presumably lets late frames for this stream
        # still find a handler -- confirm against RelayConnection.
        await asyncio.sleep(0.5)
        self.connection.unbind(self.session_id, self.stream_id)


@dataclass
class TransparentSession:
    """One accepted TCP connection raced over several candidate paths.

    The first path that delivers downstream data becomes the winner; the
    others are closed, either immediately or after ``loser_grace_ms``.
    ``stats``, ``target_stats`` and ``family_stats`` are dictionaries that
    ``_record_win`` updates in place.
    """

    session_id: int
    target: TargetAddress
    reader: asyncio.StreamReader
    writer: asyncio.StreamWriter
    paths: list[BasePath]
    warmup_bytes: int
    loser_grace_ms: int
    stats: dict[str, int]
    target_stats: dict[tuple[str, int], dict[str, int]]
    family_stats: dict[str, dict[str, int]]
    opened_count: int = 0
    status_count: int = 0
    errors: list[str] = field(default_factory=list)
    winner: BasePath | None = None
    uplink_bytes: int = 0
    open_event: asyncio.Event = field(default_factory=asyncio.Event)
    winner_event: asyncio.Event = field(default_factory=asyncio.Event)
    closed: bool = False
    pump_task: asyncio.Task | None = None
    loser_close_task: asyncio.Task | None = None

    def _record_win(self, winner: BasePath) -> None:
        """Update the global / per-target / per-family counters and log them."""
        self.stats[winner.name] = self.stats.get(winner.name, 0) + 1
        key = (self.target.host, self.target.port)
        target_stats = self.target_stats.setdefault(key, {})
        target_stats[winner.name] = target_stats.get(winner.name, 0) + 1
        family_key = "ipv6" if self.target.family == socket.AF_INET6 else "ipv4"
        family_stats = self.family_stats.setdefault(family_key, {})
        family_stats[winner.name] = family_stats.get(winner.name, 0) + 1

        direct_wins = grouped_total(self.stats, "direct")
        relay_wins = _relay_total(self.stats)
        target_direct = grouped_total(target_stats, "direct")
        target_relay = _relay_total(target_stats)
        family_direct = grouped_total(family_stats, "direct")
        family_relay = _relay_total(family_stats)
        relay_detail = _relay_breakdown(self.stats)
        target_detail = _relay_breakdown(target_stats)
        target_pref = "relay" if target_relay > target_direct else "direct"
        family_pref = "relay" if family_relay > family_direct else "direct"
        print(
            f"[edge] tcp win session={self.session_id} "
            f"target={self.target.host}:{self.target.port} winner={winner.name} "
            f"direct={direct_wins} relay={relay_wins} relay_breakdown={relay_detail} "
            f"target_pref={target_pref} target_direct={target_direct} target_relay={target_relay} "
            f"target_breakdown={target_detail} family_pref={family_pref} family={family_key} "
            f"family_direct={family_direct} family_relay={family_relay}"
        )

    async def start(self) -> None:
        """Open every path and start pumping client data once one is open.

        Raises:
            asyncio.TimeoutError: if no status decision arrives within 15 s.
            ConnectionError: if every path reported a failure.
        """
        # NOTE(review): every open() is awaited to completion here, so a slow
        # direct connect delays the start even if another path is already open.
        await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
        await asyncio.wait_for(self.open_event.wait(), timeout=15)
        if self.opened_count == 0:
            raise ConnectionError(self.errors[0] if self.errors else "all paths failed")
        self.pump_task = asyncio.create_task(self._pump_local())

    async def _pump_local(self) -> None:
        """Copy client data upstream.

        Up to ``warmup_bytes`` every chunk is sent on all usable paths;
        afterwards only the winner receives data.
        """
        try:
            while True:
                chunk = await self.reader.read(65536)
                if not chunk:
                    break
                self.uplink_bytes += len(chunk)
                active = [path for path in self.paths if path.opened and not path.closed]
                if not active:
                    break
                if self.uplink_bytes <= self.warmup_bytes:
                    await asyncio.gather(*(path.send(chunk) for path in active), return_exceptions=True)
                else:
                    if self.winner is None:
                        await self.winner_event.wait()
                    if self.winner:
                        await self.winner.send(chunk)
        except Exception:
            # Best effort: any error ends the session via the finally below.
            pass
        finally:
            await self.close()

    async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
        """Process a "status", "data" or "close" event reported by *path*."""
        if self.closed:
            return
        if event == "status":
            self.status_count += 1
            if payload == b"ok":
                self.opened_count += 1
            elif payload is not None:
                self.errors.append(payload.decode("utf-8", errors="replace"))
            # ">=" so a duplicated status can never leave start() waiting.
            if self.opened_count > 0 or self.status_count >= len(self.paths):
                self.open_event.set()
            return
        if event == "data":
            if self.winner is None:
                # First downstream bytes decide the race.
                self.winner = path
                self._record_win(path)
                self.winner_event.set()
                if self.loser_grace_ms > 0:
                    self.loser_close_task = asyncio.create_task(self._close_losers_after_grace(path))
                else:
                    await self._close_losers(path)
            if path is self.winner and payload is not None:
                self.writer.write(payload)
                await self.writer.drain()
            return
        if event == "close":
            path.closed = True
            if self.winner is None:
                remaining = [candidate for candidate in self.paths if candidate.opened and not candidate.closed]
                if not remaining:
                    await self.close()
            elif path is self.winner:
                await self.close()

    async def _close_losers(self, winner: BasePath) -> None:
        await asyncio.gather(*(path.close() for path in self.paths if path is not winner), return_exceptions=True)

    async def _close_losers_after_grace(self, winner: BasePath) -> None:
        await asyncio.sleep(self.loser_grace_ms / 1000)
        if not self.closed:
            await self._close_losers(winner)

    async def close(self) -> None:
        """Stop background tasks, close all paths and the client writer."""
        if self.closed:
            return
        self.closed = True
        print(f"[edge] session={self.session_id} closed target={self.target.host}:{self.target.port}")
        await _cancel_and_wait(self.pump_task)
        await _cancel_and_wait(self.loser_close_task)
        await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
        self.writer.close()
        with contextlib.suppress(Exception):
            await self.writer.wait_closed()


class DirectUdpPath(BasePath):
    """UDP path using a connected datagram socket straight to the target."""

    def __init__(
        self,
        name: str,
        on_frame: Callable[[BasePath, str, bytes | None], Awaitable[None]],
        target: TargetAddress,
    ) -> None:
        super().__init__(name, on_frame)
        self.target = target
        self.socket: socket.socket | None = None
        self.read_task: asyncio.Task | None = None

    async def open(self, _target: TargetAddress) -> None:
        """Create and connect the socket to the target given at construction."""
        try:
            family = socket.AF_INET6 if self.target.family == socket.AF_INET6 else socket.AF_INET
            self.socket = socket.socket(family, socket.SOCK_DGRAM)
            self.socket.setblocking(False)
            await asyncio.get_running_loop().sock_connect(self.socket, (self.target.host, self.target.port))
            self.opened = True
            self.read_task = asyncio.create_task(self._pump())
            await self.on_frame(self, "status", b"ok")
        except Exception as exc:
            await self.on_frame(self, "status", _describe_error(exc).encode())

    async def _pump(self) -> None:
        """Forward received datagrams as "data", then report "close"."""
        assert self.socket is not None
        loop = asyncio.get_running_loop()
        try:
            while True:
                data = await loop.sock_recv(self.socket, 65535)
                if not data:
                    break
                await self.on_frame(self, "data", data)
        except Exception:
            # Best effort: any receive/forward error simply ends the path.
            pass
        finally:
            await self.on_frame(self, "close", None)

    async def send(self, data: bytes) -> None:
        if self.closed or self.socket is None:
            return
        await asyncio.get_running_loop().sock_sendall(self.socket, data)

    async def close(self) -> None:
        """Stop the reader and close the socket (idempotent)."""
        if self._released:
            return
        self._released = True
        self.closed = True
        await _cancel_and_wait(self.read_task)
        if self.socket:
            self.socket.close()


class RelayUdpPath(BasePath):
    """UDP path multiplexed over a relay connection."""

    def __init__(
        self,
        name: str,
        on_frame: Callable[[BasePath, str, bytes | None], Awaitable[None]],
        connection: RelayConnection,
        session_id: int,
        stream_id: int,
        target: TargetAddress,
    ) -> None:
        super().__init__(name, on_frame)
        self.connection = connection
        self.session_id = session_id
        self.stream_id = stream_id
        self.target = target
        self.unbind_task: asyncio.Task | None = None

    async def open(self, _target: TargetAddress) -> None:
        """Bind the stream on the relay connection and report "ok" at once."""
        if self.connection.closed:
            await self.on_frame(self, "status", b"relay unavailable")
            return
        self.connection.bind(self.session_id, self.stream_id, self._handle_frame)
        try:
            self.opened = True
            await self.on_frame(self, "status", b"ok")
        except Exception:
            self.connection.unbind(self.session_id, self.stream_id)
            self.closed = True
            raise

    async def _handle_frame(self, _conn: RelayConnection, frame: Frame) -> None:
        if frame.kind == UDP_RECV:
            await self.on_frame(self, "data", frame.payload)

    async def send(self, data: bytes) -> None:
        """Send one datagram as a UDP_SEND frame: JSON target header + data."""
        if self.closed or self.connection.closed:
            return
        meta = encode_json({"host": self.target.host, "port": self.target.port, "family": self.target.family})
        payload = meta + data
        # The header length is passed alongside the combined payload.
        await self.connection.send(Frame(UDP_SEND, self.session_id, self.stream_id, 0, len(meta), payload))

    async def close(self) -> None:
        """Schedule the delayed unbind (idempotent)."""
        if self._released:
            return
        self._released = True
        self.closed = True
        if self.unbind_task is None or self.unbind_task.done():
            self.unbind_task = asyncio.create_task(self._delayed_unbind())

    async def _delayed_unbind(self) -> None:
        await asyncio.sleep(0.5)
        self.connection.unbind(self.session_id, self.stream_id)


@dataclass
class UdpFlow:
    """A (source, target) UDP flow raced over several candidate paths."""

    flow_id: int
    source: PeerAddress
    target: TargetAddress
    send_response: Callable[[PeerAddress, bytes], Awaitable[None]]
    paths: list[BasePath]
    winner: BasePath | None = None
    closed: bool = False
    last_activity: float = 0.0

    async def start(self) -> None:
        await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)

    async def send(self, payload: bytes) -> None:
        """Send on every usable path until a winner exists, then only on it."""
        self.last_activity = asyncio.get_running_loop().time()
        active = [path for path in self.paths if path.opened and not path.closed]
        if self.winner is None:
            await asyncio.gather(*(path.send(payload) for path in active), return_exceptions=True)
        elif not self.winner.closed:
            await self.winner.send(payload)

    async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
        """Relay responses of the winning path back to the source."""
        self.last_activity = asyncio.get_running_loop().time()
        if event == "data" and payload is not None:
            if self.winner is None:
                self.winner = path
                print(f"[edge] udp flow={self.flow_id} winner={path.name} target={self.target.host}:{self.target.port}")
            if path is self.winner:
                await self.send_response(self.source, payload)
        if event == "close":
            path.closed = True
            if path is self.winner:
                # Allow a new winner to be elected among the remaining paths.
                self.winner = None

    async def close(self) -> None:
        if self.closed:
            return
        self.closed = True
        await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)


class TransparentUdpListener:
    """Datagram socket that receives redirected UDP plus its original destination."""

    def __init__(self, edge: "TransparentEdge", family: int, bind_host: str, port: int) -> None:
        self.edge = edge
        self.family = family
        self.bind_host = bind_host
        self.port = port
        self.socket: socket.socket | None = None
        # Strong references to in-flight datagram tasks; the event loop only
        # keeps weak ones, so an unreferenced task may be garbage-collected.
        self._tasks: set[asyncio.Task] = set()

    def start(self) -> None:
        """Create, configure and bind the socket and register the reader."""
        sock = socket.socket(self.family, socket.SOCK_DGRAM)
        sock.setblocking(False)
        if self.family == socket.AF_INET:
            sock.setsockopt(socket.SOL_IP, IP_RECVORIGDSTADDR, 1)
            sock.bind((self.bind_host, self.port))
        else:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
            sock.setsockopt(socket.IPPROTO_IPV6, IPV6_RECVORIGDSTADDR, 1)
            sock.bind((self.bind_host, self.port, 0, 0))
        self.socket = sock
        asyncio.get_running_loop().add_reader(sock.fileno(), self._on_readable)
        print(f"[edge] transparent udp listening on {sock.getsockname()}")

    def _on_readable(self) -> None:
        """Read one datagram and hand it to the edge with its original destination."""
        assert self.socket is not None
        try:
            data, ancdata, _flags, src = self.socket.recvmsg(65535, 512)
        except OSError:
            # Includes BlockingIOError (spurious wake-up); nothing to process.
            return
        original = None
        for level, ctype, cdata in ancdata:
            if self.family == socket.AF_INET and level == socket.SOL_IP and ctype == IP_RECVORIGDSTADDR:
                original = parse_sockaddr(cdata)
                break
            if self.family == socket.AF_INET6 and level == socket.IPPROTO_IPV6 and ctype == IPV6_RECVORIGDSTADDR:
                original = parse_sockaddr(cdata)
                break
        if original is None:
            return
        source_family = socket.AF_INET if self.family == socket.AF_INET else socket.AF_INET6
        source = PeerAddress(host=src[0], port=src[1], family=source_family)
        # Ignore datagrams addressed to the listener itself.
        if original.port == self.port and (original.host in ("127.0.0.1", "::1") or original.host == self.bind_host):
            return
        task = asyncio.create_task(self.edge.handle_udp_datagram(source, original, data, self))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def send_response(self, source: PeerAddress, payload: bytes) -> None:
        """Send *payload* back to *source* through the listener socket."""
        if self.socket is None:
            # Listener already closed; nothing to send on.
            return
        if source.family == socket.AF_INET:
            address = (source.host, source.port)
        else:
            address = (source.host, source.port, 0, 0)
        # Non-blocking socket: when the send buffer is full the datagram is
        # dropped instead of raising into (and terminating) the path pump.
        with contextlib.suppress(BlockingIOError):
            self.socket.sendto(payload, address)

    async def close(self) -> None:
        if self.socket is None:
            return
        asyncio.get_running_loop().remove_reader(self.socket.fileno())
        self.socket.close()
        self.socket = None


class TransparentEdge:
    """Accepts redirected TCP (and optionally UDP) traffic and races it over
    direct and relay paths."""

    def __init__(
        self,
        listen_host: str,
        listen_port: int,
        config: Config,
        enable_udp: bool = False,
        kernel_mode: str = "auto",
    ) -> None:
        self.listen_host = listen_host
        self.listen_port = listen_port
        self.config = config
        self.enable_udp = enable_udp
        self.kernel_mode = self._resolve_kernel_mode(kernel_mode, config.kernel_mode)
        self.manager = RelayManager(config)
        self.session_ids = itertools.count(1)
        self.stream_ids = itertools.count(1)
        self.udp_listeners: list[TransparentUdpListener] = []
        self.udp_flows: dict[tuple[PeerAddress, TargetAddress], UdpFlow] = {}
        self.udp_flow_ids = itertools.count(1)
        self.udp_gc_task: asyncio.Task | None = None
        self.tcp_win_counts: dict[str, int] = {}
        self.tcp_target_wins: dict[tuple[str, int], dict[str, int]] = {}
        self.tcp_family_wins: dict[str, dict[str, int]] = {"ipv4": {}, "ipv6": {}}

    def _resolve_kernel_mode(self, cli_kernel_mode: str, config_kernel_mode: str) -> str:
        """Pick the kernel mode: CLI value, then config value, then detection.

        Detection returns "24" when /etc/os-release contains ``VERSION_ID="24``
        or the kernel release starts with "6.", otherwise "20".
        """
        mode = cli_kernel_mode if cli_kernel_mode != "auto" else config_kernel_mode
        if mode != "auto":
            return mode
        try:
            os_release = Path("/etc/os-release")
            if os_release.exists() and 'VERSION_ID="24' in os_release.read_text(errors="ignore"):
                return "24"
        except Exception:
            pass
        try:
            release = os.uname().release
            if release.startswith("6."):
                return "24"
        except Exception:
            pass
        return "20"

    async def start(self) -> None:
        """Start the relay manager and listeners, then serve until cancelled."""
        if self.kernel_mode == "24":
            # Only values still equal to 10.0 / None are replaced.
            if self.config.direct_open_timeout == 10.0:
                self.config.direct_open_timeout = 6.0
            if self.config.relay_open_timeout == 10.0:
                self.config.relay_open_timeout = 6.0
            if self.config.tcp_connect_happy_eyeballs_delay is None:
                self.config.tcp_connect_happy_eyeballs_delay = 0.25
        await self.manager.start()
        print(f"[edge] kernel_mode={self.kernel_mode} relay snapshot: {self.manager.snapshot()}")
        server4 = await asyncio.start_server(self._accept, self.listen_host, self.listen_port, family=socket.AF_INET)
        sockets = [str(sock.getsockname()) for sock in server4.sockets or []]
        server6 = None
        if self.listen_host in ("::", "::1", "0.0.0.0", "127.0.0.1"):
            host6 = "::1" if self.listen_host == "127.0.0.1" else "::"
            try:
                server6 = await asyncio.start_server(self._accept, host6, self.listen_port, family=socket.AF_INET6)
                sockets.extend(str(sock.getsockname()) for sock in server6.sockets or [])
            except Exception as exc:
                print(f"[edge] ipv6 tcp listener skipped: {exc!r}")
        if self.enable_udp:
            self._start_udp_listeners()
            self.udp_gc_task = asyncio.create_task(self._gc_udp_flows())
        print(f"[edge] transparent tcp listening on {', '.join(sockets)}")
        if server6 is None:
            async with server4:
                await server4.serve_forever()
        else:
            async with server4, server6:
                await asyncio.gather(server4.serve_forever(), server6.serve_forever())

    def _direct_redundancy_for_target(self, target: TargetAddress) -> int:
        """Return how many parallel direct paths to open for *target*.

        Starts from the per-family (or general) configured redundancy, clamped
        to ``[1, direct_max_redundancy]``, and adds one (up to the maximum)
        when relays have won more often than direct for this target or family.
        """
        base = self.config.direct_redundancy
        if target.family == socket.AF_INET6 and self.config.direct_redundancy_v6 is not None:
            base = self.config.direct_redundancy_v6
        elif target.family == socket.AF_INET and self.config.direct_redundancy_v4 is not None:
            base = self.config.direct_redundancy_v4
        base = max(1, min(base, self.config.direct_max_redundancy))
        target_stats = self.tcp_target_wins.get((target.host, target.port), {})
        family_key = "ipv6" if target.family == socket.AF_INET6 else "ipv4"
        family_stats = self.tcp_family_wins.get(family_key, {})
        target_prefers_relay = _relay_total(target_stats) > grouped_total(target_stats, "direct")
        family_prefers_relay = _relay_total(family_stats) > grouped_total(family_stats, "direct")
        if target_prefers_relay or family_prefers_relay:
            return min(self.config.direct_max_redundancy, base + 1)
        return base

    def _build_direct_paths(self, session: TransparentSession) -> list[BasePath]:
        """Create the direct TCP candidates ("direct" or "direct-1..N")."""
        count = self._direct_redundancy_for_target(session.target)
        return [
            DirectTcpPath(
                name=f"direct-{index + 1}" if count > 1 else "direct",
                on_frame=lambda path, event, payload, s=session: self._handle_tcp_session(s, path, event, payload),
                open_timeout=self.config.direct_open_timeout,
                happy_eyeballs_delay=self.config.tcp_connect_happy_eyeballs_delay,
                tcp_nodelay=self.config.relay_tcp_nodelay,
            )
            for index in range(count)
        ]

    def _start_udp_listeners(self) -> None:
        """Start one UDP listener per bind address; failures are logged and skipped."""
        binds = []
        if self.listen_host == "127.0.0.1":
            binds = [(socket.AF_INET, "127.0.0.1"), (socket.AF_INET6, "::1")]
        elif self.listen_host == "0.0.0.0":
            binds = [(socket.AF_INET, "0.0.0.0"), (socket.AF_INET6, "::")]
        else:
            family = socket.AF_INET6 if ":" in self.listen_host else socket.AF_INET
            binds = [(family, self.listen_host)]
        for family, host in binds:
            try:
                listener = TransparentUdpListener(self, family, host, self.listen_port)
                listener.start()
                self.udp_listeners.append(listener)
            except Exception as exc:
                print(f"[edge] udp listener skipped family={family} host={host} error={exc!r}")

    async def _accept(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Handle one accepted TCP connection: build the session and start it."""
        peer = writer.get_extra_info("peername")
        session: TransparentSession | None = None
        try:
            target = self._get_original_dst(writer)
            session_id = next(self.session_ids)
            session = TransparentSession(
                session_id=session_id,
                target=target,
                reader=reader,
                writer=writer,
                paths=[],
                warmup_bytes=self.config.tcp_warmup_bytes,
                loser_grace_ms=self.config.tcp_loser_grace_ms,
                stats=self.tcp_win_counts,
                target_stats=self.tcp_target_wins,
                family_stats=self.tcp_family_wins,
            )
            paths: list[BasePath] = self._build_direct_paths(session)
            for connection in self.manager.available():
                stream_id = next(self.stream_ids)
                paths.append(
                    RelayTcpPath(
                        name=connection.node.name,
                        on_frame=lambda path, event, payload, s=session: self._handle_tcp_session(
                            s, path, event, payload
                        ),
                        connection=connection,
                        session_id=session_id,
                        stream_id=stream_id,
                    )
                )
            session.paths = paths
            print(
                f"[edge] accept peer={peer} session={session_id} target={target.host}:{target.port} "
                f"candidates={[path.name for path in paths]}"
            )
            await session.start()
        except Exception as exc:
            print(f"[edge] accept failed peer={peer} error={exc!r}")
            if session is not None:
                # Release paths that were already opened or bound before the
                # failure; otherwise their sockets and relay bindings leak.
                with contextlib.suppress(Exception):
                    await session.close()
            writer.close()
            with contextlib.suppress(Exception):
                await writer.wait_closed()

    async def _handle_tcp_session(
        self, session: TransparentSession, path: BasePath, event: str, payload: bytes | None
    ) -> None:
        await session.handle_path(path, event, payload)

    def _get_original_dst(self, writer: asyncio.StreamWriter) -> TargetAddress:
        """Read the pre-redirect destination of the accepted socket.

        Raises:
            RuntimeError: if the socket is unavailable or of an unsupported family.
        """
        sock = writer.get_extra_info("socket")
        if sock is None:
            raise RuntimeError("socket unavailable")
        family = sock.family
        if family == socket.AF_INET:
            raw = sock.getsockopt(socket.SOL_IP, SO_ORIGINAL_DST, 16)
            return parse_sockaddr(raw)
        if family == socket.AF_INET6:
            raw = sock.getsockopt(socket.IPPROTO_IPV6, IP6T_SO_ORIGINAL_DST, 128)
            return parse_sockaddr(raw)
        raise RuntimeError(f"unsupported socket family={family}")

    async def handle_udp_datagram(
        self, source: PeerAddress, target: TargetAddress, payload: bytes, listener: TransparentUdpListener
    ) -> None:
        """Route one datagram through its flow, creating the flow on first use."""
        if not self.enable_udp:
            return
        # Ignore datagrams addressed to the edge's own listening endpoint.
        if target.port == self.listen_port and target.host in ("127.0.0.1", "::1", self.listen_host):
            return
        key = (source, target)
        flow = self.udp_flows.get(key)
        if flow is None:
            flow_id = next(self.udp_flow_ids)
            paths: list[BasePath] = [
                DirectUdpPath(
                    name="direct",
                    on_frame=lambda path, event, data, fid=flow_id: self._handle_udp_path(fid, path, event, data),
                    target=target,
                )
            ]
            for connection in self.manager.available():
                stream_id = next(self.stream_ids)
                paths.append(
                    RelayUdpPath(
                        name=connection.node.name,
                        on_frame=lambda path, event, data, fid=flow_id: self._handle_udp_path(fid, path, event, data),
                        connection=connection,
                        session_id=flow_id,
                        stream_id=stream_id,
                        target=target,
                    )
                )
            flow = UdpFlow(
                flow_id=flow_id, source=source, target=target, send_response=listener.send_response, paths=paths
            )
            # Registered before start() so path events can already find it.
            self.udp_flows[key] = flow
            print(
                f"[edge] udp flow={flow_id} target={target.host}:{target.port} "
                f"candidates={[path.name for path in paths]}"
            )
            await flow.start()
        await flow.send(payload)

    async def _handle_udp_path(self, flow_id: int, path: BasePath, event: str, payload: bytes | None) -> None:
        """Dispatch a path event to the flow with the given id, if it still exists."""
        flow = next((candidate for candidate in self.udp_flows.values() if candidate.flow_id == flow_id), None)
        if flow is not None:
            await flow.handle_path(path, event, payload)

    async def _gc_udp_flows(self) -> None:
        """Every 30 s close and drop flows idle for more than 120 s."""
        loop = asyncio.get_running_loop()
        while True:
            await asyncio.sleep(30)
            now = loop.time()
            stale = [
                key
                for key, flow in self.udp_flows.items()
                if flow.last_activity and now - flow.last_activity > 120
            ]
            for key in stale:
                flow = self.udp_flows.pop(key, None)
                if flow:
                    await flow.close()