|
|
@@ -95,7 +95,8 @@ class UdpFlowState:
|
|
|
direct_sockets: dict[str, socket.socket] = field(default_factory=dict)
|
|
|
direct_tasks: dict[str, asyncio.Task] = field(default_factory=dict)
|
|
|
direct_failures: set[str] = field(default_factory=set)
|
|
|
- relay_failures: dict[str, str] = field(default_factory=dict)
|
|
|
+ relay_failures: dict[str, int] = field(default_factory=dict)
|
|
|
+ relay_error_seen: set[str] = field(default_factory=set)
|
|
|
|
|
|
def touch(self, now: float) -> None:
|
|
|
self.last_activity = now
|
|
|
@@ -213,6 +214,7 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
|
|
|
self.flow_counter = itertools.count(1)
|
|
|
self.last_summary_at = 0.0
|
|
|
self.win_counts: Dict[str, int] = {}
|
|
|
+ self.relay_error_counts: Dict[str, int] = {}
|
|
|
|
|
|
def connection_made(self, transport) -> None:
|
|
|
self.transport = transport
|
|
|
@@ -290,6 +292,7 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
|
|
|
if flow.winner_name is None:
|
|
|
flow.winner_name = source_name
|
|
|
self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1
|
|
|
+ self._log_udp_summary(force=True)
|
|
|
elif flow.winner_name != source_name:
|
|
|
flow.duplicate_responses += 1
|
|
|
if flow.winner_name == source_name:
|
|
|
@@ -302,7 +305,7 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
|
|
|
|
|
|
def note_unsent(self, flow: UdpFlowState, packet_id: int) -> None:
|
|
|
flow.touch(asyncio.get_running_loop().time())
|
|
|
- flow.relay_failures.setdefault("summary", f"packet_id={packet_id}")
|
|
|
+ flow.relay_failures["unsent"] = flow.relay_failures.get("unsent", 0) + 1
|
|
|
self._log_udp_summary(force=True)
|
|
|
|
|
|
def _log_udp_summary(self, force: bool = False) -> None:
|
|
|
@@ -317,13 +320,23 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
|
|
|
duplicates = sum(flow.duplicate_responses for flow in self.client_flows.values())
|
|
|
direct_paths = sum(len(flow.direct_sockets) for flow in self.client_flows.values())
|
|
|
relay_candidates = sum(len(flow.link_streams) for flow in self.client_flows.values())
|
|
|
- print(
|
|
|
- f"[edge] udp summary bind={self.client_addr[0]}:{self.client_addr[1]} active_flows={active_flows} "
|
|
|
- f"winner_flows={winners} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} "
|
|
|
- f"direct_paths={direct_paths} relay_paths={relay_candidates}"
|
|
|
- if self.client_addr
|
|
|
- else f"[edge] udp summary bind=unbound active_flows={active_flows} winner_flows={winners} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} direct_paths={direct_paths} relay_paths={relay_candidates}"
|
|
|
- )
|
|
|
+ winner_detail = ", ".join(f"{flow.flow_id}:{flow.winner_name}" for flow in self.client_flows.values() if flow.winner_name) or "none"
|
|
|
+ relay_errors: list[str] = []
|
|
|
+ for flow in self.client_flows.values():
|
|
|
+ for name, count in flow.relay_failures.items():
|
|
|
+ relay_errors.append(f"{name}={count}")
|
|
|
+ relay_error_detail = ", ".join(sorted(relay_errors)) or "none"
|
|
|
+ if self.client_addr:
|
|
|
+ print(
|
|
|
+ f"[edge] udp summary bind={self.client_addr[0]}:{self.client_addr[1]} active_flows={active_flows} "
|
|
|
+ f"winner_flows={winners} winner_detail={winner_detail} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} "
|
|
|
+ f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={relay_error_detail}"
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ print(
|
|
|
+ f"[edge] udp summary bind=unbound active_flows={active_flows} winner_flows={winners} winner_detail={winner_detail} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} "
|
|
|
+ f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={relay_error_detail}"
|
|
|
+ )
|
|
|
|
|
|
def _parse_socks_udp(self, packet: bytes) -> tuple[str, int, bytes]:
|
|
|
atyp = packet[3]
|
|
|
@@ -487,9 +500,7 @@ class SocksEdge:
|
|
|
direct_names = tuple(name for name in sorted(flow.direct_sockets))
|
|
|
relay_names = tuple(link.node.name for link in links)
|
|
|
candidate_names = direct_names + relay_names
|
|
|
- link_names = ",".join(candidate_names) or "none"
|
|
|
udp_server.set_flow_candidates(flow, candidate_names)
|
|
|
- print(f"[edge] udp forward packet_id={packet_id} target={flow.target_host}:{flow.target_port} size={len(payload)} links={link_names}")
|
|
|
if not candidate_names:
|
|
|
udp_server.note_unsent(flow, packet_id)
|
|
|
return
|
|
|
@@ -521,10 +532,12 @@ class SocksEdge:
|
|
|
task.cancel()
|
|
|
with contextlib.suppress(Exception):
|
|
|
sock.close()
|
|
|
- print(
|
|
|
- f"[edge] udp send error flow={flow.flow_id} packet_id={packet_id} "
|
|
|
- f"relay={path_name} error={exc!r}"
|
|
|
- )
|
|
|
+ flow.relay_failures[path_name] = flow.relay_failures.get(path_name, 0) + 1
|
|
|
+ if path_name not in flow.relay_error_seen:
|
|
|
+ flow.relay_error_seen.add(path_name)
|
|
|
+ print(
|
|
|
+ f"[edge] udp relay error flow={flow.flow_id} relay={path_name} error={exc!r}"
|
|
|
+ )
|
|
|
for link in active_links:
|
|
|
stream_id = flow.link_streams.get(link.node.name)
|
|
|
if stream_id is None:
|
|
|
@@ -541,10 +554,12 @@ class SocksEdge:
|
|
|
except Exception as exc:
|
|
|
flow.link_streams.pop(link.node.name, None)
|
|
|
self.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
|
|
|
- print(
|
|
|
- f"[edge] udp send error flow={flow.flow_id} packet_id={packet_id} "
|
|
|
- f"relay={link.node.name} error={exc!r}"
|
|
|
- )
|
|
|
+ flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1
|
|
|
+ if link.node.name not in flow.relay_error_seen:
|
|
|
+ flow.relay_error_seen.add(link.node.name)
|
|
|
+ print(
|
|
|
+ f"[edge] udp relay error flow={flow.flow_id} relay={link.node.name} error={exc!r}"
|
|
|
+ )
|
|
|
if attempt + 1 < copies and self.config.udp_copy_interval_ms > 0:
|
|
|
await asyncio.sleep(self.config.udp_copy_interval_ms / 1000)
|
|
|
if not sent_any:
|