#!/usr/bin/env python3
"""
Fast partial media locker (cross-platform, Python stdlib only).

Goal:
- Very fast "lock/unlock" by encrypting only the first N MB of each file.
- No separate key file; password only.
- Metadata is embedded in each encrypted file tail, so files can be moved
  anywhere and still be unlocked by this script.

Security note:
- This is NOT full-file encryption. It is designed for speed/obfuscation.
"""

from __future__ import annotations

import argparse
import concurrent.futures
import getpass
import hashlib
import hmac
import os
import struct
import sys
import zlib
from pathlib import Path
from typing import Callable, Iterable, Iterator

MAGIC = b"FMLKv1!!"
VERSION = 1
SALT_SIZE = 16
NONCE_SIZE = 16
VERIFIER_SIZE = 16
LOCKED_SUFFIX = ".lockx"

# magic(8) + version(1) + reserved(3) + chunk_size(8) + original_size(8)
# + salt(16) + nonce(16) + verifier(16) + crc32(4)
TRAILER_STRUCT = struct.Struct(">8sB3sQQ16s16s16sI")
TRAILER_SIZE = TRAILER_STRUCT.size

MEDIA_EXTS = {
    ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".m4v", ".ts", ".m2ts",
    ".mp3", ".wav", ".flac", ".aac", ".m4a", ".ogg", ".wma",
    ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".heic",
}


class LockerError(Exception):
    """User-facing error; ``main`` prints it and exits with status 1."""


# ---------------------------------------------------------------------------
# Key derivation and keystream
# ---------------------------------------------------------------------------

def _derive_keys(password: str, salt: bytes) -> tuple[bytes, bytes]:
    """Derive ``(stream_key, check_key)``, 32 bytes each, from the password.

    A single 64-byte scrypt output is split in two so the key used for the
    password verifier is distinct from the key used for the keystream.
    """
    # scrypt ships with hashlib, so no third-party dependency is needed.
    km = hashlib.scrypt(
        password.encode("utf-8"),
        salt=salt,
        n=2**14,
        r=8,
        p=1,
        dklen=64,
    )
    return km[:32], km[32:]


def _keystream(stream_key: bytes, nonce: bytes, length: int) -> bytes:
    """Return ``length`` keystream bytes: HMAC-SHA256(key, nonce || counter).

    The counter is an 8-byte big-endian block index starting at 0; each block
    contributes 32 bytes and the final block is truncated.
    """
    if length <= 0:
        return b""
    blocks = (length + 31) // 32
    stream = b"".join(
        hmac.digest(stream_key, nonce + counter.to_bytes(8, "big"), hashlib.sha256)
        for counter in range(blocks)
    )
    return stream[:length]


def _xor_bytes(data: bytes, stream_key: bytes, nonce: bytes) -> bytes:
    """XOR ``data`` with the keystream; applying it twice restores ``data``."""
    if not data:
        return b""
    ks = _keystream(stream_key, nonce, len(data))
    # Python-level byte loops are slow for MB-size chunks, so XOR the two
    # buffers as arbitrary-precision integers instead.
    x = int.from_bytes(data, "little") ^ int.from_bytes(ks, "little")
    return x.to_bytes(len(data), "little")


def _build_verifier(check_key: bytes) -> bytes:
    """Return the truncated HMAC stored in the trailer to check the password."""
    return hmac.digest(check_key, b"FMLK-PASSWORD-CHECK", hashlib.sha256)[:VERIFIER_SIZE]


# ---------------------------------------------------------------------------
# Trailer (metadata appended to every locked file)
# ---------------------------------------------------------------------------

def _build_trailer(chunk_size: int, original_size: int, salt: bytes, nonce: bytes, verifier: bytes) -> bytes:
    """Serialize the trailer; its last 4 bytes are a CRC32 of the rest."""
    head = TRAILER_STRUCT.pack(
        MAGIC,
        VERSION,
        b"\x00\x00\x00",
        chunk_size,
        original_size,
        salt,
        nonce,
        verifier,
        0,  # placeholder; the real CRC is computed over the preceding bytes
    )
    crc = zlib.crc32(head[:-4]) & 0xFFFFFFFF
    return head[:-4] + struct.pack(">I", crc)


def _parse_trailer(raw: bytes) -> dict | None:
    """Decode a trailer, or return ``None`` if size, magic, version or CRC is wrong."""
    if len(raw) != TRAILER_SIZE:
        return None
    magic, version, _reserved, chunk_size, original_size, salt, nonce, verifier, crc = TRAILER_STRUCT.unpack(raw)
    if magic != MAGIC or version != VERSION:
        return None
    expect_crc = zlib.crc32(raw[:-4]) & 0xFFFFFFFF
    if crc != expect_crc:
        return None
    return {
        "chunk_size": chunk_size,
        "original_size": original_size,
        "salt": salt,
        "nonce": nonce,
        "verifier": verifier,
    }


