main.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. from __future__ import annotations
  2. import json
  3. import mimetypes
  4. import shutil
  5. import uuid
  6. from pathlib import Path
  7. from typing import Any
  8. from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
  9. from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
  10. from fastapi.staticfiles import StaticFiles
  11. from fastapi.templating import Jinja2Templates
  12. from pydantic import BaseModel
  13. BASE_DIR = Path(__file__).resolve().parent.parent
  14. MUSIC_DIR = BASE_DIR / "mp3file"
  15. PLAYLISTS_FILE = BASE_DIR / "playlists.json"
  16. SUPPORTED_EXTENSIONS = {
  17. ".mp3",
  18. ".wav",
  19. ".flac",
  20. ".m3u",
  21. ".m3u8",
  22. ".ogg",
  23. ".aac",
  24. ".wma",
  25. ".opus",
  26. ".oga",
  27. ".mp4",
  28. ".m4a",
  29. ".webm",
  30. }
  31. IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
  32. app = FastAPI(title="MusicWeb", version="1.0.0")
  33. app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
  34. templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
  35. class FolderCreateRequest(BaseModel):
  36. path: str
  37. class MoveRequest(BaseModel):
  38. source: str
  39. destination_dir: str
  40. class PlaylistCreateRequest(BaseModel):
  41. name: str
  42. tracks: list[str]
  43. class PlaylistUpdateRequest(BaseModel):
  44. tracks: list[str]
  45. def ensure_storage() -> None:
  46. MUSIC_DIR.mkdir(parents=True, exist_ok=True)
  47. if not PLAYLISTS_FILE.exists():
  48. PLAYLISTS_FILE.write_text("[]", encoding="utf-8")
  49. def safe_music_path(relative_path: str) -> Path:
  50. candidate = (MUSIC_DIR / relative_path).resolve()
  51. if candidate != MUSIC_DIR.resolve() and MUSIC_DIR.resolve() not in candidate.parents:
  52. raise HTTPException(status_code=400, detail="Invalid path")
  53. return candidate
  54. def load_playlists() -> list[dict[str, Any]]:
  55. ensure_storage()
  56. return json.loads(PLAYLISTS_FILE.read_text(encoding="utf-8"))
  57. def save_playlists(playlists: list[dict[str, Any]]) -> None:
  58. PLAYLISTS_FILE.write_text(
  59. json.dumps(playlists, ensure_ascii=False, indent=2), encoding="utf-8"
  60. )
  61. def is_supported_file(path: Path) -> bool:
  62. return path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
  63. def track_url(relative_path: str) -> str:
  64. return f"/api/stream/{relative_path}"
  65. def build_track(relative_path: str) -> dict[str, str]:
  66. filename = Path(relative_path).name
  67. return {
  68. "id": relative_path,
  69. "name": filename,
  70. "path": relative_path,
  71. "url": track_url(relative_path),
  72. "folder": str(Path(relative_path).parent).replace(".", "").strip("/"),
  73. }
  74. def find_cover_for_directory(root: Path) -> str | None:
  75. candidates = (
  76. "cover.jpg",
  77. "cover.jpeg",
  78. "cover.png",
  79. "cover.webp",
  80. "folder.jpg",
  81. "folder.jpeg",
  82. "folder.png",
  83. "folder.webp",
  84. )
  85. for candidate in candidates:
  86. path = root / candidate
  87. if path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS:
  88. return path.relative_to(MUSIC_DIR).as_posix()
  89. for child in sorted(root.iterdir(), key=lambda item: item.name.lower()):
  90. if child.is_file() and child.suffix.lower() in IMAGE_EXTENSIONS:
  91. return child.relative_to(MUSIC_DIR).as_posix()
  92. return None
  93. def build_tree(root: Path) -> dict[str, Any]:
  94. rel_root = root.relative_to(MUSIC_DIR) if root != MUSIC_DIR else Path(".")
  95. cover_path = find_cover_for_directory(root)
  96. node = {
  97. "name": "音乐库" if root == MUSIC_DIR else root.name,
  98. "path": "" if rel_root == Path(".") else rel_root.as_posix(),
  99. "cover": f"/api/cover/{cover_path}" if cover_path else None,
  100. "folders": [],
  101. "tracks": [],
  102. }
  103. for child in sorted(root.iterdir(), key=lambda item: (item.is_file(), item.name.lower())):
  104. if child.is_dir():
  105. node["folders"].append(build_tree(child))
  106. elif is_supported_file(child):
  107. node["tracks"].append(build_track(child.relative_to(MUSIC_DIR).as_posix()))
  108. return node
  109. def collect_tracks(root: Path) -> list[dict[str, str]]:
  110. tracks: list[dict[str, str]] = []
  111. for path in sorted(root.rglob("*")):
  112. if is_supported_file(path):
  113. tracks.append(build_track(path.relative_to(MUSIC_DIR).as_posix()))
  114. return tracks
  115. @app.on_event("startup")
  116. def startup_event() -> None:
  117. ensure_storage()
  118. @app.get("/", response_class=HTMLResponse)
  119. def index(request: Request) -> HTMLResponse:
  120. return templates.TemplateResponse("index.html", {"request": request})
  121. @app.get("/api/library")
  122. def library() -> JSONResponse:
  123. ensure_storage()
  124. return JSONResponse(
  125. {
  126. "tree": build_tree(MUSIC_DIR),
  127. "all_tracks": collect_tracks(MUSIC_DIR),
  128. "playlists": load_playlists(),
  129. }
  130. )
  131. @app.get("/api/stream/{file_path:path}")
  132. def stream_file(file_path: str) -> FileResponse:
  133. file = safe_music_path(file_path)
  134. if not file.exists() or not file.is_file():
  135. raise HTTPException(status_code=404, detail="File not found")
  136. media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
  137. return FileResponse(
  138. file,
  139. media_type=media_type,
  140. filename=file.name,
  141. headers={"Cache-Control": "private, max-age=31536000, immutable"},
  142. )
  143. @app.get("/api/cover/{file_path:path}")
  144. def cover_file(file_path: str) -> FileResponse:
  145. file = safe_music_path(file_path)
  146. if not file.exists() or not file.is_file() or file.suffix.lower() not in IMAGE_EXTENSIONS:
  147. raise HTTPException(status_code=404, detail="Cover not found")
  148. media_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
  149. return FileResponse(file, media_type=media_type, filename=file.name)
  150. @app.post("/api/upload")
  151. async def upload_files(
  152. files: list[UploadFile] = File(...),
  153. target_dir: str = Form(default=""),
  154. ) -> JSONResponse:
  155. destination = safe_music_path(target_dir)
  156. destination.mkdir(parents=True, exist_ok=True)
  157. saved: list[str] = []
  158. for upload in files:
  159. suffix = Path(upload.filename or "").suffix.lower()
  160. if suffix not in SUPPORTED_EXTENSIONS:
  161. continue
  162. filename = Path(upload.filename or f"upload-{uuid.uuid4().hex}").name
  163. file_path = destination / filename
  164. with file_path.open("wb") as buffer:
  165. shutil.copyfileobj(upload.file, buffer)
  166. saved.append(file_path.relative_to(MUSIC_DIR).as_posix())
  167. return JSONResponse({"saved": saved})
  168. @app.post("/api/folder")
  169. def create_folder(payload: FolderCreateRequest) -> JSONResponse:
  170. folder = safe_music_path(payload.path)
  171. folder.mkdir(parents=True, exist_ok=True)
  172. return JSONResponse({"created": folder.relative_to(MUSIC_DIR).as_posix()})
  173. @app.post("/api/move")
  174. def move_file(payload: MoveRequest) -> JSONResponse:
  175. source = safe_music_path(payload.source)
  176. destination_dir = safe_music_path(payload.destination_dir)
  177. if not source.exists():
  178. raise HTTPException(status_code=404, detail="Source not found")
  179. if not destination_dir.exists():
  180. destination_dir.mkdir(parents=True, exist_ok=True)
  181. destination = destination_dir / source.name
  182. shutil.move(str(source), str(destination))
  183. return JSONResponse({"moved": destination.relative_to(MUSIC_DIR).as_posix()})
  184. @app.post("/api/playlists")
  185. def create_playlist(payload: PlaylistCreateRequest) -> JSONResponse:
  186. playlists = load_playlists()
  187. playlist = {
  188. "id": uuid.uuid4().hex,
  189. "name": payload.name.strip() or "未命名播放列表",
  190. "tracks": payload.tracks,
  191. }
  192. playlists.append(playlist)
  193. save_playlists(playlists)
  194. return JSONResponse(playlist)
  195. @app.put("/api/playlists/{playlist_id}")
  196. def update_playlist(playlist_id: str, payload: PlaylistUpdateRequest) -> JSONResponse:
  197. playlists = load_playlists()
  198. for playlist in playlists:
  199. if playlist["id"] == playlist_id:
  200. playlist["tracks"] = payload.tracks
  201. save_playlists(playlists)
  202. return JSONResponse(playlist)
  203. raise HTTPException(status_code=404, detail="Playlist not found")
  204. @app.delete("/api/playlists/{playlist_id}")
  205. def delete_playlist(playlist_id: str) -> JSONResponse:
  206. playlists = load_playlists()
  207. filtered = [playlist for playlist in playlists if playlist["id"] != playlist_id]
  208. if len(filtered) == len(playlists):
  209. raise HTTPException(status_code=404, detail="Playlist not found")
  210. save_playlists(filtered)
  211. return JSONResponse({"deleted": playlist_id})