Parcourir la source

修改日志和Udp错误

Gogs il y a 2 semaines
Parent
commit
c5b613b9e5
2 fichiers modifiés avec 40 ajouts et 8 suppressions
  1. 10 4
      relay_server.py
  2. 30 4
      socks_edge.py

+ 10 - 4
relay_server.py

@@ -53,6 +53,11 @@ class RelayChannel:
     frame_count: int = 0
     authed_kind: str = "normal"
 
+    def _should_log_lifecycle(self, lived: float) -> bool:
+        if self.authed_kind == "probe":
+            return False
+        return lived >= 15 or self.frame_count > 20
+
     async def run(self) -> None:
         peer = self.writer.get_extra_info("peername")
         authed = False
@@ -76,16 +81,17 @@ class RelayChannel:
                 self.frame_count += 1
                 await self.handle(frame)
         except asyncio.IncompleteReadError:
-            if authed and self.authed_kind != "probe":
+            if authed:
                 lived = time.monotonic() - self.authed_at if self.authed_at else 0.0
-                if lived >= 15 or self.frame_count > 20:
+                if self._should_log_lifecycle(lived):
                     print(f"[relay] session closed peer={peer} kind={self.authed_kind} lived={lived:.1f}s frames={self.frame_count}")
         except asyncio.CancelledError:
             pass
         except Exception as exc:
-            if authed and self.authed_kind != "probe":
+            if authed:
                 lived = time.monotonic() - self.authed_at if self.authed_at else 0.0
-                print(f"[relay] session error peer={peer} kind={self.authed_kind} lived={lived:.1f}s frames={self.frame_count} error={exc!r}")
+                if self._should_log_lifecycle(lived):
+                    print(f"[relay] session error peer={peer} kind={self.authed_kind} lived={lived:.1f}s frames={self.frame_count} error={exc!r}")
         finally:
             await self.close()
 

+ 30 - 4
socks_edge.py

@@ -72,6 +72,8 @@ class RelayLink:
             return
         self.closed = True
         self.closed_event.set()
+        if self.udp_server:
+            self.udp_server.handle_relay_closed(self.node.name)
         if self.pump and self.pump is not asyncio.current_task():
             self.pump.cancel()
             with contextlib.suppress(Exception):
@@ -325,8 +327,14 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
             self.transport.sendto(packet, self.client_addr)
 
     def set_flow_candidates(self, flow: UdpFlowState, candidate_names: tuple[str, ...]) -> None:
-        if not flow.candidate_names:
-            flow.candidate_names = candidate_names
+        flow.candidate_names = candidate_names
+
+    def handle_relay_closed(self, relay_name: str) -> None:
+        for flow in self.client_flows.values():
+            stream_id = flow.link_streams.pop(relay_name, None)
+            if stream_id is not None:
+                self.edge.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
+            flow.initialized_links.discard(relay_name)
 
     def note_unsent(self, flow: UdpFlowState, packet_id: int) -> None:
         flow.touch(asyncio.get_running_loop().time())
@@ -346,21 +354,39 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         direct_paths = sum(len(flow.direct_sockets) for flow in self.client_flows.values())
         relay_candidates = sum(len(flow.link_streams) for flow in self.client_flows.values())
         winner_detail = ", ".join(f"{flow.flow_id}:{flow.winner_name}" for flow in self.client_flows.values() if flow.winner_name) or "none"
+        candidate_detail_parts: list[str] = []
         relay_errors: list[str] = []
         for flow in self.client_flows.values():
+            states: list[str] = []
+            for name in flow.candidate_names:
+                if name in flow.direct_sockets:
+                    state = "direct"
+                elif name in flow.link_streams:
+                    state = "relay"
+                elif name in flow.direct_failures:
+                    state = "direct_down"
+                elif name in flow.relay_failures:
+                    state = "relay_down"
+                else:
+                    state = "pending"
+                if flow.winner_name == name:
+                    state += ":win"
+                states.append(f"{name}={state}")
+            candidate_detail_parts.append(f"{flow.flow_id}[{', '.join(states) or 'none'}]")
             for name, count in flow.relay_failures.items():
                 relay_errors.append(f"{name}={count}")
+        candidate_detail = "; ".join(candidate_detail_parts) or "none"
         relay_error_detail = ", ".join(sorted(relay_errors)) or "none"
         if self.client_addr:
             print(
                 f"[edge] udp summary bind={self.client_addr[0]}:{self.client_addr[1]} active_flows={active_flows} "
                 f"winner_flows={winners} winner_detail={winner_detail} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} "
-                f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={relay_error_detail}"
+                f"direct_paths={direct_paths} relay_paths={relay_candidates} candidates={candidate_detail} relay_errors={relay_error_detail}"
             )
         else:
             print(
                 f"[edge] udp summary bind=unbound active_flows={active_flows} winner_flows={winners} winner_detail={winner_detail} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} "
-                f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={relay_error_detail}"
+                f"direct_paths={direct_paths} relay_paths={relay_candidates} candidates={candidate_detail} relay_errors={relay_error_detail}"
             )
 
     def _parse_socks_udp(self, packet: bytes) -> tuple[str, int, bytes]: