#!/usr/bin/env python3
"""Manual local benchmark for mynetspeeder links.

Compares a direct UDP DNS query against the same query relayed through a
SOCKS5 UDP ASSOCIATE proxy, and optionally runs a continuous UDP stream
stability probe.  A TCP transparent-link benchmark helper is also defined.
"""
from __future__ import annotations

import argparse
import asyncio
import contextlib
import json
import os
import socket
import ssl
import statistics
import struct
import sys
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from urllib.parse import urlparse

ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# NOTE: the project import (mynetspeeder.config_udp.UdpConfig) is performed
# inside amain(), after the --child-http early return, because the child mode
# only uses the standard library.


@dataclass
class HttpResult:
    """Timings (milliseconds) and outcome of one HTTP round trip."""

    connect_ms: float
    first_ms: float
    total_ms: float
    ok: bool
    status: int = 0
    error: str = ""


@dataclass
class DnsResult:
    """Timings (milliseconds) and outcome of one DNS-over-UDP round trip."""

    associate_ms: float
    query_ms: float
    total_ms: float
    ok: bool
    error: str = ""


@dataclass
class UdpStreamResult:
    """Aggregated statistics of one continuous UDP stream probe."""

    mode: str
    sent: int
    success: int
    timeouts: int
    max_timeout_streak: int
    recovery_count: int
    avg_recovery_ms: float
    max_recovery_ms: float
    max_gap_ms: float
    jitter_ms: float
    p95_ms: float
    max_latency_ms: float


@dataclass
class UdpRoundResult:
    """Direct and (optional) proxied DNS results of one benchmark round."""

    round_no: int
    direct: DnsResult
    proxy: DnsResult | None


def parse_host_port(text: str) -> tuple[str, int]:
    """Split ``host:port`` or ``[ipv6]:port`` into ``(host, port)``.

    Raises:
        ValueError: if the text matches neither form or the port is not an
            integer.
    """
    if text.startswith("["):
        host, closing, rest = text[1:].partition("]")
        # A missing "]" or a missing ":port" suffix are both malformed.
        if not closing or not rest.startswith(":"):
            raise ValueError(f"无效地址: {text}")
        return host, int(rest[1:])
    if text.count(":") == 1:
        host, port = text.rsplit(":", 1)
        return host, int(port)
    raise ValueError(f"无效地址: {text}")


def parse_url(text: str) -> tuple[str, int, str, bool]:
    """Parse an http/https URL into ``(host, port, path_with_query, is_https)``.

    The port defaults to 443 for https and 80 for http; the path defaults
    to ``/``.

    Raises:
        ValueError: for other schemes or a URL without a hostname.
    """
    parsed = urlparse(text)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"仅支持 http/https: {text}")
    if not parsed.hostname:
        raise ValueError(f"无效 URL: {text}")
    is_https = parsed.scheme == "https"
    port = parsed.port or (443 if is_https else 80)
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"
    return parsed.hostname, port, path, is_https


