main.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. from __future__ import annotations
  2. import json
  3. import mimetypes
  4. import shutil
  5. import uuid
  6. import base64
  7. import urllib.error
  8. import urllib.parse
  9. import urllib.request
  10. from pathlib import Path
  11. from typing import Any
  12. from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
  13. from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
  14. from fastapi.staticfiles import StaticFiles
  15. from fastapi.templating import Jinja2Templates
  16. from pydantic import BaseModel
# Project root: the parent of the directory that contains this file.
BASE_DIR = Path(__file__).resolve().parent.parent
# Local on-disk music library.
LOCAL_MUSIC_DIR = BASE_DIR / "mp3file"
# Filesystem location of the cloud library (presumably a mounted network
# drive — TODO confirm how /mnt/baiducloud is provided).
CLOUD_MUSIC_DIR = Path("/mnt/baiducloud/百度网盘/mp3file")
# Base URLs used to build WebDAV URLs and AList API requests.
CLOUD_WEBDAV_BASE = "http://110.42.102.94:5244/dav"
CLOUD_ALIST_BASE = "http://110.42.102.94:5244"
# NOTE(review): these credentials are hard-coded in source and are sent to an
# http:// endpoint; consider loading them from the environment instead.
CLOUD_WEBDAV_USER = "sequoia00"
CLOUD_WEBDAV_PASSWORD = "792199bb"
# Each entry is (track-path prefix, directory on disk, display label).
MUSIC_ROOTS: list[tuple[str, Path, str]] = [
    ("", LOCAL_MUSIC_DIR, "音乐库"),
    ("cloud", CLOUD_MUSIC_DIR, "百度网盘"),
]
# JSON file holding the list of saved playlists.
PLAYLISTS_FILE = BASE_DIR / "playlists.json"
# File suffixes (lower-case) treated as tracks when scanning and uploading.
SUPPORTED_EXTENSIONS = {
    ".mp3",
    ".wav",
    ".flac",
    ".m3u",
    ".m3u8",
    ".ogg",
    ".aac",
    ".wma",
    ".opus",
    ".oga",
    ".mp4",
    ".m4a",
    ".webm",
}
# File suffixes (lower-case) accepted as cover art.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
# Application object, static asset mount and template loader.
app = FastAPI(title="MusicWeb", version="1.0.0")
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
class FolderCreateRequest(BaseModel):
    """Request body carrying the path of a folder to create."""

    # Library-relative path; resolved through safe_music_path by the endpoint.
    path: str
class MoveRequest(BaseModel):
    """Request body describing a move of one library entry into a directory."""

    # Library-relative path of the entry to move.
    source: str
    # Library-relative path of the directory that receives the entry.
    destination_dir: str
class PlaylistCreateRequest(BaseModel):
    """Request body for creating a playlist."""

    # Display name; the create endpoint substitutes a default when it is blank.
    name: str
    # Track entries stored verbatim in the playlist
    # (presumably library-relative track paths — TODO confirm with the frontend).
    tracks: list[str]
