|
|
@@ -4,10 +4,11 @@ import asyncio
|
|
|
import contextlib
|
|
|
import socket
|
|
|
from dataclasses import dataclass
|
|
|
+import time
|
|
|
from typing import Awaitable, Callable, Dict
|
|
|
|
|
|
from .config import Config, RelayNode
|
|
|
-from .protocol import AUTH, STATUS_OK, Frame, encode_json, read_frame, write_frame
|
|
|
+from .protocol import AUTH, PING, PONG, STATUS_OK, Frame, encode_json, read_frame, write_frame
|
|
|
from .scheduler import Scheduler
|
|
|
|
|
|
FrameHandler = Callable[["RelayConnection", Frame], Awaitable[None]]
|
|
|
@@ -22,6 +23,8 @@ class RelayConnection:
|
|
|
closed: bool = False
|
|
|
handlers: Dict[tuple[int, int], FrameHandler] = None
|
|
|
pump_task: asyncio.Task | None = None
|
|
|
+ keepalive_task: asyncio.Task | None = None
|
|
|
+ last_pong_at: float = 0.0
|
|
|
|
|
|
def __post_init__(self) -> None:
|
|
|
if self.handlers is None:
|
|
|
@@ -34,12 +37,33 @@ class RelayConnection:
|
|
|
if frame.kind != AUTH or frame.packet_id != STATUS_OK:
|
|
|
raise ConnectionError(f"relay auth failed: {self.node.name}")
|
|
|
print(f"[edge] relay connected name={self.node.name} addr={self.node.host}:{self.node.port}")
|
|
|
+ self.last_pong_at = time.monotonic()
|
|
|
+ self.keepalive_task = asyncio.create_task(self._keepalive())
|
|
|
self.pump_task = asyncio.create_task(self._pump())
|
|
|
|
|
|
+ async def _keepalive(self) -> None:
|
|
|
+ try:
|
|
|
+ while not self.closed:
|
|
|
+ await asyncio.sleep(self.manager.config.relay_ping_interval)
|
|
|
+ if self.closed:
|
|
|
+ break
|
|
|
+ if time.monotonic() - self.last_pong_at > self.manager.config.relay_ping_timeout:
|
|
|
+ print(f"[edge] relay health timeout name={self.node.name} addr={self.node.host}:{self.node.port} timeout={self.manager.config.relay_ping_timeout}")
|
|
|
+ await self.close()
|
|
|
+ break
|
|
|
+ await self.send(Frame(PING, 0, 0, 0, 0, b""))
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ pass
|
|
|
+ except Exception:
|
|
|
+ await self.close()
|
|
|
+
|
|
|
async def _pump(self) -> None:
|
|
|
try:
|
|
|
while True:
|
|
|
frame = await read_frame(self.reader)
|
|
|
+ if frame.kind == PONG:
|
|
|
+ self.last_pong_at = time.monotonic()
|
|
|
+ continue
|
|
|
handler = self.handlers.get((frame.session_id, frame.stream_id))
|
|
|
if handler:
|
|
|
await handler(self, frame)
|
|
|
@@ -66,6 +90,10 @@ class RelayConnection:
|
|
|
return
|
|
|
self.closed = True
|
|
|
self.manager.on_closed(self)
|
|
|
+ if self.keepalive_task and self.keepalive_task is not asyncio.current_task():
|
|
|
+ self.keepalive_task.cancel()
|
|
|
+ with contextlib.suppress(Exception):
|
|
|
+ await self.keepalive_task
|
|
|
self.writer.close()
|
|
|
with contextlib.suppress(Exception):
|
|
|
await self.writer.wait_closed()
|
|
|
@@ -88,18 +116,27 @@ class RelayManager:
|
|
|
if node.name in self.connections and not self.connections[node.name].closed:
|
|
|
await asyncio.sleep(2)
|
|
|
continue
|
|
|
- try:
|
|
|
- reader, writer = await asyncio.wait_for(asyncio.open_connection(node.host, node.port), timeout=self.config.relay_open_timeout)
|
|
|
- connection = RelayConnection(node=node, manager=self, reader=reader, writer=writer)
|
|
|
- sock = writer.get_extra_info("socket")
|
|
|
- if sock is not None and self.config.relay_tcp_nodelay:
|
|
|
- with contextlib.suppress(OSError):
|
|
|
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
|
- await connection.start()
|
|
|
- self.connections[node.name] = connection
|
|
|
- await connection.pump_task
|
|
|
- except Exception as exc:
|
|
|
- print(f"[edge] relay connect failed name={node.name} addr={node.host}:{node.port} error={exc!r}")
|
|
|
+ connected = False
|
|
|
+ for attempt in range(1, self.config.relay_reconnect_attempts + 1):
|
|
|
+ try:
|
|
|
+ print(f"[edge] relay reconnect attempt name={node.name} addr={node.host}:{node.port} attempt={attempt}/{self.config.relay_reconnect_attempts}")
|
|
|
+ reader, writer = await asyncio.wait_for(asyncio.open_connection(node.host, node.port), timeout=self.config.relay_open_timeout)
|
|
|
+ connection = RelayConnection(node=node, manager=self, reader=reader, writer=writer)
|
|
|
+ sock = writer.get_extra_info("socket")
|
|
|
+ if sock is not None and self.config.relay_tcp_nodelay:
|
|
|
+ with contextlib.suppress(OSError):
|
|
|
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
|
+ await connection.start()
|
|
|
+ self.connections[node.name] = connection
|
|
|
+ connected = True
|
|
|
+ await connection.pump_task
|
|
|
+ break
|
|
|
+ except Exception as exc:
|
|
|
+ print(f"[edge] relay connect failed name={node.name} addr={node.host}:{node.port} attempt={attempt}/{self.config.relay_reconnect_attempts} error={exc!r}")
|
|
|
+ if attempt < self.config.relay_reconnect_attempts:
|
|
|
+ await asyncio.sleep(self.config.relay_reconnect_delay)
|
|
|
+ if not connected:
|
|
|
+ print(f"[edge] relay reconnect exhausted name={node.name} addr={node.host}:{node.port} attempts={self.config.relay_reconnect_attempts}")
|
|
|
await asyncio.sleep(self.config.relay_reconnect_delay)
|
|
|
|
|
|
def on_closed(self, connection: RelayConnection) -> None:
|