from __future__ import annotations import json import mimetypes import shutil import uuid import base64 import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel BASE_DIR = Path(__file__).resolve().parent.parent LOCAL_MUSIC_DIR = BASE_DIR / "mp3file" CLOUD_MUSIC_DIR = Path("/mnt/baiducloud/百度网盘/mp3file") CLOUD_WEBDAV_BASE = "http://110.42.102.94:5244/dav" CLOUD_ALIST_BASE = "http://110.42.102.94:5244" CLOUD_WEBDAV_USER = "sequoia00" CLOUD_WEBDAV_PASSWORD = "792199bb" MUSIC_ROOTS: list[tuple[str, Path, str]] = [ ("", LOCAL_MUSIC_DIR, "音乐库"), ("cloud", CLOUD_MUSIC_DIR, "百度网盘"), ] PLAYLISTS_FILE = BASE_DIR / "playlists.json" SUPPORTED_EXTENSIONS = { ".mp3", ".wav", ".flac", ".m3u", ".m3u8", ".ogg", ".aac", ".wma", ".opus", ".oga", ".mp4", ".m4a", ".webm", } IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"} app = FastAPI(title="MusicWeb", version="1.0.0") app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static") templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) class FolderCreateRequest(BaseModel): path: str class MoveRequest(BaseModel): source: str destination_dir: str class PlaylistCreateRequest(BaseModel): name: str tracks: list[str] class PlaylistUpdateRequest(BaseModel): tracks: list[str] def ensure_storage() -> None: LOCAL_MUSIC_DIR.mkdir(parents=True, exist_ok=True) CLOUD_MUSIC_DIR.mkdir(parents=True, exist_ok=True) if not PLAYLISTS_FILE.exists(): PLAYLISTS_FILE.write_text("[]", encoding="utf-8") def safe_music_path(relative_path: str) -> Path: prefix, _, inner_path = relative_path.partition("/") roots = {key: root.resolve() for key, root, _ in MUSIC_ROOTS} if prefix in roots: candidate = (roots[prefix] / inner_path).resolve() if candidate != roots[prefix] and roots[prefix] not in candidate.parents: raise HTTPException(status_code=400, detail="Invalid path") return candidate candidate = (LOCAL_MUSIC_DIR / relative_path).resolve() local_root = LOCAL_MUSIC_DIR.resolve() if candidate != local_root and local_root not in candidate.parents: raise HTTPException(status_code=400, detail="Invalid path") return candidate def load_playlists() -> list[dict[str, Any]]: ensure_storage() return json.loads(PLAYLISTS_FILE.read_text(encoding="utf-8")) def save_playlists(playlists: list[dict[str, Any]]) -> None: PLAYLISTS_FILE.write_text( json.dumps(playlists, ensure_ascii=False, indent=2), encoding="utf-8" ) def is_under_root(path: Path, root: Path) -> bool: resolved_path = path.resolve() resolved_root = root.resolve() return resolved_path == resolved_root or resolved_root in resolved_path.parents def track_path_for(root: Path, path: Path, prefix: str = "") -> str: relative = path.relative_to(root).as_posix() return f"{prefix}/{relative}" if prefix else relative def track_path_from_abs(path: Path) -> str: if is_under_root(path, LOCAL_MUSIC_DIR): return path.relative_to(LOCAL_MUSIC_DIR).as_posix() if is_under_root(path, CLOUD_MUSIC_DIR): return f"cloud/{path.relative_to(CLOUD_MUSIC_DIR).as_posix()}" return path.name def cloud_webdav_url(relative_path: str) -> str: inner_path = relative_path.removeprefix("cloud/").lstrip("/") encoded = "/".join(urllib.parse.quote(part) for part in inner_path.split("/") if part) base = CLOUD_WEBDAV_BASE.rstrip("/") return f"{base}/{encoded}" if encoded else base def cloud_alist_api_path(relative_path: str) -> str: inner_path = relative_path.removeprefix("cloud/").lstrip("/") return f"/百度网盘/mp3file/{inner_path}" if inner_path else "/百度网盘/mp3file" def alist_login_token() -> str: payload = json.dumps( { "username": CLOUD_WEBDAV_USER, "password": CLOUD_WEBDAV_PASSWORD, } ).encode("utf-8") request = urllib.request.Request( f"{CLOUD_ALIST_BASE.rstrip('/')}/api/auth/login", data=payload, headers={"Content-Type": "application/json", "User-Agent": "MusicWebPlayer/1.0"}, ) try: with urllib.request.urlopen(request, timeout=20) as response: result = json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as error: raise HTTPException(status_code=error.code, detail=error.reason) except urllib.error.URLError as error: raise HTTPException(status_code=502, detail=str(error.reason)) token = result.get("data", {}).get("token") if not token: raise HTTPException(status_code=502, detail="AList login failed") return token def cloud_raw_url(relative_path: str) -> str: payload = json.dumps({"path": cloud_alist_api_path(relative_path), "password": ""}).encode("utf-8") request = urllib.request.Request( f"{CLOUD_ALIST_BASE.rstrip('/')}/api/fs/get", data=payload, headers={ "Content-Type": "application/json", "Authorization": alist_login_token(), "User-Agent": "MusicWebPlayer/1.0", }, ) try: with urllib.request.urlopen(request, timeout=20) as response: result = json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as error: raise HTTPException(status_code=error.code, detail=error.reason) except urllib.error.URLError as error: raise HTTPException(status_code=502, detail=str(error.reason)) raw_url = result.get("data", {}).get("raw_url") if not raw_url: raise HTTPException(status_code=404, detail="Cloud raw url not found") return raw_url def is_supported_file(path: Path) -> bool: return path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS def track_url(relative_path: str) -> str: return f"/api/stream/{relative_path}" def build_track(relative_path: str) -> dict[str, str]: filename = Path(relative_path).name return { "id": relative_path, "name": filename, "path": relative_path, "url": track_url(relative_path), "folder": str(Path(relative_path).parent).replace(".", "").strip("/"), } def find_cover_for_directory(root: Path, prefix: str = "") -> str | None: candidates = ( "cover.jpg", "cover.jpeg", "cover.png", "cover.webp", "folder.jpg", "folder.jpeg", "folder.png", "folder.webp", ) for candidate in candidates: path = root / candidate if path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS: return track_path_for(root, path, prefix) for child in sorted(root.iterdir(), key=lambda item: item.name.lower()): if child.is_file() and child.suffix.lower() in IMAGE_EXTENSIONS: return track_path_for(root, child, prefix) return None def build_tree(root: Path, prefix: str = "", label: str = "音乐库") -> dict[str, Any]: cover_path = find_cover_for_directory(root, prefix) node = { "name": label, "path": prefix, "cover": f"/api/cover/{cover_path}" if cover_path else None, "folders": [], "tracks": [], } for child in sorted(root.iterdir(), key=lambda item: (item.is_file(), item.name.lower())): if child.is_dir(): child_rel = child.relative_to(root).as_posix() child_prefix = f"{prefix}/{child_rel}" if prefix else child_rel node["folders"].append(build_tree(child, child_prefix, child.name)) elif is_supported_file(child): node["tracks"].append(build_track(track_path_for(root, child, prefix))) return node def collect_tracks(root: Path, prefix: str = "") -> list[dict[str, str]]: tracks: list[dict[str, str]] = [] for path in sorted(root.rglob("*")): if is_supported_file(path): tracks.append(build_track(track_path_for(root, path, prefix))) return tracks @app.on_event("startup") def startup_event() -> None: ensure_storage() @app.get("/", response_class=HTMLResponse) def index(request: Request) -> HTMLResponse: return templates.TemplateResponse("index.html", {"request": request}) @app.get("/api/library") def library() -> JSONResponse: ensure_storage() tree = { "name": "音乐库", "path": "", "cover": None, "folders": [ build_tree(root, prefix, label) for prefix, root, label in MUSIC_ROOTS if root.exists() ], "tracks": [], } all_tracks: list[dict[str, str]] = [] for prefix, root, _ in MUSIC_ROOTS: if root.exists(): all_tracks.extend(collect_tracks(root, prefix)) return JSONResponse( { "tree": tree, "all_tracks": all_tracks, "playlists": load_playlists(), } ) @app.get("/api/stream/{file_path:path}") def stream_file(request: Request, file_path: str) -> StreamingResponse: if file_path.startswith("cloud/"): return RedirectResponse(cloud_raw_url(file_path), status_code=307) file = safe_music_path(file_path) if not file.exists() or not file.is_file(): raise HTTPException(status_code=404, detail="File not found") media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream" def file_iterator() -> Any: with file.open("rb") as handle: while chunk := handle.read(1024 * 1024): if not chunk: break yield chunk return StreamingResponse( file_iterator(), media_type=media_type, headers={"Cache-Control": "private, no-cache"}, ) @app.get("/api/cloud-url/{file_path:path}") def get_cloud_url(file_path: str) -> JSONResponse: if not file_path.startswith("cloud/"): raise HTTPException(status_code=400, detail="Not a cloud track") return JSONResponse({"url": cloud_raw_url(file_path)}) def proxy_cloud_stream(remote_url: str, range_header: str | None = None) -> StreamingResponse: auth = base64.b64encode(f"{CLOUD_WEBDAV_USER}:{CLOUD_WEBDAV_PASSWORD}".encode("utf-8")).decode("ascii") headers = { "Authorization": f"Basic {auth}", "User-Agent": "MusicWebPlayer/1.0", } if range_header: headers["Range"] = range_header request = urllib.request.Request( remote_url, headers=headers, ) try: response = urllib.request.urlopen(request, timeout=30) except urllib.error.HTTPError as error: raise HTTPException(status_code=error.code, detail=error.reason) except urllib.error.URLError as error: raise HTTPException(status_code=502, detail=str(error.reason)) media_type = mimetypes.guess_type(urllib.parse.urlparse(remote_url).path)[0] or response.headers.get_content_type() or "audio/mpeg" passthrough_headers = { "Cache-Control": "private, no-cache", "Accept-Ranges": response.headers.get("Accept-Ranges") or "bytes", "Content-Type": media_type, } for key in ("ETag", "Last-Modified", "Content-Range"): value = response.headers.get(key) if value: passthrough_headers[key] = value def remote_iterator() -> Any: with response: while chunk := response.read(1024 * 1024): yield chunk return StreamingResponse( remote_iterator(), status_code=getattr(response, "status", 200), media_type=media_type, headers=passthrough_headers, ) @app.get("/api/cover/{file_path:path}") def cover_file(file_path: str) -> FileResponse: file = safe_music_path(file_path) if not file.exists() or not file.is_file() or file.suffix.lower() not in IMAGE_EXTENSIONS: raise HTTPException(status_code=404, detail="Cover not found") media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream" return FileResponse(file, media_type=media_type, filename=file.name) @app.post("/api/upload") async def upload_files( files: list[UploadFile] = File(...), target_dir: str = Form(default=""), ) -> JSONResponse: destination = safe_music_path(target_dir) if not str(destination).startswith(str(LOCAL_MUSIC_DIR.resolve())): raise HTTPException(status_code=400, detail="Upload destination must be local music dir") destination.mkdir(parents=True, exist_ok=True) saved: list[str] = [] for upload in files: suffix = Path(upload.filename or "").suffix.lower() if suffix not in SUPPORTED_EXTENSIONS: continue filename = Path(upload.filename or f"upload-{uuid.uuid4().hex}").name file_path = destination / filename with file_path.open("wb") as buffer: shutil.copyfileobj(upload.file, buffer) saved.append(track_path_from_abs(file_path)) return JSONResponse({"saved": saved}) @app.post("/api/folder") def create_folder(payload: FolderCreateRequest) -> JSONResponse: folder = safe_music_path(payload.path) if not str(folder).startswith(str(LOCAL_MUSIC_DIR.resolve())): raise HTTPException(status_code=400, detail="Folder must be created in local music dir") folder.mkdir(parents=True, exist_ok=True) return JSONResponse({"created": track_path_from_abs(folder)}) @app.post("/api/move") def move_file(payload: MoveRequest) -> JSONResponse: source = safe_music_path(payload.source) destination_dir = safe_music_path(payload.destination_dir) if not source.exists(): raise HTTPException(status_code=404, detail="Source not found") if not destination_dir.exists(): destination_dir.mkdir(parents=True, exist_ok=True) destination = destination_dir / source.name shutil.move(str(source), str(destination)) return JSONResponse({"moved": track_path_from_abs(destination)}) @app.post("/api/playlists") def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse: playlists = load_playlists() playlist = { "id": uuid.uuid4().hex, "name": payload.name.strip() or "未命名播放列表", "tracks": payload.tracks, } playlists.append(playlist) save_playlists(playlists) return JSONResponse(playlist) @app.put("/api/playlists/{playlist_id}") def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse: playlists = load_playlists() for playlist in playlists: if playlist["id"] == playlist_id: playlist["tracks"] = payload.tracks save_playlists(playlists) return JSONResponse(playlist) raise HTTPException(status_code=404, detail="Playlist not found") @app.delete("/api/playlists/{playlist_id}") def delete_playlist(playlist_id: str) -> JSONResponse: playlists = load_playlists() filtered = [playlist for playlist in playlists if playlist["id"] != playlist_id] if len(filtered) == len(playlists): raise HTTPException(status_code=404, detail="Playlist not found") save_playlists(filtered) return JSONResponse({"deleted": playlist_id})