Kaynağa Gözat

最新功能

sequoiaoo 1 hafta önce
işleme
e117c745a6
4 değiştirilmiş dosya ile 609 ekleme ve 0 silme
  1. 96 0
      README.md
  2. 12 0
      fast-media-lock
  3. 447 0
      fast_media_lock.py
  4. 54 0
      install_fast_media_lock.sh

+ 96 - 0
README.md

@@ -0,0 +1,96 @@
+# Fast Media Lock
+
+`fast_media_lock.py` 是一个跨平台(Linux / Windows)的快速“部分加密”脚本。
+它只加密文件前 `N MB`(默认 `8MB`),用于让常见播放器或工具无法直接打开文件。
+
+## 说明
+
+- 不使用单独密钥文件,只需密码。
+- 每个文件的解密元信息写在文件尾部。
+- 因为元信息跟着文件走,所以文件移动到其他目录或设备后,仍可用同脚本解密。
+- 加密后文件名会追加后缀 `.lockx`,解密时自动去掉。
+- 默认处理所有文件;加 `--media-only` 时仅处理媒体后缀。
+
+## 媒体后缀(`--media-only` 时生效)
+
+视频: `.mp4 .mkv .avi .mov .wmv .flv .webm .m4v .ts .m2ts`  
+音频: `.mp3 .wav .flac .aac .m4a .ogg .wma`  
+图片: `.jpg .jpeg .png .gif .bmp .webp .heic`
+
+## 重要注意
+
+- 这是“快速部分加密”,不是全量强加密。
+- 它的目标是速度和通用工具不可直接打开,不等同于高强度抗攻击加密。
+
+## 命令关键字
+
+- 主命令:`lock` / `unlock` / `status` / `lock-name` / `unlock-name`
+- 兼容别名:`encrypt` = `lock`,`decrypt` = `unlock`
+- 兼容别名:`encrypt-name` = `lock-name`,`decrypt-name` = `unlock-name`
+
+## 常用命令
+
+```bash
+# 加密目录(默认前 8MB,默认处理所有文件)
+python3 fast_media_lock.py lock /path/to/dir --password "你的密码"
+
+# 指定加密前多少MB(例如 32MB)
+python3 fast_media_lock.py lock /path/to/dir --password "你的密码" --chunk-mb 32
+
+# 仅处理媒体文件
+python3 fast_media_lock.py lock /path/to/dir --password "你的密码" --media-only
+
+# 并发处理(0=自动,建议按磁盘和CPU调)
+python3 fast_media_lock.py lock /path/to/dir --password "你的密码" --workers 8
+
+# 解密目录中的已加密文件
+python3 fast_media_lock.py unlock /path/to/dir --password "你的密码"
+
+# 按文件名(或相对路径)加密指定文件
+python3 fast_media_lock.py lock-name /path/to/dir a.mp4 b.mp3 subdir/c.jpg --password "你的密码"
+
+# 按文件名(或相对路径)解密指定文件
+python3 fast_media_lock.py unlock-name /path/to/dir a.mp4.lockx subdir/c.jpg.lockx --password "你的密码"
+
+# 查看目录内文件加密状态
+python3 fast_media_lock.py status /path/to/dir
+
+# 旧关键字仍可用(别名)
+python3 fast_media_lock.py encrypt /path/to/dir --password "你的密码"
+python3 fast_media_lock.py decrypt /path/to/dir --password "你的密码"
+```
+
+## Windows 示例
+
+```bat
+python fast_media_lock.py lock "D:\\videos" --password "你的密码"
+python fast_media_lock.py unlock "D:\\videos" --password "你的密码"
+python fast_media_lock.py status "D:\\videos"
+```
+
+## Ubuntu 全局命令(非定时)
+
+项目里已提供:
+- `fast-media-lock`:shell 启动器(转发参数给 `fast_media_lock.py`)
+- `install_fast_media_lock.sh`:安装到 `/usr/local/bin/fast-media-lock`
+
+安装:
+
+```bash
+cd /home/myprojector/hidedoc
+./install_fast_media_lock.sh
+```
+
+安装后可在任意目录执行:
+
+```bash
+fast-media-lock -h
+fast-media-lock lock /path/to/dir --password "你的密码" --workers 8
+fast-media-lock unlock /path/to/dir --password "你的密码"
+```
+
+卸载:
+
+```bash
+sudo rm -f /usr/local/bin/fast-media-lock
+```

