from __future__ import annotations import argparse import asyncio import json import re from collections import defaultdict from pathlib import Path from . import __version__ from .config import Config from .relay_server import RelayServer from .relay_client import RelayManager from .socks_edge import SocksEdge from .transparent_edge import TransparentEdge TCP_WIN_RE = re.compile( r"tcp win session=(?P\d+) target=(?P[^:]+):(?P\d+) winner=(?P\S+) .*?direct=(?P\d+) .*?relay=(?P\d+)" ) UDP_WIN_RE = re.compile( r"udp flow=(?P\d+) winner=(?P\S+) target=(?P[^:]+):(?P\d+)" ) UDP_SOCKS_SUMMARY_RE = re.compile( r"udp summary bind=(?P\S+) flows=(?P\d+) winners=(?P\d+) " r"winner_breakdown=direct=(?P\d+),relay=(?P\d+) " r"sample=(?P.*?) candidates=(?P\[.*?\]) " r"sent=(?P\d+) recv=(?P\d+) dup=(?P\d+) " r"direct_paths=(?P\d+) relay_paths=(?P\d+) relay_errors=(?P.*)" ) def normalize_winner(name: str) -> str: return "direct" if name.startswith("direct") else name def parse_summary_log(log_path: Path) -> dict[str, object]: tcp_total = 0 tcp_direct = 0 tcp_relay = 0 tcp_winners: dict[str, int] = defaultdict(int) tcp_targets: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) udp_total = 0 udp_direct = 0 udp_relay = 0 udp_winners: dict[str, int] = defaultdict(int) udp_targets: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) udp_snapshot: dict[str, object] | None = None for line in log_path.read_text(errors="replace").splitlines(): tcp_match = TCP_WIN_RE.search(line) if tcp_match: tcp_total += 1 winner = normalize_winner(tcp_match.group("winner")) host = tcp_match.group("host") port = tcp_match.group("port") key = f"{host}:{port}" tcp_winners[winner] += 1 tcp_targets[key][winner] += 1 if winner == "direct": tcp_direct += 1 else: tcp_relay += 1 continue udp_match = UDP_WIN_RE.search(line) if udp_match: udp_total += 1 winner = normalize_winner(udp_match.group("winner")) host = udp_match.group("host") port = udp_match.group("port") key = f"{host}:{port}" udp_winners[winner] += 1 udp_targets[key][winner] += 1 if winner == "direct": udp_direct += 1 else: udp_relay += 1 continue udp_snapshot_match = UDP_SOCKS_SUMMARY_RE.search(line) if udp_snapshot_match: udp_snapshot = { "bind": udp_snapshot_match.group("bind"), "flows": int(udp_snapshot_match.group("flows")), "winners": int(udp_snapshot_match.group("winners")), "direct": int(udp_snapshot_match.group("direct")), "relay": int(udp_snapshot_match.group("relay")), "sample": udp_snapshot_match.group("sample"), "candidates": udp_snapshot_match.group("candidates"), "sent": int(udp_snapshot_match.group("sent")), "recv": int(udp_snapshot_match.group("recv")), "dup": int(udp_snapshot_match.group("dup")), "direct_paths": int(udp_snapshot_match.group("direct_paths")), "relay_paths": int(udp_snapshot_match.group("relay_paths")), "relay_errors": udp_snapshot_match.group("relay_errors"), } return { "path": log_path, "tcp_total": tcp_total, "tcp_direct": tcp_direct, "tcp_relay": tcp_relay, "tcp_winners": tcp_winners, "tcp_targets": tcp_targets, "udp_total": udp_total, "udp_direct": udp_direct, "udp_relay": udp_relay, "udp_winners": udp_winners, "udp_targets": udp_targets, "udp_snapshot": udp_snapshot, } def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="mynetspeeder") parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}") sub = parser.add_subparsers(dest="command", required=True) relay = sub.add_parser("relay", help="在子节点 VPS 上启动 relay") relay.add_argument("--listen-host", default="0.0.0.0") relay.add_argument("--listen-port", type=int, default=9009) relay.add_argument("--token", required=True) relay.set_defaults(handler=handle_relay) edge = sub.add_parser("edge", help="在当前主 VPS 上启动透明 direct 出站加速") edge.add_argument("--listen-host", default="127.0.0.1") edge.add_argument("--listen-port", type=int, default=19080) edge.add_argument("--config", required=True) edge.add_argument("--enable-udp", action="store_true") edge.add_argument("--kernel", choices=("auto", "20", "24"), default="auto") edge.set_defaults(handler=handle_edge) socks = sub.add_parser("socks", help="在当前主 VPS 上启动显式 SOCKS5 出站加速") socks.add_argument("--listen-host", default="127.0.0.1") socks.add_argument("--listen-port", type=int, default=19180) socks.add_argument("--config", required=True) socks.set_defaults(handler=handle_socks) probe = sub.add_parser("probe", help="查看子节点探测与在线状态") probe.add_argument("--config", required=True) probe.add_argument("--once", action="store_true") probe.set_defaults(handler=handle_probe) summary = sub.add_parser("summary", help="汇总透明模式日志里的胜率") summary.add_argument("--log-file", default="/var/log/mynetspeeder-edge.log") summary.add_argument("--top", type=int, default=10) summary.add_argument("--json", action="store_true") summary.set_defaults(handler=handle_summary) return parser def handle_relay(args: argparse.Namespace) -> int: asyncio.run(RelayServer(args.token).start(args.listen_host, args.listen_port)) return 0 def handle_edge(args: argparse.Namespace) -> int: asyncio.run(TransparentEdge(args.listen_host, args.listen_port, Config.load(args.config), enable_udp=args.enable_udp, kernel_mode=args.kernel).start()) return 0 def handle_socks(args: argparse.Namespace) -> int: asyncio.run(SocksEdge(args.listen_host, args.listen_port, Config.load(args.config)).start()) return 0 def handle_probe(args: argparse.Namespace) -> int: async def run_probe() -> None: manager = RelayManager(Config.load(args.config)) await manager.start() await asyncio.sleep(2) print(json.dumps(manager.snapshot(), ensure_ascii=False, indent=2)) if not args.once: while True: await asyncio.sleep(5) print(json.dumps(manager.snapshot(), ensure_ascii=False, indent=2)) asyncio.run(run_probe()) return 0 def handle_summary(args: argparse.Namespace) -> int: log_path = Path(args.log_file) if not log_path.exists(): raise SystemExit(f"log file not found: {log_path}") parsed = parse_summary_log(log_path) tcp_total = parsed["tcp_total"] tcp_direct = parsed["tcp_direct"] tcp_relay = parsed["tcp_relay"] tcp_winners = parsed["tcp_winners"] tcp_targets = parsed["tcp_targets"] udp_total = parsed["udp_total"] udp_direct = parsed["udp_direct"] udp_relay = parsed["udp_relay"] udp_winners = parsed["udp_winners"] udp_targets = parsed["udp_targets"] udp_snapshot = parsed["udp_snapshot"] if udp_total == 0: sibling_logs = [log_path.with_name("mynetspeeder-socks.log"), log_path.with_name("mynetspeeder-relay.log")] for sibling in sibling_logs: if not sibling.exists(): continue sibling_parsed = parse_summary_log(sibling) if sibling_parsed["udp_total"] == 0 and sibling_parsed["udp_snapshot"] is None: continue udp_total = sibling_parsed["udp_total"] udp_direct = sibling_parsed["udp_direct"] udp_relay = sibling_parsed["udp_relay"] udp_winners = sibling_parsed["udp_winners"] udp_targets = sibling_parsed["udp_targets"] udp_snapshot = sibling_parsed["udp_snapshot"] break tcp_ordered_targets = sorted( tcp_targets.items(), key=lambda item: sum(item[1].values()), reverse=True, )[: max(args.top, 0)] udp_ordered_targets = sorted( udp_targets.items(), key=lambda item: sum(item[1].values()), reverse=True, )[: max(args.top, 0)] result = { "log_file": str(log_path), "tcp": { "total": tcp_total, "direct": tcp_direct, "relay": tcp_relay, "winners": dict(sorted(tcp_winners.items(), key=lambda item: (-item[1], item[0]))), "targets": [ { "target": target, "wins": dict(sorted(counts.items(), key=lambda item: (-item[1], item[0]))), } for target, counts in tcp_ordered_targets ], }, "udp": { "total": udp_total, "direct": udp_direct, "relay": udp_relay, "winners": dict(sorted(udp_winners.items(), key=lambda item: (-item[1], item[0]))), "targets": [ { "target": target, "wins": dict(sorted(counts.items(), key=lambda item: (-item[1], item[0]))), } for target, counts in udp_ordered_targets ], "snapshot": udp_snapshot, }, } if udp_total == 0 and udp_snapshot is not None: result["udp"]["total"] = udp_snapshot["winners"] result["udp"]["direct"] = udp_snapshot["direct"] result["udp"]["relay"] = udp_snapshot["relay"] if not result["udp"]["winners"] and udp_snapshot is not None: snapshot_winners: dict[str, int] = {} if int(udp_snapshot["direct"]) > 0: snapshot_winners["direct"] = int(udp_snapshot["direct"]) if int(udp_snapshot["relay"]) > 0: snapshot_winners["relay"] = int(udp_snapshot["relay"]) result["udp"]["winners"] = snapshot_winners if not result["udp"]["targets"] and udp_snapshot is not None: snapshot_target_wins: dict[str, int] = {} if int(udp_snapshot["direct"]) > 0: snapshot_target_wins["direct"] = int(udp_snapshot["direct"]) if int(udp_snapshot["relay"]) > 0: snapshot_target_wins["relay"] = int(udp_snapshot["relay"]) result["udp"]["targets"] = [ { "target": f"snapshot@{udp_snapshot['bind']}", "wins": snapshot_target_wins, } ] if args.json: print(json.dumps(result, ensure_ascii=False, indent=2)) return 0 print(f"log: {log_path}") for protocol in ("tcp", "udp"): section = result[protocol] print(f"{protocol}: total={section['total']} direct={section['direct']} relay={section['relay']}") if protocol == "udp" and section.get("snapshot"): snapshot = section["snapshot"] print( f" snapshot: bind={snapshot['bind']} flows={snapshot['flows']} winners={snapshot['winners']} " f"direct_paths={snapshot['direct_paths']} relay_paths={snapshot['relay_paths']} dup={snapshot['dup']} " f"candidates={snapshot['candidates']}" ) print("winners:") for name, count in section["winners"].items(): print(f" {name}: {count}") print("targets:") for item in section["targets"]: wins = ", ".join(f"{name}={count}" for name, count in item["wins"].items()) print(f" {item['target']}: {wins}") return 0 def main() -> int: parser = build_parser() args = parser.parse_args() return args.handler(args)