| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
- #!/usr/bin/env python3
- """
- Fast partial media locker (cross-platform, Python stdlib only).
- Goal:
- - Very fast "lock/unlock" by encrypting only the first N MB of each file.
- - No separate key file; password only.
- - Metadata is embedded in each encrypted file tail, so files can be moved anywhere
- and still be unlocked by this script.
- Security note:
- - This is NOT full-file encryption. It is designed for speed/obfuscation.
- """
- from __future__ import annotations
- import argparse
- import concurrent.futures
- import getpass
- import hashlib
- import hmac
- import os
- import struct
- import sys
- import zlib
- from pathlib import Path
- from typing import Iterable
# --- On-disk format constants ------------------------------------------------
MAGIC = b"FMLKv1!!"  # marker stored at the start of every trailer
VERSION = 1
SALT_SIZE = 16
NONCE_SIZE = 16
VERIFIER_SIZE = 16
LOCKED_SUFFIX = ".lockx"  # appended to the file name once a file is locked

# Trailer layout (big-endian, no padding):
#   magic(8) + version(1) + reserved(3) + chunk_size(8) + original_size(8)
#   + salt(16) + nonce(16) + verifier(16) + crc32(4)
# The byte-string field widths are taken from the constants above so the two
# cannot drift apart.
TRAILER_STRUCT = struct.Struct(
    f">{len(MAGIC)}sB3sQQ{SALT_SIZE}s{NONCE_SIZE}s{VERIFIER_SIZE}sI"
)
TRAILER_SIZE = TRAILER_STRUCT.size

# Lower-case suffixes treated as media when --media-only is requested.
MEDIA_EXTS = {
    # video
    ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".m4v", ".ts", ".m2ts",
    # audio
    ".mp3", ".wav", ".flac", ".aac", ".m4a", ".ogg", ".wma",
    # images
    ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".heic",
}
class LockerError(Exception):
    """User-facing error (bad password input or invalid command-line option)."""
def _derive_keys(password: str, salt: bytes) -> tuple[bytes, bytes]:
    """Derive the key pair for one file from *password* and *salt*.

    scrypt (stdlib, no extra dependencies on Linux/Windows) produces 64 bytes
    of key material which is split in half.

    Returns:
        ``(stream_key, check_key)`` -- two 32-byte keys; the first feeds the
        keystream, the second the password verifier.
    """
    material = hashlib.scrypt(
        password.encode("utf-8"),
        salt=salt,
        n=1 << 14,
        r=8,
        p=1,
        dklen=64,
    )
    stream_key, check_key = material[:32], material[32:]
    return stream_key, check_key
