Gogs 1 неделя назад
Родитель
Сommit
bbd8416a01
6 измененных файлов с 274 добавлено и 35 удалено
  1. 13 0
      config.json
  2. 9 2
      config.py
  3. 1 1
      scripts/install.sh
  4. 207 0
      scripts/tcp_only_start.sh
  5. 33 0
      scripts/tcp_only_stop.sh
  6. 11 32
      transparent_edge.py

+ 13 - 0
config.json

@@ -1,5 +1,18 @@
 {
   "strategy": "top3",
+
+  "redundancy": 3,
+  "direct_redundancy": 3,
+  "direct_max_redundancy": 6,
+  "direct_redundancy_v4": 4,
+  "direct_redundancy_v6": 3,
+  "direct_ipv6_enabled": true,
+  "direct_open_timeout": 6.0,
+  "relay_open_timeout": 6.0,
+  "tcp_connect_happy_eyeballs_delay": 0.25,
+  "tcp_warmup_bytes": 262144,
+  "tcp_loser_grace_ms": 100,
+
   "udp_redundancy": 2,
   "udp_direct_redundancy": 2,
   "udp_direct_redundancy_v4": 2,

+ 9 - 2
config.py

@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import json
+import os
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Literal
@@ -53,8 +54,7 @@ class Config:
     socks_port: int = 0
 
     @classmethod
-    def load(cls, path: str) -> "Config":
-        raw = json.loads(Path(path).read_text())
+    def from_dict(cls, raw: dict) -> "Config":
         relays = [RelayNode(**item) for item in raw.get("relays", [])]
         return cls(
             relays=relays,
@@ -89,3 +89,10 @@ class Config:
             socks_host=raw.get("socks_host", "127.0.0.1"),
             socks_port=max(0, raw.get("socks_port", 0)),
         )
+
+    @classmethod
+    def load(cls, path: str) -> "Config":
+        inline_raw = os.environ.get("MYNETSPEEDER_CONFIG_JSON")
+        if inline_raw:
+            return cls.from_dict(json.loads(inline_raw))
+        return cls.from_dict(json.loads(Path(path).read_text()))

+ 1 - 1
scripts/install.sh

@@ -35,7 +35,7 @@ exec "$PYTHON_BIN" -m "$PACKAGE_NAME" "\$@"
 EOF
 chmod +x "$BIN_PATH"
 
-chmod +x "$PREFIX/scripts/start-transparent.sh" "$PREFIX/scripts/stop-transparent.sh" "$PREFIX/scripts/install.sh" "$PREFIX/scripts/start_only_udp.sh"
+chmod +x "$PREFIX/scripts/start-transparent.sh" "$PREFIX/scripts/stop-transparent.sh" "$PREFIX/scripts/install.sh" "$PREFIX/scripts/start_only_udp.sh" "$PREFIX/scripts/tcp_only_start.sh" "$PREFIX/scripts/tcp_only_stop.sh"
 [[ -f "$PREFIX/scripts/start-relay.sh" ]] && chmod +x "$PREFIX/scripts/start-relay.sh"
 [[ -f "$PREFIX/scripts/start-relay-udp.sh" ]] && chmod +x "$PREFIX/scripts/start-relay-udp.sh"
 [[ -f "$PREFIX/scripts/stop-relay.sh" ]] && chmod +x "$PREFIX/scripts/stop-relay.sh"

+ 207 - 0
scripts/tcp_only_start.sh

@@ -0,0 +1,207 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+usage() {
+  cat <<'EOF'
+Usage: tcp_only_start.sh [config_path]
+
+Options via env:
+  MYNETSPEEDER_LISTEN_HOST   edge 监听地址,默认读取配置或 127.0.0.1
+  MYNETSPEEDER_LISTEN_PORT   edge 监听端口,默认读取配置或 19080
+  MYNETSPEEDER_USER          运行用户,默认 mynetspeeder
+  MYNETSPEEDER_LOG_MAX_MB    单个日志最大大小,默认 50
+  MYNETSPEEDER_LOG_BACKUPS   日志轮转保留份数,默认 3
+EOF
+}
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+ROOT_DIR="$(dirname "$SCRIPT_DIR")"
+PACKAGE_PARENT="$(dirname "$ROOT_DIR")"
+PACKAGE_NAME="$(basename "$ROOT_DIR")"
+
+CONFIG_PATH="${1:-$ROOT_DIR/config.json}"
+RUNTIME_USER="${MYNETSPEEDER_USER:-mynetspeeder}"
+PID_FILE="/var/run/mynetspeeder-edge.pid"
+LOG_FILE="/var/log/mynetspeeder-edge.log"
+LOG_MAX_MB="${MYNETSPEEDER_LOG_MAX_MB:-50}"
+LOG_BACKUPS="${MYNETSPEEDER_LOG_BACKUPS:-3}"
+CHAIN4="MYNETSPEEDER"
+CHAIN6="MYNETSPEEDER6"
+SSH_PORTS="${MYNETSPEEDER_SSH_PORTS:-}"
+SELF_EXCLUDE_V4="127.0.0.0/8 169.254.0.0/16"
+SELF_EXCLUDE_V6="::1/128 fe80::/10"
+
+if [[ $EUID -ne 0 ]]; then
+  echo "need root"
+  exit 1
+fi
+
+if [[ ! -f "$CONFIG_PATH" ]]; then
+  echo "config not found: $CONFIG_PATH"
+  exit 1
+fi
+
+LISTEN_HOST_FROM_CONFIG=$(python3 - <<'PY' "$CONFIG_PATH"
+import json, sys
+cfg = json.load(open(sys.argv[1]))
+print(cfg.get("listen_host", cfg.get("socks_host", "127.0.0.1")))
+PY
+)
+LISTEN_PORT_FROM_CONFIG=$(python3 - <<'PY' "$CONFIG_PATH"
+import json, sys
+cfg = json.load(open(sys.argv[1]))
+port = int(cfg.get("listen_port", cfg.get("socks_port", 19080)) or 19080)
+print(port if port > 0 else 19080)
+PY
+)
+LISTEN_HOST="${MYNETSPEEDER_LISTEN_HOST:-$LISTEN_HOST_FROM_CONFIG}"
+LISTEN_PORT="${MYNETSPEEDER_LISTEN_PORT:-$LISTEN_PORT_FROM_CONFIG}"
+
+if ! [[ "$LISTEN_PORT" =~ ^[0-9]+$ ]]; then
+  echo "listen port must be numeric"
+  exit 1
+fi
+
+if [[ -z "$SSH_PORTS" && -n "${SSH_CONNECTION:-}" ]]; then
+  SSH_PORTS="${SSH_CONNECTION##* }"
+fi
+
+SSH_PORT_ARRAY=()
+if [[ -n "$SSH_PORTS" ]]; then
+  IFS=',' read -r -a SSH_PORT_ARRAY <<< "$SSH_PORTS"
+  for ssh_port in "${SSH_PORT_ARRAY[@]}"; do
+    [[ "$ssh_port" =~ ^[0-9]+$ ]] || { echo "ssh ports must be numeric, got: $ssh_port"; exit 1; }
+  done
+fi
+
+id -u "$RUNTIME_USER" >/dev/null 2>&1 || useradd --system --no-create-home --shell /usr/sbin/nologin "$RUNTIME_USER"
+mkdir -p /var/log
+touch "$LOG_FILE"
+chown "$RUNTIME_USER":"$RUNTIME_USER" "$LOG_FILE"
+if ! [[ "$LOG_MAX_MB" =~ ^[0-9]+$ ]] || ! [[ "$LOG_BACKUPS" =~ ^[0-9]+$ ]]; then
+  echo "log limits must be numeric"
+  exit 1
+fi
+LOG_MAX_BYTES=$((LOG_MAX_MB * 1024 * 1024))
+
+WAIT_TIMEOUT="${MYNETSPEEDER_LISTEN_TIMEOUT:-15}"
+
+ensure_rule() {
+  local cmd="$1"
+  local table="$2"
+  local chain="$3"
+  shift 3
+  if ! "$cmd" -t "$table" -C "$chain" "$@" >/dev/null 2>&1; then
+    "$cmd" -t "$table" -A "$chain" "$@"
+  fi
+}
+
+wait_for_listen() {
+  local host="$1"
+  local port="$2"
+  local log_file="$3"
+  local log_pattern="$4"
+  local label="$5"
+  local timeout_sec="${6:-10}"
+  local deadline=$((SECONDS + timeout_sec))
+  local pattern
+  pattern="${host//./\\.}:${port}( |$)"
+  while (( SECONDS < deadline )); do
+    if ss -H -lntp 2>/dev/null | grep -qE "$pattern"; then
+      return 0
+    fi
+    if [[ -f "$log_file" ]] && grep -qE "$log_pattern" "$log_file" 2>/dev/null; then
+      return 0
+    fi
+    sleep 0.2
+  done
+  echo "$label failed to listen"
+  return 1
+}
+
+add_exclusions_v4() {
+  iptables -t nat -A "$CHAIN4" -m addrtype --dst-type LOCAL -j RETURN
+  for cidr in $SELF_EXCLUDE_V4; do
+    iptables -t nat -A "$CHAIN4" -d "$cidr" -j RETURN
+  done
+  iptables -t nat -A "$CHAIN4" -m owner --uid-owner "$RUNTIME_USER" -j RETURN
+  for ssh_port in "${SSH_PORT_ARRAY[@]}"; do
+    iptables -t nat -A "$CHAIN4" -p tcp --sport "$ssh_port" -j RETURN
+  done
+}
+
+add_exclusions_v6() {
+  ip6tables -t nat -A "$CHAIN6" -m addrtype --dst-type LOCAL -j RETURN
+  for cidr in $SELF_EXCLUDE_V6; do
+    ip6tables -t nat -A "$CHAIN6" -d "$cidr" -j RETURN
+  done
+  ip6tables -t nat -A "$CHAIN6" -m owner --uid-owner "$RUNTIME_USER" -j RETURN
+  for ssh_port in "${SSH_PORT_ARRAY[@]}"; do
+    ip6tables -t nat -A "$CHAIN6" -p tcp --sport "$ssh_port" -j RETURN
+  done
+}
+
+pkill -f 'python3 -m mynetspeeder edge' || true
+
+INLINE_CONFIG_JSON="$(python3 - <<'PY' "$CONFIG_PATH"
+import json, sys
+from pathlib import Path
+
+cfg = json.loads(Path(sys.argv[1]).read_text())
+cfg["relays"] = []
+cfg["socks_port"] = 0
+cfg["direct_redundancy"] = max(int(cfg.get("direct_redundancy", 3) or 3), 3)
+cfg["direct_redundancy_v4"] = max(int(cfg.get("direct_redundancy_v4", cfg["direct_redundancy"]) or cfg["direct_redundancy"]), 3)
+cfg["direct_redundancy_v6"] = max(int(cfg.get("direct_redundancy_v6", 2) or 2), 2)
+cfg["direct_max_redundancy"] = max(int(cfg.get("direct_max_redundancy", 4) or 4), 4)
+cfg["direct_open_timeout"] = min(float(cfg.get("direct_open_timeout", 2.5) or 2.5), 2.5)
+cfg["tcp_connect_happy_eyeballs_delay"] = min(float(cfg.get("tcp_connect_happy_eyeballs_delay", 0.1) or 0.1), 0.1)
+cfg["tcp_warmup_bytes"] = max(int(cfg.get("tcp_warmup_bytes", 524288) or 524288), 524288)
+cfg["tcp_loser_grace_ms"] = min(int(cfg.get("tcp_loser_grace_ms", 80) or 80), 80)
+print(json.dumps(cfg, ensure_ascii=False))
+PY
+)"
+
+echo "tcp-only config prepared: relays cleared, socks disabled"
+
+runuser -u "$RUNTIME_USER" -- env MYNETSPEEDER_CONFIG_JSON="$INLINE_CONFIG_JSON" bash -lc "export PYTHONUNBUFFERED=1; export PYTHONPATH=${PACKAGE_PARENT}; cd ${PACKAGE_PARENT} && exec nohup python3 -m ${PACKAGE_NAME} edge --listen-host ${LISTEN_HOST} --listen-port ${LISTEN_PORT} --config ${CONFIG_PATH} 2>&1 | python3 ${ROOT_DIR}/scripts/rotate-log.py ${LOG_FILE} ${LOG_MAX_BYTES} ${LOG_BACKUPS}" &
+EDGE_PID=$!
+echo "$EDGE_PID" > "$PID_FILE"
+
+if ! wait_for_listen "$LISTEN_HOST" "$LISTEN_PORT" "$LOG_FILE" "transparent tcp listening on" "edge" "$WAIT_TIMEOUT"; then
+  tail -n 50 "$LOG_FILE" || true
+  exit 1
+fi
+
+iptables -t nat -N "$CHAIN4" 2>/dev/null || true
+iptables -t nat -F "$CHAIN4"
+add_exclusions_v4
+iptables -t nat -A "$CHAIN4" -p tcp -j REDIRECT --to-ports "$LISTEN_PORT"
+ensure_rule iptables nat OUTPUT -p tcp -j "$CHAIN4"
+
+IP6_ENABLED=0
+IP6_NAT_SUPPORTED=0
+if command -v ip6tables >/dev/null 2>&1; then
+  if ip6tables -t nat -S >/dev/null 2>&1; then
+    IP6_ENABLED=1
+    IP6_NAT_SUPPORTED=1
+    ip6tables -t nat -N "$CHAIN6" 2>/dev/null || true
+    ip6tables -t nat -F "$CHAIN6"
+    add_exclusions_v6
+    ip6tables -t nat -A "$CHAIN6" -p tcp -j REDIRECT --to-ports "$LISTEN_PORT"
+    ensure_rule ip6tables nat OUTPUT -p tcp -j "$CHAIN6"
+  fi
+fi
+
+iptables -t nat -C OUTPUT -p tcp -j "$CHAIN4" >/dev/null 2>&1 || { echo "self-check failed: ipv4 tcp output hook missing"; exit 1; }
+if [[ "$IP6_ENABLED" == "1" && "$IP6_NAT_SUPPORTED" == "1" ]]; then
+  ip6tables -t nat -C OUTPUT -p tcp -j "$CHAIN6" >/dev/null 2>&1 || { echo "self-check failed: ipv6 tcp output hook missing"; exit 1; }
+fi
+
+echo "mynetspeeder tcp-only mode started on ${LISTEN_HOST}:${LISTEN_PORT}"
+echo "tcp: enabled"
+echo "udp: disabled"
+echo "relays: cleared from config"
+echo "pid file: $PID_FILE"
+echo "log file: $LOG_FILE"
+echo "log max: ${LOG_MAX_MB}MB x ${LOG_BACKUPS}"

