| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837 |
- # -*- coding: utf-8 -*-
- """Grammar highlighter powered by spaCy + benepar constituency parsing."""
- import asyncio
- import html
- import json
- import re
- from collections import Counter
- from dataclasses import dataclass, field
- from html.parser import HTMLParser
- from string import Template
- from typing import Any, Dict, List, Optional, Tuple
- from urllib import error as urllib_error, request as urllib_request
- from urllib.parse import urlparse, urlunparse
- import benepar
- import httpx
- import spacy
- from fastapi import FastAPI, HTTPException
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import HTMLResponse
- from pydantic import BaseModel, Field
- from spacy.cli import download as spacy_download
- from spacy.language import Language
- from spacy.tokens import Span as SpacySpan, Token as SpacyToken
- from style_config import STYLE_BLOCK
# Last recorded reason why benepar annotations are unavailable; None means no problem so far.
BENE_PAR_WARNING: Optional[str] = None
# Whether the benepar constituency parser was successfully attached to the spaCy pipeline.
HAS_BENEPAR: bool = False
def _ensure_benepar_warning(message: str) -> None:
    """Record *message* as the module-level benepar warning, keeping the first one set."""
    global BENE_PAR_WARNING
    if BENE_PAR_WARNING:
        return
    BENE_PAR_WARNING = message
def _load_spacy_pipeline(
    model_name: str = "en_core_web_sm", benepar_model: str = "benepar_en3"
) -> Language:
    """Load the spaCy pipeline and try to attach the benepar constituency parser.

    Attempts an in-process download when the spaCy model is missing.  Benepar
    failures never abort loading: the pipeline is still returned and the
    reason is recorded in ``BENE_PAR_WARNING`` so callers fall back to
    dependency-based spans.

    Args:
        model_name: spaCy model package to load (downloaded on first use).
        benepar_model: benepar model name passed to the ``benepar`` pipe.

    Returns:
        The configured ``Language`` pipeline.

    Raises:
        RuntimeError: if the spaCy model cannot be loaded or installed.
    """
    global BENE_PAR_WARNING, HAS_BENEPAR
    # Reset module state so repeated loads start from a clean slate.
    BENE_PAR_WARNING = None
    HAS_BENEPAR = False
    try:
        nlp = spacy.load(model_name)
    except OSError:
        # Model package not installed: download once, then retry the load.
        try:
            spacy_download(model_name)
            nlp = spacy.load(model_name)
        except Exception as exc:  # pragma: no cover - install helper
            raise RuntimeError(
                f"spaCy model '{model_name}' is required. Install via `python -m spacy download {model_name}`."
            ) from exc
    # Ensure we have sentence segmentation available
    pipe_names = set(nlp.pipe_names)
    if not ({"parser", "senter", "sentencizer"} & pipe_names):
        try:
            nlp.add_pipe("sentencizer")
        except Exception:
            pass  # if already present or unavailable, ignore
    # Try to add benepar
    if "benepar" not in nlp.pipe_names:
        try:
            nlp.add_pipe("benepar", config={"model": benepar_model}, last=True)
            HAS_BENEPAR = True
        except ValueError:
            # A ValueError from add_pipe typically means the benepar model is
            # not downloaded yet: fetch it and retry once.
            try:
                benepar.download(benepar_model)
                nlp.add_pipe("benepar", config={"model": benepar_model}, last=True)
                HAS_BENEPAR = True
            except Exception as exc:  # pragma: no cover - install helper
                HAS_BENEPAR = False
                BENE_PAR_WARNING = (
                    "Benepar model '{model}' unavailable ({err}). Falling back to dependency-based spans."
                ).format(model=benepar_model, err=exc)
        except Exception as exc:
            # Any other attachment failure: keep the pipeline, note the fallback.
            HAS_BENEPAR = False
            BENE_PAR_WARNING = (
                "Failed to attach benepar parser to spaCy pipeline. Falling back to dependency-based spans ({err})."
            ).format(err=exc)
    else:
        HAS_BENEPAR = True
    return nlp
# Eagerly load the pipeline at import time; keep the failure in
# NLP_LOAD_ERROR for later diagnostics instead of breaking module import.
try:
    NLP: Optional[Language] = _load_spacy_pipeline()
    NLP_LOAD_ERROR: Optional[Exception] = None
except Exception as exc:  # pragma: no cover - import-time diagnostics
    NLP = None
    NLP_LOAD_ERROR = exc
# Request payload for the analysis endpoint.  (Comment, not a docstring:
# pydantic would surface a docstring in the generated OpenAPI schema.)
class AnalyzeRequest(BaseModel):
    text: str = Field(..., description="Raw English text to highlight")
# Response payload: the input text rendered as annotated HTML.
class AnalyzeResponse(BaseModel):
    highlighted_html: str
@dataclass
class Token:
    """A surface segment of the original text together with its character offsets."""

    text: str   # exact substring, whitespace preserved
    start: int  # character offset, inclusive
    end: int    # character offset, exclusive
    kind: str  # 'word' | 'space' | 'punct'
@dataclass
class Span:
    """A highlight span over the token stream, rendered later as an HTML class."""

    start_token: int  # index into the token list, inclusive
    end_token: int    # index into the token list, exclusive
    cls: str          # CSS class applied to the covered tokens
    attrs: Optional[Dict[str, str]] = None  # extra data-* attributes (values HTML-escaped)
@dataclass
class SentenceSummary:
    """Accumulated per-sentence grammar facts collected while annotating."""

    subjects: List[str] = field(default_factory=list)      # subject subtree texts
    predicates: List[str] = field(default_factory=list)    # predicate head texts
    objects: List[str] = field(default_factory=list)       # direct + indirect objects
    complements: List[str] = field(default_factory=list)   # attr/acomp/ccomp/... texts
    clauses: List[str] = field(default_factory=list)       # Chinese clause-type labels
    clause_functions: List[str] = field(default_factory=list)  # TIME/REASON/... codes
    connectors: List[str] = field(default_factory=list)    # conjunctions & fixed phrases
    residual_roles: List[str] = field(default_factory=list)  # labels for uncovered words
    sentence_length: int = 0  # token count of the sentence
# Tokenizer: alternation over whitespace runs, numbers with internal [.,]
# separators, words with internal hyphens/apostrophes, then a one-character
# catch-all so every character of the input is matched.
TOKEN_REGEX = re.compile(
    r"""
    (?:\s+)
    |(?:\d+(?:[\.,]\d+)*)
    |(?:\w+(?:[-']\w+)*)
    |(?:.)
    """,
    re.VERBOSE | re.UNICODE,
)
# Whole-segment classifiers used by _classify_segment.
WORD_LIKE_RE = re.compile(r"\w+(?:[-']\w+)*\Z", re.UNICODE)
NUMBER_RE = re.compile(r"\d+(?:[\.,]\d+)*\Z", re.UNICODE)
# Two or more newlines (each optionally followed by indentation) split paragraphs.
PARAGRAPH_BREAK_RE = re.compile(r"(?:\r?\n[ \t]*){2,}")
# Dependency-label groups for the grammatical roles we highlight.
SUBJECT_DEPS = {"nsubj", "nsubjpass", "csubj", "csubjpass"}
DIRECT_OBJECT_DEPS = {"dobj", "obj"}
INDIRECT_OBJECT_DEPS = {"iobj", "dative"}
COMPLEMENT_DEPS = {"attr", "oprd", "acomp", "ccomp", "xcomp"}
ADVERBIAL_DEPS = {"advmod", "npadvmod", "advcl", "obl", "prep", "pcomp"}
# Words that introduce relative clauses.
RELATIVE_PRONOUNS = {"which", "that", "who", "whom", "whose", "where", "when"}
# Subordinating conjunction -> adverbial-clause function code
# (rendered via CLAUSE_FUNCTION_LABELS below).
SUBORDINATORS_TO_FUNCTION = {
    "when": "TIME",
    "while": "TIME",
    "after": "TIME",
    "before": "TIME",
    "until": "TIME",
    "as": "TIME",
    "once": "TIME",
    "since": "TIME",
    "because": "REASON",
    "now that": "REASON",
    "if": "CONDITION",
    "unless": "CONDITION",
    "provided": "CONDITION",
    "provided that": "CONDITION",
    "although": "CONCESSION",
    "though": "CONCESSION",
    "even though": "CONCESSION",
    "whereas": "CONCESSION",
    "so that": "RESULT",
    "so": "RESULT",
    "lest": "PURPOSE",
    "in order that": "PURPOSE",
}
# Penn Treebank tags for finite vs. non-finite verb forms.
FINITE_VERB_TAGS = {"VBD", "VBP", "VBZ"}
NONFINITE_VERB_TAGS = {"VBG", "VBN"}
# Dependency labels whose heads start an embedded clause.
CLAUSE_PREDICATE_DEPS = {
    "advcl",
    "ccomp",
    "xcomp",
    "acl",
    "relcl",
    "csubj",
    "csubjpass",
    "parataxis",
}
# Pre-compiled fixed multi-word connector patterns paired with display labels.
FIXED_MULTIWORD_PHRASES: Tuple[Tuple[re.Pattern, str], ...] = tuple(
    (
        re.compile(pattern, re.IGNORECASE),
        label,
    )
    for pattern, label in [
        (r"\bas well as\b", "as well as"),
        (r"\brather than\b", "rather than"),
        (r"\bin addition to\b", "in addition to"),
        (r"\bin spite of\b", "in spite of"),
        (r"\baccording to\b", "according to"),
        (r"\bas soon as\b", "as soon as"),
    ]
)
# Chinese display names for the clause-function codes above.
CLAUSE_FUNCTION_LABELS = {
    "TIME": "时间",
    "REASON": "原因",
    "CONDITION": "条件",
    "CONCESSION": "让步",
    "RESULT": "结果",
    "PURPOSE": "目的",
}
- def _iter_infinitive_markers(token: SpacyToken) -> List[SpacyToken]:
- """Collect 'to' markers attached to a verb head."""
- markers = []
- for child in token.children:
- if child.lower_ == "to" and child.tag_ == "TO":
- markers.append(child)
- return markers
def _token_is_infinitive(token: "SpacyToken") -> bool:
    """True when *token* is a verb/aux infinitive accompanied by a 'to' marker."""
    if token.pos_ in {"VERB", "AUX"}:
        looks_infinitive = "Inf" in set(token.morph.get("VerbForm")) or token.tag_ == "VB"
        if looks_infinitive:
            return bool(_iter_infinitive_markers(token))
    return False
- def _token_is_gerund(token: SpacyToken) -> bool:
- if token.pos_ not in {"VERB", "AUX"}:
- return False
- verb_forms = set(token.morph.get("VerbForm"))
- if "Ger" in verb_forms:
- return True
- return token.tag_ == "VBG"
def _annotate_nonfinite_verbals(
    sentence: SpacySpan,
    spans: List[Span],
    mapping: Dict[int, int],
) -> None:
    """Highlight infinitive and gerund phrases to mark non-finite verb forms.

    Appends "verbal-infinitive" and "verbal-gerund" spans covering each
    verb's dependency subtree; the data-form attribute carries the Chinese
    label shown in the UI.
    """
    for token in sentence:
        if _token_is_infinitive(token):
            start_char, end_char = subtree_char_span(token)
            markers = _iter_infinitive_markers(token)
            if markers:
                # Extend left so the 'to' marker is inside the highlight.
                start_char = min(start_char, min(child.idx for child in markers))
            add_char_based_span(
                spans,
                start_char,
                end_char,
                "verbal-infinitive",
                mapping,
                attrs={"data-form": "不定式"},
            )
    # NOTE(review): token.i is unique per token in a single pass, so this set
    # never suppresses anything — it looks like leftover defensive code.
    seen_gerunds = set()
    for token in sentence:
        if token.i in seen_gerunds:
            continue
        if _token_is_gerund(token):
            start_char, end_char = subtree_char_span(token)
            add_char_based_span(
                spans,
                start_char,
                end_char,
                "verbal-gerund",
                mapping,
                attrs={"data-form": "动名词"},
            )
            seen_gerunds.add(token.i)
# Chinese display labels for tokens left uncovered by any role span,
# keyed by dependency label first (more specific) ...
RESIDUAL_DEP_LABELS = {
    "det": "限定词",
    "prep": "介词",
    "case": "介词标记",
    "cc": "并列连词",
    "mark": "从属连词",
    "poss": "所有格标记",
    "nummod": "数量修饰语",
    "aux": "助动词",
    "prt": "小品词",
}
# ... then by coarse part-of-speech as a fallback.
RESIDUAL_POS_LABELS = {
    "ADJ": "形容词修饰语",
    "ADV": "副词",
    "NUM": "数词",
    "PRON": "代词",
}
def _classify_segment(seg: str) -> str:
    """Bucket a tokenizer segment as 'word', 'space', or 'punct'."""
    if seg:
        if seg.isspace():
            return "space"
        if WORD_LIKE_RE.fullmatch(seg) or NUMBER_RE.fullmatch(seg):
            return "word"
    return "punct"
def _append_fallback_tokens(text: str, start: int, end: int, tokens: "List[Token]") -> None:
    """Append one single-character Token for every position in text[start:end]."""
    for pos in range(start, end):
        ch = text[pos]
        if ch.isspace():
            kind = "space"
        else:
            kind = "word" if (ch.isalnum() or ch == "_") else "punct"
        tokens.append(Token(ch, pos, pos + 1, kind))
