#!/usr/bin/env python3
# Manual baseline benchmark: compares direct HTTP(S)/DNS round trips with the
# same requests sent through a SOCKS5 proxy and prints a latency comparison.
from __future__ import annotations

import argparse
import asyncio
import contextlib
import random
import ssl
import socket
import statistics
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlparse

# ROOT is the third ancestor directory of this file; it is put on sys.path so
# the ``mynetspeeder`` package can be imported when the script is run directly.
# Fall back to the top-most ancestor if the file sits in a shallower location.
_ANCESTORS = Path(__file__).resolve().parents
ROOT = _ANCESTORS[2] if len(_ANCESTORS) > 2 else _ANCESTORS[len(_ANCESTORS) - 1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

try:
    from mynetspeeder.config import Config  # noqa: E402
except ImportError:
    # The config file is only a source of proxy defaults; without the package
    # the benchmark still works with --proxy-host/--proxy-port or direct-only.
    Config = None  # type: ignore[assignment,misc]


def parse_host_port(text: str) -> tuple[str, int]:
    """Split ``host:port`` or ``[ipv6]:port`` into ``(host, port)``.

    Raises:
        ValueError: if the text has neither form or the port is not an integer.
    """
    if text.startswith("["):
        host, closing, rest = text[1:].partition("]")
        if not closing or not rest.startswith(":"):
            raise ValueError(f"无效地址: {text}")
        return host, int(rest[1:])
    if text.count(":") == 1:
        host, port = text.rsplit(":", 1)
        return host, int(port)
    raise ValueError(f"无效地址: {text}")


def parse_url(text: str) -> tuple[str, int, str, bool]:
    """Parse an http/https URL into ``(host, port, path_with_query, is_https)``.

    The port defaults to 443 for https and 80 for http; an empty path becomes
    ``/``.

    Raises:
        ValueError: for other schemes or a URL without a hostname.
    """
    parsed = urlparse(text)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"仅支持 http/https: {text}")
    if not parsed.hostname:
        raise ValueError(f"无效 URL: {text}")
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"
    return parsed.hostname, port, path, parsed.scheme == "https"


def parse_dns_target(text: str) -> tuple[str, int]:
    """Parse the ``--dns-server`` value; same format as :func:`parse_host_port`."""
    host, port = parse_host_port(text)
    return host, port


def socks5_addr_bytes(host: str, port: int) -> bytes:
    """Encode ``host``/``port`` as a SOCKS5 address: ATYP + address + port.

    IPv4 literals use ATYP 0x01, IPv6 literals ATYP 0x04, and anything else is
    sent as a length-prefixed domain name (ATYP 0x03).
    """
    port_bytes = port.to_bytes(2, "big")
    try:
        return b"\x01" + socket.inet_aton(host) + port_bytes
    except OSError:
        try:
            return b"\x04" + socket.inet_pton(socket.AF_INET6, host) + port_bytes
        except OSError:
            raw = host.encode()
            return b"\x03" + bytes([len(raw)]) + raw + port_bytes