+ 33 - 0
scripts/tcp_only_stop.sh

@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+PID_FILE="/var/run/mynetspeeder-edge.pid"
+CHAIN4="MYNETSPEEDER"
+CHAIN6="MYNETSPEEDER6"
+
+if [[ $EUID -ne 0 ]]; then
+  echo "need root"
+  exit 1
+fi
+
+if [[ -f "$PID_FILE" ]]; then
+  kill "$(cat "$PID_FILE")" 2>/dev/null || true
+  rm -f "$PID_FILE"
+fi
+pkill -f 'python3 -m mynetspeeder edge' || true
+
+if iptables -t nat -S >/dev/null 2>&1; then
+  iptables -t nat -D OUTPUT -p tcp -j "$CHAIN4" 2>/dev/null || true
+  iptables -t nat -F "$CHAIN4" 2>/dev/null || true
+  iptables -t nat -X "$CHAIN4" 2>/dev/null || true
+fi
+
+if command -v ip6tables >/dev/null 2>&1; then
+  if ip6tables -t nat -S >/dev/null 2>&1; then
+    ip6tables -t nat -D OUTPUT -p tcp -j "$CHAIN6" 2>/dev/null || true
+    ip6tables -t nat -F "$CHAIN6" 2>/dev/null || true
+    ip6tables -t nat -X "$CHAIN6" 2>/dev/null || true
+  fi
+fi
+
+echo "mynetspeeder tcp-only mode stopped"