+ 12 - 0
fast-media-lock

@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+TOOL="$SCRIPT_DIR/fast_media_lock.py"
+
+if [[ ! -f "$TOOL" ]]; then
+  echo "[ERROR] Cannot find fast_media_lock.py beside this script: $TOOL" >&2
+  exit 1
+fi
+
+exec python3 "$TOOL" "$@"

+ 447 - 0
fast_media_lock.py

@@ -0,0 +1,447 @@
+#!/usr/bin/env python3
+"""
+Fast partial media locker (cross-platform, Python stdlib only).
+
+Goal:
+- Very fast "lock/unlock" by encrypting only the first N MB of each file.
+- No separate key file; password only.
+- Metadata is embedded in each encrypted file tail, so files can be moved anywhere
+  and still be unlocked by this script.
+
+Security note:
+- This is NOT full-file encryption. It is designed for speed/obfuscation.
+"""
+
+from __future__ import annotations
+
+import argparse
+import concurrent.futures
+import getpass
+import hashlib
+import hmac
+import os
+import struct
+import sys
+import zlib
+from pathlib import Path
+from typing import Iterable
+
+MAGIC = b"FMLKv1!!"
+VERSION = 1
+SALT_SIZE = 16
+NONCE_SIZE = 16
+VERIFIER_SIZE = 16
+LOCKED_SUFFIX = ".lockx"
+
+# magic(8) + version(1) + reserved(3) + chunk_size(8) + original_size(8)
+# + salt(16) + nonce(16) + verifier(16) + crc32(4)
+TRAILER_STRUCT = struct.Struct(">8sB3sQQ16s16s16sI")
+TRAILER_SIZE = TRAILER_STRUCT.size
+
+MEDIA_EXTS = {
+    ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".m4v", ".ts", ".m2ts",
+    ".mp3", ".wav", ".flac", ".aac", ".m4a", ".ogg", ".wma",
+    ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".heic",
+}
+
+
+class LockerError(Exception):
+    pass
+
+
+def _derive_keys(password: str, salt: bytes) -> tuple[bytes, bytes]:
+    # scrypt is in stdlib and works on Linux/Windows without extra dependencies.
+    km = hashlib.scrypt(
+        password.encode("utf-8"),
+        salt=salt,
+        n=2**14,
+        r=8,
+        p=1,
+        dklen=64,
+    )
+    return km[:32], km[32:]
+
+
+def _keystream(stream_key: bytes, nonce: bytes, length: int) -> bytes:
+    if length <= 0:
+        return b""
+    blocks = (length + 31) // 32
+    out = bytearray(blocks * 32)
+    mv = memoryview(out)
+    off = 0
+    for counter in range(blocks):
+        block = hmac.digest(stream_key, nonce + counter.to_bytes(8, "big"), hashlib.sha256)
+        mv[off : off + 32] = block
+        off += 32
+    return bytes(mv[:length])
+
+
+def _xor_bytes(data: bytes, stream_key: bytes, nonce: bytes) -> bytes:
+    if not data:
+        return b""
+    ks = _keystream(stream_key, nonce, len(data))
+    # Python-level byte loops are slow for MB-size chunks.
+    x = int.from_bytes(data, "little") ^ int.from_bytes(ks, "little")
+    return x.to_bytes(len(data), "little")
+
+
+def _build_verifier(check_key: bytes) -> bytes:
+    return hmac.digest(check_key, b"FMLK-PASSWORD-CHECK", hashlib.sha256)[:VERIFIER_SIZE]
+
+
+def _build_trailer(chunk_size: int, original_size: int, salt: bytes, nonce: bytes, verifier: bytes) -> bytes:
+    head = TRAILER_STRUCT.pack(
+        MAGIC,
+        VERSION,
+        b"\x00\x00\x00",
+        chunk_size,
+        original_size,
+        salt,
+        nonce,
+        verifier,
+        0,
+    )
+    crc = zlib.crc32(head[:-4]) & 0xFFFFFFFF
+    return head[:-4] + struct.pack(">I", crc)
+
+
+def _parse_trailer(raw: bytes) -> dict | None:
+    if len(raw) != TRAILER_SIZE:
+        return None
+
+    magic, version, _reserved, chunk_size, original_size, salt, nonce, verifier, crc = TRAILER_STRUCT.unpack(raw)
+
+    if magic != MAGIC or version != VERSION:
+        return None
+
+    expect_crc = zlib.crc32(raw[:-4]) & 0xFFFFFFFF
+    if crc != expect_crc:
+        return None
+
+    return {
+        "chunk_size": chunk_size,
+        "original_size": original_size,
+        "salt": salt,
+        "nonce": nonce,
+        "verifier": verifier,
+    }
+
+
+def _read_trailer(path: Path) -> dict | None:
+    size = path.stat().st_size
+    if size < TRAILER_SIZE:
+        return None
+
+    with path.open("rb") as f:
+        f.seek(-TRAILER_SIZE, os.SEEK_END)
+        raw = f.read(TRAILER_SIZE)
+    return _parse_trailer(raw)
+
+
+def is_encrypted(path: Path) -> bool:
+    meta = _read_trailer(path)
+    if not meta:
+        return False
+
+    size = path.stat().st_size
+    # Basic sanity: encrypted file size should be original + trailer.
+    return meta["original_size"] + TRAILER_SIZE == size
+
+
+def _iter_files(target: Path, all_files: bool) -> Iterable[Path]:
+    if target.is_file():
+        yield target
+        return
+
+    for p in target.rglob("*"):
+        if not p.is_file():
+            continue
+        if not all_files and p.suffix.lower() not in MEDIA_EXTS:
+            continue
+        yield p
+
+
+def _iter_files_by_names(target: Path, names: list[str], all_files: bool) -> Iterable[Path]:
+    if target.is_file():
+        selected = {target.resolve()}
+    else:
+        selected: set[Path] = set()
+        raw_names = [n.strip() for n in names if n.strip()]
+        basename_set = {n for n in raw_names if "/" not in n and "\\" not in n}
+
+        for raw in raw_names:
+            if "/" in raw or "\\" in raw:
+                rel = Path(raw)
+                cand = (target / rel).resolve()
+                if cand.is_file():
+                    selected.add(cand)
+
+        for p in _iter_files(target, all_files=all_files):
+            if p.name in basename_set:
+                selected.add(p.resolve())
+
+    for p in sorted(selected):
+        if not all_files and p.suffix.lower() not in MEDIA_EXTS:
+            continue
+        yield p
+
+
+def _locked_name(path: Path) -> Path:
+    if path.name.endswith(LOCKED_SUFFIX):
+        return path
+    return path.with_name(path.name + LOCKED_SUFFIX)
+
+
+def _unlocked_name(path: Path) -> Path:
+    if path.name.endswith(LOCKED_SUFFIX):
+        return path.with_name(path.name[: -len(LOCKED_SUFFIX)])
+    return path
+
+
+def encrypt_file(path: Path, password: str, chunk_size: int) -> str:
+    if not path.exists() or not path.is_file():
+        return "skip(not_file)"
+
+    if is_encrypted(path):
+        return "skip(already_encrypted)"
+
+    original_size = path.stat().st_size
+    if original_size == 0:
+        return "skip(empty)"
+
+    locked_path = _locked_name(path)
+    if locked_path != path and locked_path.exists():
+        return "fail(name_conflict)"
+
+    salt = os.urandom(SALT_SIZE)
+    nonce = os.urandom(NONCE_SIZE)
+    stream_key, check_key = _derive_keys(password, salt)
+    verifier = _build_verifier(check_key)
+
+    n = min(chunk_size, original_size)
+
+    with path.open("r+b") as f:
+        plain = f.read(n)
+        cipher = _xor_bytes(plain, stream_key, nonce)
+        f.seek(0)
+        f.write(cipher)
+        trailer = _build_trailer(chunk_size=chunk_size, original_size=original_size, salt=salt, nonce=nonce, verifier=verifier)
+        f.seek(0, os.SEEK_END)
+        f.write(trailer)
+
+    if locked_path != path:
+        path.rename(locked_path)
+
+    return "ok"
+
+
+def decrypt_file(path: Path, password: str) -> str:
+    if not path.exists() or not path.is_file():
+        return "skip(not_file)"
+
+    meta = _read_trailer(path)
+    if not meta:
+        return "skip(not_encrypted)"
+
+    size = path.stat().st_size
+    if meta["original_size"] + TRAILER_SIZE != size:
+        return "skip(invalid_layout)"
+
+    unlocked_path = _unlocked_name(path)
+    if unlocked_path != path and unlocked_path.exists():
+        return "fail(name_conflict)"
+
+    stream_key, check_key = _derive_keys(password, meta["salt"])
+    verifier = _build_verifier(check_key)
+    if not hmac.compare_digest(verifier, meta["verifier"]):
+        return "fail(wrong_password)"
+
+    n = min(meta["chunk_size"], meta["original_size"])
+
+    with path.open("r+b") as f:
+        cipher = f.read(n)
+        plain = _xor_bytes(cipher, stream_key, meta["nonce"])
+        f.seek(0)
+        f.write(plain)
+        f.truncate(meta["original_size"])
+
+    if unlocked_path != path:
+        path.rename(unlocked_path)
+
+    return "ok"
+
+
+def _ask_password(confirm: bool) -> str:
+    pw = getpass.getpass("请输入密码: ")
+    if not pw:
+        raise LockerError("密码不能为空")
+
+    if confirm:
+        pw2 = getpass.getpass("请再次输入密码: ")
+        if pw != pw2:
+            raise LockerError("两次密码不一致")
+
+    return pw
+
+
+def _resolve_workers(workers: int) -> int:
+    if workers < 0:
+        raise LockerError("--workers 不能小于0")
+    if workers == 0:
+        cpu = os.cpu_count() or 1
+        return max(1, min(32, cpu * 2))
+    return workers
+
+
+def run_encrypt(target: Path, password: str, chunk_mb: int, all_files: bool, workers: int, names: list[str] | None = None) -> int:
+    chunk_size = chunk_mb * 1024 * 1024
+    worker_count = _resolve_workers(workers)
+    ok = 0
+    skipped = 0
+    failed = 0
+
+    file_iter = _iter_files_by_names(target, names, all_files=all_files) if names else _iter_files(target, all_files=all_files)
+    files = list(file_iter)
+
+    if worker_count == 1:
+        results = ((p, encrypt_file(p, password, chunk_size)) for p in files)
+    else:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as ex:
+            mapped = ex.map(lambda p: encrypt_file(p, password, chunk_size), files)
+            results = zip(files, mapped)
+
+    for p, res in results:
+        if res == "ok":
+            ok += 1
+            print(f"[OK] encrypted: {p}")
+        elif res.startswith("fail"):
+            failed += 1
+            print(f"[FAIL] {p} -> {res}")
+        else:
+            skipped += 1
+            print(f"[SKIP] {p} -> {res}")
+
+    print(f"完成: encrypted={ok}, failed={failed}, skipped={skipped}, chunk={chunk_mb}MB, workers={worker_count}")
+    return 2 if failed else 0
+
+
+def run_decrypt(target: Path, password: str, all_files: bool, workers: int, names: list[str] | None = None) -> int:
+    worker_count = _resolve_workers(workers)
+    ok = 0
+    skipped = 0
+    failed = 0
+
+    file_iter = _iter_files_by_names(target, names, all_files=all_files) if names else _iter_files(target, all_files=all_files)
+    files = list(file_iter)
+
+    if worker_count == 1:
+        results = ((p, decrypt_file(p, password)) for p in files)
+    else:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as ex:
+            mapped = ex.map(lambda p: decrypt_file(p, password), files)
+            results = zip(files, mapped)
+
+    for p, res in results:
+        if res == "ok":
+            ok += 1
+            print(f"[OK] decrypted: {p}")
+        elif res.startswith("fail"):
+            failed += 1
+            print(f"[FAIL] {p} -> {res}")
+        else:
+            skipped += 1
+            print(f"[SKIP] {p} -> {res}")
+
+    print(f"完成: decrypted={ok}, failed={failed}, skipped={skipped}, workers={worker_count}")
+    return 2 if failed else 0
+
+
+def run_status(target: Path, all_files: bool) -> int:
+    total = 0
+    encrypted = 0
+
+    for p in _iter_files(target, all_files=all_files):
+        total += 1
+        flag = is_encrypted(p)
+        encrypted += int(flag)
+        print(f"{'[ENC]' if flag else '[RAW]'} {p}")
+
+    print(f"统计: total={total}, encrypted={encrypted}, raw={total - encrypted}")
+    return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(
+        description="快速加密工具"
+    )
+    sub = parser.add_subparsers(dest="cmd", required=True)
+
+    for name in ("lock", "unlock", "status", "lock-name", "unlock-name"):
+        aliases: list[str] = []
+        if name == "lock":
+            aliases = ["encrypt"]
+        elif name == "unlock":
+            aliases = ["decrypt"]
+        elif name == "lock-name":
+            aliases = ["encrypt-name"]
+        elif name == "unlock-name":
+            aliases = ["decrypt-name"]
+
+        p = sub.add_parser(name, aliases=aliases)
+        p.add_argument("target", help="文件或目录")
+        if name in ("lock-name", "unlock-name"):
+            p.add_argument("names", nargs="+", help="文件名或相对路径(可一次传多个)")
+        p.add_argument("--media-only", action="store_true", help="仅处理媒体后缀(默认处理所有文件)")
+        if name in ("lock", "lock-name"):
+            p.add_argument("--chunk-mb", type=int, default=8, help="加密前多少MB(默认8)")
+        if name in ("lock", "unlock", "lock-name", "unlock-name"):
+            p.add_argument("--password", help="密码(不传则交互输入)")
+            p.add_argument("--workers", type=int, default=0, help="并发线程数,0=自动(默认)")
+
+    return parser
+
+
+def main() -> int:
+    parser = build_parser()
+    args = parser.parse_args()
+    target = Path(args.target)
+
+    if not target.exists():
+        print(f"目标不存在: {target}", file=sys.stderr)
+        return 1
+
+    try:
+        all_files = not args.media_only
+
+        if args.cmd in ("lock", "encrypt"):
+            if args.chunk_mb <= 0:
+                raise LockerError("--chunk-mb 必须大于0")
+            pw = args.password if args.password else _ask_password(confirm=True)
+            return run_encrypt(target, pw, args.chunk_mb, all_files=all_files, workers=args.workers)
+
+        if args.cmd in ("unlock", "decrypt"):
+            pw = args.password if args.password else _ask_password(confirm=False)
+            return run_decrypt(target, pw, all_files=all_files, workers=args.workers)
+
+        if args.cmd in ("lock-name", "encrypt-name"):
+            if args.chunk_mb <= 0:
+                raise LockerError("--chunk-mb 必须大于0")
+            pw = args.password if args.password else _ask_password(confirm=True)
+            return run_encrypt(target, pw, args.chunk_mb, all_files=all_files, workers=args.workers, names=args.names)
+
+        if args.cmd in ("unlock-name", "decrypt-name"):
+            pw = args.password if args.password else _ask_password(confirm=False)
+            return run_decrypt(target, pw, all_files=all_files, workers=args.workers, names=args.names)
+
+        return run_status(target, all_files=all_files)
+
+    except LockerError as e:
+        print(f"错误: {e}", file=sys.stderr)
+        return 1
+    except KeyboardInterrupt:
+        print("已取消", file=sys.stderr)
+        return 130
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())

