Gogs 3 дней назад
Родитель
Сommit
f1e95aae71
3 измененных файлов с 58 добавлено и 34 удалено
  1. 12 7
      relay_server.py
  2. 1 1
      scheduler.py
  3. 45 26
      socks_edge.py

+ 12 - 7
relay_server.py

@@ -51,33 +51,38 @@ class RelayChannel:
     send_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
     authed_at: float = 0.0
     frame_count: int = 0
+    authed_kind: str = "normal"
 
     async def run(self) -> None:
         peer = self.writer.get_extra_info("peername")
         authed = False
         try:
             auth = await read_frame(self.reader)
-            if auth.kind != AUTH or decode_json(auth.payload).get("token") != self.token:
+            payload = decode_json(auth.payload)
+            if auth.kind != AUTH or payload.get("token") != self.token:
                 raise PermissionError("invalid token")
             authed = True
             self.authed_at = time.monotonic()
-            await self.safe_send(Frame(AUTH, 0, 0, 0, STATUS_OK, b"ok"))
+            self.authed_kind = payload.get("purpose", "normal")
+            if self.authed_kind != "probe":
+                await self.safe_send(Frame(AUTH, 0, 0, 0, STATUS_OK, b"ok"))
+            else:
+                await self.safe_send(Frame(AUTH, 0, 0, 0, STATUS_OK, encode_json({"status": "ok", "kind": "probe"})))
             while True:
                 frame = await read_frame(self.reader)
                 self.frame_count += 1
                 await self.handle(frame)
         except asyncio.IncompleteReadError:
-            if authed:
+            if authed and self.authed_kind != "probe":
                 lived = time.monotonic() - self.authed_at if self.authed_at else 0.0
-                if lived >= 5 or self.frame_count > 2:
-                    print(f"[relay] disconnected peer={peer} lived={lived:.1f}s")
+                if lived >= 15 or self.frame_count > 20:
+                    print(f"[relay] session closed peer={peer} kind={self.authed_kind} lived={lived:.1f}s frames={self.frame_count}")
         except asyncio.CancelledError:
             pass
         except Exception as exc:
             if authed:
                 lived = time.monotonic() - self.authed_at if self.authed_at else 0.0
-                if lived >= 5 or self.frame_count > 2:
-                    print(f"[relay] channel error peer={peer} lived={lived:.1f}s error={exc!r}")
+                print(f"[relay] session error peer={peer} kind={self.authed_kind} lived={lived:.1f}s frames={self.frame_count} error={exc!r}")
         finally:
             await self.close()
 

+ 1 - 1
scheduler.py

@@ -40,7 +40,7 @@ class Scheduler:
         started = time.perf_counter()
         try:
             reader, writer = await asyncio.wait_for(asyncio.open_connection(node.host, node.port), timeout=3)
-            await write_frame(writer, Frame(AUTH, 0, 0, 0, 0, encode_json({"token": node.token})))
+            await write_frame(writer, Frame(AUTH, 0, 0, 0, 0, encode_json({"token": node.token, "purpose": "probe"})))
             auth = await asyncio.wait_for(read_frame(reader), timeout=3)
             if auth.kind != AUTH or auth.packet_id != STATUS_OK:
                 raise ConnectionError(f"relay auth probe failed: {node.name}")

+ 45 - 26
socks_edge.py

@@ -25,6 +25,8 @@ class RelayLink:
     reader: asyncio.StreamReader
     writer: asyncio.StreamWriter
     pump: asyncio.Task | None = None
+    closed_event: asyncio.Event = field(default_factory=asyncio.Event)
+    maintain_task: asyncio.Task | None = None
     tcp_sessions: Dict[tuple[int, int], "TcpRaceSession"] = field(default_factory=dict)
     udp_server: "UdpAssociateServer | None" = None
     closed: bool = False
@@ -34,6 +36,8 @@ class RelayLink:
         frame = await read_frame(self.reader)
         if frame.kind != AUTH or frame.packet_id != STATUS_OK:
             raise ConnectionError(f"relay auth failed: {self.node.name}")
+        self.closed = False
+        self.closed_event.clear()
         self.pump = asyncio.create_task(self._pump())
 
     async def _pump(self) -> None:
@@ -47,7 +51,9 @@ class RelayLink:
                         await session.handle_frame(self, frame)
                 elif frame.kind == UDP_RECV and self.udp_server:
                     await self.udp_server.handle_from_relay(frame, self)
-        except asyncio.IncompleteReadError:
+        except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError, OSError):
+            pass
+        except Exception:
             pass
         finally:
             await self.close()
@@ -61,6 +67,11 @@ class RelayLink:
         if self.closed:
             return
         self.closed = True
+        self.closed_event.set()
+        if self.pump and self.pump is not asyncio.current_task():
+            self.pump.cancel()
+            with contextlib.suppress(Exception):
+                await self.pump
         self.writer.close()
         with contextlib.suppress(Exception):
             await self.writer.wait_closed()
@@ -84,6 +95,7 @@ class UdpFlowState:
     direct_sockets: dict[str, socket.socket] = field(default_factory=dict)
     direct_tasks: dict[str, asyncio.Task] = field(default_factory=dict)
     direct_failures: set[str] = field(default_factory=set)
+    relay_failures: dict[str, str] = field(default_factory=dict)
 
     def touch(self, now: float) -> None:
         self.last_activity = now