async def socks5_greet(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Perform the SOCKS5 method negotiation, offering only "no authentication".

    Raises:
        ConnectionError: if the proxy does not answer ``05 00``.
    """
    writer.write(b"\x05\x01\x00")
    await writer.drain()
    reply = await reader.readexactly(2)
    if reply != b"\x05\x00":
        raise ConnectionError("SOCKS5 握手失败")


async def socks5_connect(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, host: str, port: int) -> None:
    """Greet the proxy and issue a SOCKS5 CONNECT to ``host:port``.

    The bound address in the reply is read and discarded so the stream is
    positioned at the start of the tunnelled data.

    Raises:
        ConnectionError: if the greeting fails or the reply code is non-zero.
    """
    await socks5_greet(reader, writer)
    writer.write(b"\x05\x01\x00" + socks5_addr_bytes(host, port))
    await writer.drain()
    head = await reader.readexactly(4)
    if head[1] != 0x00:
        raise ConnectionError(f"SOCKS5 CONNECT 失败: {head[1]}")
    atyp = head[3]
    if atyp == 1:
        await reader.readexactly(4 + 2)
    elif atyp == 3:
        length = (await reader.readexactly(1))[0]
        await reader.readexactly(length + 2)
    elif atyp == 4:
        await reader.readexactly(16 + 2)


async def socks5_udp_associate(
    proxy_host: str, proxy_port: int
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter, tuple[str, int], float]:
    """Open a SOCKS5 control connection and request UDP ASSOCIATE.

    Returns:
        ``(reader, writer, relay, associate_ms)`` where ``relay`` is the
        ``(host, port)`` UDP datagrams must be sent to and ``associate_ms`` is
        the time the whole association took. The caller owns ``writer`` and
        must close it.

    Raises:
        ConnectionError: if the proxy rejects the greeting or the request.
    """
    started = time.perf_counter()
    reader, writer = await asyncio.open_connection(proxy_host, proxy_port)
    try:
        await socks5_greet(reader, writer)
        # An all-zero DST.ADDR/DST.PORT tells the proxy that the client's
        # source address is not known in advance.
        writer.write(b"\x05\x03\x00" + b"\x01" + bytes(4) + bytes(2))
        await writer.drain()
        head = await reader.readexactly(4)
        if head[1] != 0x00:
            raise ConnectionError(f"SOCKS5 UDP ASSOCIATE 失败: {head[1]}")
        atyp = head[3]
        if atyp == 1:
            relay_host = socket.inet_ntoa(await reader.readexactly(4))
        elif atyp == 4:
            relay_host = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16))
        elif atyp == 3:
            length = (await reader.readexactly(1))[0]
            relay_host = (await reader.readexactly(length)).decode()
        else:
            raise ConnectionError(f"未知 SOCKS5 地址类型: {atyp}")
        relay_port = int.from_bytes(await reader.readexactly(2), "big")
    except BaseException:
        # Also covers cancellation: never leave the control connection open.
        writer.close()
        raise
    # A wildcard bind address means "same host as the control connection".
    if relay_host in ("0.0.0.0", "::"):
        relay_host = proxy_host
    associate_ms = (time.perf_counter() - started) * 1000
    return reader, writer, (relay_host, relay_port), associate_ms


def strip_socks_udp(data: bytes) -> bytes:
    """Remove the SOCKS5 UDP request header and return the payload.

    Header layout: RSV(2) FRAG(1) ATYP(1) ADDR(variable) PORT(2).

    Raises:
        ValueError: if the datagram is truncated, fragmented, or uses an
            unknown address type.
    """
    if len(data) < 4:
        raise ValueError("SOCKS5 UDP 响应过短")
    if data[2] != 0:
        raise ValueError("不支持 SOCKS5 UDP 分片")
    atyp = data[3]
    if atyp == 1:
        offset = 4 + 4 + 2
    elif atyp == 4:
        offset = 4 + 16 + 2
    elif atyp == 3:
        if len(data) < 5:
            raise ValueError("SOCKS5 UDP 响应过短")
        offset = 4 + 1 + data[4] + 2
    else:
        raise ValueError(f"未知 SOCKS5 地址类型: {atyp}")
    if len(data) < offset:
        raise ValueError("SOCKS5 UDP 响应过短")
    return data[offset:]


async def open_http_connection(
    host: str,
    port: int,
    timeout: float,
    proxy: tuple[str, int] | None,
    use_tls: bool,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Open a stream to ``host:port``, directly or through a SOCKS5 proxy.

    Each phase (TCP connect, SOCKS5 handshake, TLS handshake) is individually
    bounded by ``timeout``. With a proxy and TLS, the tunnelled socket is
    duplicated and re-wrapped in a fresh TLS stream.

    Raises:
        ConnectionError: on SOCKS5 failure or if the proxy socket is unavailable.
        asyncio.TimeoutError: if a phase exceeds ``timeout``.
        OSError: on connection errors.
    """
    if proxy is None:
        if use_tls:
            return await asyncio.wait_for(
                asyncio.open_connection(host, port, ssl=ssl.create_default_context(), server_hostname=host),
                timeout=timeout,
            )
        return await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)

    proxy_reader, proxy_writer = await asyncio.wait_for(
        asyncio.open_connection(proxy[0], proxy[1]), timeout=timeout
    )
    try:
        await asyncio.wait_for(socks5_connect(proxy_reader, proxy_writer, host, port), timeout=timeout)
        if not use_tls:
            return proxy_reader, proxy_writer
        raw_sock = proxy_writer.get_extra_info("socket")
        if raw_sock is None:
            raise ConnectionError("SOCKS5 socket unavailable")
        dup = raw_sock.dup()
    except BaseException:
        # Handshake failed, timed out, or was cancelled: release the proxy link.
        proxy_writer.close()
        raise

    try:
        dup.setblocking(False)
        # Drop the plain-text stream; the duplicated descriptor keeps the
        # tunnel alive for the TLS stream created below.
        proxy_writer.close()
        with contextlib.suppress(Exception):
            await proxy_writer.wait_closed()
        return await asyncio.wait_for(
            asyncio.open_connection(ssl=ssl.create_default_context(), sock=dup, server_hostname=host),
            timeout=timeout,
        )
    except BaseException:
        dup.close()
        raise


