Browse Source

增加本机多路增强

Gogs 6 ngày trước cách đây
mục cha
commit
4486b690c5

+ 32 - 0
README.md

@@ -34,6 +34,9 @@
 {
   "strategy": "top3",
   "redundancy": 3,
+  "direct_redundancy": 2,
+  "direct_max_redundancy": 3,
+  "direct_redundancy_v6": 3,
   "tcp_warmup_bytes": 1048576,
   "tcp_loser_grace_ms": 1500,
   "probe_interval": 15,
@@ -157,6 +160,35 @@ sudo /home/mynetspeeder/scripts/start-transparent.sh --kernel 24 --capture-uid $
 }
 ```
 
+
+## 单机增强 direct 竞速
+
+当前版本新增更激进的主节点 `direct` 冗余能力,默认保持 relay 逻辑不变:
+
+- `direct_redundancy`:默认 direct 并发连接数
+- `direct_max_redundancy`:当某目标或某地址族近期更偏向 relay 胜出时,允许自动放大到的最大 direct 并发数
+- `direct_redundancy_v4`:可单独指定 IPv4 目标的 direct 并发数
+- `direct_redundancy_v6`:可单独指定 IPv6 目标的 direct 并发数
+
+示例:
+
+```json
+{
+  "direct_redundancy": 2,
+  "direct_max_redundancy": 3,
+  "direct_redundancy_v4": 2,
+  "direct_redundancy_v6": 3
+}
+```
+
+说明:
+
+- 同一目标会同时发起多条 `direct` 连接
+- 谁先拿到有效下行,谁成为 winner
+- 其他 `direct` 副本会按现有 `tcp_loser_grace_ms` 延迟关闭
+- 若某个目标或某个地址族近期经常是 relay 胜出,主节点会自动把该目标的 direct 冗余数提升 1 档,但不会超过 `direct_max_redundancy`
+- 当前优化只增强主节点 direct 路径,不改变子节点 relay 内部出站逻辑
+
 ## 工作方式
 
 透明模式启动后:

BIN
__pycache__/cli.cpython-313.pyc


BIN
__pycache__/config.cpython-313.pyc


BIN
__pycache__/transparent_edge.cpython-313.pyc


+ 6 - 2
cli.py

@@ -22,6 +22,10 @@ UDP_WIN_RE = re.compile(
 )
 
 
+def normalize_winner(name: str) -> str:
+    return "direct" if name.startswith("direct") else name
+
+
 def build_parser() -> argparse.ArgumentParser:
     parser = argparse.ArgumentParser(prog="mynetspeeder")
     parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
@@ -99,7 +103,7 @@ def handle_summary(args: argparse.Namespace) -> int:
         tcp_match = TCP_WIN_RE.search(line)
         if tcp_match:
             tcp_total += 1
-            winner = tcp_match.group("winner")
+            winner = normalize_winner(tcp_match.group("winner"))
             host = tcp_match.group("host")
             port = tcp_match.group("port")
             key = f"{host}:{port}"
@@ -114,7 +118,7 @@ def handle_summary(args: argparse.Namespace) -> int:
         udp_match = UDP_WIN_RE.search(line)
         if udp_match:
             udp_total += 1
-            winner = udp_match.group("winner")
+            winner = normalize_winner(udp_match.group("winner"))
             host = udp_match.group("host")
             port = udp_match.group("port")
             key = f"{host}:{port}"

+ 3 - 0
config.json

@@ -1,6 +1,9 @@
 {
   "strategy": "top3",
   "redundancy": 3,
+  "direct_redundancy": 2,
+  "direct_max_redundancy": 3,
+  "direct_redundancy_v6": 3,
   "tcp_warmup_bytes": 1048576,
   "tcp_loser_grace_ms": 1500,
   "probe_interval": 3,

+ 8 - 0
config.py

@@ -32,6 +32,10 @@ class Config:
     tcp_connect_happy_eyeballs_delay: float | None = None
     relay_reconnect_delay: float = 3.0
     relay_tcp_nodelay: bool = True
+    direct_redundancy: int = 2
+    direct_redundancy_v4: int | None = None
+    direct_redundancy_v6: int | None = None
+    direct_max_redundancy: int = 3
 
     @classmethod
     def load(cls, path: str) -> "Config":
@@ -50,4 +54,8 @@ class Config:
             tcp_connect_happy_eyeballs_delay=raw.get("tcp_connect_happy_eyeballs_delay"),
             relay_reconnect_delay=raw.get("relay_reconnect_delay", 3.0),
             relay_tcp_nodelay=raw.get("relay_tcp_nodelay", True),
+            direct_redundancy=max(1, raw.get("direct_redundancy", 2)),
+            direct_redundancy_v4=raw.get("direct_redundancy_v4"),
+            direct_redundancy_v6=raw.get("direct_redundancy_v6"),
+            direct_max_redundancy=max(1, raw.get("direct_max_redundancy", 3)),
         )

+ 1 - 1
scripts/start_udp.sh

@@ -1,3 +1,3 @@
 #!/usr/bin/env bash
 set -euo pipefail
-exec "$(dirname "$0")/start-transparent.sh" --enable-udp "$@"
+exec "$(dirname "$0")/start-transparent.sh"  --enable-udp "$@"

+ 54 - 9
transparent_edge.py

@@ -51,6 +51,14 @@ def parse_sockaddr(raw: bytes) -> TargetAddress:
     raise ValueError(f"unsupported family={family}")
 
 
+def winner_group(name: str) -> str:
+    return "direct" if name.startswith("direct") else name
+
+
+def grouped_total(stats: dict[str, int], group: str) -> int:
+    return sum(count for name, count in stats.items() if winner_group(name) == group)
+
+
 class BasePath:
     def __init__(self, name: str, on_frame: Callable[["BasePath", str, bytes | None], Awaitable[None]]) -> None:
         self.name = name
@@ -185,6 +193,7 @@ class TransparentSession:
     loser_grace_ms: int
     stats: dict[str, int]
     target_stats: dict[tuple[str, int], dict[str, int]]
+    family_stats: dict[str, dict[str, int]]
     opened_count: int = 0
     status_count: int = 0
     errors: list[str] = field(default_factory=list)
@@ -201,14 +210,20 @@ class TransparentSession:
         key = (self.target.host, self.target.port)
         target_stats = self.target_stats.setdefault(key, {})
         target_stats[winner.name] = target_stats.get(winner.name, 0) + 1
-        direct_wins = self.stats.get("direct", 0)
-        relay_wins = sum(count for name, count in self.stats.items() if name != "direct")
-        target_direct = target_stats.get("direct", 0)
-        target_relay = sum(count for name, count in target_stats.items() if name != "direct")
-        relay_detail = ", ".join(f"{name}={count}" for name, count in sorted(self.stats.items()) if name != "direct") or "none"
-        target_detail = ", ".join(f"{name}={count}" for name, count in sorted(target_stats.items()) if name != "direct") or "none"
+        family_key = "ipv6" if self.target.family == socket.AF_INET6 else "ipv4"
+        family_stats = self.family_stats.setdefault(family_key, {})
+        family_stats[winner.name] = family_stats.get(winner.name, 0) + 1
+        direct_wins = grouped_total(self.stats, "direct")
+        relay_wins = sum(count for name, count in self.stats.items() if winner_group(name) != "direct")
+        target_direct = grouped_total(target_stats, "direct")
+        target_relay = sum(count for name, count in target_stats.items() if winner_group(name) != "direct")
+        family_direct = grouped_total(family_stats, "direct")
+        family_relay = sum(count for name, count in family_stats.items() if winner_group(name) != "direct")
+        relay_detail = ", ".join(f"{name}={count}" for name, count in sorted(self.stats.items()) if winner_group(name) != "direct") or "none"
+        target_detail = ", ".join(f"{name}={count}" for name, count in sorted(target_stats.items()) if winner_group(name) != "direct") or "none"
         target_pref = "relay" if target_relay > target_direct else "direct"
-        print(f"[edge] tcp win session={self.session_id} target={self.target.host}:{self.target.port} winner={winner.name} direct={direct_wins} relay={relay_wins} relay_breakdown={relay_detail} target_pref={target_pref} target_direct={target_direct} target_relay={target_relay} target_breakdown={target_detail}")
+        family_pref = "relay" if family_relay > family_direct else "direct"
+        print(f"[edge] tcp win session={self.session_id} target={self.target.host}:{self.target.port} winner={winner.name} direct={direct_wins} relay={relay_wins} relay_breakdown={relay_detail} target_pref={target_pref} target_direct={target_direct} target_relay={target_relay} target_breakdown={target_detail} family_pref={family_pref} family={family_key} family_direct={family_direct} family_relay={family_relay}")
 
     async def start(self) -> None:
         await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
@@ -503,6 +518,7 @@ class TransparentEdge:
         self.udp_gc_task: asyncio.Task | None = None
         self.tcp_win_counts: dict[str, int] = {}
         self.tcp_target_wins: dict[tuple[str, int], dict[str, int]] = {}
+        self.tcp_family_wins: dict[str, dict[str, int]] = {"ipv4": {}, "ipv6": {}}
 
     def _resolve_kernel_mode(self, cli_kernel_mode: str, config_kernel_mode: str) -> str:
         mode = cli_kernel_mode if cli_kernel_mode != "auto" else config_kernel_mode
@@ -552,6 +568,35 @@ class TransparentEdge:
             async with server4, server6:
                 await asyncio.gather(server4.serve_forever(), server6.serve_forever())
 
+    def _direct_redundancy_for_target(self, target: TargetAddress) -> int:
+        base = self.config.direct_redundancy
+        if target.family == socket.AF_INET6 and self.config.direct_redundancy_v6 is not None:
+            base = self.config.direct_redundancy_v6
+        elif target.family == socket.AF_INET and self.config.direct_redundancy_v4 is not None:
+            base = self.config.direct_redundancy_v4
+        base = max(1, min(base, self.config.direct_max_redundancy))
+        target_stats = self.tcp_target_wins.get((target.host, target.port), {})
+        family_key = "ipv6" if target.family == socket.AF_INET6 else "ipv4"
+        family_stats = self.tcp_family_wins.get(family_key, {})
+        target_prefers_relay = sum(count for name, count in target_stats.items() if winner_group(name) != "direct") > grouped_total(target_stats, "direct")
+        family_prefers_relay = sum(count for name, count in family_stats.items() if winner_group(name) != "direct") > grouped_total(family_stats, "direct")
+        if target_prefers_relay or family_prefers_relay:
+            return min(self.config.direct_max_redundancy, base + 1)
+        return base
+
+    def _build_direct_paths(self, session: TransparentSession) -> list[BasePath]:
+        count = self._direct_redundancy_for_target(session.target)
+        return [
+            DirectTcpPath(
+                name=f"direct-{index + 1}" if count > 1 else "direct",
+                on_frame=lambda path, event, payload, s=session: self._handle_tcp_session(s, path, event, payload),
+                open_timeout=self.config.direct_open_timeout,
+                happy_eyeballs_delay=self.config.tcp_connect_happy_eyeballs_delay,
+                tcp_nodelay=self.config.relay_tcp_nodelay,
+            )
+            for index in range(count)
+        ]
+
     def _start_udp_listeners(self) -> None:
         binds = []
         if self.listen_host == "127.0.0.1":
@@ -574,8 +619,8 @@ class TransparentEdge:
         try:
             target = self._get_original_dst(writer)
             session_id = next(self.session_ids)
-            session = TransparentSession(session_id=session_id, target=target, reader=reader, writer=writer, paths=[], warmup_bytes=self.config.tcp_warmup_bytes, loser_grace_ms=self.config.tcp_loser_grace_ms, stats=self.tcp_win_counts, target_stats=self.tcp_target_wins)
-            paths: list[BasePath] = [DirectTcpPath(name="direct", on_frame=lambda path, event, payload, s=session: self._handle_tcp_session(s, path, event, payload), open_timeout=self.config.direct_open_timeout, happy_eyeballs_delay=self.config.tcp_connect_happy_eyeballs_delay, tcp_nodelay=self.config.relay_tcp_nodelay)]
+            session = TransparentSession(session_id=session_id, target=target, reader=reader, writer=writer, paths=[], warmup_bytes=self.config.tcp_warmup_bytes, loser_grace_ms=self.config.tcp_loser_grace_ms, stats=self.tcp_win_counts, target_stats=self.tcp_target_wins, family_stats=self.tcp_family_wins)
+            paths: list[BasePath] = self._build_direct_paths(session)
             for connection in self.manager.available():
                 stream_id = next(self.stream_ids)
                 paths.append(RelayTcpPath(name=connection.node.name, on_frame=lambda path, event, payload, s=session: self._handle_tcp_session(s, path, event, payload), connection=connection, session_id=session_id, stream_id=stream_id))