class PlaylistUpdateRequest(BaseModel):
    """Request body for replacing the track list of an existing playlist."""

    # New track list; it replaces the stored list entirely.
    tracks: list[str]
  58. def ensure_storage() -> None:
  59. LOCAL_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
  60. CLOUD_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
  61. if not PLAYLISTS_FILE.exists():
  62. PLAYLISTS_FILE.write_text("[]", encoding="utf-8")
  63. def safe_music_path(relative_path: str) -> Path:
  64. prefix, _, inner_path = relative_path.partition("/")
  65. roots = {key: root.resolve() for key, root, _ in MUSIC_ROOTS}
  66. if prefix in roots:
  67. candidate = (roots[prefix] / inner_path).resolve()
  68. if candidate != roots[prefix] and roots[prefix] not in candidate.parents:
  69. raise HTTPException(status_code=400, detail="Invalid path")
  70. return candidate
  71. candidate = (LOCAL_MUSIC_DIR / relative_path).resolve()
  72. local_root = LOCAL_MUSIC_DIR.resolve()
  73. if candidate != local_root and local_root not in candidate.parents:
  74. raise HTTPException(status_code=400, detail="Invalid path")
  75. return candidate
  76. def load_playlists() -> list[dict[str, Any]]:
  77. ensure_storage()
  78. return json.loads(PLAYLISTS_FILE.read_text(encoding="utf-8"))
  79. def save_playlists(playlists: list[dict[str, Any]]) -> None:
  80. PLAYLISTS_FILE.write_text(
  81. json.dumps(playlists, ensure_ascii=False, indent=2), encoding="utf-8"
  82. )
  83. def is_under_root(path: Path, root: Path) -> bool:
  84. resolved_path = path.resolve()
  85. resolved_root = root.resolve()
  86. return resolved_path == resolved_root or resolved_root in resolved_path.parents
  87. def track_path_for(root: Path, path: Path, prefix: str = "") -> str:
  88. relative = path.relative_to(root).as_posix()
  89. return f"{prefix}/{relative}" if prefix else relative
  90. def track_path_from_abs(path: Path) -> str:
  91. if is_under_root(path, LOCAL_MUSIC_DIR):
  92. return path.relative_to(LOCAL_MUSIC_DIR).as_posix()
  93. if is_under_root(path, CLOUD_MUSIC_DIR):
  94. return f"cloud/{path.relative_to(CLOUD_MUSIC_DIR).as_posix()}"
  95. return path.name
  96. def cloud_webdav_url(relative_path: str) -> str:
  97. inner_path = relative_path.removeprefix("cloud/").lstrip("/")
  98. encoded = "/".join(urllib.parse.quote(part) for part in inner_path.split("/") if part)
  99. base = CLOUD_WEBDAV_BASE.rstrip("/")
  100. return f"{base}/{encoded}" if encoded else base
  101. def cloud_alist_api_path(relative_path: str) -> str:
  102. inner_path = relative_path.removeprefix("cloud/").lstrip("/")
  103. return f"/百度网盘/mp3file/{inner_path}" if inner_path else "/百度网盘/mp3file"
  104. def alist_login_token() -> str:
  105. payload = json.dumps(
  106. {
  107. "username": CLOUD_WEBDAV_USER,
  108. "password": CLOUD_WEBDAV_PASSWORD,
  109. }
  110. ).encode("utf-8")
  111. request = urllib.request.Request(
  112. f"{CLOUD_ALIST_BASE.rstrip('/')}/api/auth/login",
  113. data=payload,
  114. headers={"Content-Type": "application/json", "User-Agent": "MusicWebPlayer/1.0"},
  115. )
  116. try:
  117. with urllib.request.urlopen(request, timeout=20) as response:
  118. result = json.loads(response.read().decode("utf-8"))
  119. except urllib.error.HTTPError as error:
  120. raise HTTPException(status_code=error.code, detail=error.reason)
  121. except urllib.error.URLError as error:
  122. raise HTTPException(status_code=502, detail=str(error.reason))
  123. token = result.get("data", {}).get("token")
  124. if not token:
  125. raise HTTPException(status_code=502, detail="AList login failed")
  126. return token
  127. def cloud_raw_url(relative_path: str) -> str:
  128. payload = json.dumps({"path": cloud_alist_api_path(relative_path), "password": ""}).encode("utf-8")
  129. request = urllib.request.Request(
  130. f"{CLOUD_ALIST_BASE.rstrip('/')}/api/fs/get",
  131. data=payload,
  132. headers={
  133. "Content-Type": "application/json",
  134. "Authorization": alist_login_token(),
  135. "User-Agent": "MusicWebPlayer/1.0",
  136. },
  137. )
  138. try:
  139. with urllib.request.urlopen(request, timeout=20) as response:
  140. result = json.loads(response.read().decode("utf-8"))
  141. except urllib.error.HTTPError as error:
  142. raise HTTPException(status_code=error.code, detail=error.reason)
  143. except urllib.error.URLError as error:
  144. raise HTTPException(status_code=502, detail=str(error.reason))
  145. raw_url = result.get("data", {}).get("raw_url")
  146. if not raw_url:
  147. raise HTTPException(status_code=404, detail="Cloud raw url not found")
  148. return raw_url
  149. def is_supported_file(path: Path) -> bool:
  150. return path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
  151. def track_url(relative_path: str) -> str:
  152. return f"/api/stream/{relative_path}"
  153. def build_track(relative_path: str) -> dict[str, str]:
  154. filename = Path(relative_path).name
  155. return {
  156. "id": relative_path,
  157. "name": filename,
  158. "path": relative_path,
  159. "url": track_url(relative_path),
  160. "folder": str(Path(relative_path).parent).replace(".", "").strip("/"),
  161. }
  162. def find_cover_for_directory(root: Path, prefix: str = "") -> str | None:
  163. candidates = (
  164. "cover.jpg",
  165. "cover.jpeg",
  166. "cover.png",
  167. "cover.webp",
  168. "folder.jpg",
  169. "folder.jpeg",
  170. "folder.png",
  171. "folder.webp",
  172. )
  173. for candidate in candidates:
  174. path = root / candidate
  175. if path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS:
  176. return track_path_for(root, path, prefix)
  177. for child in sorted(root.iterdir(), key=lambda item: item.name.lower()):
  178. if child.is_file() and child.suffix.lower() in IMAGE_EXTENSIONS:
  179. return track_path_for(root, child, prefix)
  180. return None
  181. def build_tree(root: Path, prefix: str = "", label: str = "音乐库") -> dict[str, Any]:
  182. cover_path = find_cover_for_directory(root, prefix)
  183. node = {
  184. "name": label,
  185. "path": prefix,
  186. "cover": f"/api/cover/{cover_path}" if cover_path else None,
  187. "folders": [],
  188. "tracks": [],
  189. }
  190. for child in sorted(root.iterdir(), key=lambda item: (item.is_file(), item.name.lower())):
  191. if child.is_dir():
  192. child_rel = child.relative_to(root).as_posix()
  193. child_prefix = f"{prefix}/{child_rel}" if prefix else child_rel
  194. node["folders"].append(build_tree(child, child_prefix, child.name))
  195. elif is_supported_file(child):
  196. node["tracks"].append(build_track(track_path_for(root, child, prefix)))
  197. return node
  198. def collect_tracks(root: Path, prefix: str = "") -> list[dict[str, str]]:
  199. tracks: list[dict[str, str]] = []
  200. for path in sorted(root.rglob("*")):
  201. if is_supported_file(path):
  202. tracks.append(build_track(track_path_for(root, path, prefix)))
  203. return tracks
