from __future__ import annotations import json import mimetypes import shutil import threading import uuid import base64 import urllib.error import urllib.parse import urllib.request from datetime import datetime, timezone from pathlib import Path from typing import Any from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel BASE_DIR = Path(__file__).resolve().parent.parent LOCAL_MUSIC_DIR = BASE_DIR / "mp3file" CLOUD_MUSIC_DIR = Path("/mnt/baiducloud/百度网盘/mp3file") CLOUD_WEBDAV_BASE = "http://110.42.102.94:5244/dav" CLOUD_ALIST_BASE = "http://110.42.102.94:5244" CLOUD_WEBDAV_USER = "sequoia00" CLOUD_WEBDAV_PASSWORD = "792199bb" MUSIC_ROOTS: list[tuple[str, Path, str]] = [ ("", LOCAL_MUSIC_DIR, "本地音乐"), ("cloud", CLOUD_MUSIC_DIR, "百度网盘"), ] PLAYLISTS_FILE = BASE_DIR / "playlists.json" CACHE_DIR = BASE_DIR / ".cache" CLOUD_LIBRARY_CACHE_FILE = CACHE_DIR / "cloud_library.json" CLOUD_COVER_CACHE_DIR = CACHE_DIR / "cloud_covers" SUPPORTED_EXTENSIONS = { ".mp3", ".wav", ".flac", ".m3u", ".m3u8", ".ogg", ".aac", ".wma", ".opus", ".oga", ".mp4", ".m4a", ".webm", } IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"} app = FastAPI(title="MusicWeb", version="1.0.0") app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static") templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) CLOUD_REFRESH_STATE = {"running": False} CLOUD_REFRESH_LOCK = threading.Lock() class FolderCreateRequest(BaseModel): path: str class MoveRequest(BaseModel): source: str destination_dir: str class PlaylistCreateRequest(BaseModel): name: str tracks: list[str] class PlaylistUpdateRequest(BaseModel): tracks: list[str] def ensure_storage() -> None: LOCAL_MUSIC_DIR.mkdir(parents=True, exist_ok=True) CLOUD_MUSIC_DIR.mkdir(parents=True, exist_ok=True) CACHE_DIR.mkdir(parents=True, exist_ok=True) CLOUD_COVER_CACHE_DIR.mkdir(parents=True, exist_ok=True) if not PLAYLISTS_FILE.exists(): PLAYLISTS_FILE.write_text("[]", encoding="utf-8") def safe_music_path(relative_path: str) -> Path: prefix, _, inner_path = relative_path.partition("/") roots = {key: root.resolve() for key, root, _ in MUSIC_ROOTS} if prefix in roots: candidate = (roots[prefix] / inner_path).resolve() if candidate != roots[prefix] and roots[prefix] not in candidate.parents: raise HTTPException(status_code=400, detail="Invalid path") return candidate candidate = (LOCAL_MUSIC_DIR / relative_path).resolve() local_root = LOCAL_MUSIC_DIR.resolve() if candidate != local_root and local_root not in candidate.parents: raise HTTPException(status_code=400, detail="Invalid path") return candidate def load_playlists() -> list[dict[str, Any]]: ensure_storage() return json.loads(PLAYLISTS_FILE.read_text(encoding="utf-8")) def save_playlists(playlists: list[dict[str, Any]]) -> None: PLAYLISTS_FILE.write_text( json.dumps(playlists, ensure_ascii=False, indent=2), encoding="utf-8" ) def is_under_root(path: Path, root: Path) -> bool: resolved_path = path.resolve() resolved_root = root.resolve() return resolved_path == resolved_root or resolved_root in resolved_path.parents def track_path_for(root: Path, path: Path, prefix: str = "") -> str: relative = path.relative_to(root).as_posix() return f"{prefix}/{relative}" if prefix else relative def track_path_from_abs(path: Path) -> str: if is_under_root(path, LOCAL_MUSIC_DIR): return path.relative_to(LOCAL_MUSIC_DIR).as_posix() if is_under_root(path, CLOUD_MUSIC_DIR): return f"cloud/{path.relative_to(CLOUD_MUSIC_DIR).as_posix()}" return path.name def path_is_under(path: Path, root: Path) -> bool: try: resolved_path = path.resolve() resolved_root = root.resolve() except OSError: return False return resolved_path == resolved_root or resolved_root in resolved_path.parents def trigger_cloud_cache_refresh_if_needed(*paths: Path) -> None: if any(path_is_under(path, CLOUD_MUSIC_DIR) for path in paths): start_cloud_library_refresh() def cloud_cover_cache_path(relative_path: str) -> Path: inner_path = relative_path.removeprefix("cloud/").lstrip("/") target = (CLOUD_COVER_CACHE_DIR / inner_path).resolve() cache_root = CLOUD_COVER_CACHE_DIR.resolve() if target != cache_root and cache_root not in target.parents: raise HTTPException(status_code=400, detail="Invalid cover path") return target def cloud_webdav_url(relative_path: str) -> str: inner_path = relative_path.removeprefix("cloud/").lstrip("/") encoded = "/".join(urllib.parse.quote(part) for part in inner_path.split("/") if part) base = CLOUD_WEBDAV_BASE.rstrip("/") return f"{base}/{encoded}" if encoded else base def cloud_alist_api_path(relative_path: str) -> str: inner_path = relative_path.removeprefix("cloud/").lstrip("/") return f"/百度网盘/mp3file/{inner_path}" if inner_path else "/百度网盘/mp3file" def alist_login_token() -> str: payload = json.dumps( { "username": CLOUD_WEBDAV_USER, "password": CLOUD_WEBDAV_PASSWORD, } ).encode("utf-8") request = urllib.request.Request( f"{CLOUD_ALIST_BASE.rstrip('/')}/api/auth/login", data=payload, headers={"Content-Type": "application/json", "User-Agent": "MusicWebPlayer/1.0"}, ) try: with urllib.request.urlopen(request, timeout=20) as response: result = json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as error: raise HTTPException(status_code=error.code, detail=error.reason) except urllib.error.URLError as error: raise HTTPException(status_code=502, detail=str(error.reason)) token = result.get("data", {}).get("token") if not token: raise HTTPException(status_code=502, detail="AList login failed") return token def cloud_raw_url(relative_path: str) -> str: payload = json.dumps({"path": cloud_alist_api_path(relative_path), "password": ""}).encode("utf-8") request = urllib.request.Request( f"{CLOUD_ALIST_BASE.rstrip('/')}/api/fs/get", data=payload, headers={ "Content-Type": "application/json", "Authorization": alist_login_token(), "User-Agent": "MusicWebPlayer/1.0", }, ) try: with urllib.request.urlopen(request, timeout=20) as response: result = json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as error: raise HTTPException(status_code=error.code, detail=error.reason) except urllib.error.URLError as error: raise HTTPException(status_code=502, detail=str(error.reason)) data = result.get("data", {}) sign = data.get("sign") path = data.get("path") if not sign or not path: raise HTTPException(status_code=404, detail="Cloud signed url not found") encoded_path = "/".join(urllib.parse.quote(part) for part in path.lstrip("/").split("/")) quoted_sign = urllib.parse.quote(sign, safe="") return f"{CLOUD_ALIST_BASE.rstrip('/')}/d/{urllib.parse.quote('百度网盘')}/{encoded_path}?sign={quoted_sign}" def is_supported_file(path: Path) -> bool: return path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS def track_url(relative_path: str) -> str: return f"/api/stream/{relative_path}" def build_track(relative_path: str) -> dict[str, str]: filename = Path(relative_path).name return { "id": relative_path, "name": filename, "path": relative_path, "url": track_url(relative_path), "folder": str(Path(relative_path).parent).replace(".", "").strip("/"), } def find_cover_for_directory(root: Path, prefix: str = "") -> str | None: path = root / "cover.jpg" if path.is_file(): return track_path_for(root, path, prefix) return None def sanitize_cover_value(cover: Any) -> str | None: if not isinstance(cover, str) or not cover: return None parsed = urllib.parse.urlparse(cover) path = parsed.path or cover return cover if path.lower().endswith("/cover.jpg") else None def sanitize_library_tree(node: dict[str, Any]) -> dict[str, Any]: node["cover"] = sanitize_cover_value(node.get("cover")) for child in node.get("folders", []): if isinstance(child, dict): sanitize_library_tree(child) return node def build_tree(root: Path, prefix: str = "", label: str = "音乐库") -> dict[str, Any]: cover_path = find_cover_for_directory(root, prefix) node = { "name": label, "path": prefix, "cover": f"/api/cover/{cover_path}" if cover_path else None, "folders": [], "tracks": [], } for child in sorted(root.iterdir(), key=lambda item: (item.is_file(), item.name.lower())): if child.is_dir(): child_rel = child.relative_to(root).as_posix() child_prefix = f"{prefix}/{child_rel}" if prefix else child_rel node["folders"].append(build_tree(child, child_prefix, child.name)) elif is_supported_file(child): node["tracks"].append(build_track(track_path_for(root, child, prefix))) return node def collect_tracks(root: Path, prefix: str = "") -> list[dict[str, str]]: tracks: list[dict[str, str]] = [] for path in sorted(root.rglob("*")): if is_supported_file(path): tracks.append(build_track(track_path_for(root, path, prefix))) return tracks def root_entry(source: str) -> tuple[str, Path, str]: for prefix, root, label in MUSIC_ROOTS: if prefix == source: return prefix, root, label raise HTTPException(status_code=404, detail="Library source not found") def current_timestamp() -> str: return datetime.now(timezone.utc).isoformat() def build_library_payload(source: str | None = None) -> dict[str, Any]: ensure_storage() entries = MUSIC_ROOTS if source is None else [root_entry(source)] tree = { "name": "音乐库", "path": "", "cover": None, "folders": [ build_tree(root, prefix, label) for prefix, root, label in entries if root.exists() ], "tracks": [], } all_tracks: list[dict[str, str]] = [] for prefix, root, _ in entries: if root.exists(): all_tracks.extend(collect_tracks(root, prefix)) sanitize_library_tree(tree) return { "tree": tree, "all_tracks": all_tracks, "playlists": load_playlists(), } def read_cloud_library_cache() -> dict[str, Any] | None: if not CLOUD_LIBRARY_CACHE_FILE.exists(): return None try: payload = json.loads(CLOUD_LIBRARY_CACHE_FILE.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return None if not isinstance(payload, dict): return None tree = payload.get("tree") if isinstance(tree, dict): sanitize_library_tree(tree) return payload def write_cloud_library_cache(payload: dict[str, Any]) -> dict[str, Any]: cached_payload = { **payload, "cache": { "updated_at": current_timestamp(), "is_cached": True, "refreshing": False, }, } CLOUD_LIBRARY_CACHE_FILE.write_text( json.dumps(cached_payload, ensure_ascii=False, indent=2), encoding="utf-8", ) return cached_payload def build_cloud_library_payload() -> dict[str, Any]: return build_library_payload("cloud") def cloud_library_payload_from_cache() -> dict[str, Any] | None: payload = read_cloud_library_cache() if not payload: return None cache_meta = payload.get("cache") if isinstance(payload.get("cache"), dict) else {} payload["cache"] = { "updated_at": cache_meta.get("updated_at"), "is_cached": True, "refreshing": CLOUD_REFRESH_STATE["running"], } return payload def refresh_cloud_library_cache_sync() -> dict[str, Any]: payload = build_cloud_library_payload() return write_cloud_library_cache(payload) def start_cloud_library_refresh() -> bool: with CLOUD_REFRESH_LOCK: if CLOUD_REFRESH_STATE["running"]: return False CLOUD_REFRESH_STATE["running"] = True def runner() -> None: try: refresh_cloud_library_cache_sync() finally: with CLOUD_REFRESH_LOCK: CLOUD_REFRESH_STATE["running"] = False threading.Thread(target=runner, daemon=True).start() return True @app.on_event("startup") def startup_event() -> None: ensure_storage() if not CLOUD_LIBRARY_CACHE_FILE.exists(): start_cloud_library_refresh() @app.get("/", response_class=HTMLResponse) def index(request: Request) -> HTMLResponse: return templates.TemplateResponse("index.html", {"request": request}) @app.get("/api/library") def library() -> JSONResponse: return JSONResponse(build_library_payload()) @app.get("/api/library/{source}") def library_by_source(source: str) -> JSONResponse: normalized = "" if source == "local" else source if normalized == "cloud": cached = cloud_library_payload_from_cache() if cached: start_cloud_library_refresh() return JSONResponse(cached) payload = refresh_cloud_library_cache_sync() return JSONResponse(payload) return JSONResponse(build_library_payload(normalized)) @app.get("/api/library/{source}/refresh") def refresh_library_by_source(source: str) -> JSONResponse: normalized = "" if source == "local" else source if normalized != "cloud": return JSONResponse(build_library_payload(normalized)) started = start_cloud_library_refresh() cached = cloud_library_payload_from_cache() if cached: cached["cache"]["refreshing"] = True return JSONResponse({"started": started, "library": cached}) payload = refresh_cloud_library_cache_sync() return JSONResponse({"started": started, "library": payload}) @app.get("/api/stream/{file_path:path}") def stream_file(request: Request, file_path: str) -> StreamingResponse: if file_path.startswith("cloud/"): return proxy_cloud_stream(cloud_raw_url(file_path), request.headers.get("range")) file = safe_music_path(file_path) if not file.exists() or not file.is_file(): raise HTTPException(status_code=404, detail="File not found") media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream" def file_iterator() -> Any: with file.open("rb") as handle: while chunk := handle.read(1024 * 1024): if not chunk: break yield chunk return StreamingResponse( file_iterator(), media_type=media_type, headers={"Cache-Control": "private, no-cache"}, ) @app.get("/api/cloud-url/{file_path:path}") def get_cloud_url(file_path: str) -> JSONResponse: if not file_path.startswith("cloud/"): raise HTTPException(status_code=400, detail="Not a cloud track") return JSONResponse({"url": cloud_raw_url(file_path)}) def proxy_cloud_stream(remote_url: str, range_header: str | None = None) -> StreamingResponse: auth = base64.b64encode(f"{CLOUD_WEBDAV_USER}:{CLOUD_WEBDAV_PASSWORD}".encode("utf-8")).decode("ascii") headers = { "Authorization": f"Basic {auth}", "User-Agent": "MusicWebPlayer/1.0", } if range_header: headers["Range"] = range_header request = urllib.request.Request( remote_url, headers=headers, ) try: response = urllib.request.urlopen(request, timeout=30) except urllib.error.HTTPError as error: raise HTTPException(status_code=error.code, detail=error.reason) except urllib.error.URLError as error: raise HTTPException(status_code=502, detail=str(error.reason)) media_type = mimetypes.guess_type(urllib.parse.urlparse(remote_url).path)[0] or response.headers.get_content_type() or "audio/mpeg" passthrough_headers = { "Cache-Control": "private, no-cache", "Accept-Ranges": response.headers.get("Accept-Ranges") or "bytes", "Content-Type": media_type, } for key in ("ETag", "Last-Modified", "Content-Range"): value = response.headers.get(key) if value: passthrough_headers[key] = value def remote_iterator() -> Any: with response: while chunk := response.read(1024 * 1024): yield chunk return StreamingResponse( remote_iterator(), status_code=getattr(response, "status", 200), media_type=media_type, headers=passthrough_headers, ) def cache_cloud_cover(relative_path: str) -> Path: cached_file = cloud_cover_cache_path(relative_path) if cached_file.exists() and cached_file.is_file(): return cached_file cached_file.parent.mkdir(parents=True, exist_ok=True) remote_url = cloud_raw_url(relative_path) request = urllib.request.Request( remote_url, headers={"User-Agent": "MusicWebPlayer/1.0"}, ) try: with urllib.request.urlopen(request, timeout=30) as response: with cached_file.open("wb") as output: shutil.copyfileobj(response, output) except urllib.error.HTTPError as error: raise HTTPException(status_code=error.code, detail=error.reason) except urllib.error.URLError as error: raise HTTPException(status_code=502, detail=str(error.reason)) except OSError as error: raise HTTPException(status_code=500, detail=str(error)) return cached_file @app.get("/api/cover/{file_path:path}") def cover_file(request: Request, file_path: str): if file_path.startswith("cloud/"): if Path(file_path).name.lower() != "cover.jpg": raise HTTPException(status_code=404, detail="Cover not found") cached_file = cache_cloud_cover(file_path) media_type = mimetypes.guess_type(cached_file.name)[0] or "image/jpeg" return FileResponse(cached_file, media_type=media_type, filename=cached_file.name) file = safe_music_path(file_path) if file.name.lower() != "cover.jpg" or not file.exists() or not file.is_file(): raise HTTPException(status_code=404, detail="Cover not found") media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream" return FileResponse(file, media_type=media_type, filename=file.name) @app.post("/api/upload") async def upload_files( files: list[UploadFile] = File(...), target_dir: str = Form(default=""), ) -> JSONResponse: destination = safe_music_path(target_dir) if not str(destination).startswith(str(LOCAL_MUSIC_DIR.resolve())): raise HTTPException(status_code=400, detail="Upload destination must be local music dir") destination.mkdir(parents=True, exist_ok=True) saved: list[str] = [] for upload in files: suffix = Path(upload.filename or "").suffix.lower() if suffix not in SUPPORTED_EXTENSIONS: continue filename = Path(upload.filename or f"upload-{uuid.uuid4().hex}").name file_path = destination / filename with file_path.open("wb") as buffer: shutil.copyfileobj(upload.file, buffer) saved.append(track_path_from_abs(file_path)) trigger_cloud_cache_refresh_if_needed(destination) return JSONResponse({"saved": saved}) @app.post("/api/folder") def create_folder(payload: FolderCreateRequest) -> JSONResponse: folder = safe_music_path(payload.path) if not str(folder).startswith(str(LOCAL_MUSIC_DIR.resolve())): raise HTTPException(status_code=400, detail="Folder must be created in local music dir") folder.mkdir(parents=True, exist_ok=True) trigger_cloud_cache_refresh_if_needed(folder) return JSONResponse({"created": track_path_from_abs(folder)}) @app.post("/api/move") def move_file(payload: MoveRequest) -> JSONResponse: source = safe_music_path(payload.source) destination_dir = safe_music_path(payload.destination_dir) if not source.exists(): raise HTTPException(status_code=404, detail="Source not found") if not destination_dir.exists(): destination_dir.mkdir(parents=True, exist_ok=True) destination = destination_dir / source.name shutil.move(str(source), str(destination)) trigger_cloud_cache_refresh_if_needed(source, destination_dir, destination) return JSONResponse({"moved": track_path_from_abs(destination)}) @app.post("/api/playlists") def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse: playlists = load_playlists() playlist = { "id": uuid.uuid4().hex, "name": payload.name.strip() or "未命名播放列表", "tracks": payload.tracks, } playlists.append(playlist) save_playlists(playlists) return JSONResponse(playlist) @app.put("/api/playlists/{playlist_id}") def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse: playlists = load_playlists() for playlist in playlists: if playlist["id"] == playlist_id: playlist["tracks"] = payload.tracks save_playlists(playlists) return JSONResponse(playlist) raise HTTPException(status_code=404, detail="Playlist not found") @app.delete("/api/playlists/{playlist_id}") def delete_playlist(playlist_id: str) -> JSONResponse: playlists = load_playlists() filtered = [playlist for playlist in playlists if playlist["id"] != playlist_id] if len(filtered) == len(playlists): raise HTTPException(status_code=404, detail="Playlist not found") save_playlists(filtered) return JSONResponse({"deleted": playlist_id})