def percentile(values: list[float], pct: float) -> float:
    """Return the linearly interpolated percentile; ``pct`` is a fraction in [0, 1].

    Returns 0.0 for an empty list.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    if len(ordered) == 1:
        return ordered[0]
    idx = (len(ordered) - 1) * pct
    lo = int(idx)
    hi = min(lo + 1, len(ordered) - 1)
    weight = idx - lo
    return ordered[lo] * (1 - weight) + ordered[hi] * weight


def summarize(values: list[float]) -> dict[str, float]:
    """Return avg/p50/p95/min/max of ``values`` (all 0.0 when empty)."""
    if not values:
        return {"avg": 0.0, "p50": 0.0, "p95": 0.0, "min": 0.0, "max": 0.0}
    return {
        "avg": statistics.mean(values),
        "p50": percentile(values, 0.50),
        "p95": percentile(values, 0.95),
        "min": min(values),
        "max": max(values),
    }


def fmt_ms(value: float) -> str:
    """Format a millisecond value with two decimals."""
    return f"{value:.2f}ms"


def fmt_pct(value: float) -> str:
    """Format a signed percentage with one decimal."""
    return f"{value:+.1f}%"


def classify_change(diff_pct: float, improvement_threshold: float = 5.0, regression_threshold: float = -5.0) -> str:
    """Classify a relative latency change of proxy versus direct.

    ``diff_pct`` is a latency delta, so a negative value means the proxy was
    faster. NOTE: the parameter names read inverted relative to their use --
    ``regression_threshold`` (negative) is the bound for "提升" (improved) and
    ``improvement_threshold`` (positive) the bound for "退化" (regressed). The
    names are kept for backward compatibility with keyword callers.
    """
    if diff_pct <= regression_threshold:
        return "提升"
    if diff_pct >= improvement_threshold:
        return "退化"
    return "无明显变化"


def progress_line(done: int, total: int, started_at: float) -> str:
    """Build a progress string with elapsed time and a linear remaining estimate.

    ``started_at`` is a ``time.perf_counter()`` timestamp.
    """
    if total <= 0:
        return "进度:0%"
    elapsed = time.perf_counter() - started_at
    ratio = done / total
    percent = int(ratio * 100)
    if done <= 0:
        remain = "估算剩余:--"
    else:
        total_est = elapsed / ratio
        remain = f"估算剩余:{max(0.0, total_est - elapsed):.1f}s"
    return f"进度:{done}/{total} ({percent}%),已用 {elapsed:.1f}s,{remain}"


async def run_step_with_heartbeat(step_label: str, total: int, done: int, coro, started_at: float, timeout: float) -> object | None:
    """Run ``coro`` as a task, printing a progress line about once per second.

    Returns the coroutine's result, or ``None`` if it did not finish within
    ``timeout`` seconds (the task is cancelled in that case). Exceptions raised
    by the coroutine propagate.
    """
    task = asyncio.create_task(coro)
    print(f" 开始 {step_label}")
    step_started = time.perf_counter()
    while not task.done():
        elapsed = time.perf_counter() - step_started
        if elapsed >= timeout:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
            print(f" 超时 {step_label} {progress_line(done, total, started_at)}")
            return None
        print(f" {step_label} {progress_line(done, total, started_at)}")
        # Wait on the task itself rather than sleeping, so a step that
        # finishes early returns immediately instead of after the interval.
        await asyncio.wait({task}, timeout=min(1.0, max(0.1, timeout - elapsed)))
    result = await task
    print(f" 完成 {step_label} {progress_line(done, total, started_at)}")
    return result


def _settled_result(task: asyncio.Future) -> object | None:
    """Return the result of a finished task, or None if it was cancelled or failed."""
    if task.cancelled() or task.exception() is not None:
        return None
    return task.result()


async def run_parallel_steps(
    step_label: str,
    total: int,
    done: int,
    coros: list[tuple[str, object]],
    started_at: float,
    timeout: float,
) -> list[object | None]:
    """Run several labelled coroutines concurrently with a shared heartbeat.

    Returns one entry per coroutine, in input order. When ``timeout`` expires,
    unfinished tasks are cancelled and reported as ``None``; results of tasks
    that had already finished are kept so a stalled sibling does not discard a
    valid measurement. Without a timeout, a coroutine's exception propagates.
    """
    tasks = [asyncio.create_task(coro) for _, coro in coros]
    print(f" 开始 {step_label}")
    step_started = time.perf_counter()
    while True:
        pending = [task for task in tasks if not task.done()]
        if not pending:
            break
        elapsed = time.perf_counter() - step_started
        if elapsed >= timeout:
            for task in pending:
                task.cancel()
            for task in pending:
                with contextlib.suppress(asyncio.CancelledError):
                    await task
            print(f" 超时 {step_label} {progress_line(done, total, started_at)}")
            return [_settled_result(task) for task in tasks]
        print(f" {step_label} {progress_line(done, total, started_at)}")
        # Wakes as soon as every pending task is done, or after the heartbeat
        # interval, whichever comes first.
        await asyncio.wait(pending, timeout=min(1.0, max(0.1, timeout - elapsed)))
    results: list[object | None] = [await task for task in tasks]
    print(f" 完成 {step_label} {progress_line(done, total, started_at)}")
    return results


def print_section_summary(name: str, direct_avg: float, proxy_avg: float | None, unit: str = "ms") -> None:
    """Print a one-line direct-versus-proxy comparison for one metric."""
    if proxy_avg is None:
        print(f" {name}: 仅测到直连,均值 {direct_avg:.2f}{unit}")
        return
    diff = proxy_avg - direct_avg
    pct = (diff / direct_avg * 100.0) if direct_avg > 0 else 0.0
    verdict = classify_change(pct)
    print(f" {name}: 直连 {direct_avg:.2f}{unit},代理 {proxy_avg:.2f}{unit},差值 {diff:+.2f}{unit},变化 {pct:+.1f}%,结论:{verdict}")


def overall_verdict(changes: list[float]) -> str:
    """Vote over percentage changes: <= -5 counts for, >= +5 counts against."""
    if not changes:
        return "无可用结果"
    score = 0
    for change in changes:
        if change <= -5.0:
            score += 1
        elif change >= 5.0:
            score -= 1
    if score > 0:
        return "整体提升"
    if score < 0:
        return "整体退化"
    return "整体无明显变化"


@dataclass
class TcpResult:
    # Timings of one TCP round trip, in milliseconds.
    connect_ms: float
    ttfb_ms: float
    total_ms: float


@dataclass
class UdpResult:
    # Timings (milliseconds) and outcome of one UDP round trip.
    associate_ms: float
    first_ms: float
    total_ms: float
    duplicates: int
    ok: bool


@dataclass
class HttpResult:
    # Timings (milliseconds), status code and outcome of one HTTP request.
    connect_ms: float
    ttfb_ms: float
    total_ms: float
    status: int
    ok: bool
    error: str = ""


@dataclass
class DnsResult:
    # Timings (milliseconds) and outcome of one DNS query.
    query_ms: float
    total_ms: float
    ok: bool
    error: str = ""


async def tcp_echo_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Stream callback that echoes every received chunk until EOF."""
    try:
        while True:
            chunk = await reader.read(65536)
            if not chunk:
                break
            writer.write(chunk)
            await writer.drain()
    except Exception:
        # Best effort: a broken peer simply ends the echo session.
        pass
    finally:
        writer.close()
        with contextlib.suppress(Exception):
            await writer.wait_closed()


