| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693 |
- #!/usr/bin/env python3
- from __future__ import annotations
- import argparse
- import asyncio
- import contextlib
- import random
- import ssl
- import socket
- import statistics
- import sys
- import time
- from dataclasses import dataclass
- from pathlib import Path
- from urllib.parse import urlparse
# Directory two levels above this file's own directory (presumably the
# repository root - TODO confirm against the project layout).
ROOT = Path(__file__).resolve().parents[2]
# Make that directory importable so the `mynetspeeder` package import that
# follows resolves when this file is run directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
- from mynetspeeder.config import Config # noqa: E402
def parse_host_port(text: str) -> tuple[str, int]:
    """Split ``host:port`` or ``[ipv6]:port`` into a ``(host, port)`` pair.

    Raises:
        ValueError: if *text* matches neither form or the port is not an
            integer literal.
    """
    if not text.startswith("["):
        # Plain form: exactly one colon separates the host from the port.
        if text.count(":") != 1:
            raise ValueError(f"无效地址: {text}")
        host, _, port_text = text.rpartition(":")
        return host, int(port_text)

    # Bracketed form: everything up to the first "]" is the host and the
    # remainder must be ":<port>".
    host, tail = text[1:].split("]", 1)
    if not tail.startswith(":"):
        raise ValueError(f"无效地址: {text}")
    return host, int(tail[1:])
def parse_url(text: str) -> tuple[str, int, str, bool]:
    """Break an http(s) URL into ``(host, port, request_target, is_https)``.

    The port falls back to 443 for https and 80 for http; the request target
    is the path (``/`` when empty) with the query string re-attached.

    Raises:
        ValueError: for a non-http(s) scheme or a URL without a hostname.
    """
    parts = urlparse(text)
    scheme = parts.scheme
    if scheme != "http" and scheme != "https":
        raise ValueError(f"仅支持 http/https: {text}")
    host = parts.hostname
    if not host:
        raise ValueError(f"无效 URL: {text}")

    is_https = scheme == "https"
    port = parts.port
    if not port:
        port = 443 if is_https else 80

    target = parts.path if parts.path else "/"
    if parts.query:
        target = f"{target}?{parts.query}"
    return host, port, target, is_https
def parse_dns_target(text: str) -> tuple[str, int]:
    """Parse the DNS server argument; same syntax and errors as ``parse_host_port``."""
    server = parse_host_port(text)
    dns_host, dns_port = server
    return dns_host, dns_port
def socks5_addr_bytes(host: str, port: int) -> bytes:
    """Encode *host*/*port* as a SOCKS5 address field (type byte, address, port).

    The host is tried as an IPv4 literal (type 0x01), then as an IPv6 literal
    (type 0x04); anything else is sent as a length-prefixed domain name
    (type 0x03). The port is always two big-endian bytes.
    """
    try:
        packed_v4 = socket.inet_aton(host)
    except OSError:
        pass
    else:
        return b"\x01" + packed_v4 + port.to_bytes(2, "big")

    try:
        packed_v6 = socket.inet_pton(socket.AF_INET6, host)
    except OSError:
        pass
    else:
        return b"\x04" + packed_v6 + port.to_bytes(2, "big")

    name = host.encode()
    return b"\x03" + bytes([len(name)]) + name + port.to_bytes(2, "big")
async def socks5_greet(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Send the SOCKS5 greeting offering only the "no authentication" method.

    Raises:
        ConnectionError: if the two-byte reply is anything other than
            version 5 / method 0.
    """
    greeting = bytes((0x05, 0x01, 0x00))  # version 5, one method, method 0
    writer.write(greeting)
    await writer.drain()
    if await reader.readexactly(2) != b"\x05\x00":
        raise ConnectionError("SOCKS5 握手失败")
async def socks5_connect(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, host: str, port: int) -> None:
    """Greet the SOCKS5 proxy and issue a CONNECT request for *host*:*port*.

    After a successful reply the bound-address field is read and discarded so
    the stream is positioned at the start of tunnelled data. A reply with an
    address type other than 1, 3 or 4 is left unconsumed.

    Raises:
        ConnectionError: if the greeting fails or the reply status is non-zero.
    """
    await socks5_greet(reader, writer)

    request = b"\x05\x01\x00" + socks5_addr_bytes(host, port)
    writer.write(request)
    await writer.drain()

    reply = await reader.readexactly(4)
    status = reply[1]
    if status != 0x00:
        raise ConnectionError(f"SOCKS5 CONNECT 失败: {status}")

    address_type = reply[3]
    if address_type == 3:
        # Domain name: one length byte, then the name and the 2-byte port.
        name_len = (await reader.readexactly(1))[0]
        await reader.readexactly(name_len + 2)
        return

    # Fixed-size address (IPv4 = 4 bytes, IPv6 = 16 bytes) plus 2-byte port.
    fixed_size = {1: 4 + 2, 4: 16 + 2}.get(address_type)
    if fixed_size is not None:
        await reader.readexactly(fixed_size)
async def open_http_connection(host: str, port: int, timeout: float, proxy: tuple[str, int] | None, use_tls: bool) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Open a stream to *host*:*port*, directly or through a SOCKS5 proxy.

    Args:
        host: Target host name or IP literal.
        port: Target TCP port.
        timeout: Seconds allowed for each connection phase (TCP connect,
            SOCKS5 handshake, TLS handshake).
        proxy: ``(host, port)`` of a SOCKS5 proxy, or ``None`` to connect
            directly.
        use_tls: Wrap the connection in TLS using the default SSL context.

    Returns:
        The ``(reader, writer)`` pair for the established stream.

    Raises:
        ConnectionError: if the SOCKS5 negotiation fails or the proxy socket
            cannot be obtained for the TLS upgrade.
        asyncio.TimeoutError: if a phase exceeds *timeout*.
    """
    if proxy is None:
        if use_tls:
            return await asyncio.wait_for(asyncio.open_connection(host, port, ssl=ssl.create_default_context(), server_hostname=host), timeout=timeout)
        return await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)

    proxy_reader, proxy_writer = await asyncio.wait_for(asyncio.open_connection(proxy[0], proxy[1]), timeout=timeout)
    try:
        # Bound the handshake too: an unresponsive proxy must not hang forever.
        await asyncio.wait_for(socks5_connect(proxy_reader, proxy_writer, host, port), timeout=timeout)
    except BaseException:
        # Do not leak the proxy connection when negotiation fails or is cancelled.
        proxy_writer.close()
        with contextlib.suppress(Exception):
            await proxy_writer.wait_closed()
        raise
    if not use_tls:
        return proxy_reader, proxy_writer

    # TLS over the tunnel: duplicate the negotiated socket, tear down the
    # plain stream, then open a new TLS stream on the duplicate.
    raw_sock = proxy_writer.get_extra_info("socket")
    dup = raw_sock.dup() if raw_sock is not None else None
    proxy_writer.close()
    with contextlib.suppress(Exception):
        await proxy_writer.wait_closed()
    if dup is None:
        raise ConnectionError("SOCKS5 socket unavailable")
    dup.setblocking(False)
    try:
        return await asyncio.wait_for(
            asyncio.open_connection(ssl=ssl.create_default_context(), sock=dup, server_hostname=host),
            timeout=timeout,
        )
    except BaseException:
        # Closing an already-closed socket is a no-op, so this is safe even
        # if the transport released it first.
        dup.close()
        raise