def percentile(values: list[float], pct: float) -> float:
    """Return the linearly interpolated percentile of *values*.

    *pct* is a fraction (0.95 for p95).  An empty list yields 0.0.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    if len(ordered) == 1:
        return ordered[0]
    idx = (len(ordered) - 1) * pct
    lo = int(idx)
    hi = min(lo + 1, len(ordered) - 1)
    weight = idx - lo
    return ordered[lo] * (1 - weight) + ordered[hi] * weight


def summarize(values: list[float]) -> dict[str, float]:
    """Return ``avg``/``p50``/``p95`` of *values* (all 0.0 when empty)."""
    if not values:
        return {"avg": 0.0, "p50": 0.0, "p95": 0.0}
    return {
        "avg": statistics.mean(values),
        "p50": percentile(values, 0.50),
        "p95": percentile(values, 0.95),
    }


def stdev_or_zero(values: list[float]) -> float:
    """Return the population standard deviation, or 0.0 for < 2 values."""
    if len(values) < 2:
        return 0.0
    return statistics.pstdev(values)


def summarize_udp_stream(
    samples: list[float],
    success_times: list[float],
    timeout_marks: list[float],
    sent: int,
    mode: str,
) -> UdpStreamResult:
    """Aggregate the raw observations of a UDP stream probe.

    Args:
        samples: query latencies (ms) of the successful probes.
        success_times: timestamps (seconds) at which successes were recorded.
        timeout_marks: timestamps (seconds) at which failures were recorded.
        sent: number of probes sent.
        mode: label copied into the result.

    Returns:
        A UdpStreamResult with latency, gap, recovery and streak statistics.
    """
    summary = summarize(samples)

    # Largest interval between two consecutive recorded successes.
    max_gap_ms = 0.0
    if len(success_times) >= 2:
        max_gap_ms = max(
            (later - earlier) * 1000.0
            for earlier, later in zip(success_times, success_times[1:])
        )

    # For each failure, the time until the first success listed after it.
    recovery_values: list[float] = []
    for timeout_at in timeout_marks:
        next_success = next(
            (success_at for success_at in success_times if success_at > timeout_at),
            None,
        )
        if next_success is not None:
            recovery_values.append((next_success - timeout_at) * 1000.0)

    # Longest run of failures without an intervening success, in time order.
    events = [(at, True) for at in success_times]
    events.extend((at, False) for at in timeout_marks)
    events.sort(key=lambda item: item[0])
    max_streak = 0
    current_streak = 0
    for _at, is_success in events:
        if is_success:
            current_streak = 0
        else:
            current_streak += 1
            max_streak = max(max_streak, current_streak)

    return UdpStreamResult(
        mode=mode,
        sent=sent,
        success=len(samples),
        timeouts=len(timeout_marks),
        max_timeout_streak=max_streak,
        recovery_count=len(recovery_values),
        avg_recovery_ms=statistics.mean(recovery_values) if recovery_values else 0.0,
        max_recovery_ms=max(recovery_values) if recovery_values else 0.0,
        max_gap_ms=max_gap_ms,
        jitter_ms=stdev_or_zero(samples),
        p95_ms=summary["p95"],
        max_latency_ms=max(samples) if samples else 0.0,
    )


def fmt_ms(value: float) -> str:
    """Format a millisecond value with two decimals and an ``ms`` suffix."""
    return f"{value:.2f}ms"


def fmt_pct(value: float) -> str:
    """Format a percentage with an explicit sign and one decimal."""
    return f"{value:+.1f}%"


def verdict_from_diff(pct: float, threshold: float = 5.0) -> str:
    """Classify a relative latency change; negative means faster."""
    if pct <= -threshold:
        return "提升"
    if pct >= threshold:
        return "退化"
    return "无明显变化"


def overall_verdict(changes: list[float]) -> str:
    """Combine several relative changes into one verdict.

    Each change of at most -5.0 scores +1, each change of at least +5.0
    scores -1; the sign of the sum decides the verdict.
    """
    if not changes:
        return "无可用结果"
    score = 0
    for change in changes:
        if change <= -5.0:
            score += 1
        elif change >= 5.0:
            score -= 1
    if score > 0:
        return "整体提升"
    if score < 0:
        return "整体退化"
    return "整体无明显变化"


def progress_line(done: int, total: int, started_at: float) -> str:
    """Build a progress message with elapsed time and a remaining estimate.

    *started_at* is a ``time.perf_counter()`` reading.  The remaining time is
    extrapolated linearly from ``done / total``.
    """
    elapsed = time.perf_counter() - started_at
    if total <= 0:
        return "进度:0%"
    ratio = done / total if done else 0.0
    percent = int(ratio * 100) if done else 0
    remain = "--"
    if done:
        total_est = elapsed / ratio
        remain = f"{max(0.0, total_est - elapsed):.1f}s"
    return f"进度:{done}/{total} ({percent}%),已用 {elapsed:.1f}s,估算剩余:{remain}"


def socks5_addr_bytes(host: str, port: int) -> bytes:
    """Encode ``host``/``port`` as a SOCKS5 ATYP + address + port field.

    IPv4 is tried first, then IPv6; anything else is sent as a domain name.
    """
    port_bytes = port.to_bytes(2, "big")
    try:
        return b"\x01" + socket.inet_aton(host) + port_bytes
    except OSError:
        pass
    try:
        return b"\x04" + socket.inet_pton(socket.AF_INET6, host) + port_bytes
    except OSError:
        raw = host.encode()
        return b"\x03" + bytes([len(raw)]) + raw + port_bytes


async def socks5_handshake(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Negotiate the SOCKS5 "no authentication" method.

    Raises:
        ConnectionError: if the server does not answer ``05 00``.
    """
    writer.write(b"\x05\x01\x00")
    await writer.drain()
    reply = await reader.readexactly(2)
    if reply != b"\x05\x00":
        raise ConnectionError("SOCKS5 握手失败")


async def socks5_udp_associate(
    proxy_host: str, proxy_port: int, timeout: float
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter, tuple[str, int], float]:
    """Open a SOCKS5 control connection and request UDP ASSOCIATE.

    Only the TCP connect is bounded by *timeout* here.

    Returns:
        ``(reader, writer, (relay_host, relay_port), elapsed_ms)``.  The
        caller owns the returned writer and must close it.

    Raises:
        ConnectionError: on a failed handshake, a non-zero reply code or an
            unsupported address type.
    """
    started = time.perf_counter()
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(proxy_host, proxy_port), timeout=timeout
    )
    try:
        await socks5_handshake(reader, writer)
        writer.write(b"\x05\x03\x00\x01\x00\x00\x00\x00\x00\x00")
        await writer.drain()
        head = await reader.readexactly(4)
        if head[1] != 0x00:
            raise ConnectionError(f"SOCKS5 UDP ASSOCIATE 失败: {head[1]}")
        atyp = head[3]
        if atyp == 1:
            relay_host = socket.inet_ntoa(await reader.readexactly(4))
        elif atyp == 3:
            size = (await reader.readexactly(1))[0]
            relay_host = (await reader.readexactly(size)).decode()
        elif atyp == 4:
            relay_host = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16))
        else:
            raise ConnectionError("SOCKS5 UDP relay ATYP 不支持")
        relay_port = int.from_bytes(await reader.readexactly(2), "big")
    except BaseException:
        # The writer is handed to the caller only on success; on any failure
        # (including cancellation) close it here so the connection is not leaked.
        writer.close()
        raise
    return reader, writer, (relay_host, relay_port), (time.perf_counter() - started) * 1000


