Bläddra i källkod

修改子节点日志输出内容

Gogs 3 dagar sedan
förälder
incheckning
7dcd9eabc6
2 ändrade filer med 15 tillägg och 8 borttagningar
  1. 5 0
      edge_udp.py
  2. 10 8
      relay_server_udp.py

+ 5 - 0
edge_udp.py

@@ -241,6 +241,11 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         else:
             flow.winner_miss_streak = 0
         if flow.winner_name == source_name and target_addr is not None:
+            if flow.packets_received == 1:
+                print(
+                    f"[edge] udp relay reply flow={flow.flow_id} relay={source_name} "
+                    f"target={flow.target_host}:{flow.target_port} bytes={len(payload)}"
+                )
             self.transport.sendto(packet, target_addr)
 
     def _cleanup_packet_state(self, flow: UdpFlowState, now: float) -> None:

+ 10 - 8
relay_server_udp.py

@@ -15,6 +15,7 @@ class UdpRelayProtocol(asyncio.DatagramProtocol):
         self.stream_id = stream_id
 
     def datagram_received(self, data: bytes, _addr) -> None:
+        self.channel.log_udp_reply(self.session_id, self.stream_id, len(data))
         self.channel.enqueue_send(Frame(UDP_RECV, self.session_id, self.stream_id, 0, 0, data))
 
 
@@ -38,6 +39,7 @@ class UdpRelayChannel:
     send_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
     send_queue: asyncio.Queue[Frame | None] = field(default_factory=asyncio.Queue)
     send_task: asyncio.Task | None = None
+    _logged_sessions: set[tuple[int, int]] = field(default_factory=set)
 
     async def run(self) -> None:
         try:
@@ -114,6 +116,7 @@ class UdpRelayChannel:
                 )
                 session = UdpRelaySession(frame.session_id, frame.stream_id, transport, meta["host"], int(meta["port"]), family)
                 self.udp_sessions[key] = session
+                print(f"[relay] udp session opened session={frame.session_id} stream={frame.stream_id} target={meta['host']}:{int(meta['port'])}")
             except Exception as exc:
                 await self.safe_send(Frame(UDP_RECV, frame.session_id, frame.stream_id, 0, STATUS_ERR, str(exc).encode()))
                 return
@@ -121,6 +124,13 @@ class UdpRelayChannel:
             with contextlib.suppress(Exception):
                 session.transport.sendto(payload)
 
+    def log_udp_reply(self, session_id: int, stream_id: int, size: int) -> None:
+        key = (session_id, stream_id)
+        if key in self._logged_sessions:
+            return
+        self._logged_sessions.add(key)
+        print(f"[relay] udp reply session={session_id} stream={stream_id} bytes={size}")
+
     async def close(self) -> None:
         if self.closed:
             return
@@ -146,17 +156,9 @@ class UdpRelayServer:
     async def start(self, host: str, port: int) -> None:
         async def accept(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
             try:
-                peer = writer.get_extra_info("peername")
-                if peer:
-                    print(f"[relay] udp client connected peer={peer[0]}:{peer[1]}")
                 await UdpRelayChannel(reader, writer, self.token).run()
             except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError, OSError):
                 pass
-            finally:
-                with contextlib.suppress(Exception):
-                    peer = writer.get_extra_info("peername")
-                    if peer:
-                        print(f"[relay] udp client closed peer={peer[0]}:{peer[1]}")
 
         print(f"[relay] udp server listening on {host}:{port}")
         server = await asyncio.start_server(accept, host, port)