def percentile(values: list[float], pct: float) -> float:
    """Return the *pct* quantile (0.0-1.0) of *values* by linear interpolation.

    An empty list yields 0.0 and a single-element list yields that element.
    """
    if not values:
        return 0.0
    ranked = sorted(values)
    last = len(ranked) - 1
    if last == 0:
        return ranked[0]
    # Fractional rank between the two neighbouring samples.
    position = last * pct
    lower = int(position)
    upper = min(lower + 1, last)
    frac = position - lower
    return ranked[lower] * (1 - frac) + ranked[upper] * frac
def summarize(values: list[float]) -> dict[str, float]:
    """Return avg / p50 / p95 / min / max for *values* (all 0.0 when empty)."""
    if not values:
        return dict.fromkeys(("avg", "p50", "p95", "min", "max"), 0.0)
    stats: dict[str, float] = {"avg": statistics.mean(values)}
    stats["p50"] = percentile(values, 0.50)
    stats["p95"] = percentile(values, 0.95)
    stats["min"] = min(values)
    stats["max"] = max(values)
    return stats
def fmt_ms(value: float) -> str:
    """Render *value* with two decimals and an ``ms`` suffix."""
    return format(value, ".2f") + "ms"
def fmt_pct(value: float) -> str:
    """Render *value* as a signed percentage with one decimal."""
    return format(value, "+.1f") + "%"
def classify_change(diff_pct: float, improvement_threshold: float = 5.0, regression_threshold: float = -5.0) -> str:
    """Turn a percentage change into a verdict label.

    A value at or below ``regression_threshold`` is labelled "提升" and a
    value at or above ``improvement_threshold`` is labelled "退化"; anything
    in between is "无明显变化". The lower bound is checked first.

    NOTE(review): the parameter names read backwards relative to the labels
    (the "regression" bound yields the improvement verdict). Callers in this
    file pass latency changes, where a negative value means faster, so the
    verdicts look intended; the names are kept for interface compatibility.
    """
    if diff_pct <= regression_threshold:
        verdict = "提升"
    elif diff_pct >= improvement_threshold:
        verdict = "退化"
    else:
        verdict = "无明显变化"
    return verdict
def progress_line(done: int, total: int, started_at: float) -> str:
    """Build a one-line progress report with elapsed and estimated remaining time.

    *started_at* is a ``time.perf_counter()`` reading. The remaining time is
    extrapolated linearly from the completed fraction and shown as ``--``
    while nothing has completed yet.
    """
    if total <= 0:
        return "进度:0%"
    elapsed = time.perf_counter() - started_at
    fraction = done / total
    if done > 0:
        projected_total = elapsed / fraction
        remaining = max(0.0, projected_total - elapsed)
        tail = f"估算剩余:{remaining:.1f}s"
    else:
        tail = "估算剩余:--"
    return f"进度:{done}/{total} ({int(fraction * 100)}%),已用 {elapsed:.1f}s,{tail}"
async def run_step_with_heartbeat(step_label: str, total: int, done: int, coro, started_at: float, timeout: float) -> object | None:
    """Run *coro* as a task, printing a progress heartbeat until it finishes.

    Args:
        step_label: Text shown in the start / heartbeat / finish lines.
        total: Total number of steps, forwarded to ``progress_line``.
        done: Step counter to display, forwarded to ``progress_line``.
        coro: The coroutine to execute.
        started_at: ``time.perf_counter()`` reading for the whole run.
        timeout: Seconds after which the task is cancelled.

    Returns:
        The coroutine's result, or ``None`` if it was cancelled on timeout.
        An exception raised by the coroutine propagates to the caller.
    """
    task = asyncio.create_task(coro)
    print(f" 开始 {step_label}")
    step_started = time.perf_counter()
    while not task.done():
        elapsed = time.perf_counter() - step_started
        if elapsed >= timeout:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
            print(f" 超时 {step_label} {progress_line(done, total, started_at)}")
            return None
        print(f" {step_label} {progress_line(done, total, started_at)}")
        # Wait on the task itself rather than sleeping blindly, so a step
        # that finishes early is not held back until the next heartbeat.
        await asyncio.wait({task}, timeout=min(1.0, max(0.1, timeout - elapsed)))
    result = await task
    print(f" 完成 {step_label} {progress_line(done, total, started_at)}")
    return result
