| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- #!/usr/bin/env python3
- import hashlib
- from datetime import datetime, timezone
- import pymysql
- from config import MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE
- def hash_password(password: str) -> str:
- return hashlib.sha256(password.encode("utf-8")).hexdigest()
- def get_conn(database=None):
- return pymysql.connect(
- host=MYSQL_HOST,
- port=MYSQL_PORT,
- user=MYSQL_USER,
- password=MYSQL_PASSWORD,
- database=database,
- charset="utf8mb4",
- autocommit=True,
- cursorclass=pymysql.cursors.DictCursor,
- )
- def ensure_database():
- conn = get_conn()
- try:
- with conn.cursor() as cur:
- cur.execute(f"CREATE DATABASE IF NOT EXISTS `{MYSQL_DATABASE}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
- finally:
- conn.close()
- def ensure_tables_and_admin():
- conn = get_conn(MYSQL_DATABASE)
- try:
- with conn.cursor() as cur:
- # Legacy migration: reader_pro -> user
- cur.execute("SHOW TABLES LIKE 'reader_pro'")
- has_legacy_user = bool(cur.fetchone())
- cur.execute("SHOW TABLES LIKE 'user'")
- has_user = bool(cur.fetchone())
- if has_legacy_user and not has_user:
- cur.execute("RENAME TABLE reader_pro TO user")
- # Legacy migration: reader_pro_progress -> user_progress
- cur.execute("SHOW TABLES LIKE 'reader_pro_progress'")
- has_legacy_progress = bool(cur.fetchone())
- cur.execute("SHOW TABLES LIKE 'user_progress'")
- has_user_progress = bool(cur.fetchone())
- if has_legacy_progress and not has_user_progress:
- cur.execute("RENAME TABLE reader_pro_progress TO user_progress")
- # Legacy migration: reader_pro_config -> user_config
- cur.execute("SHOW TABLES LIKE 'reader_pro_config'")
- has_legacy_config = bool(cur.fetchone())
- cur.execute("SHOW TABLES LIKE 'user_config'")
- has_user_config = bool(cur.fetchone())
- if has_legacy_config and not has_user_config:
- cur.execute("RENAME TABLE reader_pro_config TO user_config")
- cur.execute(
- """
- CREATE TABLE IF NOT EXISTS user (
- id BIGINT PRIMARY KEY AUTO_INCREMENT,
- username VARCHAR(64) NOT NULL UNIQUE,
- password_hash VARCHAR(255) NOT NULL,
- is_admin TINYINT(1) NOT NULL DEFAULT 0,
- is_active TINYINT(1) NOT NULL DEFAULT 1,
- session_token VARCHAR(128) NULL,
- session_expires_at DATETIME NULL,
- last_file VARCHAR(1024) NULL,
- last_page INT NULL,
- created_at DATETIME NOT NULL,
- updated_at DATETIME NOT NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
- """
- )
- cur.execute("SHOW COLUMNS FROM user LIKE 'is_active'")
- if not cur.fetchone():
- cur.execute("ALTER TABLE user ADD COLUMN is_active TINYINT(1) NOT NULL DEFAULT 1 AFTER is_admin")
- cur.execute(
- """
- CREATE TABLE IF NOT EXISTS user_progress (
- id BIGINT PRIMARY KEY AUTO_INCREMENT,
- user_id BIGINT NOT NULL,
- file_path VARCHAR(512) NOT NULL,
- page INT NOT NULL,
- updated_at DATETIME NOT NULL,
- UNIQUE KEY uniq_user_file (user_id, file_path),
- KEY idx_user_updated (user_id, updated_at),
- CONSTRAINT fk_user_progress_user
- FOREIGN KEY (user_id) REFERENCES user(id)
- ON DELETE CASCADE
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
- """
- )
- cur.execute(
- """
- CREATE TABLE IF NOT EXISTS user_config (
- config_key VARCHAR(128) PRIMARY KEY,
- config_value TEXT NULL,
- updated_at DATETIME NOT NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
- """
- )
- now = datetime.now(timezone.utc).replace(tzinfo=None)
- admin_hash = hash_password("admin123")
- cur.execute("SELECT id FROM user WHERE username=%s", ("admin",))
- row = cur.fetchone()
- if row:
- cur.execute(
- """
- UPDATE user
- SET password_hash=%s, is_admin=1, is_active=1, updated_at=%s
- WHERE username=%s
- """,
- (admin_hash, now, "admin"),
- )
- admin_action = "updated"
- else:
- cur.execute(
- """
- INSERT INTO user (username, password_hash, is_admin, is_active, created_at, updated_at)
- VALUES (%s, %s, 1, 1, %s, %s)
- """,
- ("admin", admin_hash, now, now),
- )
- admin_action = "created"
- finally:
- conn.close()
- return admin_action
- def main():
- ensure_database()
- admin_action = ensure_tables_and_admin()
- print(f"[OK] Database `{MYSQL_DATABASE}` initialized.")
- print("[OK] Tables: user, user_progress, user_config")
- print(f"[OK] Admin user `admin` {admin_action}, password set to `admin123`")
- if __name__ == "__main__":
- main()
|