@app.on_event("startup")
def startup_event() -> None:
    """Startup hook: make sure the storage locations exist."""
    ensure_storage()
@app.get("/", response_class=HTMLResponse)
def index(request: Request) -> HTMLResponse:
    """Render the ``index.html`` template for the site root."""
    # The request object is passed into the template context.
    return templates.TemplateResponse("index.html", {"request": request})
  210. @app.get("/api/library")
  211. def library() -> JSONResponse:
  212. ensure_storage()
  213. tree = {
  214. "name": "音乐库",
  215. "path": "",
  216. "cover": None,
  217. "folders": [
  218. build_tree(root, prefix, label)
  219. for prefix, root, label in MUSIC_ROOTS
  220. if root.exists()
  221. ],
  222. "tracks": [],
  223. }
  224. all_tracks: list[dict[str, str]] = []
  225. for prefix, root, _ in MUSIC_ROOTS:
  226. if root.exists():
  227. all_tracks.extend(collect_tracks(root, prefix))
  228. return JSONResponse(
  229. {
  230. "tree": tree,
  231. "all_tracks": all_tracks,
  232. "playlists": load_playlists(),
  233. }
  234. )
  235. @app.get("/api/stream/{file_path:path}")
  236. def stream_file(request: Request, file_path: str) -> StreamingResponse:
  237. if file_path.startswith("cloud/"):
  238. return RedirectResponse(cloud_raw_url(file_path), status_code=307)
  239. file = safe_music_path(file_path)
  240. if not file.exists() or not file.is_file():
  241. raise HTTPException(status_code=404, detail="File not found")
  242. media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
  243. def file_iterator() -> Any:
  244. with file.open("rb") as handle:
  245. while chunk := handle.read(1024 * 1024):
  246. if not chunk:
  247. break
  248. yield chunk
  249. return StreamingResponse(
  250. file_iterator(),
  251. media_type=media_type,
  252. headers={"Cache-Control": "private, no-cache"},
  253. )
  254. @app.get("/api/cloud-url/{file_path:path}")
  255. def get_cloud_url(file_path: str) -> JSONResponse:
  256. if not file_path.startswith("cloud/"):
  257. raise HTTPException(status_code=400, detail="Not a cloud track")
  258. return JSONResponse({"url": cloud_raw_url(file_path)})
