Procházet zdrojové kódy

短视频卡顿目测减少,其他待验证,增加基线测试

Gogs před 9 hodinami
rodič
revize
b97dd18705
8 změnil soubory, kde provedl 971 přidání a 41 odebrání
  1. 17 5
      config.json
  2. 6 0
      config.py
  3. 74 0
      docs/project-issues.md
  4. 693 0
      scripts/benchmark_local.py
  5. 4 0
      scripts/rotate-log.py
  6. 39 6
      scripts/start-transparent.sh
  7. 33 16
      socks_edge.py
  8. 105 14
      transparent_edge.py

+ 17 - 5
config.json

@@ -3,15 +3,27 @@
   "redundancy": 3,
   "direct_redundancy": 3,
   "direct_max_redundancy": 3,
-  "direct_redundancy_v6": 3,
-  "udp_direct_redundancy": 3,
-  "tcp_warmup_bytes": 2097152,
-  "tcp_loser_grace_ms": 1500,
+  "direct_redundancy_v4": 3,
+  "direct_redundancy_v6": 2,
+  "direct_ipv6_enabled": true,
+  "direct_open_timeout": 6.0,
+  "relay_open_timeout": 6.0,
+  "tcp_connect_happy_eyeballs_delay": 0.25,
+  "tcp_warmup_bytes": 1024288,
+  "tcp_loser_grace_ms": 1000,
+  "udp_redundancy": 1,
+  "udp_direct_redundancy": 2,
+  "udp_direct_redundancy_v4": 2,
+  "udp_direct_redundancy_v6": 1,
   "probe_interval": 3,
   "relay_reconnect_delay": 1,
   "relay_reconnect_max_delay": 10,
+  "udp_always_broadcast": false,
+  "udp_copy_interval_ms": 0,
+  "udp_failover_idle_ms": 1200,
   "socks_host": "127.0.0.1",
   "socks_port": 19180,
   "relays": [
-  ] 
+    {"name": "hk2", "host": "23.238.9.140", "port": 9009, "token": "130", "weight": 100}
+  ]
 }

+ 6 - 0
config.py

@@ -30,6 +30,8 @@ class Config:
     direct_open_timeout: float = 10.0
     relay_open_timeout: float = 10.0
     tcp_connect_happy_eyeballs_delay: float | None = None
+    direct_ipv6_enabled: bool = True
+    tcp_failover_idle_ms: int = 1200
     relay_reconnect_delay: float = 3.0
     relay_reconnect_attempts: int = 5
     relay_reconnect_max_delay: float = 30.0
@@ -46,6 +48,7 @@ class Config:
     udp_direct_redundancy_v6: int | None = None
     udp_always_broadcast: bool = True
     udp_copy_interval_ms: int = 8
+    udp_failover_idle_ms: int = 1200
     socks_host: str = "127.0.0.1"
     socks_port: int = 0
 
@@ -64,6 +67,8 @@ class Config:
             direct_open_timeout=raw.get("direct_open_timeout", 10.0),
             relay_open_timeout=raw.get("relay_open_timeout", 10.0),
             tcp_connect_happy_eyeballs_delay=raw.get("tcp_connect_happy_eyeballs_delay"),
+            direct_ipv6_enabled=raw.get("direct_ipv6_enabled", True),
+            tcp_failover_idle_ms=max(100, raw.get("tcp_failover_idle_ms", 1200)),
             relay_reconnect_delay=raw.get("relay_reconnect_delay", 3.0),
             relay_reconnect_attempts=max(1, raw.get("relay_reconnect_attempts", 5)),
             relay_reconnect_max_delay=max(raw.get("relay_reconnect_delay", 3.0), raw.get("relay_reconnect_max_delay", 30.0)),
@@ -80,6 +85,7 @@ class Config:
             udp_direct_redundancy_v6=raw.get("udp_direct_redundancy_v6"),
             udp_always_broadcast=raw.get("udp_always_broadcast", True),
             udp_copy_interval_ms=max(0, raw.get("udp_copy_interval_ms", 8)),
+            udp_failover_idle_ms=max(100, raw.get("udp_failover_idle_ms", 1200)),
             socks_host=raw.get("socks_host", "127.0.0.1"),
             socks_port=max(0, raw.get("socks_port", 0)),
         )

+ 74 - 0
docs/project-issues.md

@@ -0,0 +1,74 @@
+# mynetspeeder 项目问题清单
+
+> 说明:本文只做项目体检,不修改代码。按“高风险 / 中风险 / 低风险”排序,优先列出最可能影响稳定性、排障和后续维护的问题。
+
+## 高风险
+
+1. **配置缺少严格校验**
+   - 位置:`config.py:54-89`
+   - 问题:`Config.load()` 直接读取 JSON 并套默认值,缺少字段类型、取值范围、必填项的统一校验。
+   - 影响:配置写错后往往不是启动时立即失败,而是运行一段时间后才在网络路径、重连或统计逻辑里暴露,排查成本高。
+
+2. **并发状态机复杂,容易出现隐性竞态**
+   - 位置:`relay_client.py:49-235`、`transparent_edge.py:197-847`、`socks_edge.py:241-680`
+   - 问题:连接、会话、winner 选择、重连、关闭清理都依赖大量异步任务和共享状态。
+   - 影响:在高并发、短连接、抖动网络、对端异常关闭时,可能出现“会话已关但任务还在跑”“winner 已切换但旧路径未及时收敛”等边界行为。
+
+3. **异常处理偏宽,真实错误容易被吞掉**
+   - 位置:`relay_client.py`、`relay_server.py`、`transparent_edge.py`、`socks_edge.py`
+   - 问题:项目里大量使用 `except Exception`,部分分支只做 `pass` 或简单打印。
+   - 影响:服务能尽量不中断,但很多根因不会被结构化记录,后续定位问题主要依赖人工翻日志。
+
+4. **日志格式强依赖,汇总结果脆弱**
+   - 位置:`cli.py:18-295`
+   - 问题:`summary` 通过正则解析运行日志,完全依赖日志文本格式稳定。
+   - 影响:只要日志文案略有调整,统计结果就可能漏算或错算,适合临时分析,不适合作为长期可靠报表源。
+
+## 中风险
+
+1. **缺少自动化测试覆盖**
+   - 位置:仓库当前未见测试文件
+   - 问题:网络代理、中继、UDP 竞速、透明接管这类逻辑对回归特别敏感,但目前没有明显的单测或集成测试保护。
+   - 影响:改动一处逻辑,可能在另一路径上引入回归,尤其是重连、关闭、UDP 分包和 winner 收敛场景。
+
+2. **日志输出偏“人读友好”,不够机器化**
+   - 位置:`relay_client.py`、`relay_server.py`、`transparent_edge.py`、`socks_edge.py`
+   - 问题:当前更多是 `print(...)` 风格输出,缺少统一级别、字段、结构化上下文。
+   - 影响:排障时可以看,但想做持续监控、指标采集、告警联动会比较费劲。
+
+3. **UDP 路径语义较复杂,文档和实现容易产生认知偏差**
+   - 位置:`README.md`、`scripts/start-transparent.sh`、`socks_edge.py`、`transparent_edge.py`
+   - 问题:当前有“UDP 透明接管”和“显式 SOCKS5 UDP ASSOCIATE”两套入口,且 `socks_port > 0` 时会优先走 SOCKS 入口。
+   - 影响:如果用户只看配置字面意思,容易误以为 UDP 透明和 SOCKS UDP 会同时生效,实际行为需要结合启动脚本理解。
+
+4. **重连与健康检查阈值偏经验化**
+   - 位置:`config.py:31-38`、`relay_client.py:60-75`、`scheduler.py:24-58`
+   - 问题:重连间隔、ping 超时、open timeout、probe timeout 都是经验值。
+   - 影响:在不同网络质量下,过于激进会误判,过于保守会拖慢故障恢复。
+
+5. **统计与调度逻辑耦合在一起,后续可维护性一般**
+   - 位置:`scheduler.py`、`cli.py`
+   - 问题:调度器和状态统计、探测输出、汇总逻辑之间复用不多,更多是“当前能用”。
+   - 影响:未来如果要增加新策略、分层统计或更细粒度诊断,可能需要继续补很多旁路逻辑。
+
+## 低风险
+
+1. **部分默认值偏保守,未必适合所有场景**
+   - 位置:`config.py:24-53`
+   - 问题:例如 `redundancy`、`direct_redundancy`、`udp_copy_interval_ms`、`tcp_loser_grace_ms` 都是通用默认值。
+   - 影响:不是 bug,但不同线路质量下可能需要手工调参才能达到最佳效果。
+
+2. **`README.md` 信息量较大,入门成本略高**
+   - 位置:`README.md`
+   - 问题:文档已经很完整,但新用户第一次看时会觉得模式、优先级、脚本行为比较多。
+   - 影响:学习成本稍高,不过不影响核心功能。
+
+3. **脚本命名和入口较多,初次使用容易混淆**
+   - 位置:`scripts/start.sh`、`scripts/start-transparent.sh`、`scripts/start_udp.sh`、`scripts/start-relay.sh`
+   - 问题:多个脚本分别负责不同模式,功能边界清楚,但命名上对新用户不够直观。
+   - 影响:属于使用体验问题,不影响稳定性。
+
+## 总结
+
+- 这套代码已经是“可运行、可交付”的状态,核心风险不在功能缺失,而在**复杂并发场景下的可观测性和回归保护**。
+- 如果后续要继续提升,优先顺序建议是:**配置校验 → 日志结构化 → 最小测试覆盖 → UDP/TCP 边界场景验证**。