async def run_parallel_steps(step_label: str, total: int, done: int, coros: list[tuple[str, object]], started_at: float, timeout: float) -> list[object | None]:
    """Run several labelled coroutines concurrently with a shared heartbeat.

    Args:
        step_label: Text shown in the start / heartbeat / finish lines.
        total: Total number of steps, forwarded to ``progress_line``.
        done: Step counter to display, forwarded to ``progress_line``.
        coros: ``(label, coroutine)`` pairs; only the coroutine is used here.
        started_at: ``time.perf_counter()`` reading for the whole run.
        timeout: Seconds after which every unfinished task is cancelled.

    Returns:
        Results in the same order as *coros*. If the timeout fires, a list of
        ``None`` of the same length is returned, including for tasks that had
        already finished. An exception raised by a coroutine propagates.
    """
    tasks = [asyncio.create_task(coro) for _, coro in coros]
    print(f" 开始 {step_label}")
    step_started = time.perf_counter()
    while not all(task.done() for task in tasks):
        elapsed = time.perf_counter() - step_started
        if elapsed >= timeout:
            for task in tasks:
                if not task.done():
                    task.cancel()
            for task in tasks:
                with contextlib.suppress(asyncio.CancelledError):
                    await task
            print(f" 超时 {step_label} {progress_line(done, total, started_at)}")
            return [None for _ in tasks]
        print(f" {step_label} {progress_line(done, total, started_at)}")
        # Wake as soon as the remaining tasks finish instead of sleeping a
        # fixed interval; the timeout only paces the heartbeat output.
        pending = [task for task in tasks if not task.done()]
        await asyncio.wait(pending, timeout=min(1.0, max(0.1, timeout - elapsed)))
    results: list[object | None] = [await task for task in tasks]
    print(f" 完成 {step_label} {progress_line(done, total, started_at)}")
    return results
def print_section_summary(name: str, direct_avg: float, proxy_avg: float | None, unit: str = "ms") -> None:
    """Print one comparison line for a metric: direct vs proxy average.

    With no proxy average only the direct value is shown. Otherwise the
    absolute and relative difference are printed together with the verdict
    from ``classify_change``; the relative change is 0.0 when the direct
    average is not positive.
    """
    if proxy_avg is not None:
        delta = proxy_avg - direct_avg
        change = delta / direct_avg * 100.0 if direct_avg > 0 else 0.0
        outcome = classify_change(change)
        print(f" {name}: 直连 {direct_avg:.2f}{unit},代理 {proxy_avg:.2f}{unit},差值 {delta:+.2f}{unit},变化 {change:+.1f}%,结论:{outcome}")
    else:
        print(f" {name}: 仅测到直连,均值 {direct_avg:.2f}{unit}")
def overall_verdict(changes: list[float]) -> str:
    """Summarise a list of percentage changes by majority vote.

    Each change at or below -5.0 counts as one vote for "提升" and each
    change at or above 5.0 as one vote for "退化"; a tie is reported as no
    significant change, and an empty list as no usable result.
    """
    if not changes:
        return "无可用结果"
    better = sum(1 for change in changes if change <= -5.0)
    worse = sum(1 for change in changes if change >= 5.0)
    if better > worse:
        return "整体提升"
    if better < worse:
        return "整体退化"
    return "整体无明显变化"
@dataclass
class TcpResult:
    """Timings of one TCP round trip, as consumed by ``bench_tcp``.

    The ``_ms`` suffix and the ``fmt_ms`` formatting applied by the reporting
    code indicate milliseconds. The functions that fill these fields are not
    visible in this section, so the exact measurement boundaries are
    unconfirmed.
    """

    connect_ms: float  # connection time (reported as "连接")
    ttfb_ms: float  # time to first byte (reported as "首包")
    total_ms: float  # total round-trip time (reported as "总耗时")
@dataclass
class UdpResult:
    """Outcome of one UDP round trip, as consumed by ``bench_udp``.

    The functions that fill these fields are not visible in this section;
    the field notes below describe how the reporting code uses them.
    """

    associate_ms: float  # reported as "关联" for proxy samples (presumably SOCKS5 UDP ASSOCIATE time - TODO confirm)
    first_ms: float  # time to the first reply; the metric compared between direct and proxy
    total_ms: float  # total time; not read by the reporting code in this section
    duplicates: int  # summed over successful samples and reported as "重复包"
    ok: bool  # only samples with ok=True contribute to the statistics
@dataclass
class HttpResult:
    """Outcome of one HTTP GET performed by ``http_roundtrip``.

    Failed requests are reported with all timings and the status set to zero,
    ``ok=False`` and a fixed ``error`` message.
    """

    connect_ms: float  # ms until the connection (including any proxy/TLS setup) is established
    ttfb_ms: float  # ms from sending the request until the response headers are fully read
    total_ms: float  # ms from sending the request until the body has been read to EOF
    status: int  # HTTP status code parsed from the status line, 0 if it could not be parsed
    ok: bool  # True when the status is in the 2xx/3xx range
    error: str = ""  # failure description; empty on success
