| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- from __future__ import annotations
- import json
- import mimetypes
- import shutil
- import uuid
- from pathlib import Path
- from typing import Any
- from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
- from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
- from fastapi.staticfiles import StaticFiles
- from fastapi.templating import Jinja2Templates
- from pydantic import BaseModel
# Project root: the parent of the directory that contains this module.
BASE_DIR = Path(__file__).resolve().parent.parent
# Directory holding the music library exposed by the API.
MUSIC_DIR = BASE_DIR / "mp3file"
# JSON file in which playlists are persisted.
PLAYLISTS_FILE = BASE_DIR / "playlists.json"

# Lower-case file suffixes listed as tracks and accepted by the upload route.
SUPPORTED_EXTENSIONS = {
    ".aac", ".flac", ".m3u", ".m3u8", ".m4a", ".mp3", ".mp4",
    ".oga", ".ogg", ".opus", ".wav", ".webm", ".wma",
}
# Lower-case file suffixes treated as cover-art images.
IMAGE_EXTENSIONS = {".gif", ".jpeg", ".jpg", ".png", ".webp"}

# Application object, static asset mount and HTML template loader.
app = FastAPI(title="MusicWeb", version="1.0.0")
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
class FolderCreateRequest(BaseModel):
    """Request body for ``POST /api/folder``: the folder path to create, relative to the music library."""

    path: str
class MoveRequest(BaseModel):
    """Request body for ``POST /api/move``: the item to move and the folder to move it into, both relative to the music library."""

    source: str
    destination_dir: str
class PlaylistCreateRequest(BaseModel):
    """Request body for ``POST /api/playlists``: the playlist's display name and its list of track entries."""

    name: str
    tracks: list[str]
class PlaylistUpdateRequest(BaseModel):
    """Request body for ``PUT /api/playlists/{playlist_id}``: the track list that replaces the stored one."""

    tracks: list[str]
def ensure_storage() -> None:
    """Create the music directory and, if absent, a playlist file holding an empty JSON list."""
    MUSIC_DIR.mkdir(parents=True, exist_ok=True)
    if PLAYLISTS_FILE.exists():
        return
    PLAYLISTS_FILE.write_text("[]", encoding="utf-8")
def safe_music_path(relative_path: str) -> Path:
    """Resolve *relative_path* against the music library and confine it there.

    Args:
        relative_path: Client-supplied path, interpreted relative to
            ``MUSIC_DIR``. An empty string refers to the library root.

    Returns:
        The resolved absolute path, which is either the library root itself
        or a location below it.

    Raises:
        HTTPException: 400 if the path cannot be resolved or resolves to a
            location outside the music library.
    """
    library_root = MUSIC_DIR.resolve()
    try:
        candidate = (MUSIC_DIR / relative_path).resolve()
    except (ValueError, OSError, RuntimeError) as exc:
        # resolve() can fail on malformed input (for example an embedded NUL
        # byte) or on symlink loops; report that as a bad request instead of
        # letting it surface as an internal server error.
        raise HTTPException(status_code=400, detail="Invalid path") from exc
    if candidate != library_root and library_root not in candidate.parents:
        raise HTTPException(status_code=400, detail="Invalid path")
    return candidate
def load_playlists() -> list[dict[str, Any]]:
    """Return the parsed contents of the playlist file, creating storage first if needed."""
    ensure_storage()
    raw_json = PLAYLISTS_FILE.read_text(encoding="utf-8")
    return json.loads(raw_json)
def save_playlists(playlists: list[dict[str, Any]]) -> None:
    """Overwrite the playlist file with *playlists* as indented UTF-8 JSON."""
    # ensure_ascii=False keeps non-ASCII names readable in the file.
    serialized = json.dumps(playlists, ensure_ascii=False, indent=2)
    PLAYLISTS_FILE.write_text(serialized, encoding="utf-8")
def is_supported_file(path: Path) -> bool:
    """Return True when *path* is an existing file whose suffix is in SUPPORTED_EXTENSIONS (case-insensitive)."""
    if not path.is_file():
        return False
    return path.suffix.lower() in SUPPORTED_EXTENSIONS
