main.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
from __future__ import annotations

import base64
import json
import mimetypes
import os
import shutil
import urllib.error
import urllib.parse
import urllib.request
import uuid
from pathlib import Path
from typing import Any

from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
  17. BASE_DIR = Path(__file__).resolve().parent.parent
  18. LOCAL_MUSIC_DIR = BASE_DIR / "mp3file"
  19. CLOUD_MUSIC_DIR = Path("/mnt/baiducloud/百度网盘/mp3file")
  20. CLOUD_WEBDAV_BASE = "http://110.42.102.94:5244/dav"
  21. CLOUD_ALIST_BASE = "http://110.42.102.94:5244"
  22. CLOUD_WEBDAV_USER = "sequoia00"
  23. CLOUD_WEBDAV_PASSWORD = "792199bb"
  24. MUSIC_ROOTS: list[tuple[str, Path, str]] = [
  25. ("", LOCAL_MUSIC_DIR, "本地音乐"),
  26. ("cloud", CLOUD_MUSIC_DIR, "百度网盘"),
  27. ]
  28. PLAYLISTS_FILE = BASE_DIR / "playlists.json"
  29. SUPPORTED_EXTENSIONS = {
  30. ".mp3",
  31. ".wav",
  32. ".flac",
  33. ".m3u",
  34. ".m3u8",
  35. ".ogg",
  36. ".aac",
  37. ".wma",
  38. ".opus",
  39. ".oga",
  40. ".mp4",
  41. ".m4a",
  42. ".webm",
  43. }
  44. IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
  45. app = FastAPI(title="MusicWeb", version="1.0.0")
  46. app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
  47. templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
  48. class FolderCreateRequest(BaseModel):
  49. path: str
  50. class MoveRequest(BaseModel):
  51. source: str
  52. destination_dir: str
  53. class PlaylistCreateRequest(BaseModel):
  54. name: str
  55. tracks: list[str]
  56. class PlaylistUpdateRequest(BaseModel):
  57. tracks: list[str]
  58. def ensure_storage() -> None:
  59. LOCAL_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
  60. CLOUD_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
  61. if not PLAYLISTS_FILE.exists():
  62. PLAYLISTS_FILE.write_text("[]", encoding="utf-8")
  63. def safe_music_path(relative_path: str) -> Path:
  64. prefix, _, inner_path = relative_path.partition("/")
  65. roots = {key: root.resolve() for key, root, _ in MUSIC_ROOTS}
  66. if prefix in roots:
  67. candidate = (roots[prefix] / inner_path).resolve()
  68. if candidate != roots[prefix] and roots[prefix] not in candidate.parents:
  69. raise HTTPException(status_code=400, detail="Invalid path")
  70. return candidate
  71. candidate = (LOCAL_MUSIC_DIR / relative_path).resolve()
  72. local_root = LOCAL_MUSIC_DIR.resolve()
  73. if candidate != local_root and local_root not in candidate.parents:
  74. raise HTTPException(status_code=400, detail="Invalid path")
  75. return candidate
  76. def load_playlists() -> list[dict[str, Any]]:
  77. ensure_storage()
  78. return json.loads(PLAYLISTS_FILE.read_text(encoding="utf-8"))
  79. def save_playlists(playlists: list[dict[str, Any]]) -> None:
  80. PLAYLISTS_FILE.write_text(
  81. json.dumps(playlists, ensure_ascii=False, indent=2), encoding="utf-8"
  82. )
  83. def is_under_root(path: Path, root: Path) -> bool:
  84. resolved_path = path.resolve()
  85. resolved_root = root.resolve()
  86. return resolved_path == resolved_root or resolved_root in resolved_path.parents
  87. def track_path_for(root: Path, path: Path, prefix: str = "") -> str:
  88. relative = path.relative_to(root).as_posix()
  89. return f"{prefix}/{relative}" if prefix else relative
  90. def track_path_from_abs(path: Path) -> str:
  91. if is_under_root(path, LOCAL_MUSIC_DIR):
  92. return path.relative_to(LOCAL_MUSIC_DIR).as_posix()
  93. if is_under_root(path, CLOUD_MUSIC_DIR):
  94. return f"cloud/{path.relative_to(CLOUD_MUSIC_DIR).as_posix()}"
  95. return path.name
  96. def cloud_webdav_url(relative_path: str) -> str:
  97. inner_path = relative_path.removeprefix("cloud/").lstrip("/")
  98. encoded = "/".join(urllib.parse.quote(part) for part in inner_path.split("/") if part)
  99. base = CLOUD_WEBDAV_BASE.rstrip("/")
  100. return f"{base}/{encoded}" if encoded else base
  101. def cloud_alist_api_path(relative_path: str) -> str:
  102. inner_path = relative_path.removeprefix("cloud/").lstrip("/")
  103. return f"/百度网盘/mp3file/{inner_path}" if inner_path else "/百度网盘/mp3file"
  104. def alist_login_token() -> str:
  105. payload = json.dumps(
  106. {
  107. "username": CLOUD_WEBDAV_USER,
  108. "password": CLOUD_WEBDAV_PASSWORD,
  109. }
  110. ).encode("utf-8")
  111. request = urllib.request.Request(
  112. f"{CLOUD_ALIST_BASE.rstrip('/')}/api/auth/login",
  113. data=payload,
  114. headers={"Content-Type": "application/json", "User-Agent": "MusicWebPlayer/1.0"},
  115. )
  116. try:
  117. with urllib.request.urlopen(request, timeout=20) as response:
  118. result = json.loads(response.read().decode("utf-8"))
  119. except urllib.error.HTTPError as error:
  120. raise HTTPException(status_code=error.code, detail=error.reason)
  121. except urllib.error.URLError as error:
  122. raise HTTPException(status_code=502, detail=str(error.reason))
  123. token = result.get("data", {}).get("token")
  124. if not token:
  125. raise HTTPException(status_code=502, detail="AList login failed")
  126. return token
  127. def cloud_raw_url(relative_path: str) -> str:
  128. payload = json.dumps({"path": cloud_alist_api_path(relative_path), "password": ""}).encode("utf-8")
  129. request = urllib.request.Request(
  130. f"{CLOUD_ALIST_BASE.rstrip('/')}/api/fs/get",
  131. data=payload,
  132. headers={
  133. "Content-Type": "application/json",
  134. "Authorization": alist_login_token(),
  135. "User-Agent": "MusicWebPlayer/1.0",
  136. },
  137. )
  138. try:
  139. with urllib.request.urlopen(request, timeout=20) as response:
  140. result = json.loads(response.read().decode("utf-8"))
  141. except urllib.error.HTTPError as error:
  142. raise HTTPException(status_code=error.code, detail=error.reason)
  143. except urllib.error.URLError as error:
  144. raise HTTPException(status_code=502, detail=str(error.reason))
  145. data = result.get("data", {})
  146. sign = data.get("sign")
  147. path = data.get("path")
  148. if not sign or not path:
  149. raise HTTPException(status_code=404, detail="Cloud signed url not found")
  150. encoded_path = "/".join(urllib.parse.quote(part) for part in path.lstrip("/").split("/"))
  151. quoted_sign = urllib.parse.quote(sign, safe="")
  152. return f"{CLOUD_ALIST_BASE.rstrip('/')}/d/{urllib.parse.quote('百度网盘')}/{encoded_path}?sign={quoted_sign}"
  153. def is_supported_file(path: Path) -> bool:
  154. return path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
  155. def track_url(relative_path: str) -> str:
  156. return f"/api/stream/{relative_path}"
  157. def build_track(relative_path: str) -> dict[str, str]:
  158. filename = Path(relative_path).name
  159. return {
  160. "id": relative_path,
  161. "name": filename,
  162. "path": relative_path,
  163. "url": track_url(relative_path),
  164. "folder": str(Path(relative_path).parent).replace(".", "").strip("/"),
  165. }
  166. def find_cover_for_directory(root: Path, prefix: str = "") -> str | None:
  167. candidates = (
  168. "cover.jpg",
  169. "cover.jpeg",
  170. "cover.png",
  171. "cover.webp",
  172. "folder.jpg",
  173. "folder.jpeg",
  174. "folder.png",
  175. "folder.webp",
  176. )
  177. for candidate in candidates:
  178. path = root / candidate
  179. if path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS:
  180. return track_path_for(root, path, prefix)
  181. for child in sorted(root.iterdir(), key=lambda item: item.name.lower()):
  182. if child.is_file() and child.suffix.lower() in IMAGE_EXTENSIONS:
  183. return track_path_for(root, child, prefix)
  184. return None
  185. def build_tree(root: Path, prefix: str = "", label: str = "音乐库") -> dict[str, Any]:
  186. cover_path = find_cover_for_directory(root, prefix)
  187. node = {
  188. "name": label,
  189. "path": prefix,
  190. "cover": f"/api/cover/{cover_path}" if cover_path else None,
  191. "folders": [],
  192. "tracks": [],
  193. }
  194. for child in sorted(root.iterdir(), key=lambda item: (item.is_file(), item.name.lower())):
  195. if child.is_dir():
  196. child_rel = child.relative_to(root).as_posix()
  197. child_prefix = f"{prefix}/{child_rel}" if prefix else child_rel
  198. node["folders"].append(build_tree(child, child_prefix, child.name))
  199. elif is_supported_file(child):
  200. node["tracks"].append(build_track(track_path_for(root, child, prefix)))
  201. return node
  202. def collect_tracks(root: Path, prefix: str = "") -> list[dict[str, str]]:
  203. tracks: list[dict[str, str]] = []
  204. for path in sorted(root.rglob("*")):
  205. if is_supported_file(path):
  206. tracks.append(build_track(track_path_for(root, path, prefix)))
  207. return tracks
  208. def root_entry(source: str) -> tuple[str, Path, str]:
  209. for prefix, root, label in MUSIC_ROOTS:
  210. if prefix == source:
  211. return prefix, root, label
  212. raise HTTPException(status_code=404, detail="Library source not found")
  213. def build_library_payload(source: str | None = None) -> dict[str, Any]:
  214. ensure_storage()
  215. entries = MUSIC_ROOTS if source is None else [root_entry(source)]
  216. tree = {
  217. "name": "音乐库",
  218. "path": "",
  219. "cover": None,
  220. "folders": [
  221. build_tree(root, prefix, label)
  222. for prefix, root, label in entries
  223. if root.exists()
  224. ],
  225. "tracks": [],
  226. }
  227. all_tracks: list[dict[str, str]] = []
  228. for prefix, root, _ in entries:
  229. if root.exists():
  230. all_tracks.extend(collect_tracks(root, prefix))
  231. return {
  232. "tree": tree,
  233. "all_tracks": all_tracks,
  234. "playlists": load_playlists(),
  235. }
  236. @app.on_event("startup")
  237. def startup_event() -> None:
  238. ensure_storage()
  239. @app.get("/", response_class=HTMLResponse)
  240. def index(request: Request) -> HTMLResponse:
  241. return templates.TemplateResponse("index.html", {"request": request})
  242. @app.get("/api/library")
  243. def library() -> JSONResponse:
  244. return JSONResponse(build_library_payload())
  245. @app.get("/api/library/{source}")
  246. def library_by_source(source: str) -> JSONResponse:
  247. normalized = "" if source == "local" else source
  248. return JSONResponse(build_library_payload(normalized))
  249. @app.get("/api/stream/{file_path:path}")
  250. def stream_file(request: Request, file_path: str) -> StreamingResponse:
  251. if file_path.startswith("cloud/"):
  252. return proxy_cloud_stream(cloud_raw_url(file_path), request.headers.get("range"))
  253. file = safe_music_path(file_path)
  254. if not file.exists() or not file.is_file():
  255. raise HTTPException(status_code=404, detail="File not found")
  256. media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
  257. def file_iterator() -> Any:
  258. with file.open("rb") as handle:
  259. while chunk := handle.read(1024 * 1024):
  260. if not chunk:
  261. break
  262. yield chunk
  263. return StreamingResponse(
  264. file_iterator(),
  265. media_type=media_type,
  266. headers={"Cache-Control": "private, no-cache"},
  267. )
  268. @app.get("/api/cloud-url/{file_path:path}")
  269. def get_cloud_url(file_path: str) -> JSONResponse:
  270. if not file_path.startswith("cloud/"):
  271. raise HTTPException(status_code=400, detail="Not a cloud track")
  272. return JSONResponse({"url": cloud_raw_url(file_path)})
  273. def proxy_cloud_stream(remote_url: str, range_header: str | None = None) -> StreamingResponse:
  274. auth = base64.b64encode(f"{CLOUD_WEBDAV_USER}:{CLOUD_WEBDAV_PASSWORD}".encode("utf-8")).decode("ascii")
  275. headers = {
  276. "Authorization": f"Basic {auth}",
  277. "User-Agent": "MusicWebPlayer/1.0",
  278. }
  279. if range_header:
  280. headers["Range"] = range_header
  281. request = urllib.request.Request(
  282. remote_url,
  283. headers=headers,
  284. )
  285. try:
  286. response = urllib.request.urlopen(request, timeout=30)
  287. except urllib.error.HTTPError as error:
  288. raise HTTPException(status_code=error.code, detail=error.reason)
  289. except urllib.error.URLError as error:
  290. raise HTTPException(status_code=502, detail=str(error.reason))
  291. media_type = mimetypes.guess_type(urllib.parse.urlparse(remote_url).path)[0] or response.headers.get_content_type() or "audio/mpeg"
  292. passthrough_headers = {
  293. "Cache-Control": "private, no-cache",
  294. "Accept-Ranges": response.headers.get("Accept-Ranges") or "bytes",
  295. "Content-Type": media_type,
  296. }
  297. for key in ("ETag", "Last-Modified", "Content-Range"):
  298. value = response.headers.get(key)
  299. if value:
  300. passthrough_headers[key] = value
  301. def remote_iterator() -> Any:
  302. with response:
  303. while chunk := response.read(1024 * 1024):
  304. yield chunk
  305. return StreamingResponse(
  306. remote_iterator(),
  307. status_code=getattr(response, "status", 200),
  308. media_type=media_type,
  309. headers=passthrough_headers,
  310. )
  311. @app.get("/api/cover/{file_path:path}")
  312. def cover_file(request: Request, file_path: str):
  313. if file_path.startswith("cloud/"):
  314. if Path(file_path).suffix.lower() not in IMAGE_EXTENSIONS:
  315. raise HTTPException(status_code=404, detail="Cover not found")
  316. return proxy_cloud_stream(cloud_raw_url(file_path), request.headers.get("range"))
  317. file = safe_music_path(file_path)
  318. if not file.exists() or not file.is_file() or file.suffix.lower() not in IMAGE_EXTENSIONS:
  319. raise HTTPException(status_code=404, detail="Cover not found")
  320. media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
  321. return FileResponse(file, media_type=media_type, filename=file.name)
  322. @app.post("/api/upload")
  323. async def upload_files(
  324. files: list[UploadFile] = File(...),
  325. target_dir: str = Form(default=""),
  326. ) -> JSONResponse:
  327. destination = safe_music_path(target_dir)
  328. if not str(destination).startswith(str(LOCAL_MUSIC_DIR.resolve())):
  329. raise HTTPException(status_code=400, detail="Upload destination must be local music dir")
  330. destination.mkdir(parents=True, exist_ok=True)
  331. saved: list[str] = []
  332. for upload in files:
  333. suffix = Path(upload.filename or "").suffix.lower()
  334. if suffix not in SUPPORTED_EXTENSIONS:
  335. continue
  336. filename = Path(upload.filename or f"upload-{uuid.uuid4().hex}").name
  337. file_path = destination / filename
  338. with file_path.open("wb") as buffer:
  339. shutil.copyfileobj(upload.file, buffer)
  340. saved.append(track_path_from_abs(file_path))
  341. return JSONResponse({"saved": saved})
  342. @app.post("/api/folder")
  343. def create_folder(payload: FolderCreateRequest) -> JSONResponse:
  344. folder = safe_music_path(payload.path)
  345. if not str(folder).startswith(str(LOCAL_MUSIC_DIR.resolve())):
  346. raise HTTPException(status_code=400, detail="Folder must be created in local music dir")
  347. folder.mkdir(parents=True, exist_ok=True)
  348. return JSONResponse({"created": track_path_from_abs(folder)})
  349. @app.post("/api/move")
  350. def move_file(payload: MoveRequest) -> JSONResponse:
  351. source = safe_music_path(payload.source)
  352. destination_dir = safe_music_path(payload.destination_dir)
  353. if not source.exists():
  354. raise HTTPException(status_code=404, detail="Source not found")
  355. if not destination_dir.exists():
  356. destination_dir.mkdir(parents=True, exist_ok=True)
  357. destination = destination_dir / source.name
  358. shutil.move(str(source), str(destination))
  359. return JSONResponse({"moved": track_path_from_abs(destination)})
  360. @app.post("/api/playlists")
  361. def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse:
  362. playlists = load_playlists()
  363. playlist = {
  364. "id": uuid.uuid4().hex,
  365. "name": payload.name.strip() or "未命名播放列表",
  366. "tracks": payload.tracks,
  367. }
  368. playlists.append(playlist)
  369. save_playlists(playlists)
  370. return JSONResponse(playlist)
  371. @app.put("/api/playlists/{playlist_id}")
  372. def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse:
  373. playlists = load_playlists()
  374. for playlist in playlists:
  375. if playlist["id"] == playlist_id:
  376. playlist["tracks"] = payload.tracks
  377. save_playlists(playlists)
  378. return JSONResponse(playlist)
  379. raise HTTPException(status_code=404, detail="Playlist not found")
  380. @app.delete("/api/playlists/{playlist_id}")
  381. def delete_playlist(playlist_id: str) -> JSONResponse:
  382. playlists = load_playlists()
  383. filtered = [playlist for playlist in playlists if playlist["id"] != playlist_id]
  384. if len(filtered) == len(playlists):
  385. raise HTTPException(status_code=404, detail="Playlist not found")
  386. save_playlists(filtered)
  387. return JSONResponse({"deleted": playlist_id})