def build_dns_query(name: str) -> bytes:
    """Build a DNS query for an A/IN record with recursion desired.

    The query id is derived from the current wall-clock time.
    """
    query_id = int(time.time() * 1000) & 0xFFFF
    header = struct.pack("!HHHHHH", query_id, 0x0100, 1, 0, 0, 0)
    labels = b"".join(
        bytes([len(part)]) + part.encode() for part in name.strip(".").split(".") if part
    )
    return header + labels + b"\x00\x00\x01\x00\x01"


def parse_dns_response(data: bytes) -> bool:
    """Return True when the header has RCODE 0 and a question or an answer.

    Raises:
        ValueError: if *data* is shorter than the 12-byte DNS header.
    """
    if len(data) < 12:
        raise ValueError("DNS 响应过短")
    flags = int.from_bytes(data[2:4], "big")
    rcode = flags & 0x000F
    qdcount = int.from_bytes(data[4:6], "big")
    ancount = int.from_bytes(data[6:8], "big")
    return rcode == 0 and (ancount > 0 or qdcount > 0)


def strip_socks_udp(packet: bytes) -> bytes:
    """Remove the SOCKS5 UDP request header and return the payload.

    Raises:
        ValueError: if the packet is too short, truncated inside the header,
            or uses an unsupported address type.
    """
    if len(packet) < 10:
        raise ValueError("SOCKS UDP 响应过短")
    atyp = packet[3]
    offset = 4
    if atyp == 1:
        offset += 4
    elif atyp == 3:
        offset += 1 + packet[offset]
    elif atyp == 4:
        offset += 16
    else:
        raise ValueError("SOCKS UDP ATYP 不支持")
    offset += 2  # port
    if offset > len(packet):
        # The declared address length runs past the end of the datagram.
        raise ValueError("SOCKS UDP 响应过短")
    return packet[offset:]


def edge_log_counters(log_path: Path) -> tuple[int, int]:
    """Count log lines containing " accept peer=" and " tcp win session=".

    Returns ``(0, 0)`` when the log file does not exist.
    """
    if not log_path.exists():
        return 0, 0
    accept_count = 0
    win_count = 0
    for line in log_path.read_text(errors="replace").splitlines():
        if " accept peer=" in line:
            accept_count += 1
        if " tcp win session=" in line:
            win_count += 1
    return accept_count, win_count


def edge_log_target_hits(log_path: Path, host: str, port: int) -> int:
    """Count log lines containing ``target=<host>:<port>`` (0 if no file)."""
    if not log_path.exists():
        return 0
    needle = f"target={host}:{port}"
    return sum(1 for line in log_path.read_text(errors="replace").splitlines() if needle in line)


async def http_roundtrip(url: str, timeout: float) -> HttpResult:
    """Issue one ``GET`` and time the connect and the response header.

    ``total_ms`` is taken right after the status line is parsed; the response
    body is not read.  Any exception is reported through ``HttpResult.error``
    instead of being raised.
    """
    host, port, path, is_https = parse_url(url)
    writer: asyncio.StreamWriter | None = None
    started = time.perf_counter()
    try:
        if is_https:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(
                    host, port, ssl=ssl.create_default_context(), server_hostname=host
                ),
                timeout=timeout,
            )
        else:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=timeout
            )
        connect_ms = (time.perf_counter() - started) * 1000
        request = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host}\r\n"
            "User-Agent: mynetspeeder-benchmark\r\n"
            "Accept: */*\r\n"
            "Connection: close\r\n\r\n"
        ).encode()
        send_started = time.perf_counter()
        writer.write(request)
        await writer.drain()
        header = await asyncio.wait_for(reader.readuntil(b"\r\n\r\n"), timeout=timeout)
        first_ms = (time.perf_counter() - send_started) * 1000
        status_line = header.split(b"\r\n", 1)[0].decode(errors="replace")
        status = 0
        if status_line.startswith("HTTP/"):
            parts = status_line.split()
            if len(parts) >= 2 and parts[1].isdigit():
                status = int(parts[1])
        total_ms = (time.perf_counter() - send_started) * 1000
        ok = 200 <= status < 400
        return HttpResult(
            connect_ms=connect_ms, first_ms=first_ms, total_ms=total_ms, ok=ok, status=status
        )
    except Exception as exc:  # benchmark boundary: report instead of raising
        return HttpResult(0.0, 0.0, 0.0, False, error=str(exc))
    finally:
        if writer is not None:
            with contextlib.suppress(Exception):
                writer.close()
                await writer.wait_closed()


