| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- from fastapi import FastAPI, Request,File, UploadFile, HTTPException,Form, Response
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import JSONResponse
- from fastapi.responses import RedirectResponse
- from fastapi.responses import StreamingResponse
- from fastapi.staticfiles import StaticFiles
- import os
- import shutil
- import uuid
- from openai import OpenAI
- from pydantic import BaseModel
- import hashlib
- import asyncio
- from typing import AsyncGenerator
- from pydub import AudioSegment # 用于音频处理
# Origins allowed to make cross-origin requests; "*" allows every origin.
# To restrict access, list explicit origins here instead, for example
# "http://localhost" or "http://localhost:8000".
origins = ["*"]

app = FastAPI()

# NOTE(review): FastAPI's CORS documentation states that wildcard values
# must not be combined with allow_credentials=True -- confirm whether
# credentialed requests are really needed, or list explicit origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,  # permitted origins
    allow_credentials=True,
    allow_methods=["*"],  # permitted HTTP methods
    allow_headers=["*"],  # permitted request headers
)
# Directory in which uploaded files are stored.
UPLOAD_DIRECTORY = "static/files"
# exist_ok=True avoids the check-then-create race of testing for the
# directory first, and matches how the audio cache directory is created.
os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)

# Serve uploaded PDFs as static files so they are reachable by URL.
app.mount("/static/files", StaticFiles(directory=UPLOAD_DIRECTORY), name="static_files")
# viewer.html is assumed to live under static/web.
app.mount("/static/web", StaticFiles(directory="static/web"), name="static_web")
# Mount the remaining static assets.
app.mount("/static", StaticFiles(directory="static"), name="static")
# The root path redirects to the PDF.js viewer.
@app.get("/")
def root():
    """Redirect the root URL to the viewer page opened on the default PDF."""
    viewer_url = "/static/web/viewer.html?file=/static/files/compress.pdf"
    return RedirectResponse(url=viewer_url)


# Further routes for custom PDF upload or dynamic rendering can be added here.
def sanitize_filename(name: str) -> str:
    """Return *name* reduced to characters that are safe in a file name.

    Alphanumeric characters (as defined by ``str.isalnum``) are kept, along
    with spaces, dots, underscores and hyphens. Every other character is
    dropped, and trailing whitespace is stripped from the result.
    """
    extra_allowed = frozenset((" ", ".", "_", "-"))
    kept = [ch for ch in name if ch.isalnum() or ch in extra_allowed]
    return "".join(kept).rstrip()
@app.post("/upload-pdf")
async def upload_pdf(file: UploadFile = File(...), custom_name: str = Form(...)):
    """Store an uploaded PDF under a caller-supplied name.

    Args:
        file: The uploaded file; its declared content type must be
            ``application/pdf``.
        custom_name: Desired base name without extension. It is passed
            through ``sanitize_filename`` and ``.pdf`` is appended.

    Returns:
        A JSON response with ``success: True`` and the ``file_path`` under
        which the PDF is served, or a 400 JSON response with
        ``success: False`` when the name is invalid or already taken.

    Raises:
        HTTPException: 400 if the content type is not PDF; 500 if writing
            the file fails.
    """
    if file.content_type != 'application/pdf':
        raise HTTPException(status_code=400, detail="文件类型必须是 PDF")

    # Strip characters that are not allowed in the stored file name.
    sanitized_name = sanitize_filename(custom_name)
    if not sanitized_name:
        return JSONResponse(status_code=400, content={"success": False, "error": "无效的文件名"})

    unique_filename = f"{sanitized_name}.pdf"
    file_path = os.path.join(UPLOAD_DIRECTORY, unique_filename)

    created = False
    try:
        # "xb" creates the file exclusively: it fails if the name is already
        # taken, so two concurrent uploads cannot overwrite each other the
        # way a separate exists() check followed by "wb" could.
        with open(file_path, "xb") as buffer:
            created = True
            shutil.copyfileobj(file.file, buffer)
    except FileExistsError:
        # Existing names are rejected rather than made unique.
        return JSONResponse(status_code=400, content={"success": False, "error": "文件名已存在,请使用其他名称"})
    except Exception as exc:
        if created:
            # Best-effort removal of the partially written file so a failed
            # upload does not block the name or show up as a broken PDF.
            try:
                os.remove(file_path)
            except OSError:
                pass
        raise HTTPException(status_code=500, detail="上传过程中出错") from exc
    finally:
        file.file.close()

    # Path under which the static file mount serves the stored PDF.
    file_relative_path = f"/static/files/{unique_filename}"
    return JSONResponse(content={"success": True, "file_path": file_relative_path})
