from __future__ import annotations import argparse import asyncio import json import re from collections import defaultdict from pathlib import Path from . import __version__ from .config import Config from .relay_server import RelayServer from .relay_client import RelayManager from .socks_edge import SocksEdge from .transparent_edge import TransparentEdge TCP_WIN_RE = re.compile( r"tcp win session=(?P\d+) target=(?P\S+) winner=(?P\S+) .*?direct=(?P\d+) .*?relay=(?P\d+) .*?family=(?Pipv4|ipv6)" ) RELAY_STATUS_SUMMARY_RE = re.compile( r"relay status summary name=(?P\S+) errors=(?P\d+) last_detail=(?P.*)" ) UDP_WIN_RE = re.compile( r"udp flow=(?P\d+) winner=(?P\S+) target=(?P\S+)" ) SOCKS_UDP_SUMMARY_RE = re.compile( r"udp summary .*?winner_detail=(?P.*?) (?:target_detail=(?P.*?) )?packets_sent=" ) def normalize_winner(name: str) -> str: return "direct" if name.startswith("direct") else name def split_target(target: str) -> tuple[str, str] | None: if ":" not in target: return None host, port = target.rsplit(":", 1) if not port.isdigit(): return None return host, port def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="mynetspeeder") parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}") sub = parser.add_subparsers(dest="command", required=True) relay = sub.add_parser("relay", help="在子节点 VPS 上启动 relay") relay.add_argument("--listen-host", default="0.0.0.0") relay.add_argument("--listen-port", type=int, default=9009) relay.add_argument("--token", required=True) relay.set_defaults(handler=handle_relay) edge = sub.add_parser("edge", help="在当前主 VPS 上启动透明 direct 出站加速") edge.add_argument("--listen-host", default="127.0.0.1") edge.add_argument("--listen-port", type=int, default=19080) edge.add_argument("--config", required=True) edge.add_argument("--enable-udp", action="store_true") edge.add_argument("--kernel", choices=("auto", "20", "24"), default="auto") edge.set_defaults(handler=handle_edge) socks = sub.add_parser("socks", help="在当前主 VPS 上启动显式 SOCKS5 出站加速") socks.add_argument("--listen-host", default="127.0.0.1") socks.add_argument("--listen-port", type=int, default=19180) socks.add_argument("--config", required=True) socks.set_defaults(handler=handle_socks) probe = sub.add_parser("probe", help="查看子节点探测与在线状态") probe.add_argument("--config", required=True) probe.add_argument("--once", action="store_true") probe.set_defaults(handler=handle_probe) summary = sub.add_parser("summary", help="汇总透明模式日志里的胜率") summary.add_argument("--log-file", default="/var/log/mynetspeeder-edge.log") summary.add_argument("--socks-log-file", default="/var/log/mynetspeeder-socks.log") summary.add_argument("--top", type=int, default=10) summary.add_argument("--json", action="store_true") summary.set_defaults(handler=handle_summary) return parser def handle_relay(args: argparse.Namespace) -> int: asyncio.run(RelayServer(args.token).start(args.listen_host, args.listen_port)) return 0 def handle_edge(args: argparse.Namespace) -> int: asyncio.run(TransparentEdge(args.listen_host, args.listen_port, Config.load(args.config), enable_udp=args.enable_udp, kernel_mode=args.kernel).start()) return 0 def handle_socks(args: argparse.Namespace) -> int: asyncio.run(SocksEdge(args.listen_host, args.listen_port, Config.load(args.config)).start()) return 0 def handle_probe(args: argparse.Namespace) -> int: async def run_probe() -> None: manager = RelayManager(Config.load(args.config)) await manager.start() await asyncio.sleep(2) print(json.dumps(manager.snapshot(), ensure_ascii=False, indent=2)) if not args.once: while True: await asyncio.sleep(5) print(json.dumps(manager.snapshot(), ensure_ascii=False, indent=2)) asyncio.run(run_probe()) return 0 def handle_summary(args: argparse.Namespace) -> int: log_path = Path(args.log_file) if not log_path.exists(): raise SystemExit(f"log file not found: {log_path}") socks_log_path = Path(args.socks_log_file) tcp_total = 0 tcp_direct = 0 tcp_relay = 0 tcp_winners: dict[str, int] = defaultdict(int) tcp_targets: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) tcp_families: dict[str, int] = defaultdict(int) relay_status_errors: dict[str, int] = defaultdict(int) relay_status_details: dict[str, str] = {} udp_total = 0 udp_direct = 0 udp_relay = 0 udp_winners: dict[str, int] = defaultdict(int) udp_targets: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) for line in log_path.read_text(errors="replace").splitlines(): relay_status_match = RELAY_STATUS_SUMMARY_RE.search(line) if relay_status_match: name = relay_status_match.group("name") relay_status_errors[name] += int(relay_status_match.group("errors")) relay_status_details[name] = relay_status_match.group("detail") continue tcp_match = TCP_WIN_RE.search(line) if tcp_match: winner = normalize_winner(tcp_match.group("winner")) target_parts = split_target(tcp_match.group("target")) if target_parts is None: continue host, port = target_parts key = f"{host}:{port}" tcp_total += 1 tcp_winners[winner] += 1 tcp_targets[key][winner] += 1 tcp_families[tcp_match.group("family")] += 1 if winner == "direct": tcp_direct += 1 else: tcp_relay += 1 continue udp_match = UDP_WIN_RE.search(line) if udp_match: winner = normalize_winner(udp_match.group("winner")) target_parts = split_target(udp_match.group("target")) if target_parts is None: continue host, port = target_parts key = f"{host}:{port}" udp_total += 1 udp_winners[winner] += 1 udp_targets[key][winner] += 1 if winner == "direct": udp_direct += 1 else: udp_relay += 1 socks_udp_wins: dict[int, tuple[str, str | None]] = {} if socks_log_path.exists(): for line in socks_log_path.read_text(errors="replace").splitlines(): summary_match = SOCKS_UDP_SUMMARY_RE.search(line) if not summary_match: continue winners_raw = summary_match.group("winner_detail").strip() targets_raw = (summary_match.group("target_detail") or "").strip() if winners_raw == "none": continue target_map: dict[int, str] = {} if targets_raw and targets_raw != "none": for item in targets_raw.split(", "): parts = item.split(":") if len(parts) < 3: continue try: flow_id = int(parts[0]) except ValueError: continue target_map[flow_id] = f"{':'.join(parts[1:-1])}:{parts[-1]}" for item in winners_raw.split(", "): flow_parts = item.split(":", 1) if len(flow_parts) != 2: continue try: flow_id = int(flow_parts[0]) except ValueError: continue winner = normalize_winner(flow_parts[1]) socks_udp_wins[flow_id] = (winner, target_map.get(flow_id)) for winner, target in socks_udp_wins.values(): udp_total += 1 udp_winners[winner] += 1 if winner == "direct": udp_direct += 1 else: udp_relay += 1 if target: udp_targets[target][winner] += 1 tcp_ordered_targets = sorted( tcp_targets.items(), key=lambda item: sum(item[1].values()), reverse=True, )[: max(args.top, 0)] udp_ordered_targets = sorted( udp_targets.items(), key=lambda item: sum(item[1].values()), reverse=True, )[: max(args.top, 0)] result = { "log_file": str(log_path), "tcp": { "total": tcp_total, "direct": tcp_direct, "relay": tcp_relay, "families": dict(sorted(tcp_families.items(), key=lambda item: item[0])), "winners": dict(sorted(tcp_winners.items(), key=lambda item: (-item[1], item[0]))), "targets": [ { "target": target, "wins": dict(sorted(counts.items(), key=lambda item: (-item[1], item[0]))), } for target, counts in tcp_ordered_targets ], }, "udp": { "total": udp_total, "direct": udp_direct, "relay": udp_relay, "winners": dict(sorted(udp_winners.items(), key=lambda item: (-item[1], item[0]))), "targets": [ { "target": target, "wins": dict(sorted(counts.items(), key=lambda item: (-item[1], item[0]))), } for target, counts in udp_ordered_targets ], }, "relay_status": { "errors": dict(sorted(relay_status_errors.items(), key=lambda item: (-item[1], item[0]))), "last_detail": dict(sorted(relay_status_details.items(), key=lambda item: item[0])), }, } if args.json: print(json.dumps(result, ensure_ascii=False, indent=2)) return 0 print(f"log: {log_path}") if socks_log_path.exists(): print(f"socks_log: {socks_log_path}") for protocol in ("tcp", "udp"): section = result[protocol] print(f"{protocol}: total={section['total']} direct={section['direct']} relay={section['relay']}") if protocol == "tcp": families = ", ".join(f"{name}={count}" for name, count in section.get("families", {}).items()) or "none" print(f"families: {families}") print("winners:") for name, count in section["winners"].items(): print(f" {name}: {count}") print("targets:") for item in section["targets"]: wins = ", ".join(f"{name}={count}" for name, count in item["wins"].items()) print(f" {item['target']}: {wins}") if result["relay_status"]["errors"]: print("relay_status:") for name, count in result["relay_status"]["errors"].items(): detail = result["relay_status"]["last_detail"].get(name, "unknown") print(f" {name}: errors={count} last_detail={detail}") return 0 def main() -> int: parser = build_parser() args = parser.parse_args() return args.handler(args)