async def dns_roundtrip(
    server_host: str, server_port: int, timeout: float, proxy: tuple[str, int] | None
) -> DnsResult:
    """Send one DNS query over UDP, directly or through a SOCKS5 relay.

    Args:
        server_host: DNS server address.
        server_port: DNS server port.
        timeout: per-operation timeout in seconds.
        proxy: ``(host, port)`` of the SOCKS5 proxy, or None for direct.

    Returns:
        A DnsResult.  ``associate_ms`` is the measured UDP ASSOCIATE time
        (0.0 when direct), ``query_ms`` covers send-to-receive, and
        ``total_ms`` is the wall time of the whole call.  Any exception is
        reported through ``DnsResult.error`` instead of being raised.
    """
    loop = asyncio.get_running_loop()
    query = build_dns_query("www.google.com")
    ctrl_writer: asyncio.StreamWriter | None = None
    sock: socket.socket | None = None
    associate_ms = 0.0
    started = time.perf_counter()
    try:
        if proxy is None:
            family = socket.AF_INET6 if ":" in server_host else socket.AF_INET
            sock = socket.socket(family, socket.SOCK_DGRAM)
            sock.setblocking(False)
            await asyncio.wait_for(
                loop.sock_connect(sock, (server_host, server_port)), timeout=timeout
            )
            send_started = time.perf_counter()
            await loop.sock_sendall(sock, query)
            data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
            query_ms = (time.perf_counter() - send_started) * 1000
        else:
            _ctrl_reader, ctrl_writer, relay, associate_ms = await asyncio.wait_for(
                socks5_udp_associate(proxy[0], proxy[1], timeout),
                timeout=timeout,
            )
            relay_host, relay_port = relay
            if relay_host in ("0.0.0.0", "::"):
                # An unspecified relay address is not a usable destination;
                # fall back to the host the control connection reached.
                relay_host = proxy[0]
            family = socket.AF_INET6 if ":" in relay_host else socket.AF_INET
            sock = socket.socket(family, socket.SOCK_DGRAM)
            sock.setblocking(False)
            await asyncio.wait_for(
                loop.sock_connect(sock, (relay_host, relay_port)), timeout=timeout
            )
            packet = b"\x00\x00\x00" + socks5_addr_bytes(server_host, server_port) + query
            send_started = time.perf_counter()
            await loop.sock_sendall(sock, packet)
            data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
            data = strip_socks_udp(data)
            query_ms = (time.perf_counter() - send_started) * 1000
        ok = parse_dns_response(data)
        # `started` was taken before the ASSOCIATE handshake, so the elapsed
        # time already includes it; it must not be added a second time.
        total_ms = (time.perf_counter() - started) * 1000
        return DnsResult(
            associate_ms=associate_ms,
            query_ms=query_ms,
            total_ms=total_ms,
            ok=ok,
        )
    except Exception as exc:  # benchmark boundary: report instead of raising
        return DnsResult(0.0, 0.0, 0.0, False, error=str(exc))
    finally:
        if sock is not None:
            sock.close()
        if ctrl_writer is not None:
            with contextlib.suppress(Exception):
                ctrl_writer.close()
                await ctrl_writer.wait_closed()


async def run_step(label: str, done: int, total: int, started_at: float, coro, timeout: float):
    """Run *coro* as a task, printing progress until it finishes.

    Progress is printed about every 0.5 s while the task is pending.  When
    *timeout* seconds pass, the task is cancelled and None is returned;
    otherwise the task's result is returned (its exception propagates).
    """
    print(f" 开始 {label}")
    task = asyncio.create_task(coro)
    step_started = time.perf_counter()
    while not task.done():
        print(f" {label} {progress_line(done, total, started_at)}")
        if time.perf_counter() - step_started >= timeout:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
            print(f" 超时 {label}")
            return None
        # Wake up as soon as the task completes instead of always sleeping
        # the full polling interval.
        await asyncio.wait({task}, timeout=0.5)
    return await task


def parse_child_http_output(returncode: int | None, stdout: bytes, stderr: bytes) -> HttpResult:
    """Turn the output of a ``--child-http`` subprocess into an HttpResult.

    The child prints its result as JSON and exits non-zero whenever the
    result is not ok, so stdout is parsed first regardless of the exit code.
    Only when stdout is not a valid result is the exit code used to pick an
    error message.
    """
    try:
        payload = json.loads(stdout.decode())
        return HttpResult(**payload)
    except (ValueError, TypeError) as exc:
        parse_error = exc
    if returncode != 0:
        error = (
            stderr.decode(errors="replace").strip()
            or stdout.decode(errors="replace").strip()
            or f"子进程退出码 {returncode}"
        )
        return HttpResult(0.0, 0.0, 0.0, False, error=error)
    return HttpResult(0.0, 0.0, 0.0, False, error=f"解析子进程结果失败: {parse_error}")


