db.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. """Database helpers shared across the backend."""
  2. from __future__ import annotations
  3. import asyncio
  4. import datetime
  5. from typing import Any, Callable, Dict, List, Optional, Union
  6. from sqlalchemy import create_engine, text
  7. from sqlalchemy.orm import Session, declarative_base, sessionmaker
  8. from . import config
# Declarative base; ensure_database_initialized() passes its metadata to create_all().
Base = declarative_base()
# Engine for the application database; stays None until
# ensure_database_initialized() has run.
ENGINE = None
# Session factory bound to ENGINE; stays None until
# ensure_database_initialized() has run. db_call() refuses to work while it is None.
SessionLocal: Optional[sessionmaker] = None
# Module-wide asyncio lock, created at import time.
# NOTE(review): presumably serializes file access across coroutines -- confirm at call sites.
FILE_LOCK = asyncio.Lock()
# Type alias: either a plain string or a list of string-keyed dictionaries.
MessageContent = Union[str, List[Dict[str, Any]]]
  14. def now_utc() -> datetime.datetime:
  15. return datetime.datetime.utcnow()
  16. def ensure_directories() -> None:
  17. for path in [config.DATA_DIR, config.BACKUP_DIR, config.BLOG_DIR, config.UPLOAD_DIR, config.STATIC_DIR]:
  18. path.mkdir(parents=True, exist_ok=True)
  19. def ensure_database_initialized() -> None:
  20. global ENGINE, SessionLocal
  21. if ENGINE is not None:
  22. return
  23. raw_engine = create_engine(config.RAW_DATABASE_URL, future=True, pool_pre_ping=True)
  24. with raw_engine.connect() as connection:
  25. connection.execute(
  26. text(
  27. f"CREATE DATABASE IF NOT EXISTS `{config.DATABASE_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
  28. )
  29. )
  30. raw_engine.dispose()
  31. ENGINE = create_engine(config.DATABASE_URL, future=True, pool_pre_ping=True)
  32. SessionLocal = sessionmaker(bind=ENGINE, autoflush=False, expire_on_commit=False, future=True)
  33. Base.metadata.create_all(bind=ENGINE)
  34. async def db_call(func: Callable[[Session], Any], *args: Any, **kwargs: Any) -> Any:
  35. def wrapped() -> Any:
  36. if SessionLocal is None:
  37. raise RuntimeError("数据库尚未初始化")
  38. with SessionLocal() as session:
  39. return func(session, *args, **kwargs)
  40. return await asyncio.to_thread(wrapped)