from __future__ import annotations import json import mimetypes import shutil import uuid from pathlib import Path from typing import Any from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile from fastapi.responses import FileResponse, HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel BASE_DIR = Path(__file__).resolve().parent.parent MUSIC_DIR = BASE_DIR / "mp3file" PLAYLISTS_FILE = BASE_DIR / "playlists.json" SUPPORTED_EXTENSIONS = { ".mp3", ".wav", ".flac", ".m3u", ".m3u8", ".ogg", ".aac", ".wma", ".opus", ".oga", ".mp4", ".m4a", ".webm", } IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"} app = FastAPI(title="MusicWeb", version="1.0.0") app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static") templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) class FolderCreateRequest(BaseModel): path: str class MoveRequest(BaseModel): source: str destination_dir: str class PlaylistCreateRequest(BaseModel): name: str tracks: list[str] class PlaylistUpdateRequest(BaseModel): tracks: list[str] def ensure_storage() -> None: MUSIC_DIR.mkdir(parents=True, exist_ok=True) if not PLAYLISTS_FILE.exists(): PLAYLISTS_FILE.write_text("[]", encoding="utf-8") def safe_music_path(relative_path: str) -> Path: candidate = (MUSIC_DIR / relative_path).resolve() if candidate != MUSIC_DIR.resolve() and MUSIC_DIR.resolve() not in candidate.parents: raise HTTPException(status_code=400, detail="Invalid path") return candidate def load_playlists() -> list[dict[str, Any]]: ensure_storage() return json.loads(PLAYLISTS_FILE.read_text(encoding="utf-8")) def save_playlists(playlists: list[dict[str, Any]]) -> None: PLAYLISTS_FILE.write_text( json.dumps(playlists, ensure_ascii=False, indent=2), encoding="utf-8" ) def is_supported_file(path: Path) -> bool: return path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS def 
track_url(relative_path: str) -> str: return f"/api/stream/{relative_path}" def build_track(relative_path: str) -> dict[str, str]: filename = Path(relative_path).name return { "id": relative_path, "name": filename, "path": relative_path, "url": track_url(relative_path), "folder": str(Path(relative_path).parent).replace(".", "").strip("/"), } def find_cover_for_directory(root: Path) -> str | None: candidates = ( "cover.jpg", "cover.jpeg", "cover.png", "cover.webp", "folder.jpg", "folder.jpeg", "folder.png", "folder.webp", ) for candidate in candidates: path = root / candidate if path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS: return path.relative_to(MUSIC_DIR).as_posix() for child in sorted(root.iterdir(), key=lambda item: item.name.lower()): if child.is_file() and child.suffix.lower() in IMAGE_EXTENSIONS: return child.relative_to(MUSIC_DIR).as_posix() return None def build_tree(root: Path) -> dict[str, Any]: rel_root = root.relative_to(MUSIC_DIR) if root != MUSIC_DIR else Path(".") cover_path = find_cover_for_directory(root) node = { "name": "音乐库" if root == MUSIC_DIR else root.name, "path": "" if rel_root == Path(".") else rel_root.as_posix(), "cover": f"/api/cover/{cover_path}" if cover_path else None, "folders": [], "tracks": [], } for child in sorted(root.iterdir(), key=lambda item: (item.is_file(), item.name.lower())): if child.is_dir(): node["folders"].append(build_tree(child)) elif is_supported_file(child): node["tracks"].append(build_track(child.relative_to(MUSIC_DIR).as_posix())) return node def collect_tracks(root: Path) -> list[dict[str, str]]: tracks: list[dict[str, str]] = [] for path in sorted(root.rglob("*")): if is_supported_file(path): tracks.append(build_track(path.relative_to(MUSIC_DIR).as_posix())) return tracks @app.on_event("startup") def startup_event() -> None: ensure_storage() @app.get("/", response_class=HTMLResponse) def index(request: Request) -> HTMLResponse: return templates.TemplateResponse("index.html", {"request": 
request}) @app.get("/api/library") def library() -> JSONResponse: ensure_storage() return JSONResponse( { "tree": build_tree(MUSIC_DIR), "all_tracks": collect_tracks(MUSIC_DIR), "playlists": load_playlists(), } ) @app.get("/api/stream/{file_path:path}") def stream_file(file_path: str) -> FileResponse: file = safe_music_path(file_path) if not file.exists() or not file.is_file(): raise HTTPException(status_code=404, detail="File not found") media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream" return FileResponse( file, media_type=media_type, filename=file.name, headers={"Cache-Control": "private, max-age=31536000, immutable"}, ) @app.get("/api/cover/{file_path:path}") def cover_file(file_path: str) -> FileResponse: file = safe_music_path(file_path) if not file.exists() or not file.is_file() or file.suffix.lower() not in IMAGE_EXTENSIONS: raise HTTPException(status_code=404, detail="Cover not found") media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream" return FileResponse(file, media_type=media_type, filename=file.name) @app.post("/api/upload") async def upload_files( files: list[UploadFile] = File(...), target_dir: str = Form(default=""), ) -> JSONResponse: destination = safe_music_path(target_dir) destination.mkdir(parents=True, exist_ok=True) saved: list[str] = [] for upload in files: suffix = Path(upload.filename or "").suffix.lower() if suffix not in SUPPORTED_EXTENSIONS: continue filename = Path(upload.filename or f"upload-{uuid.uuid4().hex}").name file_path = destination / filename with file_path.open("wb") as buffer: shutil.copyfileobj(upload.file, buffer) saved.append(file_path.relative_to(MUSIC_DIR).as_posix()) return JSONResponse({"saved": saved}) @app.post("/api/folder") def create_folder(payload: FolderCreateRequest) -> JSONResponse: folder = safe_music_path(payload.path) folder.mkdir(parents=True, exist_ok=True) return JSONResponse({"created": folder.relative_to(MUSIC_DIR).as_posix()}) 
@app.post("/api/move")
def move_file(payload: MoveRequest) -> JSONResponse:
    """Move a file or folder into another directory of the library.

    The destination directory is created when missing. Moving an item into
    the directory it already lives in is a no-op that still reports success.

    Raises:
        HTTPException: 404 when the source does not exist; 400 when a path is
            outside the library, the source is the library root, the
            destination lies inside the source, or the destination is not a
            directory; 409 when an item with the same name already exists in
            the destination.
    """
    source = safe_music_path(payload.source)
    destination_dir = safe_music_path(payload.destination_dir)

    if not source.exists():
        raise HTTPException(status_code=404, detail="Source not found")
    if source == MUSIC_DIR.resolve():
        raise HTTPException(status_code=400, detail="Cannot move the library root")
    # shutil.move refuses to move a directory into itself; reject it up front
    # with a client error instead of letting shutil.Error become a 500.
    if destination_dir == source or source in destination_dir.parents:
        raise HTTPException(
            status_code=400, detail="Cannot move an item into itself"
        )

    try:
        destination_dir.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError) as exc:
        raise HTTPException(
            status_code=400, detail="Destination is not a directory"
        ) from exc

    destination = destination_dir / source.name
    if destination != source:
        # Without this check shutil.move would overwrite an existing file or
        # nest the source inside an existing same-named directory, and the
        # path reported below would be wrong.
        if destination.exists():
            raise HTTPException(
                status_code=409, detail="Destination already exists"
            )
        shutil.move(str(source), str(destination))
    # NOTE(review): playlists that store the old path are not rewritten here;
    # confirm whether playlist entries are library-relative paths.
    return JSONResponse({"moved": destination.relative_to(MUSIC_DIR).as_posix()})