async def http_roundtrip_as_user(
    script_path: Path, runtime_user: str, url: str, timeout: float
) -> HttpResult:
    """Run the HTTP round trip in a child process as *runtime_user*.

    The child is this script started through ``runuser`` with
    ``--child-http``; its JSON output is converted back to an HttpResult.
    """
    proc = await asyncio.create_subprocess_exec(
        "runuser",
        "-u",
        runtime_user,
        "--",
        "python3",
        str(script_path),
        "--child-http",
        url,
        "--child-timeout",
        str(timeout),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await proc.communicate()
    finally:
        # If the await was cancelled (e.g. by run_step's timeout) the child
        # is still running; do not leave it behind.
        if proc.returncode is None:
            with contextlib.suppress(ProcessLookupError):
                proc.kill()
    return parse_child_http_output(proc.returncode, stdout, stderr)


async def bench_tcp_transparent(args: argparse.Namespace, script_path: Path) -> float | None:
    """Compare HTTP timings of the runtime user against the current user.

    Reads ``args.runtime_user``, ``args.http_url``, ``args.edge_log``,
    ``args.count`` and ``args.timeout``.

    NOTE(review): amain() in this file does not call this function and
    build_parser() does not define the first three attributes — confirm
    whether this path is still meant to be wired up.

    Returns:
        The relative change (percent) of the average total time, or None
        when the test is skipped or either side has no successful sample.
    """
    can_run_direct = os.geteuid() == 0
    current_user = os.environ.get("USER", "")
    if current_user == args.runtime_user:
        print("TCP 透明测试跳过:当前用户就是透明排除用户,无法命中 19080")
        return None
    if not can_run_direct:
        print("TCP 直连基线跳过:当前不是 root,无法切换到透明排除用户")
        return None

    target_host, target_port, _target_path, _is_https = parse_url(args.http_url)
    direct_results: list[HttpResult] = []
    transparent_results: list[HttpResult] = []
    total = args.count * 2
    started_at = time.perf_counter()
    edge_log = Path(args.edge_log)
    pre_accept, pre_win = edge_log_counters(edge_log)
    pre_target_hits = edge_log_target_hits(edge_log, target_host, target_port)

    print("TCP 透明链路测试开始")
    print(f" 目标 URL: {args.http_url}")
    print(f" 直连基线用户: {args.runtime_user}(应绕过 transparent)")
    print(" 透明链路用户: 当前执行用户(应命中 transparent_edge)")
    if args.http_url.startswith("https://"):
        print(" 提示: transparent TCP 基准更建议使用 http:// 目标,HTTPS 目标更容易受上层握手行为影响")

    step_timeout = max(3.0, args.timeout + 3.0)
    for index in range(args.count):
        direct_result = await run_step(
            f"TCP 直连 第{index + 1}步",
            index * 2 + 1,
            total,
            started_at,
            http_roundtrip_as_user(script_path, args.runtime_user, args.http_url, args.timeout),
            timeout=step_timeout,
        )
        if isinstance(direct_result, HttpResult):
            direct_results.append(direct_result)
        transparent_result = await run_step(
            f"TCP 透明 第{index + 1}步",
            index * 2 + 2,
            total,
            started_at,
            http_roundtrip(args.http_url, args.timeout),
            timeout=step_timeout,
        )
        if isinstance(transparent_result, HttpResult):
            transparent_results.append(transparent_result)

    post_accept, post_win = edge_log_counters(edge_log)
    post_target_hits = edge_log_target_hits(edge_log, target_host, target_port)
    direct_ok = [item for item in direct_results if item.ok]
    transparent_ok = [item for item in transparent_results if item.ok]

    direct_connect_avg = summarize([item.connect_ms for item in direct_ok])["avg"]
    direct_first_avg = summarize([item.first_ms for item in direct_ok])["avg"]
    direct_total_avg = summarize([item.total_ms for item in direct_ok])["avg"]
    transparent_connect_avg = summarize([item.connect_ms for item in transparent_ok])["avg"]
    transparent_first_avg = summarize([item.first_ms for item in transparent_ok])["avg"]
    transparent_total_avg = summarize([item.total_ms for item in transparent_ok])["avg"]

    print(f"TCP 目标: {args.http_url}")
    print(
        f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},连接均值 {fmt_ms(direct_connect_avg)},"
        f" 首包均值 {fmt_ms(direct_first_avg)},总耗时均值 {fmt_ms(direct_total_avg)}"
    )
    if direct_results and not direct_ok:
        print(f" 直连失败原因: {direct_results[0].error or '未知'}")
    print(
        f" 透明: 成功 {len(transparent_ok)}/{len(transparent_results)},连接均值 {fmt_ms(transparent_connect_avg)},"
        f" 首包均值 {fmt_ms(transparent_first_avg)},总耗时均值 {fmt_ms(transparent_total_avg)}"
    )
    if transparent_results and not transparent_ok:
        print(f" 透明失败原因: {transparent_results[0].error or '未知'}")
    print(
        f" Transparent 日志增量: accept +{post_accept - pre_accept},tcp win +{post_win - pre_win},"
        f" 目标命中 +{post_target_hits - pre_target_hits}"
    )
    if not direct_ok or not transparent_ok:
        print(" 对比: 无法比较,原因:直连或透明没有成功样本")
        return None

    diff = transparent_total_avg - direct_total_avg
    pct = (diff / direct_total_avg * 100.0) if direct_total_avg > 0 else 0.0
    print(f" 对比: transparent 总耗时变化 {fmt_pct(pct)},结论:{verdict_from_diff(pct)}")
    return pct


