|
|
@@ -2,6 +2,7 @@ from __future__ import annotations
|
|
|
|
|
|
import asyncio
|
|
|
import contextlib
|
|
|
+import random
|
|
|
import socket
|
|
|
from dataclasses import dataclass
|
|
|
import time
|
|
|
@@ -27,6 +28,7 @@ class RelayConnection:
|
|
|
keepalive_task: asyncio.Task | None = None
|
|
|
last_pong_at: float = 0.0
|
|
|
send_lock: asyncio.Lock | None = None
|
|
|
+ closed_event: asyncio.Event | None = None
|
|
|
|
|
|
def __post_init__(self) -> None:
|
|
|
if self.handlers is None:
|
|
|
@@ -35,6 +37,8 @@ class RelayConnection:
|
|
|
self.dispatch_tasks = {}
|
|
|
if self.send_lock is None:
|
|
|
self.send_lock = asyncio.Lock()
|
|
|
+ if self.closed_event is None:
|
|
|
+ self.closed_event = asyncio.Event()
|
|
|
|
|
|
async def start(self) -> None:
|
|
|
print(f"[edge] connecting relay name={self.node.name} addr={self.node.host}:{self.node.port}")
|
|
|
@@ -127,6 +131,8 @@ class RelayConnection:
|
|
|
if self.closed:
|
|
|
return
|
|
|
self.closed = True
|
|
|
+ assert self.closed_event is not None
|
|
|
+ self.closed_event.set()
|
|
|
handlers = list(self.handlers.items())
|
|
|
self.handlers.clear()
|
|
|
dispatch_tasks = list(self.dispatch_tasks.values())
|
|
|
@@ -166,14 +172,17 @@ class RelayManager:
|
|
|
self.tasks.append(asyncio.create_task(self._maintain(node)))
|
|
|
|
|
|
async def _maintain(self, node: RelayNode) -> None:
|
|
|
+ backoff = self.config.relay_reconnect_delay
|
|
|
while True:
|
|
|
- if node.name in self.connections and not self.connections[node.name].closed:
|
|
|
- await asyncio.sleep(2)
|
|
|
+ current = self.connections.get(node.name)
|
|
|
+ if current is not None and not current.closed:
|
|
|
+ assert current.closed_event is not None
|
|
|
+ await current.closed_event.wait()
|
|
|
continue
|
|
|
- connected = False
|
|
|
- for attempt in range(1, self.config.relay_reconnect_attempts + 1):
|
|
|
+ attempt = 1
|
|
|
+ while True:
|
|
|
try:
|
|
|
- print(f"[edge] relay reconnect attempt name={node.name} addr={node.host}:{node.port} attempt={attempt}/{self.config.relay_reconnect_attempts}")
|
|
|
+ print(f"[edge] relay reconnect attempt name={node.name} addr={node.host}:{node.port} attempt={attempt} backoff={backoff:.1f}s")
|
|
|
reader, writer = await asyncio.wait_for(asyncio.open_connection(node.host, node.port), timeout=self.config.relay_open_timeout)
|
|
|
connection = RelayConnection(node=node, manager=self, reader=reader, writer=writer)
|
|
|
sock = writer.get_extra_info("socket")
|
|
|
@@ -182,16 +191,19 @@ class RelayManager:
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
|
await connection.start()
|
|
|
self.connections[node.name] = connection
|
|
|
- connected = True
|
|
|
- await connection.pump_task
|
|
|
+ backoff = self.config.relay_reconnect_delay
|
|
|
+ assert connection.closed_event is not None
|
|
|
+ await connection.closed_event.wait()
|
|
|
+ print(f"[edge] relay supervisor noticed close name={node.name} addr={node.host}:{node.port}")
|
|
|
break
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ raise
|
|
|
except Exception as exc:
|
|
|
- print(f"[edge] relay connect failed name={node.name} addr={node.host}:{node.port} attempt={attempt}/{self.config.relay_reconnect_attempts} error={exc!r}")
|
|
|
- if attempt < self.config.relay_reconnect_attempts:
|
|
|
- await asyncio.sleep(self.config.relay_reconnect_delay)
|
|
|
- if not connected:
|
|
|
- print(f"[edge] relay reconnect exhausted name={node.name} addr={node.host}:{node.port} attempts={self.config.relay_reconnect_attempts}")
|
|
|
- await asyncio.sleep(self.config.relay_reconnect_delay)
|
|
|
+ print(f"[edge] relay connect failed name={node.name} addr={node.host}:{node.port} attempt={attempt} error={exc!r}")
|
|
|
+ jitter = random.uniform(0, min(1.0, backoff * 0.2))
|
|
|
+ await asyncio.sleep(backoff + jitter)
|
|
|
+ backoff = min(self.config.relay_reconnect_max_delay, max(self.config.relay_reconnect_delay, backoff * 2))
|
|
|
+ attempt += 1
|
|
|
|
|
|
def on_closed(self, connection: RelayConnection) -> None:
|
|
|
current = self.connections.get(connection.node.name)
|