def tokenize_preserve(text: str) -> "List[Token]":
    """Split *text* into word/space/punct Tokens that cover every character.

    Characters not matched by TOKEN_REGEX are emitted as single-character
    fallback tokens, so concatenating all token texts reproduces *text*.
    """
    if not text:
        return []
    tokens: List[Token] = []
    cursor = 0
    for match in TOKEN_REGEX.finditer(text):
        if match.start() > cursor:
            _append_fallback_tokens(text, cursor, match.start(), tokens)
        segment = match.group(0)
        tokens.append(Token(segment, match.start(), match.end(), _classify_segment(segment)))
        cursor = match.end()
    if cursor < len(text):
        _append_fallback_tokens(text, cursor, len(text), tokens)
    if not tokens:
        # Defensive: TOKEN_REGEX has a catch-all branch, so this should not trigger.
        tokens = [Token(text, 0, len(text), "word" if text[0].isalnum() else "punct")]
    return tokens
def build_char_to_token_map(tokens: "List[Token]") -> "Dict[int, int]":
    """Map every character offset covered by a token to that token's index."""
    return {
        pos: idx
        for idx, tok in enumerate(tokens)
        for pos in range(tok.start, tok.end)
    }
def char_span_to_token_span(
    char_start: int, char_end: int, mapping: Dict[int, int]
) -> Tuple[int, int]:
    """Convert a character range into a half-open token-index range.

    Returns (-1, -1) for empty ranges or when either endpoint falls outside
    the mapped text.
    """
    if char_end > char_start:
        first = mapping.get(char_start)
        last = mapping.get(char_end - 1)
        if first is not None and last is not None:
            return first, last + 1
    return -1, -1
def add_char_based_span(
    spans: "List[Span]",
    char_start: int,
    char_end: int,
    cls: str,
    mapping: "Dict[int, int]",
    attrs: "Optional[Dict[str, str]]" = None,
) -> None:
    """Resolve a character range to token indices and append a Span for it.

    Unresolvable ranges are ignored; attribute values are HTML-escaped and
    falsy values are dropped.
    """
    start_tok, end_tok = char_span_to_token_span(char_start, char_end, mapping)
    if start_tok < 0 or end_tok < 0:
        return
    escaped = None
    if attrs:
        escaped = {key: html.escape(value, quote=True) for key, value in attrs.items() if value}
    spans.append(Span(start_token=start_tok, end_token=end_tok, cls=cls, attrs=escaped))
def add_span(spans: "List[Span]", start_token: int, end_token: int, cls: str, attrs: "Optional[Dict[str, str]]" = None):
    """Append a token-indexed Span, ignoring empty or invalid ranges."""
    if start_token >= 0 and end_token > start_token:
        spans.append(Span(start_token=start_token, end_token=end_token, cls=cls, attrs=attrs))
def subtree_char_span(token: "SpacyToken") -> "Tuple[int, int]":
    """Character range covering the token's whole dependency subtree."""
    nodes = list(token.subtree)
    if not nodes:
        # Defensive: a spaCy subtree normally includes the token itself.
        return token.idx, token.idx + len(token.text)
    first, last = nodes[0], nodes[-1]
    return first.idx, last.idx + len(last.text)
def _subtree_text(token: "SpacyToken") -> str:
    """Text of the contiguous doc slice from the token's left edge to its right edge."""
    return token.doc[token.left_edge.i : token.right_edge.i + 1].text
def _find_antecedent_word(sentence: "SpacySpan", clause_start_char: int) -> "Optional[str]":
    """Return the last NOUN/PROPN/PRON surface form occurring before *clause_start_char*."""
    antecedent = None
    for tok in sentence:
        if tok.idx >= clause_start_char:
            return antecedent
        if tok.pos_ in {"PRON", "PROPN", "NOUN"}:
            antecedent = tok.text
    return antecedent
def _is_nonfinite_clause(span: "SpacySpan") -> bool:
    """True when the span shows non-finite verb tags but no finite verb tag."""
    tags = {tok.tag_ for tok in span if tok.tag_}
    if tags & FINITE_VERB_TAGS:
        return False
    return "TO" in tags or bool(tags & NONFINITE_VERB_TAGS)
def _classify_noun_clause(span: "SpacySpan") -> "Optional[str]":
    """Classify a clause as subject/complement/object from its dependency labels."""
    deps = {tok.dep_ for tok in span}
    for labels, role in (
        ({"csubj", "csubjpass"}, "subject"),
        ({"ccomp", "xcomp"}, "complement"),
        ({"dobj", "obj"}, "object"),
    ):
        if deps & labels:
            return role
    return None
def _split_paragraph_ranges(text: str) -> "List[Tuple[int, int]]":
    """Return (start, end) character ranges for each paragraph of *text*.

    Paragraphs are separated by blank lines (PARAGRAPH_BREAK_RE); the
    separators themselves are excluded from every range.  Always returns at
    least one range.
    """
    if not text:
        return [(0, 0)]
    ranges: List[Tuple[int, int]] = []
    cursor = 0
    for separator in PARAGRAPH_BREAK_RE.finditer(text):
        ranges.append((cursor, separator.start()))
        cursor = separator.end()
    # The trailing range guarantees the result is never empty.
    ranges.append((cursor, len(text)))
    return ranges
- def _circled_number(value: int) -> str:
- """Return the circled number style for sentence numbering."""
- if value <= 0:
- return ""
- if value <= 20:
- return chr(ord("\u2460") + value - 1)
- if 21 <= value <= 35:
- return chr(ord("\u3251") + value - 21)
- if 36 <= value <= 50:
- return chr(ord("\u32B1") + value - 36)
- return f"({value})"
def annotate_constituents(
    sentence: SpacySpan,
    spans: List[Span],
    mapping: Dict[int, int],
    sentence_start_char: int,
    sentence_end_char: int,
    summary: Optional[SentenceSummary] = None,
) -> None:
    """Add clause/phrase highlight spans derived from benepar's constituency parse.

    Silently returns (recording a module warning) when benepar is missing or
    its annotations cannot be read, leaving only the dependency-based spans.
    PP/ADVP become adverbial spans; SBAR becomes a relative or adverbial
    clause; S/VP are checked for non-finite structure and noun-clause roles.
    When *summary* is given, Chinese clause labels are appended to it.
    """
    # If benepar is not attached or a previous warning indicates fallback, skip.
    if not HAS_BENEPAR or BENE_PAR_WARNING:
        _ensure_benepar_warning(
            "Benepar component missing or unavailable. Using dependency-based spans."
        )
        return
    # If the extension is not present, skip.
    if not SpacySpan.has_extension("constituents"):
        _ensure_benepar_warning(
            "Benepar component missing from spaCy pipeline. Falling back to dependency spans."
        )
        return
    try:
        constituents = sentence._.constituents
    except Exception as exc:
        # Catch any error while accessing benepar results and fall back safely.
        _ensure_benepar_warning(
            f"Benepar constituency parse unavailable: {exc}. Falling back to dependency spans."
        )
        return
    seen_ranges = set()
    for const in constituents:
        label = getattr(const, "label_", None)
        if not label:
            continue
        start_char, end_char = const.start_char, const.end_char
        if start_char == sentence_start_char and end_char == sentence_end_char:
            continue  # skip the entire sentence span itself
        key = (start_char, end_char, label)
        # NOTE(review): is_relative is reset every iteration and the SBAR
        # branch below always `continue`s, so the `not is_relative` test on
        # the final S branch can never be False — looks vestigial.
        is_relative = False
        if label in {"PP", "ADVP"}:
            # Prepositional/adverb phrases: dedupe identical ranges.
            if key in seen_ranges:
                continue
            seen_ranges.add(key)
            add_char_based_span(spans, start_char, end_char, "role-adverbial", mapping)
            continue
        if label == "SBAR" and const:
            # Subordinate clause: decide relative vs. adverbial by first word.
            first_token = const[0]
            lowered = first_token.text.lower()
            if lowered in RELATIVE_PRONOUNS:
                antecedent = _find_antecedent_word(sentence, start_char)
                attrs = {"data-modifies": antecedent} if antecedent else None
                add_char_based_span(spans, start_char, end_char, "clause-relative", mapping, attrs)
                if summary:
                    summary.clauses.append("定语从句")
                is_relative = True
            else:
                function = SUBORDINATORS_TO_FUNCTION.get(lowered)
                # attrs with a None value is fine: add_char_based_span drops falsy values.
                attrs = {"data-function": function}
                add_char_based_span(spans, start_char, end_char, "clause-adverbial", mapping, attrs)
                if summary:
                    summary.clauses.append("状语从句")
                    if function:
                        summary.clause_functions.append(function)
            continue
        if label in {"S", "VP"}:
            if _is_nonfinite_clause(const):
                add_char_based_span(spans, start_char, end_char, "clause-nonfinite", mapping)
                if summary:
                    summary.clauses.append("非限定结构")
                continue
        if label == "S" and not is_relative:
            role = _classify_noun_clause(const)
            if role:
                attrs = {"data-clause-role": role}
                add_char_based_span(spans, start_char, end_char, "clause-noun", mapping, attrs)
                if summary:
                    summary.clauses.append(f"名词性从句({role})")
def _predicate_span_bounds(head: "SpacyToken") -> "Tuple[int, int]":
    """Character range covering the predicate head plus its aux/cop/neg/particle children."""
    group = [head] + [
        child
        for child in head.children
        if child.dep_ in {"aux", "auxpass", "prt", "cop", "neg"}
    ]
    left = min(tok.idx for tok in group)
    right = max(tok.idx + len(tok.text) for tok in group)
    return left, right
def _token_is_finite(token: "SpacyToken") -> bool:
    """True when the token carries finite verb morphology (or a finite/modal tag)."""
    if token.pos_ not in {"VERB", "AUX"}:
        return False
    forms = set(token.morph.get("VerbForm"))
    return (
        bool(forms & {"Fin", "Imp"})
        or token.tag_ in FINITE_VERB_TAGS
        or token.tag_ == "MD"
    )
def _has_finite_auxiliary(token: "SpacyToken") -> bool:
    """True when any aux/auxpass/cop child of *token* is itself finite."""
    return any(
        child.dep_ in {"aux", "auxpass", "cop"} and _token_is_finite(child)
        for child in token.children
    )
def _is_finite_predicate_head(token: SpacyToken) -> bool:
    """Filter predicate heads to exclude bare infinitives/participles.

    A head counts as finite if it is finite itself, or if it is a
    non-finite form (infinitive, participle, gerund) supported by a finite
    auxiliary ("has run", "will go").  Bare VB forms qualify only as ROOT
    without a 'to' marker (imperatives and similar).
    """
    if _token_is_finite(token):
        return True
    verb_forms = set(token.morph.get("VerbForm"))
    if "Inf" in verb_forms:
        return _has_finite_auxiliary(token)
    if verb_forms & {"Part", "Ger"}:
        return _has_finite_auxiliary(token)
    # Tag-based fallback when morphology is missing.
    if token.tag_ in NONFINITE_VERB_TAGS:
        return _has_finite_auxiliary(token)
    if token.tag_ == "VB":
        # "to <verb>" is an infinitive, never a finite predicate.
        has_to_marker = any(
            child.dep_ == "mark" and child.lower_ == "to" for child in token.children
        )
        if has_to_marker:
            return False
        return token.dep_ == "ROOT"
    return False
def _predicate_heads(sentence: SpacySpan) -> List[SpacyToken]:
    """Collect predicate heads including coordinated verbs.

    Candidates are the ROOT, verbs conjoined to a verb, and clause
    predicates (ccomp/xcomp/advcl/acl/relcl/parataxis); the result is
    deduplicated, ordered by token position, and filtered to finite heads.
    """
    candidates: List[SpacyToken] = []
    for tok in sentence:
        # Only verb-like tokens (by POS or finite tag) can head a predicate.
        if tok.pos_ not in {"VERB", "AUX"} and tok.tag_ not in FINITE_VERB_TAGS:
            continue
        if tok.dep_ == "ROOT":
            candidates.append(tok)
            continue
        if tok.dep_ == "conj" and tok.head.pos_ in {"VERB", "AUX"}:
            candidates.append(tok)
            continue
        if tok.dep_ in {"ccomp", "xcomp", "advcl", "acl", "relcl", "parataxis"}:
            candidates.append(tok)
    seen = set()
    ordered: List[SpacyToken] = []
    for tok in sorted(candidates, key=lambda t: t.i):
        if tok.i in seen:
            continue
        seen.add(tok.i)
        if _is_finite_predicate_head(tok):
            ordered.append(tok)
    return ordered
def _is_clause_predicate(token: "SpacyToken") -> bool:
    """Return True when the predicate head originates inside an embedded clause.

    A conjoined head inherits clause status from the verb chain it is
    coordinated with; the walk up that chain is capped at 10 steps.
    """
    if token.dep_ in CLAUSE_PREDICATE_DEPS:
        return True
    if token.dep_ != "conj":
        return False
    ancestor = token.head
    for _ in range(10):
        if ancestor is None:
            break
        if ancestor.dep_ in CLAUSE_PREDICATE_DEPS:
            return True
        if ancestor.dep_ != "conj" or ancestor.head is ancestor:
            break
        ancestor = ancestor.head
    return False
def _add_fixed_phrases(
    sentence: "SpacySpan",
    mapping: "Dict[int, int]",
    spans: "List[Span]",
    summary: "Optional[SentenceSummary]" = None,
) -> None:
    """Highlight fixed multi-word connectors (e.g. 'as well as') in the sentence."""
    offset = sentence.start_char
    sentence_text = sentence.text
    for pattern, label in FIXED_MULTIWORD_PHRASES:
        for hit in pattern.finditer(sentence_text):
            add_char_based_span(
                spans,
                offset + hit.start(),
                offset + hit.end(),
                "phrase-fixed",
                mapping,
                attrs={"data-phrase": label},
            )
            if summary is not None:
                summary.connectors.append(label.lower())