async def bench_udp_socks(args: argparse.Namespace, proxy: tuple[str, int] | None) -> float | None:
    """Benchmark direct DNS queries against SOCKS5-relayed ones.

    Runs ``args.count`` rounds; each round performs a direct query and, when
    *proxy* is given, a proxied query.  Prints per-round and aggregate
    results.

    Returns:
        The relative change (percent) of the average query time through the
        proxy, or None when no proxy is given or either side has no
        successful sample.
    """
    direct_results: list[DnsResult] = []
    proxy_results: list[DnsResult] = []
    steps_per_round = 2 if proxy else 1
    total = args.count * steps_per_round
    started_at = time.perf_counter()
    round_results: list[UdpRoundResult] = []
    step_timeout = max(3.0, args.timeout + 2.0)

    print("UDP SOCKS 链路测试开始")
    print(f" DNS 目标: {args.dns_server_host}:{args.dns_server_port}")
    for index in range(args.count):
        direct_result = await run_step(
            f"UDP 直连 第{index + 1}步",
            index * steps_per_round + 1,
            total,
            started_at,
            dns_roundtrip(args.dns_server_host, args.dns_server_port, args.timeout, None),
            timeout=step_timeout,
        )
        if isinstance(direct_result, DnsResult):
            direct_results.append(direct_result)
            direct_for_round = direct_result
        else:
            # run_step returned None (step timed out): record a placeholder.
            direct_for_round = DnsResult(0.0, 0.0, 0.0, False, error="执行失败")
        if proxy is None:
            round_results.append(UdpRoundResult(index + 1, direct_for_round, None))
            continue
        proxy_result = await run_step(
            f"UDP SOCKS 第{index + 1}步",
            index * 2 + 2,
            total,
            started_at,
            dns_roundtrip(args.dns_server_host, args.dns_server_port, args.timeout, proxy),
            timeout=step_timeout,
        )
        if isinstance(proxy_result, DnsResult):
            proxy_results.append(proxy_result)
            proxy_for_round = proxy_result
        else:
            proxy_for_round = DnsResult(0.0, 0.0, 0.0, False, error="执行失败")
        round_results.append(UdpRoundResult(index + 1, direct_for_round, proxy_for_round))

    direct_ok = [item for item in direct_results if item.ok]
    proxy_ok = [item for item in proxy_results if item.ok]
    direct_query_values = [item.query_ms for item in direct_ok]
    proxy_query_values = [item.query_ms for item in proxy_ok]
    direct_stall_count = sum(1 for value in direct_query_values if value >= args.udp_stall_ms)
    proxy_stall_count = sum(1 for value in proxy_query_values if value >= args.udp_stall_ms)

    print(f"UDP 目标: {args.dns_server_host}:{args.dns_server_port}")
    print(" 逐轮结果:")
    for item in round_results:
        direct_part = (
            f"直连 query={fmt_ms(item.direct.query_ms)}"
            f" total={fmt_ms(item.direct.total_ms)} ok={item.direct.ok}"
        )
        if item.proxy is None:
            print(f" 第{item.round_no}轮: {direct_part}")
        else:
            proxy_part = (
                f"SOCKS associate={fmt_ms(item.proxy.associate_ms)}"
                f" query={fmt_ms(item.proxy.query_ms)}"
                f" total={fmt_ms(item.proxy.total_ms)} ok={item.proxy.ok}"
            )
            print(f" 第{item.round_no}轮: {direct_part}; {proxy_part}")

    direct_summary = summarize(direct_query_values)
    direct_jitter = stdev_or_zero(direct_query_values)
    direct_slowest = max(direct_query_values) if direct_query_values else 0.0
    print(
        f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},查询均值 {fmt_ms(direct_summary['avg'])},"
        f" p95 {fmt_ms(direct_summary['p95'])},抖动 {fmt_ms(direct_jitter)},"
        f" 最慢 {fmt_ms(direct_slowest)},卡顿次数 {direct_stall_count}"
    )
    if direct_results and not direct_ok:
        print(f" 直连失败原因: {direct_results[0].error or '未知'}")
    if proxy is None:
        print(" SOCKS: 未执行或不可用")
        return None

    proxy_summary = summarize(proxy_query_values)
    proxy_jitter = stdev_or_zero(proxy_query_values)
    proxy_slowest = max(proxy_query_values) if proxy_query_values else 0.0
    associate_avg = summarize([item.associate_ms for item in proxy_ok])["avg"]
    print(
        f" SOCKS: 成功 {len(proxy_ok)}/{len(proxy_results)},关联均值 {fmt_ms(associate_avg)},"
        f" 查询均值 {fmt_ms(proxy_summary['avg'])},p95 {fmt_ms(proxy_summary['p95'])},"
        f" 抖动 {fmt_ms(proxy_jitter)},最慢 {fmt_ms(proxy_slowest)},"
        f" 卡顿次数 {proxy_stall_count}"
    )
    if proxy_results and not proxy_ok:
        print(f" SOCKS 失败原因: {proxy_results[0].error or '未知'}")
    if not direct_ok or not proxy_ok:
        print(" 对比: 无法比较,原因:直连或 SOCKS 没有成功样本")
        return None

    direct_query = direct_summary["avg"]
    proxy_query = proxy_summary["avg"]
    diff = proxy_query - direct_query
    pct = (diff / direct_query * 100.0) if direct_query > 0 else 0.0
    print(f" 对比: SOCKS 查询耗时变化 {fmt_pct(pct)},结论:{verdict_from_diff(pct)}")

    # Stall count decides first; jitter only breaks a tie.
    stall_diff = proxy_stall_count - direct_stall_count
    jitter_diff = proxy_jitter - direct_jitter
    if stall_diff < 0:
        stability = "更稳"
    elif stall_diff > 0:
        stability = "更不稳"
    elif jitter_diff < -5.0:
        stability = "更稳"
    elif jitter_diff > 5.0:
        stability = "更不稳"
    else:
        stability = "接近"
    print(
        f" 稳定性: SOCKS 相比直连 {stability},卡顿次数变化 {stall_diff:+d},"
        f" 抖动变化 {fmt_ms(jitter_diff)},卡顿阈值 {fmt_ms(args.udp_stall_ms)}"
    )
    return pct