+ 693 - 0
scripts/benchmark_local.py

@@ -0,0 +1,693 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import asyncio
+import contextlib
+import random
+import ssl
+import socket
+import statistics
+import sys
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from urllib.parse import urlparse
+
+ROOT = Path(__file__).resolve().parents[2]
+if str(ROOT) not in sys.path:
+    sys.path.insert(0, str(ROOT))
+
+from mynetspeeder.config import Config  # noqa: E402
+
+
+def parse_host_port(text: str) -> tuple[str, int]:
+    if text.startswith("["):
+        host, rest = text[1:].split("]", 1)
+        if not rest.startswith(":"):
+            raise ValueError(f"无效地址: {text}")
+        return host, int(rest[1:])
+    if text.count(":") == 1:
+        host, port = text.rsplit(":", 1)
+        return host, int(port)
+    raise ValueError(f"无效地址: {text}")
+
+
+def parse_url(text: str) -> tuple[str, int, str, bool]:
+    parsed = urlparse(text)
+    if parsed.scheme not in ("http", "https"):
+        raise ValueError(f"仅支持 http/https: {text}")
+    if not parsed.hostname:
+        raise ValueError(f"无效 URL: {text}")
+    port = parsed.port or (443 if parsed.scheme == "https" else 80)
+    path = parsed.path or "/"
+    if parsed.query:
+        path = f"{path}?{parsed.query}"
+    return parsed.hostname, port, path, parsed.scheme == "https"
+
+
+def parse_dns_target(text: str) -> tuple[str, int]:
+    host, port = parse_host_port(text)
+    return host, port
+
+
+def socks5_addr_bytes(host: str, port: int) -> bytes:
+    try:
+        return b"\x01" + socket.inet_aton(host) + port.to_bytes(2, "big")
+    except OSError:
+        try:
+            return b"\x04" + socket.inet_pton(socket.AF_INET6, host) + port.to_bytes(2, "big")
+        except OSError:
+            raw = host.encode()
+            return b"\x03" + bytes([len(raw)]) + raw + port.to_bytes(2, "big")
+
+
+async def socks5_greet(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
+    writer.write(b"\x05\x01\x00")
+    await writer.drain()
+    reply = await reader.readexactly(2)
+    if reply != b"\x05\x00":
+        raise ConnectionError("SOCKS5 握手失败")
+
+
+async def socks5_connect(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, host: str, port: int) -> None:
+    await socks5_greet(reader, writer)
+    writer.write(b"\x05\x01\x00" + socks5_addr_bytes(host, port))
+    await writer.drain()
+    head = await reader.readexactly(4)
+    if head[1] != 0x00:
+        raise ConnectionError(f"SOCKS5 CONNECT 失败: {head[1]}")
+    atyp = head[3]
+    if atyp == 1:
+        await reader.readexactly(4 + 2)
+    elif atyp == 3:
+        length = (await reader.readexactly(1))[0]
+        await reader.readexactly(length + 2)
+    elif atyp == 4:
+        await reader.readexactly(16 + 2)
+
+
+async def open_http_connection(host: str, port: int, timeout: float, proxy: tuple[str, int] | None, use_tls: bool) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
+    if proxy is None:
+        if use_tls:
+            return await asyncio.wait_for(asyncio.open_connection(host, port, ssl=ssl.create_default_context(), server_hostname=host), timeout=timeout)
+        return await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
+
+    proxy_reader, proxy_writer = await asyncio.wait_for(asyncio.open_connection(proxy[0], proxy[1]), timeout=timeout)
+    await socks5_connect(proxy_reader, proxy_writer, host, port)
+    if not use_tls:
+        return proxy_reader, proxy_writer
+
+    raw_sock = proxy_writer.get_extra_info("socket")
+    if raw_sock is None:
+        raise ConnectionError("SOCKS5 socket unavailable")
+    dup = raw_sock.dup()
+    dup.setblocking(False)
+    proxy_writer.close()
+    with contextlib.suppress(Exception):
+        await proxy_writer.wait_closed()
+    return await asyncio.wait_for(
+        asyncio.open_connection(ssl=ssl.create_default_context(), sock=dup, server_hostname=host),
+        timeout=timeout,
+    )
+
+
+def percentile(values: list[float], pct: float) -> float:
+    if not values:
+        return 0.0
+    ordered = sorted(values)
+    if len(ordered) == 1:
+        return ordered[0]
+    idx = (len(ordered) - 1) * pct
+    lo = int(idx)
+    hi = min(lo + 1, len(ordered) - 1)
+    weight = idx - lo
+    return ordered[lo] * (1 - weight) + ordered[hi] * weight
+
+
+def summarize(values: list[float]) -> dict[str, float]:
+    if not values:
+        return {"avg": 0.0, "p50": 0.0, "p95": 0.0, "min": 0.0, "max": 0.0}
+    return {
+        "avg": statistics.mean(values),
+        "p50": percentile(values, 0.50),
+        "p95": percentile(values, 0.95),
+        "min": min(values),
+        "max": max(values),
+    }
+
+
+def fmt_ms(value: float) -> str:
+    return f"{value:.2f}ms"
+
+
+def fmt_pct(value: float) -> str:
+    return f"{value:+.1f}%"
+
+
+def classify_change(diff_pct: float, improvement_threshold: float = 5.0, regression_threshold: float = -5.0) -> str:
+    if diff_pct <= regression_threshold:
+        return "提升"
+    if diff_pct >= improvement_threshold:
+        return "退化"
+    return "无明显变化"
+
+
+def progress_line(done: int, total: int, started_at: float) -> str:
+    if total <= 0:
+        return "进度:0%"
+    elapsed = time.perf_counter() - started_at
+    ratio = done / total
+    percent = int(ratio * 100)
+    if done <= 0:
+        remain = "估算剩余:--"
+    else:
+        total_est = elapsed / ratio
+        remain = f"估算剩余:{max(0.0, total_est - elapsed):.1f}s"
+    return f"进度:{done}/{total} ({percent}%),已用 {elapsed:.1f}s,{remain}"
+
+
+async def run_step_with_heartbeat(step_label: str, total: int, done: int, coro, started_at: float, timeout: float) -> object | None:
+    task = asyncio.create_task(coro)
+    print(f"  开始 {step_label}")
+    step_started = time.perf_counter()
+    while not task.done():
+        now = time.perf_counter()
+        if now - step_started >= timeout:
+            task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await task
+            print(f"  超时 {step_label} {progress_line(done, total, started_at)}")
+            return None
+        print(f"  {step_label} {progress_line(done, total, started_at)}")
+        await asyncio.sleep(min(1.0, max(0.1, timeout - (now - step_started))))
+    result = await task
+    print(f"  完成 {step_label} {progress_line(done, total, started_at)}")
+    return result
+
+
+async def run_parallel_steps(step_label: str, total: int, done: int, coros: list[tuple[str, object]], started_at: float, timeout: float) -> list[object | None]:
+    tasks = [asyncio.create_task(coro) for _, coro in coros]
+    labels = [label for label, _ in coros]
+    print(f"  开始 {step_label}")
+    step_started = time.perf_counter()
+    while any(not task.done() for task in tasks):
+        now = time.perf_counter()
+        if now - step_started >= timeout:
+            for task in tasks:
+                if not task.done():
+                    task.cancel()
+            for task in tasks:
+                with contextlib.suppress(asyncio.CancelledError):
+                    await task
+            print(f"  超时 {step_label} {progress_line(done, total, started_at)}")
+            return [None for _ in tasks]
+        print(f"  {step_label} {progress_line(done, total, started_at)}")
+        await asyncio.sleep(min(1.0, max(0.1, timeout - (now - step_started))))
+    results: list[object | None] = []
+    for label, task in zip(labels, tasks):
+        result = await task
+        results.append(result)
+    print(f"  完成 {step_label} {progress_line(done, total, started_at)}")
+    return results
+
+
+def print_section_summary(name: str, direct_avg: float, proxy_avg: float | None, unit: str = "ms") -> None:
+    if proxy_avg is None:
+        print(f"  {name}: 仅测到直连,均值 {direct_avg:.2f}{unit}")
+        return
+    diff = proxy_avg - direct_avg
+    pct = (diff / direct_avg * 100.0) if direct_avg > 0 else 0.0
+    verdict = classify_change(pct)
+    print(f"  {name}: 直连 {direct_avg:.2f}{unit},代理 {proxy_avg:.2f}{unit},差值 {diff:+.2f}{unit},变化 {pct:+.1f}%,结论:{verdict}")
+
+
+def overall_verdict(changes: list[float]) -> str:
+    if not changes:
+        return "无可用结果"
+    score = 0
+    for change in changes:
+        if change <= -5.0:
+            score += 1
+        elif change >= 5.0:
+            score -= 1
+    if score > 0:
+        return "整体提升"
+    if score < 0:
+        return "整体退化"
+    return "整体无明显变化"
+
+
+@dataclass
+class TcpResult:
+    connect_ms: float
+    ttfb_ms: float
+    total_ms: float
+
+
+@dataclass
+class UdpResult:
+    associate_ms: float
+    first_ms: float
+    total_ms: float
+    duplicates: int
+    ok: bool
+
+
+@dataclass
+class HttpResult:
+    connect_ms: float
+    ttfb_ms: float
+    total_ms: float
+    status: int
+    ok: bool
+    error: str = ""
+
+
+@dataclass
+class DnsResult:
+    query_ms: float
+    total_ms: float
+    ok: bool
+    error: str = ""
+
+
+async def tcp_echo_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
+    try:
+        while True:
+            chunk = await reader.read(65536)
+            if not chunk:
+                break
+            writer.write(chunk)
+            await writer.drain()
+    except Exception:
+        pass
+    finally:
+        writer.close()
+        with contextlib.suppress(Exception):
+            await writer.wait_closed()
+
+
+def build_dns_query(name: str, query_id: int | None = None) -> bytes:
+    if query_id is None:
+        query_id = random.randint(0, 0xFFFF)
+    header = query_id.to_bytes(2, "big")
+    header += b"\x01\x00"  # standard query, recursion desired
+    header += b"\x00\x01"  # qdcount
+    header += b"\x00\x00"  # ancount
+    header += b"\x00\x00"  # nscount
+    header += b"\x00\x00"  # arcount
+    labels = b"".join(bytes([len(part)]) + part.encode() for part in name.strip(".").split(".") if part)
+    return header + labels + b"\x00" + b"\x00\x01" + b"\x00\x01"
+
+
+def parse_dns_response(data: bytes) -> tuple[int, bool]:
+    if len(data) < 12:
+        raise ValueError("DNS 响应过短")
+    flags = int.from_bytes(data[2:4], "big")
+    rcode = flags & 0x000F
+    qdcount = int.from_bytes(data[4:6], "big")
+    ancount = int.from_bytes(data[6:8], "big")
+    return rcode, ancount > 0 or qdcount > 0
+async def http_roundtrip(url: str, proxy: tuple[str, int] | None, timeout: float) -> HttpResult:
+    host, port, path, is_https = parse_url(url)
+    started = time.perf_counter()
+    try:
+        reader, writer = await open_http_connection(host, port, timeout, proxy, is_https)
+        connect_ms = (time.perf_counter() - started) * 1000
+        request = (
+            f"GET {path} HTTP/1.1\r\n"
+            f"Host: {host}\r\n"
+            "User-Agent: mynetspeeder-benchmark\r\n"
+            "Accept: */*\r\n"
+            "Connection: close\r\n\r\n"
+        ).encode()
+        started = time.perf_counter()
+        writer.write(request)
+        await writer.drain()
+        head = await asyncio.wait_for(reader.readuntil(b"\r\n\r\n"), timeout=timeout)
+        first_ms = (time.perf_counter() - started) * 1000
+        status_line = head.split(b"\r\n", 1)[0].decode(errors="replace")
+        status = 0
+        if status_line.startswith("HTTP/"):
+            parts = status_line.split()
+            if len(parts) >= 2 and parts[1].isdigit():
+                status = int(parts[1])
+        body = await reader.read()
+        total_ms = (time.perf_counter() - started) * 1000
+        ok = 200 <= status < 400 and bool(body or head)
+        return HttpResult(connect_ms, first_ms, total_ms, status, ok)
+    except Exception:
+        return HttpResult(0.0, 0.0, 0.0, 0, False, error="http 请求失败")
+    finally:
+        with contextlib.suppress(Exception):
+            writer.close()  # type: ignore[name-defined]
+            await writer.wait_closed()  # type: ignore[name-defined]
+
+
+async def dns_roundtrip(server_host: str, server_port: int, proxy: tuple[str, int] | None, timeout: float) -> DnsResult:
+    query = build_dns_query("www.google.com")
+    loop = asyncio.get_running_loop()
+    started = time.perf_counter()
+    query_started = started
+    try:
+        if proxy is None:
+            sock = socket.socket(socket.AF_INET6 if ":" in server_host else socket.AF_INET, socket.SOCK_DGRAM)
+            sock.setblocking(False)
+            await loop.sock_connect(sock, (server_host, server_port))
+            try:
+                await loop.sock_sendall(sock, query)
+                data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
+            finally:
+                sock.close()
+        else:
+            reader, writer, relay, associate_ms = await asyncio.wait_for(socks5_udp_associate(proxy[0], proxy[1]), timeout=timeout)
+            sock = socket.socket(socket.AF_INET6 if ":" in relay[0] else socket.AF_INET, socket.SOCK_DGRAM)
+            sock.setblocking(False)
+            await loop.sock_connect(sock, relay)
+            try:
+                packet = b"\x00\x00\x00" + encode_socks_addr(server_host, server_port) + query
+                await loop.sock_sendall(sock, packet)
+                data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
+                data = strip_socks_udp(data)
+            finally:
+                writer.close()
+                with contextlib.suppress(Exception):
+                    await writer.wait_closed()
+                sock.close()
+            query_started = query_started + associate_ms / 1000.0
+        query_ms = (time.perf_counter() - query_started) * 1000
+        rcode, ok = parse_dns_response(data)
+        return DnsResult(query_ms=query_ms, total_ms=(time.perf_counter() - started) * 1000, ok=ok and rcode == 0)
+    except Exception:
+        return DnsResult(0.0, 0.0, False, error="dns 查询失败")
+
+
+async def bench_http(args, proxy: tuple[str, int] | None) -> None:
+    direct_results: list[HttpResult] = []
+    proxy_results: list[HttpResult] = []
+    total_steps = args.count * (2 if proxy is not None else 1)
+    started_at = time.perf_counter()
+    print("TCP/HTTPS 正式测试开始")
+    for index in range(args.count):
+        step_base = index * (2 if proxy is not None else 1)
+        if proxy is None:
+            direct_result = await run_step_with_heartbeat(
+                f"HTTPS 直连 第{index + 1}步",
+                total_steps,
+                step_base + 1,
+                http_roundtrip(args.http_url, None, args.timeout),
+                started_at,
+                timeout=max(1.0, args.timeout + 1.0),
+            )
+            if isinstance(direct_result, HttpResult):
+                direct_results.append(direct_result)
+            continue
+        results = await run_parallel_steps(
+            f"HTTPS 第{index + 1}轮",
+            total_steps,
+            step_base + 1,
+            [
+                (f"HTTPS 直连 第{index + 1}步", http_roundtrip(args.http_url, None, args.timeout)),
+                (f"HTTPS 代理 第{index + 1}步", http_roundtrip(args.http_url, proxy, args.timeout)),
+            ],
+            started_at,
+            timeout=max(1.0, args.timeout + 1.0),
+        )
+        if isinstance(results[0], HttpResult):
+            direct_results.append(results[0])
+        if isinstance(results[1], HttpResult):
+            proxy_results.append(results[1])
+
+    print(f"HTTP 目标: {args.http_url}")
+    direct_ok = [item for item in direct_results if item.ok]
+    print(f"  直连: 成功 {len(direct_ok)}/{len(direct_results)},首包均值 {fmt_ms(summarize([item.ttfb_ms for item in direct_ok])['avg'])},总耗时均值 {fmt_ms(summarize([item.total_ms for item in direct_ok])['avg'])}")
+    if direct_results and not direct_ok:
+        print(f"  直连失败原因: {direct_results[0].error or '未知'}")
+    if proxy is None or not proxy_results:
+        print("  代理: 未执行或不可用")
+        return
+    proxy_ok = [item for item in proxy_results if item.ok]
+    direct_ttfb = summarize([item.ttfb_ms for item in direct_ok])["avg"]
+    proxy_ttfb = summarize([item.ttfb_ms for item in proxy_ok])["avg"]
+    direct_total = summarize([item.total_ms for item in direct_ok])["avg"]
+    proxy_total = summarize([item.total_ms for item in proxy_ok])["avg"]
+    ttfb_pct = ((proxy_ttfb - direct_ttfb) / direct_ttfb * 100.0) if direct_ttfb > 0 else 0.0
+    total_pct = ((proxy_total - direct_total) / direct_total * 100.0) if direct_total > 0 else 0.0
+    print(f"  代理: 成功 {len(proxy_ok)}/{len(proxy_results)},首包均值 {fmt_ms(proxy_ttfb)},总耗时均值 {fmt_ms(proxy_total)}")
+    print(f"  对比: 首包变化 {fmt_pct(ttfb_pct)},总耗时变化 {fmt_pct(total_pct)},结论:{classify_change(total_pct)}")
+
+
+async def bench_dns(args, proxy: tuple[str, int] | None) -> None:
+    direct_results: list[DnsResult] = []
+    proxy_results: list[DnsResult] = []
+    total_steps = args.count * (2 if proxy is not None else 1)
+    started_at = time.perf_counter()
+    print("UDP/DNS 正式测试开始")
+    for index in range(args.count):
+        step_base = index * (2 if proxy is not None else 1)
+        if proxy is None:
+            direct_result = await run_step_with_heartbeat(
+                f"DNS 直连 第{index + 1}步",
+                total_steps,
+                step_base + 1,
+                dns_roundtrip(args.dns_server_host, args.dns_server_port, None, args.timeout),
+                started_at,
+                timeout=max(1.0, args.timeout + 1.0),
+            )
+            if isinstance(direct_result, DnsResult):
+                direct_results.append(direct_result)
+            continue
+        results = await run_parallel_steps(
+            f"DNS 第{index + 1}轮",
+            total_steps,
+            step_base + 1,
+            [
+                (f"DNS 直连 第{index + 1}步", dns_roundtrip(args.dns_server_host, args.dns_server_port, None, args.timeout)),
+                (f"DNS 代理 第{index + 1}步", dns_roundtrip(args.dns_server_host, args.dns_server_port, proxy, args.timeout)),
+            ],
+            started_at,
+            timeout=max(1.0, args.timeout + 1.0),
+        )
+        if isinstance(results[0], DnsResult):
+            direct_results.append(results[0])
+        if isinstance(results[1], DnsResult):
+            proxy_results.append(results[1])
+
+    direct_ok = [item for item in direct_results if item.ok]
+    print(f"DNS 目标: {args.dns_server_host}:{args.dns_server_port}")
+    print(f"  直连: 成功 {len(direct_ok)}/{len(direct_results)},查询均值 {fmt_ms(summarize([item.query_ms for item in direct_ok])['avg'])}")
+    if direct_results and not direct_ok:
+        print(f"  直连失败原因: {direct_results[0].error or '未知'}")
+    if proxy is None or not proxy_results:
+        print("  代理: 未执行或不可用")
+        return
+    proxy_ok = [item for item in proxy_results if item.ok]
+    direct_query = summarize([item.query_ms for item in direct_ok])["avg"]
+    proxy_query = summarize([item.query_ms for item in proxy_ok])["avg"]
+    delta = proxy_query - direct_query
+    pct = (delta / direct_query * 100.0) if direct_query > 0 else 0.0
+    print(f"  代理: 成功 {len(proxy_ok)}/{len(proxy_results)},查询均值 {fmt_ms(proxy_query)}")
+    print(f"  对比: DNS 查询变化 {fmt_pct(pct)},结论:{classify_change(pct)}")
+
+
+async def bench_tcp(args, target: tuple[str, int], proxy: tuple[str, int] | None) -> None:
+    direct_results: list[TcpResult] = []
+    proxy_results: list[TcpResult] = []
+    warmup_steps = args.warmup * (2 if proxy is not None else 1)
+
+    async def warmup_runner(step: int) -> None:
+        if proxy is None:
+            with contextlib.suppress(Exception):
+                await tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
+            return
+        if step % 2 == 1:
+            with contextlib.suppress(Exception):
+                await tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
+        else:
+            with contextlib.suppress(Exception):
+                await tcp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)
+
+    if warmup_steps > 0:
+        print("TCP 预热开始")
+        for step in range(1, warmup_steps + 1):
+            await warmup_runner(step)
+            print(f"  预热进度:{step}/{warmup_steps}")
+    total_steps = args.count * (2 if proxy is not None else 1)
+    started_at = time.perf_counter()
+    print("TCP 正式测试开始")
+    for index in range(args.count):
+        step_base = index * (2 if proxy is not None else 1)
+        if proxy is None:
+            direct_result = await run_step_with_heartbeat(
+                f"TCP 直连 第{index + 1}步",
+                total_steps,
+                step_base + 1,
+                tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout),
+                started_at,
+                timeout=max(1.0, args.timeout + 1.0),
+            )
+            if isinstance(direct_result, TcpResult):
+                direct_results.append(direct_result)
+            continue
+        results = await run_parallel_steps(
+            f"TCP 第{index + 1}轮",
+            total_steps,
+            step_base + 1,
+            [
+                (f"TCP 直连 第{index + 1}步", tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)),
+                (f"TCP 代理 第{index + 1}步", tcp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)),
+            ],
+            started_at,
+            timeout=max(1.0, args.timeout + 1.0),
+        )
+        if isinstance(results[0], TcpResult):
+            direct_results.append(results[0])
+        if isinstance(results[1], TcpResult):
+            proxy_results.append(results[1])
+    direct = {
+        "connect": summarize([item.connect_ms for item in direct_results]),
+        "ttfb": summarize([item.ttfb_ms for item in direct_results]),
+        "total": summarize([item.total_ms for item in direct_results]),
+    }
+    print(f"TCP 目标: {target[0]}:{target[1]}")
+    print(f"  直连: 成功 {len(direct_results)}/{len(direct_results)},连接均值 {fmt_ms(direct['connect']['avg'])},首包均值 {fmt_ms(direct['ttfb']['avg'])},总耗时均值 {fmt_ms(direct['total']['avg'])}")
+    print(f"  直连: 连接 p50/p95 {fmt_ms(direct['connect']['p50'])} / {fmt_ms(direct['connect']['p95'])},总耗时 p50/p95 {fmt_ms(direct['total']['p50'])} / {fmt_ms(direct['total']['p95'])}")
+    if proxy is None or not proxy_results:
+        print("  代理: 未执行或不可用")
+        return
+    proxy_stats = {
+        "connect": summarize([item.connect_ms for item in proxy_results]),
+        "ttfb": summarize([item.ttfb_ms for item in proxy_results]),
+        "total": summarize([item.total_ms for item in proxy_results]),
+    }
+    diff = proxy_stats["total"]["avg"] - direct["total"]["avg"]
+    pct = (diff / direct["total"]["avg"] * 100.0) if direct["total"]["avg"] > 0 else 0.0
+    print(f"  代理: 成功 {len(proxy_results)}/{len(proxy_results)},连接均值 {fmt_ms(proxy_stats['connect']['avg'])},首包均值 {fmt_ms(proxy_stats['ttfb']['avg'])},总耗时均值 {fmt_ms(proxy_stats['total']['avg'])}")
+    print(f"  代理: 连接 p50/p95 {fmt_ms(proxy_stats['connect']['p50'])} / {fmt_ms(proxy_stats['connect']['p95'])},总耗时 p50/p95 {fmt_ms(proxy_stats['total']['p50'])} / {fmt_ms(proxy_stats['total']['p95'])}")
+    print(f"  对比: 代理总耗时相对直连 {fmt_ms(diff)},变化 {fmt_pct(pct)},结论:{classify_change(pct)}")
+    print_section_summary("连接耗时", direct["connect"]["avg"], proxy_stats["connect"]["avg"])
+    print_section_summary("首包耗时", direct["ttfb"]["avg"], proxy_stats["ttfb"]["avg"])
+    print_section_summary("总耗时", direct["total"]["avg"], proxy_stats["total"]["avg"])
+    print(f"  TCP 总结:{overall_verdict([pct])}")
+
+
+async def bench_udp(args, target: tuple[str, int], proxy: tuple[str, int] | None) -> None:
+    direct_results: list[UdpResult] = []
+    proxy_results: list[UdpResult] = []
+    warmup_steps = args.warmup * (2 if proxy is not None else 1)
+
+    async def warmup_runner(step: int) -> None:
+        if proxy is None:
+            with contextlib.suppress(Exception):
+                await udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
+            return
+        if step % 2 == 1:
+            with contextlib.suppress(Exception):
+                await udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
+        else:
+            with contextlib.suppress(Exception):
+                await udp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)
+
+    if warmup_steps > 0:
+        print("UDP 预热开始")
+        for step in range(1, warmup_steps + 1):
+            await warmup_runner(step)
+            print(f"  预热进度:{step}/{warmup_steps}")
+    total_steps = args.count * (2 if proxy is not None else 1)
+    started_at = time.perf_counter()
+    print("UDP 正式测试开始")
+    for index in range(args.count):
+        step_base = index * (2 if proxy is not None else 1)
+        if proxy is None:
+            direct_result = await run_step_with_heartbeat(
+                f"UDP 直连 第{index + 1}步",
+                total_steps,
+                step_base + 1,
+                udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout),
+                started_at,
+                timeout=max(1.0, args.timeout + 1.0),
+            )
+            if isinstance(direct_result, UdpResult):
+                direct_results.append(direct_result)
+            continue
+        results = await run_parallel_steps(
+            f"UDP 第{index + 1}轮",
+            total_steps,
+            step_base + 1,
+            [
+                (f"UDP 直连 第{index + 1}步", udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)),
+                (f"UDP 代理 第{index + 1}步", udp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)),
+            ],
+            started_at,
+            timeout=max(1.0, args.timeout + 1.0),
+        )
+        if isinstance(results[0], UdpResult):
+            direct_results.append(results[0])
+        if isinstance(results[1], UdpResult):
+            proxy_results.append(results[1])
+    direct_ok = [item for item in direct_results if item.ok]
+    print(f"UDP 目标: {target[0]}:{target[1]}")
+    print(f"  直连: 成功 {len(direct_ok)}/{len(direct_results)},首包均值 {fmt_ms(summarize([item.first_ms for item in direct_ok])['avg'])},重复包 {sum(item.duplicates for item in direct_ok)}")
+    if proxy is None or not proxy_results:
+        print("  代理: 未执行或不可用")
+        return
+    proxy_ok = [item for item in proxy_results if item.ok]
+    proxy_first = summarize([item.first_ms for item in proxy_ok])["avg"]
+    direct_first = summarize([item.first_ms for item in direct_ok])["avg"]
+    delta = proxy_first - direct_first
+    pct = (delta / direct_first * 100.0) if direct_first > 0 else 0.0
+    print(f"  代理: 成功 {len(proxy_ok)}/{len(proxy_results)},关联均值 {fmt_ms(summarize([item.associate_ms for item in proxy_ok])['avg'])},首包均值 {fmt_ms(proxy_first)},重复包 {sum(item.duplicates for item in proxy_ok)}")
+    print(f"  对比: 代理首包相对直连 {fmt_ms(delta)},变化 {fmt_pct(pct)},结论:{classify_change(pct)}")
+    print(f"  UDP 总结:{overall_verdict([pct])}")
+
+
+async def amain(args) -> int:
+    config = Config.load(args.config) if args.config and Path(args.config).exists() else None
+    proxy: tuple[str, int] | None = None
+    if config is not None and config.socks_port > 0:
+        proxy = (config.socks_host, config.socks_port)
+    if args.proxy_host and args.proxy_port:
+        proxy = (args.proxy_host, args.proxy_port)
+
+    print("本地基准测试开始")
+    print(f"  样本数: {args.count},热身: {args.warmup},载荷: {args.payload_size} 字节")
+    if proxy is not None:
+        print(f"  代理: {proxy[0]}:{proxy[1]}")
+    else:
+        print("  代理: 未配置或未启动,将只测直连")
+    print(f"  HTTP 目标: {args.http_url}")
+    print(f"  DNS 目标: {args.dns_server_host}:{args.dns_server_port}")
+    print("")
+
+    if args.mode in ("tcp", "all"):
+        await bench_http(args, proxy)
+        print("")
+    if args.mode in ("udp", "all"):
+        await bench_dns(args, proxy)
+    return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(description="mynetspeeder 本地手工基线测试")
+    parser.add_argument("--config", default="/home/mynetspeeder/config.json", help="配置文件路径,用于读取默认 socks 端口")
+    parser.add_argument("--proxy-host", default="", help="SOCKS5 代理地址,默认读取 config.json")
+    parser.add_argument("--proxy-port", type=int, default=0, help="SOCKS5 代理端口,默认读取 config.json")
+    parser.add_argument("--http-url", default="https://research.google/blog/", help="HTTPS 测试 URL")
+    parser.add_argument("--dns-server", default="8.8.8.8:53", help="DNS 测试服务器,格式 host:port")
+    parser.add_argument("--mode", choices=("tcp", "udp", "all"), default="all", help="只测 TCP、只测 UDP 或都测")
+    parser.add_argument("--count", type=int, default=8, help="正式样本数,默认 8,尽量控制在 2-5 分钟内完成")
+    parser.add_argument("--warmup", type=int, default=0, help="热身轮数,默认 0;仅用于平滑首次连接抖动")
+    parser.add_argument("--payload-size", type=int, default=4096, help="每次请求的载荷大小")
+    parser.add_argument("--timeout", type=float, default=2.0, help="单次请求超时秒数,默认 2.0")
+    return parser
+
+
+def main() -> int:
+    args = build_parser().parse_args()
+    args.dns_server_host, args.dns_server_port = parse_dns_target(args.dns_server)
+    return asyncio.run(amain(args))
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())