def _read_trailer(path: Path) -> dict | None:
    """Read and parse the last ``TRAILER_SIZE`` bytes of ``path``.

    Returns ``None`` when the file is too short or the tail is not a valid
    trailer.
    """
    size = path.stat().st_size
    if size < TRAILER_SIZE:
        return None
    with path.open("rb") as f:
        f.seek(-TRAILER_SIZE, os.SEEK_END)
        raw = f.read(TRAILER_SIZE)
    return _parse_trailer(raw)


def is_encrypted(path: Path) -> bool:
    """Return True if ``path`` ends with a valid trailer consistent with its size."""
    meta = _read_trailer(path)
    if not meta:
        return False
    size = path.stat().st_size
    # Basic sanity: encrypted file size should be original + trailer.
    return meta["original_size"] + TRAILER_SIZE == size


# ---------------------------------------------------------------------------
# Naming and file selection
# ---------------------------------------------------------------------------

def _locked_name(path: Path) -> Path:
    """Return ``path`` with ``LOCKED_SUFFIX`` appended (unchanged if already present)."""
    if path.name.endswith(LOCKED_SUFFIX):
        return path
    return path.with_name(path.name + LOCKED_SUFFIX)


def _unlocked_name(path: Path) -> Path:
    """Return ``path`` with a trailing ``LOCKED_SUFFIX`` removed.

    A file whose entire name is the suffix is returned unchanged, because
    stripping it would leave an empty name (``Path.with_name("")`` raises
    ``ValueError``).
    """
    name = path.name
    if name.endswith(LOCKED_SUFFIX) and len(name) > len(LOCKED_SUFFIX):
        return path.with_name(name[: -len(LOCKED_SUFFIX)])
    return path


def _is_media(path: Path) -> bool:
    """Return True if ``path`` has a media extension, ignoring the lock suffix.

    Locking renames ``clip.mp4`` to ``clip.mp4.lockx``; testing the raw suffix
    would make every locked media file invisible to a media-only unlock or
    status run.
    """
    return _unlocked_name(path).suffix.lower() in MEDIA_EXTS


def _iter_files(target: Path, all_files: bool) -> Iterable[Path]:
    """Yield ``target`` itself if it is a file, else the files beneath it.

    When ``all_files`` is false, only media files (locked or not) found during
    the directory walk are yielded. A single-file ``target`` is never filtered.
    """
    if target.is_file():
        yield target
        return
    for p in target.rglob("*"):
        if not p.is_file():
            continue
        if not all_files and not _is_media(p):
            continue
        yield p


def _iter_files_by_names(target: Path, names: list[str], all_files: bool) -> Iterable[Path]:
    """Yield the files under ``target`` selected by ``names``, sorted by resolved path.

    A name containing a path separator is treated as a path relative to
    ``target``; any other name is matched against the basename of every file
    found under ``target``. If ``target`` is itself a file, ``names`` is
    ignored and that file is the only candidate.
    """
    selected: set[Path] = set()
    if target.is_file():
        selected.add(target.resolve())
    else:
        raw_names = [n.strip() for n in names if n.strip()]
        basename_set = {n for n in raw_names if "/" not in n and "\\" not in n}
        for raw in raw_names:
            if "/" in raw or "\\" in raw:
                cand = (target / Path(raw)).resolve()
                if cand.is_file():
                    selected.add(cand)
        if basename_set:  # skip the directory walk when only paths were given
            for p in _iter_files(target, all_files=all_files):
                if p.name in basename_set:
                    selected.add(p.resolve())

    for p in sorted(selected):
        if not all_files and not _is_media(p):
            continue
        yield p


# ---------------------------------------------------------------------------
# Per-file operations
# ---------------------------------------------------------------------------