@@ -238,7 +250,6 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         flow.touch(now)
         flow.packets_sent += 1
         packet_id = next(self.packet_counter)
-        print(f"[edge] udp recv flow={flow.flow_id} packet_id={packet_id} target={host}:{port} size={len(payload)}")
         asyncio.create_task(self.edge.forward_udp(flow, payload, packet_id, self))
         self._log_udp_summary()
 
@@ -273,27 +284,14 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         if self.transport is None or self.client_addr is None:
             return
         packet = self._build_socks_udp(flow.target_host, flow.target_port, payload)
-        winner_log = ""
         now = asyncio.get_running_loop().time()
         flow.touch(now)
         flow.packets_received += 1
         if flow.winner_name is None:
             flow.winner_name = source_name
             self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1
-            relay_detail = ", ".join(f"{name}={count}" for name, count in sorted(self.win_counts.items())) or "none"
-            mode = "redundant" if self.edge.config.udp_redundancy > 0 else "single"
-            print(
-                f"[edge] udp flow={flow.flow_id} winner={source_name} "
-                f"target={flow.target_host}:{flow.target_port} mode={mode} candidates={len(flow.candidate_names)}"
-            )
-            print(f"[edge] udp win relay_breakdown={relay_detail}")
         elif flow.winner_name != source_name:
             flow.duplicate_responses += 1
-            winner_log = f" duplicate=1 winner={flow.winner_name} from={source_name}"
-        print(
-            f"[edge] udp send flow={flow.flow_id} packet_id={packet_id or 'direct'} "
-            f"target={flow.target_host}:{flow.target_port} size={len(payload)} relay={source_name}{winner_log}"
-        )
         if flow.winner_name == source_name:
             self.transport.sendto(packet, self.client_addr)
         self._log_udp_summary()
@@ -304,7 +302,7 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
 
     def note_unsent(self, flow: UdpFlowState, packet_id: int) -> None:
         flow.touch(asyncio.get_running_loop().time())
-        print(f"[edge] udp drop flow={flow.flow_id} packet_id={packet_id} reason=no_available_links")
+        flow.relay_failures.setdefault("summary", f"packet_id={packet_id}")
         self._log_udp_summary(force=True)
 
     def _log_udp_summary(self, force: bool = False) -> None:
@@ -317,11 +315,14 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         packets_sent = sum(flow.packets_sent for flow in self.client_flows.values())
         packets_received = sum(flow.packets_received for flow in self.client_flows.values())
         duplicates = sum(flow.duplicate_responses for flow in self.client_flows.values())
+        direct_paths = sum(len(flow.direct_sockets) for flow in self.client_flows.values())
+        relay_candidates = sum(len(flow.link_streams) for flow in self.client_flows.values())
         print(
             f"[edge] udp summary bind={self.client_addr[0]}:{self.client_addr[1]} active_flows={active_flows} "
-            f"winner_flows={winners} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates}"
+            f"winner_flows={winners} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} "
+            f"direct_paths={direct_paths} relay_paths={relay_candidates}"
             if self.client_addr
-            else f"[edge] udp summary bind=unbound active_flows={active_flows} winner_flows={winners} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates}"
+            else f"[edge] udp summary bind=unbound active_flows={active_flows} winner_flows={winners} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} direct_paths={direct_paths} relay_paths={relay_candidates}"
         )
 
     def _parse_socks_udp(self, packet: bytes) -> tuple[str, int, bytes]:
@@ -373,17 +374,35 @@ class SocksEdge:
             await server.serve_forever()
 
     async def _connect_relays(self) -> None:
-        for node in self.config.relays:
-            reader, writer = await asyncio.open_connection(node.host, node.port)
-            link = RelayLink(node, reader, writer)
-            await link.start()
-            self.links.append(link)
         loop = asyncio.get_running_loop()
         transport, protocol = await loop.create_datagram_endpoint(lambda: UdpAssociateServer(self), local_addr=(self.listen_host, 0))
         self.udp_server = protocol
-        for link in self.links:
-            link.udp_server = protocol
         self.udp_transport = transport
+        for node in self.config.relays:
+            link = RelayLink(node=node, reader=None, writer=None)  # type: ignore[arg-type]
+            link.udp_server = protocol
+            self.links.append(link)
+            link.maintain_task = asyncio.create_task(self._maintain_link(link))
+
+    async def _maintain_link(self, link: RelayLink) -> None:
+        backoff = 1.0
+        while True:
+            try:
+                reader, writer = await asyncio.open_connection(link.node.host, link.node.port)
+                sock = writer.get_extra_info("socket")
+                if sock is not None:
+                    with contextlib.suppress(OSError):
+                        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+                link.reader = reader
+                link.writer = writer
+                await link.start()
+                backoff = 1.0
+                await link.closed_event.wait()
+            except asyncio.CancelledError:
+                raise
+            except Exception:
+                await asyncio.sleep(backoff)
+                backoff = min(10.0, backoff * 2)
 
     async def _accept(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
         try:
@@ -414,7 +433,7 @@ class SocksEdge:
         return links or [link for link in self.links if not link.closed][:1]
 
     def _selected_udp_links(self) -> list[RelayLink]:
-        online = [link for link in self.links if not link.closed]
+        online = [link for link in self.links if not link.closed and link.writer is not None]
         if not online:
             return []
         ordered = sorted(online, key=lambda link: self.scheduler.scores.get(link.node.name).score if link.node.name in self.scheduler.scores else 999999.0)