def build_dns_query(name: str, query_id: int | None = None) -> bytes:
    """Build a DNS query packet for an A/IN record of ``name``.

    A random 16-bit transaction id is used when ``query_id`` is omitted.
    """
    if query_id is None:
        query_id = random.randint(0, 0xFFFF)
    header = query_id.to_bytes(2, "big")
    header += b"\x01\x00"  # standard query, recursion desired
    header += b"\x00\x01"  # qdcount
    header += b"\x00\x00"  # ancount
    header += b"\x00\x00"  # nscount
    header += b"\x00\x00"  # arcount
    labels = b"".join(bytes([len(part)]) + part.encode() for part in name.strip(".").split(".") if part)
    # Root label terminator, then QTYPE=1 and QCLASS=1.
    return header + labels + b"\x00" + b"\x00\x01" + b"\x00\x01"


def parse_dns_response(data: bytes) -> tuple[int, bool]:
    """Return ``(rcode, has_records)`` from a DNS response header.

    ``has_records`` is true when the answer count or the question count is
    non-zero.

    Raises:
        ValueError: if ``data`` is shorter than the 12-byte header.
    """
    if len(data) < 12:
        raise ValueError("DNS 响应过短")
    flags = int.from_bytes(data[2:4], "big")
    rcode = flags & 0x000F
    qdcount = int.from_bytes(data[4:6], "big")
    ancount = int.from_bytes(data[6:8], "big")
    return rcode, ancount > 0 or qdcount > 0


async def http_roundtrip(url: str, proxy: tuple[str, int] | None, timeout: float) -> HttpResult:
    """Issue one ``GET`` for ``url`` and time connect, first byte and total.

    ``ttfb_ms`` and ``total_ms`` are measured from the moment the request is
    written, so they exclude connection setup. Network failures are reported
    as a result with ``ok=False`` and a populated ``error``.

    Raises:
        ValueError: if ``url`` is not a valid http/https URL.
    """
    host, port, path, is_https = parse_url(url)
    # The Host header must carry a non-default port, and IPv6 literals need
    # brackets.
    host_header = f"[{host}]" if ":" in host else host
    if port != (443 if is_https else 80):
        host_header = f"{host_header}:{port}"

    writer: asyncio.StreamWriter | None = None
    started = time.perf_counter()
    try:
        reader, writer = await open_http_connection(host, port, timeout, proxy, is_https)
        connect_ms = (time.perf_counter() - started) * 1000
        request = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host_header}\r\n"
            "User-Agent: mynetspeeder-benchmark\r\n"
            "Accept: */*\r\n"
            "Connection: close\r\n\r\n"
        ).encode()
        started = time.perf_counter()
        writer.write(request)
        await writer.drain()
        head = await asyncio.wait_for(reader.readuntil(b"\r\n\r\n"), timeout=timeout)
        first_ms = (time.perf_counter() - started) * 1000
        status_line = head.split(b"\r\n", 1)[0].decode(errors="replace")
        status = 0
        if status_line.startswith("HTTP/"):
            parts = status_line.split()
            if len(parts) >= 2 and parts[1].isdigit():
                status = int(parts[1])
        body = await reader.read()
        total_ms = (time.perf_counter() - started) * 1000
        ok = 200 <= status < 400 and bool(body or head)
        error = "" if ok else f"HTTP 状态码 {status}"
        return HttpResult(connect_ms, first_ms, total_ms, status, ok, error=error)
    except Exception as exc:
        # Keep the underlying cause so the report can show why a run failed.
        return HttpResult(0.0, 0.0, 0.0, 0, False, error=f"http 请求失败: {exc!r}")
    finally:
        if writer is not None:
            with contextlib.suppress(Exception):
                writer.close()
                await writer.wait_closed()


