| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640 |
- from __future__ import annotations
- import json
- import mimetypes
- import shutil
- import threading
- import uuid
- import base64
- import urllib.error
- import urllib.parse
- import urllib.request
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Any
- from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
- from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
- from fastapi.staticfiles import StaticFiles
- from fastapi.templating import Jinja2Templates
- from pydantic import BaseModel
- BASE_DIR = Path(__file__).resolve().parent.parent
- LOCAL_MUSIC_DIR = BASE_DIR / "mp3file"
- CLOUD_MUSIC_DIR = Path("/mnt/baiducloud/百度网盘/mp3file")
- CLOUD_WEBDAV_BASE = "http://110.42.102.94:5244/dav"
- CLOUD_ALIST_BASE = "http://110.42.102.94:5244"
- CLOUD_WEBDAV_USER = "sequoia00"
- CLOUD_WEBDAV_PASSWORD = "792199bb"
- MUSIC_ROOTS: list[tuple[str, Path, str]] = [
- ("", LOCAL_MUSIC_DIR, "本地音乐"),
- ("cloud", CLOUD_MUSIC_DIR, "百度网盘"),
- ]
- PLAYLISTS_FILE = BASE_DIR / "playlists.json"
- CACHE_DIR = BASE_DIR / ".cache"
- CLOUD_LIBRARY_CACHE_FILE = CACHE_DIR / "cloud_library.json"
- CLOUD_COVER_CACHE_DIR = CACHE_DIR / "cloud_covers"
- SUPPORTED_EXTENSIONS = {
- ".mp3",
- ".wav",
- ".flac",
- ".m3u",
- ".m3u8",
- ".ogg",
- ".aac",
- ".wma",
- ".opus",
- ".oga",
- ".mp4",
- ".m4a",
- ".webm",
- }
- IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
- app = FastAPI(title="MusicWeb", version="1.0.0")
- app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
- templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
- CLOUD_REFRESH_STATE = {"running": False}
- CLOUD_REFRESH_LOCK = threading.Lock()
- class FolderCreateRequest(BaseModel):
- path: str
- class MoveRequest(BaseModel):
- source: str
- destination_dir: str
- class PlaylistCreateRequest(BaseModel):
- name: str
- tracks: list[str]
- class PlaylistUpdateRequest(BaseModel):
- tracks: list[str]
- def ensure_storage() -> None:
- LOCAL_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
- CLOUD_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
- CACHE_DIR.mkdir(parents=True, exist_ok=True)
- CLOUD_COVER_CACHE_DIR.mkdir(parents=True, exist_ok=True)
- if not PLAYLISTS_FILE.exists():
- PLAYLISTS_FILE.write_text("[]", encoding="utf-8")
- def safe_music_path(relative_path: str) -> Path:
- prefix, _, inner_path = relative_path.partition("/")
- roots = {key: root.resolve() for key, root, _ in MUSIC_ROOTS}
- if prefix in roots:
- candidate = (roots[prefix] / inner_path).resolve()
- if candidate != roots[prefix] and roots[prefix] not in candidate.parents:
- raise HTTPException(status_code=400, detail="Invalid path")
- return candidate
- candidate = (LOCAL_MUSIC_DIR / relative_path).resolve()
- local_root = LOCAL_MUSIC_DIR.resolve()
- if candidate != local_root and local_root not in candidate.parents:
- raise HTTPException(status_code=400, detail="Invalid path")
- return candidate
- def load_playlists() -> list[dict[str, Any]]:
- ensure_storage()
- return json.loads(PLAYLISTS_FILE.read_text(encoding="utf-8"))
- def save_playlists(playlists: list[dict[str, Any]]) -> None:
- PLAYLISTS_FILE.write_text(
- json.dumps(playlists, ensure_ascii=False, indent=2), encoding="utf-8"
- )
- def is_under_root(path: Path, root: Path) -> bool:
- resolved_path = path.resolve()
- resolved_root = root.resolve()
- return resolved_path == resolved_root or resolved_root in resolved_path.parents
- def track_path_for(root: Path, path: Path, prefix: str = "") -> str:
- relative = path.relative_to(root).as_posix()
- return f"{prefix}/{relative}" if prefix else relative
- def track_path_from_abs(path: Path) -> str:
- if is_under_root(path, LOCAL_MUSIC_DIR):
- return path.relative_to(LOCAL_MUSIC_DIR).as_posix()
- if is_under_root(path, CLOUD_MUSIC_DIR):
- return f"cloud/{path.relative_to(CLOUD_MUSIC_DIR).as_posix()}"
- return path.name
- def path_is_under(path: Path, root: Path) -> bool:
- try:
- resolved_path = path.resolve()
- resolved_root = root.resolve()
- except OSError:
- return False
- return resolved_path == resolved_root or resolved_root in resolved_path.parents
- def trigger_cloud_cache_refresh_if_needed(*paths: Path) -> None:
- if any(path_is_under(path, CLOUD_MUSIC_DIR) for path in paths):
- start_cloud_library_refresh()
- def cloud_cover_cache_path(relative_path: str) -> Path:
- inner_path = relative_path.removeprefix("cloud/").lstrip("/")
- target = (CLOUD_COVER_CACHE_DIR / inner_path).resolve()
- cache_root = CLOUD_COVER_CACHE_DIR.resolve()
- if target != cache_root and cache_root not in target.parents:
- raise HTTPException(status_code=400, detail="Invalid cover path")
- return target
- def cloud_webdav_url(relative_path: str) -> str:
- inner_path = relative_path.removeprefix("cloud/").lstrip("/")
- encoded = "/".join(urllib.parse.quote(part) for part in inner_path.split("/") if part)
- base = CLOUD_WEBDAV_BASE.rstrip("/")
- return f"{base}/{encoded}" if encoded else base
- def cloud_alist_api_path(relative_path: str) -> str:
- inner_path = relative_path.removeprefix("cloud/").lstrip("/")
- return f"/百度网盘/mp3file/{inner_path}" if inner_path else "/百度网盘/mp3file"
- def alist_login_token() -> str:
- payload = json.dumps(
- {
- "username": CLOUD_WEBDAV_USER,
- "password": CLOUD_WEBDAV_PASSWORD,
- }
- ).encode("utf-8")
- request = urllib.request.Request(
- f"{CLOUD_ALIST_BASE.rstrip('/')}/api/auth/login",
- data=payload,
- headers={"Content-Type": "application/json", "User-Agent": "MusicWebPlayer/1.0"},
- )
- try:
- with urllib.request.urlopen(request, timeout=20) as response:
- result = json.loads(response.read().decode("utf-8"))
- except urllib.error.HTTPError as error:
- raise HTTPException(status_code=error.code, detail=error.reason)
- except urllib.error.URLError as error:
- raise HTTPException(status_code=502, detail=str(error.reason))
- token = result.get("data", {}).get("token")
- if not token:
- raise HTTPException(status_code=502, detail="AList login failed")
- return token
- def cloud_raw_url(relative_path: str) -> str:
- payload = json.dumps({"path": cloud_alist_api_path(relative_path), "password": ""}).encode("utf-8")
- request = urllib.request.Request(
- f"{CLOUD_ALIST_BASE.rstrip('/')}/api/fs/get",
- data=payload,
- headers={
- "Content-Type": "application/json",
- "Authorization": alist_login_token(),
- "User-Agent": "MusicWebPlayer/1.0",
- },
- )
- try:
- with urllib.request.urlopen(request, timeout=20) as response:
- result = json.loads(response.read().decode("utf-8"))
- except urllib.error.HTTPError as error:
- raise HTTPException(status_code=error.code, detail=error.reason)
- except urllib.error.URLError as error:
- raise HTTPException(status_code=502, detail=str(error.reason))
- data = result.get("data", {})
- sign = data.get("sign")
- path = data.get("path")
- if not sign or not path:
- raise HTTPException(status_code=404, detail="Cloud signed url not found")
- encoded_path = "/".join(urllib.parse.quote(part) for part in path.lstrip("/").split("/"))
- quoted_sign = urllib.parse.quote(sign, safe="")
- return f"{CLOUD_ALIST_BASE.rstrip('/')}/d/{urllib.parse.quote('百度网盘')}/{encoded_path}?sign={quoted_sign}"
- def is_supported_file(path: Path) -> bool:
- return path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
- def track_url(relative_path: str) -> str:
- return f"/api/stream/{relative_path}"
- def build_track(relative_path: str) -> dict[str, str]:
- filename = Path(relative_path).name
- return {
- "id": relative_path,
- "name": filename,
- "path": relative_path,
- "url": track_url(relative_path),
- "folder": str(Path(relative_path).parent).replace(".", "").strip("/"),
- }
- def find_cover_for_directory(root: Path, prefix: str = "") -> str | None:
- path = root / "cover.jpg"
- if path.is_file():
- return track_path_for(root, path, prefix)
- return None
- def sanitize_cover_value(cover: Any) -> str | None:
- if not isinstance(cover, str) or not cover:
- return None
- parsed = urllib.parse.urlparse(cover)
- path = parsed.path or cover
- return cover if path.lower().endswith("/cover.jpg") else None
- def sanitize_library_tree(node: dict[str, Any]) -> dict[str, Any]:
- node["cover"] = sanitize_cover_value(node.get("cover"))
- for child in node.get("folders", []):
- if isinstance(child, dict):
- sanitize_library_tree(child)
- return node
- def build_tree(root: Path, prefix: str = "", label: str = "音乐库") -> dict[str, Any]:
- cover_path = find_cover_for_directory(root, prefix)
- node = {
- "name": label,
- "path": prefix,
- "cover": f"/api/cover/{cover_path}" if cover_path else None,
- "folders": [],
- "tracks": [],
- }
- for child in sorted(root.iterdir(), key=lambda item: (item.is_file(), item.name.lower())):
- if child.is_dir():
- child_rel = child.relative_to(root).as_posix()
- child_prefix = f"{prefix}/{child_rel}" if prefix else child_rel
- node["folders"].append(build_tree(child, child_prefix, child.name))
- elif is_supported_file(child):
- node["tracks"].append(build_track(track_path_for(root, child, prefix)))
- return node
- def collect_tracks(root: Path, prefix: str = "") -> list[dict[str, str]]:
- tracks: list[dict[str, str]] = []
- for path in sorted(root.rglob("*")):
- if is_supported_file(path):
- tracks.append(build_track(track_path_for(root, path, prefix)))
- return tracks
- def root_entry(source: str) -> tuple[str, Path, str]:
- for prefix, root, label in MUSIC_ROOTS:
- if prefix == source:
- return prefix, root, label
- raise HTTPException(status_code=404, detail="Library source not found")
- def current_timestamp() -> str:
- return datetime.now(timezone.utc).isoformat()
- def build_library_payload(source: str | None = None) -> dict[str, Any]:
- ensure_storage()
- entries = MUSIC_ROOTS if source is None else [root_entry(source)]
- tree = {
- "name": "音乐库",
- "path": "",
- "cover": None,
- "folders": [
- build_tree(root, prefix, label)
- for prefix, root, label in entries
- if root.exists()
- ],
- "tracks": [],
- }
- all_tracks: list[dict[str, str]] = []
- for prefix, root, _ in entries:
- if root.exists():
- all_tracks.extend(collect_tracks(root, prefix))
- sanitize_library_tree(tree)
- return {
- "tree": tree,
- "all_tracks": all_tracks,
- "playlists": load_playlists(),
- }
- def read_cloud_library_cache() -> dict[str, Any] | None:
- if not CLOUD_LIBRARY_CACHE_FILE.exists():
- return None
- try:
- payload = json.loads(CLOUD_LIBRARY_CACHE_FILE.read_text(encoding="utf-8"))
- except (OSError, json.JSONDecodeError):
- return None
- if not isinstance(payload, dict):
- return None
- tree = payload.get("tree")
- if isinstance(tree, dict):
- sanitize_library_tree(tree)
- return payload
- def write_cloud_library_cache(payload: dict[str, Any]) -> dict[str, Any]:
- cached_payload = {
- **payload,
- "cache": {
- "updated_at": current_timestamp(),
- "is_cached": True,
- "refreshing": False,
- },
- }
- CLOUD_LIBRARY_CACHE_FILE.write_text(
- json.dumps(cached_payload, ensure_ascii=False, indent=2),
- encoding="utf-8",
- )
- return cached_payload
- def build_cloud_library_payload() -> dict[str, Any]:
- return build_library_payload("cloud")
- def cloud_library_payload_from_cache() -> dict[str, Any] | None:
- payload = read_cloud_library_cache()
- if not payload:
- return None
- cache_meta = payload.get("cache") if isinstance(payload.get("cache"), dict) else {}
- payload["cache"] = {
- "updated_at": cache_meta.get("updated_at"),
- "is_cached": True,
- "refreshing": CLOUD_REFRESH_STATE["running"],
- }
- return payload
- def refresh_cloud_library_cache_sync() -> dict[str, Any]:
- payload = build_cloud_library_payload()
- return write_cloud_library_cache(payload)
- def start_cloud_library_refresh() -> bool:
- with CLOUD_REFRESH_LOCK:
- if CLOUD_REFRESH_STATE["running"]:
- return False
- CLOUD_REFRESH_STATE["running"] = True
- def runner() -> None:
- try:
- refresh_cloud_library_cache_sync()
- finally:
- with CLOUD_REFRESH_LOCK:
- CLOUD_REFRESH_STATE["running"] = False
- threading.Thread(target=runner, daemon=True).start()
- return True
- @app.on_event("startup")
- def startup_event() -> None:
- ensure_storage()
- if not CLOUD_LIBRARY_CACHE_FILE.exists():
- start_cloud_library_refresh()
- @app.get("/", response_class=HTMLResponse)
- def index(request: Request) -> HTMLResponse:
- return templates.TemplateResponse("index.html", {"request": request})
- @app.get("/api/library")
- def library() -> JSONResponse:
- return JSONResponse(build_library_payload())
- @app.get("/api/library/{source}")
- def library_by_source(source: str) -> JSONResponse:
- normalized = "" if source == "local" else source
- if normalized == "cloud":
- cached = cloud_library_payload_from_cache()
- if cached:
- start_cloud_library_refresh()
- return JSONResponse(cached)
- payload = refresh_cloud_library_cache_sync()
- return JSONResponse(payload)
- return JSONResponse(build_library_payload(normalized))
- @app.get("/api/library/{source}/refresh")
- def refresh_library_by_source(source: str) -> JSONResponse:
- normalized = "" if source == "local" else source
- if normalized != "cloud":
- return JSONResponse(build_library_payload(normalized))
- started = start_cloud_library_refresh()
- cached = cloud_library_payload_from_cache()
- if cached:
- cached["cache"]["refreshing"] = True
- return JSONResponse({"started": started, "library": cached})
- payload = refresh_cloud_library_cache_sync()
- return JSONResponse({"started": started, "library": payload})
- @app.get("/api/stream/{file_path:path}")
- def stream_file(request: Request, file_path: str) -> StreamingResponse:
- if file_path.startswith("cloud/"):
- return proxy_cloud_stream(cloud_raw_url(file_path), request.headers.get("range"))
- file = safe_music_path(file_path)
- if not file.exists() or not file.is_file():
- raise HTTPException(status_code=404, detail="File not found")
- media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
- def file_iterator() -> Any:
- with file.open("rb") as handle:
- while chunk := handle.read(1024 * 1024):
- if not chunk:
- break
- yield chunk
- return StreamingResponse(
- file_iterator(),
- media_type=media_type,
- headers={"Cache-Control": "private, no-cache"},
- )
- @app.get("/api/cloud-url/{file_path:path}")
- def get_cloud_url(file_path: str) -> JSONResponse:
- if not file_path.startswith("cloud/"):
- raise HTTPException(status_code=400, detail="Not a cloud track")
- return JSONResponse({"url": cloud_raw_url(file_path)})
- def proxy_cloud_stream(remote_url: str, range_header: str | None = None) -> StreamingResponse:
- auth = base64.b64encode(f"{CLOUD_WEBDAV_USER}:{CLOUD_WEBDAV_PASSWORD}".encode("utf-8")).decode("ascii")
- headers = {
- "Authorization": f"Basic {auth}",
- "User-Agent": "MusicWebPlayer/1.0",
- }
- if range_header:
- headers["Range"] = range_header
- request = urllib.request.Request(
- remote_url,
- headers=headers,
- )
- try:
- response = urllib.request.urlopen(request, timeout=30)
- except urllib.error.HTTPError as error:
- raise HTTPException(status_code=error.code, detail=error.reason)
- except urllib.error.URLError as error:
- raise HTTPException(status_code=502, detail=str(error.reason))
- media_type = mimetypes.guess_type(urllib.parse.urlparse(remote_url).path)[0] or response.headers.get_content_type() or "audio/mpeg"
- passthrough_headers = {
- "Cache-Control": "private, no-cache",
- "Accept-Ranges": response.headers.get("Accept-Ranges") or "bytes",
- "Content-Type": media_type,
- }
- for key in ("ETag", "Last-Modified", "Content-Range"):
- value = response.headers.get(key)
- if value:
- passthrough_headers[key] = value
- def remote_iterator() -> Any:
- with response:
- while chunk := response.read(1024 * 1024):
- yield chunk
- return StreamingResponse(
- remote_iterator(),
- status_code=getattr(response, "status", 200),
- media_type=media_type,
- headers=passthrough_headers,
- )
- def cache_cloud_cover(relative_path: str) -> Path:
- cached_file = cloud_cover_cache_path(relative_path)
- if cached_file.exists() and cached_file.is_file():
- return cached_file
- cached_file.parent.mkdir(parents=True, exist_ok=True)
- remote_url = cloud_raw_url(relative_path)
- request = urllib.request.Request(
- remote_url,
- headers={"User-Agent": "MusicWebPlayer/1.0"},
- )
- try:
- with urllib.request.urlopen(request, timeout=30) as response:
- with cached_file.open("wb") as output:
- shutil.copyfileobj(response, output)
- except urllib.error.HTTPError as error:
- raise HTTPException(status_code=error.code, detail=error.reason)
- except urllib.error.URLError as error:
- raise HTTPException(status_code=502, detail=str(error.reason))
- except OSError as error:
- raise HTTPException(status_code=500, detail=str(error))
- return cached_file
- @app.get("/api/cover/{file_path:path}")
- def cover_file(request: Request, file_path: str):
- if file_path.startswith("cloud/"):
- if Path(file_path).name.lower() != "cover.jpg":
- raise HTTPException(status_code=404, detail="Cover not found")
- cached_file = cache_cloud_cover(file_path)
- media_type = mimetypes.guess_type(cached_file.name)[0] or "image/jpeg"
- return FileResponse(cached_file, media_type=media_type, filename=cached_file.name)
- file = safe_music_path(file_path)
- if file.name.lower() != "cover.jpg" or not file.exists() or not file.is_file():
- raise HTTPException(status_code=404, detail="Cover not found")
- media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
- return FileResponse(file, media_type=media_type, filename=file.name)
- @app.post("/api/upload")
- async def upload_files(
- files: list[UploadFile] = File(...),
- target_dir: str = Form(default=""),
- ) -> JSONResponse:
- destination = safe_music_path(target_dir)
- if not str(destination).startswith(str(LOCAL_MUSIC_DIR.resolve())):
- raise HTTPException(status_code=400, detail="Upload destination must be local music dir")
- destination.mkdir(parents=True, exist_ok=True)
- saved: list[str] = []
- for upload in files:
- suffix = Path(upload.filename or "").suffix.lower()
- if suffix not in SUPPORTED_EXTENSIONS:
- continue
- filename = Path(upload.filename or f"upload-{uuid.uuid4().hex}").name
- file_path = destination / filename
- with file_path.open("wb") as buffer:
- shutil.copyfileobj(upload.file, buffer)
- saved.append(track_path_from_abs(file_path))
- trigger_cloud_cache_refresh_if_needed(destination)
- return JSONResponse({"saved": saved})
- @app.post("/api/folder")
- def create_folder(payload: FolderCreateRequest) -> JSONResponse:
- folder = safe_music_path(payload.path)
- if not str(folder).startswith(str(LOCAL_MUSIC_DIR.resolve())):
- raise HTTPException(status_code=400, detail="Folder must be created in local music dir")
- folder.mkdir(parents=True, exist_ok=True)
- trigger_cloud_cache_refresh_if_needed(folder)
- return JSONResponse({"created": track_path_from_abs(folder)})
- @app.post("/api/move")
- def move_file(payload: MoveRequest) -> JSONResponse:
- source = safe_music_path(payload.source)
- destination_dir = safe_music_path(payload.destination_dir)
- if not source.exists():
- raise HTTPException(status_code=404, detail="Source not found")
- if not destination_dir.exists():
- destination_dir.mkdir(parents=True, exist_ok=True)
- destination = destination_dir / source.name
- shutil.move(str(source), str(destination))
- trigger_cloud_cache_refresh_if_needed(source, destination_dir, destination)
- return JSONResponse({"moved": track_path_from_abs(destination)})
- @app.post("/api/playlists")
- def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse:
- playlists = load_playlists()
- playlist = {
- "id": uuid.uuid4().hex,
- "name": payload.name.strip() or "未命名播放列表",
- "tracks": payload.tracks,
- }
- playlists.append(playlist)
- save_playlists(playlists)
- return JSONResponse(playlist)
- @app.put("/api/playlists/{playlist_id}")
- def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse:
- playlists = load_playlists()
- for playlist in playlists:
- if playlist["id"] == playlist_id:
- playlist["tracks"] = payload.tracks
- save_playlists(playlists)
- return JSONResponse(playlist)
- raise HTTPException(status_code=404, detail="Playlist not found")
- @app.delete("/api/playlists/{playlist_id}")
- def delete_playlist(playlist_id: str) -> JSONResponse:
- playlists = load_playlists()
- filtered = [playlist for playlist in playlists if playlist["id"] != playlist_id]
- if len(filtered) == len(playlists):
- raise HTTPException(status_code=404, detail="Playlist not found")
- save_playlists(filtered)
- return JSONResponse({"deleted": playlist_id})
|