admin.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. """Administrative API routes."""
  2. from typing import Any, Dict, Optional
  3. from fastapi import APIRouter, Depends
  4. from chatfast.services.auth import (
  5. AdminUserRequest,
  6. AdminUserUpdateRequest,
  7. UserInfo,
  8. admin_create_user as admin_create_user_service,
  9. admin_delete_user as admin_delete_user_service,
  10. admin_get_user as admin_get_user_service,
  11. admin_list_users as admin_list_users_service,
  12. admin_update_user as admin_update_user_service,
  13. require_admin,
  14. )
  15. from chatfast.services.chat import list_exports_admin
  16. router = APIRouter(prefix="/api/admin", tags=["admin"])
  17. @router.get("/users")
  18. async def api_admin_list_users(
  19. keyword: Optional[str] = None,
  20. page: int = 0,
  21. page_size: int = 20,
  22. admin: UserInfo = Depends(require_admin),
  23. ) -> Dict[str, Any]:
  24. return await admin_list_users_service(keyword, page, page_size)
  25. @router.post("/users")
  26. async def api_admin_create_user(payload: AdminUserRequest, admin: UserInfo = Depends(require_admin)) -> Dict[str, Any]:
  27. return await admin_create_user_service(payload.username, payload.password)
  28. @router.get("/users/{user_id}")
  29. async def api_admin_get_user(user_id: int, admin: UserInfo = Depends(require_admin)) -> Dict[str, Any]:
  30. return await admin_get_user_service(user_id)
  31. @router.put("/users/{user_id}")
  32. async def api_admin_update_user(
  33. user_id: int,
  34. payload: AdminUserUpdateRequest,
  35. admin: UserInfo = Depends(require_admin),
  36. ) -> Dict[str, Any]:
  37. return await admin_update_user_service(user_id, payload)
  38. @router.delete("/users/{user_id}")
  39. async def api_admin_delete_user(user_id: int, admin: UserInfo = Depends(require_admin)) -> Dict[str, Any]:
  40. return await admin_delete_user_service(user_id)
  41. @router.get("/exports")
  42. async def api_admin_exports(
  43. keyword: Optional[str] = None,
  44. admin: UserInfo = Depends(require_admin),
  45. ) -> Dict[str, Any]:
  46. items = await list_exports_admin(keyword)
  47. return {"items": items}