| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808 |
- from __future__ import annotations
- from pathlib import Path
- import asyncio
- import contextlib
- import itertools
- import os
- import socket
- import struct
- from dataclasses import dataclass, field
- from typing import Awaitable, Callable
- from .config import Config
- from .protocol import STATUS_OK, TCP_CLOSE, TCP_DATA, TCP_OPEN, TCP_STATUS, UDP_RECV, UDP_SEND, Frame, encode_json
- from .relay_client import RelayConnection, RelayManager
- SO_ORIGINAL_DST = 80
- IP6T_SO_ORIGINAL_DST = 80
- IP_RECVORIGDSTADDR = 20
- IPV6_RECVORIGDSTADDR = 74
- @dataclass(frozen=True)
- class TargetAddress:
- host: str
- port: int
- family: int
- @dataclass(frozen=True)
- class PeerAddress:
- host: str
- port: int
- family: int
- def parse_sockaddr(raw: bytes) -> TargetAddress:
- if len(raw) < 8:
- raise ValueError("invalid transparent destination payload")
- family = struct.unpack_from("=H", raw, 0)[0]
- port = struct.unpack_from("!H", raw, 2)[0]
- if family == socket.AF_INET:
- host = socket.inet_ntoa(raw[4:8])
- return TargetAddress(host=host, port=port, family=family)
- if family == socket.AF_INET6:
- if len(raw) < 28:
- raise ValueError("invalid IPv6 transparent destination payload")
- host = socket.inet_ntop(socket.AF_INET6, raw[8:24])
- return TargetAddress(host=host, port=port, family=family)
- raise ValueError(f"unsupported family={family}")
- def winner_group(name: str) -> str:
- return "direct" if name.startswith("direct") else name
- def grouped_total(stats: dict[str, int], group: str) -> int:
- return sum(count for name, count in stats.items() if winner_group(name) == group)
- class BasePath:
- def __init__(self, name: str, on_frame: Callable[["BasePath", str, bytes | None], Awaitable[None]]) -> None:
- self.name = name
- self.on_frame = on_frame
- self.opened = False
- self.closed = False
- async def open(self, target: TargetAddress) -> None:
- raise NotImplementedError
- async def send(self, data: bytes) -> None:
- raise NotImplementedError
- async def close(self) -> None:
- raise NotImplementedError
- class DirectTcpPath(BasePath):
- def __init__(self, name: str, on_frame: Callable[[BasePath, str, bytes | None], Awaitable[None]], open_timeout: float, happy_eyeballs_delay: float | None, tcp_nodelay: bool = True) -> None:
- super().__init__(name, on_frame)
- self.reader: asyncio.StreamReader | None = None
- self.writer: asyncio.StreamWriter | None = None
- self.pump_task: asyncio.Task | None = None
- self.open_timeout = open_timeout
- self.happy_eyeballs_delay = happy_eyeballs_delay
- self.tcp_nodelay = tcp_nodelay
- async def open(self, target: TargetAddress) -> None:
- try:
- family = socket.AF_INET6 if target.family == socket.AF_INET6 else socket.AF_INET
- kwargs = {"host": target.host, "port": target.port, "family": family}
- if self.happy_eyeballs_delay is not None:
- kwargs["happy_eyeballs_delay"] = self.happy_eyeballs_delay
- self.reader, self.writer = await asyncio.wait_for(asyncio.open_connection(**kwargs), timeout=self.open_timeout)
- sock = self.writer.get_extra_info("socket")
- if sock is not None and self.tcp_nodelay:
- with contextlib.suppress(OSError):
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- self.opened = True
- self.pump_task = asyncio.create_task(self._pump())
- await self.on_frame(self, "status", b"ok")
- except Exception as exc:
- await self.on_frame(self, "status", str(exc).encode())
- async def _pump(self) -> None:
- assert self.reader is not None
- try:
- while True:
- chunk = await self.reader.read(65536)
- if not chunk:
- break
- await self.on_frame(self, "data", chunk)
- except Exception:
- pass
- finally:
- await self.on_frame(self, "close", None)
- async def send(self, data: bytes) -> None:
- if self.closed or self.writer is None:
- return
- try:
- self.writer.write(data)
- await self.writer.drain()
- except (BrokenPipeError, ConnectionResetError, RuntimeError, OSError, asyncio.CancelledError) as exc:
- await self.close()
- raise ConnectionError("relay closed") from exc
- async def close(self) -> None:
- if self.closed:
- return
- self.closed = True
- if self.pump_task and self.pump_task is not asyncio.current_task():
- self.pump_task.cancel()
- with contextlib.suppress(Exception):
- await self.pump_task
- if self.writer:
- self.writer.close()
- with contextlib.suppress(Exception):
- await self.writer.wait_closed()
- class RelayTcpPath(BasePath):
- def __init__(self, name: str, on_frame: Callable[[BasePath, str, bytes | None], Awaitable[None]], connection: RelayConnection, session_id: int, stream_id: int) -> None:
- super().__init__(name, on_frame)
- self.connection = connection
- self.session_id = session_id
- self.stream_id = stream_id
- self.unbind_task: asyncio.Task | None = None
- async def open(self, target: TargetAddress) -> None:
- if self.connection.closed:
- await self.on_frame(self, "status", b"relay unavailable")
- return
- self.connection.bind(self.session_id, self.stream_id, self._handle_frame)
- try:
- await self.connection.send(Frame(TCP_OPEN, self.session_id, self.stream_id, 0, 0, encode_json({"host": target.host, "port": target.port, "family": target.family})))
- except Exception as exc:
- self.connection.unbind(self.session_id, self.stream_id)
- await self.on_frame(self, "status", str(exc).encode())
- async def _handle_frame(self, _conn: RelayConnection, frame: Frame) -> None:
- if frame.kind == TCP_STATUS:
- if frame.packet_id == STATUS_OK:
- self.opened = True
- await self.on_frame(self, "status", b"ok")
- else:
- await self.on_frame(self, "status", frame.payload)
- return
- if frame.kind == TCP_DATA:
- await self.on_frame(self, "data", frame.payload)
- return
- if frame.kind == TCP_CLOSE:
- await self.on_frame(self, "close", None)
- async def send(self, data: bytes) -> None:
- if self.closed or self.connection.closed:
- return
- await self.connection.send(Frame(TCP_DATA, self.session_id, self.stream_id, 0, 0, data))
- async def close(self) -> None:
- if self.closed:
- return
- self.closed = True
- if self.unbind_task is None or self.unbind_task.done():
- self.unbind_task = asyncio.create_task(self._delayed_unbind())
- if not self.connection.closed:
- with contextlib.suppress(Exception):
- await self.connection.send(Frame(TCP_CLOSE, self.session_id, self.stream_id, 0, 0, b""))
- async def _delayed_unbind(self) -> None:
- await asyncio.sleep(0.5)
- self.connection.unbind(self.session_id, self.stream_id)
- @dataclass
- class TransparentSession:
- session_id: int
- target: TargetAddress
- reader: asyncio.StreamReader
- writer: asyncio.StreamWriter
- paths: list[BasePath]
- warmup_bytes: int
- loser_grace_ms: int
- stats: dict[str, int]
- target_stats: dict[tuple[str, int], dict[str, int]]
- family_stats: dict[str, dict[str, int]]
- opened_count: int = 0
- status_count: int = 0
- errors: list[str] = field(default_factory=list)
- winner: BasePath | None = None
- uplink_bytes: int = 0
- open_event: asyncio.Event = field(default_factory=asyncio.Event)
- winner_event: asyncio.Event = field(default_factory=asyncio.Event)
- closed: bool = False
- pump_task: asyncio.Task | None = None
- loser_close_task: asyncio.Task | None = None
- def _record_win(self, winner: BasePath) -> None:
- self.stats[winner.name] = self.stats.get(winner.name, 0) + 1
- key = (self.target.host, self.target.port)
- target_stats = self.target_stats.setdefault(key, {})
- target_stats[winner.name] = target_stats.get(winner.name, 0) + 1
- family_key = "ipv6" if self.target.family == socket.AF_INET6 else "ipv4"
- family_stats = self.family_stats.setdefault(family_key, {})
- family_stats[winner.name] = family_stats.get(winner.name, 0) + 1
- direct_wins = grouped_total(self.stats, "direct")
- relay_wins = sum(count for name, count in self.stats.items() if winner_group(name) != "direct")
- target_direct = grouped_total(target_stats, "direct")
- target_relay = sum(count for name, count in target_stats.items() if winner_group(name) != "direct")
- family_direct = grouped_total(family_stats, "direct")
- family_relay = sum(count for name, count in family_stats.items() if winner_group(name) != "direct")
- relay_detail = ", ".join(f"{name}={count}" for name, count in sorted(self.stats.items()) if winner_group(name) != "direct") or "none"
- target_detail = ", ".join(f"{name}={count}" for name, count in sorted(target_stats.items()) if winner_group(name) != "direct") or "none"
- target_pref = "relay" if target_relay > target_direct else "direct"
- family_pref = "relay" if family_relay > family_direct else "direct"
- print(f"[edge] tcp win session={self.session_id} target={self.target.host}:{self.target.port} winner={winner.name} direct={direct_wins} relay={relay_wins} relay_breakdown={relay_detail} target_pref={target_pref} target_direct={target_direct} target_relay={target_relay} target_breakdown={target_detail} family_pref={family_pref} family={family_key} family_direct={family_direct} family_relay={family_relay}")
- async def start(self) -> None:
- await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
- await asyncio.wait_for(self.open_event.wait(), timeout=15)
- if self.opened_count == 0:
- raise ConnectionError(self.errors[0] if self.errors else "all paths failed")
- self.pump_task = asyncio.create_task(self._pump_local())
- async def _pump_local(self) -> None:
- try:
- while True:
- chunk = await self.reader.read(65536)
- if not chunk:
- break
- self.uplink_bytes += len(chunk)
- active = [path for path in self.paths if path.opened and not path.closed]
- if not active:
- break
- if self.uplink_bytes <= self.warmup_bytes:
- await asyncio.gather(*(path.send(chunk) for path in active), return_exceptions=True)
- else:
- if self.winner is None:
- await self.winner_event.wait()
- if self.winner:
- await self.winner.send(chunk)
- except Exception:
- pass
- finally:
- await self.close()
- async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
- if self.closed:
- return
- if event == "status":
- self.status_count += 1
- if payload == b"ok":
- self.opened_count += 1
- elif payload is not None:
- self.errors.append(payload.decode("utf-8", errors="replace"))
- if self.opened_count > 0 or self.status_count == len(self.paths):
- self.open_event.set()
- return
- if event == "data":
- if self.winner is None:
- self.winner = path
- self._record_win(path)
- self.winner_event.set()
- if self.loser_grace_ms > 0:
- self.loser_close_task = asyncio.create_task(self._close_losers_after_grace(path))
- else:
- await self._close_losers(path)
- if path is self.winner and payload is not None:
- self.writer.write(payload)
- await self.writer.drain()
- return
- if event == "close":
- path.closed = True
- if self.winner is None:
- remaining = [candidate for candidate in self.paths if candidate.opened and not candidate.closed]
- if not remaining:
- await self.close()
- elif path is self.winner:
- await self.close()
- async def _close_losers(self, winner: BasePath) -> None:
- await asyncio.gather(*(path.close() for path in self.paths if path is not winner), return_exceptions=True)
- async def _close_losers_after_grace(self, winner: BasePath) -> None:
- await asyncio.sleep(self.loser_grace_ms / 1000)
- if not self.closed:
- await self._close_losers(winner)
- async def close(self) -> None:
- if self.closed:
- return
- self.closed = True
- print(f"[edge] session={self.session_id} closed target={self.target.host}:{self.target.port}")
- if self.pump_task and self.pump_task is not asyncio.current_task():
- self.pump_task.cancel()
- with contextlib.suppress(Exception):
- await self.pump_task
- if self.loser_close_task and self.loser_close_task is not asyncio.current_task():
- self.loser_close_task.cancel()
- with contextlib.suppress(Exception):
- await self.loser_close_task
- await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
- self.writer.close()
- with contextlib.suppress(Exception):
- await self.writer.wait_closed()
- class DirectUdpPath(BasePath):
- def __init__(self, name: str, on_frame: Callable[[BasePath, str, bytes | None], Awaitable[None]], target: TargetAddress) -> None:
- super().__init__(name, on_frame)
- self.target = target
- self.socket: socket.socket | None = None
- self.read_task: asyncio.Task | None = None
- async def open(self, _target: TargetAddress) -> None:
- try:
- family = socket.AF_INET6 if self.target.family == socket.AF_INET6 else socket.AF_INET
- self.socket = socket.socket(family, socket.SOCK_DGRAM)
- self.socket.setblocking(False)
- await asyncio.get_running_loop().sock_connect(self.socket, (self.target.host, self.target.port))
- self.opened = True
- self.read_task = asyncio.create_task(self._pump())
- await self.on_frame(self, "status", b"ok")
- except Exception as exc:
- await self.on_frame(self, "status", str(exc).encode())
- async def _pump(self) -> None:
- assert self.socket is not None
- loop = asyncio.get_running_loop()
- try:
- while True:
- data = await loop.sock_recv(self.socket, 65535)
- if not data:
- break
- await self.on_frame(self, "data", data)
- except Exception:
- pass
- finally:
- await self.on_frame(self, "close", None)
- async def send(self, data: bytes) -> None:
- if self.closed or self.socket is None:
- return
- await asyncio.get_running_loop().sock_sendall(self.socket, data)
- async def close(self) -> None:
- if self.closed:
- return
- self.closed = True
- if self.read_task and self.read_task is not asyncio.current_task():
- self.read_task.cancel()
- with contextlib.suppress(Exception):
- await self.read_task
- if self.socket:
- self.socket.close()
- class RelayUdpPath(BasePath):
- def __init__(self, name: str, on_frame: Callable[[BasePath, str, bytes | None], Awaitable[None]], connection: RelayConnection, session_id: int, stream_id: int, target: TargetAddress) -> None:
- super().__init__(name, on_frame)
- self.connection = connection
- self.session_id = session_id
- self.stream_id = stream_id
- self.target = target
- self.unbind_task: asyncio.Task | None = None
- async def open(self, _target: TargetAddress) -> None:
- if self.connection.closed:
- await self.on_frame(self, "status", b"relay unavailable")
- return
- self.connection.bind(self.session_id, self.stream_id, self._handle_frame)
- try:
- self.opened = True
- await self.on_frame(self, "status", b"ok")
- except Exception:
- self.connection.unbind(self.session_id, self.stream_id)
- self.closed = True
- raise
- async def _handle_frame(self, _conn: RelayConnection, frame: Frame) -> None:
- if frame.kind == UDP_RECV:
- await self.on_frame(self, "data", frame.payload)
- async def send(self, data: bytes) -> None:
- if self.closed or self.connection.closed:
- return
- meta = encode_json({"host": self.target.host, "port": self.target.port, "family": self.target.family})
- payload = meta + data
- try:
- await self.connection.send(Frame(UDP_SEND, self.session_id, self.stream_id, 0, len(meta), payload))
- except Exception:
- self.closed = True
- raise
- async def close(self) -> None:
- if self.closed:
- return
- self.closed = True
- if self.unbind_task is None or self.unbind_task.done():
- self.unbind_task = asyncio.create_task(self._delayed_unbind())
- async def _delayed_unbind(self) -> None:
- await asyncio.sleep(0.5)
- self.connection.unbind(self.session_id, self.stream_id)
- @dataclass
- class UdpFlow:
- flow_id: int
- source: PeerAddress
- target: TargetAddress
- send_response: Callable[[PeerAddress, bytes], Awaitable[None]]
- paths: list[BasePath]
- redundancy: int = 0
- always_broadcast: bool = True
- copy_interval_ms: int = 0
- winner: BasePath | None = None
- closed: bool = False
- last_activity: float = 0.0
- packets_sent: int = 0
- packets_received: int = 0
- duplicate_responses: int = 0
- send_task: asyncio.Task | None = None
- async def start(self) -> None:
- await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
- async def send(self, payload: bytes) -> None:
- self.last_activity = asyncio.get_running_loop().time()
- self.packets_sent += 1
- active = [path for path in self.paths if path.opened and not path.closed]
- if not active:
- return
- copies = max(1, self.redundancy + 1)
- targets = active if self.always_broadcast or self.winner is None or self.winner.closed else [self.winner]
- for attempt in range(copies):
- await asyncio.gather(*(path.send(payload) for path in targets), return_exceptions=True)
- if attempt + 1 < copies and self.copy_interval_ms > 0:
- await asyncio.sleep(self.copy_interval_ms / 1000)
- async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
- self.last_activity = asyncio.get_running_loop().time()
- if event == "data" and payload is not None:
- self.packets_received += 1
- if self.winner is None:
- self.winner = path
- mode = "redundant" if self.redundancy > 0 else "single"
- print(f"[edge] udp flow={self.flow_id} winner={path.name} target={self.target.host}:{self.target.port} mode={mode} candidates={len(self.paths)}")
- elif path is not self.winner:
- self.duplicate_responses += 1
- if path is self.winner:
- await self.send_response(self.source, payload)
- if event == "close":
- path.closed = True
- if path is self.winner:
- remaining = [candidate for candidate in self.paths if candidate.opened and not candidate.closed]
- self.winner = remaining[0] if remaining else None
- async def close(self) -> None:
- if self.closed:
- return
- self.closed = True
- if self.send_task and self.send_task is not asyncio.current_task():
- self.send_task.cancel()
- with contextlib.suppress(Exception):
- await self.send_task
- print(
- f"[edge] udp flow={self.flow_id} closed target={self.target.host}:{self.target.port} "
- f"sent={self.packets_sent} received={self.packets_received} dup={self.duplicate_responses}"
- )
- await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
- class TransparentUdpListener:
- def __init__(self, edge: "TransparentEdge", family: int, bind_host: str, port: int) -> None:
- self.edge = edge
- self.family = family
- self.bind_host = bind_host
- self.port = port
- self.socket: socket.socket | None = None
- self.udp_packets_received = 0
- self.udp_recv_errors = 0
- self.udp_parse_errors = 0
- self.udp_missing_original = 0
- self.udp_self_loop_skipped = 0
- self.udp_flows_created = 0
- self.last_summary_at = 0.0
- def start(self) -> None:
- sock = socket.socket(self.family, socket.SOCK_DGRAM)
- sock.setblocking(False)
- if self.family == socket.AF_INET:
- sock.setsockopt(socket.SOL_IP, IP_RECVORIGDSTADDR, 1)
- sock.bind((self.bind_host, self.port))
- else:
- sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
- sock.setsockopt(socket.IPPROTO_IPV6, IPV6_RECVORIGDSTADDR, 1)
- sock.bind((self.bind_host, self.port, 0, 0))
- self.socket = sock
- asyncio.get_running_loop().add_reader(sock.fileno(), self._on_readable)
- print(f"[edge] transparent udp listening on {sock.getsockname()}")
- def _log_udp_summary(self, force: bool = False) -> None:
- now = asyncio.get_running_loop().time()
- if not force and now - self.last_summary_at < 10:
- return
- self.last_summary_at = now
- print(
- f"[edge] udp summary family={self.family} bind={self.bind_host}:{self.port} "
- f"received={self.udp_packets_received} flows={self.udp_flows_created} "
- f"self_loop={self.udp_self_loop_skipped} missing_original={self.udp_missing_original} "
- f"parse_error={self.udp_parse_errors} recv_error={self.udp_recv_errors}"
- )
- def _on_readable(self) -> None:
- assert self.socket is not None
- try:
- data, ancdata, _flags, src = self.socket.recvmsg(65535, 512)
- except BlockingIOError:
- return
- except Exception as exc:
- self.udp_recv_errors += 1
- print(f"[edge] udp recvmsg error family={self.family} error={exc!r}")
- self._log_udp_summary(force=True)
- return
- self.udp_packets_received += 1
- original = None
- for level, ctype, cdata in ancdata:
- if self.family == socket.AF_INET and level == socket.SOL_IP and ctype == IP_RECVORIGDSTADDR:
- try:
- original = parse_sockaddr(cdata)
- except Exception as exc:
- self.udp_parse_errors += 1
- print(f"[edge] udp parse original dst error family={self.family} src={src} error={exc!r} raw_len={len(cdata)}")
- self._log_udp_summary(force=True)
- return
- break
- if self.family == socket.AF_INET6 and level == socket.IPPROTO_IPV6 and ctype == IPV6_RECVORIGDSTADDR:
- try:
- original = parse_sockaddr(cdata)
- except Exception as exc:
- self.udp_parse_errors += 1
- print(f"[edge] udp parse original dst error family={self.family} src={src} error={exc!r} raw_len={len(cdata)}")
- self._log_udp_summary(force=True)
- return
- break
- if original is None:
- self.udp_missing_original += 1
- self._log_udp_summary()
- return
- if self.family == socket.AF_INET:
- source = PeerAddress(host=src[0], port=src[1], family=socket.AF_INET)
- else:
- source = PeerAddress(host=src[0], port=src[1], family=socket.AF_INET6)
- if original.port == self.port and (original.host in ("127.0.0.1", "::1") or original.host == self.bind_host):
- self.udp_self_loop_skipped += 1
- print(
- f"[edge] udp self_loop family={self.family} src={source.host}:{source.port} "
- f"original={original.host}:{original.port} size={len(data)}"
- )
- self._log_udp_summary()
- return
- asyncio.create_task(self.edge.handle_udp_datagram(source, original, data, self))
- async def send_response(self, source: PeerAddress, payload: bytes) -> None:
- assert self.socket is not None
- if source.family == socket.AF_INET:
- self.socket.sendto(payload, (source.host, source.port))
- else:
- self.socket.sendto(payload, (source.host, source.port, 0, 0))
- async def close(self) -> None:
- if self.socket is None:
- return
- asyncio.get_running_loop().remove_reader(self.socket.fileno())
- self.socket.close()
- self.socket = None
- class TransparentEdge:
- def __init__(self, listen_host: str, listen_port: int, config: Config, enable_udp: bool = False, kernel_mode: str = "auto") -> None:
- self.listen_host = listen_host
- self.listen_port = listen_port
- self.config = config
- self.enable_udp = enable_udp
- self.kernel_mode = self._resolve_kernel_mode(kernel_mode, config.kernel_mode)
- self.manager = RelayManager(config)
- self.session_ids = itertools.count(1)
- self.stream_ids = itertools.count(1)
- self.udp_listeners: list[TransparentUdpListener] = []
- self.udp_flows: dict[tuple[PeerAddress, TargetAddress], UdpFlow] = {}
- self.udp_flow_ids = itertools.count(1)
- self.udp_gc_task: asyncio.Task | None = None
- self.tcp_win_counts: dict[str, int] = {}
- self.tcp_target_wins: dict[tuple[str, int], dict[str, int]] = {}
- self.tcp_family_wins: dict[str, dict[str, int]] = {"ipv4": {}, "ipv6": {}}
- def _resolve_kernel_mode(self, cli_kernel_mode: str, config_kernel_mode: str) -> str:
- mode = cli_kernel_mode if cli_kernel_mode != "auto" else config_kernel_mode
- if mode != "auto":
- return mode
- try:
- if Path("/etc/os-release").exists() and 'VERSION_ID="24' in Path("/etc/os-release").read_text(errors="ignore"):
- return "24"
- except Exception:
- pass
- try:
- release = os.uname().release
- if release.startswith("6."):
- return "24"
- except Exception:
- pass
- return "20"
- async def start(self) -> None:
- if self.kernel_mode == "24":
- if self.config.direct_open_timeout == 10.0:
- self.config.direct_open_timeout = 6.0
- if self.config.relay_open_timeout == 10.0:
- self.config.relay_open_timeout = 6.0
- if self.config.tcp_connect_happy_eyeballs_delay is None:
- self.config.tcp_connect_happy_eyeballs_delay = 0.25
- await self.manager.start()
- print(f"[edge] kernel_mode={self.kernel_mode} relay snapshot: {self.manager.snapshot()}")
- server4 = await asyncio.start_server(self._accept, self.listen_host, self.listen_port, family=socket.AF_INET)
- sockets = [str(sock.getsockname()) for sock in server4.sockets or []]
- server6 = None
- if self.listen_host in ("::", "::1", "0.0.0.0", "127.0.0.1"):
- host6 = "::1" if self.listen_host == "127.0.0.1" else "::"
- try:
- server6 = await asyncio.start_server(self._accept, host6, self.listen_port, family=socket.AF_INET6)
- sockets.extend(str(sock.getsockname()) for sock in server6.sockets or [])
- except Exception as exc:
- print(f"[edge] ipv6 tcp listener skipped: {exc!r}")
- if self.enable_udp:
- self._start_udp_listeners()
- self.udp_gc_task = asyncio.create_task(self._gc_udp_flows())
- print(f"[edge] transparent tcp listening on {', '.join(sockets)}")
- if server6 is None:
- async with server4:
- await server4.serve_forever()
- else:
- async with server4, server6:
- await asyncio.gather(server4.serve_forever(), server6.serve_forever())
- def _direct_redundancy_for_target(self, target: TargetAddress) -> int:
- base = self.config.direct_redundancy
- if target.family == socket.AF_INET6 and self.config.direct_redundancy_v6 is not None:
- base = self.config.direct_redundancy_v6
- elif target.family == socket.AF_INET and self.config.direct_redundancy_v4 is not None:
- base = self.config.direct_redundancy_v4
- base = max(1, min(base, self.config.direct_max_redundancy))
- target_stats = self.tcp_target_wins.get((target.host, target.port), {})
- family_key = "ipv6" if target.family == socket.AF_INET6 else "ipv4"
- family_stats = self.tcp_family_wins.get(family_key, {})
- target_prefers_relay = sum(count for name, count in target_stats.items() if winner_group(name) != "direct") > grouped_total(target_stats, "direct")
- family_prefers_relay = sum(count for name, count in family_stats.items() if winner_group(name) != "direct") > grouped_total(family_stats, "direct")
- if target_prefers_relay or family_prefers_relay:
- return min(self.config.direct_max_redundancy, base + 1)
- return base
- def _build_direct_paths(self, session: TransparentSession) -> list[BasePath]:
- count = self._direct_redundancy_for_target(session.target)
- return [
- DirectTcpPath(
- name=f"direct-{index + 1}" if count > 1 else "direct",
- on_frame=lambda path, event, payload, s=session: self._handle_tcp_session(s, path, event, payload),
- open_timeout=self.config.direct_open_timeout,
- happy_eyeballs_delay=self.config.tcp_connect_happy_eyeballs_delay,
- tcp_nodelay=self.config.relay_tcp_nodelay,
- )
- for index in range(count)
- ]
- def _build_udp_direct_paths(self, target: TargetAddress, flow_id: int) -> list[BasePath]:
- count = max(1, self.config.udp_direct_redundancy)
- if target.family == socket.AF_INET6 and self.config.udp_direct_redundancy_v6 is not None:
- count = max(1, self.config.udp_direct_redundancy_v6)
- elif target.family == socket.AF_INET and self.config.udp_direct_redundancy_v4 is not None:
- count = max(1, self.config.udp_direct_redundancy_v4)
- return [
- DirectUdpPath(
- name=f"direct-{index + 1}" if count > 1 else "direct",
- on_frame=lambda path, event, data, fid=flow_id: self._handle_udp_path(fid, path, event, data),
- target=target,
- )
- for index in range(count)
- ]
- def _start_udp_listeners(self) -> None:
- binds = []
- if self.listen_host == "127.0.0.1":
- binds = [(socket.AF_INET, "127.0.0.1"), (socket.AF_INET6, "::1")]
- elif self.listen_host == "0.0.0.0":
- binds = [(socket.AF_INET, "0.0.0.0"), (socket.AF_INET6, "::")]
- else:
- family = socket.AF_INET6 if ":" in self.listen_host else socket.AF_INET
- binds = [(family, self.listen_host)]
- for family, host in binds:
- try:
- listener = TransparentUdpListener(self, family, host, self.listen_port)
- listener.start()
- self.udp_listeners.append(listener)
- except Exception as exc:
- print(f"[edge] udp listener skipped family={family} host={host} error={exc!r}")
- async def _accept(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
- peer = writer.get_extra_info("peername")
- try:
- target = self._get_original_dst(writer)
- session_id = next(self.session_ids)
- session = TransparentSession(session_id=session_id, target=target, reader=reader, writer=writer, paths=[], warmup_bytes=self.config.tcp_warmup_bytes, loser_grace_ms=self.config.tcp_loser_grace_ms, stats=self.tcp_win_counts, target_stats=self.tcp_target_wins, family_stats=self.tcp_family_wins)
- paths: list[BasePath] = self._build_direct_paths(session)
- for connection in self.manager.available():
- stream_id = next(self.stream_ids)
- paths.append(RelayTcpPath(name=connection.node.name, on_frame=lambda path, event, payload, s=session: self._handle_tcp_session(s, path, event, payload), connection=connection, session_id=session_id, stream_id=stream_id))
- session.paths = paths
- print(f"[edge] accept peer={peer} session={session_id} target={target.host}:{target.port} candidates={[path.name for path in paths]}")
- await session.start()
- except Exception as exc:
- print(f"[edge] accept failed peer={peer} error={exc!r}")
- writer.close()
- with contextlib.suppress(Exception):
- await writer.wait_closed()
- async def _handle_tcp_session(self, session: TransparentSession, path: BasePath, event: str, payload: bytes | None) -> None:
- await session.handle_path(path, event, payload)
- def _get_original_dst(self, writer: asyncio.StreamWriter) -> TargetAddress:
- sock = writer.get_extra_info("socket")
- if sock is None:
- raise RuntimeError("socket unavailable")
- family = sock.family
- if family == socket.AF_INET:
- raw = sock.getsockopt(socket.SOL_IP, SO_ORIGINAL_DST, 16)
- return parse_sockaddr(raw)
- if family == socket.AF_INET6:
- raw = sock.getsockopt(socket.IPPROTO_IPV6, IP6T_SO_ORIGINAL_DST, 128)
- return parse_sockaddr(raw)
- raise RuntimeError(f"unsupported socket family={family}")
- async def handle_udp_datagram(self, source: PeerAddress, target: TargetAddress, payload: bytes, listener: TransparentUdpListener) -> None:
- if not self.enable_udp:
- return
- if target.port == self.listen_port and target.host in ("127.0.0.1", "::1", self.listen_host):
- return
- key = (source, target)
- flow = self.udp_flows.get(key)
- if flow is None:
- flow_id = next(self.udp_flow_ids)
- paths: list[BasePath] = self._build_udp_direct_paths(target, flow_id)
- for connection in self.manager.available():
- stream_id = next(self.stream_ids)
- paths.append(RelayUdpPath(name=connection.node.name, on_frame=lambda path, event, data, fid=flow_id: self._handle_udp_path(fid, path, event, data), connection=connection, session_id=flow_id, stream_id=stream_id, target=target))
- flow = UdpFlow(
- flow_id=flow_id,
- source=source,
- target=target,
- send_response=listener.send_response,
- paths=paths,
- redundancy=self.config.udp_redundancy,
- always_broadcast=self.config.udp_always_broadcast,
- copy_interval_ms=self.config.udp_copy_interval_ms,
- )
- self.udp_flows[key] = flow
- listener.udp_flows_created += 1
- listener._log_udp_summary(force=True)
- print(f"[edge] udp flow={flow_id} source={source.host}:{source.port} target={target.host}:{target.port} redundancy={self.config.udp_redundancy} direct_redundancy={self.config.udp_direct_redundancy} always_broadcast={self.config.udp_always_broadcast} candidates={[path.name for path in paths]}")
- await flow.start()
- await flow.send(payload)
- async def _handle_udp_path(self, flow_id: int, path: BasePath, event: str, payload: bytes | None) -> None:
- for flow in list(self.udp_flows.values()):
- if flow.flow_id == flow_id:
- await flow.handle_path(path, event, payload)
- break
- async def _gc_udp_flows(self) -> None:
- loop = asyncio.get_running_loop()
- while True:
- await asyncio.sleep(30)
- now = loop.time()
- stale = [key for key, flow in self.udp_flows.items() if flow.last_activity and now - flow.last_activity > 120]
- for key in stale:
- flow = self.udp_flows.pop(key, None)
- if flow:
- await flow.close()
|