def annotate_sentence(
    tokens: List[Token],
    sentence: SpacySpan,
    mapping: Dict[int, int],
    collect_summary: bool = True,
) -> Tuple[List[Span], Optional[SentenceSummary]]:
    """Compute all highlight spans (and an optional summary) for one sentence.

    Layers, in order: subjects, predicates, objects, complements,
    connectors/determiners/modifiers, adverbials, appositions,
    parenthetical/absolute constructions, non-finite verbals, benepar
    constituents, and fixed multi-word phrases.

    Args:
        tokens: surface tokens of the whole document.
        sentence: the spaCy sentence span being annotated.
        mapping: character offset -> token index (build_char_to_token_map).
        collect_summary: when True, also build a SentenceSummary.

    Returns:
        (spans, summary) — summary is None when collect_summary is False.
    """
    spans: List[Span] = []
    summary = SentenceSummary(sentence_length=len(sentence)) if collect_summary else None
    sent_bounds = char_span_to_token_span(sentence.start_char, sentence.end_char, mapping)
    sent_start_tok, sent_end_tok = sent_bounds

    def add_subtree(token: SpacyToken, cls: str):
        # Highlight the token's entire dependency subtree.
        start_char, end_char = subtree_char_span(token)
        add_char_based_span(spans, start_char, end_char, cls, mapping)

    def add_token(token: SpacyToken, cls: str):
        # Highlight just this token's own characters.
        add_char_based_span(spans, token.idx, token.idx + len(token.text), cls, mapping)

    # Subjects: every subject-dependency subtree.
    for tok in sentence:
        if tok.dep_ in SUBJECT_DEPS:
            add_subtree(tok, "role-subject")
            if summary is not None:
                summary.subjects.append(_subtree_text(tok))
    # Predicates: head plus auxiliaries; clause predicates get an extra class.
    for head in _predicate_heads(sentence):
        start_char, end_char = _predicate_span_bounds(head)
        cls = "role-predicate"
        if _is_clause_predicate(head):
            cls = "role-predicate role-predicate-clause"
        add_char_based_span(spans, start_char, end_char, cls, mapping)
        predicate_text = sentence.doc.text[start_char:end_char].strip()
        if summary is not None:
            summary.predicates.append(predicate_text or head.text)
    # Direct object: only the first one found is highlighted.
    # NOTE(review): the break means later coordinated objects are skipped —
    # presumably intentional to avoid clutter; confirm.
    for tok in sentence:
        if tok.dep_ in DIRECT_OBJECT_DEPS:
            add_subtree(tok, "role-object-do")
            if summary is not None:
                summary.objects.append(_subtree_text(tok))
            break
    # Indirect object: an iobj/dative token, else a "to/for" prepositional object.
    io_token = next((tok for tok in sentence if tok.dep_ in INDIRECT_OBJECT_DEPS), None)
    if io_token is None:
        for tok in sentence:
            if tok.dep_ == "pobj" and tok.head.dep_ == "prep" and tok.head.lemma_.lower() in {"to", "for"}:
                io_token = tok
                break
    if io_token:
        add_subtree(io_token, "role-object-io")
        if summary is not None:
            summary.objects.append(_subtree_text(io_token))
    # Complement: first complement-dependency subtree only.
    for tok in sentence:
        if tok.dep_ in COMPLEMENT_DEPS:
            add_subtree(tok, "role-complement")
            if summary is not None:
                summary.complements.append(_subtree_text(tok))
            break
    # Single-token roles: connectors, determiners, modifiers.
    for tok in sentence:
        lowered = tok.text.lower()
        if tok.dep_ in {"cc", "mark", "preconj"} or tok.pos_ in {"CCONJ", "SCONJ"}:
            add_token(tok, "role-connector")
            if summary is not None:
                summary.connectors.append(lowered)
        if tok.dep_ == "det" or tok.pos_ == "DET":
            add_token(tok, "role-determiner")
        if tok.dep_ in {"amod", "poss", "compound", "nummod"}:
            add_token(tok, "role-modifier")
    # Adverbials: dedupe identical subtree ranges via a set.
    adverbial_ranges = set()
    for tok in sentence:
        if tok.dep_ in ADVERBIAL_DEPS:
            adverbial_ranges.add(subtree_char_span(tok))
    for start_char, end_char in adverbial_ranges:
        add_char_based_span(spans, start_char, end_char, "role-adverbial", mapping)
    for tok in sentence:
        if tok.dep_ == "appos":
            add_subtree(tok, "role-apposition")
    if sent_start_tok >= 0 and sent_end_tok >= 0:
        # Parentheticals: match '(' ... ')' pairs over the surface tokens.
        stack = []
        for idx in range(sent_start_tok, sent_end_tok):
            token = tokens[idx]
            if token.text == "(":
                stack.append(idx)
            elif token.text == ")" and stack:
                add_span(spans, stack.pop(), idx + 1, "role-parenthetical")
        # Absolute constructions: a comma-delimited window containing a VBG.
        # Consecutive windows share a comma (pairs overlap by design).
        comma_token_idxs = [
            i
            for i in range(sent_start_tok, sent_end_tok)
            if tokens[i].kind == "punct" and tokens[i].text == ","
        ]
        for idx, first_comma in enumerate(comma_token_idxs):
            if idx + 1 >= len(comma_token_idxs):
                break
            second_comma = comma_token_idxs[idx + 1]
            start_char = tokens[first_comma].start
            end_char = tokens[second_comma].end
            span = sentence.doc.char_span(start_char, end_char, alignment_mode="expand")
            if span and any(tok.tag_ == "VBG" for tok in span):
                add_span(spans, first_comma, second_comma + 1, "role-absolute")
    _annotate_nonfinite_verbals(sentence, spans, mapping)
    annotate_constituents(
        sentence,
        spans,
        mapping,
        sentence.start_char,
        sentence.end_char,
        summary,
    )
    _add_fixed_phrases(sentence, mapping, spans, summary)
    return spans, summary
def _label_residual_token(token: SpacyToken) -> Optional[str]:
    """Map an un-highlighted token to a human-readable residual role label.

    The dependency relation takes priority; when RESIDUAL_DEP_LABELS has no
    entry for it, fall back to the coarse part-of-speech table.  Returns
    None when neither table knows the token.
    """
    by_dep = RESIDUAL_DEP_LABELS.get(token.dep_)
    return by_dep or RESIDUAL_POS_LABELS.get(token.pos_)
def _collect_residual_roles(
    sentence: SpacySpan,
    tokens: List[Token],
    spans: List[Span],
    sent_bounds: Tuple[int, int],
    summary: Optional[SentenceSummary],
    mapping: Dict[int, int],
) -> None:
    """Tag word tokens that no existing span covers with a residual role.

    Walks the sentence's token range, skips anything already inside one of
    ``spans``, labels the leftovers via ``_label_residual_token`` and
    appends ``role-residual`` spans.  Mutates ``spans`` (and
    ``summary.residual_roles`` when a summary is given) in place.
    """
    lo, hi = sent_bounds
    if lo < 0 or hi < 0 or lo >= hi:
        return
    # Token indices inside [lo, hi) already claimed by an existing span.
    claimed = set()
    for existing in spans:
        claimed.update(range(max(existing.start_token, lo), min(existing.end_token, hi)))
    doc = sentence.doc
    for idx in range(lo, hi):
        if idx in claimed:
            continue
        tok = tokens[idx]
        if tok.kind != "word":
            continue
        piece = doc.char_span(tok.start, tok.end, alignment_mode="expand")
        if not piece or not piece.text.strip():
            continue
        label = _label_residual_token(piece[0])
        if summary is not None and label and label not in summary.residual_roles:
            summary.residual_roles.append(label)
        if label:
            add_char_based_span(
                spans,
                tok.start,
                tok.end,
                "role-residual",
                mapping,
                attrs={"data-role": label},
            )
def _classify_sentence_complexity(summary: SentenceSummary) -> Tuple[str, bool]:
    """Bucket a sentence into a Chinese complexity label.

    Returns ``(label, is_complex)``.  Priority order: subordinate-clause
    count first, then connector count, then raw word count.
    """
    clause_total = len(summary.clauses)
    if clause_total >= 2:
        return "多重复杂句", True
    if clause_total == 1:
        return "主从复合句", True
    if len(summary.connectors) >= 2:
        return "并列复合句", True
    if summary.sentence_length >= 25:
        return "长句", True
    return "简单句", False
def _translate_clause_functions(functions: List[str]) -> List[str]:
    """Translate clause-function codes to display labels, de-duplicated.

    Unknown codes pass through unchanged; first-seen order is preserved
    (``dict.fromkeys`` keeps insertion order).
    """
    return list(
        dict.fromkeys(CLAUSE_FUNCTION_LABELS.get(code, code) for code in functions)
    )
def build_sentence_note(summary: SentenceSummary) -> Tuple[str, bool]:
    """Compose the per-sentence helper note shown in detail mode.

    Builds a ";"-separated Chinese summary covering subject / predicate /
    object, clause structure, connectors and residual roles, and returns it
    together with the complexity flag from ``_classify_sentence_complexity``.
    """

    def _dedup(seq: List[str]) -> List[str]:
        # Preserve first-seen order while dropping duplicates.
        return list(dict.fromkeys(seq))

    def _join_or(seq: List[str], fallback: str) -> str:
        return "、".join(seq) if seq else fallback

    clause_label = "无"
    if summary.clauses:
        clause_label = "、".join(
            name if count == 1 else f"{name}×{count}"
            for name, count in Counter(summary.clauses).items()
        )
    functions = _translate_clause_functions(summary.clause_functions)
    parts: List[str] = [
        f"主语:{_join_or(_dedup(summary.subjects), '未识别')}",
        f"谓语:{_join_or(_dedup(summary.predicates), '未识别')}",
        f"宾语:{_join_or(_dedup(summary.objects), '无')}",
    ]
    complements_text = _join_or(_dedup(summary.complements), "无")
    if complements_text != "无":
        parts.append(f"补语:{complements_text}")
    parts.append(f"从句:{clause_label}")
    if functions:
        parts.append(f"从句功能:{'、'.join(functions)}")
    parts.append(f"连接词:{_join_or(_dedup(summary.connectors), '未检测到典型连接词')}")
    if summary.residual_roles:
        parts.append(f"未高亮:{'、'.join(summary.residual_roles)}")
    complexity_label, is_complex = _classify_sentence_complexity(summary)
    parts.insert(0, f"句型:{complexity_label}")
    parts.append(f"词数:{summary.sentence_length}")
    return ";".join(parts), is_complex
def render_with_spans(tokens: "List[Token]", spans: "List[Span]") -> str:
    """Render tokens to HTML, wrapping them in nested ``<span>`` elements.

    Spans are sorted by (start_token asc, end_token desc) so enclosing
    spans open before the spans they contain; a span is closed when the
    token cursor reaches its ``end_token``.  Token text and attribute
    values are HTML-escaped; ``span.cls`` is inserted verbatim (assumed to
    be a trusted, code-generated class name).

    Performance fix: the original rescanned the entire remaining span list
    for every token (O(n*m) plus O(m) ``list.remove`` calls).  Because the
    list is sorted by start token, a single advancing pointer finds all
    spans opening at the current token in O(n + m) total, with identical
    output.

    NOTE(review): spans are assumed to nest properly; a span whose end
    falls while it is not on top of the stack is only closed once it
    reaches the top (same behaviour as the original).
    """
    ordered = sorted(spans, key=lambda s: (s.start_token, -s.end_token))
    out: List[str] = []
    active: List[Span] = []
    next_span = 0  # pointer into `ordered`; everything before it is handled

    def open_span(span) -> None:
        attr_text = ""
        if span.attrs:
            attr_text = " " + " ".join(
                f"{k}='" + html.escape(v, quote=True) + "'" for k, v in span.attrs.items()
            )
        out.append(f"<span class='{span.cls}'{attr_text}>")

    for idx, token in enumerate(tokens):
        # Open every span starting at this token; spans whose start was
        # already passed (e.g. negative bounds) can never open, matching
        # the original equality-only queue scan.
        while next_span < len(ordered) and ordered[next_span].start_token <= idx:
            span = ordered[next_span]
            next_span += 1
            if span.start_token == idx:
                open_span(span)
                active.append(span)
        out.append(html.escape(token.text))
        while active and active[-1].end_token == idx + 1:
            active.pop()
            out.append("</span>")
    # Close anything still open (spans extending past the last token).
    out.extend("</span>" for _ in active)
    return "".join(out)
def _run_pipeline_without_benepar(text: str) -> "spacy.tokens.Doc":
    """Run the spaCy pipeline on *text* while skipping the benepar component.

    Fallback path used when benepar raises during parsing; every other
    pipeline component is applied in its registered order.
    """
    assert NLP is not None
    doc = NLP.make_doc(text)
    for name, component in NLP.pipeline:
        if name != "benepar":
            doc = component(doc)
    return doc