def track_url(relative_path: str) -> str:
    """Return the streaming URL for a track.

    Args:
        relative_path: POSIX-style path of the track relative to the music
            library.

    Returns:
        ``/api/stream/<path>`` with the path percent-encoded. Slashes are
        kept as separators; characters such as ``#``, ``?`` and ``%`` are
        escaped so they are not read as URL syntax.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    return f"/api/stream/{quote(relative_path, safe='/')}"
def build_track(relative_path: str) -> dict[str, str]:
    """Build the JSON-serialisable description of one track.

    Args:
        relative_path: POSIX-style path of the track relative to the music
            library.

    Returns:
        A dict with ``id`` and ``path`` (both the relative path), ``name``
        (the file name), ``url`` (from ``track_url``) and ``folder`` (the
        containing folder's relative path, or ``""`` for a track at the
        library root).
    """
    track_path = Path(relative_path)
    parent = track_path.parent
    # Only the bare "." parent means "library root". Dots inside folder
    # names (e.g. "Vol.1") must be kept, so do not strip them textually.
    folder = "" if parent == Path(".") else parent.as_posix().strip("/")
    return {
        "id": relative_path,
        "name": track_path.name,
        "path": relative_path,
        "url": track_url(relative_path),
        "folder": folder,
    }
def find_cover_for_directory(root: Path) -> str | None:
    """Pick a cover image for *root*.

    Well-known cover file names are tried first, in a fixed order. If none
    exists, the first image file directly inside *root* (sorted by
    lower-cased name) is used.

    Returns:
        The image path relative to ``MUSIC_DIR`` in POSIX form, or ``None``
        when the directory contains no image.
    """

    def library_relative(image: Path) -> str:
        return image.relative_to(MUSIC_DIR).as_posix()

    well_known_names = (
        "cover.jpg",
        "cover.jpeg",
        "cover.png",
        "cover.webp",
        "folder.jpg",
        "folder.jpeg",
        "folder.png",
        "folder.webp",
    )
    for name in well_known_names:
        image = root / name
        if image.is_file() and image.suffix.lower() in IMAGE_EXTENSIONS:
            return library_relative(image)

    entries = sorted(root.iterdir(), key=lambda entry: entry.name.lower())
    fallback = next(
        (
            entry
            for entry in entries
            if entry.is_file() and entry.suffix.lower() in IMAGE_EXTENSIONS
        ),
        None,
    )
    if fallback is None:
        return None
    return library_relative(fallback)
def build_tree(root: Path) -> dict[str, Any]:
    """Recursively describe the directory *root* as a nested dict.

    Returns:
        A node with ``name``, ``path`` (relative to ``MUSIC_DIR``, ``""``
        for the library root), ``cover`` (cover URL or ``None``),
        ``folders`` (child nodes) and ``tracks`` (supported files directly
        inside *root*).
    """
    is_library_root = root == MUSIC_DIR
    relative = Path(".") if is_library_root else root.relative_to(MUSIC_DIR)
    cover = find_cover_for_directory(root)

    folders: list[dict[str, Any]] = []
    tracks: list[dict[str, str]] = []
    # Directories sort before files; each group is ordered by lower-cased name.
    entries = sorted(
        root.iterdir(),
        key=lambda entry: (entry.is_file(), entry.name.lower()),
    )
    for entry in entries:
        if entry.is_dir():
            folders.append(build_tree(entry))
            continue
        if is_supported_file(entry):
            tracks.append(build_track(entry.relative_to(MUSIC_DIR).as_posix()))

    return {
        "name": "音乐库" if is_library_root else root.name,
        "path": "" if relative == Path(".") else relative.as_posix(),
        "cover": f"/api/cover/{cover}" if cover else None,
        "folders": folders,
        "tracks": tracks,
    }
def collect_tracks(root: Path) -> list[dict[str, str]]:
    """Return track descriptions for every supported file under *root*, recursively, in sorted path order."""
    return [
        build_track(entry.relative_to(MUSIC_DIR).as_posix())
        for entry in sorted(root.rglob("*"))
        if is_supported_file(entry)
    ]
@app.on_event("startup")
def startup_event() -> None:
    """Make sure the music directory and playlist file exist when the application starts."""
    ensure_storage()
@app.get("/", response_class=HTMLResponse)
def index(request: Request) -> HTMLResponse:
    """Render the ``index.html`` template for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/api/library")
def library() -> JSONResponse:
    """Return the folder tree, the flat list of all tracks and the stored playlists."""
    ensure_storage()
    body = {
        "tree": build_tree(MUSIC_DIR),
        "all_tracks": collect_tracks(MUSIC_DIR),
        "playlists": load_playlists(),
    }
    return JSONResponse(body)
@app.get("/api/stream/{file_path:path}")
def stream_file(file_path: str) -> FileResponse:
    """Serve a file from the music library.

    Raises:
        HTTPException: 400 (from ``safe_music_path``) for a path outside the
            library, 404 when the path is not an existing file.
    """
    target = safe_music_path(file_path)
    if not (target.exists() and target.is_file()):
        raise HTTPException(status_code=404, detail="File not found")
    guessed_type, _ = mimetypes.guess_type(target.name)
    return FileResponse(
        target,
        media_type=guessed_type or "application/octet-stream",
        filename=target.name,
    )
@app.get("/api/cover/{file_path:path}")
def cover_file(file_path: str) -> FileResponse:
    """Serve a cover image from the music library.

    Raises:
        HTTPException: 400 (from ``safe_music_path``) for a path outside the
            library, 404 when the path is not an existing file with a
            suffix in ``IMAGE_EXTENSIONS``.
    """
    target = safe_music_path(file_path)
    is_existing_file = target.exists() and target.is_file()
    if not (is_existing_file and target.suffix.lower() in IMAGE_EXTENSIONS):
        raise HTTPException(status_code=404, detail="Cover not found")
    guessed_type, _ = mimetypes.guess_type(target.name)
    return FileResponse(
        target,
        media_type=guessed_type or "application/octet-stream",
        filename=target.name,
    )
