|
@@ -424,38 +424,63 @@ class UdpFlow:
|
|
|
target: TargetAddress
|
|
target: TargetAddress
|
|
|
send_response: Callable[[PeerAddress, bytes], Awaitable[None]]
|
|
send_response: Callable[[PeerAddress, bytes], Awaitable[None]]
|
|
|
paths: list[BasePath]
|
|
paths: list[BasePath]
|
|
|
|
|
+ redundancy: int = 0
|
|
|
|
|
+ always_broadcast: bool = True
|
|
|
|
|
+ copy_interval_ms: int = 0
|
|
|
winner: BasePath | None = None
|
|
winner: BasePath | None = None
|
|
|
closed: bool = False
|
|
closed: bool = False
|
|
|
last_activity: float = 0.0
|
|
last_activity: float = 0.0
|
|
|
|
|
+ packets_sent: int = 0
|
|
|
|
|
+ packets_received: int = 0
|
|
|
|
|
+ duplicate_responses: int = 0
|
|
|
|
|
+ send_task: asyncio.Task | None = None
|
|
|
|
|
|
|
|
async def start(self) -> None:
|
|
async def start(self) -> None:
|
|
|
await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
|
|
await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
|
|
|
|
|
|
|
|
async def send(self, payload: bytes) -> None:
|
|
async def send(self, payload: bytes) -> None:
|
|
|
self.last_activity = asyncio.get_running_loop().time()
|
|
self.last_activity = asyncio.get_running_loop().time()
|
|
|
|
|
+ self.packets_sent += 1
|
|
|
active = [path for path in self.paths if path.opened and not path.closed]
|
|
active = [path for path in self.paths if path.opened and not path.closed]
|
|
|
- if self.winner is None:
|
|
|
|
|
- await asyncio.gather(*(path.send(payload) for path in active), return_exceptions=True)
|
|
|
|
|
- elif not self.winner.closed:
|
|
|
|
|
- await self.winner.send(payload)
|
|
|
|
|
|
|
+ if not active:
|
|
|
|
|
+ return
|
|
|
|
|
+ copies = max(1, self.redundancy + 1)
|
|
|
|
|
+ targets = active if self.always_broadcast or self.winner is None or self.winner.closed else [self.winner]
|
|
|
|
|
+ for attempt in range(copies):
|
|
|
|
|
+ await asyncio.gather(*(path.send(payload) for path in targets), return_exceptions=True)
|
|
|
|
|
+ if attempt + 1 < copies and self.copy_interval_ms > 0:
|
|
|
|
|
+ await asyncio.sleep(self.copy_interval_ms / 1000)
|
|
|
|
|
|
|
|
async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
|
|
async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
|
|
|
self.last_activity = asyncio.get_running_loop().time()
|
|
self.last_activity = asyncio.get_running_loop().time()
|
|
|
if event == "data" and payload is not None:
|
|
if event == "data" and payload is not None:
|
|
|
|
|
+ self.packets_received += 1
|
|
|
if self.winner is None:
|
|
if self.winner is None:
|
|
|
self.winner = path
|
|
self.winner = path
|
|
|
- print(f"[edge] udp flow={self.flow_id} winner={path.name} target={self.target.host}:{self.target.port}")
|
|
|
|
|
|
|
+ mode = "redundant" if self.redundancy > 0 else "single"
|
|
|
|
|
+ print(f"[edge] udp flow={self.flow_id} winner={path.name} target={self.target.host}:{self.target.port} mode={mode} candidates={len(self.paths)}")
|
|
|
|
|
+ elif path is not self.winner:
|
|
|
|
|
+ self.duplicate_responses += 1
|
|
|
if path is self.winner:
|
|
if path is self.winner:
|
|
|
await self.send_response(self.source, payload)
|
|
await self.send_response(self.source, payload)
|
|
|
if event == "close":
|
|
if event == "close":
|
|
|
path.closed = True
|
|
path.closed = True
|
|
|
if path is self.winner:
|
|
if path is self.winner:
|
|
|
- self.winner = None
|
|
|
|
|
|
|
+ remaining = [candidate for candidate in self.paths if candidate.opened and not candidate.closed]
|
|
|
|
|
+ self.winner = remaining[0] if remaining else None
|
|
|
|
|
|
|
|
async def close(self) -> None:
|
|
async def close(self) -> None:
|
|
|
if self.closed:
|
|
if self.closed:
|
|
|
return
|
|
return
|
|
|
self.closed = True
|
|
self.closed = True
|
|
|
|
|
+ if self.send_task and self.send_task is not asyncio.current_task():
|
|
|
|
|
+ self.send_task.cancel()
|
|
|
|
|
+ with contextlib.suppress(Exception):
|
|
|
|
|
+ await self.send_task
|
|
|
|
|
+ print(
|
|
|
|
|
+ f"[edge] udp flow={self.flow_id} closed target={self.target.host}:{self.target.port} "
|
|
|
|
|
+ f"sent={self.packets_sent} received={self.packets_received} dup={self.duplicate_responses}"
|
|
|
|
|
+ )
|
|
|
await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
|
|
await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -466,6 +491,13 @@ class TransparentUdpListener:
|
|
|
self.bind_host = bind_host
|
|
self.bind_host = bind_host
|
|
|
self.port = port
|
|
self.port = port
|
|
|
self.socket: socket.socket | None = None
|
|
self.socket: socket.socket | None = None
|
|
|
|
|
+ self.udp_packets_received = 0
|
|
|
|
|
+ self.udp_recv_errors = 0
|
|
|
|
|
+ self.udp_parse_errors = 0
|
|
|
|
|
+ self.udp_missing_original = 0
|
|
|
|
|
+ self.udp_self_loop_skipped = 0
|
|
|
|
|
+ self.udp_flows_created = 0
|
|
|
|
|
+ self.last_summary_at = 0.0
|
|
|
|
|
|
|
|
def start(self) -> None:
|
|
def start(self) -> None:
|
|
|
sock = socket.socket(self.family, socket.SOCK_DGRAM)
|
|
sock = socket.socket(self.family, socket.SOCK_DGRAM)
|
|
@@ -481,29 +513,65 @@ class TransparentUdpListener:
|
|
|
asyncio.get_running_loop().add_reader(sock.fileno(), self._on_readable)
|
|
asyncio.get_running_loop().add_reader(sock.fileno(), self._on_readable)
|
|
|
print(f"[edge] transparent udp listening on {sock.getsockname()}")
|
|
print(f"[edge] transparent udp listening on {sock.getsockname()}")
|
|
|
|
|
|
|
|
|
|
+ def _log_udp_summary(self, force: bool = False) -> None:
|
|
|
|
|
+ now = asyncio.get_running_loop().time()
|
|
|
|
|
+ if not force and now - self.last_summary_at < 10:
|
|
|
|
|
+ return
|
|
|
|
|
+ self.last_summary_at = now
|
|
|
|
|
+ print(
|
|
|
|
|
+ f"[edge] udp summary family={self.family} bind={self.bind_host}:{self.port} "
|
|
|
|
|
+ f"received={self.udp_packets_received} flows={self.udp_flows_created} "
|
|
|
|
|
+ f"self_loop={self.udp_self_loop_skipped} missing_original={self.udp_missing_original} "
|
|
|
|
|
+ f"parse_error={self.udp_parse_errors} recv_error={self.udp_recv_errors}"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
def _on_readable(self) -> None:
|
|
def _on_readable(self) -> None:
|
|
|
assert self.socket is not None
|
|
assert self.socket is not None
|
|
|
try:
|
|
try:
|
|
|
data, ancdata, _flags, src = self.socket.recvmsg(65535, 512)
|
|
data, ancdata, _flags, src = self.socket.recvmsg(65535, 512)
|
|
|
except BlockingIOError:
|
|
except BlockingIOError:
|
|
|
return
|
|
return
|
|
|
- except Exception:
|
|
|
|
|
|
|
+ except Exception as exc:
|
|
|
|
|
+ self.udp_recv_errors += 1
|
|
|
|
|
+ print(f"[edge] udp recvmsg error family={self.family} error={exc!r}")
|
|
|
|
|
+ self._log_udp_summary(force=True)
|
|
|
return
|
|
return
|
|
|
|
|
+ self.udp_packets_received += 1
|
|
|
original = None
|
|
original = None
|
|
|
for level, ctype, cdata in ancdata:
|
|
for level, ctype, cdata in ancdata:
|
|
|
if self.family == socket.AF_INET and level == socket.SOL_IP and ctype == IP_RECVORIGDSTADDR:
|
|
if self.family == socket.AF_INET and level == socket.SOL_IP and ctype == IP_RECVORIGDSTADDR:
|
|
|
- original = parse_sockaddr(cdata)
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ original = parse_sockaddr(cdata)
|
|
|
|
|
+ except Exception as exc:
|
|
|
|
|
+ self.udp_parse_errors += 1
|
|
|
|
|
+ print(f"[edge] udp parse original dst error family={self.family} src={src} error={exc!r} raw_len={len(cdata)}")
|
|
|
|
|
+ self._log_udp_summary(force=True)
|
|
|
|
|
+ return
|
|
|
break
|
|
break
|
|
|
if self.family == socket.AF_INET6 and level == socket.IPPROTO_IPV6 and ctype == IPV6_RECVORIGDSTADDR:
|
|
if self.family == socket.AF_INET6 and level == socket.IPPROTO_IPV6 and ctype == IPV6_RECVORIGDSTADDR:
|
|
|
- original = parse_sockaddr(cdata)
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ original = parse_sockaddr(cdata)
|
|
|
|
|
+ except Exception as exc:
|
|
|
|
|
+ self.udp_parse_errors += 1
|
|
|
|
|
+ print(f"[edge] udp parse original dst error family={self.family} src={src} error={exc!r} raw_len={len(cdata)}")
|
|
|
|
|
+ self._log_udp_summary(force=True)
|
|
|
|
|
+ return
|
|
|
break
|
|
break
|
|
|
if original is None:
|
|
if original is None:
|
|
|
|
|
+ self.udp_missing_original += 1
|
|
|
|
|
+ self._log_udp_summary()
|
|
|
return
|
|
return
|
|
|
if self.family == socket.AF_INET:
|
|
if self.family == socket.AF_INET:
|
|
|
source = PeerAddress(host=src[0], port=src[1], family=socket.AF_INET)
|
|
source = PeerAddress(host=src[0], port=src[1], family=socket.AF_INET)
|
|
|
else:
|
|
else:
|
|
|
source = PeerAddress(host=src[0], port=src[1], family=socket.AF_INET6)
|
|
source = PeerAddress(host=src[0], port=src[1], family=socket.AF_INET6)
|
|
|
if original.port == self.port and (original.host in ("127.0.0.1", "::1") or original.host == self.bind_host):
|
|
if original.port == self.port and (original.host in ("127.0.0.1", "::1") or original.host == self.bind_host):
|
|
|
|
|
+ self.udp_self_loop_skipped += 1
|
|
|
|
|
+ print(
|
|
|
|
|
+ f"[edge] udp self_loop family={self.family} src={source.host}:{source.port} "
|
|
|
|
|
+ f"original={original.host}:{original.port} size={len(data)}"
|
|
|
|
|
+ )
|
|
|
|
|
+ self._log_udp_summary()
|
|
|
return
|
|
return
|
|
|
asyncio.create_task(self.edge.handle_udp_datagram(source, original, data, self))
|
|
asyncio.create_task(self.edge.handle_udp_datagram(source, original, data, self))
|
|
|
|
|
|
|
@@ -617,6 +685,17 @@ class TransparentEdge:
|
|
|
for index in range(count)
|
|
for index in range(count)
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
|
|
+ def _build_udp_direct_paths(self, target: TargetAddress, flow_id: int) -> list[BasePath]:
|
|
|
|
|
+ count = max(1, self.config.udp_direct_redundancy)
|
|
|
|
|
+ return [
|
|
|
|
|
+ DirectUdpPath(
|
|
|
|
|
+ name=f"direct-{index + 1}" if count > 1 else "direct",
|
|
|
|
|
+ on_frame=lambda path, event, data, fid=flow_id: self._handle_udp_path(fid, path, event, data),
|
|
|
|
|
+ target=target,
|
|
|
|
|
+ )
|
|
|
|
|
+ for index in range(count)
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
def _start_udp_listeners(self) -> None:
|
|
def _start_udp_listeners(self) -> None:
|
|
|
binds = []
|
|
binds = []
|
|
|
if self.listen_host == "127.0.0.1":
|
|
if self.listen_host == "127.0.0.1":
|
|
@@ -678,13 +757,24 @@ class TransparentEdge:
|
|
|
flow = self.udp_flows.get(key)
|
|
flow = self.udp_flows.get(key)
|
|
|
if flow is None:
|
|
if flow is None:
|
|
|
flow_id = next(self.udp_flow_ids)
|
|
flow_id = next(self.udp_flow_ids)
|
|
|
- paths: list[BasePath] = [DirectUdpPath(name="direct", on_frame=lambda path, event, data, fid=flow_id: self._handle_udp_path(fid, path, event, data), target=target)]
|
|
|
|
|
|
|
+ paths: list[BasePath] = self._build_udp_direct_paths(target, flow_id)
|
|
|
for connection in self.manager.available():
|
|
for connection in self.manager.available():
|
|
|
stream_id = next(self.stream_ids)
|
|
stream_id = next(self.stream_ids)
|
|
|
paths.append(RelayUdpPath(name=connection.node.name, on_frame=lambda path, event, data, fid=flow_id: self._handle_udp_path(fid, path, event, data), connection=connection, session_id=flow_id, stream_id=stream_id, target=target))
|
|
paths.append(RelayUdpPath(name=connection.node.name, on_frame=lambda path, event, data, fid=flow_id: self._handle_udp_path(fid, path, event, data), connection=connection, session_id=flow_id, stream_id=stream_id, target=target))
|
|
|
- flow = UdpFlow(flow_id=flow_id, source=source, target=target, send_response=listener.send_response, paths=paths)
|
|
|
|
|
|
|
+ flow = UdpFlow(
|
|
|
|
|
+ flow_id=flow_id,
|
|
|
|
|
+ source=source,
|
|
|
|
|
+ target=target,
|
|
|
|
|
+ send_response=listener.send_response,
|
|
|
|
|
+ paths=paths,
|
|
|
|
|
+ redundancy=self.config.udp_redundancy,
|
|
|
|
|
+ always_broadcast=self.config.udp_always_broadcast,
|
|
|
|
|
+ copy_interval_ms=self.config.udp_copy_interval_ms,
|
|
|
|
|
+ )
|
|
|
self.udp_flows[key] = flow
|
|
self.udp_flows[key] = flow
|
|
|
- print(f"[edge] udp flow={flow_id} target={target.host}:{target.port} candidates={[path.name for path in paths]}")
|
|
|
|
|
|
|
+ listener.udp_flows_created += 1
|
|
|
|
|
+ listener._log_udp_summary(force=True)
|
|
|
|
|
+ print(f"[edge] udp flow={flow_id} source={source.host}:{source.port} target={target.host}:{target.port} redundancy={self.config.udp_redundancy} direct_redundancy={self.config.udp_direct_redundancy} always_broadcast={self.config.udp_always_broadcast} candidates={[path.name for path in paths]}")
|
|
|
await flow.start()
|
|
await flow.start()
|
|
|
await flow.send(payload)
|
|
await flow.send(payload)
|
|
|
|
|
|