@app.get("/list-pdfs")
async def list_pdfs():
    """List the PDF files in the upload directory.

    Returns:
        A JSON response with ``success: True`` and ``files``, a list of
        ``{"name", "url"}`` entries sorted by file name.

    Raises:
        HTTPException: 500 if the upload directory cannot be read.
    """
    try:
        # os.listdir returns entries in arbitrary order; sort so clients
        # get a stable listing.
        names = sorted(os.listdir(UPLOAD_DIRECTORY))
    except OSError as exc:
        raise HTTPException(status_code=500, detail="无法获取文件列表") from exc

    # Keep only PDFs and build the URL each one is served under.
    pdf_files = [
        {"name": name, "url": f"/static/files/{name}"}
        for name in names
        if name.lower().endswith(".pdf")
    ]
    return JSONResponse(content={"success": True, "files": pdf_files})
class TextToSpeechRequest(BaseModel):
    """Request body shared by the text-to-speech endpoints."""

    # Text to be converted to speech.
    user_input: str
# OpenAI client configuration.
# The API key is read from the OPENAI_API_KEY environment variable so that no
# secret lives in the source tree. Any key that was ever committed to this
# file should be treated as compromised and rotated.
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("OPENAI_API_KEY environment variable is not set")
client = OpenAI(
    base_url="https://api.wlai.vip/v1",
    api_key=api_key,
)

# Directory used to cache generated audio files.
CACHE_DIR = "audio_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
@app.post("/text-to-speech/")
async def text_to_speech(request: TextToSpeechRequest):
    """Return MP3 speech for the submitted text, using an on-disk cache.

    The cache file name is the MD5 hex digest of the UTF-8 encoded text.
    On a cache miss the audio is requested from the TTS API and stored
    before being returned.

    Args:
        request: Request body carrying ``user_input``.

    Returns:
        A response containing the MP3 bytes with media type ``audio/mpeg``.

    Raises:
        HTTPException: 500 with the error text if anything fails.
    """
    user_input = request.user_input
    try:
        text_hash = hashlib.md5(user_input.encode('utf-8')).hexdigest()
        audio_path = os.path.join(CACHE_DIR, f"{text_hash}.mp3")

        if not os.path.exists(audio_path):
            # Stream into a uniquely named temporary file and rename it into
            # place only once complete. Writing straight to audio_path would
            # leave a truncated file behind on failure, and every later
            # request would then serve that broken file as a cache hit.
            tmp_path = f"{audio_path}.{uuid.uuid4().hex}.tmp"
            try:
                # NOTE(review): `client` is the synchronous OpenAI client, so
                # this call runs on the event loop until the audio is written.
                with client.audio.speech.with_streaming_response.create(
                    model="tts-1",
                    voice="nova",
                    input=user_input,
                ) as response:
                    response.stream_to_file(tmp_path)
                os.replace(tmp_path, audio_path)
            finally:
                # Only present if generation failed before the rename.
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)

        with open(audio_path, "rb") as f:
            audio_data = f.read()
        return Response(content=audio_data, media_type="audio/mpeg")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Whole-page reading: the page text is split into chunks.
# Maximum number of characters per chunk; adjust as needed.
MAX_CHUNK_SIZE = 200


