from __future__ import annotations import argparse import asyncio import json import re from collections import defaultdict from pathlib import Path from . import __version__ from .config import Config from .relay_server import RelayServer from .relay_client import RelayManager from .socks_edge import SocksEdge from .transparent_edge import TransparentEdge TCP_WIN_RE = re.compile( r"tcp win session=(?P\d+) target=(?P[^:]+):(?P\d+) winner=(?P\S+) .*?direct=(?P\d+) .*?relay=(?P\d+)" ) UDP_WIN_RE = re.compile( r"udp flow=(?P\d+) winner=(?P\S+) target=(?P[^:]+):(?P\d+)" ) def normalize_winner(name: str) -> str: return "direct" if name.startswith("direct") else name def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="mynetspeeder") parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}") sub = parser.add_subparsers(dest="command", required=True) relay = sub.add_parser("relay", help="在子节点 VPS 上启动 relay") relay.add_argument("--listen-host", default="0.0.0.0") relay.add_argument("--listen-port", type=int, default=9009) relay.add_argument("--token", required=True) relay.set_defaults(handler=handle_relay) edge = sub.add_parser("edge", help="在当前主 VPS 上启动透明 direct 出站加速") edge.add_argument("--listen-host", default="127.0.0.1") edge.add_argument("--listen-port", type=int, default=19080) edge.add_argument("--config", required=True) edge.add_argument("--enable-udp", action="store_true") edge.add_argument("--kernel", choices=("auto", "20", "24"), default="auto") edge.set_defaults(handler=handle_edge) socks = sub.add_parser("socks", help="在当前主 VPS 上启动显式 SOCKS5 出站加速") socks.add_argument("--listen-host", default="127.0.0.1") socks.add_argument("--listen-port", type=int, default=19180) socks.add_argument("--config", required=True) socks.set_defaults(handler=handle_socks) probe = sub.add_parser("probe", help="查看子节点探测与在线状态") probe.add_argument("--config", required=True) probe.add_argument("--once", action="store_true") probe.set_defaults(handler=handle_probe) summary = sub.add_parser("summary", help="汇总透明模式日志里的胜率") summary.add_argument("--log-file", default="/var/log/mynetspeeder-edge.log") summary.add_argument("--top", type=int, default=10) summary.add_argument("--json", action="store_true") summary.set_defaults(handler=handle_summary) return parser def handle_relay(args: argparse.Namespace) -> int: asyncio.run(RelayServer(args.token).start(args.listen_host, args.listen_port)) return 0 def handle_edge(args: argparse.Namespace) -> int: asyncio.run(TransparentEdge(args.listen_host, args.listen_port, Config.load(args.config), enable_udp=args.enable_udp, kernel_mode=args.kernel).start()) return 0 def handle_socks(args: argparse.Namespace) -> int: asyncio.run(SocksEdge(args.listen_host, args.listen_port, Config.load(args.config)).start()) return 0 def handle_probe(args: argparse.Namespace) -> int: async def run_probe() -> None: manager = RelayManager(Config.load(args.config)) await manager.start() await asyncio.sleep(2) print(json.dumps(manager.snapshot(), ensure_ascii=False, indent=2)) if not args.once: while True: await asyncio.sleep(5) print(json.dumps(manager.snapshot(), ensure_ascii=False, indent=2)) asyncio.run(run_probe()) return 0 def handle_summary(args: argparse.Namespace) -> int: log_path = Path(args.log_file) if not log_path.exists(): raise SystemExit(f"log file not found: {log_path}") tcp_total = 0 tcp_direct = 0 tcp_relay = 0 tcp_winners: dict[str, int] = defaultdict(int) tcp_targets: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) udp_total = 0 udp_direct = 0 udp_relay = 0 udp_winners: dict[str, int] = defaultdict(int) udp_targets: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) for line in log_path.read_text(errors="replace").splitlines(): tcp_match = TCP_WIN_RE.search(line) if tcp_match: tcp_total += 1 winner = normalize_winner(tcp_match.group("winner")) host = tcp_match.group("host") port = tcp_match.group("port") key = f"{host}:{port}" tcp_winners[winner] += 1 tcp_targets[key][winner] += 1 if winner == "direct": tcp_direct += 1 else: tcp_relay += 1 continue udp_match = UDP_WIN_RE.search(line) if udp_match: udp_total += 1 winner = normalize_winner(udp_match.group("winner")) host = udp_match.group("host") port = udp_match.group("port") key = f"{host}:{port}" udp_winners[winner] += 1 udp_targets[key][winner] += 1 if winner == "direct": udp_direct += 1 else: udp_relay += 1 tcp_ordered_targets = sorted( tcp_targets.items(), key=lambda item: sum(item[1].values()), reverse=True, )[: max(args.top, 0)] udp_ordered_targets = sorted( udp_targets.items(), key=lambda item: sum(item[1].values()), reverse=True, )[: max(args.top, 0)] result = { "log_file": str(log_path), "tcp": { "total": tcp_total, "direct": tcp_direct, "relay": tcp_relay, "winners": dict(sorted(tcp_winners.items(), key=lambda item: (-item[1], item[0]))), "targets": [ { "target": target, "wins": dict(sorted(counts.items(), key=lambda item: (-item[1], item[0]))), } for target, counts in tcp_ordered_targets ], }, "udp": { "total": udp_total, "direct": udp_direct, "relay": udp_relay, "winners": dict(sorted(udp_winners.items(), key=lambda item: (-item[1], item[0]))), "targets": [ { "target": target, "wins": dict(sorted(counts.items(), key=lambda item: (-item[1], item[0]))), } for target, counts in udp_ordered_targets ], }, } if args.json: print(json.dumps(result, ensure_ascii=False, indent=2)) return 0 print(f"log: {log_path}") for protocol in ("tcp", "udp"): section = result[protocol] print(f"{protocol}: total={section['total']} direct={section['direct']} relay={section['relay']}") print("winners:") for name, count in section["winners"].items(): print(f" {name}: {count}") print("targets:") for item in section["targets"]: wins = ", ".join(f"{name}={count}" for name, count in item["wins"].items()) print(f" {item['target']}: {wins}") return 0 def main() -> int: parser = build_parser() args = parser.parse_args() return args.handler(args)