def highlight_text_with_spacy(
    text: str,
    paragraph_meta: Optional[List[Dict[str, str]]] = None,
    include_helper: bool = False,
    paragraph_ranges: Optional[List[Tuple[int, int]]] = None,
) -> str:
    """Tokenize *text*, annotate it per sentence, and render highlighted HTML.

    Parameters:
        text: plain text to analyze.
        paragraph_meta: optional per-paragraph HTML attributes; only used
            when its length matches the number of paragraph ranges.
        include_helper: when True, collect per-sentence summaries and attach
            ``data-note`` / ``data-complex`` attributes to sentence spans.
        paragraph_ranges: optional pre-computed (start, end) char ranges; any
            out-of-bounds range invalidates the whole list and paragraphs are
            re-split from the text instead.

    Returns an HTML fragment string (empty when the text has no tokens).
    Raises RuntimeError when the spaCy pipeline failed to load.
    """
    if NLP is None:
        raise RuntimeError(f"spaCy pipeline unavailable: {NLP_LOAD_ERROR}")
    tokens = tokenize_preserve(text)
    if not tokens:
        return ""
    mapping = build_char_to_token_map(tokens)
    # Robust doc creation: if benepar causes any error, skip it and fallback.
    try:
        doc = NLP(text)
    except Exception as exc:
        _ensure_benepar_warning(
            f"Benepar failed during processing: {exc}. Falling back to dependency-based spans."
        )
        doc = _run_pipeline_without_benepar(text)
    # Validate caller-supplied paragraph ranges; one bad range discards them all.
    ranges = None
    if paragraph_ranges:
        valid = True
        for start, end in paragraph_ranges:
            if start < 0 or end < start or end > len(text):
                valid = False
                break
        if valid:
            ranges = list(paragraph_ranges)
    if not ranges:
        ranges = _split_paragraph_ranges(text)
    # Per-paragraph sentence counters drive the circled sentence numbers.
    paragraph_counters = [0 for _ in ranges]
    paragraph_idx = 0
    paragraph_spans: List[Span] = []
    # Meta attributes are only trusted when they align 1:1 with the ranges.
    paragraph_attrs = paragraph_meta if paragraph_meta and len(paragraph_meta) == len(ranges) else None
    for idx, (start, end) in enumerate(ranges):
        attrs = None
        if paragraph_attrs:
            attrs = paragraph_attrs[idx] or None
        add_char_based_span(paragraph_spans, start, end, "paragraph-scope", mapping, attrs=attrs)
    spans: List[Span] = list(paragraph_spans)
    for sent in doc.sents:
        # Advance to the paragraph containing this sentence's start.
        while paragraph_idx < len(ranges) and ranges[paragraph_idx][1] <= sent.start_char:
            paragraph_idx += 1
        current_idx = min(paragraph_idx, len(ranges) - 1)
        paragraph_counters[current_idx] += 1
        sentence_label = _circled_number(paragraph_counters[current_idx])
        sentence_spans, summary = annotate_sentence(tokens, sent, mapping, collect_summary=include_helper)
        sent_bounds = char_span_to_token_span(sent.start_char, sent.end_char, mapping)
        sent_start, sent_end = sent_bounds
        if sent_start >= 0 and sent_end >= 0:
            # Label tokens that no grammatical span claimed.
            _collect_residual_roles(sent, tokens, sentence_spans, sent_bounds, summary, mapping)
        helper_note = ""
        is_complex = False
        if include_helper and summary is not None:
            helper_note, is_complex = build_sentence_note(summary)
        attrs = {
            "data-sid": sentence_label,
        }
        if include_helper:
            attrs["data-complex"] = "1" if is_complex else "0"
            attrs["data-note"] = helper_note
        # NOTE(review): when the sentence maps to no tokens (sent_start/sent_end
        # stay -1) a sentence-scope Span with negative bounds is still appended;
        # render_with_spans never opens such a span — confirm this is intended.
        sentence_spans.append(Span(start_token=sent_start, end_token=sent_end, cls="sentence-scope", attrs=attrs))
        spans.extend(sentence_spans)
    return render_with_spans(tokens, spans)
- def _build_analysis_container(fragment: str, include_helper: bool) -> str:
- helper_state = "on" if include_helper else "off"
- return f"<div class='analysis' data-helper='{helper_state}'>{fragment}</div>"
def _build_highlighted_html(fragment: str, include_helper: bool) -> str:
    """Prepend the shared style block to the wrapped analysis fragment."""
    container = _build_analysis_container(fragment, include_helper)
    return f"{STYLE_BLOCK}{container}"
def _perform_analysis(text: str, include_helper: bool) -> AnalyzeResponse:
    """Run the full highlight pipeline and package the HTML response."""
    fragment = highlight_text_with_spacy(text, include_helper=include_helper)
    return AnalyzeResponse(
        highlighted_html=_build_highlighted_html(fragment, include_helper)
    )
