fast_media_lock.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. #!/usr/bin/env python3
  2. """
  3. Fast partial media locker (cross-platform, Python stdlib only).
  4. Goal:
  5. - Very fast "lock/unlock" by encrypting only the first N MB of each file.
  6. - No separate key file; password only.
  7. - Metadata is embedded in each encrypted file tail, so files can be moved anywhere
  8. and still be unlocked by this script.
  9. Security note:
  10. - This is NOT full-file encryption. It is designed for speed/obfuscation.
  11. """
  12. from __future__ import annotations
  13. import argparse
  14. import concurrent.futures
  15. import getpass
  16. import hashlib
  17. import hmac
  18. import os
  19. import struct
  20. import sys
  21. import zlib
  22. from pathlib import Path
  23. from typing import Iterable
  24. MAGIC = b"FMLKv1!!"
  25. VERSION = 1
  26. SALT_SIZE = 16
  27. NONCE_SIZE = 16
  28. VERIFIER_SIZE = 16
  29. LOCKED_SUFFIX = ".lockx"
  30. # magic(8) + version(1) + reserved(3) + chunk_size(8) + original_size(8)
  31. # + salt(16) + nonce(16) + verifier(16) + crc32(4)
  32. TRAILER_STRUCT = struct.Struct(">8sB3sQQ16s16s16sI")
  33. TRAILER_SIZE = TRAILER_STRUCT.size
  34. MEDIA_EXTS = {
  35. ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".m4v", ".ts", ".m2ts",
  36. ".mp3", ".wav", ".flac", ".aac", ".m4a", ".ogg", ".wma",
  37. ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".heic",
  38. }
  39. class LockerError(Exception):
  40. pass
  41. def _derive_keys(password: str, salt: bytes) -> tuple[bytes, bytes]:
  42. # scrypt is in stdlib and works on Linux/Windows without extra dependencies.
  43. km = hashlib.scrypt(
  44. password.encode("utf-8"),
  45. salt=salt,
  46. n=2**14,
  47. r=8,
  48. p=1,
  49. dklen=64,
  50. )
  51. return km[:32], km[32:]
  52. def _keystream(stream_key: bytes, nonce: bytes, length: int) -> bytes:
  53. if length <= 0:
  54. return b""
  55. blocks = (length + 31) // 32
  56. out = bytearray(blocks * 32)
  57. mv = memoryview(out)
  58. off = 0
  59. for counter in range(blocks):
  60. block = hmac.digest(stream_key, nonce + counter.to_bytes(8, "big"), hashlib.sha256)
  61. mv[off : off + 32] = block
  62. off += 32
  63. return bytes(mv[:length])
  64. def _xor_bytes(data: bytes, stream_key: bytes, nonce: bytes) -> bytes:
  65. if not data:
  66. return b""
  67. ks = _keystream(stream_key, nonce, len(data))
  68. # Python-level byte loops are slow for MB-size chunks.
  69. x = int.from_bytes(data, "little") ^ int.from_bytes(ks, "little")
  70. return x.to_bytes(len(data), "little")
  71. def _build_verifier(check_key: bytes) -> bytes:
  72. return hmac.digest(check_key, b"FMLK-PASSWORD-CHECK", hashlib.sha256)[:VERIFIER_SIZE]
  73. def _build_trailer(chunk_size: int, original_size: int, salt: bytes, nonce: bytes, verifier: bytes) -> bytes:
  74. head = TRAILER_STRUCT.pack(
  75. MAGIC,
  76. VERSION,
  77. b"\x00\x00\x00",
  78. chunk_size,
  79. original_size,
  80. salt,
  81. nonce,
  82. verifier,
  83. 0,
  84. )
  85. crc = zlib.crc32(head[:-4]) & 0xFFFFFFFF
  86. return head[:-4] + struct.pack(">I", crc)
  87. def _parse_trailer(raw: bytes) -> dict | None:
  88. if len(raw) != TRAILER_SIZE:
  89. return None
  90. magic, version, _reserved, chunk_size, original_size, salt, nonce, verifier, crc = TRAILER_STRUCT.unpack(raw)
  91. if magic != MAGIC or version != VERSION:
  92. return None
  93. expect_crc = zlib.crc32(raw[:-4]) & 0xFFFFFFFF
  94. if crc != expect_crc:
  95. return None
  96. return {
  97. "chunk_size": chunk_size,
  98. "original_size": original_size,
  99. "salt": salt,
  100. "nonce": nonce,
  101. "verifier": verifier,
  102. }
  103. def _read_trailer(path: Path) -> dict | None:
  104. size = path.stat().st_size
  105. if size < TRAILER_SIZE:
  106. return None
  107. with path.open("rb") as f:
  108. f.seek(-TRAILER_SIZE, os.SEEK_END)
  109. raw = f.read(TRAILER_SIZE)
  110. return _parse_trailer(raw)
  111. def is_encrypted(path: Path) -> bool:
  112. meta = _read_trailer(path)
  113. if not meta:
  114. return False
  115. size = path.stat().st_size
  116. # Basic sanity: encrypted file size should be original + trailer.
  117. return meta["original_size"] + TRAILER_SIZE == size
  118. def _iter_files(target: Path, all_files: bool) -> Iterable[Path]:
  119. if target.is_file():
  120. yield target
  121. return
  122. for p in target.rglob("*"):
  123. if not p.is_file():
  124. continue
  125. if not all_files and p.suffix.lower() not in MEDIA_EXTS:
  126. continue
  127. yield p
  128. def _iter_files_by_names(target: Path, names: list[str], all_files: bool) -> Iterable[Path]:
  129. if target.is_file():
  130. selected = {target.resolve()}
  131. else:
  132. selected: set[Path] = set()
  133. raw_names = [n.strip() for n in names if n.strip()]
  134. basename_set = {n for n in raw_names if "/" not in n and "\\" not in n}
  135. for raw in raw_names:
  136. if "/" in raw or "\\" in raw:
  137. rel = Path(raw)
  138. cand = (target / rel).resolve()
  139. if cand.is_file():
  140. selected.add(cand)
  141. for p in _iter_files(target, all_files=all_files):
  142. if p.name in basename_set:
  143. selected.add(p.resolve())
  144. for p in sorted(selected):
  145. if not all_files and p.suffix.lower() not in MEDIA_EXTS:
  146. continue
  147. yield p
  148. def _locked_name(path: Path) -> Path:
  149. if path.name.endswith(LOCKED_SUFFIX):
  150. return path
  151. return path.with_name(path.name + LOCKED_SUFFIX)
  152. def _unlocked_name(path: Path) -> Path:
  153. if path.name.endswith(LOCKED_SUFFIX):
  154. return path.with_name(path.name[: -len(LOCKED_SUFFIX)])
  155. return path
  156. def encrypt_file(path: Path, password: str, chunk_size: int) -> str:
  157. if not path.exists() or not path.is_file():
  158. return "skip(not_file)"
  159. if is_encrypted(path):
  160. return "skip(already_encrypted)"
  161. original_size = path.stat().st_size
  162. if original_size == 0:
  163. return "skip(empty)"
  164. locked_path = _locked_name(path)
  165. if locked_path != path and locked_path.exists():
  166. return "fail(name_conflict)"
  167. salt = os.urandom(SALT_SIZE)
  168. nonce = os.urandom(NONCE_SIZE)
  169. stream_key, check_key = _derive_keys(password, salt)
  170. verifier = _build_verifier(check_key)
  171. n = min(chunk_size, original_size)
  172. with path.open("r+b") as f:
  173. plain = f.read(n)
  174. cipher = _xor_bytes(plain, stream_key, nonce)
  175. f.seek(0)
  176. f.write(cipher)
  177. trailer = _build_trailer(chunk_size=chunk_size, original_size=original_size, salt=salt, nonce=nonce, verifier=verifier)
  178. f.seek(0, os.SEEK_END)
  179. f.write(trailer)
  180. if locked_path != path:
  181. path.rename(locked_path)
  182. return "ok"
  183. def decrypt_file(path: Path, password: str) -> str:
  184. if not path.exists() or not path.is_file():
  185. return "skip(not_file)"
  186. meta = _read_trailer(path)
  187. if not meta:
  188. return "skip(not_encrypted)"
  189. size = path.stat().st_size
  190. if meta["original_size"] + TRAILER_SIZE != size:
  191. return "skip(invalid_layout)"
  192. unlocked_path = _unlocked_name(path)
  193. if unlocked_path != path and unlocked_path.exists():
  194. return "fail(name_conflict)"
  195. stream_key, check_key = _derive_keys(password, meta["salt"])
  196. verifier = _build_verifier(check_key)
  197. if not hmac.compare_digest(verifier, meta["verifier"]):
  198. return "fail(wrong_password)"
  199. n = min(meta["chunk_size"], meta["original_size"])
  200. with path.open("r+b") as f:
  201. cipher = f.read(n)
  202. plain = _xor_bytes(cipher, stream_key, meta["nonce"])
  203. f.seek(0)
  204. f.write(plain)
  205. f.truncate(meta["original_size"])
  206. if unlocked_path != path:
  207. path.rename(unlocked_path)
  208. return "ok"
  209. def _ask_password(confirm: bool) -> str:
  210. pw = getpass.getpass("请输入密码: ")
  211. if not pw:
  212. raise LockerError("密码不能为空")
  213. if confirm:
  214. pw2 = getpass.getpass("请再次输入密码: ")
  215. if pw != pw2:
  216. raise LockerError("两次密码不一致")
  217. return pw
  218. def _resolve_workers(workers: int) -> int:
  219. if workers < 0:
  220. raise LockerError("--workers 不能小于0")
  221. if workers == 0:
  222. cpu = os.cpu_count() or 1
  223. return max(1, min(32, cpu * 2))
  224. return workers
  225. def run_encrypt(target: Path, password: str, chunk_mb: int, all_files: bool, workers: int, names: list[str] | None = None) -> int:
  226. chunk_size = chunk_mb * 1024 * 1024
  227. worker_count = _resolve_workers(workers)
  228. ok = 0
  229. skipped = 0
  230. failed = 0
  231. file_iter = _iter_files_by_names(target, names, all_files=all_files) if names else _iter_files(target, all_files=all_files)
  232. files = list(file_iter)
  233. if worker_count == 1:
  234. results = ((p, encrypt_file(p, password, chunk_size)) for p in files)
  235. else:
  236. with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as ex:
  237. mapped = ex.map(lambda p: encrypt_file(p, password, chunk_size), files)
  238. results = zip(files, mapped)
  239. for p, res in results:
  240. if res == "ok":
  241. ok += 1
  242. print(f"[OK] encrypted: {p}")
  243. elif res.startswith("fail"):
  244. failed += 1
  245. print(f"[FAIL] {p} -> {res}")
  246. else:
  247. skipped += 1
  248. print(f"[SKIP] {p} -> {res}")
  249. print(f"完成: encrypted={ok}, failed={failed}, skipped={skipped}, chunk={chunk_mb}MB, workers={worker_count}")
  250. return 2 if failed else 0
  251. def run_decrypt(target: Path, password: str, all_files: bool, workers: int, names: list[str] | None = None) -> int:
  252. worker_count = _resolve_workers(workers)
  253. ok = 0
  254. skipped = 0
  255. failed = 0
  256. file_iter = _iter_files_by_names(target, names, all_files=all_files) if names else _iter_files(target, all_files=all_files)
  257. files = list(file_iter)
  258. if worker_count == 1:
  259. results = ((p, decrypt_file(p, password)) for p in files)
  260. else:
  261. with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as ex:
  262. mapped = ex.map(lambda p: decrypt_file(p, password), files)
  263. results = zip(files, mapped)
  264. for p, res in results:
  265. if res == "ok":
  266. ok += 1
  267. print(f"[OK] decrypted: {p}")
  268. elif res.startswith("fail"):
  269. failed += 1
  270. print(f"[FAIL] {p} -> {res}")
  271. else:
  272. skipped += 1
  273. print(f"[SKIP] {p} -> {res}")
  274. print(f"完成: decrypted={ok}, failed={failed}, skipped={skipped}, workers={worker_count}")
  275. return 2 if failed else 0
  276. def run_status(target: Path, all_files: bool) -> int:
  277. total = 0
  278. encrypted = 0
  279. for p in _iter_files(target, all_files=all_files):
  280. total += 1
  281. flag = is_encrypted(p)
  282. encrypted += int(flag)
  283. print(f"{'[ENC]' if flag else '[RAW]'} {p}")
  284. print(f"统计: total={total}, encrypted={encrypted}, raw={total - encrypted}")
  285. return 0
  286. def build_parser() -> argparse.ArgumentParser:
  287. parser = argparse.ArgumentParser(
  288. description="快速加密工具"
  289. )
  290. sub = parser.add_subparsers(dest="cmd", required=True)
  291. for name in ("lock", "unlock", "status", "lock-name", "unlock-name"):
  292. aliases: list[str] = []
  293. if name == "lock":
  294. aliases = ["encrypt"]
  295. elif name == "unlock":
  296. aliases = ["decrypt"]
  297. elif name == "lock-name":
  298. aliases = ["encrypt-name"]
  299. elif name == "unlock-name":
  300. aliases = ["decrypt-name"]
  301. p = sub.add_parser(name, aliases=aliases)
  302. p.add_argument("target", help="文件或目录")
  303. if name in ("lock-name", "unlock-name"):
  304. p.add_argument("names", nargs="+", help="文件名或相对路径(可一次传多个)")
  305. p.add_argument("--media-only", action="store_true", help="仅处理媒体后缀(默认处理所有文件)")
  306. if name in ("lock", "lock-name"):
  307. p.add_argument("--chunk-mb", type=int, default=8, help="加密前多少MB(默认8)")
  308. if name in ("lock", "unlock", "lock-name", "unlock-name"):
  309. p.add_argument("--password", help="密码(不传则交互输入)")
  310. p.add_argument("--workers", type=int, default=0, help="并发线程数,0=自动(默认)")
  311. return parser
  312. def main() -> int:
  313. parser = build_parser()
  314. args = parser.parse_args()
  315. target = Path(args.target)
  316. if not target.exists():
  317. print(f"目标不存在: {target}", file=sys.stderr)
  318. return 1
  319. try:
  320. all_files = not args.media_only
  321. if args.cmd in ("lock", "encrypt"):
  322. if args.chunk_mb <= 0:
  323. raise LockerError("--chunk-mb 必须大于0")
  324. pw = args.password if args.password else _ask_password(confirm=True)
  325. return run_encrypt(target, pw, args.chunk_mb, all_files=all_files, workers=args.workers)
  326. if args.cmd in ("unlock", "decrypt"):
  327. pw = args.password if args.password else _ask_password(confirm=False)
  328. return run_decrypt(target, pw, all_files=all_files, workers=args.workers)
  329. if args.cmd in ("lock-name", "encrypt-name"):
  330. if args.chunk_mb <= 0:
  331. raise LockerError("--chunk-mb 必须大于0")
  332. pw = args.password if args.password else _ask_password(confirm=True)
  333. return run_encrypt(target, pw, args.chunk_mb, all_files=all_files, workers=args.workers, names=args.names)
  334. if args.cmd in ("unlock-name", "decrypt-name"):
  335. pw = args.password if args.password else _ask_password(confirm=False)
  336. return run_decrypt(target, pw, all_files=all_files, workers=args.workers, names=args.names)
  337. return run_status(target, all_files=all_files)
  338. except LockerError as e:
  339. print(f"错误: {e}", file=sys.stderr)
  340. return 1
  341. except KeyboardInterrupt:
  342. print("已取消", file=sys.stderr)
  343. return 130
  344. if __name__ == "__main__":
  345. raise SystemExit(main())