@dataclass
class DnsResult:
    """Outcome of one DNS query performed by ``dns_roundtrip``.

    Failed queries are reported with both timings set to zero, ``ok=False``
    and a fixed ``error`` message.
    """

    query_ms: float  # ms attributed to the query itself (proxy association time is subtracted)
    total_ms: float  # ms for the whole operation, measured from its start
    ok: bool  # True when the response code is 0 and the header reports a question or an answer
    error: str = ""  # failure description; empty on success
async def tcp_echo_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Echo everything read from *reader* back to *writer* until EOF.

    Errors while echoing are deliberately ignored (best effort); the writer
    is always closed on the way out.
    """
    try:
        with contextlib.suppress(Exception):
            while chunk := await reader.read(65536):
                writer.write(chunk)
                await writer.drain()
    finally:
        writer.close()
        with contextlib.suppress(Exception):
            await writer.wait_closed()
def build_dns_query(name: str, query_id: int | None = None) -> bytes:
    """Build a DNS query packet asking for the A record (class IN) of *name*.

    Args:
        name: Domain name; leading/trailing dots and empty labels are ignored.
        query_id: 16-bit transaction id; a random one is chosen when omitted.

    Returns:
        The wire-format query: 12-byte header, QNAME, QTYPE=1, QCLASS=1.

    Raises:
        ValueError: if a label is longer than 63 bytes once encoded, which
            the length-prefix format cannot represent.
    """
    if query_id is None:
        query_id = random.randint(0, 0xFFFF)
    header = b"".join(
        (
            query_id.to_bytes(2, "big"),
            b"\x01\x00",  # standard query, recursion desired
            b"\x00\x01",  # qdcount
            b"\x00\x00",  # ancount
            b"\x00\x00",  # nscount
            b"\x00\x00",  # arcount
        )
    )
    qname = bytearray()
    for part in name.strip(".").split("."):
        if not part:
            continue
        raw = part.encode()
        # The prefix must be the encoded byte length, not the character
        # count, or non-ASCII labels produce a corrupt packet.
        if len(raw) > 63:
            raise ValueError(f"DNS 标签过长: {part}")
        qname.append(len(raw))
        qname += raw
    return header + bytes(qname) + b"\x00" + b"\x00\x01" + b"\x00\x01"
def parse_dns_response(data: bytes) -> tuple[int, bool]:
    """Read the header of a DNS response.

    Returns:
        ``(rcode, has_records)`` where ``rcode`` is the low four bits of the
        flags word and ``has_records`` is true when the header reports at
        least one question or one answer.

    Raises:
        ValueError: if *data* is shorter than the 12-byte header.
    """
    if len(data) < 12:
        raise ValueError("DNS 响应过短")
    flags, question_count, answer_count = (
        int.from_bytes(data[offset:offset + 2], "big") for offset in (2, 4, 6)
    )
    return flags & 0x000F, (answer_count > 0 or question_count > 0)
async def http_roundtrip(url: str, proxy: tuple[str, int] | None, timeout: float) -> HttpResult:
    """Perform one HTTP/1.1 GET and time its phases.

    Args:
        url: http(s) URL to fetch.
        proxy: ``(host, port)`` of a SOCKS5 proxy, or ``None`` for a direct
            connection.
        timeout: Seconds allowed for connecting, for receiving the response
            headers, and for receiving the body (each bounded separately).

    Returns:
        An ``HttpResult``. Any failure after URL parsing yields a result with
        zeroed timings, ``ok=False`` and a fixed error message.

    Raises:
        ValueError: if *url* is not a valid http(s) URL.
    """
    host, port, path, is_https = parse_url(url)
    writer = None  # stays None if the connection is never established
    started = time.perf_counter()
    try:
        reader, writer = await open_http_connection(host, port, timeout, proxy, is_https)
        connect_ms = (time.perf_counter() - started) * 1000

        # The Host header needs brackets around an IPv6 literal and the port
        # whenever it is not the scheme default.
        host_header = f"[{host}]" if ":" in host else host
        if port != (443 if is_https else 80):
            host_header = f"{host_header}:{port}"
        request = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host_header}\r\n"
            "User-Agent: mynetspeeder-benchmark\r\n"
            "Accept: */*\r\n"
            "Connection: close\r\n\r\n"
        ).encode()

        # Restart the clock: ttfb/total are measured from sending the request.
        started = time.perf_counter()
        writer.write(request)
        await writer.drain()
        head = await asyncio.wait_for(reader.readuntil(b"\r\n\r\n"), timeout=timeout)
        first_ms = (time.perf_counter() - started) * 1000

        status_line = head.split(b"\r\n", 1)[0].decode(errors="replace")
        status = 0
        if status_line.startswith("HTTP/"):
            parts = status_line.split()
            if len(parts) >= 2 and parts[1].isdigit():
                status = int(parts[1])

        # "Connection: close" makes EOF mark the end of the body; bound the
        # read so a stalled server cannot hold the request open indefinitely.
        body = await asyncio.wait_for(reader.read(), timeout=timeout)
        total_ms = (time.perf_counter() - started) * 1000
        ok = 200 <= status < 400 and bool(body or head)
        return HttpResult(connect_ms, first_ms, total_ms, status, ok)
    except Exception:
        return HttpResult(0.0, 0.0, 0.0, 0, False, error="http 请求失败")
    finally:
        if writer is not None:
            with contextlib.suppress(Exception):
                writer.close()
                await writer.wait_closed()
async def dns_roundtrip(server_host: str, server_port: int, proxy: tuple[str, int] | None, timeout: float) -> DnsResult:
    """Send one DNS query for ``www.google.com`` over UDP and time the reply.

    Args:
        server_host: DNS server address; a ``:`` in it selects an IPv6 socket.
        server_port: DNS server UDP port.
        proxy: ``(host, port)`` of a SOCKS5 proxy to relay through, or
            ``None`` to query the server directly.
        timeout: Seconds allowed for the proxy association and for the reply.

    Returns:
        A ``DnsResult``. ``query_ms`` excludes the proxy association time
        reported by ``socks5_udp_associate``; both timings are taken when the
        reply arrives, before any connection cleanup. Any failure yields a
        result with zeroed timings, ``ok=False`` and a fixed error message.
    """
    query = build_dns_query("www.google.com")
    loop = asyncio.get_running_loop()
    started = time.perf_counter()
    query_started = started
    writer = None  # SOCKS5 control connection, proxy path only
    sock = None
    try:
        if proxy is None:
            sock = socket.socket(socket.AF_INET6 if ":" in server_host else socket.AF_INET, socket.SOCK_DGRAM)
            sock.setblocking(False)
            await loop.sock_connect(sock, (server_host, server_port))
            await loop.sock_sendall(sock, query)
            data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
        else:
            _reader, writer, relay, associate_ms = await asyncio.wait_for(socks5_udp_associate(proxy[0], proxy[1]), timeout=timeout)
            sock = socket.socket(socket.AF_INET6 if ":" in relay[0] else socket.AF_INET, socket.SOCK_DGRAM)
            sock.setblocking(False)
            await loop.sock_connect(sock, relay)
            # SOCKS5 UDP request header: 2 reserved bytes, fragment 0, address.
            packet = b"\x00\x00\x00" + encode_socks_addr(server_host, server_port) + query
            await loop.sock_sendall(sock, packet)
            data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
            data = strip_socks_udp(data)
            query_started = query_started + associate_ms / 1000.0
        # Stop the clock before cleanup so the time spent closing the proxy
        # control connection does not inflate the proxy measurement.
        finished = time.perf_counter()
        rcode, ok = parse_dns_response(data)
        return DnsResult(
            query_ms=(finished - query_started) * 1000,
            total_ms=(finished - started) * 1000,
            ok=ok and rcode == 0,
        )
    except Exception:
        return DnsResult(0.0, 0.0, False, error="dns 查询失败")
    finally:
        # Release both endpoints on every path, including early failures.
        if writer is not None:
            writer.close()
            with contextlib.suppress(Exception):
                await writer.wait_closed()
        if sock is not None:
            sock.close()
async def bench_http(args, proxy: tuple[str, int] | None) -> None:
    """Benchmark HTTP GETs of ``args.http_url``, direct and (optionally) via proxy.

    Runs ``args.count`` rounds. Without a proxy each round is one direct
    request; with a proxy the direct and proxied requests of a round run
    concurrently. A per-sample summary is printed, followed by a
    direct-vs-proxy comparison when both sides have successful samples.

    Args:
        args: Parsed command-line namespace (reads ``count``, ``timeout``,
            ``http_url``).
        proxy: ``(host, port)`` of the SOCKS5 proxy, or ``None``.
    """
    direct_results: list[HttpResult] = []
    proxy_results: list[HttpResult] = []
    per_round = 2 if proxy is not None else 1
    total_steps = args.count * per_round
    step_timeout = max(1.0, args.timeout + 1.0)
    started_at = time.perf_counter()
    print("TCP/HTTPS 正式测试开始")
    for index in range(args.count):
        step_base = index * per_round
        if proxy is None:
            direct_result = await run_step_with_heartbeat(
                f"HTTPS 直连 第{index + 1}步",
                total_steps,
                step_base + 1,
                http_roundtrip(args.http_url, None, args.timeout),
                started_at,
                timeout=step_timeout,
            )
            if isinstance(direct_result, HttpResult):
                direct_results.append(direct_result)
            continue
        results = await run_parallel_steps(
            f"HTTPS 第{index + 1}轮",
            total_steps,
            step_base + 1,
            [
                (f"HTTPS 直连 第{index + 1}步", http_roundtrip(args.http_url, None, args.timeout)),
                (f"HTTPS 代理 第{index + 1}步", http_roundtrip(args.http_url, proxy, args.timeout)),
            ],
            started_at,
            timeout=step_timeout,
        )
        # Timed-out steps come back as None and are left out of the samples.
        if isinstance(results[0], HttpResult):
            direct_results.append(results[0])
        if isinstance(results[1], HttpResult):
            proxy_results.append(results[1])

    print(f"HTTP 目标: {args.http_url}")
    direct_ok = [item for item in direct_results if item.ok]
    direct_ttfb = summarize([item.ttfb_ms for item in direct_ok])["avg"]
    direct_total = summarize([item.total_ms for item in direct_ok])["avg"]
    print(f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},首包均值 {fmt_ms(direct_ttfb)},总耗时均值 {fmt_ms(direct_total)}")
    if direct_results and not direct_ok:
        print(f" 直连失败原因: {direct_results[0].error or '未知'}")
    if proxy is None or not proxy_results:
        print(" 代理: 未执行或不可用")
        return

    proxy_ok = [item for item in proxy_results if item.ok]
    proxy_ttfb = summarize([item.ttfb_ms for item in proxy_ok])["avg"]
    proxy_total = summarize([item.total_ms for item in proxy_ok])["avg"]
    print(f" 代理: 成功 {len(proxy_ok)}/{len(proxy_results)},首包均值 {fmt_ms(proxy_ttfb)},总耗时均值 {fmt_ms(proxy_total)}")
    if not proxy_ok:
        print(f" 代理失败原因: {proxy_results[0].error or '未知'}")
    if not direct_ok or not proxy_ok:
        # An empty side averages to 0.0; comparing against it would report
        # a bogus -100% change (and an "improvement") when every proxy
        # request failed.
        print(" 对比: 直连或代理无成功样本,无法比较")
        return

    ttfb_pct = ((proxy_ttfb - direct_ttfb) / direct_ttfb * 100.0) if direct_ttfb > 0 else 0.0
    total_pct = ((proxy_total - direct_total) / direct_total * 100.0) if direct_total > 0 else 0.0
    print(f" 对比: 首包变化 {fmt_pct(ttfb_pct)},总耗时变化 {fmt_pct(total_pct)},结论:{classify_change(total_pct)}")
async def bench_dns(args, proxy: tuple[str, int] | None) -> None:
    """Benchmark UDP DNS queries, direct and (optionally) via the SOCKS5 proxy.

    Runs ``args.count`` rounds. Without a proxy each round is one direct
    query; with a proxy the direct and proxied queries of a round run
    concurrently. A per-sample summary is printed, followed by a
    direct-vs-proxy comparison when both sides have successful samples.

    Args:
        args: Parsed command-line namespace (reads ``count``, ``timeout``,
            ``dns_server_host``, ``dns_server_port``).
        proxy: ``(host, port)`` of the SOCKS5 proxy, or ``None``.
    """
    direct_results: list[DnsResult] = []
    proxy_results: list[DnsResult] = []
    per_round = 2 if proxy is not None else 1
    total_steps = args.count * per_round
    step_timeout = max(1.0, args.timeout + 1.0)
    started_at = time.perf_counter()
    print("UDP/DNS 正式测试开始")
    for index in range(args.count):
        step_base = index * per_round
        if proxy is None:
            direct_result = await run_step_with_heartbeat(
                f"DNS 直连 第{index + 1}步",
                total_steps,
                step_base + 1,
                dns_roundtrip(args.dns_server_host, args.dns_server_port, None, args.timeout),
                started_at,
                timeout=step_timeout,
            )
            if isinstance(direct_result, DnsResult):
                direct_results.append(direct_result)
            continue
        results = await run_parallel_steps(
            f"DNS 第{index + 1}轮",
            total_steps,
            step_base + 1,
            [
                (f"DNS 直连 第{index + 1}步", dns_roundtrip(args.dns_server_host, args.dns_server_port, None, args.timeout)),
                (f"DNS 代理 第{index + 1}步", dns_roundtrip(args.dns_server_host, args.dns_server_port, proxy, args.timeout)),
            ],
            started_at,
            timeout=step_timeout,
        )
        # Timed-out steps come back as None and are left out of the samples.
        if isinstance(results[0], DnsResult):
            direct_results.append(results[0])
        if isinstance(results[1], DnsResult):
            proxy_results.append(results[1])

    direct_ok = [item for item in direct_results if item.ok]
    direct_query = summarize([item.query_ms for item in direct_ok])["avg"]
    print(f"DNS 目标: {args.dns_server_host}:{args.dns_server_port}")
    print(f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},查询均值 {fmt_ms(direct_query)}")
    if direct_results and not direct_ok:
        print(f" 直连失败原因: {direct_results[0].error or '未知'}")
    if proxy is None or not proxy_results:
        print(" 代理: 未执行或不可用")
        return

    proxy_ok = [item for item in proxy_results if item.ok]
    proxy_query = summarize([item.query_ms for item in proxy_ok])["avg"]
    print(f" 代理: 成功 {len(proxy_ok)}/{len(proxy_results)},查询均值 {fmt_ms(proxy_query)}")
    if not proxy_ok:
        print(f" 代理失败原因: {proxy_results[0].error or '未知'}")
    if not direct_ok or not proxy_ok:
        # An empty side averages to 0.0; comparing against it would report
        # a bogus -100% change (and an "improvement") when every proxy
        # query failed.
        print(" 对比: 直连或代理无成功样本,无法比较")
        return

    delta = proxy_query - direct_query
    pct = (delta / direct_query * 100.0) if direct_query > 0 else 0.0
    print(f" 对比: DNS 查询变化 {fmt_pct(pct)},结论:{classify_change(pct)}")
async def bench_tcp(args, target: tuple[str, int], proxy: tuple[str, int] | None) -> None:
    """Benchmark TCP round trips to *target*, direct and (optionally) via proxy.

    After ``args.warmup`` untimed warm-up rounds, runs ``args.count`` rounds.
    Without a proxy each round is one direct round trip; with a proxy the
    direct and proxied round trips of a round run concurrently. Prints
    averages and p50/p95 per side, then a comparison when both sides have
    samples.

    Args:
        args: Parsed command-line namespace (reads ``count``, ``warmup``,
            ``payload_size``, ``timeout``).
        target: ``(host, port)`` of the TCP endpoint to exercise.
        proxy: ``(host, port)`` of the SOCKS5 proxy, or ``None``.
    """
    direct_results: list[TcpResult] = []
    proxy_results: list[TcpResult] = []
    per_round = 2 if proxy is not None else 1
    warmup_steps = args.warmup * per_round

    async def warmup_runner(step: int) -> None:
        # Warm-up failures are irrelevant. Odd steps (and every step when
        # there is no proxy) exercise the direct path, even steps the proxy.
        with contextlib.suppress(Exception):
            if proxy is None or step % 2 == 1:
                await tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
            else:
                await tcp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)

    if warmup_steps > 0:
        print("TCP 预热开始")
        for step in range(1, warmup_steps + 1):
            await warmup_runner(step)
            print(f" 预热进度:{step}/{warmup_steps}")

    total_steps = args.count * per_round
    step_timeout = max(1.0, args.timeout + 1.0)
    started_at = time.perf_counter()
    print("TCP 正式测试开始")
    for index in range(args.count):
        step_base = index * per_round
        if proxy is None:
            direct_result = await run_step_with_heartbeat(
                f"TCP 直连 第{index + 1}步",
                total_steps,
                step_base + 1,
                tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout),
                started_at,
                timeout=step_timeout,
            )
            if isinstance(direct_result, TcpResult):
                direct_results.append(direct_result)
            continue
        results = await run_parallel_steps(
            f"TCP 第{index + 1}轮",
            total_steps,
            step_base + 1,
            [
                (f"TCP 直连 第{index + 1}步", tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)),
                (f"TCP 代理 第{index + 1}步", tcp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)),
            ],
            started_at,
            timeout=step_timeout,
        )
        # Timed-out steps come back as None and are left out of the samples.
        if isinstance(results[0], TcpResult):
            direct_results.append(results[0])
        if isinstance(results[1], TcpResult):
            proxy_results.append(results[1])

    # One attempt per side per round; report successes against attempts so
    # dropped (timed-out) samples stay visible instead of printing n/n.
    attempts = args.count
    direct = {
        "connect": summarize([item.connect_ms for item in direct_results]),
        "ttfb": summarize([item.ttfb_ms for item in direct_results]),
        "total": summarize([item.total_ms for item in direct_results]),
    }
    print(f"TCP 目标: {target[0]}:{target[1]}")
    print(f" 直连: 成功 {len(direct_results)}/{attempts},连接均值 {fmt_ms(direct['connect']['avg'])},首包均值 {fmt_ms(direct['ttfb']['avg'])},总耗时均值 {fmt_ms(direct['total']['avg'])}")
    print(f" 直连: 连接 p50/p95 {fmt_ms(direct['connect']['p50'])} / {fmt_ms(direct['connect']['p95'])},总耗时 p50/p95 {fmt_ms(direct['total']['p50'])} / {fmt_ms(direct['total']['p95'])}")
    if proxy is None or not proxy_results:
        print(" 代理: 未执行或不可用")
        return

    proxy_stats = {
        "connect": summarize([item.connect_ms for item in proxy_results]),
        "ttfb": summarize([item.ttfb_ms for item in proxy_results]),
        "total": summarize([item.total_ms for item in proxy_results]),
    }
    print(f" 代理: 成功 {len(proxy_results)}/{attempts},连接均值 {fmt_ms(proxy_stats['connect']['avg'])},首包均值 {fmt_ms(proxy_stats['ttfb']['avg'])},总耗时均值 {fmt_ms(proxy_stats['total']['avg'])}")
    print(f" 代理: 连接 p50/p95 {fmt_ms(proxy_stats['connect']['p50'])} / {fmt_ms(proxy_stats['connect']['p95'])},总耗时 p50/p95 {fmt_ms(proxy_stats['total']['p50'])} / {fmt_ms(proxy_stats['total']['p95'])}")
    if not direct_results:
        # Without a direct baseline every percentage would silently be 0.0.
        print(" 对比: 直连无成功样本,无法比较")
        return

    diff = proxy_stats["total"]["avg"] - direct["total"]["avg"]
    pct = (diff / direct["total"]["avg"] * 100.0) if direct["total"]["avg"] > 0 else 0.0
    print(f" 对比: 代理总耗时相对直连 {fmt_ms(diff)},变化 {fmt_pct(pct)},结论:{classify_change(pct)}")
    print_section_summary("连接耗时", direct["connect"]["avg"], proxy_stats["connect"]["avg"])
    print_section_summary("首包耗时", direct["ttfb"]["avg"], proxy_stats["ttfb"]["avg"])
    print_section_summary("总耗时", direct["total"]["avg"], proxy_stats["total"]["avg"])
    print(f" TCP 总结:{overall_verdict([pct])}")
async def bench_udp(args, target: tuple[str, int], proxy: tuple[str, int] | None) -> None:
    """Benchmark UDP round trips to *target*, direct and (optionally) via proxy.

    After ``args.warmup`` untimed warm-up rounds, runs ``args.count`` rounds.
    Without a proxy each round is one direct round trip; with a proxy the
    direct and proxied round trips of a round run concurrently. Prints a
    per-side summary, then a comparison of first-reply latency when both
    sides have successful samples.

    Args:
        args: Parsed command-line namespace (reads ``count``, ``warmup``,
            ``payload_size``, ``timeout``).
        target: ``(host, port)`` of the UDP endpoint to exercise.
        proxy: ``(host, port)`` of the SOCKS5 proxy, or ``None``.
    """
    direct_results: list[UdpResult] = []
    proxy_results: list[UdpResult] = []
    per_round = 2 if proxy is not None else 1
    warmup_steps = args.warmup * per_round

    async def warmup_runner(step: int) -> None:
        # Warm-up failures are irrelevant. Odd steps (and every step when
        # there is no proxy) exercise the direct path, even steps the proxy.
        with contextlib.suppress(Exception):
            if proxy is None or step % 2 == 1:
                await udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
            else:
                await udp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)

    if warmup_steps > 0:
        print("UDP 预热开始")
        for step in range(1, warmup_steps + 1):
            await warmup_runner(step)
            print(f" 预热进度:{step}/{warmup_steps}")

    total_steps = args.count * per_round
    step_timeout = max(1.0, args.timeout + 1.0)
    started_at = time.perf_counter()
    print("UDP 正式测试开始")
    for index in range(args.count):
        step_base = index * per_round
        if proxy is None:
            direct_result = await run_step_with_heartbeat(
                f"UDP 直连 第{index + 1}步",
                total_steps,
                step_base + 1,
                udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout),
                started_at,
                timeout=step_timeout,
            )
            if isinstance(direct_result, UdpResult):
                direct_results.append(direct_result)
            continue
        results = await run_parallel_steps(
            f"UDP 第{index + 1}轮",
            total_steps,
            step_base + 1,
            [
                (f"UDP 直连 第{index + 1}步", udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)),
                (f"UDP 代理 第{index + 1}步", udp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)),
            ],
            started_at,
            timeout=step_timeout,
        )
        # Timed-out steps come back as None and are left out of the samples.
        if isinstance(results[0], UdpResult):
            direct_results.append(results[0])
        if isinstance(results[1], UdpResult):
            proxy_results.append(results[1])

    direct_ok = [item for item in direct_results if item.ok]
    direct_first = summarize([item.first_ms for item in direct_ok])["avg"]
    print(f"UDP 目标: {target[0]}:{target[1]}")
    print(f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},首包均值 {fmt_ms(direct_first)},重复包 {sum(item.duplicates for item in direct_ok)}")
    if proxy is None or not proxy_results:
        print(" 代理: 未执行或不可用")
        return

    proxy_ok = [item for item in proxy_results if item.ok]
    proxy_first = summarize([item.first_ms for item in proxy_ok])["avg"]
    proxy_associate = summarize([item.associate_ms for item in proxy_ok])["avg"]
    print(f" 代理: 成功 {len(proxy_ok)}/{len(proxy_results)},关联均值 {fmt_ms(proxy_associate)},首包均值 {fmt_ms(proxy_first)},重复包 {sum(item.duplicates for item in proxy_ok)}")
    if not direct_ok or not proxy_ok:
        # An empty side averages to 0.0; comparing against it would report
        # a bogus -100% change (and an "improvement") when every proxy
        # round trip failed.
        print(" 对比: 直连或代理无成功样本,无法比较")
        return

    delta = proxy_first - direct_first
    pct = (delta / direct_first * 100.0) if direct_first > 0 else 0.0
    print(f" 对比: 代理首包相对直连 {fmt_ms(delta)},变化 {fmt_pct(pct)},结论:{classify_change(pct)}")
    print(f" UDP 总结:{overall_verdict([pct])}")
async def amain(args) -> int:
    """Resolve the proxy to use, print the run banner and execute the benchmarks.

    The proxy comes from the config file (when it exists and its SOCKS port
    is positive) and is overridden when both ``--proxy-host`` and
    ``--proxy-port`` are given. Mode ``tcp`` runs the HTTP benchmark, ``udp``
    the DNS benchmark and ``all`` both.

    Returns:
        Process exit code, always 0.
    """
    config = None
    if args.config and Path(args.config).exists():
        config = Config.load(args.config)

    proxy: tuple[str, int] | None = None
    if config is not None and config.socks_port > 0:
        proxy = (config.socks_host, config.socks_port)
    if args.proxy_host and args.proxy_port:
        # Explicit command-line values take precedence over the config file.
        proxy = (args.proxy_host, args.proxy_port)

    if proxy is None:
        proxy_line = " 代理: 未配置或未启动,将只测直连"
    else:
        proxy_line = f" 代理: {proxy[0]}:{proxy[1]}"
    banner = (
        "本地基准测试开始",
        f" 样本数: {args.count},热身: {args.warmup},载荷: {args.payload_size} 字节",
        proxy_line,
        f" HTTP 目标: {args.http_url}",
        f" DNS 目标: {args.dns_server_host}:{args.dns_server_port}",
        "",
    )
    for line in banner:
        print(line)

    run_http = args.mode in ("tcp", "all")
    run_dns = args.mode in ("udp", "all")
    if run_http:
        await bench_http(args, proxy)
        print("")
    if run_dns:
        await bench_dns(args, proxy)
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the benchmark script."""
    parser = argparse.ArgumentParser(description="mynetspeeder 本地手工基线测试")
    # (flag, add_argument keyword arguments), registered in this order.
    options = (
        ("--config", {"default": "/home/mynetspeeder/config.json", "help": "配置文件路径,用于读取默认 socks 端口"}),
        ("--proxy-host", {"default": "", "help": "SOCKS5 代理地址,默认读取 config.json"}),
        ("--proxy-port", {"type": int, "default": 0, "help": "SOCKS5 代理端口,默认读取 config.json"}),
        ("--http-url", {"default": "https://research.google/blog/", "help": "HTTPS 测试 URL"}),
        ("--dns-server", {"default": "8.8.8.8:53", "help": "DNS 测试服务器,格式 host:port"}),
        ("--mode", {"choices": ("tcp", "udp", "all"), "default": "all", "help": "只测 TCP、只测 UDP 或都测"}),
        ("--count", {"type": int, "default": 8, "help": "正式样本数,默认 8,尽量控制在 2-5 分钟内完成"}),
        ("--warmup", {"type": int, "default": 0, "help": "热身轮数,默认 0;仅用于平滑首次连接抖动"}),
        ("--payload-size", {"type": int, "default": 4096, "help": "每次请求的载荷大小"}),
        ("--timeout", {"type": float, "default": 2.0, "help": "单次请求超时秒数,默认 2.0"}),
    )
    for flag, settings in options:
        parser.add_argument(flag, **settings)
    return parser
def main() -> int:
    """Parse the command line, resolve the DNS target and run the benchmarks.

    Returns:
        The exit code produced by ``amain``.
    """
    parser = build_parser()
    args = parser.parse_args()
    dns_host, dns_port = parse_dns_target(args.dns_server)
    args.dns_server_host = dns_host
    args.dns_server_port = dns_port
    return asyncio.run(amain(args))
# Script entry point: exit with the code returned by main().
if __name__ == "__main__":
    sys.exit(main())
|