# FastAPI application with fully permissive CORS so browser front-ends on
# any origin can call the API directly.
app = FastAPI(title="Grammar Highlight API (spaCy + benepar)")
app.add_middleware(
    CORSMiddleware,
    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # ineffective per the CORS spec (browsers reject '*' for credentialed
    # requests; Starlette echoes the origin instead) — confirm whether
    # credentialed requests are actually needed here.
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.post("/analyze", response_model=AnalyzeResponse)
async def analyze(req: AnalyzeRequest):
    """Highlight the submitted text without per-sentence helper notes.

    Returns 400 for missing/blank text and 500 when analysis fails
    (pipeline unavailable or any unexpected error).
    """
    if req.text is None or not req.text.strip():
        raise HTTPException(status_code=400, detail="Text is required")
    try:
        return _perform_analysis(req.text, include_helper=False)
    except RuntimeError as exc:
        # Pipeline-unavailable and similar expected failures.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    except Exception as exc:  # pragma: no cover - defensive
        raise HTTPException(status_code=500, detail=f"Analysis failed: {exc}") from exc
@app.post("/analyze/detail", response_model=AnalyzeResponse)
async def analyze_with_helper(req: AnalyzeRequest):
    """Highlight the submitted text WITH per-sentence helper notes.

    Same contract as /analyze except include_helper=True: 400 for
    missing/blank text, 500 for analysis failures.
    """
    if req.text is None or not req.text.strip():
        raise HTTPException(status_code=400, detail="Text is required")
    try:
        return _perform_analysis(req.text, include_helper=True)
    except RuntimeError as exc:
        # Pipeline-unavailable and similar expected failures.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    except Exception as exc:  # pragma: no cover - defensive
        raise HTTPException(status_code=500, detail=f"Analysis failed: {exc}") from exc
@app.get("/health")
async def health():
    """Liveness probe: report pipeline status plus benepar diagnostics."""
    payload = {"status": "ok" if NLP is not None else "failed"}
    if NLP is None:
        # Surface the load error only when it stringifies to something non-empty.
        detail = str(NLP_LOAD_ERROR)
        if detail:
            payload["detail"] = detail
    if BENE_PAR_WARNING:
        payload["warning"] = BENE_PAR_WARNING
    payload["benepar_attached"] = HAS_BENEPAR
    return payload
@app.get("/proxy", response_class=HTMLResponse)
async def proxy(url: Optional[str] = None, show_images: bool = False):
    """Fetch a remote article, highlight its text, and render the proxy page.

    Without ``url`` the empty form page is returned.  Failures map to HTML
    error pages: ValueError -> 400, remote httpx errors -> 502, anything
    else -> 500.
    """
    if not url:
        return HTMLResponse(_render_proxy_page(show_images=show_images))
    try:
        # _fetch_remote_plaintext returns the normalized URL plus extracted
        # plain text and paragraph/image/code-block metadata.
        (
            normalized_url,
            title,
            page_text,
            images,
            code_blocks,
            paragraph_meta,
            paragraph_ranges,
        ) = await _fetch_remote_plaintext(url)
        highlighted_fragment = highlight_text_with_spacy(
            page_text,
            paragraph_meta=paragraph_meta or None,
            paragraph_ranges=paragraph_ranges or None,
        )
        if code_blocks:
            # Re-insert verbatim code blocks that were pulled out before analysis.
            highlighted_fragment = _inject_proxy_codeblocks(highlighted_fragment, code_blocks)
        image_notice = None
        if images:
            if show_images:
                highlighted_fragment = _inject_proxy_images(highlighted_fragment, images)
            else:
                # Images are hidden by default for speed; tell the user how to
                # re-fetch with images enabled.
                highlighted_fragment = _strip_proxy_image_markers(highlighted_fragment)
                image_notice = (
                    f"检测到 {len(images)} 张正文图片,为提速默认隐藏。勾选“显示图片”后重新抓取即可加载原图。"
                )
        html_body = _render_proxy_page(
            url_value=normalized_url,
            message="分析完成,结果如下。",
            highlight_fragment=highlighted_fragment,
            source_url=normalized_url,
            source_title=title,
            show_images=show_images,
            image_notice=image_notice,
            source_plaintext=page_text,
        )
        return HTMLResponse(html_body)
    except ValueError as exc:
        # Invalid/unsupported URL or unparseable content.
        body = _render_proxy_page(url_value=url or "", message=str(exc), is_error=True, show_images=show_images)
        return HTMLResponse(body, status_code=400)
    except httpx.HTTPError as exc:
        # Provide a clearer message for common HTTP errors from the remote site.
        msg = None
        if isinstance(exc, httpx.HTTPStatusError) and exc.response is not None:
            status = exc.response.status_code
            if status == 403:
                # 403 is common enough (anti-scraping) to deserve guidance.
                msg = (
                    "抓取页面失败:目标站点返回 403 Forbidden(禁止访问)。"
                    "该网站很可能禁止自动抓取或代理访问,目前无法通过本工具获取正文,"
                    "可以尝试在浏览器中打开并手动复制需要的内容。"
                )
            else:
                msg = f"抓取页面失败:目标站点返回 HTTP {status}。"
        if msg is None:
            msg = f"抓取页面失败:{exc}"
        body = _render_proxy_page(
            url_value=url or "",
            message=msg,
            is_error=True,
            show_images=show_images,
        )
        return HTMLResponse(body, status_code=502)
    except Exception as exc:
        # Last-resort catch: render the failure rather than a bare 500.
        body = _render_proxy_page(
            url_value=url or "",
            message=f"代理分析失败:{exc}",
            is_error=True,
            show_images=show_images,
        )
        return HTMLResponse(body, status_code=500)
@app.get("/", response_class=HTMLResponse)
async def ui():
    """Serve the single-page front-end.

    The page is one self-contained HTML document (inline CSS + vanilla JS)
    that posts to /analyze and streams base64 WAV chunks from the TTS
    endpoint, with pause/resume and click-to-set-anchor playback.
    NOTE(review): TTS_ENDPOINT is a hard-coded LAN IP inside the page
    script — confirm this is intended for deployment.
    """
    return """<!DOCTYPE html>
<html lang=\"zh-CN\">
<head>
<meta charset=\"UTF-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<title>Grammar Highlighter</title>
<style>
body { font-family: system-ui, -apple-system, sans-serif; margin: 2rem; line-height: 1.6; }
textarea { width: 100%; min-height: 140px; font-size: 1rem; padding: 0.75rem; border: 1px solid #d0d7de; border-radius: 0.5rem; }
button { margin-top: 0.75rem; padding: 0.6rem 1.4rem; font-size: 1rem; cursor: pointer; border: none; border-radius: 999px; background: #1f7a8c; color: #fff; }
button + button { margin-left: 0.5rem; background: #6b7280; }
button:disabled { opacity: 0.6; cursor: wait; }
#result { margin-top: 1.5rem; border-top: 1px solid #e5e7eb; padding-top: 1rem; min-height: 2rem; }
#status { margin-left: 0.75rem; color: #3b82f6; }
.err { color: #b00020; }
.muted { color: #6b7280; font-size: 0.9rem; }
.tts-controls { margin-top: 0.75rem; display: flex; align-items: center; gap: 0.75rem; flex-wrap: wrap; }
.tts-controls button { margin-top: 0; background: #f97316; }
.tts-status { font-size: 0.95rem; color: #475569; }
.sentence-scope.anchor-highlight { outline: 2px dashed #f97316; outline-offset: 2px; }
</style>
</head>
<body>
<h1>Grammar Highlighter (spaCy + benepar)</h1>
<textarea id=\"text\" placeholder=\"Type the English text you want to analyze...\"></textarea>
<div>
<button type=\"button\" id=\"submit\">Analyze</button>
<button type=\"button\" id=\"clear\">清空输入</button>
<span id=\"status\"></span>
</div>
<div class=\"tts-controls\">
<button type=\"button\" id=\"tts\">朗读高亮文本</button>
<button type=\"button\" id=\"tts-selection\">朗读选中文本</button>
<button type=\"button\" id=\"tts-anchor\" disabled>从点击处朗读</button>
<button type=\"button\" id=\"tts-toggle\" disabled>暂停播放</button>
<span class=\"tts-status\" id=\"tts-status\"></span>
</div>
<div id=\"result\"></div>
<script>
const btn = document.getElementById('submit');
const btnClear = document.getElementById('clear');
const textarea = document.getElementById('text');
const statusEl = document.getElementById('status');
const ttsBtn = document.getElementById('tts');
const ttsSelectionBtn = document.getElementById('tts-selection');
const ttsAnchorBtn = document.getElementById('tts-anchor');
const ttsToggleBtn = document.getElementById('tts-toggle');
const ttsStatus = document.getElementById('tts-status');
const result = document.getElementById('result');
const TTS_ENDPOINT = 'http://141.140.15.30:8028/generate';
let currentAudio = null;
let queuedAudios = [];
let streamingFinished = false;
let lastAnalyzedText = '';
let anchorSentenceIndex = 0;
let isPaused = false;
let hasHighlightContent = false;
function resetUI() {
result.innerHTML = '';
statusEl.textContent = '';
statusEl.classList.remove('err');
ttsStatus.textContent = '';
hasHighlightContent = false;
if (ttsAnchorBtn) {
ttsAnchorBtn.disabled = true;
}
resetAnchorState();
setTtsButtonsDisabled(false);
resetAudioPlayback();
}
function getSentenceNodes() {
const analysis = result.querySelector('.analysis');
return analysis ? Array.from(analysis.querySelectorAll('.sentence-scope')) : [];
}
function clearAnchorHighlight() {
const highlighted = result.querySelectorAll('.sentence-scope.anchor-highlight');
highlighted.forEach(el => el.classList.remove('anchor-highlight'));
}
function resetAnchorState() {
anchorSentenceIndex = 0;
clearAnchorHighlight();
}
function setAnchorFromSentence(sentenceEl) {
const sentences = getSentenceNodes();
const idx = sentences.indexOf(sentenceEl);
if (idx === -1) return;
anchorSentenceIndex = idx;
clearAnchorHighlight();
sentenceEl.classList.add('anchor-highlight');
const sid = sentenceEl.getAttribute('data-sid') || (idx + 1);
ttsStatus.textContent = '已选择第 ' + sid + ' 句作为朗读起点';
}
btn.addEventListener('click', async () => {
resetUI();
const value = textarea.value.trim();
if (!value) {
statusEl.textContent = '请输入要分析的英文文本。';
statusEl.classList.add('err');
return;
}
btn.disabled = true;
statusEl.textContent = 'Analyzing ...';
try {
const response = await fetch('/analyze', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ text: value })
});
if (!response.ok) {
const error = await response.json().catch(() => ({ detail: 'Request failed' }));
throw new Error(error.detail || 'Request failed');
}
const data = await response.json();
result.innerHTML = data.highlighted_html || '';
lastAnalyzedText = value;
resetAnchorState();
hasHighlightContent = true;
if (ttsAnchorBtn) {
ttsAnchorBtn.disabled = false;
}
statusEl.textContent = '';
} catch (err) {
statusEl.textContent = '错误:' + (err.message || 'Unknown error');
statusEl.classList.add('err');
} finally {
btn.disabled = false;
}
});
btnClear.addEventListener('click', () => {
textarea.value = '';
lastAnalyzedText = '';
resetUI();
textarea.focus();
});
result.addEventListener('click', event => {
if (!hasHighlightContent) {
return;
}
const target = event.target;
const isTextNode = typeof Node !== 'undefined' && target && target.nodeType === Node.TEXT_NODE;
const base = isTextNode ? target.parentElement : target;
if (!base || typeof base.closest !== 'function') {
return;
}
const sentenceEl = base.closest('.sentence-scope');
if (sentenceEl) {
setAnchorFromSentence(sentenceEl);
}
});
function extractHighlightedText() {
const highlightRoot = result.querySelector('.analysis');
return highlightRoot ? highlightRoot.textContent.trim() : '';
}
function getFullTextForTts() {
return lastAnalyzedText || extractHighlightedText();
}
function extractAnchorText() {
const sentences = getSentenceNodes();
if (!sentences.length) return '';
const start = Math.min(anchorSentenceIndex, sentences.length - 1);
const parts = [];
for (let i = start; i < sentences.length; i++) {
const text = sentences[i].textContent.trim();
if (text) {
parts.push(text);
}
}
return parts.join(' ');
}
function setTtsButtonsDisabled(disabled) {
if (ttsBtn) {
ttsBtn.disabled = disabled;
}
if (ttsSelectionBtn) {
ttsSelectionBtn.disabled = disabled;
}
if (ttsAnchorBtn) {
ttsAnchorBtn.disabled = disabled || !hasHighlightContent;
}
}
function resetAudioPlayback() {
queuedAudios = [];
streamingFinished = false;
if (currentAudio) {
currentAudio.pause();
currentAudio = null;
}
resetPauseResumeState();
}
function setPauseResumeEnabled(enabled) {
if (ttsToggleBtn) {
ttsToggleBtn.disabled = !enabled;
}
}
function resetPauseResumeState() {
isPaused = false;
if (ttsToggleBtn) {
ttsToggleBtn.textContent = '暂停播放';
}
setPauseResumeEnabled(false);
}
function markStreamingFinished() {
streamingFinished = true;
if (!currentAudio && !queuedAudios.length && !isPaused) {
ttsStatus.textContent = '播放完成';
setPauseResumeEnabled(false);
}
}
function playNextAudioChunk() {
if (!queuedAudios.length) {
currentAudio = null;
if (streamingFinished && !isPaused) {
ttsStatus.textContent = '播放完成';
setPauseResumeEnabled(false);
} else if (!streamingFinished) {
ttsStatus.textContent = '等待更多语音...';
}
return;
}
const chunk = queuedAudios.shift();
ttsStatus.textContent = '播放中...';
currentAudio = new Audio('data:audio/wav;base64,' + chunk);
currentAudio.onended = () => {
if (!isPaused) {
playNextAudioChunk();
}
};
currentAudio.onerror = () => {
ttsStatus.textContent = '播放失败';
currentAudio = null;
setPauseResumeEnabled(false);
};
currentAudio.play().catch(err => {
ttsStatus.textContent = '自动播放被阻止:' + err.message;
currentAudio = null;
queuedAudios.unshift(chunk);
setPauseResumeEnabled(true);
});
}
function enqueueAudioChunk(chunk) {
queuedAudios.push(chunk);
setPauseResumeEnabled(true);
if (!currentAudio) {
playNextAudioChunk();
}
}
function handlePauseResumeToggle() {
if (!ttsToggleBtn) {
return;
}
if (!currentAudio && !queuedAudios.length) {
ttsStatus.textContent = '暂无可暂停的语音';
return;
}
if (!currentAudio) {
playNextAudioChunk();
ttsToggleBtn.textContent = '暂停播放';
return;
}
if (!isPaused) {
currentAudio.pause();
isPaused = true;
ttsToggleBtn.textContent = '继续播放';
ttsStatus.textContent = '已暂停';
} else {
currentAudio.play().then(() => {
isPaused = false;
ttsToggleBtn.textContent = '暂停播放';
ttsStatus.textContent = '播放中...';
}).catch(err => {
ttsStatus.textContent = '无法继续播放:' + err.message;
});
}
}
function normalizeTtsLine(rawLine) {
if (typeof rawLine !== 'string') {
return '';
}
let trimmed = rawLine.replace(/\\r/g, '').trim();
if (!trimmed) {
return '';
}
if (trimmed.startsWith('data:')) {
trimmed = trimmed.slice(5).trim();
}
if (!trimmed || trimmed === '[DONE]') {
return '';
}
return trimmed;
}
function parseTtsLine(line) {
const normalized = normalizeTtsLine(line);
if (!normalized) {
return false;
}
try {
const parsed = JSON.parse(normalized);
if (parsed && parsed.audio) {
enqueueAudioChunk(parsed.audio);
return true;
}
} catch (err) {
console.warn('无法解析TTS响应行', err);
}
return false;
}
async function consumeTtsResponse(response) {
let chunkCount = 0;
const handleLine = rawLine => {
if (parseTtsLine(rawLine)) {
chunkCount += 1;
}
};
if (response.body && response.body.getReader) {
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let newlineIndex;
while ((newlineIndex = buffer.indexOf('\\n')) >= 0) {
const line = buffer.slice(0, newlineIndex);
buffer = buffer.slice(newlineIndex + 1);
handleLine(line);
}
}
buffer += decoder.decode();
if (buffer) {
handleLine(buffer);
}
} else {
const payload = await response.text();
payload.split('\\n').forEach(handleLine);
}
return chunkCount;
}
function getSelectedPageText() {
const selection = window.getSelection ? window.getSelection() : null;
return selection ? selection.toString().trim() : '';
}
async function streamTtsRequest(text) {
const response = await fetch(TTS_ENDPOINT, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ text })
});
if (!response.ok) {
throw new Error('接口响应错误');
}
const chunkCount = await consumeTtsResponse(response);
if (!chunkCount) {
throw new Error('接口未返回音频数据');
}
markStreamingFinished();
}
function createTtsRequest(textResolver, emptyMessage) {
return async () => {
const text = textResolver();
if (!text) {
ttsStatus.textContent = emptyMessage;
return;
}
setTtsButtonsDisabled(true);
ttsStatus.textContent = '请求语音...';
resetAudioPlayback();
try {
await streamTtsRequest(text);
} catch (err) {
ttsStatus.textContent = 'TTS 出错:' + (err && err.message ? err.message : err);
resetAudioPlayback();
} finally {
setTtsButtonsDisabled(false);
}
};
}
if (ttsBtn) {
ttsBtn.addEventListener('click', createTtsRequest(getFullTextForTts, '请先生成高亮结果'));
}
if (ttsSelectionBtn) {
ttsSelectionBtn.addEventListener('click', createTtsRequest(getSelectedPageText, '请先选择要朗读的文本'));
}
if (ttsAnchorBtn) {
ttsAnchorBtn.addEventListener('click', createTtsRequest(extractAnchorText, '请先在结果中点击句子作为朗读起点'));
}
if (ttsToggleBtn) {
ttsToggleBtn.addEventListener('click', handlePauseResumeToggle);
}
</script>
</body>
</html>"""
- PROXY_PAGE_TEMPLATE = Template(
- """<!DOCTYPE html>
- <html lang=\"zh-CN\">
- <head>
- <meta charset=\"UTF-8\" />
- <meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
- <title>Grammar Proxy Highlighter</title>
- <style>
- body { font-family: system-ui, -apple-system, \"Segoe UI\", sans-serif; margin: 0 auto; max-width: 860px; padding: 1.5rem; line-height: 1.65; }
- h1 { font-size: 1.45rem; margin-bottom: 1rem; }
- form { display: flex; flex-wrap: wrap; gap: 0.5rem; margin-bottom: 0.75rem; }
- input[type=\"url\"] { flex: 1 1 260px; padding: 0.65rem; font-size: 1rem; border-radius: 0.5rem; border: 1px solid #d0d7de; }
- button { padding: 0.65rem 1.4rem; border: none; border-radius: 999px; background: #2563eb; color: #fff; font-size: 1rem; cursor: pointer; }
- .show-images-toggle { display: inline-flex; align-items: center; gap: 0.35rem; font-size: 0.9rem; color: #475569; }
- .show-images-toggle input { width: auto; }
- .tts-controls { margin-top: 0.5rem; display: flex; align-items: center; flex-wrap: wrap; gap: 0.75rem; }
- .tts-controls button { background: #f97316; }
- .tts-status { font-size: 0.95rem; color: #475569; }
- .sentence-scope.anchor-highlight { outline: 2px dashed #f97316; outline-offset: 2px; }
- .status { margin-top: 0.25rem; font-size: 0.95rem; }
- .status.err { color: #b00020; }
- .status.ok { color: #059669; }
- section.result { margin-top: 1.4rem; padding-top: 1rem; border-top: 1px solid #e5e7eb; }
- section.result .source { font-size: 0.95rem; margin-bottom: 0.5rem; color: #475569; word-break: break-word; }
- section.result .source a { color: inherit; text-decoration: underline; }
- section.result img { display:block; margin:0.75rem auto; max-width:100%; height:auto; max-width:min(100%,800px); }
- .image-hint { font-size:0.9rem; color:#6b7280; margin:0.5rem 0 0; }
- .clear-floating { position: fixed; left: 0; right: 0; bottom: 0; padding: 0.55rem 1.5rem; border-radius: 0; border-top: 1px solid #e5e7eb; background: rgba(249,250,251,0.96); display: flex; justify-content: center; z-index: 40; }
- .clear-floating button { padding: 0.55rem 1.8rem; border-radius: 999px; background: #6b7280; color: #fff; font-size: 0.95rem; }
- .clear-floating button:hover { filter: brightness(1.05); }
- @media (prefers-reduced-motion: reduce) { .clear-floating { scroll-behavior: auto; } }
- @media (max-width: 640px) { body { padding-bottom: 3.2rem; } }
- </style>
- $style_block
- </head>
- <body>
- <h1>网页代理高亮</h1>
- <form method=\"get\" action=\"/proxy\" class=\"url-form\">
- <input type=\"url\" name=\"url\" value=\"$url_value\" placeholder=\"https://example.com/article\" required />
- <button type=\"submit\">抓取并高亮</button>
- <label class=\"show-images-toggle\">
- <input type=\"checkbox\" name=\"show_images\" value=\"1\" $show_images_checked />
- <span>显示图片(默认关闭以提升速度)</span>
- </label>
- </form>
- $status_block
- <div class=\"tts-controls\">
- <button type=\"button\" id=\"proxy-tts-btn\" disabled>朗读高亮文本</button>
- <button type=\"button\" id=\"proxy-tts-selection\">朗读选中文本</button>
- <button type=\"button\" id=\"proxy-tts-anchor\" disabled>从点击处朗读</button>
- <button type=\"button\" id=\"proxy-tts-toggle\" disabled>暂停播放</button>
- <span class=\"tts-status\" id=\"proxy-tts-status\"></span>
- </div>
- $result_block
- $source_text_script
- <div class=\"clear-floating\">
- <button type=\"button\" id=\"proxy-reset\">清空并重置</button>
- </div>
- <script>
- (function() {
- var resetBtn = document.getElementById('proxy-reset');
- if (resetBtn) {
- resetBtn.addEventListener('click', function() {
- window.location.href = '/proxy';
- });
- }
- var ttsBtn = document.getElementById('proxy-tts-btn');
- var ttsSelectionBtn = document.getElementById('proxy-tts-selection');
- var ttsAnchorBtn = document.getElementById('proxy-tts-anchor');
- var ttsToggleBtn = document.getElementById('proxy-tts-toggle');
- var ttsStatus = document.getElementById('proxy-tts-status');
- var analysisRoot = document.querySelector('section.result .analysis');
- var proxySourceText = window.__proxySourceText || '';
- var TTS_ENDPOINT = 'http://141.140.15.30:8028/generate';
- var currentAudio = null;
- var queuedAudios = [];
- var streamingFinished = false;
- var anchorSentenceIndex = 0;
- var isPaused = false;
- if (analysisRoot && ttsBtn) {
- ttsBtn.disabled = false;
- }
- if (analysisRoot && ttsAnchorBtn) {
- ttsAnchorBtn.disabled = false;
- }
- function extractProxyText() {
- var container = document.querySelector('section.result .analysis');
- return container ? container.textContent.trim() : '';
- }
- function getSentenceNodes() {
- var container = document.querySelector('section.result .analysis');
- return container ? Array.from(container.querySelectorAll('.sentence-scope')) : [];
- }
- function clearAnchorHighlight() {
- var highlighted = document.querySelectorAll('section.result .sentence-scope.anchor-highlight');
- highlighted.forEach(function(el) {
- el.classList.remove('anchor-highlight');
- });
- }
- function resetAnchorState() {
- anchorSentenceIndex = 0;
- clearAnchorHighlight();
- }
- function setAnchorFromSentence(sentenceEl) {
- var sentences = getSentenceNodes();
- var idx = sentences.indexOf(sentenceEl);
- if (idx === -1) return;
- anchorSentenceIndex = idx;
- clearAnchorHighlight();
- sentenceEl.classList.add('anchor-highlight');
- var sid = sentenceEl.getAttribute('data-sid') || (idx + 1);
- ttsStatus.textContent = '已选择第 ' + sid + ' 句作为朗读起点';
- }
- resetAnchorState();
- var resultSection = document.querySelector('section.result');
- if (resultSection) {
- resultSection.addEventListener('click', function(evt) {
- var target = evt.target;
- var isTextNode = typeof Node !== 'undefined' && target && target.nodeType === Node.TEXT_NODE;
- var base = isTextNode ? target.parentElement : target;
- if (!base || typeof base.closest !== 'function') {
- return;
- }
- var sentenceEl = base.closest('.sentence-scope');
- if (sentenceEl) {
- setAnchorFromSentence(sentenceEl);
- }
- });
- }
- function getFullTextForTts() {
- var text = proxySourceText || extractProxyText();
- return text.trim();
- }
- function extractAnchorText() {
- var sentences = getSentenceNodes();
- if (!sentences.length) return '';
- var start = Math.min(anchorSentenceIndex, sentences.length - 1);
- var parts = [];
- for (var i = start; i < sentences.length; i++) {
- var text = sentences[i].textContent.trim();
- if (text) {
- parts.push(text);
- }
- }
- return parts.join(' ');
- }
- function setTtsButtonsDisabled(disabled) {
- if (ttsBtn) {
- ttsBtn.disabled = disabled;
- }
- if (ttsSelectionBtn) {
- ttsSelectionBtn.disabled = disabled;
- }
- if (ttsAnchorBtn) {
- ttsAnchorBtn.disabled = disabled || !analysisRoot;
- }
- }
- function resetAudioPlayback() {
- queuedAudios = [];
- streamingFinished = false;
- if (currentAudio) {
- currentAudio.pause();
- currentAudio = null;
- }
- resetPauseResumeState();
- }
- function setPauseResumeEnabled(enabled) {
- if (ttsToggleBtn) {
- ttsToggleBtn.disabled = !enabled;
- }
- }
- function resetPauseResumeState() {
- isPaused = false;
- if (ttsToggleBtn) {
- ttsToggleBtn.textContent = '暂停播放';
- }
- setPauseResumeEnabled(false);
- }
- function markStreamingFinished() {
- streamingFinished = true;
- if (!currentAudio && !queuedAudios.length && !isPaused) {
- ttsStatus.textContent = '播放完成';
- setPauseResumeEnabled(false);
- }
- }
- function playNextAudioChunk() {
- if (!queuedAudios.length) {
- currentAudio = null;
- if (streamingFinished && !isPaused) {
- ttsStatus.textContent = '播放完成';
- setPauseResumeEnabled(false);
- } else if (!streamingFinished) {
- ttsStatus.textContent = '等待更多语音...';
- }
- return;
- }
- var chunk = queuedAudios.shift();
- ttsStatus.textContent = '播放中...';
- currentAudio = new Audio('data:audio/wav;base64,' + chunk);
- currentAudio.onended = function() {
- if (!isPaused) {
- playNextAudioChunk();
- }
- };
- currentAudio.onerror = function() {
- ttsStatus.textContent = '播放失败';
- currentAudio = null;
- setPauseResumeEnabled(false);
- };
- currentAudio.play().catch(function(err) {
- ttsStatus.textContent = '自动播放被阻止:' + err.message;
- currentAudio = null;
- queuedAudios.unshift(chunk);
- setPauseResumeEnabled(true);
- });
- }
- function enqueueAudioChunk(chunk) {
- queuedAudios.push(chunk);
- setPauseResumeEnabled(true);
- if (!currentAudio) {
- playNextAudioChunk();
- }
- }
- function handlePauseResumeToggle() {
- if (!ttsToggleBtn) {
- return;
- }
- if (!currentAudio && !queuedAudios.length) {
- ttsStatus.textContent = '暂无可暂停的语音';
- return;
- }
- if (!currentAudio) {
- playNextAudioChunk();
- ttsToggleBtn.textContent = '暂停播放';
- return;
- }
- if (!isPaused) {
- currentAudio.pause();
- isPaused = true;
- ttsToggleBtn.textContent = '继续播放';
- ttsStatus.textContent = '已暂停';
- } else {
- currentAudio.play().then(function() {
- isPaused = false;
- ttsToggleBtn.textContent = '暂停播放';
- ttsStatus.textContent = '播放中...';
- }).catch(function(err) {
- ttsStatus.textContent = '无法继续播放:' + err.message;
- });
- }
- }
- function normalizeProxyTtsLine(rawLine) {
- if (typeof rawLine !== 'string') {
- return '';
- }
- var trimmed = rawLine.replace(/\\r/g, '').trim();
- if (!trimmed) {
- return '';
- }
- if (trimmed.indexOf('data:') === 0) {
- trimmed = trimmed.slice(5).trim();
- }
- if (!trimmed || trimmed === '[DONE]') {
- return '';
- }
- return trimmed;
- }
- function parseTtsLine(line) {
- var normalized = normalizeProxyTtsLine(line);
- if (!normalized) {
- return false;
- }
- try {
- var parsed = JSON.parse(normalized);
- if (parsed && parsed.audio) {
- enqueueAudioChunk(parsed.audio);
- return true;
- }
- } catch (err) {
- console.warn('无法解析TTS响应行', err);
- }
- return false;
- }
- async function consumeTtsResponse(response) {
- var chunkCount = 0;
- var handleLine = function(rawLine) {
- if (parseTtsLine(rawLine)) {
- chunkCount += 1;
- }
- };
- if (response.body && response.body.getReader) {
- var reader = response.body.getReader();
- var decoder = new TextDecoder();
- var buffer = '';
- while (true) {
- var readResult = await reader.read();
- if (readResult.done) {
- break;
- }
- buffer += decoder.decode(readResult.value, { stream: true });
- var newlineIndex;
- while ((newlineIndex = buffer.indexOf('\\n')) >= 0) {
- var line = buffer.slice(0, newlineIndex);
- buffer = buffer.slice(newlineIndex + 1);
- handleLine(line);
- }
- }
- buffer += decoder.decode();
- if (buffer) {
- handleLine(buffer);
- }
- } else {
- var payload = await response.text();
- payload.split('\\n').forEach(handleLine);
- }
- return chunkCount;
- }
- function getSelectedPageText() {
- var selection = window.getSelection ? window.getSelection() : null;
- return selection ? selection.toString().trim() : '';
- }
- async function streamTtsRequest(text) {
- var response = await fetch(TTS_ENDPOINT, {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ text: text })
- });
- if (!response.ok) {
- throw new Error('接口响应错误');
- }
- var chunkCount = await consumeTtsResponse(response);
- if (!chunkCount) {
- throw new Error('接口未返回音频数据');
- }
- markStreamingFinished();
- }
- function createTtsRequest(textResolver, emptyMessage) {
- return async function() {
- var text = textResolver();
- if (!text) {
- ttsStatus.textContent = emptyMessage;
- return;
- }
- setTtsButtonsDisabled(true);
- ttsStatus.textContent = '请求语音...';
- resetAudioPlayback();
- try {
- await streamTtsRequest(text);
- } catch (err) {
- ttsStatus.textContent = 'TTS 出错:' + (err && err.message ? err.message : err);
- resetAudioPlayback();
- } finally {
- setTtsButtonsDisabled(false);
- }
- };
- }
- if (ttsBtn) {
- ttsBtn.addEventListener('click', createTtsRequest(getFullTextForTts, '请先抓取文章内容再朗读'));
- }
- if (ttsSelectionBtn) {
- ttsSelectionBtn.addEventListener('click', createTtsRequest(getSelectedPageText, '请先选择要朗读的文本'));
- }
- if (ttsAnchorBtn) {
- ttsAnchorBtn.addEventListener('click', createTtsRequest(extractAnchorText, '请先点击句子作为朗读起点'));
- }
- if (ttsToggleBtn) {
- ttsToggleBtn.addEventListener('click', handlePauseResumeToggle);
- }
- })();
- </script>
- </body>
- </html>"""
- )
# Only http/https targets are accepted by the proxy fetcher.
ALLOWED_URL_SCHEMES = {"http", "https"}
# Upper bound on retained remote HTML (also used as the read cap in the
# urllib fallback path) so huge pages cannot exhaust memory.
MAX_REMOTE_HTML_BYTES = 1_000_000
# Timeout in seconds applied to every remote page download.
REMOTE_FETCH_TIMEOUT = 10.0
REMOTE_FETCH_HEADERS = {
    # Use a browser-like user agent and common headers so that sites which
    # block generic HTTP clients are more likely to return normal content.
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    # Let httpx / the underlying HTTP stack negotiate an encoding it can
    # actually decode. If we unconditionally advertise "br" but the runtime
    # does not have brotli support installed, some sites will respond with
    # brotli-compressed payloads that end up as mojibake or decoding errors.
    #
    # Most modern servers default to gzip or identity when the header is
    # absent, which are both handled fine by httpx.
    # "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    # A few anti-bot setups check these request headers; keeping them close
    # to real desktop Chrome values slightly improves compatibility, even
    # though they are not a guarantee against 403 responses.
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-User": "?1",
    "Sec-Fetch-Dest": "document",
}
SIMPLE_FETCH_HEADERS = {
    # Minimal browser-like headers for the fallback "simple request" path
    # (the urllib-based fetcher used when httpx is rejected by the site).
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    # The fallback performs a single blocking request, so no keep-alive.
    "Connection": "close",
}
- def _inject_proxy_images(html_fragment: str, images: List[Dict[str, str]]) -> str:
- """Replace stable image placeholders with <img> tags in the highlighted HTML."""
- result = html_fragment
- for idx, img in enumerate(images):
- marker = img.get("marker") or f"__GHIMG_{idx}__"
- src = html.escape(img.get("src", "") or "", quote=True)
- if not src:
- continue
- alt = html.escape(img.get("alt", "") or "", quote=True)
- title = html.escape(img.get("title", "") or "", quote=True)
- attrs = [f"src='{src}'"]
- if alt:
- attrs.append(f"alt='{alt}'")
- if title:
- attrs.append(f"title='{title}'")
- # Preserve simple width/height hints when they look safe. Most modern
- # pages rely on CSS for sizing, but explicit attributes can help keep
- # code snippets or diagrams close to their original scale.
- def _safe_dim(value: Optional[str]) -> Optional[str]:
- if not value:
- return None
- value = value.strip()
- if re.fullmatch(r"\d+(?:\.\d+)?(px|%)?", value):
- return value
- return None
- width = _safe_dim(img.get("width"))
- height = _safe_dim(img.get("height"))
- if width:
- attrs.append(f"width='{html.escape(width, quote=True)}'")
- if height:
- attrs.append(f"height='{html.escape(height, quote=True)}'")
- img_tag = "<img " + " ".join(attrs) + " />"
- # Simple textual replacement is sufficient because placeholders
- # are emitted as plain word tokens without HTML meta characters.
- result = result.replace(marker, img_tag)
- return result
- IMG_MARKER_RE = re.compile(r"__GHIMG_\d+__")
- def _strip_proxy_image_markers(html_fragment: str) -> str:
- """Remove residual image placeholders when images are hidden."""
- if IMG_MARKER_RE.search(html_fragment) is None:
- return html_fragment
- return IMG_MARKER_RE.sub("", html_fragment)
- def _inject_proxy_codeblocks(html_fragment: str, code_blocks: List[Dict[str, str]]) -> str:
- """Replace code placeholders with <pre><code> blocks, preserving formatting."""
- result = html_fragment
- for idx, block in enumerate(code_blocks):
- marker = block.get("marker") or f"__GHCODE_{idx}__"
- raw = block.get("text") or ""
- if not raw.strip():
- continue
- # Escape HTML but keep newlines so that <pre> preserves formatting.
- code_html = html.escape(raw, quote=False)
- pre_tag = f"<pre><code>{code_html}</code></pre>"
- result = result.replace(marker, pre_tag)
- return result
class SimpleHTMLStripper(HTMLParser):
    """Structure-aware HTML-to-text extractor.

    Accumulates visible text into paragraph-like blocks while skipping
    navigation / sidebars / ads, and collects inline images and
    <pre>/<code> regions as stable placeholder tokens for later rendering.

    Fixes over the naive implementation:
      * Void elements (<img>, <br>, <input>, ...) never receive an end tag
        in plain HTML, so they must not push onto the skip stack — a bare
        <input> previously left the parser permanently in "skip" mode and
        dropped all following text.
      * ``_article_depth`` is now tracked symmetrically via a per-container
        stack: a plain closing </div>/<section> can no longer desynchronize
        the main-content depth counter.
    """

    def __init__(self):
        super().__init__()
        # Paragraph blocks collected so far (text + layout metadata).
        self._blocks: List[Dict[str, Any]] = []
        self._current_parts: List[str] = []
        # Depth of "main content" containers (<article>/<main>/content divs).
        self._article_depth = 0
        # Mirrors open container tags; True entries incremented
        # _article_depth and must decrement it again when closed.
        self._container_stack: List[bool] = []
        # <pre>/<code> state so code keeps indentation and line breaks
        # instead of having its whitespace collapsed like normal text.
        self._in_pre = False
        self._in_code = False
        self._current_code_chunks: List[str] = []
        self._code_blocks: List[Dict[str, str]] = []
        # Stack of flags indicating which open tags should be skipped.
        # When any active flag is True, textual data is ignored.
        self._skip_stack: List[bool] = []
        self._skip_depth = 0
        self._title_chunks: List[str] = []
        self._in_title = False
        self._h1_chunks: List[str] = []
        self._h1_main_chunks: List[str] = []
        self._in_h1 = False
        # Collected inline images from the main content, in document order.
        self._images: List[Dict[str, str]] = []
        # Active list containers (<ul>/<ol>) and current <li> nesting state.
        self._list_stack: List[Dict[str, Any]] = []
        self._list_item_stack: List[Dict[str, Any]] = []

    # Keywords commonly used in class/id attributes for non-article areas.
    _NOISE_KEYWORDS = {
        "sidebar",
        "side-bar",
        "aside",
        "nav",
        "menu",
        "breadcrumb",
        "breadcrumbs",
        "pagination",
        "pager",
        "comment",
        "comments",
        "reply",
        "advert",
        "ad-",
        "ads",
        "sponsor",
        "promo",
        "promotion",
        "related",
        "recommend",
        "share",
        "social",
        "subscribe",
        "signup",
        "login",
        "popup",
        "modal",
        "banner",
        "cookie",
        "notification",
        "toolbar",
        "footer",
        "header-bar",
    }
    # Tags whose textual content is almost never part of the main article.
    _ALWAYS_SKIP_TAGS = {
        "script",
        "style",
        "noscript",
        "nav",
        "aside",
        "footer",
        "form",
        "svg",
        "iframe",
        "button",
        "input",
        "textarea",
        "select",
        "option",
        "label",
    }
    # HTML void elements: they have no closing tag, so handle_starttag must
    # not push per-tag state for them and handle_endtag must ignore strays.
    _VOID_TAGS = {
        "area",
        "base",
        "br",
        "col",
        "embed",
        "hr",
        "img",
        "input",
        "link",
        "meta",
        "param",
        "source",
        "track",
        "wbr",
    }
    # Structural container tags where noise classes/roles are meaningful.
    # For purely inline tags we avoid applying aggressive noise heuristics
    # so that important inline text (e.g. spans in the first sentence) is
    # not accidentally dropped.
    _STRUCTURAL_NOISE_TAGS = {
        "div",
        "section",
        "aside",
        "nav",
        "header",
        "footer",
        "main",
        "article",
        "ul",
        "ol",
        "li",
    }
    # Block-level tags that naturally mark paragraph boundaries.
    _BLOCK_TAGS = {
        "p",
        "li",
        "blockquote",
        "h1",
        "h2",
        "h3",
        "h4",
        "h5",
        "h6",
        "pre",
        "table",
        "tr",
    }
    # Keywords for containers that are likely to hold the main article body.
    # Used to decide which regions count as "main content" for both text
    # and inline images.
    _CONTENT_KEYWORDS = {
        "content",
        "main-content",
        "article-body",
        "post-body",
        "post-content",
        "entry-content",
        "story-body",
        "blog-post",
        "markdown-body",
        "readable-content",
    }
    # Keywords on image-related class/id/src that usually indicate avatars,
    # logo icons, decorative banners, etc., which we want to drop from the
    # extracted main content.
    _IMAGE_NOISE_KEYWORDS = {
        "avatar",
        "author",
        "logo",
        "icon",
        "favicon",
        "badge",
        "banner",
        "thumb",
        "thumbnail",
        "profile",
        "cover",
        "background",
        "sprite",
        "emoji",
        "reaction",
    }
    # Paragraph-level noise vocabulary (English + Simplified/Traditional
    # Chinese) used to drop navigation crumbs and boilerplate lines.
    _TEXT_NOISE_KEYWORDS = {
        "menu",
        "menus",
        "navigation",
        "nav",
        "目录",
        "目錄",
        "导航",
        "導航",
        "菜单",
        "菜單",
        "广告",
        "廣告",
        "ad",
        "ads",
        "sponsor",
        "sponsored",
        "上一篇",
        "下一篇",
        "返回顶部",
        "返回頂部",
        "分享",
        "分享至",
        "相关推荐",
        "相关阅读",
        "相關閱讀",
        "recommended",
        "related posts",
        "login",
        "signup",
    }
    _TEXT_NOISE_PREFIXES = (
        "目录",
        "目錄",
        "导航",
        "導航",
        "菜单",
        "菜單",
        "广告",
        "廣告",
        "上一篇",
        "下一篇",
        "上一页",
        "下一页",
        "返回目录",
        "返回目錄",
        "返回顶部",
        "返回頂部",
        "分享",
        "相关",
        "相關",
        "recommended",
        "login",
        "signup",
    )

    def _finish_paragraph(self) -> None:
        """Flush current buffered tokens into a paragraph block."""
        if not self._current_parts:
            return
        # For regular paragraphs we still collapse excessive internal
        # whitespace, but we keep logical breaks between paragraphs
        # themselves so that the downstream highlighter can reconstruct
        # paragraph structure.
        text = " ".join(self._current_parts)
        text = re.sub(r"\s+", " ", text).strip()
        self._current_parts = []
        if not text:
            return
        if self._looks_like_noise_paragraph(text):
            return
        block_kind = "paragraph"
        list_kind: Optional[str] = None
        list_depth = 0
        list_index: Optional[int] = None
        if self._list_item_stack:
            list_ctx = self._list_item_stack[-1]
            block_kind = "list-item"
            list_kind = list_ctx.get("list_type") or "ul"
            depth_value = list_ctx.get("depth", 1)
            try:
                depth_int = int(depth_value)
            except (TypeError, ValueError):
                depth_int = 1
            # Clamp nesting depth to a small range for the renderer.
            list_depth = min(max(depth_int, 1), 5)
            if list_kind == "ol":
                idx = list_ctx.get("index")
                if isinstance(idx, int):
                    list_index = idx
        self._blocks.append(
            {
                "text": text,
                "is_main": self._article_depth > 0,
                "kind": block_kind,
                "list_kind": list_kind,
                "list_depth": list_depth,
                "list_index": list_index,
            }
        )

    def _looks_like_noise_paragraph(self, text: str) -> bool:
        """Heuristically detect short navigation/ad/boilerplate lines."""
        normalized = text.strip()
        if not normalized:
            return True
        lowered = normalized.lower()
        compact = re.sub(r"\s+", "", lowered)
        for prefix in self._TEXT_NOISE_PREFIXES:
            if lowered.startswith(prefix.lower()):
                if len(normalized) <= 80:
                    return True
        if len(normalized) <= 80:
            for keyword in self._TEXT_NOISE_KEYWORDS:
                if keyword in lowered or keyword in compact:
                    return True
        # Skip very short bullet-like crumbs that mostly consist of symbols.
        if len(normalized) <= 6 and sum(ch.isalnum() for ch in normalized) <= 1:
            return True
        return False

    @staticmethod
    def _parse_ordered_start(raw_value: Optional[str]) -> int:
        """Parse an <ol start="..."> attribute, defaulting to 1."""
        if raw_value is None:
            return 1
        value = raw_value.strip()
        if not value:
            return 1
        try:
            parsed = int(value)
            return parsed if parsed >= 1 else 1
        except ValueError:
            return 1

    def _maybe_capture_image(self, attr_dict: Dict[str, str], classes_ids: str, role: str) -> None:
        """Record an inline <img> from the main content as a placeholder token.

        Only keeps images inside the main article area that do not look like
        avatars / logos / decorative icons; a stable ``__GHIMG_n__`` marker is
        inserted into the text stream so the /proxy renderer can later swap
        it back into a real <img> tag.
        """
        if self._skip_depth > 0 or self._article_depth <= 0:
            return
        # Images carrying layout roles are chrome, not content.
        if role in {"navigation", "banner", "contentinfo", "complementary"}:
            return
        src = attr_dict.get("src", "").strip()
        if not src:
            return
        img_classes_ids = classes_ids + " " + src.lower()
        if any(key in img_classes_ids for key in self._IMAGE_NOISE_KEYWORDS):
            return
        marker = f"__GHIMG_{len(self._images)}__"
        img_info: Dict[str, str] = {
            "marker": marker,
            "src": src,
            "alt": attr_dict.get("alt", "") or "",
            "title": attr_dict.get("title", "") or "",
        }
        width = (attr_dict.get("width") or "").strip()
        height = (attr_dict.get("height") or "").strip()
        if width:
            img_info["width"] = width
        if height:
            img_info["height"] = height
        self._images.append(img_info)
        # Treat the image as an inline token within the current paragraph;
        # paragraph finishing keeps it grouped with surrounding text.
        self._current_parts.append(marker)

    def handle_starttag(self, tag, attrs):
        """Update skip/content/list state for an opening tag."""
        lowered = tag.lower()
        # Paragraph boundary before starting a new block element or <br>.
        if lowered in self._BLOCK_TAGS or lowered == "br":
            if self._skip_depth == 0:
                self._finish_paragraph()
        # Entering a <pre> region – treat it as a dedicated code block.
        if lowered == "pre" and self._skip_depth == 0:
            self._finish_paragraph()
            self._in_pre = True
            self._current_code_chunks = []
        attr_dict = {k.lower(): (v or "") for k, v in attrs}
        role = attr_dict.get("role", "").lower()
        classes_ids = (attr_dict.get("class", "") + " " + attr_dict.get("id", "")).lower()
        # Void elements get no matching end tag, so they must not touch the
        # skip/container stacks (a bare <input> previously left _skip_depth
        # permanently elevated, dropping all subsequent text).
        if lowered == "img":
            self._maybe_capture_image(attr_dict, classes_ids, role)
            return
        if lowered in self._VOID_TAGS:
            return
        # Decide whether this element should be skipped entirely. Only treat
        # class/id keywords as layout "noise" on structural containers
        # (div/section/nav/etc). Inline tags with "comment" in their class
        # (like mdspan-comment on Towards Data Science) should not be
        # discarded, otherwise we lose the first words of sentences.
        is_noise_attr = False
        if lowered in self._STRUCTURAL_NOISE_TAGS:
            is_noise_attr = any(key in classes_ids for key in self._NOISE_KEYWORDS)
        if role in {"navigation", "banner", "contentinfo", "complementary"}:
            is_noise_attr = True
        skip_this = lowered in self._ALWAYS_SKIP_TAGS or is_noise_attr
        if skip_this:
            self._skip_depth += 1
        self._skip_stack.append(skip_this)
        # Track "main content" containers symmetrically: remember for every
        # container tag whether it incremented _article_depth so that its
        # end tag can undo exactly that increment.
        if lowered in {"article", "main", "section", "div"}:
            entered_article = False
            if self._skip_depth == 0:
                # Semantic containers and common "main content" classes count
                # as the article area so we keep their text and inline media
                # while still avoiding sidebars / nav.
                if lowered in {"article", "main"} or any(
                    key in classes_ids for key in self._CONTENT_KEYWORDS
                ) or role == "main":
                    self._article_depth += 1
                    entered_article = True
            self._container_stack.append(entered_article)
        if self._skip_depth == 0 and lowered in {"ul", "ol"}:
            start = 1
            if lowered == "ol":
                start = self._parse_ordered_start(attr_dict.get("start"))
            self._list_stack.append(
                {
                    "type": lowered,
                    "start": start,
                    "next_index": start,
                }
            )
        if lowered == "li" and self._skip_depth == 0:
            list_ctx = self._list_stack[-1] if self._list_stack else None
            depth = len(self._list_stack) if self._list_stack else 1
            list_type = list_ctx.get("type") if list_ctx else "ul"
            index = None
            if list_ctx and list_ctx["type"] == "ol":
                index = list_ctx["next_index"]
                list_ctx["next_index"] = index + 1
            # An explicit <li value="..."> overrides the running counter.
            li_value = attr_dict.get("value")
            if li_value and list_ctx and list_ctx["type"] == "ol":
                try:
                    value_idx = int(li_value)
                    index = value_idx
                    list_ctx["next_index"] = value_idx + 1
                except ValueError:
                    pass
            self._list_item_stack.append(
                {
                    "list_type": list_type,
                    "index": index,
                    "depth": depth,
                }
            )
        if lowered == "title" and self._skip_depth == 0:
            self._in_title = True
        if lowered == "h1" and self._skip_depth == 0:
            self._in_h1 = True
        if lowered == "code" and self._skip_depth == 0 and self._in_pre:
            # Nested <code> inside <pre> – keep track but we don't need
            # separate buffering beyond the enclosing pre block.
            self._in_code = True

    def handle_endtag(self, tag):
        """Close structures opened in handle_starttag, keeping stacks in sync."""
        lowered = tag.lower()
        if lowered in self._VOID_TAGS:
            # Void elements pushed nothing in handle_starttag; a stray
            # </img> or XHTML-style <img/> must not pop unrelated state.
            return
        if lowered == "code" and self._in_code:
            self._in_code = False
        if lowered == "pre" and self._in_pre:
            self._in_pre = False
            # Finalize the current code block into a single placeholder
            # token so that it passes through the grammar highlighter
            # untouched, and can later be restored as a <pre><code> block.
            code_text = "".join(self._current_code_chunks)
            self._current_code_chunks = []
            if code_text.strip() and self._skip_depth == 0:
                marker = f"__GHCODE_{len(self._code_blocks)}__"
                self._code_blocks.append({"marker": marker, "text": code_text})
                # Appending the marker to the paragraph parts makes
                # get_text() emit it in the right position.
                self._current_parts.append(marker)
        # Closing a block element ends the current paragraph.
        if lowered in self._BLOCK_TAGS and self._skip_depth == 0:
            self._finish_paragraph()
        if lowered == "li" and self._skip_depth == 0 and self._list_item_stack:
            self._list_item_stack.pop()
        if lowered in {"ul", "ol"} and self._skip_depth == 0 and self._list_stack:
            self._list_stack.pop()
        if lowered == "title":
            self._in_title = False
        if lowered == "h1":
            self._in_h1 = False
        # Undo the article-depth increment recorded for this container (if
        # any); plain </div>/<section> closes no longer desync the counter.
        if lowered in {"article", "main", "section", "div"} and self._container_stack:
            if self._container_stack.pop() and self._article_depth > 0:
                self._article_depth -= 1
        if self._skip_stack:
            skip_this = self._skip_stack.pop()
            if skip_this and self._skip_depth > 0:
                self._skip_depth -= 1

    def handle_data(self, data):
        """Route character data into code, title, or paragraph buffers."""
        if self._skip_depth > 0:
            return
        if self._in_pre or self._in_code:
            # Preserve code blocks exactly as they appear, including
            # newlines and indentation.
            self._current_code_chunks.append(data)
            return
        stripped = data.strip()
        if not stripped:
            return
        if self._in_title:
            self._title_chunks.append(stripped)
            return
        # Regular visible text.
        self._current_parts.append(stripped)
        if self._in_h1:
            self._h1_chunks.append(stripped)
            if self._article_depth > 0:
                self._h1_main_chunks.append(stripped)

    def get_text(self) -> str:
        """Return extracted main-content text, one paragraph per line."""
        # Flush any trailing paragraph.
        self._finish_paragraph()
        blocks = self._selected_blocks()
        if not blocks:
            return ""
        # Keep natural paragraphs contiguous with a single newline instead of
        # injecting blank lines that did not exist in the source.
        return "\n".join(block["text"] for block in blocks)

    def _selected_blocks(self) -> List[Dict[str, Any]]:
        """Prefer blocks from the main-content area; fall back to everything."""
        if not self._blocks:
            return []
        main_blocks = [block for block in self._blocks if block.get("is_main")]
        return main_blocks if main_blocks else self._blocks

    def get_blocks(self) -> List[Dict[str, Any]]:
        """Return shallow copies of the selected paragraph blocks."""
        blocks = self._selected_blocks()
        return [dict(block) for block in blocks]

    def get_title(self) -> str:
        """Return the page title.

        Prefers <h1> headings (especially those inside <article>/<main>);
        falls back to the document <title>.
        """
        if self._h1_main_chunks:
            raw = " ".join(self._h1_main_chunks)
        elif self._h1_chunks:
            raw = " ".join(self._h1_chunks)
        elif self._title_chunks:
            raw = " ".join(self._title_chunks)
        else:
            return ""
        return re.sub(r"\s+", " ", raw).strip()

    def get_images(self) -> List[Dict[str, str]]:
        """Return the list of captured inline images in document order."""
        return list(self._images)

    def get_code_blocks(self) -> List[Dict[str, str]]:
        """Return captured code blocks (from <pre>/<code>) in document order."""
        return list(self._code_blocks)
def _normalize_target_url(raw_url: str) -> str:
    """Validate a user-supplied URL and return it normalized (fragment removed).

    Raises ValueError (with a user-facing Chinese message) when the input is
    empty, uses a non-http(s) scheme, or lacks a host.
    """
    candidate = (raw_url or "").strip()
    if not candidate:
        raise ValueError("请输入要抓取的 URL。")
    # Default to https:// when the user omitted the scheme entirely.
    if "://" not in candidate:
        candidate = f"https://{candidate}"
    parsed = urlparse(candidate)
    if parsed.scheme not in ALLOWED_URL_SCHEMES:
        raise ValueError("仅支持 http/https 协议链接。")
    if not parsed.netloc:
        raise ValueError("URL 缺少域名部分。")
    return urlunparse(parsed._replace(fragment=""))
- def _fallback_html_to_text(html_body: str) -> str:
- """Very simple HTML-to-text fallback used when structured extraction fails.
- This does not attempt to distinguish main content from navigation, but it
- guarantees we return *something* for pages whose structure confuses the
- SimpleHTMLStripper heuristics (e.g. some mirror sites).
- """
- # Drop script/style/noscript content outright.
- cleaned = re.sub(
- r"(?is)<(script|style|noscript)[^>]*>.*?</\1>",
- " ",
- html_body,
- )
- # Convert common block separators into newlines.
- cleaned = re.sub(r"(?i)<br\s*/?>", "\n", cleaned)
- cleaned = re.sub(r"(?i)</p\s*>", "\n\n", cleaned)
- cleaned = re.sub(r"(?i)</(div|section|article|li|h[1-6])\s*>", "\n\n", cleaned)
- # Remove all remaining tags.
- cleaned = re.sub(r"(?is)<[^>]+>", " ", cleaned)
- cleaned = html.unescape(cleaned)
- # Normalize whitespace but keep paragraph-level blank lines.
- cleaned = cleaned.replace("\r", "")
- # Collapse runs of spaces/tabs inside lines.
- cleaned = re.sub(r"[ \t\f\v]+", " ", cleaned)
- # Collapse 3+ blank lines into just 2.
- cleaned = re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned)
- cleaned = cleaned.strip()
- return cleaned
- def _build_paragraph_metadata(blocks: List[Dict[str, Any]]) -> List[Dict[str, str]]:
- """Convert stripped block info into span attributes for downstream rendering."""
- if not blocks:
- return []
- paragraph_meta: List[Dict[str, str]] = []
- for block in blocks:
- attrs: Dict[str, str] = {}
- if block.get("kind") == "list-item" and block.get("list_kind"):
- attrs["data-list-kind"] = str(block["list_kind"])
- depth = block.get("list_depth")
- if depth:
- attrs["data-list-depth"] = str(depth)
- if block.get("list_kind") == "ol" and block.get("list_index") is not None:
- attrs["data-list-index"] = str(block["list_index"])
- paragraph_meta.append(attrs)
- return paragraph_meta
- def _build_paragraph_ranges(blocks: List[Dict[str, Any]]) -> List[Tuple[int, int]]:
- """Map each stripped block to its char span within the joined plain text."""
- if not blocks:
- return []
- ranges: List[Tuple[int, int]] = []
- cursor = 0
- for idx, block in enumerate(blocks):
- text = block.get("text") or ""
- start = cursor
- end = start + len(text)
- ranges.append((start, end))
- cursor = end
- # Plain text joins blocks with a single newline; skip trailing newline.
- if idx < len(blocks) - 1:
- cursor += 1
- return ranges
def _decode_html_bytes(raw_content: bytes, encoding_hint: Optional[str]) -> str:
    """Decode fetched bytes into text, trying the hint first, then fallbacks.

    Uses errors="replace" so undecodable byte sequences never raise; the
    candidate loop only advances on unknown codec names. The decoded text is
    capped at MAX_REMOTE_HTML_BYTES characters.
    """
    candidates: List[str] = ([encoding_hint] if encoding_hint else []) + ["utf-8", "latin-1"]
    failure: Optional[Exception] = None
    decoded: Optional[str] = None
    for name in candidates:
        try:
            decoded = raw_content.decode(name, errors="replace")
        except Exception as exc:  # pragma: no cover - defensive
            failure = exc
            continue
        break
    if decoded is None:  # pragma: no cover - extremely unlikely
        raise RuntimeError(f"无法解码远程页面内容: {failure}")
    if len(decoded) > MAX_REMOTE_HTML_BYTES:
        decoded = decoded[:MAX_REMOTE_HTML_BYTES]
    return decoded
