ソースを参照

ssh for windows

sequoia00 1 週間 前
コミット
5314fff1d3
3 ファイル変更480 行追加0 行削除
  1. 55 0
      README.md
  2. 97 0
      run_tunnels.py
  3. 328 0
      ssh_tunnel.py

+ 55 - 0
README.md

@@ -0,0 +1,55 @@
+## 项目简介
+
+本项目提供一组脚本,便于在本地快速管理多个 SSH SOCKS5 隧道。
+
+- `ssh_tunnel.py`:封装单条 SSH 隧道的启动、停止、状态查询,并支持连接复用模式。
+- `run_tunnels.py`:批量启动/监控/停止在脚本中预先写死的一组隧道,适合日常固定场景一键管理。
+
+## 环境要求
+
+1. Python 3.8+。
+2. 本机可执行的 `ssh` 命令(OpenSSH 客户端)。
+3. 目标服务器已配置好密钥免密登录,或允许你在前台输入密码。
+4. 确保本地监听端口(如 1066、1067 等)未被占用。
+
+## 单条隧道管理 (`ssh_tunnel.py`)
+
+```bash
+# 启动一个 SOCKS5 隧道,监听 127.0.0.1:1066
+python ssh_tunnel.py start -H 117.50.195.224 -u root -p 29765 -D 1066
+
+# 查看某端口是否在监听
+python ssh_tunnel.py status -D 1066
+
+# 停止隧道(依赖脚本生成的 PID 文件)
+python ssh_tunnel.py stop -H 117.50.195.224 -u root -p 29765 -D 1066
+```
+
+进阶:
+
+- `start-mux` / `stop-mux` / `status-mux` 支持 SSH 连接复用。
+- 使用 `--force` 可忽略本地端口占用检查(谨慎使用)。
+- `--strict-no` 可临时关闭主机指纹校验(测试环境下可用,生产不建议)。
+
+## 批量管理 (`run_tunnels.py`)
+
+脚本内部定义了一组固定的隧道(可按需修改服务器、端口、局部 SOCKS 端口等)。提供以下命令:
+
+```bash
+# 依次后台启动所有隧道
+python run_tunnels.py start
+
+# 检查脚本里列出的几个端口是否在监听
+python run_tunnels.py status
+
+# 针对脚本中预设的服务器执行停止操作
+python run_tunnels.py stop
+```
+
+`start` 会并行启动多个 `ssh_tunnel.py start` 进程;`status` / `stop` 会串行执行并输出结果。若需要增删隧道,请直接编辑 `run_tunnels.py` 中 `start_cmds`/`status_cmds`/`stop_cmds` 列表。
+
+## 日常建议
+
+- 建议使用密钥认证,并在服务器端配置 `~/.ssh/config` 以减少参数。
+- 生产环境请保留主机指纹验证,避免 `--strict-no`。
+- 如需持久运行,可结合系统服务(systemd、PM2、任务计划等)管理这些脚本。

+ 97 - 0
run_tunnels.py

