| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import argparse
- import os
- import sys
- import subprocess
- import socket
- import time
- import shutil
- from pathlib import Path
- DEFAULT_HOST = "117.50.195.224"
- DEFAULT_USER = "root"
- DEFAULT_PORT = 29765
- DEFAULT_SOCKS_HOST = "127.0.0.1"
- DEFAULT_SOCKS_PORT = 1066
- # Windows creation flags for detached background process
- CREATE_NO_WINDOW = 0x08000000
- DETACHED_PROCESS = 0x00000008
- CREATE_NEW_PROCESS_GROUP = 0x00000200
- def which_ssh():
- ssh_path = shutil.which("ssh")
- if ssh_path is None:
- print("未找到 ssh 可执行文件。请在系统中安装 OpenSSH 客户端并确保 ssh 在 PATH 中。", file=sys.stderr)
- sys.exit(2)
- return ssh_path
- def check_port_free(host, port):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(0.5)
- try:
- s.bind((host, port))
- s.close()
- return True
- except OSError:
- return False
- def wait_for_listen(host, port, timeout=10.0):
- # 等待本地端口开始监听(大概率说明隧道已建立)
- deadline = time.time() + timeout
- while time.time() < deadline:
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- s.settimeout(0.5)
- try:
- if s.connect_ex((host, port)) == 0:
- return True
- except OSError:
- pass
- time.sleep(0.2)
- return False
- def ensure_dir(p: Path):
- p.mkdir(parents=True, exist_ok=True)
- if not os.access(p, os.W_OK):
- print(f"目录不可写: {p}", file=sys.stderr)
- sys.exit(2)
- def default_control_path(host, port):
- base = Path.home() / ".ssh"
- ensure_dir(base)
- # 控制套接字文件名尽量短
- name = f"cm-{host}-{port}"
- cpath = base / name
- # Windows AF_UNIX 路径不要太长
- if len(str(cpath)) > 100:
- print(f"警告:控制套接字路径过长,可能导致失败: {cpath}", file=sys.stderr)
- return str(cpath)
- def build_keepalive_opts(alive_interval=60, alive_count=3):
- return [
- "-o", f"ServerAliveInterval={alive_interval}",
- "-o", f"ServerAliveCountMax={alive_count}",
- ]
- def start_tunnel(args):
- ssh = which_ssh()
- # 提前检查端口是否被占用
- if not args.force and not check_port_free(args.socks_host, args.socks_port):
- print(f"本地端口 {args.socks_host}:{args.socks_port} 已被占用。使用 --force 忽略或更换端口。", file=sys.stderr)
- sys.exit(1)
- cmd = [
- ssh,
- "-N",
- "-D", f"{args.socks_host}:{args.socks_port}",
- "-p", str(args.port),
- ] + build_keepalive_opts(args.alive_interval, args.alive_count)
- if args.strict_no:
- cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
- target = f"{args.user}@{args.host}"
- cmd.append(target)
- print("启动本地 SSH 隧道(非复用模式):")
- print(" ".join(cmd))
- # 在 Windows 上用无窗口/脱离控制台方式运行
- if os.name == "nt":
- flags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW
- with open(os.devnull, "wb") as devnull:
- proc = subprocess.Popen(
- cmd,
- stdin=devnull,
- stdout=devnull,
- stderr=devnull,
- creationflags=flags,
- close_fds=True,
- )
- print(f"已启动,PID={proc.pid}。尝试等待端口监听...")
- ok = wait_for_listen(args.socks_host, args.socks_port, timeout=args.wait_timeout)
- if ok:
- print(f"SOCKS 已监听于 {args.socks_host}:{args.socks_port}")
- else:
- print("未检测到端口监听。若使用密码认证,后台无法输入密码,请改用密钥或前台启动。")
- # 记录 pid 文件
- pidfile = pid_file_path(args)
- ensure_dir(pidfile.parent)
- pidfile.write_text(str(proc.pid), encoding="utf-8")
- print(f"PID 已写入 {pidfile}")
- else:
- # 在非 Windows 系统,使用 -f 让 ssh 后台运行(如果你跨平台使用)
- cmd.insert(1, "-f")
- subprocess.run(cmd, check=True)
- print("已启动。")
- def pid_file_path(args):
- base = Path.home() / ".ssh" / "tunnels"
- fname = f"{args.host}-{args.port}-D{args.socks_host.replace('.', '_')}_{args.socks_port}.pid"
- return base / fname
- def stop_tunnel(args):
- # 通过 pidfile 强制停止非复用模式进程
- pidfile = pid_file_path(args)
- if not pidfile.exists():
- print(f"未找到 PID 文件:{pidfile}。如果你是用复用模式启动,请用 stop-mux。", file=sys.stderr)
- sys.exit(1)
- try:
- pid = int(pidfile.read_text(encoding="utf-8").strip())
- except Exception as e:
- print(f"读取 PID 文件失败:{e}", file=sys.stderr)
- sys.exit(1)
- if os.name == "nt":
- # 使用 taskkill 结束
- r = subprocess.run(["taskkill", "/PID", str(pid), "/F"], capture_output=True, text=True)
- if r.returncode == 0:
- print(f"已结束进程 PID={pid}")
- try:
- pidfile.unlink()
- except Exception:
- pass
- else:
- print(f"结束进程失败:{r.stdout}\n{r.stderr}", file=sys.stderr)
- sys.exit(r.returncode)
- else:
- try:
- os.kill(pid, 15)
- print(f"已发送 SIGTERM 给 PID={pid}")
- pidfile.unlink(missing_ok=True)
- except Exception as e:
- print(f"结束失败:{e}", file=sys.stderr)
- sys.exit(1)
- def status_tunnel(args):
- # 仅检查本地 SOCKS 端口是否在监听
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(0.3)
- try:
- if s.connect_ex((args.socks_host, args.socks_port)) == 0:
- print(f"监听中:{args.socks_host}:{args.socks_port}")
- else:
- print(f"未监听:{args.socks_host}:{args.socks_port}")
- finally:
- s.close()
- def start_mux(args):
- ssh = which_ssh()
- cpath = args.control_path or default_control_path(args.host, args.port)
- # 确保控制路径所在目录存在
- ensure_dir(Path(cpath).expanduser().resolve().parent)
- cmd = [
- ssh,
- "-M",
- "-S", cpath,
- "-N",
- "-D", f"{args.socks_host}:{args.socks_port}",
- "-p", str(args.port),
- ] + build_keepalive_opts(args.alive_interval, args.alive_count)
- if args.strict_no:
- cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
- target = f"{args.user}@{args.host}"
- cmd.append(target)
- print("启动本地 SSH 隧道(连接复用模式):")
- print(" ".join(cmd))
- if os.name == "nt":
- flags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW
- with open(os.devnull, "wb") as devnull:
- proc = subprocess.Popen(
- cmd,
- stdin=devnull,
- stdout=devnull,
- stderr=devnull,
- creationflags=flags,
- close_fds=True,
- )
- print(f"已启动(复用主进程),PID={proc.pid}。等待端口监听...")
- ok = wait_for_listen(args.socks_host, args.socks_port, timeout=args.wait_timeout)
- if ok:
- print(f"SOCKS 已监听于 {args.socks_host}:{args.socks_port}")
- else:
- print("未检测到端口监听。若使用密码认证,后台无法输入密码,请改用密钥或前台启动。")
- else:
- # 非 Windows 可以用 -f,但复用主连接一般不建议 -f,使用后台方式或系统化管理更稳妥
- subprocess.Popen(cmd)
- def stop_mux(args):
- ssh = which_ssh()
- cpath = args.control_path or default_control_path(args.host, args.port)
- cmd = [
- ssh,
- "-S", cpath,
- "-O", "exit",
- "-p", str(args.port),
- f"{args.user}@{args.host}",
- ]
- print("关闭复用主连接:")
- print(" ".join(cmd))
- r = subprocess.run(cmd, capture_output=True, text=True)
- if r.returncode == 0:
- print(r.stdout.strip() or "已请求关闭。")
- else:
- # 有的版本输出在 stderr
- out = (r.stdout + "\n" + r.stderr).strip()
- print(f"关闭失败:{out}", file=sys.stderr)
- sys.exit(r.returncode)
- def status_mux(args):
- ssh = which_ssh()
- cpath = args.control_path or default_control_path(args.host, args.port)
- cmd = [
- ssh,
- "-S", cpath,
- "-O", "check",
- "-p", str(args.port),
- f"{args.user}@{args.host}",
- ]
- r = subprocess.run(cmd, capture_output=True, text=True)
- if r.returncode == 0:
- print(r.stdout.strip() or "Master running")
- else:
- out = (r.stdout + "\n" + r.stderr).strip()
- print(out or "Not running")
- sys.exit(r.returncode)
- def parse_args():
- p = argparse.ArgumentParser(description="Windows 上启动/管理 SSH 本地 SOCKS 隧道的小工具")
- sub = p.add_subparsers(dest="cmd", required=True)
- def add_common(sp):
- sp.add_argument("-H", "--host", default=DEFAULT_HOST, help="SSH 服务器地址")
- sp.add_argument("-u", "--user", default=DEFAULT_USER, help="SSH 用户名")
- sp.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="SSH 端口")
- sp.add_argument("--socks-host", default=DEFAULT_SOCKS_HOST, help="本地 SOCKS 监听地址")
- sp.add_argument("-D", "--socks-port", type=int, default=DEFAULT_SOCKS_PORT, help="本地 SOCKS 监听端口")
- sp.add_argument("--alive-interval", type=int, default=60, help="ServerAliveInterval")
- sp.add_argument("--alive-count", type=int, default=5, help="ServerAliveCountMax")
- sp.add_argument("--strict-no", action="store_true", help="临时关闭主机指纹检查(不安全)")
- sp.add_argument("--wait-timeout", type=float, default=10.0, help="启动后等待监听的秒数")
- return sp
- add_common(sub.add_parser("start", help="启动隧道(非复用)")).add_argument("--force", action="store_true", help="忽略本地端口占用检查")
- add_common(sub.add_parser("stop", help="停止隧道(通过 pid 文件)"))
- add_common(sub.add_parser("status", help="查看本地 SOCKS 端口是否在监听"))
- sp_mux_start = add_common(sub.add_parser("start-mux", help="启动隧道(连接复用模式)"))
- sp_mux_start.add_argument("-S", "--control-path", default=None, help="控制套接字路径(默认:~/.ssh/cm-<host>-<port>)")
- sp_mux_stop = add_common(sub.add_parser("stop-mux", help="关闭复用主连接"))
- sp_mux_stop.add_argument("-S", "--control-path", default=None, help="控制套接字路径")
- sp_mux_status = add_common(sub.add_parser("status-mux", help="查看复用主连接状态"))
- sp_mux_status.add_argument("-S", "--control-path", default=None, help="控制套接字路径")
- return p.parse_args()
- def main():
- args = parse_args()
- if args.cmd == "start":
- start_tunnel(args)
- elif args.cmd == "stop":
- stop_tunnel(args)
- elif args.cmd == "status":
- status_tunnel(args)
- elif args.cmd == "start-mux":
- start_mux(args)
- elif args.cmd == "stop-mux":
- stop_mux(args)
- elif args.cmd == "status-mux":
- status_mux(args)
- else:
- print("未知命令", file=sys.stderr)
- sys.exit(2)
- if __name__ == "__main__":
- main()
-
-
- # 使用示例
- # 服务器端需要先开通免密登录
- # 启动非复用隧道(相当于 ssh -N -D 127.0.0.1:1066 -p 29765 root@117.50.195.224,并包含存活探测):
- # python ssh_tunnel.py start -H 117.50.195.224 -u root -p 29765 -D 1066
- # python ssh_tunnel.py start -H 106.14.113.12 -u root -p 29765 -D 1067
- # python ssh_tunnel.py start -H 103.40.13.87 -u root -p 40268 -D 1061
- # 查看本地 SOCKS 端口是否监听: python ssh_tunnel.py status -D 1066
- # 停止非复用隧道(通过 PID 文件): python ssh_tunnel.py stop -H 117.50.195.224 -u root -p 29765 -D 1066
|