@app.post("/api/upload")
async def upload_files(
    files: list[UploadFile] = File(...),
    target_dir: str = Form(default=""),
) -> JSONResponse:
    """Store uploaded audio files in a folder of the music library.

    Args:
        files: Uploaded files. Entries whose file name has no suffix in
            ``SUPPORTED_EXTENSIONS`` (including entries without a file name)
            are skipped.
        target_dir: Destination folder relative to the music library;
            created if missing. Empty string means the library root.

    Returns:
        ``{"saved": [...]}`` listing the library-relative POSIX paths of the
        files that were written.

    Raises:
        HTTPException: 400 (from ``safe_music_path``) when *target_dir*
            resolves outside the music library.
    """
    # Function-scope import keeps this block self-contained.
    from fastapi.concurrency import run_in_threadpool

    destination = safe_music_path(target_dir)
    destination.mkdir(parents=True, exist_ok=True)
    saved: list[str] = []
    for upload in files:
        # Keep only the final path component of the client-supplied name so
        # the file is always written directly inside `destination`.
        filename = Path(upload.filename or "").name
        if Path(filename).suffix.lower() not in SUPPORTED_EXTENSIONS:
            continue
        file_path = destination / filename
        with file_path.open("wb") as buffer:
            # The copy is blocking I/O; run it in a worker thread so the
            # event loop stays free while large files are written.
            await run_in_threadpool(shutil.copyfileobj, upload.file, buffer)
        saved.append(file_path.relative_to(MUSIC_DIR).as_posix())
    return JSONResponse({"saved": saved})
@app.post("/api/folder")
def create_folder(payload: FolderCreateRequest) -> JSONResponse:
    """Create a folder (and any missing parents) inside the music library and report its relative path."""
    target = safe_music_path(payload.path)
    target.mkdir(parents=True, exist_ok=True)
    created = target.relative_to(MUSIC_DIR).as_posix()
    return JSONResponse({"created": created})
@app.post("/api/move")
def move_file(payload: MoveRequest) -> JSONResponse:
    """Move a file or folder into another folder of the music library.

    The destination folder is created when it does not exist. Moving an
    item into the folder it already lives in is a no-op.

    Returns:
        ``{"moved": <path>}`` with the item's new library-relative POSIX path.

    Raises:
        HTTPException: 400 (from ``safe_music_path``) for paths outside the
            library; 404 when the source does not exist; 400 when the source
            is the library root, when the destination is the source itself
            or lies inside it, or when the destination is not a folder; 409
            when an item with the same name already exists at the
            destination.
    """
    source = safe_music_path(payload.source)
    destination_dir = safe_music_path(payload.destination_dir)
    if not source.exists():
        raise HTTPException(status_code=404, detail="Source not found")
    if source == MUSIC_DIR.resolve():
        raise HTTPException(status_code=400, detail="Cannot move the library root")
    if destination_dir == source or source in destination_dir.parents:
        raise HTTPException(status_code=400, detail="Cannot move an item into itself")
    if destination_dir.exists() and not destination_dir.is_dir():
        raise HTTPException(status_code=400, detail="Destination is not a folder")

    destination = destination_dir / source.name
    if destination != source:
        # Refuse to overwrite: a move must never silently destroy another item.
        if destination.exists():
            raise HTTPException(status_code=409, detail="Destination already exists")
        destination_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source), str(destination))
    return JSONResponse({"moved": destination.relative_to(MUSIC_DIR).as_posix()})
@app.post("/api/playlists")
def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse:
    """Append a new playlist with a generated id to the stored playlists and return it.

    A name that is empty after stripping whitespace is replaced by a default
    placeholder name.
    """
    existing = load_playlists()
    display_name = payload.name.strip() or "未命名播放列表"
    new_playlist = {
        "id": uuid.uuid4().hex,
        "name": display_name,
        "tracks": payload.tracks,
    }
    existing.append(new_playlist)
    save_playlists(existing)
    return JSONResponse(new_playlist)
@app.put("/api/playlists/{playlist_id}")
def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse:
    """Replace the track list of the first stored playlist whose id matches.

    Raises:
        HTTPException: 404 when no playlist has the given id.
    """
    playlists = load_playlists()
    match = next(
        (entry for entry in playlists if entry["id"] == playlist_id),
        None,
    )
    if match is None:
        raise HTTPException(status_code=404, detail="Playlist not found")
    match["tracks"] = payload.tracks
    save_playlists(playlists)
    return JSONResponse(match)
@app.delete("/api/playlists/{playlist_id}")
def delete_playlist(playlist_id: str) -> JSONResponse:
    """Remove every stored playlist with the given id.

    Raises:
        HTTPException: 404 when no playlist has the given id.
    """
    playlists = load_playlists()
    remaining = [entry for entry in playlists if entry["id"] != playlist_id]
    removed_count = len(playlists) - len(remaining)
    if removed_count == 0:
        raise HTTPException(status_code=404, detail="Playlist not found")
    save_playlists(remaining)
    return JSONResponse({"deleted": playlist_id})
|