async def dns_roundtrip(server_host: str, server_port: int, proxy: tuple[str, int] | None, timeout: float) -> DnsResult:
    """Send one DNS query over UDP, directly or via a SOCKS5 UDP association.

    ``query_ms`` excludes the SOCKS5 association time and stops when the
    response is received, so connection teardown does not bias the proxy
    numbers. Failures are reported as a result with ``ok=False``.
    """
    query = build_dns_query("www.google.com")
    loop = asyncio.get_running_loop()
    started = time.perf_counter()
    query_started = started
    try:
        if proxy is None:
            family = socket.AF_INET6 if ":" in server_host else socket.AF_INET
            sock = socket.socket(family, socket.SOCK_DGRAM)
            try:
                sock.setblocking(False)
                await loop.sock_connect(sock, (server_host, server_port))
                await loop.sock_sendall(sock, query)
                data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
                received_at = time.perf_counter()
            finally:
                sock.close()
        else:
            _reader, writer, relay, associate_ms = await asyncio.wait_for(
                socks5_udp_associate(proxy[0], proxy[1]), timeout=timeout
            )
            sock = None
            try:
                family = socket.AF_INET6 if ":" in relay[0] else socket.AF_INET
                sock = socket.socket(family, socket.SOCK_DGRAM)
                sock.setblocking(False)
                await loop.sock_connect(sock, relay)
                # RSV(2) + FRAG(1) header, then the real destination, then payload.
                packet = b"\x00\x00\x00" + socks5_addr_bytes(server_host, server_port) + query
                await loop.sock_sendall(sock, packet)
                data = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout)
                received_at = time.perf_counter()
                data = strip_socks_udp(data)
            finally:
                if sock is not None:
                    sock.close()
                # The association lives only as long as the control connection.
                writer.close()
                with contextlib.suppress(Exception):
                    await writer.wait_closed()
            query_started = query_started + associate_ms / 1000.0
        query_ms = (received_at - query_started) * 1000
        rcode, ok = parse_dns_response(data)
        return DnsResult(query_ms=query_ms, total_ms=(time.perf_counter() - started) * 1000, ok=ok and rcode == 0)
    except Exception as exc:
        return DnsResult(0.0, 0.0, False, error=f"dns 查询失败: {exc!r}")


async def bench_http(args, proxy: tuple[str, int] | None) -> None:
    """Run ``args.count`` HTTP(S) rounds against ``args.http_url`` and print a report.

    With a proxy, each round runs the direct and the proxied request
    concurrently; otherwise only the direct request is made.
    """
    direct_results: list[HttpResult] = []
    proxy_results: list[HttpResult] = []
    per_round = 2 if proxy is not None else 1
    total_steps = args.count * per_round
    started_at = time.perf_counter()
    print("TCP/HTTPS 正式测试开始")
    for index in range(args.count):
        step_base = index * per_round
        if proxy is None:
            direct_result = await run_step_with_heartbeat(
                f"HTTPS 直连 第{index + 1}步",
                total_steps,
                step_base + 1,
                http_roundtrip(args.http_url, None, args.timeout),
                started_at,
                timeout=max(1.0, args.timeout + 1.0),
            )
            if isinstance(direct_result, HttpResult):
                direct_results.append(direct_result)
            continue
        results = await run_parallel_steps(
            f"HTTPS 第{index + 1}轮",
            total_steps,
            step_base + 1,
            [
                (f"HTTPS 直连 第{index + 1}步", http_roundtrip(args.http_url, None, args.timeout)),
                (f"HTTPS 代理 第{index + 1}步", http_roundtrip(args.http_url, proxy, args.timeout)),
            ],
            started_at,
            timeout=max(1.0, args.timeout + 1.0),
        )
        if isinstance(results[0], HttpResult):
            direct_results.append(results[0])
        if isinstance(results[1], HttpResult):
            proxy_results.append(results[1])

    print(f"HTTP 目标: {args.http_url}")
    direct_ok = [item for item in direct_results if item.ok]
    direct_ttfb = summarize([item.ttfb_ms for item in direct_ok])["avg"]
    direct_total = summarize([item.total_ms for item in direct_ok])["avg"]
    print(f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},首包均值 {fmt_ms(direct_ttfb)},总耗时均值 {fmt_ms(direct_total)}")
    if direct_results and not direct_ok:
        print(f" 直连失败原因: {direct_results[0].error or '未知'}")
    if proxy is None or not proxy_results:
        print(" 代理: 未执行或不可用")
        return
    proxy_ok = [item for item in proxy_results if item.ok]
    proxy_ttfb = summarize([item.ttfb_ms for item in proxy_ok])["avg"]
    proxy_total = summarize([item.total_ms for item in proxy_ok])["avg"]
    ttfb_pct = ((proxy_ttfb - direct_ttfb) / direct_ttfb * 100.0) if direct_ttfb > 0 else 0.0
    total_pct = ((proxy_total - direct_total) / direct_total * 100.0) if direct_total > 0 else 0.0
    print(f" 代理: 成功 {len(proxy_ok)}/{len(proxy_results)},首包均值 {fmt_ms(proxy_ttfb)},总耗时均值 {fmt_ms(proxy_total)}")
    print(f" 对比: 首包变化 {fmt_pct(ttfb_pct)},总耗时变化 {fmt_pct(total_pct)},结论:{classify_change(total_pct)}")


