| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763 |
- #!/usr/bin/env python3
- from __future__ import annotations
- import argparse
- import asyncio
- import contextlib
- import json
- import os
- import socket
- import ssl
- import statistics
- import struct
- import sys
- import time
- from dataclasses import asdict, dataclass
- from pathlib import Path
- from urllib.parse import urlparse
- ROOT = Path(__file__).resolve().parents[2]
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
- from mynetspeeder.config import Config # noqa: E402
- @dataclass
- class HttpResult:
- connect_ms: float
- first_ms: float
- total_ms: float
- ok: bool
- status: int = 0
- error: str = ""
- @dataclass
- class DnsResult:
- associate_ms: float
- query_ms: float
- total_ms: float
- ok: bool
- error: str = ""
- @dataclass
- class UdpStreamResult:
- mode: str
- sent: int
- success: int
- timeouts: int
- max_timeout_streak: int
- recovery_count: int
- avg_recovery_ms: float
- max_recovery_ms: float
- max_gap_ms: float
- jitter_ms: float
- p95_ms: float
- max_latency_ms: float
- def parse_host_port(text: str) -> tuple[str, int]:
- if text.startswith("["):
- host, rest = text[1:].split("]", 1)
- if not rest.startswith(":"):
- raise ValueError(f"无效地址: {text}")
- return host, int(rest[1:])
- if text.count(":") == 1:
- host, port = text.rsplit(":", 1)
- return host, int(port)
- raise ValueError(f"无效地址: {text}")
- def parse_url(text: str) -> tuple[str, int, str, bool]:
- parsed = urlparse(text)
- if parsed.scheme not in ("http", "https"):
- raise ValueError(f"仅支持 http/https: {text}")
- if not parsed.hostname:
- raise ValueError(f"无效 URL: {text}")
- port = parsed.port or (443 if parsed.scheme == "https" else 80)
- path = parsed.path or "/"
- if parsed.query:
- path = f"{path}?{parsed.query}"
- return parsed.hostname, port, path, parsed.scheme == "https"
- def percentile(values: list[float], pct: float) -> float:
- if not values:
- return 0.0
- ordered = sorted(values)
- if len(ordered) == 1:
- return ordered[0]
- idx = (len(ordered) - 1) * pct
- lo = int(idx)
- hi = min(lo + 1, len(ordered) - 1)
- weight = idx - lo
- return ordered[lo] * (1 - weight) + ordered[hi] * weight
- def summarize(values: list[float]) -> dict[str, float]:
- if not values:
- return {"avg": 0.0, "p50": 0.0, "p95": 0.0}
- return {
- "avg": statistics.mean(values),
- "p50": percentile(values, 0.50),
- "p95": percentile(values, 0.95),
- }
- def stdev_or_zero(values: list[float]) -> float:
- if len(values) < 2:
- return 0.0
- return statistics.pstdev(values)
- def summarize_udp_stream(samples: list[float], success_times: list[float], timeout_marks: list[float], sent: int, mode: str) -> UdpStreamResult:
- summary = summarize(samples)
- max_gap_ms = 0.0
- if len(success_times) >= 2:
- gaps = [(success_times[idx] - success_times[idx - 1]) * 1000.0 for idx in range(1, len(success_times))]
- max_gap_ms = max(gaps)
- recovery_values: list[float] = []
- for timeout_at in timeout_marks:
- next_success = next((success_at for success_at in success_times if success_at > timeout_at), None)
- if next_success is not None:
- recovery_values.append((next_success - timeout_at) * 1000.0)
- max_streak = 0
- current_streak = 0
- if sent > 0:
- success_time_set = set(success_times)
- timeout_time_set = set(timeout_marks)
- ordered_events: list[tuple[float, bool]] = []
- ordered_events.extend((value, True) for value in success_time_set)
- ordered_events.extend((value, False) for value in timeout_time_set)
- ordered_events.sort(key=lambda item: item[0])
- for _at, is_success in ordered_events:
- if is_success:
- current_streak = 0
- else:
- current_streak += 1
- if current_streak > max_streak:
- max_streak = current_streak
- return UdpStreamResult(
- mode=mode,
- sent=sent,
- success=len(samples),
- timeouts=len(timeout_marks),
- max_timeout_streak=max_streak,
- recovery_count=len(recovery_values),
- avg_recovery_ms=statistics.mean(recovery_values) if recovery_values else 0.0,
- max_recovery_ms=max(recovery_values) if recovery_values else 0.0,
- max_gap_ms=max_gap_ms,
- jitter_ms=stdev_or_zero(samples),
- p95_ms=summary["p95"],
- max_latency_ms=max(samples) if samples else 0.0,
- )
- def fmt_ms(value: float) -> str:
- return f"{value:.2f}ms"
- def fmt_pct(value: float) -> str:
- return f"{value:+.1f}%"
- def verdict_from_diff(pct: float, threshold: float = 5.0) -> str:
- if pct <= -threshold:
- return "提升"
- if pct >= threshold:
- return "退化"
- return "无明显变化"
- def overall_verdict(changes: list[float]) -> str:
- if not changes:
- return "无可用结果"
- score = 0
- for change in changes:
- if change <= -5.0:
- score += 1
- elif change >= 5.0:
- score -= 1
- if score > 0:
- return "整体提升"
- if score < 0:
- return "整体退化"
- return "整体无明显变化"
- def progress_line(done: int, total: int, started_at: float) -> str:
- elapsed = time.perf_counter() - started_at
- if total <= 0:
- return "进度:0%"
- ratio = done / total if done else 0.0
- percent = int(ratio * 100) if done else 0
- remain = "--"
- if done:
- total_est = elapsed / ratio
- remain = f"{max(0.0, total_est - elapsed):.1f}s"
- return f"进度:{done}/{total} ({percent}%),已用 {elapsed:.1f}s,估算剩余:{remain}"
- def socks5_addr_bytes(host: str, port: int) -> bytes:
- try:
- return b"\x01" + socket.inet_aton(host) + port.to_bytes(2, "big")
- except OSError:
- try:
- return b"\x04" + socket.inet_pton(socket.AF_INET6, host) + port.to_bytes(2, "big")
- except OSError:
- raw = host.encode()
- return b"\x03" + bytes([len(raw)]) + raw + port.to_bytes(2, "big")
- async def socks5_handshake(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
- writer.write(b"\x05\x01\x00")
- await writer.drain()
- reply = await reader.readexactly(2)
- if reply != b"\x05\x00":
- raise ConnectionError("SOCKS5 握手失败")
- async def socks5_udp_associate(proxy_host: str, proxy_port: int, timeout: float) -> tuple[asyncio.StreamReader, asyncio.StreamWriter, tuple[str, int], float]:
- started = time.perf_counter()
- reader, writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port), timeout=timeout)
- await socks5_handshake(reader, writer)
- writer.write(b"\x05\x03\x00\x01\x00\x00\x00\x00\x00\x00")
- await writer.drain()
- head = await reader.readexactly(4)
- if head[1] != 0x00:
- raise ConnectionError(f"SOCKS5 UDP ASSOCIATE 失败: {head[1]}")
- atyp = head[3]
- if atyp == 1:
- relay_host = socket.inet_ntoa(await reader.readexactly(4))
- elif atyp == 3:
- size = (await reader.readexactly(1))[0]
- relay_host = (await reader.readexactly(size)).decode()
- elif atyp == 4:
- relay_host = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16))
- else:
- raise ConnectionError("SOCKS5 UDP relay ATYP 不支持")
- relay_port = int.from_bytes(await reader.readexactly(2), "big")
- return reader, writer, (relay_host, relay_port), (time.perf_counter() - started) * 1000
- def build_dns_query(name: str) -> bytes:
- query_id = int(time.time() * 1000) & 0xFFFF
- header = struct.pack("!HHHHHH", query_id, 0x0100, 1, 0, 0, 0)
- labels = b"".join(bytes([len(part)]) + part.encode() for part in name.strip(".").split(".") if part)
- return header + labels + b"\x00\x00\x01\x00\x01"
- def parse_dns_response(data: bytes) -> bool:
- if len(data) < 12:
- raise ValueError("DNS 响应过短")
- flags = int.from_bytes(data[2:4], "big")
- rcode = flags & 0x000F
- qdcount = int.from_bytes(data[4:6], "big")
- ancount = int.from_bytes(data[6:8], "big")
- return rcode == 0 and (ancount > 0 or qdcount > 0)
- def strip_socks_udp(packet: bytes) -> bytes:
- if len(packet) < 10:
- raise ValueError("SOCKS UDP 响应过短")
- atyp = packet[3]
- offset = 4
- if atyp == 1:
- offset += 4
- elif atyp == 3:
- size = packet[offset]
- offset += 1 + size
- elif atyp == 4:
- offset += 16
- else:
- raise ValueError("SOCKS UDP ATYP 不支持")
- offset += 2
- return packet[offset:]
- def edge_log_counters(log_path: Path) -> tuple[int, int]:
- if not log_path.exists():
- return 0, 0
- accept_count = 0
- win_count = 0
- for line in log_path.read_text(errors="replace").splitlines():
- if " accept peer=" in line:
- accept_count += 1
- if " tcp win session=" in line:
- win_count += 1
- return accept_count, win_count
- def edge_log_target_hits(log_path: Path, host: str, port: int) -> int:
- if not log_path.exists():
- return 0
- needle = f"target={host}:{port}"
- return sum(1 for line in log_path.read_text(errors="replace").splitlines() if needle in line)
- async def http_roundtrip(url: str, timeout: float) -> HttpResult:
- host, port, path, is_https = parse_url(url)
- writer: asyncio.StreamWriter | None = None
- started = time.perf_counter()
- try:
- if is_https:
- reader, writer = await asyncio.wait_for(
- asyncio.open_connection(host, port, ssl=ssl.create_default_context(), server_hostname=host),
- timeout=timeout,
- )
- else:
- reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
- connect_ms = (time.perf_counter() - started) * 1000
- request = (
- f"GET {path} HTTP/1.1\r\n"
- f"Host: {host}\r\n"
- "User-Agent: mynetspeeder-benchmark\r\n"
- "Accept: */*\r\n"
- "Connection: close\r\n\r\n"
- ).encode()
- send_started = time.perf_counter()
- writer.write(request)
- await writer.drain()
- header = await asyncio.wait_for(reader.readuntil(b"\r\n\r\n"), timeout=timeout)
- first_ms = (time.perf_counter() - send_started) * 1000
- status_line = header.split(b"\r\n", 1)[0].decode(errors="replace")
- status = 0
- if status_line.startswith("HTTP/"):
- parts = status_line.split()
- if len(parts) >= 2 and parts[1].isdigit():
- status = int(parts[1])
- total_ms = (time.perf_counter() - send_started) * 1000
- ok = 200 <= status < 400
- return HttpResult(connect_ms=connect_ms, first_ms=first_ms, total_ms=total_ms, ok=ok, status=status)
- except Exception as exc:
- return HttpResult(0.0, 0.0, 0.0, False, error=str(exc))
- finally:
- if writer is not None:
- with contextlib.suppress(Exception):
- writer.close()
- await writer.wait_closed()
- async def dns_roundtrip(server_host: str, server_port: int, timeout: float, proxy: tuple[str, int] | None) -> DnsResult:
- loop = asyncio.get_running_loop()
- query = build_dns_query("www.google.com")
- ctrl_writer: asyncio.StreamWriter | None = None
- sock: socket.socket | None = None
- started = time.perf_counter()
- try:
- if proxy is None:
- sock = socket.socket(socket.AF_INET6 if ":" in server_host else socket.AF_INET, socket.SOCK_DGRAM)
- sock.setblocking(False)
- await asyncio.wait_for(loop.sock_connect(sock, (server_host, server_port)), timeout=timeout)
- send_started = time.perf_counter()
- await loop.sock_sendall(sock, query)
- data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
- query_ms = (time.perf_counter() - send_started) * 1000
- else:
- _ctrl_reader, ctrl_writer, relay, associate_ms = await asyncio.wait_for(
- socks5_udp_associate(proxy[0], proxy[1], timeout),
- timeout=timeout,
- )
- family = socket.AF_INET6 if ":" in relay[0] else socket.AF_INET
- sock = socket.socket(family, socket.SOCK_DGRAM)
- sock.setblocking(False)
- await asyncio.wait_for(loop.sock_connect(sock, relay), timeout=timeout)
- packet = b"\x00\x00\x00" + socks5_addr_bytes(server_host, server_port) + query
- send_started = time.perf_counter()
- await loop.sock_sendall(sock, packet)
- data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
- data = strip_socks_udp(data)
- query_ms = (time.perf_counter() - send_started) * 1000
- started -= associate_ms / 1000.0
- ok = parse_dns_response(data)
- total_ms = (time.perf_counter() - started) * 1000
- return DnsResult(
- associate_ms=0.0 if proxy is None else max(0.0, total_ms - query_ms),
- query_ms=query_ms,
- total_ms=total_ms,
- ok=ok,
- )
- except Exception as exc:
- return DnsResult(0.0, 0.0, 0.0, False, error=str(exc))
- finally:
- if sock is not None:
- sock.close()
- if ctrl_writer is not None:
- with contextlib.suppress(Exception):
- ctrl_writer.close()
- await ctrl_writer.wait_closed()
- async def run_step(label: str, done: int, total: int, started_at: float, coro, timeout: float):
- print(f" 开始 {label}")
- task = asyncio.create_task(coro)
- step_started = time.perf_counter()
- while not task.done():
- print(f" {label} {progress_line(done, total, started_at)}")
- if time.perf_counter() - step_started >= timeout:
- task.cancel()
- with contextlib.suppress(asyncio.CancelledError):
- await task
- print(f" 超时 {label}")
- return None
- await asyncio.sleep(0.5)
- return await task
- async def http_roundtrip_as_user(script_path: Path, runtime_user: str, url: str, timeout: float) -> HttpResult:
- proc = await asyncio.create_subprocess_exec(
- "runuser",
- "-u",
- runtime_user,
- "--",
- "python3",
- str(script_path),
- "--child-http",
- url,
- "--child-timeout",
- str(timeout),
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- stdout, stderr = await proc.communicate()
- if proc.returncode != 0:
- error = stderr.decode(errors="replace").strip() or stdout.decode(errors="replace").strip() or f"子进程退出码 {proc.returncode}"
- return HttpResult(0.0, 0.0, 0.0, False, error=error)
- try:
- payload = json.loads(stdout.decode())
- return HttpResult(**payload)
- except Exception as exc:
- return HttpResult(0.0, 0.0, 0.0, False, error=f"解析子进程结果失败: {exc}")
- async def bench_tcp_transparent(args, script_path: Path) -> float | None:
- can_run_direct = os.geteuid() == 0
- current_user = os.environ.get("USER", "")
- if current_user == args.runtime_user:
- print("TCP 透明测试跳过:当前用户就是透明排除用户,无法命中 19080")
- return None
- if not can_run_direct:
- print("TCP 直连基线跳过:当前不是 root,无法切换到透明排除用户")
- return None
- target_host, target_port, _target_path, _is_https = parse_url(args.http_url)
- direct_results: list[HttpResult] = []
- transparent_results: list[HttpResult] = []
- total = args.count * 2
- started_at = time.perf_counter()
- pre_accept, pre_win = edge_log_counters(Path(args.edge_log))
- pre_target_hits = edge_log_target_hits(Path(args.edge_log), target_host, target_port)
- print("TCP 透明链路测试开始")
- print(f" 目标 URL: {args.http_url}")
- print(f" 直连基线用户: {args.runtime_user}(应绕过 transparent)")
- print(" 透明链路用户: 当前执行用户(应命中 transparent_edge)")
- if args.http_url.startswith("https://"):
- print(" 提示: transparent TCP 基准更建议使用 http:// 目标,HTTPS 目标更容易受上层握手行为影响")
- for index in range(args.count):
- direct_result = await run_step(
- f"TCP 直连 第{index + 1}步",
- index * 2 + 1,
- total,
- started_at,
- http_roundtrip_as_user(script_path, args.runtime_user, args.http_url, args.timeout),
- timeout=max(3.0, args.timeout + 3.0),
- )
- if isinstance(direct_result, HttpResult):
- direct_results.append(direct_result)
- transparent_result = await run_step(
- f"TCP 透明 第{index + 1}步",
- index * 2 + 2,
- total,
- started_at,
- http_roundtrip(args.http_url, args.timeout),
- timeout=max(3.0, args.timeout + 3.0),
- )
- if isinstance(transparent_result, HttpResult):
- transparent_results.append(transparent_result)
- post_accept, post_win = edge_log_counters(Path(args.edge_log))
- post_target_hits = edge_log_target_hits(Path(args.edge_log), target_host, target_port)
- direct_ok = [item for item in direct_results if item.ok]
- transparent_ok = [item for item in transparent_results if item.ok]
- print(f"TCP 目标: {args.http_url}")
- print(
- f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},连接均值 {fmt_ms(summarize([item.connect_ms for item in direct_ok])['avg'])},"
- f" 首包均值 {fmt_ms(summarize([item.first_ms for item in direct_ok])['avg'])},总耗时均值 {fmt_ms(summarize([item.total_ms for item in direct_ok])['avg'])}"
- )
- if direct_results and not direct_ok:
- print(f" 直连失败原因: {direct_results[0].error or '未知'}")
- print(
- f" 透明: 成功 {len(transparent_ok)}/{len(transparent_results)},连接均值 {fmt_ms(summarize([item.connect_ms for item in transparent_ok])['avg'])},"
- f" 首包均值 {fmt_ms(summarize([item.first_ms for item in transparent_ok])['avg'])},总耗时均值 {fmt_ms(summarize([item.total_ms for item in transparent_ok])['avg'])}"
- )
- if transparent_results and not transparent_ok:
- print(f" 透明失败原因: {transparent_results[0].error or '未知'}")
- print(
- f" Transparent 日志增量: accept +{post_accept - pre_accept},tcp win +{post_win - pre_win},"
- f" 目标命中 +{post_target_hits - pre_target_hits}"
- )
- if not direct_ok or not transparent_ok:
- print(" 对比: 无法比较,原因:直连或透明没有成功样本")
- return None
- direct_total = summarize([item.total_ms for item in direct_ok])["avg"]
- transparent_total = summarize([item.total_ms for item in transparent_ok])["avg"]
- diff = transparent_total - direct_total
- pct = (diff / direct_total * 100.0) if direct_total > 0 else 0.0
- print(f" 对比: transparent 总耗时变化 {fmt_pct(pct)},结论:{verdict_from_diff(pct)}")
- return pct
- async def bench_udp_socks(args, proxy: tuple[str, int] | None) -> float | None:
- direct_results: list[DnsResult] = []
- proxy_results: list[DnsResult] = []
- total = args.count * (2 if proxy else 1)
- started_at = time.perf_counter()
- print("UDP SOCKS 链路测试开始")
- print(f" DNS 目标: {args.dns_server_host}:{args.dns_server_port}")
- for index in range(args.count):
- direct_result = await run_step(
- f"UDP 直连 第{index + 1}步",
- index * (2 if proxy else 1) + 1,
- total,
- started_at,
- dns_roundtrip(args.dns_server_host, args.dns_server_port, args.timeout, None),
- timeout=max(3.0, args.timeout + 2.0),
- )
- if isinstance(direct_result, DnsResult):
- direct_results.append(direct_result)
- if proxy is None:
- continue
- proxy_result = await run_step(
- f"UDP SOCKS 第{index + 1}步",
- index * 2 + 2,
- total,
- started_at,
- dns_roundtrip(args.dns_server_host, args.dns_server_port, args.timeout, proxy),
- timeout=max(3.0, args.timeout + 2.0),
- )
- if isinstance(proxy_result, DnsResult):
- proxy_results.append(proxy_result)
- direct_ok = [item for item in direct_results if item.ok]
- proxy_ok = [item for item in proxy_results if item.ok]
- direct_query_values = [item.query_ms for item in direct_ok]
- proxy_query_values = [item.query_ms for item in proxy_ok]
- direct_stall_count = sum(1 for value in direct_query_values if value >= args.udp_stall_ms)
- proxy_stall_count = sum(1 for value in proxy_query_values if value >= args.udp_stall_ms)
- print(f"UDP 目标: {args.dns_server_host}:{args.dns_server_port}")
- print(
- f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},查询均值 {fmt_ms(summarize(direct_query_values)['avg'])},"
- f" p95 {fmt_ms(summarize(direct_query_values)['p95'])},抖动 {fmt_ms(stdev_or_zero(direct_query_values))},"
- f" 最慢 {fmt_ms(max(direct_query_values) if direct_query_values else 0.0)},卡顿次数 {direct_stall_count}"
- )
- if direct_results and not direct_ok:
- print(f" 直连失败原因: {direct_results[0].error or '未知'}")
- if proxy is None:
- print(" SOCKS: 未执行或不可用")
- return None
- print(
- f" SOCKS: 成功 {len(proxy_ok)}/{len(proxy_results)},关联均值 {fmt_ms(summarize([item.associate_ms for item in proxy_ok])['avg'])},"
- f" 查询均值 {fmt_ms(summarize(proxy_query_values)['avg'])},p95 {fmt_ms(summarize(proxy_query_values)['p95'])},"
- f" 抖动 {fmt_ms(stdev_or_zero(proxy_query_values))},最慢 {fmt_ms(max(proxy_query_values) if proxy_query_values else 0.0)},"
- f" 卡顿次数 {proxy_stall_count}"
- )
- if proxy_results and not proxy_ok:
- print(f" SOCKS 失败原因: {proxy_results[0].error or '未知'}")
- if not direct_ok or not proxy_ok:
- print(" 对比: 无法比较,原因:直连或 SOCKS 没有成功样本")
- return None
- direct_query = summarize(direct_query_values)["avg"]
- proxy_query = summarize(proxy_query_values)["avg"]
- diff = proxy_query - direct_query
- pct = (diff / direct_query * 100.0) if direct_query > 0 else 0.0
- print(f" 对比: SOCKS 查询耗时变化 {fmt_pct(pct)},结论:{verdict_from_diff(pct)}")
- stall_diff = proxy_stall_count - direct_stall_count
- jitter_diff = stdev_or_zero(proxy_query_values) - stdev_or_zero(direct_query_values)
- if stall_diff < 0:
- stability = "更稳"
- elif stall_diff > 0:
- stability = "更不稳"
- elif jitter_diff < -5.0:
- stability = "更稳"
- elif jitter_diff > 5.0:
- stability = "更不稳"
- else:
- stability = "接近"
- print(
- f" 稳定性: SOCKS 相比直连 {stability},卡顿次数变化 {stall_diff:+d},"
- f" 抖动变化 {fmt_ms(jitter_diff)},卡顿阈值 {fmt_ms(args.udp_stall_ms)}"
- )
- return pct
- async def udp_stream_once(server_host: str, server_port: int, timeout: float, proxy: tuple[str, int] | None) -> tuple[bool, float]:
- result = await dns_roundtrip(server_host, server_port, timeout, proxy)
- return result.ok, result.query_ms
- async def run_udp_stream_probe(args, proxy: tuple[str, int] | None, mode: str) -> UdpStreamResult:
- samples: list[float] = []
- success_times: list[float] = []
- timeout_marks: list[float] = []
- started = time.perf_counter()
- total = args.udp_stream_count
- for index in range(total):
- label = f"UDP 连续流 {mode} 第{index + 1}步"
- print(f" 开始 {label}")
- ok, query_ms = await udp_stream_once(args.dns_server_host, args.dns_server_port, args.timeout, proxy)
- now = time.perf_counter()
- print(f" {label} {progress_line(index + 1, total, started)}")
- if ok:
- samples.append(query_ms)
- success_times.append(now)
- else:
- timeout_marks.append(now)
- if index + 1 < total and args.udp_stream_interval_ms > 0:
- await asyncio.sleep(args.udp_stream_interval_ms / 1000.0)
- return summarize_udp_stream(samples, success_times, timeout_marks, total, mode)
- async def bench_udp_stream(args, proxy: tuple[str, int] | None) -> None:
- print("")
- print("UDP 连续流稳定性测试开始")
- print(
- f" 目标: {args.dns_server_host}:{args.dns_server_port},样本数: {args.udp_stream_count},"
- f" 间隔: {args.udp_stream_interval_ms}ms,超时阈值: {args.timeout:.1f}s"
- )
- direct = await run_udp_stream_probe(args, None, "直连")
- proxy_result = await run_udp_stream_probe(args, proxy, "SOCKS") if proxy is not None else None
- print("UDP 连续流结果")
- print(
- f" 直连: 成功 {direct.success}/{direct.sent},超时 {direct.timeouts},连续超时最大 {direct.max_timeout_streak},"
- f" p95 {fmt_ms(direct.p95_ms)},抖动 {fmt_ms(direct.jitter_ms)},最大间隔 {fmt_ms(direct.max_gap_ms)},"
- f" 恢复均值 {fmt_ms(direct.avg_recovery_ms)},恢复最慢 {fmt_ms(direct.max_recovery_ms)}"
- )
- if proxy_result is None:
- print(" SOCKS: 未执行或不可用")
- return
- print(
- f" SOCKS: 成功 {proxy_result.success}/{proxy_result.sent},超时 {proxy_result.timeouts},连续超时最大 {proxy_result.max_timeout_streak},"
- f" p95 {fmt_ms(proxy_result.p95_ms)},抖动 {fmt_ms(proxy_result.jitter_ms)},最大间隔 {fmt_ms(proxy_result.max_gap_ms)},"
- f" 恢复均值 {fmt_ms(proxy_result.avg_recovery_ms)},恢复最慢 {fmt_ms(proxy_result.max_recovery_ms)}"
- )
- recovery_diff = proxy_result.avg_recovery_ms - direct.avg_recovery_ms
- gap_diff = proxy_result.max_gap_ms - direct.max_gap_ms
- timeout_diff = proxy_result.timeouts - direct.timeouts
- streak_diff = proxy_result.max_timeout_streak - direct.max_timeout_streak
- if timeout_diff < 0 or streak_diff < 0 or recovery_diff < -20.0 or gap_diff < -20.0:
- verdict = "更稳"
- elif timeout_diff > 0 or streak_diff > 0 or recovery_diff > 20.0 or gap_diff > 20.0:
- verdict = "更不稳"
- else:
- verdict = "接近"
- print(
- f" 连续流稳定性: SOCKS 相比直连 {verdict},超时变化 {timeout_diff:+d},连续超时变化 {streak_diff:+d},"
- f" 最大间隔变化 {fmt_ms(gap_diff)},恢复均值变化 {fmt_ms(recovery_diff)}"
- )
- async def child_http_mode(url: str, timeout: float) -> int:
- result = await http_roundtrip(url, timeout)
- print(json.dumps(asdict(result), ensure_ascii=False))
- return 0 if result.ok else 1
- async def amain(args) -> int:
- if args.child_http:
- return await child_http_mode(args.child_http, args.child_timeout)
- if args.mode in ("tcp", "all"):
- _host, _port, _path, is_https = parse_url(args.http_url)
- if is_https:
- print("TCP 测试已跳过:transparent TCP 基线不支持使用 https:// 目标做有效对比")
- print("建议改用可快速返回的 http:// 目标,例如:")
- print(" --http-url http://connectivitycheck.gstatic.com/generate_204")
- if args.mode == "tcp":
- return 0
- config = Config.load(args.config) if args.config and Path(args.config).exists() else None
- proxy: tuple[str, int] | None = None
- if config is not None and config.socks_port > 0:
- proxy = (config.socks_host, config.socks_port)
- if args.proxy_host and args.proxy_port:
- proxy = (args.proxy_host, args.proxy_port)
- print("本地基准测试开始")
- print(f" 样本数: {args.count}")
- print(f" TCP 目标: {args.http_url}")
- print(f" UDP 目标: {args.dns_server_host}:{args.dns_server_port}")
- print(f" SOCKS: {proxy[0]}:{proxy[1]}" if proxy else " SOCKS: 未配置或未启动")
- print(f" Transparent 排除用户: {args.runtime_user}")
- print(" 说明: TCP 比较的是“直连基线 vs transparent_edge 实际链路”,UDP 比较的是“直连 vs socks_edge 实际链路”")
- print("")
- tcp_pct = None
- udp_pct = None
- script_path = Path(__file__).resolve()
- if args.mode in ("tcp", "all") and args.http_url.startswith("http://"):
- tcp_pct = await bench_tcp_transparent(args, script_path)
- print("")
- if args.mode in ("udp", "all"):
- udp_pct = await bench_udp_socks(args, proxy)
- if args.udp_stream_count > 0:
- await bench_udp_stream(args, proxy)
- changes = [value for value in (tcp_pct, udp_pct) if value is not None]
- if not changes:
- print("中文总结:无可用结果")
- return 0
- parts = []
- if tcp_pct is not None:
- parts.append(f"TCP={verdict_from_diff(tcp_pct)}")
- if udp_pct is not None:
- parts.append(f"UDP={verdict_from_diff(udp_pct)}")
- print(f"中文总结:{overall_verdict(changes)}({', '.join(parts)})")
- return 0
- def build_parser() -> argparse.ArgumentParser:
- parser = argparse.ArgumentParser(description="mynetspeeder 真实链路本地手工基准测试")
- parser.add_argument("--config", default="/home/mynetspeeder/config.json", help="配置文件路径")
- parser.add_argument("--proxy-host", default="", help="SOCKS5 地址,默认读取 config.json")
- parser.add_argument("--proxy-port", type=int, default=0, help="SOCKS5 端口,默认读取 config.json")
- parser.add_argument("--http-url", default="http://connectivitycheck.gstatic.com/generate_204", help="TCP/透明链路测试 URL,建议使用 http://")
- parser.add_argument("--dns-server", default="8.8.8.8:53", help="UDP/SOCKS 链路 DNS 目标,格式 host:port")
- parser.add_argument("--mode", choices=("tcp", "udp", "all"), default="all", help="只测 TCP、只测 UDP 或都测")
- parser.add_argument("--count", type=int, default=4, help="每类测试样本数,默认 4")
- parser.add_argument("--timeout", type=float, default=3.0, help="单次测试超时秒数")
- parser.add_argument("--udp-stall-ms", type=float, default=120.0, help="UDP 单次查询超过该值记为一次疑似卡顿")
- parser.add_argument("--udp-stream-count", type=int, default=20, help="UDP 连续流稳定性测试样本数,设为 0 则关闭")
- parser.add_argument("--udp-stream-interval-ms", type=float, default=200.0, help="UDP 连续流相邻样本间隔毫秒")
- parser.add_argument("--runtime-user", default=os.environ.get("MYNETSPEEDER_USER", "mynetspeeder"), help="transparent 排除用户")
- parser.add_argument("--edge-log", default="/var/log/mynetspeeder-edge.log", help="transparent edge 日志路径")
- parser.add_argument("--child-http", default="", help=argparse.SUPPRESS)
- parser.add_argument("--child-timeout", type=float, default=3.0, help=argparse.SUPPRESS)
- return parser
- def main() -> int:
- args = build_parser().parse_args()
- args.dns_server_host, args.dns_server_port = parse_host_port(args.dns_server)
- return asyncio.run(amain(args))
- if __name__ == "__main__":
- raise SystemExit(main())
|