|
@@ -29,6 +29,8 @@ class RelayConnection:
|
|
|
last_pong_at: float = 0.0
|
|
last_pong_at: float = 0.0
|
|
|
send_lock: asyncio.Lock | None = None
|
|
send_lock: asyncio.Lock | None = None
|
|
|
closed_event: asyncio.Event | None = None
|
|
closed_event: asyncio.Event | None = None
|
|
|
|
|
+ dropped_frames: Dict[int, int] = None
|
|
|
|
|
+ dropped_report_task: asyncio.Task | None = None
|
|
|
|
|
|
|
|
def __post_init__(self) -> None:
|
|
def __post_init__(self) -> None:
|
|
|
if self.handlers is None:
|
|
if self.handlers is None:
|
|
@@ -39,6 +41,8 @@ class RelayConnection:
|
|
|
self.send_lock = asyncio.Lock()
|
|
self.send_lock = asyncio.Lock()
|
|
|
if self.closed_event is None:
|
|
if self.closed_event is None:
|
|
|
self.closed_event = asyncio.Event()
|
|
self.closed_event = asyncio.Event()
|
|
|
|
|
+ if self.dropped_frames is None:
|
|
|
|
|
+ self.dropped_frames = {}
|
|
|
|
|
|
|
|
async def start(self) -> None:
|
|
async def start(self) -> None:
|
|
|
print(f"[edge] connecting relay name={self.node.name} addr={self.node.host}:{self.node.port}")
|
|
print(f"[edge] connecting relay name={self.node.name} addr={self.node.host}:{self.node.port}")
|
|
@@ -78,7 +82,7 @@ class RelayConnection:
|
|
|
if handler:
|
|
if handler:
|
|
|
self._dispatch_frame(frame, handler)
|
|
self._dispatch_frame(frame, handler)
|
|
|
else:
|
|
else:
|
|
|
- print(f"[edge] relay frame dropped name={self.node.name} session={frame.session_id} stream={frame.stream_id} kind={frame.kind}")
|
|
|
|
|
|
|
+ self._record_dropped_frame(frame.kind)
|
|
|
except asyncio.IncompleteReadError:
|
|
except asyncio.IncompleteReadError:
|
|
|
print(f"[edge] relay disconnected name={self.node.name} eof=true")
|
|
print(f"[edge] relay disconnected name={self.node.name} eof=true")
|
|
|
except Exception as exc:
|
|
except Exception as exc:
|
|
@@ -109,6 +113,22 @@ class RelayConnection:
|
|
|
if self.dispatch_tasks.get(key) is asyncio.current_task():
|
|
if self.dispatch_tasks.get(key) is asyncio.current_task():
|
|
|
self.dispatch_tasks.pop(key, None)
|
|
self.dispatch_tasks.pop(key, None)
|
|
|
|
|
|
|
|
|
|
+ def _record_dropped_frame(self, kind: int) -> None:
|
|
|
|
|
+ self.dropped_frames[kind] = self.dropped_frames.get(kind, 0) + 1
|
|
|
|
|
+ if self.dropped_report_task is None or self.dropped_report_task.done():
|
|
|
|
|
+ self.dropped_report_task = asyncio.create_task(self._report_dropped_frames())
|
|
|
|
|
+
|
|
|
|
|
+ async def _report_dropped_frames(self) -> None:
|
|
|
|
|
+ try:
|
|
|
|
|
+ await asyncio.sleep(5)
|
|
|
|
|
+ dropped = self.dropped_frames
|
|
|
|
|
+ self.dropped_frames = {}
|
|
|
|
|
+ if dropped:
|
|
|
|
|
+ detail = ", ".join(f"kind={kind} count={count}" for kind, count in sorted(dropped.items()))
|
|
|
|
|
+ print(f"[edge] relay frame dropped summary name={self.node.name} {detail}")
|
|
|
|
|
+ except asyncio.CancelledError:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
async def send(self, frame: Frame) -> None:
|
|
async def send(self, frame: Frame) -> None:
|
|
|
if self.closed:
|
|
if self.closed:
|
|
|
raise ConnectionError(f"relay closed: {self.node.name}")
|
|
raise ConnectionError(f"relay closed: {self.node.name}")
|
|
@@ -146,6 +166,10 @@ class RelayConnection:
|
|
|
for task in dispatch_tasks:
|
|
for task in dispatch_tasks:
|
|
|
with contextlib.suppress(Exception):
|
|
with contextlib.suppress(Exception):
|
|
|
await task
|
|
await task
|
|
|
|
|
+ if self.dropped_report_task and self.dropped_report_task is not asyncio.current_task():
|
|
|
|
|
+ self.dropped_report_task.cancel()
|
|
|
|
|
+ with contextlib.suppress(Exception):
|
|
|
|
|
+ await self.dropped_report_task
|
|
|
if self.pump_task and self.pump_task is not asyncio.current_task():
|
|
if self.pump_task and self.pump_task is not asyncio.current_task():
|
|
|
self.pump_task.cancel()
|
|
self.pump_task.cancel()
|
|
|
with contextlib.suppress(Exception):
|
|
with contextlib.suppress(Exception):
|