+ 4 - 0
scripts/rotate-log.py

@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 from __future__ import annotations
 
+from datetime import datetime
 import os
 import sys
 from pathlib import Path
@@ -37,6 +38,9 @@ def main() -> int:
         chunk = stream.readline()
         if not chunk:
             break
+        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        text = chunk.decode(errors="replace").rstrip("\n")
+        chunk = f"[{timestamp}] {text}\n".encode()
         with log_path.open("ab") as handle:
             handle.write(chunk)
         try:

+ 39 - 6
scripts/start-transparent.sh

@@ -20,6 +20,7 @@ KERNEL_MODE="${MYNETSPEEDER_KERNEL_MODE:-auto}"
 CONFIG_PATH="/home/mynetspeeder/config.json"
 CAPTURE_UID="${MYNETSPEEDER_CAPTURE_UID:-}"
 UDP_CAPTURE_EFFECTIVE=0
+STARTUP_WARNINGS=()
 
 while [[ $# -gt 0 ]]; do
   case "$1" in
@@ -111,6 +112,29 @@ ensure_rule() {
   fi
 }
 
+wait_for_listen() {
+  local host="$1"
+  local port="$2"
+  local log_file="$3"
+  local log_pattern="$4"
+  local label="$5"
+  local timeout_sec="${6:-10}"
+  local deadline=$((SECONDS + timeout_sec))
+  local pattern
+  pattern="${host//./\\.}:${port}( |$)"
+  while (( SECONDS < deadline )); do
+    if ss -H -lntp 2>/dev/null | grep -qE "$pattern"; then
+      return 0
+    fi
+    if [[ -f "$log_file" ]] && grep -qE "$log_pattern" "$log_file" 2>/dev/null; then
+      return 0
+    fi
+    sleep 0.2
+  done
+  echo "$label failed to listen"
+  return 1
+}
+
 add_exclusions_v4() {
   iptables -t nat -A "$CHAIN4" -m addrtype --dst-type LOCAL -j RETURN
   for cidr in $SELF_EXCLUDE_V4; do
@@ -128,7 +152,7 @@ cfg = json.load(open(sys.argv[1]))
 for relay in cfg.get('relays', []):
     print(relay['host'])
 PY
-)
+) || true
 }
 
 add_exclusions_v6() {
@@ -148,7 +172,7 @@ cfg = json.load(open(sys.argv[1]))
 for relay in cfg.get('relays', []):
     print(relay['host'])
 PY
-)
+) || true
 }
 
 add_udp_exclusions_v4() {
@@ -179,8 +203,10 @@ if [[ "$SOCKS_PORT" -gt 0 ]]; then
   runuser -u "$RUNTIME_USER" -- bash -lc "export PYTHONUNBUFFERED=1; export PYTHONPATH=/home; cd /home && exec nohup python3 -m mynetspeeder socks --listen-host ${SOCKS_HOST} --listen-port ${SOCKS_PORT} --config ${CONFIG_PATH} 2>&1 | python3 /home/mynetspeeder/scripts/rotate-log.py ${SOCKS_LOG_FILE} ${LOG_MAX_BYTES} ${LOG_BACKUPS}" &
   SOCKS_PID=$!
   echo "$SOCKS_PID" > "$SOCKS_PID_FILE"
-  sleep 1
-  ss -lntp | grep -qE "${SOCKS_HOST//./\\.}:${SOCKS_PORT}( |$)" || { echo "socks failed to listen"; tail -n 50 "$SOCKS_LOG_FILE" || true; exit 1; }
+  if ! wait_for_listen "$SOCKS_HOST" "$SOCKS_PORT" "$SOCKS_LOG_FILE" "socks5 listening on" "socks" 15; then
+    STARTUP_WARNINGS+=("socks listen not confirmed")
+    tail -n 50 "$SOCKS_LOG_FILE" || true
+  fi
 fi
 
 if [[ "$ENABLE_UDP" == "1" && "$SOCKS_PORT" -gt 0 ]]; then
@@ -196,8 +222,10 @@ fi
 runuser -u "$RUNTIME_USER" -- bash -lc "export PYTHONUNBUFFERED=1; export PYTHONPATH=/home; cd /home && exec nohup python3 -m mynetspeeder edge --listen-host ${LISTEN_HOST} --listen-port ${LISTEN_PORT} --kernel ${KERNEL_MODE} --config ${CONFIG_PATH} ${EDGE_UDP_FLAG} 2>&1 | python3 /home/mynetspeeder/scripts/rotate-log.py ${LOG_FILE} ${LOG_MAX_BYTES} ${LOG_BACKUPS}" &
 EDGE_PID=$!
 echo "$EDGE_PID" > "$PID_FILE"
-sleep 1
-ss -ln | grep -qE "[:.]${LISTEN_PORT}( |$)" || { echo "edge failed to listen"; tail -n 50 "$LOG_FILE" || true; exit 1; }
+if ! wait_for_listen "$LISTEN_HOST" "$LISTEN_PORT" "$LOG_FILE" "transparent tcp listening on" "edge" 15; then
+  STARTUP_WARNINGS+=("edge listen not confirmed")
+  tail -n 50 "$LOG_FILE" || true
+fi
 
 iptables -t nat -N "$CHAIN4" 2>/dev/null || true
 iptables -t nat -F "$CHAIN4"
@@ -288,6 +316,9 @@ echo "log file: $LOG_FILE"
 echo "log max: ${LOG_MAX_MB}MB x ${LOG_BACKUPS}"
 echo "ipv4 chain rules: $RULES_V4"
 echo "ipv6 chain rules: $RULES_V6"
+if [[ ${#STARTUP_WARNINGS[@]} -gt 0 ]]; then
+  echo "startup warnings: ${STARTUP_WARNINGS[*]}"
+fi
 
 echo "self-check: ok"
 
@@ -295,3 +326,5 @@ if [[ "$VERBOSE" == "1" ]]; then
   echo "verbose mode: press Ctrl+C to stop viewing logs, service keeps running"
   exec tail -n 80 -f "$LOG_FILE"
 fi
+
+exit 0

+ 33 - 16
socks_edge.py

@@ -101,6 +101,7 @@ class UdpFlowState:
     direct_failures: set[str] = field(default_factory=set)
     relay_failures: dict[str, int] = field(default_factory=dict)
     relay_error_seen: set[str] = field(default_factory=set)
+    path_last_seen: dict[str, float] = field(default_factory=dict)
 
     def touch(self, now: float) -> None:
         self.last_activity = now
@@ -260,18 +261,14 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         self._log_udp_summary()
 
     def _reset_client_state(self, addr) -> None:
+        old_addr = self.client_addr
+        remapped_flows: dict[tuple[tuple[str, int], str, int], UdpFlowState] = {}
         for flow in list(self.client_flows.values()):
-            for task in list(flow.direct_tasks.values()):
-                task.cancel()
-            for sock in list(flow.direct_sockets.values()):
-                with contextlib.suppress(Exception):
-                    sock.close()
-            for stream_id in list(flow.link_streams.values()):
-                self.edge.udp_flow_sessions.pop((flow.flow_id, stream_id), None)
-        self.client_flows.clear()
+            flow.client_addr = (addr[0], addr[1])
+            remapped_flows[((addr[0], addr[1]), flow.target_host, flow.target_port)] = flow
+        self.client_flows = remapped_flows
         self.client_addr = addr
-        self.win_counts.clear()
-        print(f"[edge] udp client bound addr={addr[0]}:{addr[1]}")
+        print(f"[edge] udp client rebound migrated old={old_addr[0]}:{old_addr[1]} new={addr[0]}:{addr[1]} flows={len(self.client_flows)}")
 
     async def handle_from_relay(self, frame: Frame, link: RelayLink) -> None:
         if self.transport is None or self.client_addr is None:
@@ -292,6 +289,7 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         packet = self._build_socks_udp(flow.target_host, flow.target_port, payload)
         now = asyncio.get_running_loop().time()
         flow.touch(now)
+        flow.path_last_seen[source_name] = now
         flow.packets_received += 1
         if flow.winner_name is None:
             flow.winner_name = source_name
@@ -299,6 +297,11 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
             self._log_udp_summary(force=True)
         elif flow.winner_name != source_name:
             flow.duplicate_responses += 1
+            winner_last_seen = flow.path_last_seen.get(flow.winner_name, 0.0)
+            if winner_last_seen and now - winner_last_seen >= (self.edge.config.udp_failover_idle_ms / 1000):
+                flow.winner_name = source_name
+                self.win_counts[source_name] = self.win_counts.get(source_name, 0) + 1
+                self._log_udp_summary(force=True)
         if flow.winner_name == source_name:
             self.transport.sendto(packet, self.client_addr)
 
@@ -323,7 +326,14 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         duplicates = sum(flow.duplicate_responses for flow in self.client_flows.values())
         direct_paths = sum(len(flow.direct_sockets) for flow in self.client_flows.values())
         relay_candidates = sum(len(flow.link_streams) for flow in self.client_flows.values())
-        winner_detail = ", ".join(f"{flow.flow_id}:{flow.winner_name}" for flow in self.client_flows.values() if flow.winner_name) or "none"
+        direct_wins = sum(1 for flow in self.client_flows.values() if flow.winner_name and flow.winner_name.startswith("direct"))
+        relay_wins = winners - direct_wins
+        sample_flows = [
+            f"{flow.flow_id}:{flow.winner_name or 'pending'}"
+            for flow in sorted(self.client_flows.values(), key=lambda item: item.flow_id)
+            if flow.winner_name
+        ][:5]
+        winner_detail = ", ".join(sample_flows) or "none"
         relay_errors: list[str] = []
         for flow in self.client_flows.values():
             for name, count in flow.relay_failures.items():
@@ -331,13 +341,16 @@ class UdpAssociateServer(asyncio.DatagramProtocol):
         relay_error_detail = ", ".join(sorted(relay_errors)) or "none"
         if self.client_addr:
             print(
-                f"[edge] udp summary bind={self.client_addr[0]}:{self.client_addr[1]} active_flows={active_flows} "
-                f"winner_flows={winners} winner_detail={winner_detail} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} "
+                f"[edge] udp summary bind={self.client_addr[0]}:{self.client_addr[1]} flows={active_flows} winners={winners} "
+                f"winner_breakdown=direct={direct_wins},relay={relay_wins} sample={winner_detail} "
+                f"sent={packets_sent} recv={packets_received} dup={duplicates} "
                 f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={relay_error_detail}"
             )
         else:
             print(
-                f"[edge] udp summary bind=unbound active_flows={active_flows} winner_flows={winners} winner_detail={winner_detail} packets_sent={packets_sent} packets_received={packets_received} dup={duplicates} "
+                f"[edge] udp summary bind=unbound flows={active_flows} winners={winners} "
+                f"winner_breakdown=direct={direct_wins},relay={relay_wins} sample={winner_detail} "
+                f"sent={packets_sent} recv={packets_received} dup={duplicates} "
                 f"direct_paths={direct_paths} relay_paths={relay_candidates} relay_errors={relay_error_detail}"
             )
 
@@ -498,6 +511,9 @@ class SocksEdge:
         active_direct_names = list(direct_names)
         active_links = links
         if not (self.config.udp_always_broadcast or flow.winner_name is None):
+            winner_last_seen = flow.path_last_seen.get(flow.winner_name, 0.0) if flow.winner_name else 0.0
+            if winner_last_seen and asyncio.get_running_loop().time() - winner_last_seen >= (self.config.udp_failover_idle_ms / 1000):
+                flow.winner_name = None
             active_direct_names = [name for name in active_direct_names if name == flow.winner_name]
             active_links = [link for link in active_links if link.node.name == flow.winner_name]
         if not active_direct_names and not active_links:
@@ -576,9 +592,10 @@ class SocksEdge:
         port = struct.unpack("!H", await read_exact(reader, 2))[0]
         peer_text = f"{peer[0]}:{peer[1]}" if isinstance(peer, tuple) and len(peer) >= 2 else str(peer)
         if command == 1:
-            writer.write(b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00")
+            print(f"[edge] socks handshake peer={peer_text} command=connect target={host}:{port}")
+            writer.write(b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00")
             await writer.drain()
-            raise ValueError("tcp connect disabled in socks udp-only mode")
+            return host, port, False
         if command == 3 and self.udp_server and self.udp_server.transport:
             bind_host, bind_port = self.udp_server.transport.get_extra_info("sockname")[:2]
             self.udp_server.register_associate(peer)

+ 105 - 14
transparent_edge.py

@@ -202,6 +202,7 @@ class TransparentSession:
     paths: list[BasePath]
     warmup_bytes: int
     loser_grace_ms: int
+    tcp_failover_idle_ms: int
     stats: dict[str, int]
     target_stats: dict[tuple[str, int], dict[str, int]]
     family_stats: dict[str, dict[str, int]]
@@ -215,6 +216,21 @@ class TransparentSession:
     closed: bool = False
     pump_task: asyncio.Task | None = None
     loser_close_task: asyncio.Task | None = None
+    open_tasks: list[asyncio.Task] = field(default_factory=list)
+    backup_path: BasePath | None = None
+    last_winner_data_at: float = 0.0
+    failover_task: asyncio.Task | None = None
+
+    def _select_backup_path(self, winner: BasePath) -> BasePath | None:
+        candidates = [path for path in self.paths if path is not winner and path.opened and not path.closed]
+        if not candidates:
+            return None
+        winner_is_direct = winner_group(winner.name) == "direct"
+        # Prefer the opposite group to increase failover diversity.
+        opposite = [path for path in candidates if (winner_group(path.name) == "direct") != winner_is_direct]
+        pool = opposite or candidates
+        # Keep the first eligible path as a synchronized backup.
+        return pool[0]
 
     def _record_win(self, winner: BasePath) -> None:
         self.stats[winner.name] = self.stats.get(winner.name, 0) + 1
@@ -237,11 +253,12 @@ class TransparentSession:
         print(f"[edge] tcp win session={self.session_id} target={self.target.host}:{self.target.port} winner={winner.name} direct={direct_wins} relay={relay_wins} relay_breakdown={relay_detail} target_pref={target_pref} target_direct={target_direct} target_relay={target_relay} target_breakdown={target_detail} family_pref={family_pref} family={family_key} family_direct={family_direct} family_relay={family_relay}")
 
     async def start(self) -> None:
-        await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
-        await asyncio.wait_for(self.open_event.wait(), timeout=15)
+        self.open_tasks = [asyncio.create_task(path.open(self.target)) for path in self.paths]
+        await asyncio.wait_for(self.open_event.wait(), timeout=8)
         if self.opened_count == 0:
             raise ConnectionError(self.errors[0] if self.errors else "all paths failed")
         self.pump_task = asyncio.create_task(self._pump_local())
+        self.failover_task = asyncio.create_task(self._watch_failover())
 
     async def _pump_local(self) -> None:
         try:
@@ -259,12 +276,39 @@ class TransparentSession:
                     if self.winner is None:
                         await self.winner_event.wait()
                     if self.winner:
-                        await self.winner.send(chunk)
+                        send_targets = [self.winner]
+                        if self.backup_path and self.backup_path.opened and not self.backup_path.closed and self.backup_path is not self.winner:
+                            send_targets.append(self.backup_path)
+                        await asyncio.gather(*(path.send(chunk) for path in send_targets), return_exceptions=True)
         except Exception:
             pass
         finally:
             await self.close()
 
+    async def _watch_failover(self) -> None:
+        try:
+            while not self.closed:
+                await asyncio.sleep(0.2)
+                if self.winner is None:
+                    continue
+                if self.last_winner_data_at <= 0:
+                    continue
+                idle_ms = (asyncio.get_running_loop().time() - self.last_winner_data_at) * 1000
+                if idle_ms < 0:
+                    continue
+                if idle_ms >= self.tcp_failover_idle_ms and self.backup_path and self.backup_path.opened and not self.backup_path.closed:
+                    old = self.winner
+                    self.winner = self.backup_path
+                    self.backup_path = self._select_backup_path(self.winner)
+                    self.last_winner_data_at = asyncio.get_running_loop().time()
+                    self._record_win(self.winner)
+                    print(
+                        f"[edge] tcp failover session={self.session_id} target={self.target.host}:{self.target.port} "
+                        f"old_winner={old.name if old else 'none'} new_winner={self.winner.name} idle_ms={int(idle_ms)}"
+                    )
+        except Exception:
+            pass
+
     async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
         if self.closed:
             return
@@ -281,11 +325,13 @@ class TransparentSession:
             if self.winner is None:
                 self.winner = path
                 self._record_win(path)
+                self.backup_path = self._select_backup_path(path)
                 self.winner_event.set()
                 if self.loser_grace_ms > 0:
                     self.loser_close_task = asyncio.create_task(self._close_losers_after_grace(path))
                 else:
                     await self._close_losers(path)
+            self.last_winner_data_at = asyncio.get_running_loop().time()
             if path is self.winner and payload is not None:
                 self.writer.write(payload)
                 await self.writer.drain()
@@ -300,7 +346,7 @@ class TransparentSession:
                 await self.close()
 
     async def _close_losers(self, winner: BasePath) -> None:
-        await asyncio.gather(*(path.close() for path in self.paths if path is not winner), return_exceptions=True)
+        await asyncio.gather(*(path.close() for path in self.paths if path is not winner and path is not self.backup_path), return_exceptions=True)
 
     async def _close_losers_after_grace(self, winner: BasePath) -> None:
         await asyncio.sleep(self.loser_grace_ms / 1000)
@@ -311,7 +357,12 @@ class TransparentSession:
         if self.closed:
             return
         self.closed = True
-        print(f"[edge] session={self.session_id} closed target={self.target.host}:{self.target.port}")
+        if self.errors:
+            detail = ", ".join(self.errors[:3])
+            print(
+                f"[edge] session={self.session_id} closed target={self.target.host}:{self.target.port} "
+                f"errors={len(self.errors)} detail={detail}"
+            )
         if self.pump_task and self.pump_task is not asyncio.current_task():
             self.pump_task.cancel()
             with contextlib.suppress(Exception):
@@ -320,6 +371,17 @@ class TransparentSession:
             self.loser_close_task.cancel()
             with contextlib.suppress(Exception):
                 await self.loser_close_task
+        if self.failover_task and self.failover_task is not asyncio.current_task():
+            self.failover_task.cancel()
+            with contextlib.suppress(Exception):
+                await self.failover_task
+        for task in self.open_tasks:
+            if task is not asyncio.current_task():
+                task.cancel()
+        for task in self.open_tasks:
+            if task is not asyncio.current_task():
+                with contextlib.suppress(Exception):
+                    await task
         await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
         self.writer.close()
         with contextlib.suppress(Exception):
@@ -442,6 +504,8 @@ class UdpFlow:
     packets_received: int = 0
     duplicate_responses: int = 0
     send_task: asyncio.Task | None = None
+    winner_burst_sent: int = 0
+    converged: bool = False
 
     async def start(self) -> None:
         await asyncio.gather(*(path.open(self.target) for path in self.paths), return_exceptions=True)
@@ -453,7 +517,19 @@ class UdpFlow:
         if not active:
             return
         copies = max(1, self.redundancy + 1)
-        targets = active if self.always_broadcast or self.winner is None or self.winner.closed else [self.winner]
+        if self.winner is None or self.winner.closed:
+            self.converged = False
+            self.winner_burst_sent = 0
+            targets = active
+        elif not self.converged:
+            # 先并发、后收敛:winner 刚出现时保留短暂重叠,随后快速收敛到单路径。
+            self.winner_burst_sent += 1
+            backup = [path for path in active if path is not self.winner][:1]
+            targets = [self.winner, *backup] if self.winner_burst_sent <= 2 else [self.winner]
+            if self.winner_burst_sent > 2:
+                self.converged = True
+        else:
+            targets = [self.winner]
         for attempt in range(copies):
             await asyncio.gather(*(path.send(payload) for path in targets), return_exceptions=True)
             if attempt + 1 < copies and self.copy_interval_ms > 0:
@@ -465,6 +541,8 @@ class UdpFlow:
             self.packets_received += 1
             if self.winner is None:
                 self.winner = path
+                self.converged = False
+                self.winner_burst_sent = 0
                 mode = "redundant" if self.redundancy > 0 else "single"
                 print(f"[edge] udp flow={self.flow_id} winner={path.name} target={self.target.host}:{self.target.port} mode={mode} candidates={len(self.paths)}")
             elif path is not self.winner:
@@ -476,6 +554,8 @@ class UdpFlow:
             if path is self.winner:
                 remaining = [candidate for candidate in self.paths if candidate.opened and not candidate.closed]
                 self.winner = remaining[0] if remaining else None
+                self.converged = False
+                self.winner_burst_sent = 0
 
     async def close(self) -> None:
         if self.closed:
@@ -485,10 +565,6 @@ class UdpFlow:
             self.send_task.cancel()
             with contextlib.suppress(Exception):
                 await self.send_task
-        print(
-            f"[edge] udp flow={self.flow_id} closed target={self.target.host}:{self.target.port} "
-            f"sent={self.packets_sent} received={self.packets_received} dup={self.duplicate_responses}"
-        )
         await asyncio.gather(*(path.close() for path in self.paths), return_exceptions=True)
 
 
@@ -665,6 +741,8 @@ class TransparentEdge:
                 await asyncio.gather(server4.serve_forever(), server6.serve_forever())
 
     def _direct_redundancy_for_target(self, target: TargetAddress) -> int:
+        if target.family == socket.AF_INET6 and not self.config.direct_ipv6_enabled:
+            return 0
         base = self.config.direct_redundancy
         if target.family == socket.AF_INET6 and self.config.direct_redundancy_v6 is not None:
             base = self.config.direct_redundancy_v6
@@ -674,14 +752,25 @@ class TransparentEdge:
         target_stats = self.tcp_target_wins.get((target.host, target.port), {})
         family_key = "ipv6" if target.family == socket.AF_INET6 else "ipv4"
         family_stats = self.tcp_family_wins.get(family_key, {})
-        target_prefers_relay = sum(count for name, count in target_stats.items() if winner_group(name) != "direct") > grouped_total(target_stats, "direct")
-        family_prefers_relay = sum(count for name, count in family_stats.items() if winner_group(name) != "direct") > grouped_total(family_stats, "direct")
+        target_total = sum(target_stats.values())
+        family_total = sum(family_stats.values())
+        target_relay = sum(count for name, count in target_stats.items() if winner_group(name) != "direct")
+        family_relay = sum(count for name, count in family_stats.items() if winner_group(name) != "direct")
+        target_prefers_relay = target_total >= 4 and target_relay > grouped_total(target_stats, "direct")
+        family_prefers_relay = family_total >= 8 and family_relay > grouped_total(family_stats, "direct")
+        target_prefers_direct = target_total >= 4 and grouped_total(target_stats, "direct") > target_relay
+        family_prefers_direct = family_total >= 8 and grouped_total(family_stats, "direct") > family_relay
         if target_prefers_relay or family_prefers_relay:
-            return min(self.config.direct_max_redundancy, base + 1)
+            return max(1, base - 1)
+        if target_prefers_direct or family_prefers_direct:
+            if base > 2:
+                return base - 1
         return base
 
     def _build_direct_paths(self, session: TransparentSession) -> list[BasePath]:
         count = self._direct_redundancy_for_target(session.target)
+        if count <= 0:
+            return []
         return [
             DirectTcpPath(
                 name=f"direct-{index + 1}" if count > 1 else "direct",
@@ -694,6 +783,8 @@ class TransparentEdge:
         ]
 
     def _build_udp_direct_paths(self, target: TargetAddress, flow_id: int) -> list[BasePath]:
+        if target.family == socket.AF_INET6 and not self.config.direct_ipv6_enabled:
+            return []
         count = max(1, self.config.udp_direct_redundancy)
         if target.family == socket.AF_INET6 and self.config.udp_direct_redundancy_v6 is not None:
             count = max(1, self.config.udp_direct_redundancy_v6)
@@ -730,7 +821,7 @@ class TransparentEdge:
         try:
             target = self._get_original_dst(writer)
             session_id = next(self.session_ids)
-            session = TransparentSession(session_id=session_id, target=target, reader=reader, writer=writer, paths=[], warmup_bytes=self.config.tcp_warmup_bytes, loser_grace_ms=self.config.tcp_loser_grace_ms, stats=self.tcp_win_counts, target_stats=self.tcp_target_wins, family_stats=self.tcp_family_wins)
+            session = TransparentSession(session_id=session_id, target=target, reader=reader, writer=writer, paths=[], warmup_bytes=self.config.tcp_warmup_bytes, loser_grace_ms=self.config.tcp_loser_grace_ms, tcp_failover_idle_ms=self.config.tcp_failover_idle_ms, stats=self.tcp_win_counts, target_stats=self.tcp_target_wins, family_stats=self.tcp_family_wins)
             paths: list[BasePath] = self._build_direct_paths(session)
             for connection in self.manager.available():
                 stream_id = next(self.stream_ids)