benchmark_local.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763
  1. #!/usr/bin/env python3
  2. from __future__ import annotations
  3. import argparse
  4. import asyncio
  5. import contextlib
  6. import json
  7. import os
  8. import socket
  9. import ssl
  10. import statistics
  11. import struct
  12. import sys
  13. import time
  14. from dataclasses import asdict, dataclass
  15. from pathlib import Path
  16. from urllib.parse import urlparse
  17. ROOT = Path(__file__).resolve().parents[2]
  18. if str(ROOT) not in sys.path:
  19. sys.path.insert(0, str(ROOT))
  20. from mynetspeeder.config import Config # noqa: E402
  21. @dataclass
  22. class HttpResult:
  23. connect_ms: float
  24. first_ms: float
  25. total_ms: float
  26. ok: bool
  27. status: int = 0
  28. error: str = ""
  29. @dataclass
  30. class DnsResult:
  31. associate_ms: float
  32. query_ms: float
  33. total_ms: float
  34. ok: bool
  35. error: str = ""
  36. @dataclass
  37. class UdpStreamResult:
  38. mode: str
  39. sent: int
  40. success: int
  41. timeouts: int
  42. max_timeout_streak: int
  43. recovery_count: int
  44. avg_recovery_ms: float
  45. max_recovery_ms: float
  46. max_gap_ms: float
  47. jitter_ms: float
  48. p95_ms: float
  49. max_latency_ms: float
  50. def parse_host_port(text: str) -> tuple[str, int]:
  51. if text.startswith("["):
  52. host, rest = text[1:].split("]", 1)
  53. if not rest.startswith(":"):
  54. raise ValueError(f"无效地址: {text}")
  55. return host, int(rest[1:])
  56. if text.count(":") == 1:
  57. host, port = text.rsplit(":", 1)
  58. return host, int(port)
  59. raise ValueError(f"无效地址: {text}")
  60. def parse_url(text: str) -> tuple[str, int, str, bool]:
  61. parsed = urlparse(text)
  62. if parsed.scheme not in ("http", "https"):
  63. raise ValueError(f"仅支持 http/https: {text}")
  64. if not parsed.hostname:
  65. raise ValueError(f"无效 URL: {text}")
  66. port = parsed.port or (443 if parsed.scheme == "https" else 80)
  67. path = parsed.path or "/"
  68. if parsed.query:
  69. path = f"{path}?{parsed.query}"
  70. return parsed.hostname, port, path, parsed.scheme == "https"
  71. def percentile(values: list[float], pct: float) -> float:
  72. if not values:
  73. return 0.0
  74. ordered = sorted(values)
  75. if len(ordered) == 1:
  76. return ordered[0]
  77. idx = (len(ordered) - 1) * pct
  78. lo = int(idx)
  79. hi = min(lo + 1, len(ordered) - 1)
  80. weight = idx - lo
  81. return ordered[lo] * (1 - weight) + ordered[hi] * weight
  82. def summarize(values: list[float]) -> dict[str, float]:
  83. if not values:
  84. return {"avg": 0.0, "p50": 0.0, "p95": 0.0}
  85. return {
  86. "avg": statistics.mean(values),
  87. "p50": percentile(values, 0.50),
  88. "p95": percentile(values, 0.95),
  89. }
  90. def stdev_or_zero(values: list[float]) -> float:
  91. if len(values) < 2:
  92. return 0.0
  93. return statistics.pstdev(values)
  94. def summarize_udp_stream(samples: list[float], success_times: list[float], timeout_marks: list[float], sent: int, mode: str) -> UdpStreamResult:
  95. summary = summarize(samples)
  96. max_gap_ms = 0.0
  97. if len(success_times) >= 2:
  98. gaps = [(success_times[idx] - success_times[idx - 1]) * 1000.0 for idx in range(1, len(success_times))]
  99. max_gap_ms = max(gaps)
  100. recovery_values: list[float] = []
  101. for timeout_at in timeout_marks:
  102. next_success = next((success_at for success_at in success_times if success_at > timeout_at), None)
  103. if next_success is not None:
  104. recovery_values.append((next_success - timeout_at) * 1000.0)
  105. max_streak = 0
  106. current_streak = 0
  107. if sent > 0:
  108. success_time_set = set(success_times)
  109. timeout_time_set = set(timeout_marks)
  110. ordered_events: list[tuple[float, bool]] = []
  111. ordered_events.extend((value, True) for value in success_time_set)
  112. ordered_events.extend((value, False) for value in timeout_time_set)
  113. ordered_events.sort(key=lambda item: item[0])
  114. for _at, is_success in ordered_events:
  115. if is_success:
  116. current_streak = 0
  117. else:
  118. current_streak += 1
  119. if current_streak > max_streak:
  120. max_streak = current_streak
  121. return UdpStreamResult(
  122. mode=mode,
  123. sent=sent,
  124. success=len(samples),
  125. timeouts=len(timeout_marks),
  126. max_timeout_streak=max_streak,
  127. recovery_count=len(recovery_values),
  128. avg_recovery_ms=statistics.mean(recovery_values) if recovery_values else 0.0,
  129. max_recovery_ms=max(recovery_values) if recovery_values else 0.0,
  130. max_gap_ms=max_gap_ms,
  131. jitter_ms=stdev_or_zero(samples),
  132. p95_ms=summary["p95"],
  133. max_latency_ms=max(samples) if samples else 0.0,
  134. )
  135. def fmt_ms(value: float) -> str:
  136. return f"{value:.2f}ms"
  137. def fmt_pct(value: float) -> str:
  138. return f"{value:+.1f}%"
  139. def verdict_from_diff(pct: float, threshold: float = 5.0) -> str:
  140. if pct <= -threshold:
  141. return "提升"
  142. if pct >= threshold:
  143. return "退化"
  144. return "无明显变化"
  145. def overall_verdict(changes: list[float]) -> str:
  146. if not changes:
  147. return "无可用结果"
  148. score = 0
  149. for change in changes:
  150. if change <= -5.0:
  151. score += 1
  152. elif change >= 5.0:
  153. score -= 1
  154. if score > 0:
  155. return "整体提升"
  156. if score < 0:
  157. return "整体退化"
  158. return "整体无明显变化"
  159. def progress_line(done: int, total: int, started_at: float) -> str:
  160. elapsed = time.perf_counter() - started_at
  161. if total <= 0:
  162. return "进度:0%"
  163. ratio = done / total if done else 0.0
  164. percent = int(ratio * 100) if done else 0
  165. remain = "--"
  166. if done:
  167. total_est = elapsed / ratio
  168. remain = f"{max(0.0, total_est - elapsed):.1f}s"
  169. return f"进度:{done}/{total} ({percent}%),已用 {elapsed:.1f}s,估算剩余:{remain}"
  170. def socks5_addr_bytes(host: str, port: int) -> bytes:
  171. try:
  172. return b"\x01" + socket.inet_aton(host) + port.to_bytes(2, "big")
  173. except OSError:
  174. try:
  175. return b"\x04" + socket.inet_pton(socket.AF_INET6, host) + port.to_bytes(2, "big")
  176. except OSError:
  177. raw = host.encode()
  178. return b"\x03" + bytes([len(raw)]) + raw + port.to_bytes(2, "big")
  179. async def socks5_handshake(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
  180. writer.write(b"\x05\x01\x00")
  181. await writer.drain()
  182. reply = await reader.readexactly(2)
  183. if reply != b"\x05\x00":
  184. raise ConnectionError("SOCKS5 握手失败")
  185. async def socks5_udp_associate(proxy_host: str, proxy_port: int, timeout: float) -> tuple[asyncio.StreamReader, asyncio.StreamWriter, tuple[str, int], float]:
  186. started = time.perf_counter()
  187. reader, writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port), timeout=timeout)
  188. await socks5_handshake(reader, writer)
  189. writer.write(b"\x05\x03\x00\x01\x00\x00\x00\x00\x00\x00")
  190. await writer.drain()
  191. head = await reader.readexactly(4)
  192. if head[1] != 0x00:
  193. raise ConnectionError(f"SOCKS5 UDP ASSOCIATE 失败: {head[1]}")
  194. atyp = head[3]
  195. if atyp == 1:
  196. relay_host = socket.inet_ntoa(await reader.readexactly(4))
  197. elif atyp == 3:
  198. size = (await reader.readexactly(1))[0]
  199. relay_host = (await reader.readexactly(size)).decode()
  200. elif atyp == 4:
  201. relay_host = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16))
  202. else:
  203. raise ConnectionError("SOCKS5 UDP relay ATYP 不支持")
  204. relay_port = int.from_bytes(await reader.readexactly(2), "big")
  205. return reader, writer, (relay_host, relay_port), (time.perf_counter() - started) * 1000
  206. def build_dns_query(name: str) -> bytes:
  207. query_id = int(time.time() * 1000) & 0xFFFF
  208. header = struct.pack("!HHHHHH", query_id, 0x0100, 1, 0, 0, 0)
  209. labels = b"".join(bytes([len(part)]) + part.encode() for part in name.strip(".").split(".") if part)
  210. return header + labels + b"\x00\x00\x01\x00\x01"
  211. def parse_dns_response(data: bytes) -> bool:
  212. if len(data) < 12:
  213. raise ValueError("DNS 响应过短")
  214. flags = int.from_bytes(data[2:4], "big")
  215. rcode = flags & 0x000F
  216. qdcount = int.from_bytes(data[4:6], "big")
  217. ancount = int.from_bytes(data[6:8], "big")
  218. return rcode == 0 and (ancount > 0 or qdcount > 0)
  219. def strip_socks_udp(packet: bytes) -> bytes:
  220. if len(packet) < 10:
  221. raise ValueError("SOCKS UDP 响应过短")
  222. atyp = packet[3]
  223. offset = 4
  224. if atyp == 1:
  225. offset += 4
  226. elif atyp == 3:
  227. size = packet[offset]
  228. offset += 1 + size
  229. elif atyp == 4:
  230. offset += 16
  231. else:
  232. raise ValueError("SOCKS UDP ATYP 不支持")
  233. offset += 2
  234. return packet[offset:]
  235. def edge_log_counters(log_path: Path) -> tuple[int, int]:
  236. if not log_path.exists():
  237. return 0, 0
  238. accept_count = 0
  239. win_count = 0
  240. for line in log_path.read_text(errors="replace").splitlines():
  241. if " accept peer=" in line:
  242. accept_count += 1
  243. if " tcp win session=" in line:
  244. win_count += 1
  245. return accept_count, win_count
  246. def edge_log_target_hits(log_path: Path, host: str, port: int) -> int:
  247. if not log_path.exists():
  248. return 0
  249. needle = f"target={host}:{port}"
  250. return sum(1 for line in log_path.read_text(errors="replace").splitlines() if needle in line)
  251. async def http_roundtrip(url: str, timeout: float) -> HttpResult:
  252. host, port, path, is_https = parse_url(url)
  253. writer: asyncio.StreamWriter | None = None
  254. started = time.perf_counter()
  255. try:
  256. if is_https:
  257. reader, writer = await asyncio.wait_for(
  258. asyncio.open_connection(host, port, ssl=ssl.create_default_context(), server_hostname=host),
  259. timeout=timeout,
  260. )
  261. else:
  262. reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
  263. connect_ms = (time.perf_counter() - started) * 1000
  264. request = (
  265. f"GET {path} HTTP/1.1\r\n"
  266. f"Host: {host}\r\n"
  267. "User-Agent: mynetspeeder-benchmark\r\n"
  268. "Accept: */*\r\n"
  269. "Connection: close\r\n\r\n"
  270. ).encode()
  271. send_started = time.perf_counter()
  272. writer.write(request)
  273. await writer.drain()
  274. header = await asyncio.wait_for(reader.readuntil(b"\r\n\r\n"), timeout=timeout)
  275. first_ms = (time.perf_counter() - send_started) * 1000
  276. status_line = header.split(b"\r\n", 1)[0].decode(errors="replace")
  277. status = 0
  278. if status_line.startswith("HTTP/"):
  279. parts = status_line.split()
  280. if len(parts) >= 2 and parts[1].isdigit():
  281. status = int(parts[1])
  282. total_ms = (time.perf_counter() - send_started) * 1000
  283. ok = 200 <= status < 400
  284. return HttpResult(connect_ms=connect_ms, first_ms=first_ms, total_ms=total_ms, ok=ok, status=status)
  285. except Exception as exc:
  286. return HttpResult(0.0, 0.0, 0.0, False, error=str(exc))
  287. finally:
  288. if writer is not None:
  289. with contextlib.suppress(Exception):
  290. writer.close()
  291. await writer.wait_closed()
  292. async def dns_roundtrip(server_host: str, server_port: int, timeout: float, proxy: tuple[str, int] | None) -> DnsResult:
  293. loop = asyncio.get_running_loop()
  294. query = build_dns_query("www.google.com")
  295. ctrl_writer: asyncio.StreamWriter | None = None
  296. sock: socket.socket | None = None
  297. started = time.perf_counter()
  298. try:
  299. if proxy is None:
  300. sock = socket.socket(socket.AF_INET6 if ":" in server_host else socket.AF_INET, socket.SOCK_DGRAM)
  301. sock.setblocking(False)
  302. await asyncio.wait_for(loop.sock_connect(sock, (server_host, server_port)), timeout=timeout)
  303. send_started = time.perf_counter()
  304. await loop.sock_sendall(sock, query)
  305. data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
  306. query_ms = (time.perf_counter() - send_started) * 1000
  307. else:
  308. _ctrl_reader, ctrl_writer, relay, associate_ms = await asyncio.wait_for(
  309. socks5_udp_associate(proxy[0], proxy[1], timeout),
  310. timeout=timeout,
  311. )
  312. family = socket.AF_INET6 if ":" in relay[0] else socket.AF_INET
  313. sock = socket.socket(family, socket.SOCK_DGRAM)
  314. sock.setblocking(False)
  315. await asyncio.wait_for(loop.sock_connect(sock, relay), timeout=timeout)
  316. packet = b"\x00\x00\x00" + socks5_addr_bytes(server_host, server_port) + query
  317. send_started = time.perf_counter()
  318. await loop.sock_sendall(sock, packet)
  319. data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
  320. data = strip_socks_udp(data)
  321. query_ms = (time.perf_counter() - send_started) * 1000
  322. started -= associate_ms / 1000.0
  323. ok = parse_dns_response(data)
  324. total_ms = (time.perf_counter() - started) * 1000
  325. return DnsResult(
  326. associate_ms=0.0 if proxy is None else max(0.0, total_ms - query_ms),
  327. query_ms=query_ms,
  328. total_ms=total_ms,
  329. ok=ok,
  330. )
  331. except Exception as exc:
  332. return DnsResult(0.0, 0.0, 0.0, False, error=str(exc))
  333. finally:
  334. if sock is not None:
  335. sock.close()
  336. if ctrl_writer is not None:
  337. with contextlib.suppress(Exception):
  338. ctrl_writer.close()
  339. await ctrl_writer.wait_closed()
  340. async def run_step(label: str, done: int, total: int, started_at: float, coro, timeout: float):
  341. print(f" 开始 {label}")
  342. task = asyncio.create_task(coro)
  343. step_started = time.perf_counter()
  344. while not task.done():
  345. print(f" {label} {progress_line(done, total, started_at)}")
  346. if time.perf_counter() - step_started >= timeout:
  347. task.cancel()
  348. with contextlib.suppress(asyncio.CancelledError):
  349. await task
  350. print(f" 超时 {label}")
  351. return None
  352. await asyncio.sleep(0.5)
  353. return await task
  354. async def http_roundtrip_as_user(script_path: Path, runtime_user: str, url: str, timeout: float) -> HttpResult:
  355. proc = await asyncio.create_subprocess_exec(
  356. "runuser",
  357. "-u",
  358. runtime_user,
  359. "--",
  360. "python3",
  361. str(script_path),
  362. "--child-http",
  363. url,
  364. "--child-timeout",
  365. str(timeout),
  366. stdout=asyncio.subprocess.PIPE,
  367. stderr=asyncio.subprocess.PIPE,
  368. )
  369. stdout, stderr = await proc.communicate()
  370. if proc.returncode != 0:
  371. error = stderr.decode(errors="replace").strip() or stdout.decode(errors="replace").strip() or f"子进程退出码 {proc.returncode}"
  372. return HttpResult(0.0, 0.0, 0.0, False, error=error)
  373. try:
  374. payload = json.loads(stdout.decode())
  375. return HttpResult(**payload)
  376. except Exception as exc:
  377. return HttpResult(0.0, 0.0, 0.0, False, error=f"解析子进程结果失败: {exc}")
  378. async def bench_tcp_transparent(args, script_path: Path) -> float | None:
  379. can_run_direct = os.geteuid() == 0
  380. current_user = os.environ.get("USER", "")
  381. if current_user == args.runtime_user:
  382. print("TCP 透明测试跳过:当前用户就是透明排除用户,无法命中 19080")
  383. return None
  384. if not can_run_direct:
  385. print("TCP 直连基线跳过:当前不是 root,无法切换到透明排除用户")
  386. return None
  387. target_host, target_port, _target_path, _is_https = parse_url(args.http_url)
  388. direct_results: list[HttpResult] = []
  389. transparent_results: list[HttpResult] = []
  390. total = args.count * 2
  391. started_at = time.perf_counter()
  392. pre_accept, pre_win = edge_log_counters(Path(args.edge_log))
  393. pre_target_hits = edge_log_target_hits(Path(args.edge_log), target_host, target_port)
  394. print("TCP 透明链路测试开始")
  395. print(f" 目标 URL: {args.http_url}")
  396. print(f" 直连基线用户: {args.runtime_user}(应绕过 transparent)")
  397. print(" 透明链路用户: 当前执行用户(应命中 transparent_edge)")
  398. if args.http_url.startswith("https://"):
  399. print(" 提示: transparent TCP 基准更建议使用 http:// 目标,HTTPS 目标更容易受上层握手行为影响")
  400. for index in range(args.count):
  401. direct_result = await run_step(
  402. f"TCP 直连 第{index + 1}步",
  403. index * 2 + 1,
  404. total,
  405. started_at,
  406. http_roundtrip_as_user(script_path, args.runtime_user, args.http_url, args.timeout),
  407. timeout=max(3.0, args.timeout + 3.0),
  408. )
  409. if isinstance(direct_result, HttpResult):
  410. direct_results.append(direct_result)
  411. transparent_result = await run_step(
  412. f"TCP 透明 第{index + 1}步",
  413. index * 2 + 2,
  414. total,
  415. started_at,
  416. http_roundtrip(args.http_url, args.timeout),
  417. timeout=max(3.0, args.timeout + 3.0),
  418. )
  419. if isinstance(transparent_result, HttpResult):
  420. transparent_results.append(transparent_result)
  421. post_accept, post_win = edge_log_counters(Path(args.edge_log))
  422. post_target_hits = edge_log_target_hits(Path(args.edge_log), target_host, target_port)
  423. direct_ok = [item for item in direct_results if item.ok]
  424. transparent_ok = [item for item in transparent_results if item.ok]
  425. print(f"TCP 目标: {args.http_url}")
  426. print(
  427. f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},连接均值 {fmt_ms(summarize([item.connect_ms for item in direct_ok])['avg'])},"
  428. f" 首包均值 {fmt_ms(summarize([item.first_ms for item in direct_ok])['avg'])},总耗时均值 {fmt_ms(summarize([item.total_ms for item in direct_ok])['avg'])}"
  429. )
  430. if direct_results and not direct_ok:
  431. print(f" 直连失败原因: {direct_results[0].error or '未知'}")
  432. print(
  433. f" 透明: 成功 {len(transparent_ok)}/{len(transparent_results)},连接均值 {fmt_ms(summarize([item.connect_ms for item in transparent_ok])['avg'])},"
  434. f" 首包均值 {fmt_ms(summarize([item.first_ms for item in transparent_ok])['avg'])},总耗时均值 {fmt_ms(summarize([item.total_ms for item in transparent_ok])['avg'])}"
  435. )
  436. if transparent_results and not transparent_ok:
  437. print(f" 透明失败原因: {transparent_results[0].error or '未知'}")
  438. print(
  439. f" Transparent 日志增量: accept +{post_accept - pre_accept},tcp win +{post_win - pre_win},"
  440. f" 目标命中 +{post_target_hits - pre_target_hits}"
  441. )
  442. if not direct_ok or not transparent_ok:
  443. print(" 对比: 无法比较,原因:直连或透明没有成功样本")
  444. return None
  445. direct_total = summarize([item.total_ms for item in direct_ok])["avg"]
  446. transparent_total = summarize([item.total_ms for item in transparent_ok])["avg"]
  447. diff = transparent_total - direct_total
  448. pct = (diff / direct_total * 100.0) if direct_total > 0 else 0.0
  449. print(f" 对比: transparent 总耗时变化 {fmt_pct(pct)},结论:{verdict_from_diff(pct)}")
  450. return pct
  451. async def bench_udp_socks(args, proxy: tuple[str, int] | None) -> float | None:
  452. direct_results: list[DnsResult] = []
  453. proxy_results: list[DnsResult] = []
  454. total = args.count * (2 if proxy else 1)
  455. started_at = time.perf_counter()
  456. print("UDP SOCKS 链路测试开始")
  457. print(f" DNS 目标: {args.dns_server_host}:{args.dns_server_port}")
  458. for index in range(args.count):
  459. direct_result = await run_step(
  460. f"UDP 直连 第{index + 1}步",
  461. index * (2 if proxy else 1) + 1,
  462. total,
  463. started_at,
  464. dns_roundtrip(args.dns_server_host, args.dns_server_port, args.timeout, None),
  465. timeout=max(3.0, args.timeout + 2.0),
  466. )
  467. if isinstance(direct_result, DnsResult):
  468. direct_results.append(direct_result)
  469. if proxy is None:
  470. continue
  471. proxy_result = await run_step(
  472. f"UDP SOCKS 第{index + 1}步",
  473. index * 2 + 2,
  474. total,
  475. started_at,
  476. dns_roundtrip(args.dns_server_host, args.dns_server_port, args.timeout, proxy),
  477. timeout=max(3.0, args.timeout + 2.0),
  478. )
  479. if isinstance(proxy_result, DnsResult):
  480. proxy_results.append(proxy_result)
  481. direct_ok = [item for item in direct_results if item.ok]
  482. proxy_ok = [item for item in proxy_results if item.ok]
  483. direct_query_values = [item.query_ms for item in direct_ok]
  484. proxy_query_values = [item.query_ms for item in proxy_ok]
  485. direct_stall_count = sum(1 for value in direct_query_values if value >= args.udp_stall_ms)
  486. proxy_stall_count = sum(1 for value in proxy_query_values if value >= args.udp_stall_ms)
  487. print(f"UDP 目标: {args.dns_server_host}:{args.dns_server_port}")
  488. print(
  489. f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},查询均值 {fmt_ms(summarize(direct_query_values)['avg'])},"
  490. f" p95 {fmt_ms(summarize(direct_query_values)['p95'])},抖动 {fmt_ms(stdev_or_zero(direct_query_values))},"
  491. f" 最慢 {fmt_ms(max(direct_query_values) if direct_query_values else 0.0)},卡顿次数 {direct_stall_count}"
  492. )
  493. if direct_results and not direct_ok:
  494. print(f" 直连失败原因: {direct_results[0].error or '未知'}")
  495. if proxy is None:
  496. print(" SOCKS: 未执行或不可用")
  497. return None
  498. print(
  499. f" SOCKS: 成功 {len(proxy_ok)}/{len(proxy_results)},关联均值 {fmt_ms(summarize([item.associate_ms for item in proxy_ok])['avg'])},"
  500. f" 查询均值 {fmt_ms(summarize(proxy_query_values)['avg'])},p95 {fmt_ms(summarize(proxy_query_values)['p95'])},"
  501. f" 抖动 {fmt_ms(stdev_or_zero(proxy_query_values))},最慢 {fmt_ms(max(proxy_query_values) if proxy_query_values else 0.0)},"
  502. f" 卡顿次数 {proxy_stall_count}"
  503. )
  504. if proxy_results and not proxy_ok:
  505. print(f" SOCKS 失败原因: {proxy_results[0].error or '未知'}")
  506. if not direct_ok or not proxy_ok:
  507. print(" 对比: 无法比较,原因:直连或 SOCKS 没有成功样本")
  508. return None
  509. direct_query = summarize(direct_query_values)["avg"]
  510. proxy_query = summarize(proxy_query_values)["avg"]
  511. diff = proxy_query - direct_query
  512. pct = (diff / direct_query * 100.0) if direct_query > 0 else 0.0
  513. print(f" 对比: SOCKS 查询耗时变化 {fmt_pct(pct)},结论:{verdict_from_diff(pct)}")
  514. stall_diff = proxy_stall_count - direct_stall_count
  515. jitter_diff = stdev_or_zero(proxy_query_values) - stdev_or_zero(direct_query_values)
  516. if stall_diff < 0:
  517. stability = "更稳"
  518. elif stall_diff > 0:
  519. stability = "更不稳"
  520. elif jitter_diff < -5.0:
  521. stability = "更稳"
  522. elif jitter_diff > 5.0:
  523. stability = "更不稳"
  524. else:
  525. stability = "接近"
  526. print(
  527. f" 稳定性: SOCKS 相比直连 {stability},卡顿次数变化 {stall_diff:+d},"
  528. f" 抖动变化 {fmt_ms(jitter_diff)},卡顿阈值 {fmt_ms(args.udp_stall_ms)}"
  529. )
  530. return pct
  531. async def udp_stream_once(server_host: str, server_port: int, timeout: float, proxy: tuple[str, int] | None) -> tuple[bool, float]:
  532. result = await dns_roundtrip(server_host, server_port, timeout, proxy)
  533. return result.ok, result.query_ms
  534. async def run_udp_stream_probe(args, proxy: tuple[str, int] | None, mode: str) -> UdpStreamResult:
  535. samples: list[float] = []
  536. success_times: list[float] = []
  537. timeout_marks: list[float] = []
  538. started = time.perf_counter()
  539. total = args.udp_stream_count
  540. for index in range(total):
  541. label = f"UDP 连续流 {mode} 第{index + 1}步"
  542. print(f" 开始 {label}")
  543. ok, query_ms = await udp_stream_once(args.dns_server_host, args.dns_server_port, args.timeout, proxy)
  544. now = time.perf_counter()
  545. print(f" {label} {progress_line(index + 1, total, started)}")
  546. if ok:
  547. samples.append(query_ms)
  548. success_times.append(now)
  549. else:
  550. timeout_marks.append(now)
  551. if index + 1 < total and args.udp_stream_interval_ms > 0:
  552. await asyncio.sleep(args.udp_stream_interval_ms / 1000.0)
  553. return summarize_udp_stream(samples, success_times, timeout_marks, total, mode)
  554. async def bench_udp_stream(args, proxy: tuple[str, int] | None) -> None:
  555. print("")
  556. print("UDP 连续流稳定性测试开始")
  557. print(
  558. f" 目标: {args.dns_server_host}:{args.dns_server_port},样本数: {args.udp_stream_count},"
  559. f" 间隔: {args.udp_stream_interval_ms}ms,超时阈值: {args.timeout:.1f}s"
  560. )
  561. direct = await run_udp_stream_probe(args, None, "直连")
  562. proxy_result = await run_udp_stream_probe(args, proxy, "SOCKS") if proxy is not None else None
  563. print("UDP 连续流结果")
  564. print(
  565. f" 直连: 成功 {direct.success}/{direct.sent},超时 {direct.timeouts},连续超时最大 {direct.max_timeout_streak},"
  566. f" p95 {fmt_ms(direct.p95_ms)},抖动 {fmt_ms(direct.jitter_ms)},最大间隔 {fmt_ms(direct.max_gap_ms)},"
  567. f" 恢复均值 {fmt_ms(direct.avg_recovery_ms)},恢复最慢 {fmt_ms(direct.max_recovery_ms)}"
  568. )
  569. if proxy_result is None:
  570. print(" SOCKS: 未执行或不可用")
  571. return
  572. print(
  573. f" SOCKS: 成功 {proxy_result.success}/{proxy_result.sent},超时 {proxy_result.timeouts},连续超时最大 {proxy_result.max_timeout_streak},"
  574. f" p95 {fmt_ms(proxy_result.p95_ms)},抖动 {fmt_ms(proxy_result.jitter_ms)},最大间隔 {fmt_ms(proxy_result.max_gap_ms)},"
  575. f" 恢复均值 {fmt_ms(proxy_result.avg_recovery_ms)},恢复最慢 {fmt_ms(proxy_result.max_recovery_ms)}"
  576. )
  577. recovery_diff = proxy_result.avg_recovery_ms - direct.avg_recovery_ms
  578. gap_diff = proxy_result.max_gap_ms - direct.max_gap_ms
  579. timeout_diff = proxy_result.timeouts - direct.timeouts
  580. streak_diff = proxy_result.max_timeout_streak - direct.max_timeout_streak
  581. if timeout_diff < 0 or streak_diff < 0 or recovery_diff < -20.0 or gap_diff < -20.0:
  582. verdict = "更稳"
  583. elif timeout_diff > 0 or streak_diff > 0 or recovery_diff > 20.0 or gap_diff > 20.0:
  584. verdict = "更不稳"
  585. else:
  586. verdict = "接近"
  587. print(
  588. f" 连续流稳定性: SOCKS 相比直连 {verdict},超时变化 {timeout_diff:+d},连续超时变化 {streak_diff:+d},"
  589. f" 最大间隔变化 {fmt_ms(gap_diff)},恢复均值变化 {fmt_ms(recovery_diff)}"
  590. )
  591. async def child_http_mode(url: str, timeout: float) -> int:
  592. result = await http_roundtrip(url, timeout)
  593. print(json.dumps(asdict(result), ensure_ascii=False))
  594. return 0 if result.ok else 1
  595. async def amain(args) -> int:
  596. if args.child_http:
  597. return await child_http_mode(args.child_http, args.child_timeout)
  598. if args.mode in ("tcp", "all"):
  599. _host, _port, _path, is_https = parse_url(args.http_url)
  600. if is_https:
  601. print("TCP 测试已跳过:transparent TCP 基线不支持使用 https:// 目标做有效对比")
  602. print("建议改用可快速返回的 http:// 目标,例如:")
  603. print(" --http-url http://connectivitycheck.gstatic.com/generate_204")
  604. if args.mode == "tcp":
  605. return 0
  606. config = Config.load(args.config) if args.config and Path(args.config).exists() else None
  607. proxy: tuple[str, int] | None = None
  608. if config is not None and config.socks_port > 0:
  609. proxy = (config.socks_host, config.socks_port)
  610. if args.proxy_host and args.proxy_port:
  611. proxy = (args.proxy_host, args.proxy_port)
  612. print("本地基准测试开始")
  613. print(f" 样本数: {args.count}")
  614. print(f" TCP 目标: {args.http_url}")
  615. print(f" UDP 目标: {args.dns_server_host}:{args.dns_server_port}")
  616. print(f" SOCKS: {proxy[0]}:{proxy[1]}" if proxy else " SOCKS: 未配置或未启动")
  617. print(f" Transparent 排除用户: {args.runtime_user}")
  618. print(" 说明: TCP 比较的是“直连基线 vs transparent_edge 实际链路”,UDP 比较的是“直连 vs socks_edge 实际链路”")
  619. print("")
  620. tcp_pct = None
  621. udp_pct = None
  622. script_path = Path(__file__).resolve()
  623. if args.mode in ("tcp", "all") and args.http_url.startswith("http://"):
  624. tcp_pct = await bench_tcp_transparent(args, script_path)
  625. print("")
  626. if args.mode in ("udp", "all"):
  627. udp_pct = await bench_udp_socks(args, proxy)
  628. if args.udp_stream_count > 0:
  629. await bench_udp_stream(args, proxy)
  630. changes = [value for value in (tcp_pct, udp_pct) if value is not None]
  631. if not changes:
  632. print("中文总结:无可用结果")
  633. return 0
  634. parts = []
  635. if tcp_pct is not None:
  636. parts.append(f"TCP={verdict_from_diff(tcp_pct)}")
  637. if udp_pct is not None:
  638. parts.append(f"UDP={verdict_from_diff(udp_pct)}")
  639. print(f"中文总结:{overall_verdict(changes)}({', '.join(parts)})")
  640. return 0
  641. def build_parser() -> argparse.ArgumentParser:
  642. parser = argparse.ArgumentParser(description="mynetspeeder 真实链路本地手工基准测试")
  643. parser.add_argument("--config", default="/home/mynetspeeder/config.json", help="配置文件路径")
  644. parser.add_argument("--proxy-host", default="", help="SOCKS5 地址,默认读取 config.json")
  645. parser.add_argument("--proxy-port", type=int, default=0, help="SOCKS5 端口,默认读取 config.json")
  646. parser.add_argument("--http-url", default="http://connectivitycheck.gstatic.com/generate_204", help="TCP/透明链路测试 URL,建议使用 http://")
  647. parser.add_argument("--dns-server", default="8.8.8.8:53", help="UDP/SOCKS 链路 DNS 目标,格式 host:port")
  648. parser.add_argument("--mode", choices=("tcp", "udp", "all"), default="all", help="只测 TCP、只测 UDP 或都测")
  649. parser.add_argument("--count", type=int, default=4, help="每类测试样本数,默认 4")
  650. parser.add_argument("--timeout", type=float, default=3.0, help="单次测试超时秒数")
  651. parser.add_argument("--udp-stall-ms", type=float, default=120.0, help="UDP 单次查询超过该值记为一次疑似卡顿")
  652. parser.add_argument("--udp-stream-count", type=int, default=20, help="UDP 连续流稳定性测试样本数,设为 0 则关闭")
  653. parser.add_argument("--udp-stream-interval-ms", type=float, default=200.0, help="UDP 连续流相邻样本间隔毫秒")
  654. parser.add_argument("--runtime-user", default=os.environ.get("MYNETSPEEDER_USER", "mynetspeeder"), help="transparent 排除用户")
  655. parser.add_argument("--edge-log", default="/var/log/mynetspeeder-edge.log", help="transparent edge 日志路径")
  656. parser.add_argument("--child-http", default="", help=argparse.SUPPRESS)
  657. parser.add_argument("--child-timeout", type=float, default=3.0, help=argparse.SUPPRESS)
  658. return parser
  659. def main() -> int:
  660. args = build_parser().parse_args()
  661. args.dns_server_host, args.dns_server_port = parse_host_port(args.dns_server)
  662. return asyncio.run(amain(args))
  663. if __name__ == "__main__":
  664. raise SystemExit(main())