async def bench_dns(args, proxy: tuple[str, int] | None) -> None:
    """Run ``args.count`` DNS rounds against the configured server and print a report.

    With a proxy, each round runs the direct and the proxied query
    concurrently; otherwise only the direct query is made.
    """
    direct_results: list[DnsResult] = []
    proxy_results: list[DnsResult] = []
    per_round = 2 if proxy is not None else 1
    total_steps = args.count * per_round
    started_at = time.perf_counter()
    print("UDP/DNS 正式测试开始")
    for index in range(args.count):
        step_base = index * per_round
        if proxy is None:
            direct_result = await run_step_with_heartbeat(
                f"DNS 直连 第{index + 1}步",
                total_steps,
                step_base + 1,
                dns_roundtrip(args.dns_server_host, args.dns_server_port, None, args.timeout),
                started_at,
                timeout=max(1.0, args.timeout + 1.0),
            )
            if isinstance(direct_result, DnsResult):
                direct_results.append(direct_result)
            continue
        results = await run_parallel_steps(
            f"DNS 第{index + 1}轮",
            total_steps,
            step_base + 1,
            [
                (f"DNS 直连 第{index + 1}步", dns_roundtrip(args.dns_server_host, args.dns_server_port, None, args.timeout)),
                (f"DNS 代理 第{index + 1}步", dns_roundtrip(args.dns_server_host, args.dns_server_port, proxy, args.timeout)),
            ],
            started_at,
            timeout=max(1.0, args.timeout + 1.0),
        )
        if isinstance(results[0], DnsResult):
            direct_results.append(results[0])
        if isinstance(results[1], DnsResult):
            proxy_results.append(results[1])

    direct_ok = [item for item in direct_results if item.ok]
    direct_query = summarize([item.query_ms for item in direct_ok])["avg"]
    print(f"DNS 目标: {args.dns_server_host}:{args.dns_server_port}")
    print(f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},查询均值 {fmt_ms(direct_query)}")
    if direct_results and not direct_ok:
        print(f" 直连失败原因: {direct_results[0].error or '未知'}")
    if proxy is None or not proxy_results:
        print(" 代理: 未执行或不可用")
        return
    proxy_ok = [item for item in proxy_results if item.ok]
    proxy_query = summarize([item.query_ms for item in proxy_ok])["avg"]
    delta = proxy_query - direct_query
    pct = (delta / direct_query * 100.0) if direct_query > 0 else 0.0
    print(f" 代理: 成功 {len(proxy_ok)}/{len(proxy_results)},查询均值 {fmt_ms(proxy_query)}")
    print(f" 对比: DNS 查询变化 {fmt_pct(pct)},结论:{classify_change(pct)}")