def encrypt_file(path: Path, password: str, chunk_size: int) -> str:
    """Lock ``path`` in place and rename it with ``LOCKED_SUFFIX``.

    Only the first ``min(chunk_size, file size)`` bytes are XOR-encrypted; a
    trailer with the parameters needed to unlock is appended.

    Returns a status string: ``"ok"``, ``"skip(...)"`` or ``"fail(...)"``.
    May raise ``OSError`` on I/O failure.
    """
    if not path.exists() or not path.is_file():
        return "skip(not_file)"
    if is_encrypted(path):
        return "skip(already_encrypted)"

    original_size = path.stat().st_size
    if original_size == 0:
        return "skip(empty)"
    locked_path = _locked_name(path)
    if locked_path != path and locked_path.exists():
        return "fail(name_conflict)"

    salt = os.urandom(SALT_SIZE)
    nonce = os.urandom(NONCE_SIZE)
    stream_key, check_key = _derive_keys(password, salt)
    verifier = _build_verifier(check_key)

    n = min(chunk_size, original_size)
    # NOTE: the rewrite is in place and not atomic; an interruption between
    # the two writes below leaves the file without its trailer.
    with path.open("r+b") as f:
        plain = f.read(n)
        cipher = _xor_bytes(plain, stream_key, nonce)
        f.seek(0)
        f.write(cipher)

        trailer = _build_trailer(
            chunk_size=chunk_size,
            original_size=original_size,
            salt=salt,
            nonce=nonce,
            verifier=verifier,
        )
        f.seek(0, os.SEEK_END)
        f.write(trailer)

    if locked_path != path:
        path.rename(locked_path)
    return "ok"


def decrypt_file(path: Path, password: str) -> str:
    """Unlock ``path`` in place and strip ``LOCKED_SUFFIX`` from its name.

    The password is checked against the verifier stored in the trailer before
    any byte of the file is modified.

    Returns a status string: ``"ok"``, ``"skip(...)"`` or ``"fail(...)"``.
    May raise ``OSError`` on I/O failure.
    """
    if not path.exists() or not path.is_file():
        return "skip(not_file)"

    meta = _read_trailer(path)
    if not meta:
        return "skip(not_encrypted)"

    size = path.stat().st_size
    if meta["original_size"] + TRAILER_SIZE != size:
        return "skip(invalid_layout)"
    unlocked_path = _unlocked_name(path)
    if unlocked_path != path and unlocked_path.exists():
        return "fail(name_conflict)"

    stream_key, check_key = _derive_keys(password, meta["salt"])
    verifier = _build_verifier(check_key)
    if not hmac.compare_digest(verifier, meta["verifier"]):
        return "fail(wrong_password)"

    n = min(meta["chunk_size"], meta["original_size"])
    with path.open("r+b") as f:
        cipher = f.read(n)
        plain = _xor_bytes(cipher, stream_key, meta["nonce"])
        f.seek(0)
        f.write(plain)
        # Dropping the trailer restores the original length.
        f.truncate(meta["original_size"])

    if unlocked_path != path:
        path.rename(unlocked_path)
    return "ok"


# ---------------------------------------------------------------------------
# Batch helpers
# ---------------------------------------------------------------------------

def _ask_password(confirm: bool) -> str:
    """Prompt for a password (twice when ``confirm``); raise ``LockerError`` if invalid."""
    pw = getpass.getpass("请输入密码: ")
    if not pw:
        raise LockerError("密码不能为空")
    if confirm:
        pw2 = getpass.getpass("请再次输入密码: ")
        if pw != pw2:
            raise LockerError("两次密码不一致")
    return pw


def _resolve_workers(workers: int) -> int:
    """Turn the ``--workers`` value into a thread count (0 means automatic)."""
    if workers < 0:
        raise LockerError("--workers 不能小于0")
    if workers == 0:
        cpu = os.cpu_count() or 1
        return max(1, min(32, cpu * 2))
    return workers


def _collect_files(target: Path, names: list[str] | None, all_files: bool) -> list[Path]:
    """Materialize the file list up front, since processing renames files."""
    if names:
        return list(_iter_files_by_names(target, names, all_files=all_files))
    return list(_iter_files(target, all_files=all_files))


def _safe_apply(action: Callable[[Path], str], path: Path) -> str:
    """Run ``action(path)``, converting an ``OSError`` into a ``fail(...)`` status.

    Without this, one unreadable file would abort the whole batch.
    """
    try:
        return action(path)
    except OSError as exc:
        return f"fail(io_error: {exc})"


def _apply_to_files(
    files: list[Path],
    action: Callable[[Path], str],
    worker_count: int,
) -> Iterator[tuple[Path, str]]:
    """Yield ``(path, status)`` for every file, in input order.

    Runs sequentially when ``worker_count`` is 1, otherwise on a thread pool.
    """
    if worker_count == 1:
        for p in files:
            yield p, _safe_apply(action, p)
        return
    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as ex:
        yield from zip(files, ex.map(lambda p: _safe_apply(action, p), files))


def _tally(results: Iterable[tuple[Path, str]], verb: str) -> tuple[int, int, int]:
    """Print one line per result and return ``(ok, failed, skipped)`` counts."""
    ok = failed = skipped = 0
    for p, res in results:
        if res == "ok":
            ok += 1
            print(f"[OK] {verb}: {p}")
        elif res.startswith("fail"):
            failed += 1
            print(f"[FAIL] {p} -> {res}")
        else:
            skipped += 1
            print(f"[SKIP] {p} -> {res}")
    return ok, failed, skipped