async def _download_html_via_httpx(url: str) -> str:
    """Fetch *url* with httpx (redirects followed) and return decoded HTML.

    Raises:
        httpx.HTTPStatusError: for non-success status codes (handled by the
            caller's fallback logic for 401/403/407/429/451).
        httpx.HTTPError: for transport-level failures.
    """
    async with httpx.AsyncClient(timeout=REMOTE_FETCH_TIMEOUT, follow_redirects=True) as client:
        response = await client.get(url, headers=REMOTE_FETCH_HEADERS)
        # Fail fast on HTTP errors before spending time decoding the body;
        # the original decoded error pages only to throw the result away.
        response.raise_for_status()
        return _decode_html_bytes(response.content, response.encoding)
async def _download_html_via_stdlib(url: str) -> str:
    """Fallback download path using urllib executed in a worker thread.

    Reads at most MAX_REMOTE_HTML_BYTES + 1 bytes and extracts a charset
    hint from the response headers when available.
    """

    def _blocking_fetch() -> Tuple[bytes, Optional[str]]:
        request = urllib_request.Request(url, headers=SIMPLE_FETCH_HEADERS)
        # Empty ProxyHandler: deliberately ignore environment proxies here.
        opener = urllib_request.build_opener(urllib_request.ProxyHandler({}))
        with opener.open(request, timeout=REMOTE_FETCH_TIMEOUT) as resp:
            body = resp.read(MAX_REMOTE_HTML_BYTES + 1)
            headers = getattr(resp, "headers", None)
            charset: Optional[str] = None
            if headers is not None:
                get_charset = getattr(headers, "get_content_charset", None)
                if callable(get_charset):
                    charset = get_charset()
                if not charset:
                    # Fall back to parsing the Content-Type header manually.
                    content_type = headers.get("Content-Type", "")
                    found = re.search(r"charset=([\w-]+)", content_type or "", re.IGNORECASE)
                    if found:
                        charset = found.group(1)
            return body, charset

    payload, charset_hint = await asyncio.to_thread(_blocking_fetch)
    return _decode_html_bytes(payload, charset_hint)