async def bench_tcp(args, target: tuple[str, int], proxy: tuple[str, int] | None) -> None:
    """Benchmark raw TCP round trips against ``target`` and print a report.

    NOTE(review): ``tcp_direct_roundtrip`` and ``tcp_proxy_roundtrip`` are not
    defined in this file and ``amain`` does not call this function -- confirm
    where those helpers are meant to come from before wiring it in.
    """
    direct_results: list[TcpResult] = []
    proxy_results: list[TcpResult] = []
    per_round = 2 if proxy is not None else 1
    warmup_steps = args.warmup * per_round

    async def warmup_runner(step: int) -> None:
        # Odd steps warm the direct path, even steps the proxy path.
        if proxy is None:
            with contextlib.suppress(Exception):
                await tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
            return
        if step % 2 == 1:
            with contextlib.suppress(Exception):
                await tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
        else:
            with contextlib.suppress(Exception):
                await tcp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)

    if warmup_steps > 0:
        print("TCP 预热开始")
        for step in range(1, warmup_steps + 1):
            await warmup_runner(step)
            print(f" 预热进度:{step}/{warmup_steps}")

    total_steps = args.count * per_round
    started_at = time.perf_counter()
    print("TCP 正式测试开始")
    for index in range(args.count):
        step_base = index * per_round
        if proxy is None:
            direct_result = await run_step_with_heartbeat(
                f"TCP 直连 第{index + 1}步",
                total_steps,
                step_base + 1,
                tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout),
                started_at,
                timeout=max(1.0, args.timeout + 1.0),
            )
            if isinstance(direct_result, TcpResult):
                direct_results.append(direct_result)
            continue
        results = await run_parallel_steps(
            f"TCP 第{index + 1}轮",
            total_steps,
            step_base + 1,
            [
                (f"TCP 直连 第{index + 1}步", tcp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)),
                (f"TCP 代理 第{index + 1}步", tcp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)),
            ],
            started_at,
            timeout=max(1.0, args.timeout + 1.0),
        )
        if isinstance(results[0], TcpResult):
            direct_results.append(results[0])
        if isinstance(results[1], TcpResult):
            proxy_results.append(results[1])

    direct = {
        "connect": summarize([item.connect_ms for item in direct_results]),
        "ttfb": summarize([item.ttfb_ms for item in direct_results]),
        "total": summarize([item.total_ms for item in direct_results]),
    }
    print(f"TCP 目标: {target[0]}:{target[1]}")
    # Report successes against the number of attempted rounds; timed-out
    # rounds are not in the result list.
    print(f" 直连: 成功 {len(direct_results)}/{args.count},连接均值 {fmt_ms(direct['connect']['avg'])},首包均值 {fmt_ms(direct['ttfb']['avg'])},总耗时均值 {fmt_ms(direct['total']['avg'])}")
    print(f" 直连: 连接 p50/p95 {fmt_ms(direct['connect']['p50'])} / {fmt_ms(direct['connect']['p95'])},总耗时 p50/p95 {fmt_ms(direct['total']['p50'])} / {fmt_ms(direct['total']['p95'])}")
    if proxy is None or not proxy_results:
        print(" 代理: 未执行或不可用")
        return
    proxy_stats = {
        "connect": summarize([item.connect_ms for item in proxy_results]),
        "ttfb": summarize([item.ttfb_ms for item in proxy_results]),
        "total": summarize([item.total_ms for item in proxy_results]),
    }
    diff = proxy_stats["total"]["avg"] - direct["total"]["avg"]
    pct = (diff / direct["total"]["avg"] * 100.0) if direct["total"]["avg"] > 0 else 0.0
    print(f" 代理: 成功 {len(proxy_results)}/{args.count},连接均值 {fmt_ms(proxy_stats['connect']['avg'])},首包均值 {fmt_ms(proxy_stats['ttfb']['avg'])},总耗时均值 {fmt_ms(proxy_stats['total']['avg'])}")
    print(f" 代理: 连接 p50/p95 {fmt_ms(proxy_stats['connect']['p50'])} / {fmt_ms(proxy_stats['connect']['p95'])},总耗时 p50/p95 {fmt_ms(proxy_stats['total']['p50'])} / {fmt_ms(proxy_stats['total']['p95'])}")
    print(f" 对比: 代理总耗时相对直连 {fmt_ms(diff)},变化 {fmt_pct(pct)},结论:{classify_change(pct)}")
    print_section_summary("连接耗时", direct["connect"]["avg"], proxy_stats["connect"]["avg"])
    print_section_summary("首包耗时", direct["ttfb"]["avg"], proxy_stats["ttfb"]["avg"])
    print_section_summary("总耗时", direct["total"]["avg"], proxy_stats["total"]["avg"])
    print(f" TCP 总结:{overall_verdict([pct])}")


