from __future__ import annotations import asyncio import contextlib import itertools import json import socket import struct from collections import deque from dataclasses import dataclass, field from typing import Dict from .config_udp import UdpConfig, UdpRelayNode from .logging_utils import log_print as print from .scheduler_udp import UdpScheduler from .protocol import AUTH, STATUS_ERR, STATUS_OK, UDP_RECV, UDP_SEND, Frame, read_frame, write_frame, encode_json SOCKS_VERSION = 5 UDP_WARMUP_BROADCAST_PACKETS = 6 UDP_SHADOW_PROBE_INTERVAL_SEC = 0.25 UDP_FAST_FAILOVER_MISSES = 3 UDP_FLOW_IDLE_CLEANUP_SEC = 30.0 UDP_PACKET_CLIENT_MAP_LIMIT = 4096 UDP_DIRECT_PENDING_LIMIT = 128 UDP_SOCKET_BUFFER_BYTES = 1 << 20 async def read_exact(reader: asyncio.StreamReader, size: int) -> bytes: return await reader.readexactly(size) @dataclass(eq=False) class RelayLink: node: UdpRelayNode reader: asyncio.StreamReader writer: asyncio.StreamWriter pump: asyncio.Task | None = None closed_event: asyncio.Event = field(default_factory=asyncio.Event) maintain_task: asyncio.Task | None = None udp_server: "UdpAssociateServer | None" = None closed: bool = False supports_udp: bool = True async def start(self) -> None: await write_frame(self.writer, Frame(AUTH, 0, 0, 0, 0, encode_json({"token": self.node.token}))) frame = await read_frame(self.reader) if frame.kind != AUTH or frame.packet_id != STATUS_OK: raise ConnectionError(f"relay auth failed: {self.node.name}") ack = {} if frame.payload: try: ack = json.loads(frame.payload.decode("utf-8")) except Exception: ack = {} self.supports_udp = True self.closed = False self.closed_event.clear() self.pump = asyncio.create_task(self._pump()) if ack: print(f"[edge] relay connected name={self.node.name} addr={self.node.host}:{self.node.port} udp_only={ack.get('udp_only', True)}") async def _pump(self) -> None: try: while True: try: frame = await read_frame(self.reader) except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError, OSError): break if frame.kind == UDP_RECV and self.udp_server: await self.udp_server.handle_from_relay(frame, self) finally: await self.close() async def send(self, frame: Frame) -> None: if self.closed: raise ConnectionError(f"relay closed: {self.node.name}") try: await write_frame(self.writer, frame) except (BrokenPipeError, ConnectionResetError, RuntimeError, OSError, asyncio.CancelledError) as exc: await self.close() raise ConnectionError(f"relay closed: {self.node.name}") from exc async def close(self) -> None: if self.closed: return self.closed = True self.closed_event.set() if self.pump and self.pump is not asyncio.current_task(): self.pump.cancel() with contextlib.suppress(Exception): await self.pump self.writer.close() with contextlib.suppress(Exception): await self.writer.wait_closed() @dataclass class UdpFlowState: flow_id: int client_addr: tuple[str, int] target_host: str target_port: int created_at: float last_activity: float packets_sent: int = 0 packets_received: int = 0 duplicate_responses: int = 0 winner_name: str | None = None candidate_names: tuple[str, ...] = () link_streams: dict[str, int] = field(default_factory=dict) initialized_links: set[str] = field(default_factory=set) direct_sockets: dict[str, socket.socket] = field(default_factory=dict) direct_tasks: dict[str, asyncio.Task] = field(default_factory=dict) direct_failures: set[str] = field(default_factory=set) relay_failures: dict[str, int] = field(default_factory=dict) relay_error_seen: set[str] = field(default_factory=set) path_last_seen: dict[str, float] = field(default_factory=dict) packet_client_addrs: dict[int, tuple[str, int]] = field(default_factory=dict) direct_pending_clients: dict[str, deque[tuple[int, tuple[str, int]]]] = field(default_factory=dict) last_probe_at: float = 0.0 winner_miss_streak: int = 0 target_family: int = 0 last_cleanup_at: float = 0.0 def touch(self, now: float) -> None: self.last_activity = now class UdpAssociateServer(asyncio.DatagramProtocol): def __init__(self, edge: "UdpEdge") -> None: self.edge = edge self.transport: asyncio.DatagramTransport | None = None self.client_addr = None self.associate_peer = None self.packet_counter = itertools.count(1) self.last_packet_id = 0 self.client_flows: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {} self.flow_counter = itertools.count(1) self.last_summary_at = 0.0 self.win_counts: Dict[str, int] = {} self._last_client_port_log_at = 0.0 self._last_flow_cleanup_at = 0.0 def connection_made(self, transport) -> None: self.transport = transport def register_associate(self, peer) -> None: peer_text = f"{peer[0]}:{peer[1]}" if isinstance(peer, tuple) and len(peer) >= 2 else str(peer) if self.associate_peer != peer_text: print(f"[edge] udp associate peer={peer_text}") self.associate_peer = peer_text def _client_flow_key(self, addr, host: str, port: int) -> tuple[tuple[str, int], str, int]: return ((addr[0], 0), host, port) def datagram_received(self, data: bytes, addr) -> None: if len(data) < 10: return if self.client_addr is None: self.client_addr = addr print(f"[edge] udp client bound addr={addr[0]}:{addr[1]}") elif addr != self.client_addr: if addr[0] == self.client_addr[0]: now = asyncio.get_running_loop().time() if now - self._last_client_port_log_at >= 30: self._last_client_port_log_at = now print(f"[edge] udp client port update host={addr[0]} old_port={self.client_addr[1]} new_port={addr[1]}") self.client_addr = addr else: print(f"[edge] udp client rebound old={self.client_addr[0]}:{self.client_addr[1]} new={addr[0]}:{addr[1]}") self._reset_client_state(addr) host, port, payload = self._parse_socks_udp(data) now = asyncio.get_running_loop().time() flow_key = self._client_flow_key(addr, host, port) flow = self.client_flows.get(flow_key) if flow is None: family = socket.AF_INET6 if ":" in host else socket.AF_INET flow = UdpFlowState(next(self.flow_counter), (addr[0], addr[1]), host, port, now, now, target_family=family) self.client_flows[flow_key] = flow flow.touch(now) flow.client_addr = (addr[0], addr[1]) flow.packets_sent += 1 packet_id = next(self.packet_counter) self.last_packet_id = packet_id flow.packet_client_addrs[packet_id] = (addr[0], addr[1]) self._cleanup_packet_state(flow, now) asyncio.create_task(self.edge.forward_udp(flow, payload, packet_id, (addr[0], addr[1]), self)) self._cleanup_inactive_flows(now) self._log_udp_summary() def _reset_client_state(self, addr) -> None: remapped: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {} for flow in list(self.client_flows.values()): flow.client_addr = (addr[0], addr[1]) remapped[self._client_flow_key(addr, flow.target_host, flow.target_port)] = flow self.client_flows = remapped self.client_addr = addr async def handle_from_relay(self, frame: Frame, link: RelayLink) -> None: if self.transport is None or self.client_addr is None: return flow = self.edge.udp_flow_sessions.get((frame.session_id, frame.stream_id)) if flow is None: return if frame.packet_id == STATUS_ERR: flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1 if link.node.name not in flow.relay_error_seen: flow.relay_error_seen.add(link.node.name) detail = frame.payload.decode("utf-8", errors="replace") print(f"[edge] udp relay error flow={flow.flow_id} relay={link.node.name} error={detail}") return await self._deliver_flow_packet(flow, frame.packet_id, frame.payload, link.node.name) async def handle_from_direct(self, flow: UdpFlowState, path_name: str, payload: bytes, packet_id: int = 0, client_addr: tuple[str, int] | None = None) -> None: if self.transport is None or self.client_addr is None: return await self._deliver_flow_packet(flow, packet_id, payload, path_name, client_addr) async def _deliver_flow_packet(self, flow: UdpFlowState, packet_id: int, payload: bytes, source_name: str, client_addr: tuple[str, int] | None = None) -> None: if self.transport is None or self.client_addr is None: return packet = self._build_socks_udp(flow.target_host, flow.target_port, payload) now = asyncio.get_running_loop().time() flow.touch(now) flow.path_last_seen[source_name] = now flow.packets_received += 1 target_addr = client_addr or flow.packet_client_addrs.pop(packet_id, None) or flow.client_addr if flow.winner_name is None: flow.winner_name = source_name flow.winner_miss_streak = 0 self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1 self._log_udp_summary(force=True) elif flow.winner_name != source_name: flow.duplicate_responses += 1 winner_last_seen = flow.path_last_seen.get(flow.winner_name, 0.0) if winner_last_seen and now - winner_last_seen >= (self.edge.config.udp_failover_idle_ms / 1000): flow.winner_name = source_name flow.winner_miss_streak = 0 self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1 self._log_udp_summary(force=True) else: flow.winner_miss_streak = 0 if flow.winner_name == source_name and target_addr is not None: if flow.packets_received == 1: print( f"[edge] udp relay reply flow={flow.flow_id} relay={source_name} " f"target={flow.target_host}:{flow.target_port} bytes={len(payload)}" ) self.transport.sendto(packet, target_addr) def _cleanup_packet_state(self, flow: UdpFlowState, now: float) -> None: if flow.last_cleanup_at and now - flow.last_cleanup_at < 1.0: return flow.last_cleanup_at = now expired_packet_ids = [ packet_id for packet_id in flow.packet_client_addrs if packet_id <= (self.last_packet_id - UDP_PACKET_CLIENT_MAP_LIMIT) ] for packet_id in expired_packet_ids: flow.packet_client_addrs.pop(packet_id, None) for path_name, pending in list(flow.direct_pending_clients.items()): while len(pending) > UDP_DIRECT_PENDING_LIMIT: pending.popleft() if not pending: flow.direct_pending_clients.pop(path_name, None) def _cleanup_inactive_flows(self, now: float) -> None: if self._last_flow_cleanup_at and now - self._last_flow_cleanup_at < 5.0: return self._last_flow_cleanup_at = now expired_keys = [ key for key, flow in self.client_flows.items() if now - flow.last_activity >= UDP_FLOW_IDLE_CLEANUP_SEC ] for key in expired_keys: flow = self.client_flows.pop(key, None) if flow is None: continue self.edge.release_udp_flow(flow) def set_flow_candidates(self, flow: UdpFlowState, candidate_names: tuple[str, ...]) -> None: if not flow.candidate_names: flow.candidate_names = candidate_names def note_unsent(self, flow: UdpFlowState, _packet_id: int) -> None: flow.touch(asyncio.get_running_loop().time()) flow.relay_failures["unsent"] = flow.relay_failures.get("unsent", 0) + 1 self._log_udp_summary(force=True) def _log_udp_summary(self, force: bool = False) -> None: now = asyncio.get_running_loop().time() if not force and now - self.last_summary_at < 10: return self.last_summary_at = now active_flows = len(self.client_flows) winners = sum(1 for flow in self.client_flows.values() if flow.winner_name) packets_sent = sum(flow.packets_sent for flow in self.client_flows.values()) packets_received = sum(flow.packets_received for flow in self.client_flows.values()) duplicates = sum(flow.duplicate_responses for flow in self.client_flows.values()) direct_paths = sum(len(flow.direct_sockets) for flow in self.client_flows.values()) relay_candidates = sum(len(flow.link_streams) for flow in self.client_flows.values()) candidate_names: list[str] = [] seen_candidates: set[str] = set() for flow in sorted(self.client_flows.values(), key=lambda item: item.flow_id): for name in flow.candidate_names: if name in seen_candidates: continue seen_candidates.add(name) candidate_names.append(name) direct_wins = sum(1 for flow in self.client_flows.values() if flow.winner_name and flow.winner_name.startswith("direct")) relay_wins = winners - direct_wins sample_flows = [f"{flow.flow_id}:{flow.winner_name or 'pending'}" for flow in sorted(self.client_flows.values(), key=lambda item: item.flow_id) if flow.winner_name][:5] relay_errors: list[str] = [] for flow in self.client_flows.values(): for name, count in flow.relay_failures.items(): relay_errors.append(f"{name}={count}") bind = f"{self.client_addr[0]}:{self.client_addr[1]}" if self.client_addr else "unbound" print( f"[edge] udp summary bind={bind} flows={active_flows} winners={winners} " f"winner_breakdown=direct={direct_wins},relay={relay_wins} sample={', '.join(sample_flows) or 'none'} " f"candidates={candidate_names or ['none']} sent={packets_sent} recv={packets_received} dup={duplicates} " f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={', '.join(sorted(relay_errors)) or 'none'}" ) def _parse_socks_udp(self, packet: bytes) -> tuple[str, int, bytes]: atyp = packet[3] offset = 4 if atyp == 1: host = socket.inet_ntoa(packet[offset:offset + 4]) offset += 4 elif atyp == 3: size = packet[offset] offset += 1 host = packet[offset:offset + size].decode() offset += size else: raise ValueError("unsupported udp atyp") port = struct.unpack("!H", packet[offset:offset + 2])[0] offset += 2 return host, port, packet[offset:] def _build_socks_udp(self, host: str, port: int, payload: bytes) -> bytes: try: addr = socket.inet_aton(host) header = b"\x00\x00\x00\x01" + addr + struct.pack("!H", port) except OSError: raw = host.encode() header = b"\x00\x00\x00\x03" + bytes([len(raw)]) + raw + struct.pack("!H", port) return header + payload class UdpEdge: def __init__(self, listen_host: str, listen_port: int, config: UdpConfig) -> None: self.listen_host = listen_host self.listen_port = listen_port self.config = config self.scheduler = UdpScheduler(config) self.links: list[RelayLink] = [] self.udp_stream_ids = itertools.count(1) self.udp_flow_sessions: dict[tuple[int, int], UdpFlowState] = {} self.udp_server: UdpAssociateServer | None = None def _udp_direct_copies(self) -> int: if self.config.udp_direct_copies is not None: return max(1, self.config.udp_direct_copies) return max(1, self.config.udp_redundancy + 1) def _udp_relay_copies(self) -> int: if self.config.udp_relay_copies is not None: return max(1, self.config.udp_relay_copies) return max(1, self.config.udp_redundancy + 1) def release_udp_flow(self, flow: UdpFlowState) -> None: for stream_id in list(flow.link_streams.values()): self.udp_flow_sessions.pop((flow.flow_id, stream_id), None) flow.link_streams.clear() flow.initialized_links.clear() flow.packet_client_addrs.clear() for task in list(flow.direct_tasks.values()): task.cancel() flow.direct_tasks.clear() for sock in list(flow.direct_sockets.values()): with contextlib.suppress(Exception): sock.close() flow.direct_sockets.clear() flow.direct_pending_clients.clear() async def start(self) -> None: await self.scheduler.start() await self._connect_relays() server = await asyncio.start_server(self._accept, self.listen_host, self.listen_port) sockets = ", ".join(str(sock.getsockname()) for sock in server.sockets or []) relay_mode = "direct-only" if not self.config.relays else "direct+relay" print(f"[edge] socks5 listening on {sockets} relay_mode={relay_mode}") async with server: await server.serve_forever() async def _connect_relays(self) -> None: loop = asyncio.get_running_loop() transport, protocol = await loop.create_datagram_endpoint(lambda: UdpAssociateServer(self), local_addr=(self.listen_host, 0)) self.udp_server = protocol self.udp_transport = transport for node in self.config.relays: link = RelayLink(node=node, reader=None, writer=None) # type: ignore[arg-type] link.udp_server = protocol self.links.append(link) link.maintain_task = asyncio.create_task(self._maintain_link(link)) async def _maintain_link(self, link: RelayLink) -> None: backoff = 1.0 while True: try: reader, writer = await asyncio.open_connection(link.node.host, link.node.port) sock = writer.get_extra_info("socket") if sock is not None: with contextlib.suppress(OSError): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) link.reader = reader link.writer = writer await link.start() backoff = 1.0 await link.closed_event.wait() except asyncio.CancelledError: raise except Exception: await asyncio.sleep(backoff) backoff = min(10.0, backoff * 2) async def _accept(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: try: peer = writer.get_extra_info("peername") _host, _port, udp_mode = await self._handshake(reader, writer, peer) if udp_mode: return except Exception: writer.close() with contextlib.suppress(Exception): await writer.wait_closed() def _selected_udp_links(self) -> list[RelayLink]: online = [link for link in self.links if not link.closed and link.writer is not None and link.supports_udp] if not online: return [] return sorted(online, key=lambda link: self.scheduler.scores.get(link.node.name).score if link.node.name in self.scheduler.scores else 999999.0) def _udp_direct_redundancy_for_target(self, target_host: str) -> int: base = self.config.udp_direct_redundancy if ":" in target_host and self.config.udp_direct_redundancy_v6 is not None: base = self.config.udp_direct_redundancy_v6 elif ":" not in target_host and self.config.udp_direct_redundancy_v4 is not None: base = self.config.udp_direct_redundancy_v4 return max(1, base) async def _ensure_udp_direct_paths(self, flow: UdpFlowState, udp_server: UdpAssociateServer) -> None: target_count = self._udp_direct_redundancy_for_target(flow.target_host) for index in range(target_count): name = f"direct-{index + 1}" if target_count > 1 else "direct" if name in flow.direct_sockets or name in flow.direct_failures: continue try: family = socket.AF_INET6 if ":" in flow.target_host else socket.AF_INET sock = socket.socket(family, socket.SOCK_DGRAM) sock.setblocking(False) with contextlib.suppress(OSError): sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, UDP_SOCKET_BUFFER_BYTES) with contextlib.suppress(OSError): sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, UDP_SOCKET_BUFFER_BYTES) await asyncio.get_running_loop().sock_connect(sock, (flow.target_host, flow.target_port)) flow.direct_sockets[name] = sock flow.direct_tasks[name] = asyncio.create_task(self._pump_udp_direct(flow, name, sock, udp_server)) except Exception as exc: flow.direct_failures.add(name) print(f"[edge] udp direct open error flow={flow.flow_id} path={name} target={flow.target_host}:{flow.target_port} error={exc!r}") async def _pump_udp_direct(self, flow: UdpFlowState, path_name: str, sock: socket.socket, udp_server: UdpAssociateServer) -> None: loop = asyncio.get_running_loop() try: while True: data = await loop.sock_recv(sock, 65535) if not data: break pending = flow.direct_pending_clients.get(path_name) packet_id = 0 client_addr = flow.client_addr if pending: packet_id, client_addr = pending.popleft() await udp_server.handle_from_direct(flow, path_name, data, packet_id, client_addr) finally: flow.direct_tasks.pop(path_name, None) flow.direct_sockets.pop(path_name, None) with contextlib.suppress(Exception): sock.close() async def forward_udp(self, flow: UdpFlowState, payload: bytes, packet_id: int, client_addr: tuple[str, int], udp_server: UdpAssociateServer) -> None: await self._ensure_udp_direct_paths(flow, udp_server) meta = encode_json({"host": flow.target_host, "port": flow.target_port, "family": flow.target_family}) links = self._selected_udp_links() direct_names = tuple(name for name in sorted(flow.direct_sockets)) relay_names = tuple(link.node.name for link in links) candidate_names = direct_names + relay_names udp_server.set_flow_candidates(flow, candidate_names) if not candidate_names: udp_server.note_unsent(flow, packet_id) return active_direct_names = list(direct_names) active_links = links now = asyncio.get_running_loop().time() warmup_mode = flow.packets_sent <= UDP_WARMUP_BROADCAST_PACKETS shadow_probe = flow.winner_name is not None and now - flow.last_probe_at >= UDP_SHADOW_PROBE_INTERVAL_SEC if shadow_probe: flow.last_probe_at = now broadcast_mode = self.config.udp_always_broadcast or flow.winner_name is None or warmup_mode or shadow_probe if not broadcast_mode: winner_last_seen = flow.path_last_seen.get(flow.winner_name, 0.0) if flow.winner_name else 0.0 winner_stale = bool(winner_last_seen and now - winner_last_seen >= (self.config.udp_failover_idle_ms / 1000)) if not winner_stale: flow.winner_miss_streak += 1 if winner_stale or flow.winner_miss_streak >= UDP_FAST_FAILOVER_MISSES: flow.winner_name = None flow.winner_miss_streak = 0 broadcast_mode = True else: active_direct_names = [name for name in active_direct_names if name == flow.winner_name] active_links = [link for link in active_links if link.node.name == flow.winner_name] if not active_direct_names and not active_links: if direct_names: active_direct_names = [direct_names[0]] elif links: active_links = links[:1] direct_copies = self._udp_direct_copies() relay_copies = self._udp_relay_copies() sent_any = False for attempt in range(max(direct_copies, relay_copies)): for path_name in active_direct_names if attempt < direct_copies else (): sock = flow.direct_sockets.get(path_name) if sock is None: continue try: flow.direct_pending_clients.setdefault(path_name, deque()).append((packet_id, client_addr)) await asyncio.get_running_loop().sock_sendall(sock, payload) sent_any = True except Exception as exc: pending = flow.direct_pending_clients.get(path_name) if pending: with contextlib.suppress(Exception): pending.pop() flow.direct_failures.add(path_name) flow.direct_sockets.pop(path_name, None) task = flow.direct_tasks.pop(path_name, None) if task is not None: task.cancel() with contextlib.suppress(Exception): sock.close() flow.relay_failures[path_name] = flow.relay_failures.get(path_name, 0) + 1 if path_name not in flow.relay_error_seen: flow.relay_error_seen.add(path_name) print(f"[edge] udp relay error flow={flow.flow_id} relay={path_name} error={exc!r}") for link in active_links if attempt < relay_copies else (): stream_id = flow.link_streams.get(link.node.name) if stream_id is None: stream_id = next(self.udp_stream_ids) flow.link_streams[link.node.name] = stream_id self.udp_flow_sessions[(flow.flow_id, stream_id)] = flow include_meta = link.node.name not in flow.initialized_links body = (meta + payload) if include_meta else payload meta_len = len(meta) if include_meta else 0 try: await link.send(Frame(UDP_SEND, flow.flow_id, stream_id, 0, meta_len, body)) flow.initialized_links.add(link.node.name) sent_any = True except Exception as exc: flow.link_streams.pop(link.node.name, None) self.udp_flow_sessions.pop((flow.flow_id, stream_id), None) flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1 if link.node.name not in flow.relay_error_seen: flow.relay_error_seen.add(link.node.name) print(f"[edge] udp relay error flow={flow.flow_id} relay={link.node.name} error={exc!r}") if attempt + 1 < max(direct_copies, relay_copies) and self.config.udp_copy_interval_ms > 0: await asyncio.sleep(self.config.udp_copy_interval_ms / 1000) if not sent_any: udp_server.note_unsent(flow, packet_id) async def _handshake(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, peer) -> tuple[str, int, bool]: version, methods_len = (await read_exact(reader, 2)) if version != SOCKS_VERSION: raise ValueError("unsupported socks version") await read_exact(reader, methods_len) writer.write(b"\x05\x00") await writer.drain() version, command, _, atyp = await read_exact(reader, 4) if version != SOCKS_VERSION: raise ValueError("unsupported socks version") if atyp == 1: host = socket.inet_ntoa(await read_exact(reader, 4)) elif atyp == 3: size = (await read_exact(reader, 1))[0] host = (await read_exact(reader, size)).decode() else: raise ValueError("unsupported atyp") port = struct.unpack("!H", await read_exact(reader, 2))[0] peer_text = f"{peer[0]}:{peer[1]}" if isinstance(peer, tuple) and len(peer) >= 2 else str(peer) if command == 3 and self.udp_server and self.udp_server.transport: bind_host, bind_port = self.udp_server.transport.get_extra_info("sockname")[:2] self.udp_server.register_associate(peer) print(f"[edge] socks handshake peer={peer_text} command=udp_associate target={host}:{port} bind={bind_host}:{bind_port}") writer.write(b"\x05\x00\x00\x01" + socket.inet_aton(bind_host) + struct.pack("!H", bind_port)) await writer.drain() return host, port, True raise ValueError("unsupported socks command")