+ 11 - 32
transparent_edge.py

@@ -219,7 +219,7 @@ class TransparentSession:
     open_tasks: list[asyncio.Task] = field(default_factory=list)
     backup_path: BasePath | None = None
     last_winner_data_at: float = 0.0
-    failover_task: asyncio.Task | None = None
+    converged: bool = False
 
     def _select_backup_path(self, winner: BasePath) -> BasePath | None:
         candidates = [path for path in self.paths if path is not winner and path.opened and not path.closed]
@@ -258,7 +258,6 @@ class TransparentSession:
         if self.opened_count == 0:
             raise ConnectionError(self.errors[0] if self.errors else "all paths failed")
         self.pump_task = asyncio.create_task(self._pump_local())
-        self.failover_task = asyncio.create_task(self._watch_failover())
 
     async def _pump_local(self) -> None:
         try:
@@ -270,45 +269,28 @@ class TransparentSession:
                 active = [path for path in self.paths if path.opened and not path.closed]
                 if not active:
                     break
-                if self.uplink_bytes <= self.warmup_bytes:
+                if self.winner is None and self.uplink_bytes <= self.warmup_bytes:
                     await asyncio.gather(*(path.send(chunk) for path in active), return_exceptions=True)
                 else:
                     if self.winner is None:
                         await self.winner_event.wait()
                     if self.winner:
                         send_targets = [self.winner]