@@ -0,0 +1,97 @@
+import subprocess
+import sys
+from pathlib import Path
+
+def run_and_print(cmd, cwd):
+    """运行命令并打印其输出,返回是否成功"""
+    print(f"$ {' '.join(cmd)}")
+    result = subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
+    if result.stdout:
+        print(result.stdout.strip())
+    if result.stderr:
+        print(result.stderr.strip(), file=sys.stderr)
+    return result.returncode == 0
+
+def main():
+    if len(sys.argv) < 2 or sys.argv[1] not in {"start", "status", "stop"}:
+        print("用法: python manage_tunnels.py [start|status|stop]")
+        sys.exit(1)
+
+    action = sys.argv[1]
+
+    base = Path(__file__).resolve().parent
+    python_exe = sys.executable  # 使用当前 Python 解释器
+    ssh_script = str(base / "ssh_tunnel.py")
+
+    # 定义三条启动命令
+    start_cmds = [
+        # 
+        [python_exe, ssh_script, "start", "-H", "117.50.195.224", "-u", "root", "-p", "29765", "-D", "1066"],
+        #ali
+        [python_exe, ssh_script, "start", "-H", "106.14.113.12", "-u", "root", "-p", "29765", "-D", "1067"],
+        #wu
+        [python_exe, ssh_script, "start", "-H", "114.66.42.82", "-u", "root", "-p", "29765", "-D", "1068"],
+        #home
+        [python_exe, ssh_script, "start", "-H", "117.50.195.224", "-u", "root", "-p", "10702", "-D", "1024"],
+        #sea
+        [python_exe, ssh_script, "start", "-H", "117.50.195.224", "-u", "root", "-p", "10802", "-D", "1069"],
+        #wu
+        [python_exe, ssh_script, "start", "-H", "114.66.42.82", "-u", "root", "-p", "17002", "-D", "1070"],
+        
+        [python_exe, ssh_script, "start", "-H", "114.66.42.82", "-u", "root", "-p", "18000", "-D", "1071"],
+    ]
+
+    # 对应三个状态监控命令(包含示例中的 -D 1067)
+    status_cmds = [
+        # [python_exe, ssh_script, "status", "-D", "1061"],
+        [python_exe, ssh_script, "status", "-D", "1066"],
+        [python_exe, ssh_script, "status", "-D", "1067"],  
+        [python_exe, ssh_script, "status", "-D", "1024"], # 示例指定的状态监控
+    ]
+
+    # 停止服务命令(示例指定的停止 117.50.195.224:29765 -> -D 1066)
+    stop_cmds = [
+        # [python_exe, ssh_script, "stop", "-H", "103.40.13.87", "-u", "root", "-p", "40268", "-D", "1061"],
+        [python_exe, ssh_script, "stop", "-H", "117.50.195.224", "-u", "root", "-p", "29765", "-D", "1066"],
+        [python_exe, ssh_script, "stop", "-H", "106.14.113.12", "-u", "root", "-p", "29765", "-D", "1067"],
+        [python_exe, ssh_script, "stop", "-H", "114.66.42.82", "-u", "root", "-p", "29765", "-D", "1068"],
+        [python_exe, ssh_script, "stop", "-H", "117.50.195.224", "-u", "root", "-p", "10702", "-D", "1024"],
+        [python_exe, ssh_script, "stop", "-H", "117.50.195.224", "-u", "root", "-p", "10802", "-D", "1069"],
+        [python_exe, ssh_script, "stop", "-H", "114.66.42.82", "-u", "root", "-p", "17002", "-D", "1070"],
+        [python_exe, ssh_script, "stop", "-H", "114.66.42.82", "-u", "root", "-p", "18000", "-D", "1071"],
+    ]
+
+    if action == "start":
+        procs = []
+        for cmd in start_cmds:
+            # 并行启动,不阻塞
+            p = subprocess.Popen(cmd, cwd=base)
+            procs.append(p)
+        print(f"已启动 {len(procs)} 个隧道进程,PID:{[p.pid for p in procs]}")
+
+        # 如果希望脚本阻塞直到进程退出,取消下面两行注释
+        # for p in procs:
+        #     p.wait()
+
+    elif action == "status":
+        all_ok = True
+        for cmd in status_cmds:
+            ok = run_and_print(cmd, base)
+            all_ok = all_ok and ok
+        if not all_ok:
+            sys.exit(1)
+
+    elif action == "stop":
+        all_ok = True
+        for cmd in stop_cmds:
+            ok = run_and_print(cmd, base)
+            all_ok = all_ok and ok
+        if all_ok:
+            print("停止命令已执行完成")
+        else:
+            print("部分停止命令执行失败", file=sys.stderr)
+            sys.exit(1)
+
+if __name__ == "__main__":
+    main()
+    #启动命令 python .\run_tunnels.py start

+ 328 - 0
ssh_tunnel.py

