"""Database helpers shared across the backend.""" from __future__ import annotations import asyncio import datetime from typing import Any, Callable, Dict, List, Optional, Union from sqlalchemy import create_engine, text from sqlalchemy.orm import Session, declarative_base, sessionmaker from . import config Base = declarative_base() ENGINE = None SessionLocal: Optional[sessionmaker] = None FILE_LOCK = asyncio.Lock() MessageContent = Union[str, List[Dict[str, Any]]] def now_utc() -> datetime.datetime: return datetime.datetime.utcnow() def ensure_directories() -> None: for path in [config.DATA_DIR, config.BACKUP_DIR, config.BLOG_DIR, config.UPLOAD_DIR, config.STATIC_DIR]: path.mkdir(parents=True, exist_ok=True) def ensure_database_initialized() -> None: global ENGINE, SessionLocal if ENGINE is not None: return raw_engine = create_engine(config.RAW_DATABASE_URL, future=True, pool_pre_ping=True) with raw_engine.connect() as connection: connection.execute( text( f"CREATE DATABASE IF NOT EXISTS `{config.DATABASE_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci" ) ) raw_engine.dispose() ENGINE = create_engine(config.DATABASE_URL, future=True, pool_pre_ping=True) SessionLocal = sessionmaker(bind=ENGINE, autoflush=False, expire_on_commit=False, future=True) Base.metadata.create_all(bind=ENGINE) async def db_call(func: Callable[[Session], Any], *args: Any, **kwargs: Any) -> Any: def wrapped() -> Any: if SessionLocal is None: raise RuntimeError("数据库尚未初始化") with SessionLocal() as session: return func(session, *args, **kwargs) return await asyncio.to_thread(wrapped)