-                        if self.backup_path and self.backup_path.opened and not self.backup_path.closed and self.backup_path is not self.winner:
+                        if (
+                            not self.converged
+                            and self.backup_path
+                            and self.backup_path.opened
+                            and not self.backup_path.closed
+                            and self.backup_path is not self.winner
+                        ):
                             send_targets.append(self.backup_path)
                         await asyncio.gather(*(path.send(chunk) for path in send_targets), return_exceptions=True)
+                        self.converged = True
         except Exception:
             pass
         finally:
             await self.close()
 
-    async def _watch_failover(self) -> None:
-        try:
-            while not self.closed:
-                await asyncio.sleep(0.2)
-                if self.winner is None:
-                    continue
-                if self.last_winner_data_at <= 0:
-                    continue
-                idle_ms = (asyncio.get_running_loop().time() - self.last_winner_data_at) * 1000
-                if idle_ms < 0:
-                    continue
-                if idle_ms >= self.tcp_failover_idle_ms and self.backup_path and self.backup_path.opened and not self.backup_path.closed:
-                    old = self.winner
-                    self.winner = self.backup_path
-                    self.backup_path = self._select_backup_path(self.winner)
-                    self.last_winner_data_at = asyncio.get_running_loop().time()
-                    self._record_win(self.winner)
-                    print(
-                        f"[edge] tcp failover session={self.session_id} target={self.target.host}:{self.target.port} "
-                        f"old_winner={old.name if old else 'none'} new_winner={self.winner.name} idle_ms={int(idle_ms)}"
-                    )
-        except Exception:
-            pass
-
     async def handle_path(self, path: BasePath, event: str, payload: bytes | None) -> None:
         if self.closed:
             return
@@ -327,6 +309,7 @@ class TransparentSession:
                 self._record_win(path)
                 self.backup_path = self._select_backup_path(path)
                 self.winner_event.set()
+                self.converged = False
                 if self.loser_grace_ms > 0:
                     self.loser_close_task = asyncio.create_task(self._close_losers_after_grace(path))
                 else:
@@ -371,10 +354,6 @@ class TransparentSession:
             self.loser_close_task.cancel()
             with contextlib.suppress(Exception):
                 await self.loser_close_task
-        if self.failover_task and self.failover_task is not asyncio.current_task():
-            self.failover_task.cancel()
-            with contextlib.suppress(Exception):
-                await self.failover_task
         for task in self.open_tasks:
             if task is not asyncio.current_task():
                 task.cancel()