def _keystream(stream_key: bytes, nonce: bytes, length: int) -> bytes:
    """Return *length* keystream bytes.

    Block ``i`` is ``HMAC-SHA256(stream_key, nonce || i)`` with ``i`` encoded
    as an 8-byte big-endian counter; blocks are concatenated and the result is
    cut to *length*. A non-positive *length* yields ``b""``.
    """
    if length <= 0:
        return b""
    block_count = -(-length // 32)  # ceil(length / 32); SHA-256 digests are 32 bytes
    pieces = (
        hmac.digest(stream_key, nonce + index.to_bytes(8, "big"), hashlib.sha256)
        for index in range(block_count)
    )
    return b"".join(pieces)[:length]
def _xor_bytes(data: bytes, stream_key: bytes, nonce: bytes) -> bytes:
    """XOR *data* with the keystream for ``(stream_key, nonce)``.

    The operation is its own inverse, so the same call both encrypts and
    decrypts. Empty input returns ``b""``.
    """
    if not data:
        return b""
    size = len(data)
    pad = _keystream(stream_key, nonce, size)
    # XOR as two big integers: a Python-level per-byte loop is slow for
    # MB-sized chunks.
    mixed = int.from_bytes(data, "little") ^ int.from_bytes(pad, "little")
    return mixed.to_bytes(size, "little")
def _build_verifier(check_key: bytes) -> bytes:
    """Return the password verifier stored in the trailer.

    It is the first ``VERIFIER_SIZE`` bytes of HMAC-SHA256 over a fixed label,
    keyed with *check_key*.
    """
    tag = hmac.digest(check_key, b"FMLK-PASSWORD-CHECK", hashlib.sha256)
    return tag[:VERIFIER_SIZE]
def _build_trailer(chunk_size: int, original_size: int, salt: bytes, nonce: bytes, verifier: bytes) -> bytes:
    """Serialize the metadata trailer that is appended to a locked file.

    The fields are packed with ``TRAILER_STRUCT``; the final four bytes hold
    the big-endian CRC32 of everything that precedes them.

    Raises:
        struct.error: if a field does not fit its slot in ``TRAILER_STRUCT``.
    """
    placeholder_crc = 0
    packed = TRAILER_STRUCT.pack(
        MAGIC,
        VERSION,
        bytes(3),  # reserved, all zero
        chunk_size,
        original_size,
        salt,
        nonce,
        verifier,
        placeholder_crc,
    )
    body = packed[:-4]
    checksum = zlib.crc32(body) & 0xFFFFFFFF
    return body + checksum.to_bytes(4, "big")
def _parse_trailer(raw: bytes) -> dict | None:
    """Decode a trailer produced by ``_build_trailer``.

    Returns:
        A dict with keys ``chunk_size``, ``original_size``, ``salt``,
        ``nonce`` and ``verifier``, or ``None`` when *raw* has the wrong
        length, the wrong magic/version, or a CRC32 mismatch.
    """
    if len(raw) != TRAILER_SIZE:
        return None
    fields = TRAILER_STRUCT.unpack(raw)
    magic, version = fields[0], fields[1]
    stored_crc = fields[-1]
    if (magic, version) != (MAGIC, VERSION):
        return None
    # The checksum covers every trailer byte except the CRC field itself.
    if stored_crc != (zlib.crc32(raw[:-4]) & 0xFFFFFFFF):
        return None
    # fields[2] is the reserved padding and is ignored.
    chunk_size, original_size, salt, nonce, verifier = fields[3:8]
    return {
        "chunk_size": chunk_size,
        "original_size": original_size,
        "salt": salt,
        "nonce": nonce,
        "verifier": verifier,
    }
def _read_trailer(path: Path) -> dict | None:
    """Read and parse the last ``TRAILER_SIZE`` bytes of *path*.

    Returns:
        The parsed trailer dict, or ``None`` if the file is shorter than a
        trailer or its tail is not a valid trailer.
    """
    if path.stat().st_size < TRAILER_SIZE:
        return None
    with path.open("rb") as handle:
        handle.seek(-TRAILER_SIZE, os.SEEK_END)
        tail = handle.read(TRAILER_SIZE)
    return _parse_trailer(tail)
def is_encrypted(path: Path) -> bool:
    """Return True if *path* ends in a valid trailer with a consistent size.

    Besides a parseable trailer, the file length must equal the recorded
    original size plus the trailer size.
    """
    trailer = _read_trailer(path)
    if not trailer:
        return False
    actual_size = path.stat().st_size
    expected_size = trailer["original_size"] + TRAILER_SIZE
    return expected_size == actual_size
def _iter_files(target: Path, all_files: bool) -> Iterable[Path]:
    """Yield the files selected by *target*.

    A file target is yielded as-is (the extension filter is not applied).
    A directory target is walked recursively; unless *all_files* is true only
    files whose lower-cased suffix is in ``MEDIA_EXTS`` are yielded.
    """
    if target.is_file():
        yield target
        return
    for entry in target.rglob("*"):
        if entry.is_file() and (all_files or entry.suffix.lower() in MEDIA_EXTS):
            yield entry
def _iter_files_by_names(target: Path, names: list[str], all_files: bool) -> Iterable[Path]:
    """Yield, in sorted order, the resolved files under *target* picked by *names*.

    Each entry of *names* is stripped; blank entries are ignored. An entry
    containing ``/`` or ``\\`` is treated as a path relative to *target* and
    selected if it is an existing file. Any other entry is a bare file name
    and selects every file with that name found by ``_iter_files``.

    If *target* is itself a file it is the only candidate and *names* is not
    consulted. Unless *all_files* is true, candidates whose lower-cased suffix
    is not in ``MEDIA_EXTS`` are dropped.
    """
    selected: set[Path]
    if target.is_file():
        selected = {target.resolve()}
    else:
        selected = set()
        basenames: set[str] = set()
        for raw in (n.strip() for n in names):
            if not raw:
                continue
            if "/" in raw or "\\" in raw:
                candidate = (target / raw).resolve()
                if candidate.is_file():
                    selected.add(candidate)
            else:
                basenames.add(raw)
        # Only walk the tree when there is at least one bare name to match;
        # relative paths were already resolved directly above.
        if basenames:
            for p in _iter_files(target, all_files=all_files):
                if p.name in basenames:
                    selected.add(p.resolve())
    for p in sorted(selected):
        if all_files or p.suffix.lower() in MEDIA_EXTS:
            yield p
def _locked_name(path: Path) -> Path:
    """Return *path* with ``LOCKED_SUFFIX`` appended to its name.

    A path whose name already ends with the suffix is returned unchanged.
    """
    already_suffixed = path.name.endswith(LOCKED_SUFFIX)
    return path if already_suffixed else path.with_name(path.name + LOCKED_SUFFIX)
def _unlocked_name(path: Path) -> Path:
    """Return *path* with a trailing ``LOCKED_SUFFIX`` removed from its name.

    The path is returned unchanged when its name does not end with the suffix,
    or when the name consists of the suffix alone: stripping it would leave an
    empty name, which ``Path.with_name`` rejects with ``ValueError``.
    """
    name = path.name
    if not name.endswith(LOCKED_SUFFIX):
        return path
    stem = name[: -len(LOCKED_SUFFIX)]
    if not stem:
        return path
    return path.with_name(stem)
def encrypt_file(path: Path, password: str, chunk_size: int) -> str:
    """Lock *path* in place by encrypting its first *chunk_size* bytes.

    The head of the file (at most *chunk_size* bytes) is XOR-ed with a
    keystream derived from *password* and a fresh random salt/nonce, a
    metadata trailer is appended, and the file is renamed to carry
    ``LOCKED_SUFFIX``.

    Everything that can fail without touching the file (validation, key
    derivation, trailer serialization) happens before the first write, so a
    failure there cannot leave the file encrypted without its trailer.

    Returns:
        ``"ok"`` on success, otherwise one of ``"skip(not_file)"``,
        ``"skip(already_encrypted)"``, ``"skip(empty)"``,
        ``"fail(name_conflict)"`` or ``"fail(invalid_chunk_size)"``.

    Raises:
        OSError: if the file cannot be read, written or renamed.
    """
    if not path.exists() or not path.is_file():
        return "skip(not_file)"
    if is_encrypted(path):
        return "skip(already_encrypted)"
    original_size = path.stat().st_size
    if original_size == 0:
        return "skip(empty)"
    locked_path = _locked_name(path)
    if locked_path != path and locked_path.exists():
        return "fail(name_conflict)"
    # chunk_size is stored in an unsigned 64-bit trailer field. A negative
    # value would also turn f.read(n) below into "read the whole file".
    if not 0 <= chunk_size <= 0xFFFFFFFFFFFFFFFF:
        return "fail(invalid_chunk_size)"

    salt = os.urandom(SALT_SIZE)
    nonce = os.urandom(NONCE_SIZE)
    stream_key, check_key = _derive_keys(password, salt)
    verifier = _build_verifier(check_key)
    # Build the trailer before modifying the file.
    trailer = _build_trailer(
        chunk_size=chunk_size,
        original_size=original_size,
        salt=salt,
        nonce=nonce,
        verifier=verifier,
    )

    n = min(chunk_size, original_size)
    with path.open("r+b") as f:
        plain = f.read(n)
        cipher = _xor_bytes(plain, stream_key, nonce)
        f.seek(0)
        f.write(cipher)
        f.seek(0, os.SEEK_END)
        f.write(trailer)
    if locked_path != path:
        path.rename(locked_path)
    return "ok"
def decrypt_file(path: Path, password: str) -> str:
    """Unlock *path* in place using the metadata stored in its trailer.

    The password is checked against the stored verifier before anything is
    written. On success the encrypted head is restored, the trailer is cut
    off, and a trailing ``LOCKED_SUFFIX`` is removed from the file name.

    Returns:
        ``"ok"`` on success, otherwise one of ``"skip(not_file)"``,
        ``"skip(not_encrypted)"``, ``"skip(invalid_layout)"``,
        ``"fail(name_conflict)"`` or ``"fail(wrong_password)"``.
    """
    if not (path.exists() and path.is_file()):
        return "skip(not_file)"

    meta = _read_trailer(path)
    if not meta:
        return "skip(not_encrypted)"

    original_size = meta["original_size"]
    if path.stat().st_size != original_size + TRAILER_SIZE:
        return "skip(invalid_layout)"

    destination = _unlocked_name(path)
    needs_rename = destination != path
    if needs_rename and destination.exists():
        return "fail(name_conflict)"

    stream_key, check_key = _derive_keys(password, meta["salt"])
    expected = _build_verifier(check_key)
    if not hmac.compare_digest(expected, meta["verifier"]):
        return "fail(wrong_password)"

    head_len = min(meta["chunk_size"], original_size)
    with path.open("r+b") as handle:
        restored = _xor_bytes(handle.read(head_len), stream_key, meta["nonce"])
        handle.seek(0)
        handle.write(restored)
        # Drop the trailer so the file is back to its original length.
        handle.truncate(original_size)

    if needs_rename:
        path.rename(destination)
    return "ok"
def _ask_password(confirm: bool) -> str:
    """Prompt for a password on the terminal without echoing it.

    Args:
        confirm: when true, prompt a second time and require both entries to
            match.

    Raises:
        LockerError: if the password is empty or the two entries differ.
    """
    first = getpass.getpass("请输入密码: ")
    if not first:
        raise LockerError("密码不能为空")
    if confirm and getpass.getpass("请再次输入密码: ") != first:
        raise LockerError("两次密码不一致")
    return first
def _resolve_workers(workers: int) -> int:
    """Turn the ``--workers`` option into an actual thread count.

    ``0`` means automatic: twice the CPU count, clamped to the range 1..32.
    Any positive value is returned unchanged.

    Raises:
        LockerError: if *workers* is negative.
    """
    if workers < 0:
        raise LockerError("--workers 不能小于0")
    if workers:
        return workers
    automatic = 2 * (os.cpu_count() or 1)
    return max(1, min(32, automatic))
def run_encrypt(target: Path, password: str, chunk_mb: int, all_files: bool, workers: int, names: list[str] | None = None) -> int:
    """Lock every selected file under *target* and print a per-file report.

    Args:
        target: file or directory to process.
        password: password used to derive the per-file keys.
        chunk_mb: number of MiB at the start of each file to encrypt.
        all_files: when false, only files with a ``MEDIA_EXTS`` suffix are
            processed.
        workers: thread count; ``0`` selects it automatically.
        names: optional file names / relative paths restricting the selection.

    Returns:
        ``2`` if at least one file failed, otherwise ``0``.

    Raises:
        LockerError: if *workers* is negative.
    """
    chunk_size = chunk_mb * 1024 * 1024
    worker_count = _resolve_workers(workers)
    if names:
        file_iter = _iter_files_by_names(target, names, all_files=all_files)
    else:
        file_iter = _iter_files(target, all_files=all_files)
    # Snapshot the selection before processing: encrypt_file renames files.
    files = list(file_iter)

    def lock_one(p: Path) -> str:
        # An I/O error on one file (permission denied, file vanished, ...)
        # is reported as a failure for that file instead of aborting the
        # whole batch and losing the summary.
        try:
            return encrypt_file(p, password, chunk_size)
        except OSError as exc:
            return f"fail({type(exc).__name__}: {exc})"

    ok = 0
    skipped = 0
    failed = 0

    def report(pairs) -> None:
        nonlocal ok, skipped, failed
        for p, res in pairs:
            if res == "ok":
                ok += 1
                print(f"[OK] encrypted: {p}")
            elif res.startswith("fail"):
                failed += 1
                print(f"[FAIL] {p} -> {res}")
            else:
                skipped += 1
                print(f"[SKIP] {p} -> {res}")

    if worker_count == 1:
        report((p, lock_one(p)) for p in files)
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as ex:
            # Executor.map yields results in input order; reporting inside the
            # "with" block prints them while later files are still running.
            report(zip(files, ex.map(lock_one, files)))

    print(f"完成: encrypted={ok}, failed={failed}, skipped={skipped}, chunk={chunk_mb}MB, workers={worker_count}")
    return 2 if failed else 0
def run_decrypt(target: Path, password: str, all_files: bool, workers: int, names: list[str] | None = None) -> int:
    """Unlock every selected file under *target* and print a per-file report.

    Args:
        target: file or directory to process.
        password: password the files were locked with.
        all_files: when false, only files with a ``MEDIA_EXTS`` suffix are
            processed.
        workers: thread count; ``0`` selects it automatically.
        names: optional file names / relative paths restricting the selection.

    Returns:
        ``2`` if at least one file failed, otherwise ``0``.

    Raises:
        LockerError: if *workers* is negative.
    """
    worker_count = _resolve_workers(workers)
    if names:
        file_iter = _iter_files_by_names(target, names, all_files=all_files)
    else:
        file_iter = _iter_files(target, all_files=all_files)
    # Snapshot the selection before processing: decrypt_file renames files.
    files = list(file_iter)

    def unlock_one(p: Path) -> str:
        # An I/O error on one file (permission denied, file vanished, ...)
        # is reported as a failure for that file instead of aborting the
        # whole batch and losing the summary.
        try:
            return decrypt_file(p, password)
        except OSError as exc:
            return f"fail({type(exc).__name__}: {exc})"

    ok = 0
    skipped = 0
    failed = 0

    def report(pairs) -> None:
        nonlocal ok, skipped, failed
        for p, res in pairs:
            if res == "ok":
                ok += 1
                print(f"[OK] decrypted: {p}")
            elif res.startswith("fail"):
                failed += 1
                print(f"[FAIL] {p} -> {res}")
            else:
                skipped += 1
                print(f"[SKIP] {p} -> {res}")

    if worker_count == 1:
        report((p, unlock_one(p)) for p in files)
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as ex:
            # Executor.map yields results in input order; reporting inside the
            # "with" block prints them while later files are still running.
            report(zip(files, ex.map(unlock_one, files)))

    print(f"完成: decrypted={ok}, failed={failed}, skipped={skipped}, workers={worker_count}")
    return 2 if failed else 0
def run_status(target: Path, all_files: bool) -> int:
    """Print ``[ENC]``/``[RAW]`` for each selected file, then a summary line.

    Returns:
        Always ``0``.
    """
    total = 0
    encrypted = 0
    for total, p in enumerate(_iter_files(target, all_files=all_files), start=1):
        if is_encrypted(p):
            encrypted += 1
            tag = "[ENC]"
        else:
            tag = "[RAW]"
        print(f"{tag} {p}")
    print(f"统计: total={total}, encrypted={encrypted}, raw={total - encrypted}")
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser.

    Sub-commands (alias in parentheses): ``lock`` (``encrypt``), ``unlock``
    (``decrypt``), ``status``, ``lock-name`` (``encrypt-name``) and
    ``unlock-name`` (``decrypt-name``). The chosen command is stored in
    ``args.cmd``.
    """
    alias_table = {
        "lock": ["encrypt"],
        "unlock": ["decrypt"],
        "lock-name": ["encrypt-name"],
        "unlock-name": ["decrypt-name"],
    }
    parser = argparse.ArgumentParser(description="快速加密工具")
    sub = parser.add_subparsers(dest="cmd", required=True)

    for name in ("lock", "unlock", "status", "lock-name", "unlock-name"):
        cmd = sub.add_parser(name, aliases=alias_table.get(name, []))
        cmd.add_argument("target", help="文件或目录")
        if name.endswith("-name"):
            cmd.add_argument("names", nargs="+", help="文件名或相对路径(可一次传多个)")
        cmd.add_argument("--media-only", action="store_true", help="仅处理媒体后缀(默认处理所有文件)")
        if name.startswith("lock"):
            cmd.add_argument("--chunk-mb", type=int, default=8, help="加密前多少MB(默认8)")
        if name != "status":
            cmd.add_argument("--password", help="密码(不传则交互输入)")
            cmd.add_argument("--workers", type=int, default=0, help="并发线程数,0=自动(默认)")
    return parser
def main() -> int:
    """Command-line entry point; returns the process exit status.

    Returns:
        The status from ``run_encrypt`` / ``run_decrypt`` / ``run_status``,
        ``1`` if the target is missing or a ``LockerError`` occurs, and
        ``130`` on Ctrl-C.
    """
    args = build_parser().parse_args()
    target = Path(args.target)
    if not target.exists():
        print(f"目标不存在: {target}", file=sys.stderr)
        return 1

    lock_cmds = ("lock", "encrypt", "lock-name", "encrypt-name")
    unlock_cmds = ("unlock", "decrypt", "unlock-name", "decrypt-name")
    try:
        all_files = not args.media_only
        cmd = args.cmd
        if cmd not in lock_cmds and cmd not in unlock_cmds:
            return run_status(target, all_files=all_files)

        # Only the *-name commands carry a "names" argument.
        names = args.names if cmd.endswith("-name") else None
        if cmd in lock_cmds:
            # Validate before prompting so a bad option fails immediately.
            if args.chunk_mb <= 0:
                raise LockerError("--chunk-mb 必须大于0")
            pw = args.password or _ask_password(confirm=True)
            return run_encrypt(target, pw, args.chunk_mb, all_files=all_files, workers=args.workers, names=names)

        pw = args.password or _ask_password(confirm=False)
        return run_decrypt(target, pw, all_files=all_files, workers=args.workers, names=names)
    except LockerError as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        print("已取消", file=sys.stderr)
        return 130
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|