+ 54 - 0
install_fast_media_lock.sh

@@ -0,0 +1,54 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+LAUNCHER="$SCRIPT_DIR/fast-media-lock"
+TARGET="/usr/local/bin/fast-media-lock"
+DEFAULT_WORKERS=8
+
+if [[ ! -x "$LAUNCHER" ]]; then
+  echo "[ERROR] launcher not found or not executable: $LAUNCHER" >&2
+  exit 1
+fi
+
+TMP_WRAPPER="$(mktemp)"
+trap 'rm -f "$TMP_WRAPPER"' EXIT
+
+cat > "$TMP_WRAPPER" <<EOF
+#!/usr/bin/env bash
+set -euo pipefail
+
+LAUNCHER="$LAUNCHER"
+DEFAULT_WORKERS=$DEFAULT_WORKERS
+
+if [[ ! -x "\$LAUNCHER" ]]; then
+  echo "[ERROR] launcher not found or not executable: \$LAUNCHER" >&2
+  exit 1
+fi
+
+args=("\$@")
+has_workers=0
+for arg in "\${args[@]}"; do
+  case "\$arg" in
+    --workers|--workers=*)
+      has_workers=1
+      break
+      ;;
+  esac
+done
+
+if [[ \$has_workers -eq 0 ]]; then
+  case "\${1:-}" in
+    lock|encrypt|unlock|decrypt|lock-name|encrypt-name|unlock-name|decrypt-name)
+      args+=(--workers "\$DEFAULT_WORKERS")
+      ;;
+  esac
+fi
+
+exec "\$LAUNCHER" "\${args[@]}"
+EOF
+
+sudo install -m 755 "$TMP_WRAPPER" "$TARGET"
+
+echo "Installed: $TARGET -> $LAUNCHER"
+echo "Try: fast-media-lock -h"