Bläddra i källkod

新增udp独立冗余

Gogs 3 dagar sedan
förälder
incheckning
4884a91ae0
2 ändrade filer med 74 tillägg och 60 borttagningar
  1. 11 3
      relay_server.py
  2. 63 57
      socks_edge.py

+ 11 - 3
relay_server.py

@@ -22,6 +22,9 @@ class UdpSession:
     stream_id: int
     transport: asyncio.DatagramTransport | None = None
     protocol: "RelayUdpProtocol | None" = None
+    host: str = ""
+    port: int = 0
+    family: int = 0
 
 
 class RelayUdpProtocol(asyncio.DatagramProtocol):
@@ -111,17 +114,22 @@ class RelayChannel:
             await self._close_tcp(key)
             return
         if frame.kind == UDP_SEND:
-            meta = decode_json(frame.payload[: frame.packet_id])
-            payload = frame.payload[frame.packet_id :]
             session = self.udp_sessions.get(key)
+            meta = None
+            payload = frame.payload
+            if frame.packet_id > 0:
+                meta = decode_json(frame.payload[: frame.packet_id])
+                payload = frame.payload[frame.packet_id :]
             if session is None:
+                if meta is None:
+                    return
                 family = int(meta.get("family", 0)) or 0
                 transport, protocol = await asyncio.get_running_loop().create_datagram_endpoint(
                     lambda: RelayUdpProtocol(self, frame.session_id, frame.stream_id),
                     remote_addr=(meta["host"], int(meta["port"])),
                     family=family or 0,
                 )
-                session = UdpSession(frame.session_id, frame.stream_id, transport, protocol)
+                session = UdpSession(frame.session_id, frame.stream_id, transport, protocol, meta["host"], int(meta["port"]), family)
                 self.udp_sessions[key] = session
             with contextlib.suppress(Exception):
                 session.transport.sendto(payload)

+ 63 - 57
socks_edge.py

@@ -79,6 +79,8 @@ class UdpFlowState:
     duplicate_responses: int = 0
     winner_name: str | None = None
     candidate_names: tuple[str, ...] = ()
+    link_streams: dict[str, int] = field(default_factory=dict)
+    initialized_links: set[str] = field(default_factory=set)
 
     def touch(self, now: float) -> None:
         self.last_activity = now
@@ -191,8 +193,6 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         self.transport: asyncio.DatagramTransport | None = None
         self.client_addr = None
         self.packet_counter = itertools.count(1)
-        self.pending: set[int] = set()
-        self.packet_flows: dict[int, int] = {}
         self.client_flows: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {}
         self.flow_counter = itertools.count(1)
         self.last_summary_at = 0.0
@@ -227,38 +227,37 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         flow.touch(now)
         flow.packets_sent += 1
         packet_id = next(self.packet_counter)
-        self.pending.add(packet_id)
-        self.packet_flows[packet_id] = flow.flow_id
         print(f"[edge] udp recv flow={flow.flow_id} packet_id={packet_id} target={host}:{port} size={len(payload)}")
-        asyncio.create_task(self.edge.forward_udp(host, port, payload, packet_id, self))
+        asyncio.create_task(self.edge.forward_udp(flow, payload, packet_id, self))
         self._log_udp_summary()
 
     async def handle_from_relay(self, frame: Frame, link: RelayLink) -> None:
-        if frame.packet_id not in self.pending or self.transport is None or self.client_addr is None:
+        if self.transport is None or self.client_addr is None:
             return
-        self.pending.discard(frame.packet_id)
-        flow_id = self.packet_flows.pop(frame.packet_id, 0)
-        host = self.edge.udp_targets.get(frame.packet_id, ("0.0.0.0", 0))[0]
-        port = self.edge.udp_targets.get(frame.packet_id, ("0.0.0.0", 0))[1]
+        flow = self.edge.udp_flow_sessions.get((frame.session_id, frame.stream_id))
+        if flow is None:
+            return
+        flow_id = flow.flow_id
+        host = flow.target_host
+        port = flow.target_port
         packet = self._build_socks_udp(host, port, frame.payload)
         winner_log = ""