async def udp_stream_once(
    server_host: str, server_port: int, timeout: float, proxy: tuple[str, int] | None
) -> tuple[bool, float]:
    """Run one DNS round trip and return ``(ok, query_ms)``."""
    result = await dns_roundtrip(server_host, server_port, timeout, proxy)
    return result.ok, result.query_ms


async def run_udp_stream_probe(
    args: argparse.Namespace, proxy: tuple[str, int] | None, mode: str
) -> UdpStreamResult:
    """Send ``args.udp_stream_count`` probes and summarize them.

    Probes are separated by ``args.udp_stream_interval_ms``; every failed
    probe is recorded as a timeout mark.
    """
    samples: list[float] = []
    success_times: list[float] = []
    timeout_marks: list[float] = []
    started = time.perf_counter()
    total = args.udp_stream_count
    for index in range(total):
        label = f"UDP 连续流 {mode} 第{index + 1}步"
        print(f" 开始 {label}")
        ok, query_ms = await udp_stream_once(
            args.dns_server_host, args.dns_server_port, args.timeout, proxy
        )
        now = time.perf_counter()
        print(f" {label} {progress_line(index + 1, total, started)}")
        if ok:
            samples.append(query_ms)
            success_times.append(now)
        else:
            timeout_marks.append(now)
        if index + 1 < total and args.udp_stream_interval_ms > 0:
            await asyncio.sleep(args.udp_stream_interval_ms / 1000.0)
    return summarize_udp_stream(samples, success_times, timeout_marks, total, mode)


async def bench_udp_stream(args: argparse.Namespace, proxy: tuple[str, int] | None) -> None:
    """Run the continuous UDP stream probe directly and through the proxy.

    Prints both summaries and, when a proxy is given, a stability verdict.
    """
    print("")
    print("UDP 连续流稳定性测试开始")
    print(
        f" 目标: {args.dns_server_host}:{args.dns_server_port},样本数: {args.udp_stream_count},"
        f" 间隔: {args.udp_stream_interval_ms}ms,超时阈值: {args.timeout:.1f}s"
    )
    direct = await run_udp_stream_probe(args, None, "直连")
    proxy_result = await run_udp_stream_probe(args, proxy, "SOCKS") if proxy is not None else None

    print("UDP 连续流结果")
    print(
        f" 直连: 成功 {direct.success}/{direct.sent},超时 {direct.timeouts},连续超时最大 {direct.max_timeout_streak},"
        f" p95 {fmt_ms(direct.p95_ms)},抖动 {fmt_ms(direct.jitter_ms)},最大间隔 {fmt_ms(direct.max_gap_ms)},"
        f" 恢复均值 {fmt_ms(direct.avg_recovery_ms)},恢复最慢 {fmt_ms(direct.max_recovery_ms)}"
    )
    if proxy_result is None:
        print(" SOCKS: 未执行或不可用")
        return
    print(
        f" SOCKS: 成功 {proxy_result.success}/{proxy_result.sent},超时 {proxy_result.timeouts},连续超时最大 {proxy_result.max_timeout_streak},"
        f" p95 {fmt_ms(proxy_result.p95_ms)},抖动 {fmt_ms(proxy_result.jitter_ms)},最大间隔 {fmt_ms(proxy_result.max_gap_ms)},"
        f" 恢复均值 {fmt_ms(proxy_result.avg_recovery_ms)},恢复最慢 {fmt_ms(proxy_result.max_recovery_ms)}"
    )

    recovery_diff = proxy_result.avg_recovery_ms - direct.avg_recovery_ms
    gap_diff = proxy_result.max_gap_ms - direct.max_gap_ms
    timeout_diff = proxy_result.timeouts - direct.timeouts
    streak_diff = proxy_result.max_timeout_streak - direct.max_timeout_streak
    # Any improving signal is checked before any degrading signal.
    if timeout_diff < 0 or streak_diff < 0 or recovery_diff < -20.0 or gap_diff < -20.0:
        verdict = "更稳"
    elif timeout_diff > 0 or streak_diff > 0 or recovery_diff > 20.0 or gap_diff > 20.0:
        verdict = "更不稳"
    else:
        verdict = "接近"
    print(
        f" 连续流稳定性: SOCKS 相比直连 {verdict},超时变化 {timeout_diff:+d},连续超时变化 {streak_diff:+d},"
        f" 最大间隔变化 {fmt_ms(gap_diff)},恢复均值变化 {fmt_ms(recovery_diff)}"
    )