async def bench_udp(args, target: tuple[str, int], proxy: tuple[str, int] | None) -> None:
    """Benchmark raw UDP round trips against ``target`` and print a report.

    NOTE(review): ``udp_direct_roundtrip`` and ``udp_proxy_roundtrip`` are not
    defined in this file and ``amain`` does not call this function -- confirm
    where those helpers are meant to come from before wiring it in.
    """
    direct_results: list[UdpResult] = []
    proxy_results: list[UdpResult] = []
    per_round = 2 if proxy is not None else 1
    warmup_steps = args.warmup * per_round

    async def warmup_runner(step: int) -> None:
        # Odd steps warm the direct path, even steps the proxy path.
        if proxy is None:
            with contextlib.suppress(Exception):
                await udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
            return
        if step % 2 == 1:
            with contextlib.suppress(Exception):
                await udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)
        else:
            with contextlib.suppress(Exception):
                await udp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)

    if warmup_steps > 0:
        print("UDP 预热开始")
        for step in range(1, warmup_steps + 1):
            await warmup_runner(step)
            print(f" 预热进度:{step}/{warmup_steps}")

    total_steps = args.count * per_round
    started_at = time.perf_counter()
    print("UDP 正式测试开始")
    for index in range(args.count):
        step_base = index * per_round
        if proxy is None:
            direct_result = await run_step_with_heartbeat(
                f"UDP 直连 第{index + 1}步",
                total_steps,
                step_base + 1,
                udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout),
                started_at,
                timeout=max(1.0, args.timeout + 1.0),
            )
            if isinstance(direct_result, UdpResult):
                direct_results.append(direct_result)
            continue
        results = await run_parallel_steps(
            f"UDP 第{index + 1}轮",
            total_steps,
            step_base + 1,
            [
                (f"UDP 直连 第{index + 1}步", udp_direct_roundtrip(target[0], target[1], args.payload_size, args.timeout)),
                (f"UDP 代理 第{index + 1}步", udp_proxy_roundtrip(proxy[0], proxy[1], target[0], target[1], args.payload_size, args.timeout)),
            ],
            started_at,
            timeout=max(1.0, args.timeout + 1.0),
        )
        if isinstance(results[0], UdpResult):
            direct_results.append(results[0])
        if isinstance(results[1], UdpResult):
            proxy_results.append(results[1])

    direct_ok = [item for item in direct_results if item.ok]
    direct_first = summarize([item.first_ms for item in direct_ok])["avg"]
    print(f"UDP 目标: {target[0]}:{target[1]}")
    print(f" 直连: 成功 {len(direct_ok)}/{len(direct_results)},首包均值 {fmt_ms(direct_first)},重复包 {sum(item.duplicates for item in direct_ok)}")
    if proxy is None or not proxy_results:
        print(" 代理: 未执行或不可用")
        return
    proxy_ok = [item for item in proxy_results if item.ok]
    proxy_first = summarize([item.first_ms for item in proxy_ok])["avg"]
    delta = proxy_first - direct_first
    pct = (delta / direct_first * 100.0) if direct_first > 0 else 0.0
    print(f" 代理: 成功 {len(proxy_ok)}/{len(proxy_results)},关联均值 {fmt_ms(summarize([item.associate_ms for item in proxy_ok])['avg'])},首包均值 {fmt_ms(proxy_first)},重复包 {sum(item.duplicates for item in proxy_ok)}")
    print(f" 对比: 代理首包相对直连 {fmt_ms(delta)},变化 {fmt_pct(pct)},结论:{classify_change(pct)}")
    print(f" UDP 总结:{overall_verdict([pct])}")


async def amain(args) -> int:
    """Resolve the proxy endpoint, print the run header and run the selected benchmarks.

    The proxy comes from the config file when it has a positive ``socks_port``;
    ``--proxy-host`` together with ``--proxy-port`` overrides it. Returns the
    process exit code (always 0).
    """
    config = None
    if args.config and Path(args.config).exists():
        if Config is None:
            print(" 提示: 无法导入 mynetspeeder.config,已忽略配置文件", file=sys.stderr)
        else:
            config = Config.load(args.config)

    proxy: tuple[str, int] | None = None
    if config is not None and config.socks_port > 0:
        proxy = (config.socks_host, config.socks_port)
    if args.proxy_host and args.proxy_port:
        proxy = (args.proxy_host, args.proxy_port)

    print("本地基准测试开始")
    print(f" 样本数: {args.count},热身: {args.warmup},载荷: {args.payload_size} 字节")
    if proxy is not None:
        print(f" 代理: {proxy[0]}:{proxy[1]}")
    else:
        print(" 代理: 未配置或未启动,将只测直连")
    print(f" HTTP 目标: {args.http_url}")
    print(f" DNS 目标: {args.dns_server_host}:{args.dns_server_port}")
    print("")
    if args.mode in ("tcp", "all"):
        await bench_http(args, proxy)
        print("")
    if args.mode in ("udp", "all"):
        await bench_dns(args, proxy)
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the benchmark."""
    parser = argparse.ArgumentParser(description="mynetspeeder 本地手工基线测试")
    parser.add_argument("--config", default="/home/mynetspeeder/config.json", help="配置文件路径,用于读取默认 socks 端口")
    parser.add_argument("--proxy-host", default="", help="SOCKS5 代理地址,默认读取 config.json")
    parser.add_argument("--proxy-port", type=int, default=0, help="SOCKS5 代理端口,默认读取 config.json")
    parser.add_argument("--http-url", default="https://research.google/blog/", help="HTTPS 测试 URL")
    parser.add_argument("--dns-server", default="8.8.8.8:53", help="DNS 测试服务器,格式 host:port")
    parser.add_argument("--mode", choices=("tcp", "udp", "all"), default="all", help="只测 TCP、只测 UDP 或都测")
    parser.add_argument("--count", type=int, default=8, help="正式样本数,默认 8,尽量控制在 2-5 分钟内完成")
    parser.add_argument("--warmup", type=int, default=0, help="热身轮数,默认 0;仅用于平滑首次连接抖动")
    parser.add_argument("--payload-size", type=int, default=4096, help="每次请求的载荷大小")
    parser.add_argument("--timeout", type=float, default=2.0, help="单次请求超时秒数,默认 2.0")
    return parser


def main() -> int:
    """Parse arguments, validate ``--dns-server`` and run the benchmark."""
    parser = build_parser()
    args = parser.parse_args()
    try:
        args.dns_server_host, args.dns_server_port = parse_dns_target(args.dns_server)
    except ValueError as exc:
        # Report a malformed address as a usage error instead of a traceback.
        parser.error(f"--dns-server: {exc}")
    return asyncio.run(amain(args))


if __name__ == "__main__":
    raise SystemExit(main())