Browse Source

修改分离版relay报错

Gogs 3 ngày trước
mục cha
commit
3c9cffa661
9 tập tin đã thay đổi với 250 bổ sung và 70 xóa
  1. 8 7
      config-tcp.json
  2. 2 2
      config-udp.json
  3. 4 0
      config_tcp.py
  4. 4 0
      config_udp.py
  5. 54 43
      edge_tcp.py
  6. 82 6
      edge_udp.py
  7. 34 6
      relay_client_tcp.py
  8. 31 2
      relay_server_udp.py
  9. 31 4
      scripts/commands/tcp_only_start.sh

+ 8 - 7
config-tcp.json

@@ -1,16 +1,17 @@
 {
   "strategy": "top3",
-  "redundancy": 3,
-  "direct_redundancy": 3,
-  "direct_max_redundancy": 6,
-  "direct_redundancy_v4": 4,
-  "direct_redundancy_v6": 4,
+  "redundancy": 2,
+  "direct_redundancy": 2,
+  "direct_max_redundancy": 3,
+  "direct_redundancy_v6": 1,
   "direct_ipv6_enabled": true,
   "direct_open_timeout": 2.0,
   "relay_open_timeout": 3.0,
   "tcp_connect_happy_eyeballs_delay": 0.05,
-  "tcp_warmup_bytes": 65536,
-  "tcp_loser_grace_ms": 30,
+  "tcp_warmup_bytes": 1097152,
+  "tcp_loser_grace_ms": 800,
+  "ssh_warmup_bytes": 4096,
+  "ssh_loser_grace_ms": 80,
   "relay_reconnect_delay": 1,
   "relay_reconnect_max_delay": 10,
   "probe_interval": 3,

+ 2 - 2
config-udp.json

@@ -1,6 +1,6 @@
 {
   "strategy": "top3",
-  "redundancy": 3,
+  "redundancy": 1,
   "udp_redundancy": 0,
   "udp_direct_redundancy": 2,
   "udp_direct_redundancy_v4": 2,
@@ -15,6 +15,6 @@
   "socks_host": "127.0.0.1",
   "socks_port": 19180,
   "relays": [
-    
+    {"name": "b159", "host": "23.95.134.159", "port": 9010, "token": "130", "weight": 100}
   ]
 }

+ 4 - 0
config_tcp.py

@@ -30,6 +30,8 @@ class TcpConfig:
     direct_open_timeout: float = 10.0
     relay_open_timeout: float = 10.0
     tcp_connect_happy_eyeballs_delay: float | None = None
+    ssh_warmup_bytes: int = 4096
+    ssh_loser_grace_ms: int = 80
     direct_ipv6_enabled: bool = True
     tcp_failover_idle_ms: int = 1200
     relay_reconnect_delay: float = 3.0
@@ -57,6 +59,8 @@ class TcpConfig:
             direct_open_timeout=raw.get("direct_open_timeout", 10.0),
             relay_open_timeout=raw.get("relay_open_timeout", 10.0),
             tcp_connect_happy_eyeballs_delay=raw.get("tcp_connect_happy_eyeballs_delay"),
+            ssh_warmup_bytes=max(0, raw.get("ssh_warmup_bytes", 4096)),
+            ssh_loser_grace_ms=max(0, raw.get("ssh_loser_grace_ms", 80)),
             direct_ipv6_enabled=raw.get("direct_ipv6_enabled", True),
             tcp_failover_idle_ms=max(100, raw.get("tcp_failover_idle_ms", 1200)),
             relay_reconnect_delay=raw.get("relay_reconnect_delay", 3.0),

+ 4 - 0
config_udp.py

@@ -31,6 +31,8 @@ class UdpConfig:
     relay_ping_timeout: float = 25.0
     relay_tcp_nodelay: bool = True
     udp_redundancy: int = 2
+    udp_direct_copies: int | None = None
+    udp_relay_copies: int | None = None
     udp_direct_redundancy: int = 3
     udp_direct_redundancy_v4: int | None = None
     udp_direct_redundancy_v6: int | None = None
@@ -56,6 +58,8 @@ class UdpConfig:
             relay_ping_timeout=max(1.0, raw.get("relay_ping_timeout", 25.0)),
             relay_tcp_nodelay=raw.get("relay_tcp_nodelay", True),
             udp_redundancy=max(0, raw.get("udp_redundancy", 2)),
+            udp_direct_copies=max(1, raw["udp_direct_copies"]) if raw.get("udp_direct_copies") is not None else None,
+            udp_relay_copies=max(1, raw["udp_relay_copies"]) if raw.get("udp_relay_copies") is not None else None,
             udp_direct_redundancy=max(1, raw.get("udp_direct_redundancy", 3)),
             udp_direct_redundancy_v4=raw.get("udp_direct_redundancy_v4"),
             udp_direct_redundancy_v6=raw.get("udp_direct_redundancy_v6"),

+ 54 - 43
edge_tcp.py

@@ -123,10 +123,6 @@ class DirectTcpPath(BasePath):
         if self.closed:
             return
         self.closed = True
-        if self.pump_task and self.pump_task is not asyncio.current_task():
-            self.pump_task.cancel()
-            with contextlib.suppress(*SUPPRESSED_CLOSE_EXCEPTIONS):
-                await self.pump_task
         if self.writer:
             self.writer.close()
             with contextlib.suppress(*SUPPRESSED_CLOSE_EXCEPTIONS):
@@ -205,20 +201,20 @@ class TcpSession:
     uplink_bytes: int = 0
     open_event: asyncio.Event = field(default_factory=asyncio.Event)
     winner_event: asyncio.Event = field(default_factory=asyncio.Event)
+    close_event: asyncio.Event = field(default_factory=asyncio.Event)
     closed: bool = False
+    closing: bool = False
+    close_task: asyncio.Task | None = None
     pump_task: asyncio.Task | None = None
     loser_close_task: asyncio.Task | None = None
     open_tasks: list[asyncio.Task] = field(default_factory=list)
-    backup_path: BasePath | None = None
-    converged: bool = False
 
-    def _select_backup_path(self, winner: BasePath) -> BasePath | None:
-        candidates = [path for path in self.paths if path is not winner and path.opened and not path.closed]
-        if not candidates:
-            return None
-        winner_is_direct = winner_group(winner.name) == "direct"
-        opposite = [path for path in candidates if (winner_group(path.name) == "direct") != winner_is_direct]
-        return (opposite or candidates)[0]
+    def _choose_winner(self, winner: BasePath) -> None:
+        if self.winner is not None:
+            return
+        self.winner = winner
+        self._record_win(winner)
+        self.winner_event.set()
 
     def _record_win(self, winner: BasePath) -> None:
         self.stats[winner.name] = self.stats.get(winner.name, 0) + 1
@@ -262,14 +258,12 @@ class TcpSession:
                 else:
                     if self.winner is None:
                         await self.winner_event.wait()
-                    if self.winner:
-                        send_targets = [self.winner]
-                        if not self.converged and self.backup_path and self.backup_path.opened and not self.backup_path.closed and self.backup_path is not self.winner:
-                            send_targets.append(self.backup_path)
-                        await asyncio.gather(*(path.send(chunk) for path in send_targets), return_exceptions=True)
-                        self.converged = True
+                    if self.winner and self.winner.opened and not self.winner.closed:
+                        await self.winner.send(chunk)
+                    else:
+                        break
         finally:
-            await self.close()
+            self._request_close()
 
     async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
         if self.closed:
@@ -285,15 +279,11 @@ class TcpSession:
             return
         if event == "data":
             if self.winner is None:
-                self.winner = path
-                self._record_win(path)
-                self.backup_path = self._select_backup_path(path)
-                self.winner_event.set()
-                self.converged = False
+                self._choose_winner(path)
                 if self.loser_grace_ms > 0:
                     self.loser_close_task = asyncio.create_task(self._close_losers_after_grace(path))
                 else:
-                    await self._close_losers(path)
+                    self.loser_close_task = asyncio.create_task(self._close_losers(path))
             if path is self.winner and payload is not None:
                 self.writer.write(payload)
                 await self.writer.drain()
@@ -303,41 +293,53 @@ class TcpSession:
             if self.winner is None:
                 remaining = [candidate for candidate in self.paths if candidate.opened and not candidate.closed]
                 if not remaining:
-                    await self.close()
+                    self._request_close()
             elif path is self.winner:
-                await self.close()
+                self._request_close()
 
     async def _close_losers(self, winner: BasePath) -> None:
-        await asyncio.gather(*(path.close() for path in self.paths if path is not winner and path is not self.backup_path), return_exceptions=True)
+        await asyncio.gather(*(path.close() for path in self.paths if path is not winner), return_exceptions=True)
 
     async def _close_losers_after_grace(self, winner: BasePath) -> None:
         await asyncio.sleep(self.loser_grace_ms / 1000)
         if not self.closed:
             await self._close_losers(winner)
 
-    async def close(self) -> None:
+    def _request_close(self) -> None:
+        if self.closing:
+            return
+        self.closing = True
+        self.close_task = asyncio.create_task(self._finalize())
+
+    async def _finalize(self) -> None:
         if self.closed:
+            self.close_event.set()
             return
         self.closed = True
-        if self.pump_task and self.pump_task is not asyncio.current_task():
+        if self.pump_task and not self.pump_task.done():
             self.pump_task.cancel()
-            with contextlib.suppress(*SUPPRESSED_CLOSE_EXCEPTIONS):
-                await self.pump_task
-        if self.loser_close_task and self.loser_close_task is not asyncio.current_task():
+        if self.loser_close_task and not self.loser_close_task.done():
             self.loser_close_task.cancel()
-            with contextlib.suppress(*SUPPRESSED_CLOSE_EXCEPTIONS):
-                await self.loser_close_task
         for task in self.open_tasks:
-            if task is not asyncio.current_task():
+            if not task.done():
                 task.cancel()
+        if self.pump_task:
+            with contextlib.suppress(*SUPPRESSED_CLOSE_EXCEPTIONS):
+                await self.pump_task
         for task in self.open_tasks:
-            if task is not asyncio.current_task():
-                with contextlib.suppress(*SUPPRESSED_CLOSE_EXCEPTIONS):
-                    await task
+            with contextlib.suppress(*SUPPRESSED_CLOSE_EXCEPTIONS):
+                await task
         await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
         self.writer.close()
         with contextlib.suppress(*SUPPRESSED_CLOSE_EXCEPTIONS):
             await self.writer.wait_closed()
+        self.close_event.set()
+
+    async def close(self) -> None:
+        self._request_close()
+        if asyncio.current_task() is self.pump_task:
+            return
+        await self.close_event.wait()
 
 
 class TcpEdge:
@@ -352,6 +354,8 @@ class TcpEdge:
         self.tcp_win_counts: dict[str, int] = {}
         self.tcp_target_wins: dict[tuple[str, int], dict[str, int]] = {}
         self.tcp_family_wins: dict[str, dict[str, int]] = {"ipv4": {}, "ipv6": {}}
+        self._accept_log_every = 25
+        self._interactive_ports = {22, 29765}
 
     def _resolve_kernel_mode(self, cli_kernel_mode: str, config_kernel_mode: str) -> str:
         mode = cli_kernel_mode if cli_kernel_mode != "auto" else config_kernel_mode
@@ -442,19 +446,25 @@ class TcpEdge:
     def _tcp_relay_connections(self) -> list[TcpRelayConnection]:
         return self.manager.available()
 
+    def _session_race_profile(self, target: TargetAddress) -> tuple[int, int]:
+        if target.port in self._interactive_ports:
+            return self.config.ssh_warmup_bytes, self.config.ssh_loser_grace_ms
+        return self.config.tcp_warmup_bytes, self.config.tcp_loser_grace_ms
+
     async def _accept(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
         peer = writer.get_extra_info("peername")
         try:
             target = self._get_original_dst(writer)
             session_id = next(self.session_ids)
+            warmup_bytes, loser_grace_ms = self._session_race_profile(target)
             session = TcpSession(
                 session_id=session_id,
                 target=target,
                 reader=reader,
                 writer=writer,
                 paths=[],
-                warmup_bytes=self.config.tcp_warmup_bytes,
-                loser_grace_ms=self.config.tcp_loser_grace_ms,
+                warmup_bytes=warmup_bytes,
+                loser_grace_ms=loser_grace_ms,
                 stats=self.tcp_win_counts,
                 target_stats=self.tcp_target_wins,
                 family_stats=self.tcp_family_wins,
@@ -466,7 +476,8 @@ class TcpEdge:
             if not paths:
                 raise RuntimeError("no tcp candidates available")
             session.paths = paths
-            print(f"[edge] accept peer={peer} session={session_id} target={target.host}:{target.port} candidates={[path.name for path in paths]}")
+            if session_id == 1 or session_id % self._accept_log_every == 0:
+                print(f"[edge] accept peer={peer} session={session_id} target={target.host}:{target.port} candidates={[path.name for path in paths]}")
             await session.start()
         except Exception as exc:
             print(f"[edge] accept failed peer={peer} error={exc!r}")

+ 82 - 6
edge_udp.py

@@ -18,6 +18,10 @@ SOCKS_VERSION = 5
 UDP_WARMUP_BROADCAST_PACKETS = 6
 UDP_SHADOW_PROBE_INTERVAL_SEC = 0.25
 UDP_FAST_FAILOVER_MISSES = 3
+UDP_FLOW_IDLE_CLEANUP_SEC = 30.0
+UDP_PACKET_CLIENT_MAP_LIMIT = 4096
+UDP_DIRECT_PENDING_LIMIT = 128
+UDP_SOCKET_BUFFER_BYTES = 1 << 20
 
 
 async def read_exact(reader: asyncio.StreamReader, size: int) -> bytes:
@@ -112,6 +116,7 @@ class UdpFlowState:
     last_probe_at: float = 0.0
     winner_miss_streak: int = 0
     target_family: int = 0
+    last_cleanup_at: float = 0.0
 
     def touch(self, now: float) -> None:
         self.last_activity = now
@@ -124,10 +129,13 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         self.client_addr = None
         self.associate_peer = None
         self.packet_counter = itertools.count(1)
+        self.last_packet_id = 0
         self.client_flows: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {}
         self.flow_counter = itertools.count(1)
         self.last_summary_at = 0.0
         self.win_counts: Dict[str, int] = {}
+        self._last_client_port_log_at = 0.0
+        self._last_flow_cleanup_at = 0.0
 
     def connection_made(self, transport) -> None:
         self.transport = transport
@@ -149,7 +157,10 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
             print(f"[edge] udp client bound addr={addr[0]}:{addr[1]}")
         elif addr != self.client_addr:
             if addr[0] == self.client_addr[0]:
-                print(f"[edge] udp client port update host={addr[0]} old_port={self.client_addr[1]} new_port={addr[1]}")
+                now = asyncio.get_running_loop().time()
+                if now - self._last_client_port_log_at >= 30:
+                    self._last_client_port_log_at = now
+                    print(f"[edge] udp client port update host={addr[0]} old_port={self.client_addr[1]} new_port={addr[1]}")
                 self.client_addr = addr
             else:
                 print(f"[edge] udp client rebound old={self.client_addr[0]}:{self.client_addr[1]} new={addr[0]}:{addr[1]}")
@@ -166,8 +177,11 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         flow.client_addr = (addr[0], addr[1])
         flow.packets_sent += 1
         packet_id = next(self.packet_counter)
+        self.last_packet_id = packet_id
         flow.packet_client_addrs[packet_id] = (addr[0], addr[1])
+        self._cleanup_packet_state(flow, now)
         asyncio.create_task(self.edge.forward_udp(flow, payload, packet_id, (addr[0], addr[1]), self))
+        self._cleanup_inactive_flows(now)
         self._log_udp_summary()
 
     def _reset_client_state(self, addr) -> None:
@@ -225,6 +239,38 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         if flow.winner_name == source_name and target_addr is not None:
             self.transport.sendto(packet, target_addr)
 
+    def _cleanup_packet_state(self, flow: UdpFlowState, now: float) -> None:
+        if flow.last_cleanup_at and now - flow.last_cleanup_at < 1.0:
+            return
+        flow.last_cleanup_at = now
+        expired_packet_ids = [
+            packet_id
+            for packet_id in flow.packet_client_addrs
+            if packet_id <= (self.last_packet_id - UDP_PACKET_CLIENT_MAP_LIMIT)
+        ]
+        for packet_id in expired_packet_ids:
+            flow.packet_client_addrs.pop(packet_id, None)
+        for path_name, pending in list(flow.direct_pending_clients.items()):
+            while len(pending) > UDP_DIRECT_PENDING_LIMIT:
+                pending.popleft()
+            if not pending:
+                flow.direct_pending_clients.pop(path_name, None)
+
+    def _cleanup_inactive_flows(self, now: float) -> None:
+        if self._last_flow_cleanup_at and now - self._last_flow_cleanup_at < 5.0:
+            return
+        self._last_flow_cleanup_at = now
+        expired_keys = [
+            key
+            for key, flow in self.client_flows.items()
+            if now - flow.last_activity >= UDP_FLOW_IDLE_CLEANUP_SEC
+        ]
+        for key in expired_keys:
+            flow = self.client_flows.pop(key, None)
+            if flow is None:
+                continue
+            self.edge.release_udp_flow(flow)
+
     def set_flow_candidates(self, flow: UdpFlowState, candidate_names: tuple[str, ...]) -> None:
         if not flow.candidate_names:
             flow.candidate_names = candidate_names
@@ -307,6 +353,31 @@ class UdpEdge:
         self.udp_flow_sessions: dict[tuple[int, int], UdpFlowState] = {}
         self.udp_server: UdpAssociateServer | None = None
 
+    def _udp_direct_copies(self) -> int:
+        if self.config.udp_direct_copies is not None:
+            return max(1, self.config.udp_direct_copies)
+        return max(1, self.config.udp_redundancy + 1)
+
+    def _udp_relay_copies(self) -> int:
+        if self.config.udp_relay_copies is not None:
+            return max(1, self.config.udp_relay_copies)
+        return max(1, self.config.udp_redundancy + 1)
+
+    def release_udp_flow(self, flow: UdpFlowState) -> None:
+        for stream_id in list(flow.link_streams.values()):
+            self.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
+        flow.link_streams.clear()
+        flow.initialized_links.clear()
+        flow.packet_client_addrs.clear()
+        for task in list(flow.direct_tasks.values()):
+            task.cancel()
+        flow.direct_tasks.clear()
+        for sock in list(flow.direct_sockets.values()):
+            with contextlib.suppress(Exception):
+                sock.close()
+        flow.direct_sockets.clear()
+        flow.direct_pending_clients.clear()
+
     async def start(self) -> None:
         await self.scheduler.start()
         await self._connect_relays()
@@ -383,6 +454,10 @@ class UdpEdge:
                 family = socket.AF_INET6 if ":" in flow.target_host else socket.AF_INET
                 sock = socket.socket(family, socket.SOCK_DGRAM)
                 sock.setblocking(False)
+                with contextlib.suppress(OSError):
+                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, UDP_SOCKET_BUFFER_BYTES)
+                with contextlib.suppress(OSError):
+                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, UDP_SOCKET_BUFFER_BYTES)
                 await asyncio.get_running_loop().sock_connect(sock, (flow.target_host, flow.target_port))
                 flow.direct_sockets[name] = sock
                 flow.direct_tasks[name] = asyncio.create_task(self._pump_udp_direct(flow, name, sock, udp_server))
@@ -445,10 +520,11 @@ class UdpEdge:
                 active_direct_names = [direct_names[0]]
             elif links:
                 active_links = links[:1]
-        copies = max(1, self.config.udp_redundancy + 1)
+        direct_copies = self._udp_direct_copies()
+        relay_copies = self._udp_relay_copies()
         sent_any = False
-        for attempt in range(copies):
-            for path_name in active_direct_names:
+        for attempt in range(max(direct_copies, relay_copies)):
+            for path_name in active_direct_names if attempt < direct_copies else ():
                 sock = flow.direct_sockets.get(path_name)
                 if sock is None:
                     continue
@@ -472,7 +548,7 @@ class UdpEdge:
                     if path_name not in flow.relay_error_seen:
                         flow.relay_error_seen.add(path_name)
                         print(f"[edge] udp relay error flow={flow.flow_id} relay={path_name} error={exc!r}")
-            for link in active_links:
+            for link in active_links if attempt < relay_copies else ():
                 stream_id = flow.link_streams.get(link.node.name)
                 if stream_id is None:
                     stream_id = next(self.udp_stream_ids)
@@ -492,7 +568,7 @@ class UdpEdge:
                     if link.node.name not in flow.relay_error_seen:
                         flow.relay_error_seen.add(link.node.name)
                         print(f"[edge] udp relay error flow={flow.flow_id} relay={link.node.name} error={exc!r}")
-            if attempt + 1 < copies and self.config.udp_copy_interval_ms > 0:
+            if attempt + 1 < max(direct_copies, relay_copies) and self.config.udp_copy_interval_ms > 0:
                 await asyncio.sleep(self.config.udp_copy_interval_ms / 1000)
         if not sent_any:
             udp_server.note_unsent(flow, packet_id)

+ 34 - 6
relay_client_tcp.py

@@ -192,6 +192,7 @@ class TcpRelayManager:
         self.scheduler = TcpScheduler(config)
         self.connections: Dict[str, TcpRelayConnection] = {}
         self.tasks: list[asyncio.Task] = []
+        self._logged_attempts: set[str] = set()
 
     async def start(self) -> None:
         await self.scheduler.start()
@@ -200,16 +201,21 @@ class TcpRelayManager:
 
     async def _maintain(self, node: TcpRelayNode) -> None:
         backoff = self.config.relay_reconnect_delay
+        reconnect_attempt = 1
+        failure_streak = 0
+        healthy_since: float | None = None
         while True:
             current = self.connections.get(node.name)
             if current is not None and not current.closed:
                 assert current.closed_event is not None
                 await current.closed_event.wait()
                 continue
-            attempt = 1
             while True:
                 try:
-                    print(f"[edge] relay reconnect attempt name={node.name} addr={node.host}:{node.port} attempt={attempt} backoff={backoff:.1f}s")
+                    marker = f"{node.name}:{reconnect_attempt}:{round(backoff, 1)}"
+                    if marker not in self._logged_attempts:
+                        self._logged_attempts.add(marker)
+                        print(f"[edge] relay reconnect attempt name={node.name} addr={node.host}:{node.port} attempt={reconnect_attempt} backoff={backoff:.1f}s")
                     reader, writer = await asyncio.wait_for(asyncio.open_connection(node.host, node.port), timeout=self.config.relay_open_timeout)
                     connection = TcpRelayConnection(node=node, manager=self, reader=reader, writer=writer)
                     sock = writer.get_extra_info("socket")
@@ -218,19 +224,41 @@ class TcpRelayManager:
                             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                     await connection.start()
                     self.connections[node.name] = connection
-                    backoff = self.config.relay_reconnect_delay
+                    healthy_since = time.monotonic()
+                    failure_streak = 0
+                    reconnect_attempt = 1
                     assert connection.closed_event is not None
                     await connection.closed_event.wait()
-                    print(f"[edge] relay supervisor noticed close name={node.name} addr={node.host}:{node.port}")
+                    if healthy_since is not None:
+                        healthy_runtime = time.monotonic() - healthy_since
+                        if healthy_runtime >= self.config.relay_ping_interval + self.config.relay_ping_timeout:
+                            backoff = self.config.relay_reconnect_delay
+                        else:
+                            backoff = min(self.config.relay_reconnect_max_delay, max(self.config.relay_reconnect_delay, backoff * 2))
+                    else:
+                        backoff = min(self.config.relay_reconnect_max_delay, max(self.config.relay_reconnect_delay, backoff * 2))
                     break
                 except asyncio.CancelledError:
                     raise
                 except Exception as exc:
-                    print(f"[edge] relay connect failed name={node.name} addr={node.host}:{node.port} attempt={attempt} error={exc!r}")
+                    failure_streak += 1
+                    if failure_streak >= self.config.relay_reconnect_attempts:
+                        cooldown = min(
+                            self.config.relay_reconnect_max_delay,
+                            max(self.config.relay_reconnect_delay, backoff * 2),
+                        )
+                        print(f"[edge] relay cooldown name={node.name} addr={node.host}:{node.port} failures={failure_streak} cooldown={cooldown:.1f}s")
+                        await asyncio.sleep(cooldown)
+                        backoff = min(self.config.relay_reconnect_max_delay, cooldown * 2)
+                        failure_streak = 0
+                        reconnect_attempt = 1
+                        continue
+                    if reconnect_attempt == 1 or failure_streak >= self.config.relay_reconnect_attempts:
+                        print(f"[edge] relay connect failed name={node.name} addr={node.host}:{node.port} attempt={reconnect_attempt} error={exc!r}")
                     jitter = random.uniform(0, min(1.0, backoff * 0.2))
                     await asyncio.sleep(backoff + jitter)
                     backoff = min(self.config.relay_reconnect_max_delay, max(self.config.relay_reconnect_delay, backoff * 2))
-                    attempt += 1
+                    reconnect_attempt += 1
 
     def on_closed(self, connection: TcpRelayConnection) -> None:
         current = self.connections.get(connection.node.name)

+ 31 - 2
relay_server_udp.py

@@ -14,7 +14,7 @@ class UdpRelayProtocol(asyncio.DatagramProtocol):
         self.stream_id = stream_id
 
     def datagram_received(self, data: bytes, _addr) -> None:
-        asyncio.create_task(self.channel.safe_send(Frame(UDP_RECV, self.session_id, self.stream_id, 0, 0, data)))
+        self.channel.enqueue_send(Frame(UDP_RECV, self.session_id, self.stream_id, 0, 0, data))
 
 
 @dataclass
@@ -35,9 +35,12 @@ class UdpRelayChannel:
     udp_sessions: dict[tuple[int, int], UdpRelaySession] = field(default_factory=dict)
     closed: bool = False
     send_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
+    send_queue: asyncio.Queue[Frame | None] = field(default_factory=asyncio.Queue)
+    send_task: asyncio.Task | None = None
 
     async def run(self) -> None:
         try:
+            self.send_task = asyncio.create_task(self._send_loop())
             auth = await read_frame(self.reader)
             if auth.kind != AUTH:
                 return
@@ -51,6 +54,24 @@ class UdpRelayChannel:
         finally:
             await self.close()
 
+    def enqueue_send(self, frame: Frame) -> None:
+        if self.closed:
+            return
+        with contextlib.suppress(asyncio.QueueFull):
+            self.send_queue.put_nowait(frame)
+
+    async def _send_loop(self) -> None:
+        try:
+            while True:
+                frame = await self.send_queue.get()
+                if frame is None:
+                    break
+                ok = await self.safe_send(frame)
+                if not ok:
+                    break
+        except asyncio.CancelledError:
+            pass
+
     async def safe_send(self, frame: Frame) -> bool:
         if self.closed:
             return False
@@ -103,10 +124,15 @@ class UdpRelayChannel:
         if self.closed:
             return
         self.closed = True
+        self.send_queue.put_nowait(None)
         for session in self.udp_sessions.values():
             if session.transport:
                 session.transport.close()
         self.udp_sessions.clear()
+        if self.send_task and self.send_task is not asyncio.current_task():
+            self.send_task.cancel()
+            with contextlib.suppress(Exception):
+                await self.send_task
         self.writer.close()
         with contextlib.suppress(Exception):
             await self.writer.wait_closed()
@@ -118,7 +144,10 @@ class UdpRelayServer:
 
     async def start(self, host: str, port: int) -> None:
         async def accept(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
-            await UdpRelayChannel(reader, writer, self.token).run()
+            try:
+                await UdpRelayChannel(reader, writer, self.token).run()
+            except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError, OSError):
+                pass
 
         server = await asyncio.start_server(accept, host, port)
         async with server:

+ 31 - 4
scripts/commands/tcp_only_start.sh

@@ -15,6 +15,7 @@ LOG_FILE="/var/log/mynetspeeder-tcp-edge.log"
 CHAIN4="MYNETSPEEDER_TCP"
 CHAIN6="MYNETSPEEDER_TCP6"
 SSH_PORTS="${MYNETSPEEDER_TCP_SSH_PORTS:-}"
+CAPTURE_UID="${MYNETSPEEDER_TCP_CAPTURE_UID:-}"
 SELF_EXCLUDE_V4="127.0.0.0/8 169.254.0.0/16"
 SELF_EXCLUDE_V6="::1/128 fe80::/10"
 
@@ -46,6 +47,11 @@ if ! [[ "$LISTEN_PORT" =~ ^[0-9]+$ ]]; then
   exit 1
 fi
 
+if [[ -n "$CAPTURE_UID" ]] && ! [[ "$CAPTURE_UID" =~ ^[0-9]+$ ]]; then
+  echo "capture uid must be numeric"
+  exit 1
+fi
+
 if [[ -z "$SSH_PORTS" && -n "${SSH_CONNECTION:-}" ]]; then
   SSH_PORTS="${SSH_CONNECTION##* }"
 fi
@@ -69,7 +75,19 @@ cleanup_rules() {
   ensure_nat_chain_absent ip6tables "$CHAIN6"
 }
 
-trap 'cleanup_rules; stop_pid_file "$PID_FILE" "edge-tcp --listen-host ${LISTEN_HOST} --listen-port ${LISTEN_PORT} --config ${CONFIG_PATH}"' ERR
+cleanup_required=1
+cleanup_done=0
+
+cleanup_on_exit() {
+  if [[ "$cleanup_done" -eq 1 || "$cleanup_required" -eq 0 ]]; then
+    return 0
+  fi
+  cleanup_done=1
+  cleanup_rules
+  stop_pid_file "$PID_FILE" "edge-tcp --listen-host ${LISTEN_HOST} --listen-port ${LISTEN_PORT} --config ${CONFIG_PATH}"
+}
+
+trap 'cleanup_on_exit' ERR EXIT INT TERM
 
 readarray -t RELAY_HOSTS < <(python3 - <<'PY' "$CONFIG_PATH"
 import json, sys
@@ -112,7 +130,11 @@ done
 for host in "${RELAY_HOSTS[@]}"; do
   [[ -n "$host" && "$host" != *:* ]] && iptables -t nat -A "$CHAIN4" -d "$host" -j RETURN
 done
-iptables -t nat -A "$CHAIN4" -p tcp -j REDIRECT --to-ports "$LISTEN_PORT"
+if [[ -n "$CAPTURE_UID" ]]; then
+  iptables -t nat -A "$CHAIN4" -p tcp -m owner --uid-owner "$CAPTURE_UID" -j REDIRECT --to-ports "$LISTEN_PORT"
+else
+  iptables -t nat -A "$CHAIN4" -p tcp -j REDIRECT --to-ports "$LISTEN_PORT"
+fi
 ensure_nat_jump_absent iptables nat OUTPUT -p tcp -j "$CHAIN4"
 iptables -t nat -A OUTPUT -p tcp -j "$CHAIN4"
 
@@ -130,10 +152,15 @@ if command -v ip6tables >/dev/null 2>&1 && ip6tables -t nat -S >/dev/null 2>&1;
   for host in "${RELAY_HOSTS[@]}"; do
     [[ -n "$host" && "$host" == *:* ]] && ip6tables -t nat -A "$CHAIN6" -d "$host" -j RETURN
   done
-  ip6tables -t nat -A "$CHAIN6" -p tcp -j REDIRECT --to-ports "$LISTEN_PORT"
+  if [[ -n "$CAPTURE_UID" ]]; then
+    ip6tables -t nat -A "$CHAIN6" -p tcp -m owner --uid-owner "$CAPTURE_UID" -j REDIRECT --to-ports "$LISTEN_PORT"
+  else
+    ip6tables -t nat -A "$CHAIN6" -p tcp -j REDIRECT --to-ports "$LISTEN_PORT"
+  fi
   ensure_nat_jump_absent ip6tables nat OUTPUT -p tcp -j "$CHAIN6"
   ip6tables -t nat -A OUTPUT -p tcp -j "$CHAIN6"
 fi
 
-trap - ERR
+cleanup_required=0
+trap - ERR EXIT INT TERM
 echo "tcp-only started on ${LISTEN_HOST}:${LISTEN_PORT}"