async def child_http_mode(url: str, timeout: float) -> int:
    """Run one HTTP round trip, print it as JSON, return 0 if ok else 1."""
    result = await http_roundtrip(url, timeout)
    print(json.dumps(asdict(result), ensure_ascii=False))
    return 0 if result.ok else 1


async def amain(args: argparse.Namespace) -> int:
    """Entry coroutine: child HTTP mode, or the UDP benchmarks.

    The proxy comes from the config file when it exists and defines a
    positive ``socks_port``; ``--proxy-host``/``--proxy-port`` override it.
    """
    if args.child_http:
        return await child_http_mode(args.child_http, args.child_timeout)

    # Imported here (after ROOT was put on sys.path at module import) so the
    # stdlib-only child mode above does not need the project package.
    from mynetspeeder.config_udp import UdpConfig

    config = UdpConfig.load(args.config) if args.config and Path(args.config).exists() else None
    proxy: tuple[str, int] | None = None
    if config is not None and config.socks_port > 0:
        proxy = (config.socks_host, config.socks_port)
    if args.proxy_host and args.proxy_port:
        proxy = (args.proxy_host, args.proxy_port)

    print("本地基准测试开始")
    print(f" 样本数: {args.count}")
    print(f" UDP 目标: {args.dns_server_host}:{args.dns_server_port}")
    print(f" SOCKS: {proxy[0]}:{proxy[1]}" if proxy else " SOCKS: 未配置或未启动")
    print("")

    udp_pct = await bench_udp_socks(args, proxy)
    if args.udp_stream_count > 0:
        await bench_udp_stream(args, proxy)

    changes = [value for value in (udp_pct,) if value is not None]
    if not changes:
        print("中文总结:无可用结果")
        return 0
    parts = []
    if udp_pct is not None:
        parts.append(f"UDP={verdict_from_diff(udp_pct)}")
    print(f"中文总结:{overall_verdict(changes)}({', '.join(parts)})")
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser."""
    parser = argparse.ArgumentParser(description="mynetspeeder 真实链路本地手工基准测试")
    parser.add_argument("--config", default="/home/mynetspeeder/config.json", help="配置文件路径")
    parser.add_argument("--proxy-host", default="", help="SOCKS5 地址,默认读取 config.json")
    parser.add_argument("--proxy-port", type=int, default=0, help="SOCKS5 端口,默认读取 config.json")
    parser.add_argument("--dns-server", default="8.8.8.8:53", help="UDP/SOCKS 链路 DNS 目标,格式 host:port")
    parser.add_argument("--count", type=int, default=5, help="UDP 测试轮数,默认 5")
    parser.add_argument("--timeout", type=float, default=3.0, help="单次测试超时秒数")
    parser.add_argument("--udp-stall-ms", type=float, default=120.0, help="UDP 单次查询超过该值记为一次疑似卡顿")
    parser.add_argument("--udp-stream-count", type=int, default=20, help="UDP 连续流稳定性测试样本数,设为 0 则关闭")
    parser.add_argument("--udp-stream-interval-ms", type=float, default=200.0, help="UDP 连续流相邻样本间隔毫秒")
    parser.add_argument("--child-http", default="", help=argparse.SUPPRESS)
    parser.add_argument("--child-timeout", type=float, default=3.0, help=argparse.SUPPRESS)
    return parser


def main() -> int:
    """Parse arguments and run the benchmark; returns the exit code."""
    parser = build_parser()
    args = parser.parse_args()
    try:
        args.dns_server_host, args.dns_server_port = parse_host_port(args.dns_server)
    except ValueError as exc:
        # Report a malformed --dns-server as a usage error, not a traceback.
        parser.error(str(exc))
    return asyncio.run(amain(args))


if __name__ == "__main__":
    raise SystemExit(main())