فهرست منبع

优化日志和kind3丢弃

Gogs 4 روز پیش
والد
کامیت
b15d2dfc93
2فایلهای تغییر یافته به همراه38 افزوده شده و 2 حذف شده
  1. 25 1
      relay_client.py
  2. 13 1
      transparent_edge.py

+ 25 - 1
relay_client.py

@@ -29,6 +29,8 @@ class RelayConnection:
     last_pong_at: float = 0.0
     send_lock: asyncio.Lock | None = None
     closed_event: asyncio.Event | None = None
+    dropped_frames: Dict[int, int] = None
+    dropped_report_task: asyncio.Task | None = None
 
     def __post_init__(self) -> None:
         if self.handlers is None:
@@ -39,6 +41,8 @@ class RelayConnection:
             self.send_lock = asyncio.Lock()
         if self.closed_event is None:
             self.closed_event = asyncio.Event()
+        if self.dropped_frames is None:
+            self.dropped_frames = {}
 
     async def start(self) -> None:
         print(f"[edge] connecting relay name={self.node.name} addr={self.node.host}:{self.node.port}")
@@ -78,7 +82,7 @@ class RelayConnection:
                 if handler:
                     self._dispatch_frame(frame, handler)
                 else:
-                    print(f"[edge] relay frame dropped name={self.node.name} session={frame.session_id} stream={frame.stream_id} kind={frame.kind}")
+                    self._record_dropped_frame(frame.kind)
         except asyncio.IncompleteReadError:
             print(f"[edge] relay disconnected name={self.node.name} eof=true")
         except Exception as exc:
@@ -109,6 +113,22 @@ class RelayConnection:
             if self.dispatch_tasks.get(key) is asyncio.current_task():
                 self.dispatch_tasks.pop(key, None)
 
+    def _record_dropped_frame(self, kind: int) -> None:
+        self.dropped_frames[kind] = self.dropped_frames.get(kind, 0) + 1
+        if self.dropped_report_task is None or self.dropped_report_task.done():
+            self.dropped_report_task = asyncio.create_task(self._report_dropped_frames())
+
+    async def _report_dropped_frames(self) -> None:
+        try:
+            await asyncio.sleep(5)
+            dropped = self.dropped_frames
+            self.dropped_frames = {}
+            if dropped:
+                detail = ", ".join(f"kind={kind} count={count}" for kind, count in sorted(dropped.items()))
+                print(f"[edge] relay frame dropped summary name={self.node.name} {detail}")
+        except asyncio.CancelledError:
+            pass
+
     async def send(self, frame: Frame) -> None:
         if self.closed:
             raise ConnectionError(f"relay closed: {self.node.name}")
@@ -146,6 +166,10 @@ class RelayConnection:
         for task in dispatch_tasks:
             with contextlib.suppress(Exception):
                 await task
+        if self.dropped_report_task and self.dropped_report_task is not asyncio.current_task():
+            self.dropped_report_task.cancel()
+            with contextlib.suppress(Exception):
+                await self.dropped_report_task
         if self.pump_task and self.pump_task is not asyncio.current_task():
             self.pump_task.cancel()
             with contextlib.suppress(Exception):

+ 13 - 1
transparent_edge.py

@@ -142,6 +142,7 @@ class RelayTcpPath(BasePath):
         self.connection = connection
         self.session_id = session_id
         self.stream_id = stream_id
+        self.unbind_task: asyncio.Task | None = None
 
     async def open(self, target: TargetAddress) -> None:
         if self.connection.closed:
@@ -177,11 +178,16 @@ class RelayTcpPath(BasePath):
         if self.closed:
             return
         self.closed = True
-        self.connection.unbind(self.session_id, self.stream_id)
+        if self.unbind_task is None or self.unbind_task.done():
+            self.unbind_task = asyncio.create_task(self._delayed_unbind())
         if not self.connection.closed:
             with contextlib.suppress(Exception):
                 await self.connection.send(Frame(TCP_CLOSE, self.session_id, self.stream_id, 0, 0, b""))
 
+    async def _delayed_unbind(self) -> None:
+        await asyncio.sleep(0.5)
+        self.connection.unbind(self.session_id, self.stream_id)
+
 
 @dataclass
 class TransparentSession:
@@ -373,6 +379,7 @@ class RelayUdpPath(BasePath):
         self.session_id = session_id
         self.stream_id = stream_id
         self.target = target
+        self.unbind_task: asyncio.Task | None = None
 
     async def open(self, _target: TargetAddress) -> None:
         if self.connection.closed:
@@ -402,6 +409,11 @@ class RelayUdpPath(BasePath):
         if self.closed:
             return
         self.closed = True
+        if self.unbind_task is None or self.unbind_task.done():
+            self.unbind_task = asyncio.create_task(self._delayed_unbind())
+
+    async def _delayed_unbind(self) -> None:
+        await asyncio.sleep(0.5)
         self.connection.unbind(self.session_id, self.stream_id)