|
|
@@ -50,6 +50,11 @@ class RelayLink:
|
|
|
session = self.tcp_sessions.get(key)
|
|
|
if session:
|
|
|
await session.handle_frame(self, frame)
|
|
|
+ elif self.udp_server:
|
|
|
+ if frame.kind == TCP_STATUS:
|
|
|
+ await self.udp_server.handle_relay_status(frame, self)
|
|
|
+ elif frame.kind == TCP_CLOSE:
|
|
|
+ await self.udp_server.handle_relay_close(frame, self)
|
|
|
elif frame.kind == UDP_RECV and self.udp_server:
|
|
|
await self.udp_server.handle_from_relay(frame, self)
|
|
|
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError, OSError):
|
|
|
@@ -309,6 +314,33 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
|
|
|
return
|
|
|
await self._deliver_flow_packet(flow, frame.packet_id, frame.payload, link.node.name)
|
|
|
|
|
|
+ async def handle_relay_status(self, frame: Frame, link: RelayLink) -> None:
|
|
|
+ if frame.packet_id == STATUS_OK:
|
|
|
+ return
|
|
|
+ flow = self.edge.udp_flow_sessions.get((frame.session_id, frame.stream_id))
|
|
|
+ if flow is None:
|
|
|
+ return
|
|
|
+ detail = frame.payload.decode("utf-8", errors="replace") if frame.payload else "unknown"
|
|
|
+ current_stream = flow.link_streams.get(link.node.name)
|
|
|
+ if current_stream == frame.stream_id:
|
|
|
+ flow.link_streams.pop(link.node.name, None)
|
|
|
+ self.edge.udp_flow_sessions.pop((frame.session_id, frame.stream_id), None)
|
|
|
+ flow.initialized_links.discard(link.node.name)
|
|
|
+ flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1
|
|
|
+ print(f"[edge] udp relay status error flow={flow.flow_id} relay={link.node.name} detail={detail}")
|
|
|
+
|
|
|
+ async def handle_relay_close(self, frame: Frame, link: RelayLink) -> None:
|
|
|
+ flow = self.edge.udp_flow_sessions.get((frame.session_id, frame.stream_id))
|
|
|
+ if flow is None:
|
|
|
+ return
|
|
|
+ current_stream = flow.link_streams.get(link.node.name)
|
|
|
+ if current_stream == frame.stream_id:
|
|
|
+ flow.link_streams.pop(link.node.name, None)
|
|
|
+ self.edge.udp_flow_sessions.pop((frame.session_id, frame.stream_id), None)
|
|
|
+ flow.initialized_links.discard(link.node.name)
|
|
|
+ flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1
|
|
|
+ print(f"[edge] udp relay close flow={flow.flow_id} relay={link.node.name}")
|
|
|
+
|
|
|
async def handle_from_direct(self, flow: UdpFlowState, path_name: str, payload: bytes) -> None:
|
|
|
if self.transport is None or self.client_addr is None:
|
|
|
return
|
|
|
@@ -588,16 +620,15 @@ class SocksEdge:
|
|
|
stream_id = next(self.udp_stream_ids)
|
|
|
flow.link_streams[link.node.name] = stream_id
|
|
|
self.udp_flow_sessions[(flow.flow_id, stream_id)] = flow
|
|
|
- include_meta = link.node.name not in flow.initialized_links
|
|
|
- body = (meta + payload) if include_meta else payload
|
|
|
- meta_len = len(meta) if include_meta else 0
|
|
|
+ body = meta + payload
|
|
|
+ meta_len = len(meta)
|
|
|
try:
|
|
|
await link.send(Frame(UDP_SEND, flow.flow_id, stream_id, 0, meta_len, body))
|
|
|
- flow.initialized_links.add(link.node.name)
|
|
|
sent_any = True
|
|
|
except Exception as exc:
|
|
|
flow.link_streams.pop(link.node.name, None)
|
|
|
self.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
|
|
|
+ flow.initialized_links.discard(link.node.name)
|
|
|
flow.relay_failures[link.node.name] = flow.relay_failures.get(link.node.name, 0) + 1
|
|
|
if link.node.name not in flow.relay_error_seen:
|
|
|
flow.relay_error_seen.add(link.node.name)
|