async def _download_html_with_fallback(url: str) -> str:
    """Try httpx first; on auth/anti-bot style failures retry once via urllib.

    Statuses 401/403/407/429/451 (and any transport error) trigger the
    stdlib fallback; other HTTP status errors propagate immediately. When
    both paths fail, the original httpx error is raised, chained to the
    fallback error.
    """
    retryable_statuses = {401, 403, 407, 429, 451}
    primary_error: Optional[Exception] = None
    try:
        return await _download_html_via_httpx(url)
    except httpx.HTTPStatusError as exc:
        code = exc.response.status_code if exc.response is not None else None
        if code not in retryable_statuses:
            raise
        primary_error = exc
    except httpx.HTTPError as exc:
        primary_error = exc
    try:
        return await _download_html_via_stdlib(url)
    except (urllib_error.URLError, urllib_error.HTTPError, TimeoutError) as fallback_exc:
        if primary_error:
            raise primary_error from fallback_exc
        raise
async def _fetch_remote_plaintext(
    url: str,
) -> Tuple[
    str,
    str,
    str,
    List[Dict[str, str]],
    List[Dict[str, str]],
    List[Dict[str, str]],
    List[Tuple[int, int]],
]:
    """Download *url* and extract title, plain text, media markers and layout.

    Returns (normalized_url, title, plain_text, images, code_blocks,
    paragraph_meta, paragraph_ranges). When the structured stripper yields
    nothing, a crude regex-based fallback supplies the text and all marker
    and layout metadata is cleared (markers would be dangling).
    """
    normalized_url = _normalize_target_url(url)
    html_body = await _download_html_with_fallback(normalized_url)
    parser = SimpleHTMLStripper()
    parser.feed(html_body)
    page_title = parser.get_title() or normalized_url
    images = parser.get_images()
    code_blocks = parser.get_code_blocks()
    plain_text = parser.get_text()
    block_info = parser.get_blocks()
    paragraph_ranges = _build_paragraph_ranges(block_info)
    if not plain_text:
        plain_text = _fallback_html_to_text(html_body)
        if not plain_text:
            raise ValueError("未能从该页面提取正文。")
        # Fallback text no longer contains structured placeholders, so any
        # collected media/code markers would be invalid.
        images = []
        code_blocks = []
        block_info = []
        paragraph_ranges = []
    paragraph_meta = _build_paragraph_metadata(block_info)
    return normalized_url, page_title, plain_text, images, code_blocks, paragraph_meta, paragraph_ranges