def run_encrypt(target: Path, password: str, chunk_mb: int, all_files: bool, workers: int, names: list[str] | None = None) -> int:
    """Lock every selected file under ``target``; return 2 if any failed, else 0."""
    chunk_size = chunk_mb * 1024 * 1024
    worker_count = _resolve_workers(workers)
    files = _collect_files(target, names, all_files)

    results = _apply_to_files(files, lambda p: encrypt_file(p, password, chunk_size), worker_count)
    ok, failed, skipped = _tally(results, "encrypted")

    print(f"完成: encrypted={ok}, failed={failed}, skipped={skipped}, chunk={chunk_mb}MB, workers={worker_count}")
    return 2 if failed else 0


def run_decrypt(target: Path, password: str, all_files: bool, workers: int, names: list[str] | None = None) -> int:
    """Unlock every selected file under ``target``; return 2 if any failed, else 0."""
    worker_count = _resolve_workers(workers)
    files = _collect_files(target, names, all_files)

    results = _apply_to_files(files, lambda p: decrypt_file(p, password), worker_count)
    ok, failed, skipped = _tally(results, "decrypted")

    print(f"完成: decrypted={ok}, failed={failed}, skipped={skipped}, workers={worker_count}")
    return 2 if failed else 0


def run_status(target: Path, all_files: bool) -> int:
    """Print ``[ENC]``/``[RAW]`` for each selected file plus a summary; return 0."""
    total = 0
    encrypted = 0
    for p in _iter_files(target, all_files=all_files):
        total += 1
        flag = is_encrypted(p)
        encrypted += int(flag)
        print(f"{'[ENC]' if flag else '[RAW]'} {p}")
    print(f"统计: total={total}, encrypted={encrypted}, raw={total - encrypted}")
    return 0


# ---------------------------------------------------------------------------
# Command line
# ---------------------------------------------------------------------------

def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser: lock / unlock / status / lock-name / unlock-name."""
    parser = argparse.ArgumentParser(
        description="快速加密工具"
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    # Sub-command name -> aliases, in registration order.
    commands = {
        "lock": ["encrypt"],
        "unlock": ["decrypt"],
        "status": [],
        "lock-name": ["encrypt-name"],
        "unlock-name": ["decrypt-name"],
    }
    for name, aliases in commands.items():
        p = sub.add_parser(name, aliases=aliases)
        p.add_argument("target", help="文件或目录")
        if name in ("lock-name", "unlock-name"):
            p.add_argument("names", nargs="+", help="文件名或相对路径(可一次传多个)")
        p.add_argument("--media-only", action="store_true", help="仅处理媒体后缀(默认处理所有文件)")
        if name in ("lock", "lock-name"):
            p.add_argument("--chunk-mb", type=int, default=8, help="加密前多少MB(默认8)")
        if name in ("lock", "unlock", "lock-name", "unlock-name"):
            p.add_argument("--password", help="密码(不传则交互输入)")
            p.add_argument("--workers", type=int, default=0, help="并发线程数,0=自动(默认)")

    return parser


def main() -> int:
    """CLI entry point.

    Returns the process exit status: 0 on success, 1 on a usage or I/O error,
    2 if any file failed, 130 if interrupted.
    """
    parser = build_parser()
    args = parser.parse_args()

    target = Path(args.target)
    if not target.exists():
        print(f"目标不存在: {target}", file=sys.stderr)
        return 1

    try:
        all_files = not args.media_only
        # ``args.cmd`` holds whatever the user typed, so aliases are listed too.
        if args.cmd in ("lock", "encrypt", "lock-name", "encrypt-name"):
            if args.chunk_mb <= 0:
                raise LockerError("--chunk-mb 必须大于0")
            pw = args.password if args.password else _ask_password(confirm=True)
            return run_encrypt(
                target,
                pw,
                args.chunk_mb,
                all_files=all_files,
                workers=args.workers,
                names=getattr(args, "names", None),
            )

        if args.cmd in ("unlock", "decrypt", "unlock-name", "decrypt-name"):
            pw = args.password if args.password else _ask_password(confirm=False)
            return run_decrypt(
                target,
                pw,
                all_files=all_files,
                workers=args.workers,
                names=getattr(args, "names", None),
            )

        return run_status(target, all_files=all_files)
    except LockerError as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1
    except OSError as e:
        # Errors outside per-file processing (directory scan, status reads).
        print(f"错误: {e}", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        print("已取消", file=sys.stderr)
        return 130


if __name__ == "__main__":
    raise SystemExit(main())