-        flow = self._find_flow(flow_id)
-        if flow is not None:
-            now = asyncio.get_running_loop().time()
-            flow.touch(now)
-            flow.packets_received += 1
-            if flow.winner_name is None:
-                flow.winner_name = link.node.name
-                self.win_counts[link.node.name] = self.win_counts.get(link.node.name, 0) + 1
-                relay_detail = ", ".join(f"{name}={count}" for name, count in sorted(self.win_counts.items())) or "none"
-                print(
-                    f"[edge] udp flow={flow.flow_id} winner={link.node.name} "
-                    f"target={flow.target_host}:{flow.target_port} mode=single candidates={len(flow.candidate_names) or len(self.edge.links)}"
-                )
-                print(f"[edge] udp win relay_breakdown={relay_detail}")
-            elif flow.winner_name != link.node.name:
-                flow.duplicate_responses += 1
-                winner_log = f" duplicate=1 winner={flow.winner_name} from={link.node.name}"
+        now = asyncio.get_running_loop().time()
+        flow.touch(now)
+        flow.packets_received += 1
+        if flow.winner_name is None:
+            flow.winner_name = link.node.name
+            self.win_counts[link.node.name] = self.win_counts.get(link.node.name, 0) + 1
+            relay_detail = ", ".join(f"{name}={count}" for name, count in sorted(self.win_counts.items())) or "none"
+            mode = "redundant" if self.edge.config.udp_redundancy > 0 else "single"
+            print(
+                f"[edge] udp flow={flow.flow_id} winner={link.node.name} "
+                f"target={flow.target_host}:{flow.target_port} mode={mode} candidates={len(flow.candidate_names) or len(self.edge.links)}"
+            )
+            print(f"[edge] udp win relay_breakdown={relay_detail}")
+        elif flow.winner_name != link.node.name:
+            flow.duplicate_responses += 1
+            winner_log = f" duplicate=1 winner={flow.winner_name} from={link.node.name}"
         print(
             f"[edge] udp send flow={flow_id or 'unknown'} packet_id={frame.packet_id} "
             f"target={host}:{port} size={len(frame.payload)} relay={link.node.name}{winner_log}"
@@ -266,29 +265,15 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         self.transport.sendto(packet, self.client_addr)
         self._log_udp_summary()
 
-    def set_flow_candidates(self, packet_id: int, candidate_names: tuple[str, ...]) -> None:
-        flow_id = self.packet_flows.get(packet_id)
-        flow = self._find_flow(flow_id)
-        if flow is not None and not flow.candidate_names:
+    def set_flow_candidates(self, flow: UdpFlowState, candidate_names: tuple[str, ...]) -> None:
+        if not flow.candidate_names:
             flow.candidate_names = candidate_names
 
-    def note_unsent(self, packet_id: int) -> None:
-        flow_id = self.packet_flows.pop(packet_id, 0)
-        self.pending.discard(packet_id)
-        flow = self._find_flow(flow_id)
-        if flow is not None:
-            flow.touch(asyncio.get_running_loop().time())
-        print(f"[edge] udp drop flow={flow_id or 'unknown'} packet_id={packet_id} reason=no_available_links")
+    def note_unsent(self, flow: UdpFlowState, packet_id: int) -> None:
+        flow.touch(asyncio.get_running_loop().time())
+        print(f"[edge] udp drop flow={flow.flow_id} packet_id={packet_id} reason=no_available_links")
         self._log_udp_summary(force=True)
 
-    def _find_flow(self, flow_id: int | None) -> UdpFlowState | None:
-        if not flow_id:
-            return None
-        for flow in self.client_flows.values():
-            if flow.flow_id == flow_id:
-                return flow
-        return None
-
     def _log_udp_summary(self, force: bool = False) -> None:
         now = asyncio.get_running_loop().time()
         if not force and now - self.last_summary_at < 10:
@@ -341,7 +326,8 @@ class SocksEdge:
         self.scheduler = Scheduler(config)
         self.links: list[RelayLink] = []
         self.session_ids = itertools.count(1)
-        self.udp_targets: dict[int, tuple[str, int]] = {}
+        self.udp_stream_ids = itertools.count(1)
+        self.udp_flow_sessions: dict[tuple[int, int], UdpFlowState] = {}
         self.udp_server: UdpAssociateServer | None = None
 
     async def start(self) -> None:
@@ -394,19 +380,39 @@ class SocksEdge:
         links = [link for link in self.links if link.node.name in chosen and not link.closed]
         return links or [link for link in self.links if not link.closed][:1]
 
-    async def forward_udp(self, host: str, port: int, payload: bytes, packet_id: int, udp_server: UdpAssociateServer) -> None:
-        self.udp_targets[packet_id] = (host, port)
-        meta = encode_json({"host": host, "port": port})
-        links = self._selected_links()
+    def _selected_udp_links(self) -> list[RelayLink]:
+        online = [link for link in self.links if not link.closed]
+        if not online:
+            return []
+        ordered = sorted(online, key=lambda link: self.scheduler.scores.get(link.node.name).score if link.node.name in self.scheduler.scores else 999999.0)
+        return ordered
+
+    async def forward_udp(self, flow: UdpFlowState, payload: bytes, packet_id: int, udp_server: UdpAssociateServer) -> None:
+        meta = encode_json({"host": flow.target_host, "port": flow.target_port})
+        links = self._selected_udp_links()
         link_names = ",".join(link.node.name for link in links) or "none"
-        udp_server.set_flow_candidates(packet_id, tuple(link.node.name for link in links))
-        print(f"[edge] udp forward packet_id={packet_id} target={host}:{port} size={len(payload)} links={link_names}")
+        udp_server.set_flow_candidates(flow, tuple(link.node.name for link in links))
+        print(f"[edge] udp forward packet_id={packet_id} target={flow.target_host}:{flow.target_port} size={len(payload)} links={link_names}")
         if not links:
-            udp_server.note_unsent(packet_id)
+            udp_server.note_unsent(flow, packet_id)
             return
-        for index, link in enumerate(links):
-            body = meta + payload if index == 0 else payload
-            await link.send(Frame(UDP_SEND, 1, index, 0, packet_id if index == 0 else 0, body))
+        active_links = links if self.config.udp_always_broadcast or flow.winner_name is None else [link for link in links if link.node.name == flow.winner_name]
+        active_links = active_links or links[:1]
+        copies = max(1, self.config.udp_redundancy + 1)
+        for attempt in range(copies):
+            for link in active_links:
+                stream_id = flow.link_streams.get(link.node.name)
+                if stream_id is None:
+                    stream_id = next(self.udp_stream_ids)
+                    flow.link_streams[link.node.name] = stream_id
+                    self.udp_flow_sessions[(flow.flow_id, stream_id)] = flow
+                include_meta = link.node.name not in flow.initialized_links
+                body = (meta + payload) if include_meta else payload
+                meta_len = len(meta) if include_meta else 0
+                await link.send(Frame(UDP_SEND, flow.flow_id, stream_id, 0, meta_len, body))
+                flow.initialized_links.add(link.node.name)
+            if attempt + 1 < copies and self.config.udp_copy_interval_ms > 0:
+                await asyncio.sleep(self.config.udp_copy_interval_ms / 1000)
 
     async def _handshake(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, peer) -> tuple[str, int, bool]:
         version, methods_len = (await read_exact(reader, 2))