@app.post("/api/playlists")
def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse:
    """Create a playlist with a fresh id and persist it.

    A blank name falls back to a default placeholder name.
    """
    playlists = load_playlists()
    playlist = {
        "id": uuid.uuid4().hex,
        "name": payload.name.strip() or "未命名播放列表",
        "tracks": payload.tracks,
    }
    playlists.append(playlist)
    save_playlists(playlists)
    return JSONResponse(playlist)


@app.put("/api/playlists/{playlist_id}")
def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse:
    """Replace the track list of an existing playlist.

    Raises:
        HTTPException: 404 when no playlist has the given id.
    """
    playlists = load_playlists()
    for playlist in playlists:
        # .get() so a stored entry without an "id" key is skipped instead of
        # raising KeyError.
        if playlist.get("id") == playlist_id:
            playlist["tracks"] = payload.tracks
            save_playlists(playlists)
            return JSONResponse(playlist)
    raise HTTPException(status_code=404, detail="Playlist not found")


@app.delete("/api/playlists/{playlist_id}")
def delete_playlist(playlist_id: str) -> JSONResponse:
    """Delete the playlist with the given id.

    Raises:
        HTTPException: 404 when no playlist has the given id.
    """
    playlists = load_playlists()
    remaining = [
        playlist for playlist in playlists if playlist.get("id") != playlist_id
    ]
    if len(remaining) == len(playlists):
        raise HTTPException(status_code=404, detail="Playlist not found")
    save_playlists(remaining)
    return JSONResponse({"deleted": playlist_id})