@@ -0,0 +1,328 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import argparse
+import os
+import sys
+import subprocess
+import socket
+import time
+import shutil
+from pathlib import Path
+
+DEFAULT_HOST = "117.50.195.224"
+DEFAULT_USER = "root"
+DEFAULT_PORT = 29765
+DEFAULT_SOCKS_HOST = "127.0.0.1"
+DEFAULT_SOCKS_PORT = 1066
+
+# Windows creation flags for detached background process
+CREATE_NO_WINDOW = 0x08000000
+DETACHED_PROCESS = 0x00000008
+CREATE_NEW_PROCESS_GROUP = 0x00000200
+
+def which_ssh():
+    ssh_path = shutil.which("ssh")
+    if ssh_path is None:
+        print("未找到 ssh 可执行文件。请在系统中安装 OpenSSH 客户端并确保 ssh 在 PATH 中。", file=sys.stderr)
+        sys.exit(2)
+    return ssh_path
+
+def check_port_free(host, port):
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.settimeout(0.5)
+    try:
+        s.bind((host, port))
+        s.close()
+        return True
+    except OSError:
+        return False
+
+def wait_for_listen(host, port, timeout=10.0):
+    # 等待本地端口开始监听(大概率说明隧道已建立)
+    deadline = time.time() + timeout
+    while time.time() < deadline:
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            s.settimeout(0.5)
+            try:
+                if s.connect_ex((host, port)) == 0:
+                    return True
+            except OSError:
+                pass
+        time.sleep(0.2)
+    return False
+
+def ensure_dir(p: Path):
+    p.mkdir(parents=True, exist_ok=True)
+    if not os.access(p, os.W_OK):
+        print(f"目录不可写: {p}", file=sys.stderr)
+        sys.exit(2)
+
+def default_control_path(host, port):
+    base = Path.home() / ".ssh"
+    ensure_dir(base)
+    # 控制套接字文件名尽量短
+    name = f"cm-{host}-{port}"
+    cpath = base / name
+    # Windows AF_UNIX 路径不要太长
+    if len(str(cpath)) > 100:
+        print(f"警告:控制套接字路径过长,可能导致失败: {cpath}", file=sys.stderr)
+    return str(cpath)
+
+def build_keepalive_opts(alive_interval=60, alive_count=3):
+    return [
+        "-o", f"ServerAliveInterval={alive_interval}",
+        "-o", f"ServerAliveCountMax={alive_count}",
+    ]
+
+def start_tunnel(args):
+    ssh = which_ssh()
+    # 提前检查端口是否被占用
+    if not args.force and not check_port_free(args.socks_host, args.socks_port):
+        print(f"本地端口 {args.socks_host}:{args.socks_port} 已被占用。使用 --force 忽略或更换端口。", file=sys.stderr)
+        sys.exit(1)
+
+    cmd = [
+        ssh,
+        "-N",
+        "-D", f"{args.socks_host}:{args.socks_port}",
+        "-p", str(args.port),
+    ] + build_keepalive_opts(args.alive_interval, args.alive_count)
+
+    if args.strict_no:
+        cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
+
+    target = f"{args.user}@{args.host}"
+    cmd.append(target)
+
+    print("启动本地 SSH 隧道(非复用模式):")
+    print(" ".join(cmd))
+
+    # 在 Windows 上用无窗口/脱离控制台方式运行
+    if os.name == "nt":
+        flags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW
+        with open(os.devnull, "wb") as devnull:
+            proc = subprocess.Popen(
+                cmd,
+                stdin=devnull,
+                stdout=devnull,
+                stderr=devnull,
+                creationflags=flags,
+                close_fds=True,
+            )
+        print(f"已启动,PID={proc.pid}。尝试等待端口监听...")
+        ok = wait_for_listen(args.socks_host, args.socks_port, timeout=args.wait_timeout)
+        if ok:
+            print(f"SOCKS 已监听于 {args.socks_host}:{args.socks_port}")
+        else:
+            print("未检测到端口监听。若使用密码认证,后台无法输入密码,请改用密钥或前台启动。")
+        # 记录 pid 文件
+        pidfile = pid_file_path(args)
+        ensure_dir(pidfile.parent)
+        pidfile.write_text(str(proc.pid), encoding="utf-8")
+        print(f"PID 已写入 {pidfile}")
+    else:
+        # 在非 Windows 系统,使用 -f 让 ssh 后台运行(如果你跨平台使用)
+        cmd.insert(1, "-f")
+        subprocess.run(cmd, check=True)
+        print("已启动。")
+
+def pid_file_path(args):
+    base = Path.home() / ".ssh" / "tunnels"
+    fname = f"{args.host}-{args.port}-D{args.socks_host.replace('.', '_')}_{args.socks_port}.pid"
+    return base / fname
+
+def stop_tunnel(args):
+    # 通过 pidfile 强制停止非复用模式进程
+    pidfile = pid_file_path(args)
+    if not pidfile.exists():
+        print(f"未找到 PID 文件:{pidfile}。如果你是用复用模式启动,请用 stop-mux。", file=sys.stderr)
+        sys.exit(1)
+    try:
+        pid = int(pidfile.read_text(encoding="utf-8").strip())
+    except Exception as e:
+        print(f"读取 PID 文件失败:{e}", file=sys.stderr)
+        sys.exit(1)
+
+    if os.name == "nt":
+        # 使用 taskkill 结束
+        r = subprocess.run(["taskkill", "/PID", str(pid), "/F"], capture_output=True, text=True)
+        if r.returncode == 0:
+            print(f"已结束进程 PID={pid}")
+            try:
+                pidfile.unlink()
+            except Exception:
+                pass
+        else:
+            print(f"结束进程失败:{r.stdout}\n{r.stderr}", file=sys.stderr)
+            sys.exit(r.returncode)
+    else:
+        try:
+            os.kill(pid, 15)
+            print(f"已发送 SIGTERM 给 PID={pid}")
+            pidfile.unlink(missing_ok=True)
+        except Exception as e:
+            print(f"结束失败:{e}", file=sys.stderr)
+            sys.exit(1)
+
+def status_tunnel(args):
+    # 仅检查本地 SOCKS 端口是否在监听
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.settimeout(0.3)
+    try:
+        if s.connect_ex((args.socks_host, args.socks_port)) == 0:
+            print(f"监听中:{args.socks_host}:{args.socks_port}")
+        else:
+            print(f"未监听:{args.socks_host}:{args.socks_port}")
+    finally:
+        s.close()
+
+def start_mux(args):
+    ssh = which_ssh()
+    cpath = args.control_path or default_control_path(args.host, args.port)
+    # 确保控制路径所在目录存在
+    ensure_dir(Path(cpath).expanduser().resolve().parent)
+
+    cmd = [
+        ssh,
+        "-M",
+        "-S", cpath,
+        "-N",
+        "-D", f"{args.socks_host}:{args.socks_port}",
+        "-p", str(args.port),
+    ] + build_keepalive_opts(args.alive_interval, args.alive_count)
+
+    if args.strict_no:
+        cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
+
+    target = f"{args.user}@{args.host}"
+    cmd.append(target)
+
+    print("启动本地 SSH 隧道(连接复用模式):")
+    print(" ".join(cmd))
+
+    if os.name == "nt":
+        flags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW
+        with open(os.devnull, "wb") as devnull:
+            proc = subprocess.Popen(
+                cmd,
+                stdin=devnull,
+                stdout=devnull,
+                stderr=devnull,
+                creationflags=flags,
+                close_fds=True,
+            )
+        print(f"已启动(复用主进程),PID={proc.pid}。等待端口监听...")
+        ok = wait_for_listen(args.socks_host, args.socks_port, timeout=args.wait_timeout)
+        if ok:
+            print(f"SOCKS 已监听于 {args.socks_host}:{args.socks_port}")
+        else:
+            print("未检测到端口监听。若使用密码认证,后台无法输入密码,请改用密钥或前台启动。")
+    else:
+        # 非 Windows 可以用 -f,但复用主连接一般不建议 -f,使用后台方式或系统化管理更稳妥
+        subprocess.Popen(cmd)
+
+def stop_mux(args):
+    ssh = which_ssh()
+    cpath = args.control_path or default_control_path(args.host, args.port)
+    cmd = [
+        ssh,
+        "-S", cpath,
+        "-O", "exit",
+        "-p", str(args.port),
+        f"{args.user}@{args.host}",
+    ]
+    print("关闭复用主连接:")
+    print(" ".join(cmd))
+    r = subprocess.run(cmd, capture_output=True, text=True)
+    if r.returncode == 0:
+        print(r.stdout.strip() or "已请求关闭。")
+    else:
+        # 有的版本输出在 stderr
+        out = (r.stdout + "\n" + r.stderr).strip()
+        print(f"关闭失败:{out}", file=sys.stderr)
+        sys.exit(r.returncode)
+
+def status_mux(args):
+    ssh = which_ssh()
+    cpath = args.control_path or default_control_path(args.host, args.port)
+    cmd = [
+        ssh,
+        "-S", cpath,
+        "-O", "check",
+        "-p", str(args.port),
+        f"{args.user}@{args.host}",
+    ]
+    r = subprocess.run(cmd, capture_output=True, text=True)
+    if r.returncode == 0:
+        print(r.stdout.strip() or "Master running")
+    else:
+        out = (r.stdout + "\n" + r.stderr).strip()
+        print(out or "Not running")
+        sys.exit(r.returncode)
+
+def parse_args():
+    p = argparse.ArgumentParser(description="Windows 上启动/管理 SSH 本地 SOCKS 隧道的小工具")
+    sub = p.add_subparsers(dest="cmd", required=True)
+
+    def add_common(sp):
+        sp.add_argument("-H", "--host", default=DEFAULT_HOST, help="SSH 服务器地址")
+        sp.add_argument("-u", "--user", default=DEFAULT_USER, help="SSH 用户名")
+        sp.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="SSH 端口")
+        sp.add_argument("--socks-host", default=DEFAULT_SOCKS_HOST, help="本地 SOCKS 监听地址")
+        sp.add_argument("-D", "--socks-port", type=int, default=DEFAULT_SOCKS_PORT, help="本地 SOCKS 监听端口")
+        sp.add_argument("--alive-interval", type=int, default=60, help="ServerAliveInterval")
+        sp.add_argument("--alive-count", type=int, default=5, help="ServerAliveCountMax")
+        sp.add_argument("--strict-no", action="store_true", help="临时关闭主机指纹检查(不安全)")
+        sp.add_argument("--wait-timeout", type=float, default=10.0, help="启动后等待监听的秒数")
+        return sp
+
+    add_common(sub.add_parser("start", help="启动隧道(非复用)")).add_argument("--force", action="store_true", help="忽略本地端口占用检查")
+    add_common(sub.add_parser("stop", help="停止隧道(通过 pid 文件)"))
+    add_common(sub.add_parser("status", help="查看本地 SOCKS 端口是否在监听"))
+
+    sp_mux_start = add_common(sub.add_parser("start-mux", help="启动隧道(连接复用模式)"))
+    sp_mux_start.add_argument("-S", "--control-path", default=None, help="控制套接字路径(默认:~/.ssh/cm-<host>-<port>)")
+
+    sp_mux_stop = add_common(sub.add_parser("stop-mux", help="关闭复用主连接"))
+    sp_mux_stop.add_argument("-S", "--control-path", default=None, help="控制套接字路径")
+
+    sp_mux_status = add_common(sub.add_parser("status-mux", help="查看复用主连接状态"))
+    sp_mux_status.add_argument("-S", "--control-path", default=None, help="控制套接字路径")
+
+    return p.parse_args()
+
+def main():
+    args = parse_args()
+    if args.cmd == "start":
+        start_tunnel(args)
+    elif args.cmd == "stop":
+        stop_tunnel(args)
+    elif args.cmd == "status":
+        status_tunnel(args)
+    elif args.cmd == "start-mux":
+        start_mux(args)
+    elif args.cmd == "stop-mux":
+        stop_mux(args)
+    elif args.cmd == "status-mux":
+        status_mux(args)
+    else:
+        print("未知命令", file=sys.stderr)
+        sys.exit(2)
+
+if __name__ == "__main__":
+    main()
+    
+    
+# 使用示例
+
+# 服务器端需要先开通免密登录
+
+# 启动非复用隧道(相当于 ssh -N -D 127.0.0.1:1066 -p 29765 root@117.50.195.224,并包含存活探测): 
+# python ssh_tunnel.py start -H 117.50.195.224 -u root -p 29765 -D 1066
+# python ssh_tunnel.py start -H 106.14.113.12 -u root -p 29765 -D 1067
+# python ssh_tunnel.py start -H 103.40.13.87 -u root -p 40268 -D 1061
+
+# 查看本地 SOCKS 端口是否监听: python ssh_tunnel.py status -D 1066
+
+# 停止非复用隧道(通过 PID 文件): python ssh_tunnel.py stop -H 117.50.195.224 -u root -p 29765 -D 1066