#!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse import os import sys import subprocess import socket import time import shutil from pathlib import Path DEFAULT_HOST = "117.50.195.224" DEFAULT_USER = "root" DEFAULT_PORT = 29765 DEFAULT_SOCKS_HOST = "127.0.0.1" DEFAULT_SOCKS_PORT = 1066 # Windows creation flags for detached background process CREATE_NO_WINDOW = 0x08000000 DETACHED_PROCESS = 0x00000008 CREATE_NEW_PROCESS_GROUP = 0x00000200 def which_ssh(): ssh_path = shutil.which("ssh") if ssh_path is None: print("未找到 ssh 可执行文件。请在系统中安装 OpenSSH 客户端并确保 ssh 在 PATH 中。", file=sys.stderr) sys.exit(2) return ssh_path def check_port_free(host, port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(0.5) try: s.bind((host, port)) s.close() return True except OSError: return False def wait_for_listen(host, port, timeout=10.0): # 等待本地端口开始监听(大概率说明隧道已建立) deadline = time.time() + timeout while time.time() < deadline: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(0.5) try: if s.connect_ex((host, port)) == 0: return True except OSError: pass time.sleep(0.2) return False def ensure_dir(p: Path): p.mkdir(parents=True, exist_ok=True) if not os.access(p, os.W_OK): print(f"目录不可写: {p}", file=sys.stderr) sys.exit(2) def default_control_path(host, port): base = Path.home() / ".ssh" ensure_dir(base) # 控制套接字文件名尽量短 name = f"cm-{host}-{port}" cpath = base / name # Windows AF_UNIX 路径不要太长 if len(str(cpath)) > 100: print(f"警告:控制套接字路径过长,可能导致失败: {cpath}", file=sys.stderr) return str(cpath) def build_keepalive_opts(alive_interval=60, alive_count=3): return [ "-o", f"ServerAliveInterval={alive_interval}", "-o", f"ServerAliveCountMax={alive_count}", ] def start_tunnel(args): ssh = which_ssh() # 提前检查端口是否被占用 if not args.force and not check_port_free(args.socks_host, args.socks_port): print(f"本地端口 {args.socks_host}:{args.socks_port} 已被占用。使用 --force 忽略或更换端口。", file=sys.stderr) sys.exit(1) cmd = [ ssh, "-N", "-D", f"{args.socks_host}:{args.socks_port}", "-p", str(args.port), ] + build_keepalive_opts(args.alive_interval, args.alive_count) if args.strict_no: cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"] target = f"{args.user}@{args.host}" cmd.append(target) print("启动本地 SSH 隧道(非复用模式):") print(" ".join(cmd)) # 在 Windows 上用无窗口/脱离控制台方式运行 if os.name == "nt": flags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW with open(os.devnull, "wb") as devnull: proc = subprocess.Popen( cmd, stdin=devnull, stdout=devnull, stderr=devnull, creationflags=flags, close_fds=True, ) print(f"已启动,PID={proc.pid}。尝试等待端口监听...") ok = wait_for_listen(args.socks_host, args.socks_port, timeout=args.wait_timeout) if ok: print(f"SOCKS 已监听于 {args.socks_host}:{args.socks_port}") else: print("未检测到端口监听。若使用密码认证,后台无法输入密码,请改用密钥或前台启动。") # 记录 pid 文件 pidfile = pid_file_path(args) ensure_dir(pidfile.parent) pidfile.write_text(str(proc.pid), encoding="utf-8") print(f"PID 已写入 {pidfile}") else: # 在非 Windows 系统,使用 -f 让 ssh 后台运行(如果你跨平台使用) cmd.insert(1, "-f") subprocess.run(cmd, check=True) print("已启动。") def pid_file_path(args): base = Path.home() / ".ssh" / "tunnels" fname = f"{args.host}-{args.port}-D{args.socks_host.replace('.', '_')}_{args.socks_port}.pid" return base / fname def stop_tunnel(args): # 通过 pidfile 强制停止非复用模式进程 pidfile = pid_file_path(args) if not pidfile.exists(): print(f"未找到 PID 文件:{pidfile}。如果你是用复用模式启动,请用 stop-mux。", file=sys.stderr) sys.exit(1) try: pid = int(pidfile.read_text(encoding="utf-8").strip()) except Exception as e: print(f"读取 PID 文件失败:{e}", file=sys.stderr) sys.exit(1) if os.name == "nt": # 使用 taskkill 结束 r = subprocess.run(["taskkill", "/PID", str(pid), "/F"], capture_output=True, text=True) if r.returncode == 0: print(f"已结束进程 PID={pid}") try: pidfile.unlink() except Exception: pass else: print(f"结束进程失败:{r.stdout}\n{r.stderr}", file=sys.stderr) sys.exit(r.returncode) else: try: os.kill(pid, 15) print(f"已发送 SIGTERM 给 PID={pid}") pidfile.unlink(missing_ok=True) except Exception as e: print(f"结束失败:{e}", file=sys.stderr) sys.exit(1) def status_tunnel(args): # 仅检查本地 SOCKS 端口是否在监听 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(0.3) try: if s.connect_ex((args.socks_host, args.socks_port)) == 0: print(f"监听中:{args.socks_host}:{args.socks_port}") else: print(f"未监听:{args.socks_host}:{args.socks_port}") finally: s.close() def start_mux(args): ssh = which_ssh() cpath = args.control_path or default_control_path(args.host, args.port) # 确保控制路径所在目录存在 ensure_dir(Path(cpath).expanduser().resolve().parent) cmd = [ ssh, "-M", "-S", cpath, "-N", "-D", f"{args.socks_host}:{args.socks_port}", "-p", str(args.port), ] + build_keepalive_opts(args.alive_interval, args.alive_count) if args.strict_no: cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"] target = f"{args.user}@{args.host}" cmd.append(target) print("启动本地 SSH 隧道(连接复用模式):") print(" ".join(cmd)) if os.name == "nt": flags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW with open(os.devnull, "wb") as devnull: proc = subprocess.Popen( cmd, stdin=devnull, stdout=devnull, stderr=devnull, creationflags=flags, close_fds=True, ) print(f"已启动(复用主进程),PID={proc.pid}。等待端口监听...") ok = wait_for_listen(args.socks_host, args.socks_port, timeout=args.wait_timeout) if ok: print(f"SOCKS 已监听于 {args.socks_host}:{args.socks_port}") else: print("未检测到端口监听。若使用密码认证,后台无法输入密码,请改用密钥或前台启动。") else: # 非 Windows 可以用 -f,但复用主连接一般不建议 -f,使用后台方式或系统化管理更稳妥 subprocess.Popen(cmd) def stop_mux(args): ssh = which_ssh() cpath = args.control_path or default_control_path(args.host, args.port) cmd = [ ssh, "-S", cpath, "-O", "exit", "-p", str(args.port), f"{args.user}@{args.host}", ] print("关闭复用主连接:") print(" ".join(cmd)) r = subprocess.run(cmd, capture_output=True, text=True) if r.returncode == 0: print(r.stdout.strip() or "已请求关闭。") else: # 有的版本输出在 stderr out = (r.stdout + "\n" + r.stderr).strip() print(f"关闭失败:{out}", file=sys.stderr) sys.exit(r.returncode) def status_mux(args): ssh = which_ssh() cpath = args.control_path or default_control_path(args.host, args.port) cmd = [ ssh, "-S", cpath, "-O", "check", "-p", str(args.port), f"{args.user}@{args.host}", ] r = subprocess.run(cmd, capture_output=True, text=True) if r.returncode == 0: print(r.stdout.strip() or "Master running") else: out = (r.stdout + "\n" + r.stderr).strip() print(out or "Not running") sys.exit(r.returncode) def parse_args(): p = argparse.ArgumentParser(description="Windows 上启动/管理 SSH 本地 SOCKS 隧道的小工具") sub = p.add_subparsers(dest="cmd", required=True) def add_common(sp): sp.add_argument("-H", "--host", default=DEFAULT_HOST, help="SSH 服务器地址") sp.add_argument("-u", "--user", default=DEFAULT_USER, help="SSH 用户名") sp.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="SSH 端口") sp.add_argument("--socks-host", default=DEFAULT_SOCKS_HOST, help="本地 SOCKS 监听地址") sp.add_argument("-D", "--socks-port", type=int, default=DEFAULT_SOCKS_PORT, help="本地 SOCKS 监听端口") sp.add_argument("--alive-interval", type=int, default=60, help="ServerAliveInterval") sp.add_argument("--alive-count", type=int, default=5, help="ServerAliveCountMax") sp.add_argument("--strict-no", action="store_true", help="临时关闭主机指纹检查(不安全)") sp.add_argument("--wait-timeout", type=float, default=10.0, help="启动后等待监听的秒数") return sp add_common(sub.add_parser("start", help="启动隧道(非复用)")).add_argument("--force", action="store_true", help="忽略本地端口占用检查") add_common(sub.add_parser("stop", help="停止隧道(通过 pid 文件)")) add_common(sub.add_parser("status", help="查看本地 SOCKS 端口是否在监听")) sp_mux_start = add_common(sub.add_parser("start-mux", help="启动隧道(连接复用模式)")) sp_mux_start.add_argument("-S", "--control-path", default=None, help="控制套接字路径(默认:~/.ssh/cm--)") sp_mux_stop = add_common(sub.add_parser("stop-mux", help="关闭复用主连接")) sp_mux_stop.add_argument("-S", "--control-path", default=None, help="控制套接字路径") sp_mux_status = add_common(sub.add_parser("status-mux", help="查看复用主连接状态")) sp_mux_status.add_argument("-S", "--control-path", default=None, help="控制套接字路径") return p.parse_args() def main(): args = parse_args() if args.cmd == "start": start_tunnel(args) elif args.cmd == "stop": stop_tunnel(args) elif args.cmd == "status": status_tunnel(args) elif args.cmd == "start-mux": start_mux(args) elif args.cmd == "stop-mux": stop_mux(args) elif args.cmd == "status-mux": status_mux(args) else: print("未知命令", file=sys.stderr) sys.exit(2) if __name__ == "__main__": main() # 使用示例 # 服务器端需要先开通免密登录 # 启动非复用隧道(相当于 ssh -N -D 127.0.0.1:1066 -p 29765 root@117.50.195.224,并包含存活探测): # python ssh_tunnel.py start -H 117.50.195.224 -u root -p 29765 -D 1066 # python ssh_tunnel.py start -H 106.14.113.12 -u root -p 29765 -D 1067 # python ssh_tunnel.py start -H 103.40.13.87 -u root -p 40268 -D 1061 # 查看本地 SOCKS 端口是否监听: python ssh_tunnel.py status -D 1066 # 停止非复用隧道(通过 PID 文件): python ssh_tunnel.py stop -H 117.50.195.224 -u root -p 29765 -D 1066