def _render_proxy_page(
    *,
    url_value: str = "",
    message: Optional[str] = None,
    is_error: bool = False,
    highlight_fragment: Optional[str] = None,
    helper_enabled: bool = False,
    source_url: Optional[str] = None,
    source_title: Optional[str] = None,
    show_images: bool = False,
    image_notice: Optional[str] = None,
    source_plaintext: Optional[str] = None,
) -> str:
    """Render the /proxy page from its template and optional analysis result.

    All user-controlled values are HTML-escaped before substitution; the
    plain-text payload is embedded as a JSON string for the TTS script.
    """
    helper_state = "on" if helper_enabled else "off"
    if message:
        status_css = "status err" if is_error else "status ok"
        status_block = f"<p class='{status_css}'>{html.escape(message)}</p>"
    else:
        status_block = ""
    result_block = ""
    source_script = ""
    if highlight_fragment and source_url:
        safe_url = html.escape(source_url, quote=True)
        safe_title = html.escape(source_title or source_url)
        if image_notice:
            image_hint = f"<p class='image-hint'>{html.escape(image_notice)}</p>"
        else:
            image_hint = ""
        if source_plaintext:
            source_script = f"<script>window.__proxySourceText = {json.dumps(source_plaintext)}</script>"
        result_block = (
            "<section class='result'>"
            f"<div class='source'>原页面:<a href='{safe_url}' target='_blank' rel='noopener'>{safe_title}</a></div>"
            f"<div class='analysis' data-helper='{helper_state}'>{highlight_fragment}</div>"
            f"{image_hint}"
            "</section>"
        )
    return PROXY_PAGE_TEMPLATE.substitute(
        # The stylesheet only matters when a highlighted result is shown.
        style_block=STYLE_BLOCK if highlight_fragment else "",
        url_value=html.escape(url_value or "", quote=True),
        status_block=status_block,
        result_block=result_block,
        show_images_checked="checked" if show_images else "",
        source_text_script=source_script,
    )
|