def proxy_cloud_stream(remote_url: str, range_header: str | None = None) -> StreamingResponse:
    """Fetch *remote_url* with HTTP Basic auth and relay the body to the client.

    Args:
        remote_url: URL to fetch.
        range_header: Optional ``Range`` header value forwarded upstream.

    Returns:
        A streaming response that mirrors the upstream status code and a
        small set of upstream headers.

    Raises:
        HTTPException: The upstream status code on an HTTP error, 502 when
            the upstream cannot be reached.
    """
    # NOTE(review): no caller of this function is visible in this file view.
    # Basic-auth header built from the configured WebDAV credentials.
    auth = base64.b64encode(f"{CLOUD_WEBDAV_USER}:{CLOUD_WEBDAV_PASSWORD}".encode("utf-8")).decode("ascii")
    headers = {
        "Authorization": f"Basic {auth}",
        "User-Agent": "MusicWebPlayer/1.0",
    }
    if range_header:
        headers["Range"] = range_header
    request = urllib.request.Request(
        remote_url,
        headers=headers,
    )
    try:
        # Opened without a `with`: the response must outlive this function so
        # the generator below can read from it.
        response = urllib.request.urlopen(request, timeout=30)
    except urllib.error.HTTPError as error:
        raise HTTPException(status_code=error.code, detail=error.reason)
    except urllib.error.URLError as error:
        raise HTTPException(status_code=502, detail=str(error.reason))
    # Prefer the type guessed from the URL path, then the upstream
    # Content-Type, then a fixed audio default.
    media_type = mimetypes.guess_type(urllib.parse.urlparse(remote_url).path)[0] or response.headers.get_content_type() or "audio/mpeg"
    passthrough_headers = {
        "Cache-Control": "private, no-cache",
        "Accept-Ranges": response.headers.get("Accept-Ranges") or "bytes",
        "Content-Type": media_type,
    }
    # Copy validators and the range description only when upstream sent them.
    for key in ("ETag", "Last-Modified", "Content-Range"):
        value = response.headers.get(key)
        if value:
            passthrough_headers[key] = value
    def remote_iterator() -> Any:
        # The upstream response is closed when this generator finishes or is
        # closed; it stays open if the generator is never started.
        with response:
            while chunk := response.read(1024 * 1024):
                yield chunk
    return StreamingResponse(
        remote_iterator(),
        # Mirror the upstream status, falling back to 200 if it has none.
        status_code=getattr(response, "status", 200),
        media_type=media_type,
        headers=passthrough_headers,
    )
  297. @app.get("/api/cover/{file_path:path}")
  298. def cover_file(file_path: str) -> FileResponse:
  299. file = safe_music_path(file_path)
  300. if not file.exists() or not file.is_file() or file.suffix.lower() not in IMAGE_EXTENSIONS:
  301. raise HTTPException(status_code=404, detail="Cover not found")
  302. media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
  303. return FileResponse(file, media_type=media_type, filename=file.name)
  304. @app.post("/api/upload")
  305. async def upload_files(
  306. files: list[UploadFile] = File(...),
  307. target_dir: str = Form(default=""),
  308. ) -> JSONResponse:
  309. destination = safe_music_path(target_dir)
  310. if not str(destination).startswith(str(LOCAL_MUSIC_DIR.resolve())):
  311. raise HTTPException(status_code=400, detail="Upload destination must be local music dir")
  312. destination.mkdir(parents=True, exist_ok=True)
  313. saved: list[str] = []
  314. for upload in files:
  315. suffix = Path(upload.filename or "").suffix.lower()
  316. if suffix not in SUPPORTED_EXTENSIONS:
  317. continue
  318. filename = Path(upload.filename or f"upload-{uuid.uuid4().hex}").name
  319. file_path = destination / filename
  320. with file_path.open("wb") as buffer:
  321. shutil.copyfileobj(upload.file, buffer)
  322. saved.append(track_path_from_abs(file_path))
  323. return JSONResponse({"saved": saved})
  324. @app.post("/api/folder")
  325. def create_folder(payload: FolderCreateRequest) -> JSONResponse:
  326. folder = safe_music_path(payload.path)
  327. if not str(folder).startswith(str(LOCAL_MUSIC_DIR.resolve())):
  328. raise HTTPException(status_code=400, detail="Folder must be created in local music dir")
  329. folder.mkdir(parents=True, exist_ok=True)
  330. return JSONResponse({"created": track_path_from_abs(folder)})
  331. @app.post("/api/move")
  332. def move_file(payload: MoveRequest) -> JSONResponse:
  333. source = safe_music_path(payload.source)
  334. destination_dir = safe_music_path(payload.destination_dir)
  335. if not source.exists():
  336. raise HTTPException(status_code=404, detail="Source not found")
  337. if not destination_dir.exists():
  338. destination_dir.mkdir(parents=True, exist_ok=True)
  339. destination = destination_dir / source.name
  340. shutil.move(str(source), str(destination))
  341. return JSONResponse({"moved": track_path_from_abs(destination)})
  342. @app.post("/api/playlists")
  343. def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse:
  344. playlists = load_playlists()
  345. playlist = {
  346. "id": uuid.uuid4().hex,
  347. "name": payload.name.strip() or "未命名播放列表",
  348. "tracks": payload.tracks,
  349. }
  350. playlists.append(playlist)
  351. save_playlists(playlists)
  352. return JSONResponse(playlist)
  353. @app.put("/api/playlists/{playlist_id}")
  354. def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse:
  355. playlists = load_playlists()
  356. for playlist in playlists:
  357. if playlist["id"] == playlist_id:
  358. playlist["tracks"] = payload.tracks
  359. save_playlists(playlists)
  360. return JSONResponse(playlist)
  361. raise HTTPException(status_code=404, detail="Playlist not found")
  362. @app.delete("/api/playlists/{playlist_id}")
  363. def delete_playlist(playlist_id: str) -> JSONResponse:
  364. playlists = load_playlists()
  365. filtered = [playlist for playlist in playlists if playlist["id"] != playlist_id]
  366. if len(filtered) == len(playlists):
  367. raise HTTPException(status_code=404, detail="Playlist not found")
  368. save_playlists(filtered)
  369. return JSONResponse({"deleted": playlist_id})