def split_text_into_chunks(text: str, max_chunk_size: int = MAX_CHUNK_SIZE) -> list:
    """Split *text* into chunks of at most *max_chunk_size* characters.

    The text is first split into sentences at runs of spaces that follow
    ``.``, ``!`` or ``?``. Sentences are then packed greedily into chunks,
    joined by a single space. A single sentence longer than
    *max_chunk_size* is cut into fixed-size pieces.

    Args:
        text: The text to split.
        max_chunk_size: Upper bound on the length of each chunk; must be
            at least 1.

    Returns:
        The list of chunks in order; empty if *text* is empty.

    Raises:
        ValueError: If *max_chunk_size* is smaller than 1.
    """
    import re

    # A negative size would make the forced-split range() empty and silently
    # drop the text, and zero would fail inside range(); reject both up front.
    if max_chunk_size < 1:
        raise ValueError("max_chunk_size must be a positive integer")

    sentences = re.split(r"(?<=[.!?]) +", text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        # The +1 accounts for the joining space.
        if len(current_chunk) + len(sentence) + 1 <= max_chunk_size:
            current_chunk = f"{current_chunk} {sentence}" if current_chunk else sentence
            continue

        # The sentence does not fit: flush what has been collected so far.
        if current_chunk:
            chunks.append(current_chunk)

        if len(sentence) > max_chunk_size:
            # A single over-long sentence is cut at fixed offsets.
            chunks.extend(
                sentence[start:start + max_chunk_size]
                for start in range(0, len(sentence), max_chunk_size)
            )
            current_chunk = ""
        else:
            current_chunk = sentence

    if current_chunk:
        chunks.append(current_chunk)
    return chunks
async def generate_tts_audio(chunk: str) -> str:
    """Generate speech audio for one text chunk and cache it on disk.

    The cache file name is the MD5 hex digest of the UTF-8 encoded chunk.
    If the file already exists, no request is made.

    Args:
        chunk: The text to synthesize.

    Returns:
        The path of the cached MP3 file.

    Raises:
        HTTPException: 500 if generating or storing the audio fails.
    """
    text_hash = hashlib.md5(chunk.encode('utf-8')).hexdigest()
    audio_path = os.path.join(CACHE_DIR, f"{text_hash}.mp3")
    if os.path.exists(audio_path):
        return audio_path

    # Stream into a uniquely named temporary file and rename it into place
    # only once complete. Writing straight to audio_path would leave a
    # truncated file behind on failure, which later calls would treat as a
    # valid cache entry.
    tmp_path = f"{audio_path}.{uuid.uuid4().hex}.tmp"
    try:
        # NOTE(review): `client` is the synchronous OpenAI client, so this
        # call runs on the event loop until the audio has been written.
        with client.audio.speech.with_streaming_response.create(
            model="tts-1",
            voice="nova",
            input=chunk,
        ) as response:
            response.stream_to_file(tmp_path)
        os.replace(tmp_path, audio_path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS生成失败: {str(e)}") from e
    finally:
        # Only present if generation failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return audio_path
def concatenate_audios(audio_paths: list, output_path: str) -> None:
    """Join several MP3 files, in the given order, into one MP3 file.

    Args:
        audio_paths: Paths of the MP3 files to join.
        output_path: Path the combined MP3 is exported to.
    """
    merged = AudioSegment.empty()
    for segment in map(AudioSegment.from_mp3, audio_paths):
        merged = merged + segment
    merged.export(output_path, format="mp3")
# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so a task nobody references may be garbage
# collected before it finishes.
_background_tasks = set()


def _iter_file_blocks(path: str, block_size: int = 64 * 1024):
    """Yield the contents of *path* in binary blocks, closing the file afterwards."""
    with open(path, "rb") as audio_file:
        yield from iter(lambda: audio_file.read(block_size), b"")


@app.post("/page-to-speech/")
async def page_to_speech(request: TextToSpeechRequest):
    """Stream MP3 speech for a whole page of text.

    If audio for the complete text is already cached, that file is streamed.
    Otherwise the text is split into chunks, each chunk's audio is streamed
    as soon as it is available, and a background task builds the combined
    file so that later requests for the same text hit the cache.

    Args:
        request: Request body carrying ``user_input``.

    Returns:
        A streaming response with media type ``audio/mpeg``.

    Raises:
        HTTPException: 400 if the text is empty after stripping whitespace.
    """
    user_input = request.user_input.strip()
    if not user_input:
        raise HTTPException(status_code=400, detail="输入文本为空。")

    # The hash of the complete text names the combined cache file.
    full_text_hash = hashlib.md5(user_input.encode('utf-8')).hexdigest()
    full_audio_path = os.path.join(CACHE_DIR, f"{full_text_hash}_full.mp3")
    if os.path.exists(full_audio_path):
        # Stream through a generator that owns the file handle, so the handle
        # is closed when streaming ends instead of being left open.
        return StreamingResponse(_iter_file_blocks(full_audio_path), media_type="audio/mpeg")

    chunks = split_text_into_chunks(user_input)

    async def audio_generator() -> AsyncGenerator[bytes, None]:
        for chunk in chunks:
            audio_path = await generate_tts_audio(chunk)
            with open(audio_path, "rb") as f:
                yield f.read()
            await asyncio.sleep(0)  # give other tasks a chance to run

    async def create_full_audio() -> None:
        # Use the paths returned by gather(), which are complete and in chunk
        # order. A list filled by the streaming generator may still be empty
        # or partial when this task runs, which would cache a broken file.
        chunk_paths = await asyncio.gather(*(generate_tts_audio(chunk) for chunk in chunks))
        # Export to a temporary name and rename once complete, so a
        # concurrent request never streams a half-written combined file.
        tmp_path = f"{full_audio_path}.{uuid.uuid4().hex}.tmp"
        try:
            concatenate_audios(chunk_paths, tmp_path)
            os.replace(tmp_path, full_audio_path)
        finally:
            # Only present if the export failed before the rename.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    task = asyncio.create_task(create_full_audio())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return StreamingResponse(audio_generator(), media_type="audio/mpeg")
|