| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476 |
- from __future__ import annotations
- import json
- import mimetypes
- import shutil
- import uuid
- import base64
- import urllib.error
- import urllib.parse
- import urllib.request
- from pathlib import Path
- from typing import Any
- from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
- from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
- from fastapi.staticfiles import StaticFiles
- from fastapi.templating import Jinja2Templates
- from pydantic import BaseModel
- BASE_DIR = Path(__file__).resolve().parent.parent
- LOCAL_MUSIC_DIR = BASE_DIR / "mp3file"
- CLOUD_MUSIC_DIR = Path("/mnt/baiducloud/百度网盘/mp3file")
- CLOUD_WEBDAV_BASE = "http://110.42.102.94:5244/dav"
- CLOUD_ALIST_BASE = "http://110.42.102.94:5244"
- CLOUD_WEBDAV_USER = "sequoia00"
- CLOUD_WEBDAV_PASSWORD = "792199bb"
- MUSIC_ROOTS: list[tuple[str, Path, str]] = [
- ("", LOCAL_MUSIC_DIR, "本地音乐"),
- ("cloud", CLOUD_MUSIC_DIR, "百度网盘"),
- ]
- PLAYLISTS_FILE = BASE_DIR / "playlists.json"
- SUPPORTED_EXTENSIONS = {
- ".mp3",
- ".wav",
- ".flac",
- ".m3u",
- ".m3u8",
- ".ogg",
- ".aac",
- ".wma",
- ".opus",
- ".oga",
- ".mp4",
- ".m4a",
- ".webm",
- }
- IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
- app = FastAPI(title="MusicWeb", version="1.0.0")
- app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
- templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
- class FolderCreateRequest(BaseModel):
- path: str
- class MoveRequest(BaseModel):
- source: str
- destination_dir: str
- class PlaylistCreateRequest(BaseModel):
- name: str
- tracks: list[str]
- class PlaylistUpdateRequest(BaseModel):
- tracks: list[str]
- def ensure_storage() -> None:
- LOCAL_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
- CLOUD_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
- if not PLAYLISTS_FILE.exists():
- PLAYLISTS_FILE.write_text("[]", encoding="utf-8")
- def safe_music_path(relative_path: str) -> Path:
- prefix, _, inner_path = relative_path.partition("/")
- roots = {key: root.resolve() for key, root, _ in MUSIC_ROOTS}
- if prefix in roots:
- candidate = (roots[prefix] / inner_path).resolve()
- if candidate != roots[prefix] and roots[prefix] not in candidate.parents:
- raise HTTPException(status_code=400, detail="Invalid path")
- return candidate
- candidate = (LOCAL_MUSIC_DIR / relative_path).resolve()
- local_root = LOCAL_MUSIC_DIR.resolve()
- if candidate != local_root and local_root not in candidate.parents:
- raise HTTPException(status_code=400, detail="Invalid path")
- return candidate
- def load_playlists() -> list[dict[str, Any]]:
- ensure_storage()
- return json.loads(PLAYLISTS_FILE.read_text(encoding="utf-8"))
- def save_playlists(playlists: list[dict[str, Any]]) -> None:
- PLAYLISTS_FILE.write_text(
- json.dumps(playlists, ensure_ascii=False, indent=2), encoding="utf-8"
- )
- def is_under_root(path: Path, root: Path) -> bool:
- resolved_path = path.resolve()
- resolved_root = root.resolve()
- return resolved_path == resolved_root or resolved_root in resolved_path.parents
- def track_path_for(root: Path, path: Path, prefix: str = "") -> str:
- relative = path.relative_to(root).as_posix()
- return f"{prefix}/{relative}" if prefix else relative
- def track_path_from_abs(path: Path) -> str:
- if is_under_root(path, LOCAL_MUSIC_DIR):
- return path.relative_to(LOCAL_MUSIC_DIR).as_posix()
- if is_under_root(path, CLOUD_MUSIC_DIR):
- return f"cloud/{path.relative_to(CLOUD_MUSIC_DIR).as_posix()}"
- return path.name
- def cloud_webdav_url(relative_path: str) -> str:
- inner_path = relative_path.removeprefix("cloud/").lstrip("/")
- encoded = "/".join(urllib.parse.quote(part) for part in inner_path.split("/") if part)
- base = CLOUD_WEBDAV_BASE.rstrip("/")
- return f"{base}/{encoded}" if encoded else base
- def cloud_alist_api_path(relative_path: str) -> str:
- inner_path = relative_path.removeprefix("cloud/").lstrip("/")
- return f"/百度网盘/mp3file/{inner_path}" if inner_path else "/百度网盘/mp3file"
- def alist_login_token() -> str:
- payload = json.dumps(
- {
- "username": CLOUD_WEBDAV_USER,
- "password": CLOUD_WEBDAV_PASSWORD,
- }
- ).encode("utf-8")
- request = urllib.request.Request(
- f"{CLOUD_ALIST_BASE.rstrip('/')}/api/auth/login",
- data=payload,
- headers={"Content-Type": "application/json", "User-Agent": "MusicWebPlayer/1.0"},
- )
- try:
- with urllib.request.urlopen(request, timeout=20) as response:
- result = json.loads(response.read().decode("utf-8"))
- except urllib.error.HTTPError as error:
- raise HTTPException(status_code=error.code, detail=error.reason)
- except urllib.error.URLError as error:
- raise HTTPException(status_code=502, detail=str(error.reason))
- token = result.get("data", {}).get("token")
- if not token:
- raise HTTPException(status_code=502, detail="AList login failed")
- return token
- def cloud_raw_url(relative_path: str) -> str:
- payload = json.dumps({"path": cloud_alist_api_path(relative_path), "password": ""}).encode("utf-8")
- request = urllib.request.Request(
- f"{CLOUD_ALIST_BASE.rstrip('/')}/api/fs/get",
- data=payload,
- headers={
- "Content-Type": "application/json",
- "Authorization": alist_login_token(),
- "User-Agent": "MusicWebPlayer/1.0",
- },
- )
- try:
- with urllib.request.urlopen(request, timeout=20) as response:
- result = json.loads(response.read().decode("utf-8"))
- except urllib.error.HTTPError as error:
- raise HTTPException(status_code=error.code, detail=error.reason)
- except urllib.error.URLError as error:
- raise HTTPException(status_code=502, detail=str(error.reason))
- data = result.get("data", {})
- sign = data.get("sign")
- path = data.get("path")
- if not sign or not path:
- raise HTTPException(status_code=404, detail="Cloud signed url not found")
- encoded_path = "/".join(urllib.parse.quote(part) for part in path.lstrip("/").split("/"))
- quoted_sign = urllib.parse.quote(sign, safe="")
- return f"{CLOUD_ALIST_BASE.rstrip('/')}/d/{urllib.parse.quote('百度网盘')}/{encoded_path}?sign={quoted_sign}"
- def is_supported_file(path: Path) -> bool:
- return path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
- def track_url(relative_path: str) -> str:
- return f"/api/stream/{relative_path}"
- def build_track(relative_path: str) -> dict[str, str]:
- filename = Path(relative_path).name
- return {
- "id": relative_path,
- "name": filename,
- "path": relative_path,
- "url": track_url(relative_path),
- "folder": str(Path(relative_path).parent).replace(".", "").strip("/"),
- }
- def find_cover_for_directory(root: Path, prefix: str = "") -> str | None:
- candidates = (
- "cover.jpg",
- "cover.jpeg",
- "cover.png",
- "cover.webp",
- "folder.jpg",
- "folder.jpeg",
- "folder.png",
- "folder.webp",
- )
- for candidate in candidates:
- path = root / candidate
- if path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS:
- return track_path_for(root, path, prefix)
- for child in sorted(root.iterdir(), key=lambda item: item.name.lower()):
- if child.is_file() and child.suffix.lower() in IMAGE_EXTENSIONS:
- return track_path_for(root, child, prefix)
- return None
- def build_tree(root: Path, prefix: str = "", label: str = "音乐库") -> dict[str, Any]:
- cover_path = find_cover_for_directory(root, prefix)
- node = {
- "name": label,
- "path": prefix,
- "cover": f"/api/cover/{cover_path}" if cover_path else None,
- "folders": [],
- "tracks": [],
- }
- for child in sorted(root.iterdir(), key=lambda item: (item.is_file(), item.name.lower())):
- if child.is_dir():
- child_rel = child.relative_to(root).as_posix()
- child_prefix = f"{prefix}/{child_rel}" if prefix else child_rel
- node["folders"].append(build_tree(child, child_prefix, child.name))
- elif is_supported_file(child):
- node["tracks"].append(build_track(track_path_for(root, child, prefix)))
- return node
- def collect_tracks(root: Path, prefix: str = "") -> list[dict[str, str]]:
- tracks: list[dict[str, str]] = []
- for path in sorted(root.rglob("*")):
- if is_supported_file(path):
- tracks.append(build_track(track_path_for(root, path, prefix)))
- return tracks
- def root_entry(source: str) -> tuple[str, Path, str]:
- for prefix, root, label in MUSIC_ROOTS:
- if prefix == source:
- return prefix, root, label
- raise HTTPException(status_code=404, detail="Library source not found")
- def build_library_payload(source: str | None = None) -> dict[str, Any]:
- ensure_storage()
- entries = MUSIC_ROOTS if source is None else [root_entry(source)]
- tree = {
- "name": "音乐库",
- "path": "",
- "cover": None,
- "folders": [
- build_tree(root, prefix, label)
- for prefix, root, label in entries
- if root.exists()
- ],
- "tracks": [],
- }
- all_tracks: list[dict[str, str]] = []
- for prefix, root, _ in entries:
- if root.exists():
- all_tracks.extend(collect_tracks(root, prefix))
- return {
- "tree": tree,
- "all_tracks": all_tracks,
- "playlists": load_playlists(),
- }
- @app.on_event("startup")
- def startup_event() -> None:
- ensure_storage()
- @app.get("/", response_class=HTMLResponse)
- def index(request: Request) -> HTMLResponse:
- return templates.TemplateResponse("index.html", {"request": request})
- @app.get("/api/library")
- def library() -> JSONResponse:
- return JSONResponse(build_library_payload())
- @app.get("/api/library/{source}")
- def library_by_source(source: str) -> JSONResponse:
- normalized = "" if source == "local" else source
- return JSONResponse(build_library_payload(normalized))
- @app.get("/api/stream/{file_path:path}")
- def stream_file(request: Request, file_path: str) -> StreamingResponse:
- if file_path.startswith("cloud/"):
- return proxy_cloud_stream(cloud_raw_url(file_path), request.headers.get("range"))
- file = safe_music_path(file_path)
- if not file.exists() or not file.is_file():
- raise HTTPException(status_code=404, detail="File not found")
- media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
- def file_iterator() -> Any:
- with file.open("rb") as handle:
- while chunk := handle.read(1024 * 1024):
- if not chunk:
- break
- yield chunk
- return StreamingResponse(
- file_iterator(),
- media_type=media_type,
- headers={"Cache-Control": "private, no-cache"},
- )
- @app.get("/api/cloud-url/{file_path:path}")
- def get_cloud_url(file_path: str) -> JSONResponse:
- if not file_path.startswith("cloud/"):
- raise HTTPException(status_code=400, detail="Not a cloud track")
- return JSONResponse({"url": cloud_raw_url(file_path)})
- def proxy_cloud_stream(remote_url: str, range_header: str | None = None) -> StreamingResponse:
- auth = base64.b64encode(f"{CLOUD_WEBDAV_USER}:{CLOUD_WEBDAV_PASSWORD}".encode("utf-8")).decode("ascii")
- headers = {
- "Authorization": f"Basic {auth}",
- "User-Agent": "MusicWebPlayer/1.0",
- }
- if range_header:
- headers["Range"] = range_header
- request = urllib.request.Request(
- remote_url,
- headers=headers,
- )
- try:
- response = urllib.request.urlopen(request, timeout=30)
- except urllib.error.HTTPError as error:
- raise HTTPException(status_code=error.code, detail=error.reason)
- except urllib.error.URLError as error:
- raise HTTPException(status_code=502, detail=str(error.reason))
- media_type = mimetypes.guess_type(urllib.parse.urlparse(remote_url).path)[0] or response.headers.get_content_type() or "audio/mpeg"
- passthrough_headers = {
- "Cache-Control": "private, no-cache",
- "Accept-Ranges": response.headers.get("Accept-Ranges") or "bytes",
- "Content-Type": media_type,
- }
- for key in ("ETag", "Last-Modified", "Content-Range"):
- value = response.headers.get(key)
- if value:
- passthrough_headers[key] = value
- def remote_iterator() -> Any:
- with response:
- while chunk := response.read(1024 * 1024):
- yield chunk
- return StreamingResponse(
- remote_iterator(),
- status_code=getattr(response, "status", 200),
- media_type=media_type,
- headers=passthrough_headers,
- )
- @app.get("/api/cover/{file_path:path}")
- def cover_file(request: Request, file_path: str):
- if file_path.startswith("cloud/"):
- if Path(file_path).suffix.lower() not in IMAGE_EXTENSIONS:
- raise HTTPException(status_code=404, detail="Cover not found")
- return proxy_cloud_stream(cloud_raw_url(file_path), request.headers.get("range"))
- file = safe_music_path(file_path)
- if not file.exists() or not file.is_file() or file.suffix.lower() not in IMAGE_EXTENSIONS:
- raise HTTPException(status_code=404, detail="Cover not found")
- media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
- return FileResponse(file, media_type=media_type, filename=file.name)
- @app.post("/api/upload")
- async def upload_files(
- files: list[UploadFile] = File(...),
- target_dir: str = Form(default=""),
- ) -> JSONResponse:
- destination = safe_music_path(target_dir)
- if not str(destination).startswith(str(LOCAL_MUSIC_DIR.resolve())):
- raise HTTPException(status_code=400, detail="Upload destination must be local music dir")
- destination.mkdir(parents=True, exist_ok=True)
- saved: list[str] = []
- for upload in files:
- suffix = Path(upload.filename or "").suffix.lower()
- if suffix not in SUPPORTED_EXTENSIONS:
- continue
- filename = Path(upload.filename or f"upload-{uuid.uuid4().hex}").name
- file_path = destination / filename
- with file_path.open("wb") as buffer:
- shutil.copyfileobj(upload.file, buffer)
- saved.append(track_path_from_abs(file_path))
- return JSONResponse({"saved": saved})
- @app.post("/api/folder")
- def create_folder(payload: FolderCreateRequest) -> JSONResponse:
- folder = safe_music_path(payload.path)
- if not str(folder).startswith(str(LOCAL_MUSIC_DIR.resolve())):
- raise HTTPException(status_code=400, detail="Folder must be created in local music dir")
- folder.mkdir(parents=True, exist_ok=True)
- return JSONResponse({"created": track_path_from_abs(folder)})
- @app.post("/api/move")
- def move_file(payload: MoveRequest) -> JSONResponse:
- source = safe_music_path(payload.source)
- destination_dir = safe_music_path(payload.destination_dir)
- if not source.exists():
- raise HTTPException(status_code=404, detail="Source not found")
- if not destination_dir.exists():
- destination_dir.mkdir(parents=True, exist_ok=True)
- destination = destination_dir / source.name
- shutil.move(str(source), str(destination))
- return JSONResponse({"moved": track_path_from_abs(destination)})
- @app.post("/api/playlists")
- def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse:
- playlists = load_playlists()
- playlist = {
- "id": uuid.uuid4().hex,
- "name": payload.name.strip() or "未命名播放列表",
- "tracks": payload.tracks,
- }
- playlists.append(playlist)
- save_playlists(playlists)
- return JSONResponse(playlist)
- @app.put("/api/playlists/{playlist_id}")
- def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse:
- playlists = load_playlists()
- for playlist in playlists:
- if playlist["id"] == playlist_id:
- playlist["tracks"] = payload.tracks
- save_playlists(playlists)
- return JSONResponse(playlist)
- raise HTTPException(status_code=404, detail="Playlist not found")
- @app.delete("/api/playlists/{playlist_id}")
- def delete_playlist(playlist_id: str) -> JSONResponse:
- playlists = load_playlists()
- filtered = [playlist for playlist in playlists if playlist["id"] != playlist_id]
- if len(filtered) == len(playlists):
- raise HTTPException(status_code=404, detail="Playlist not found")
- save_playlists(filtered)
- return JSONResponse({"deleted": playlist_id})
|