- # -*- coding: utf-8 -*-
- """Grammar highlighter powered by spaCy + benepar constituency parsing."""
- import asyncio
- import html
- import re
- from collections import Counter
- from dataclasses import dataclass, field
- from html.parser import HTMLParser
- from string import Template
- from typing import Any, Dict, List, Optional, Tuple
- from urllib import error as urllib_error, request as urllib_request
- from urllib.parse import urlparse, urlunparse
- import benepar
- import httpx
- import spacy
- from fastapi import FastAPI, HTTPException
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import HTMLResponse
- from pydantic import BaseModel, Field
- from spacy.cli import download as spacy_download
- from spacy.language import Language
- from spacy.tokens import Span as SpacySpan, Token as SpacyToken
- from style_config import SENTENCE_HELPER_ENABLED, STYLE_BLOCK
- BENE_PAR_WARNING: Optional[str] = None
- HAS_BENEPAR: bool = False  # tracks whether benepar was successfully attached
- def _ensure_benepar_warning(message: str) -> None:
- """Record a warning once when benepar annotations are unavailable."""
- global BENE_PAR_WARNING
- if not BENE_PAR_WARNING:
- BENE_PAR_WARNING = message
- def _load_spacy_pipeline(
- model_name: str = "en_core_web_sm", benepar_model: str = "benepar_en3"
- ) -> Language:
- global BENE_PAR_WARNING, HAS_BENEPAR
- BENE_PAR_WARNING = None
- HAS_BENEPAR = False
- try:
- nlp = spacy.load(model_name)
- except OSError:
- try:
- spacy_download(model_name)
- nlp = spacy.load(model_name)
- except Exception as exc: # pragma: no cover - install helper
- raise RuntimeError(
- f"spaCy model '{model_name}' is required. Install via `python -m spacy download {model_name}`."
- ) from exc
- # Ensure we have sentence segmentation available
- pipe_names = set(nlp.pipe_names)
- if not ({"parser", "senter", "sentencizer"} & pipe_names):
- try:
- nlp.add_pipe("sentencizer")
- except Exception:
- pass # if already present or unavailable, ignore
- # Try to add benepar
- if "benepar" not in nlp.pipe_names:
- try:
- nlp.add_pipe("benepar", config={"model": benepar_model}, last=True)
- HAS_BENEPAR = True
- except ValueError:
- try:
- benepar.download(benepar_model)
- nlp.add_pipe("benepar", config={"model": benepar_model}, last=True)
- HAS_BENEPAR = True
- except Exception as exc: # pragma: no cover - install helper
- HAS_BENEPAR = False
- BENE_PAR_WARNING = (
- "Benepar model '{model}' unavailable ({err}). Falling back to dependency-based spans."
- ).format(model=benepar_model, err=exc)
- except Exception as exc:
- HAS_BENEPAR = False
- BENE_PAR_WARNING = (
- "Failed to attach benepar parser to spaCy pipeline. Falling back to dependency-based spans ({err})."
- ).format(err=exc)
- else:
- HAS_BENEPAR = True
- return nlp
- try:
- NLP: Optional[Language] = _load_spacy_pipeline()
- NLP_LOAD_ERROR: Optional[Exception] = None
- except Exception as exc: # pragma: no cover - import-time diagnostics
- NLP = None
- NLP_LOAD_ERROR = exc
- class AnalyzeRequest(BaseModel):
- text: str = Field(..., description="Raw English text to highlight")
- class AnalyzeResponse(BaseModel):
- highlighted_html: str
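- # Illustrative request/response shape for the /analyze endpoint (values are
- # hypothetical; the actual markup depends on STYLE_BLOCK and the parse):
- #   POST /analyze  {"text": "The cat sat."}
- #   -> {"highlighted_html": "<style>...</style><div class='analysis' data-helper='on'>...</div>"}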
- @dataclass
- class Token:
- text: str
- start: int
- end: int
- kind: str # 'word' | 'space' | 'punct'
- @dataclass
- class Span:
- start_token: int
- end_token: int
- cls: str
- attrs: Optional[Dict[str, str]] = None
- @dataclass
- class SentenceSummary:
- subjects: List[str] = field(default_factory=list)
- predicates: List[str] = field(default_factory=list)
- objects: List[str] = field(default_factory=list)
- complements: List[str] = field(default_factory=list)
- clauses: List[str] = field(default_factory=list)
- clause_functions: List[str] = field(default_factory=list)
- connectors: List[str] = field(default_factory=list)
- residual_roles: List[str] = field(default_factory=list)
- sentence_length: int = 0
- TOKEN_REGEX = re.compile(
- r"""
- (?:\s+)
- |(?:\d+(?:[\.,]\d+)*)
- |(?:\w+(?:[-']\w+)*)
- |(?:.)
- """,
- re.VERBOSE | re.UNICODE,
- )
- WORD_LIKE_RE = re.compile(r"\w+(?:[-']\w+)*\Z", re.UNICODE)
- NUMBER_RE = re.compile(r"\d+(?:[\.,]\d+)*\Z", re.UNICODE)
- PARAGRAPH_BREAK_RE = re.compile(r"(?:\r?\n[ \t]*){2,}")
- SUBJECT_DEPS = {"nsubj", "nsubjpass", "csubj", "csubjpass"}
- DIRECT_OBJECT_DEPS = {"dobj", "obj"}
- INDIRECT_OBJECT_DEPS = {"iobj", "dative"}
- COMPLEMENT_DEPS = {"attr", "oprd", "acomp", "ccomp", "xcomp"}
- ADVERBIAL_DEPS = {"advmod", "npadvmod", "advcl", "obl", "prep", "pcomp"}
- RELATIVE_PRONOUNS = {"which", "that", "who", "whom", "whose", "where", "when"}
- SUBORDINATORS_TO_FUNCTION = {
- "when": "TIME",
- "while": "TIME",
- "after": "TIME",
- "before": "TIME",
- "until": "TIME",
- "as": "TIME",
- "once": "TIME",
- "since": "TIME",
- "because": "REASON",
- "now that": "REASON",
- "if": "CONDITION",
- "unless": "CONDITION",
- "provided": "CONDITION",
- "provided that": "CONDITION",
- "although": "CONCESSION",
- "though": "CONCESSION",
- "even though": "CONCESSION",
- "whereas": "CONCESSION",
- "so that": "RESULT",
- "so": "RESULT",
- "lest": "PURPOSE",
- "in order that": "PURPOSE",
- }
- FINITE_VERB_TAGS = {"VBD", "VBP", "VBZ"}
- NONFINITE_VERB_TAGS = {"VBG", "VBN"}
- FIXED_MULTIWORD_PHRASES: Tuple[Tuple[re.Pattern, str], ...] = tuple(
- (
- re.compile(pattern, re.IGNORECASE),
- label,
- )
- for pattern, label in [
- (r"\bas well as\b", "as well as"),
- (r"\brather than\b", "rather than"),
- (r"\bin addition to\b", "in addition to"),
- (r"\bin spite of\b", "in spite of"),
- (r"\baccording to\b", "according to"),
- (r"\bas soon as\b", "as soon as"),
- ]
- )
- CLAUSE_FUNCTION_LABELS = {
- "TIME": "时间",
- "REASON": "原因",
- "CONDITION": "条件",
- "CONCESSION": "让步",
- "RESULT": "结果",
- "PURPOSE": "目的",
- }
- RESIDUAL_DEP_LABELS = {
- "det": "限定词",
- "prep": "介词",
- "case": "介词标记",
- "cc": "并列连词",
- "mark": "从属连词",
- "poss": "所有格标记",
- "nummod": "数量修饰语",
- "aux": "助动词",
- "prt": "小品词",
- }
- RESIDUAL_POS_LABELS = {
- "ADJ": "形容词修饰语",
- "ADV": "副词",
- "NUM": "数词",
- "PRON": "代词",
- }
- def _classify_segment(seg: str) -> str:
- if not seg:
- return "punct"
- if seg.isspace():
- return "space"
- if NUMBER_RE.fullmatch(seg) or WORD_LIKE_RE.fullmatch(seg):
- return "word"
- return "punct"
- def _append_fallback_tokens(text: str, start: int, end: int, tokens: List[Token]) -> None:
- for idx in range(start, end):
- ch = text[idx]
- if ch.isspace():
- kind = "space"
- elif ch.isalnum() or ch == "_":
- kind = "word"
- else:
- kind = "punct"
- tokens.append(Token(ch, idx, idx + 1, kind))
- def tokenize_preserve(text: str) -> List[Token]:
- tokens: List[Token] = []
- if not text:
- return tokens
- last_end = 0
- for match in TOKEN_REGEX.finditer(text):
- if match.start() > last_end:
- _append_fallback_tokens(text, last_end, match.start(), tokens)
- seg = text[match.start() : match.end()]
- tokens.append(Token(seg, match.start(), match.end(), _classify_segment(seg)))
- last_end = match.end()
- if last_end < len(text):
- _append_fallback_tokens(text, last_end, len(text), tokens)
- if not tokens and text:
- tokens = [Token(text, 0, len(text), "word" if text[0].isalnum() else "punct")]
- return tokens
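- # Doctest-style sketch of the tokenizer (illustrative; offsets assume this
- # exact input):
- #   >>> [t.text for t in tokenize_preserve("Don't stop!") if t.kind == "word"]
- #   ["Don't", 'stop']
- #   >>> tokenize_preserve("Don't stop!")[-1].kind  # trailing '!'
- #   'punct'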
- def build_char_to_token_map(tokens: List[Token]) -> Dict[int, int]:
- mapping: Dict[int, int] = {}
- for idx, tok in enumerate(tokens):
- for pos in range(tok.start, tok.end):
- mapping[pos] = idx
- return mapping
- def char_span_to_token_span(
- char_start: int, char_end: int, mapping: Dict[int, int]
- ) -> Tuple[int, int]:
- if char_end <= char_start:
- return -1, -1
- start_idx = mapping.get(char_start)
- end_idx = mapping.get(char_end - 1)
- if start_idx is None or end_idx is None:
- return -1, -1
- return start_idx, end_idx + 1
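- # Round-trip sketch (illustrative): in "a cat", token 2 is "cat" at chars
- # 2-5, so the half-open char span maps back to token span (2, 3):
- #   >>> toks = tokenize_preserve("a cat")
- #   >>> char_span_to_token_span(2, 5, build_char_to_token_map(toks))
- #   (2, 3)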
- def add_char_based_span(
- spans: List[Span],
- char_start: int,
- char_end: int,
- cls: str,
- mapping: Dict[int, int],
- attrs: Optional[Dict[str, str]] = None,
- ) -> None:
- s_tok, e_tok = char_span_to_token_span(char_start, char_end, mapping)
- if s_tok < 0 or e_tok < 0:
- return
- safe_attrs = None
- if attrs:
- safe_attrs = {k: html.escape(v, quote=True) for k, v in attrs.items() if v}
- spans.append(Span(start_token=s_tok, end_token=e_tok, cls=cls, attrs=safe_attrs))
- def add_span(spans: List[Span], start_token: int, end_token: int, cls: str, attrs: Optional[Dict[str, str]] = None):
- if start_token < 0 or end_token < 0 or end_token <= start_token:
- return
- spans.append(Span(start_token=start_token, end_token=end_token, cls=cls, attrs=attrs))
- def subtree_char_span(token: SpacyToken) -> Tuple[int, int]:
- subtree = list(token.subtree)
- if not subtree:
- return token.idx, token.idx + len(token.text)
- return subtree[0].idx, subtree[-1].idx + len(subtree[-1].text)
- def _subtree_text(token: SpacyToken) -> str:
- span = token.doc[token.left_edge.i : token.right_edge.i + 1]
- return span.text
- def _find_antecedent_word(sentence: SpacySpan, clause_start_char: int) -> Optional[str]:
- candidate = None
- for tok in sentence:
- if tok.idx >= clause_start_char:
- break
- if tok.pos_ in {"NOUN", "PROPN", "PRON"}:
- candidate = tok.text
- return candidate
- def _is_nonfinite_clause(span: SpacySpan) -> bool:
- tags = {tok.tag_ for tok in span if tok.tag_}
- if tags & FINITE_VERB_TAGS:
- return False
- if "TO" in tags or tags & NONFINITE_VERB_TAGS:
- return True
- return False
- def _classify_noun_clause(span: SpacySpan) -> Optional[str]:
- deps = {tok.dep_ for tok in span}
- if deps & {"csubj", "csubjpass"}:
- return "subject"
- if deps & {"ccomp", "xcomp"}:
- return "complement"
- if deps & {"dobj", "obj"}:
- return "object"
- return None
- def _split_paragraph_ranges(text: str) -> List[Tuple[int, int]]:
- """Return half-open paragraph ranges; separator text between them is skipped."""
- if not text:
- return [(0, 0)]
- ranges: List[Tuple[int, int]] = []
- start = 0
- for match in PARAGRAPH_BREAK_RE.finditer(text):
- ranges.append((start, match.start()))
- start = match.end()
- ranges.append((start, len(text)))
- # Defensive fallback: guarantee at least one range
- if not ranges:
- ranges = [(0, len(text))]
- return ranges
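- # e.g. (illustrative) _split_paragraph_ranges("A.\n\nB.") -> [(0, 2), (4, 6)];
- # the blank-line separator is skipped rather than attached to either range.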
- def _circled_number(value: int) -> str:
- """Return the circled number style for sentence numbering."""
- if value <= 0:
- return ""
- if value <= 20:
- return chr(ord("\u2460") + value - 1)
- if 21 <= value <= 35:
- return chr(ord("\u3251") + value - 21)
- if 36 <= value <= 50:
- return chr(ord("\u32B1") + value - 36)
- return f"({value})"
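- # Sample outputs (illustrative): 1 -> '①', 20 -> '⑳', 21 -> '㉑', 36 -> '㊱',
- # 51 -> '(51)'; values above 50 fall back to plain parentheses.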
- def annotate_constituents(
- sentence: SpacySpan,
- spans: List[Span],
- mapping: Dict[int, int],
- sentence_start_char: int,
- sentence_end_char: int,
- summary: Optional[SentenceSummary] = None,
- ) -> None:
- # If benepar is not attached or a previous warning indicates fallback, skip.
- if not HAS_BENEPAR or BENE_PAR_WARNING:
- _ensure_benepar_warning(
- "Benepar component missing or unavailable. Using dependency-based spans."
- )
- return
- # If the extension is not present, skip
- if not SpacySpan.has_extension("constituents"):
- _ensure_benepar_warning(
- "Benepar component missing from spaCy pipeline. Falling back to dependency spans."
- )
- return
- try:
- constituents = sentence._.constituents
- except Exception as exc:
- # Catch any error while accessing benepar results and fallback safely
- _ensure_benepar_warning(
- f"Benepar constituency parse unavailable: {exc}. Falling back to dependency spans."
- )
- return
- seen_ranges = set()
- for const in constituents:
- label = getattr(const, "label_", None)
- if not label:
- continue
- start_char, end_char = const.start_char, const.end_char
- if start_char == sentence_start_char and end_char == sentence_end_char:
- continue # skip the entire sentence span itself
- key = (start_char, end_char, label)
- is_relative = False
- if label in {"PP", "ADVP"}:
- if key in seen_ranges:
- continue
- seen_ranges.add(key)
- add_char_based_span(spans, start_char, end_char, "role-adverbial", mapping)
- continue
- if label == "SBAR" and const:
- first_token = const[0]
- lowered = first_token.text.lower()
- if lowered in RELATIVE_PRONOUNS:
- antecedent = _find_antecedent_word(sentence, start_char)
- attrs = {"data-modifies": antecedent} if antecedent else None
- add_char_based_span(spans, start_char, end_char, "clause-relative", mapping, attrs)
- if summary:
- summary.clauses.append("定语从句")
- is_relative = True
- else:
- function = SUBORDINATORS_TO_FUNCTION.get(lowered)
- attrs = {"data-function": function}
- add_char_based_span(spans, start_char, end_char, "clause-adverbial", mapping, attrs)
- if summary:
- summary.clauses.append("状语从句")
- if function:
- summary.clause_functions.append(function)
- continue
- if label in {"S", "VP"}:
- if _is_nonfinite_clause(const):
- add_char_based_span(spans, start_char, end_char, "clause-nonfinite", mapping)
- if summary:
- summary.clauses.append("非限定结构")
- continue
- if label == "S" and not is_relative:
- role = _classify_noun_clause(const)
- if role:
- attrs = {"data-clause-role": role}
- add_char_based_span(spans, start_char, end_char, "clause-noun", mapping, attrs)
- if summary:
- summary.clauses.append(f"名词性从句({role})")
- def _predicate_span_bounds(head: SpacyToken) -> Tuple[int, int]:
- """Return a character range covering predicate head + functional dependents."""
- tokens = [head]
- for child in head.children:
- if child.dep_ in {"aux", "auxpass", "prt", "cop", "neg"}:
- tokens.append(child)
- start_char = min(tok.idx for tok in tokens)
- end_char = max(tok.idx + len(tok.text) for tok in tokens)
- return start_char, end_char
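- # e.g. (illustrative; depends on the parse) in "She has not eaten yet", the
- # head 'eaten' plus its aux/neg children covers "has not eaten", while the
- # subject and 'yet' (advmod) stay outside the predicate range.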
- def _predicate_heads(sentence: SpacySpan) -> List[SpacyToken]:
- """Collect predicate heads including coordinated verbs."""
- candidates: List[SpacyToken] = []
- for tok in sentence:
- if tok.pos_ not in {"VERB", "AUX"} and tok.tag_ not in FINITE_VERB_TAGS:
- continue
- if tok.dep_ == "ROOT":
- candidates.append(tok)
- continue
- if tok.dep_ == "conj" and tok.head.pos_ in {"VERB", "AUX"}:
- candidates.append(tok)
- continue
- if tok.dep_ in {"ccomp", "xcomp", "advcl", "acl", "relcl", "parataxis"}:
- candidates.append(tok)
- seen = set()
- ordered: List[SpacyToken] = []
- for tok in sorted(candidates, key=lambda t: t.i):
- if tok.i in seen:
- continue
- seen.add(tok.i)
- ordered.append(tok)
- return ordered
- def _add_fixed_phrases(
- sentence: SpacySpan, mapping: Dict[int, int], spans: List[Span], summary: SentenceSummary
- ) -> None:
- base = sentence.start_char
- text = sentence.text
- for pattern, label in FIXED_MULTIWORD_PHRASES:
- for match in pattern.finditer(text):
- start_char = base + match.start()
- end_char = base + match.end()
- add_char_based_span(
- spans,
- start_char,
- end_char,
- "phrase-fixed",
- mapping,
- attrs={"data-phrase": label},
- )
- summary.connectors.append(label.lower())
- def annotate_sentence(
- tokens: List[Token],
- sentence: SpacySpan,
- mapping: Dict[int, int],
- ) -> Tuple[List[Span], SentenceSummary]:
- spans: List[Span] = []
- summary = SentenceSummary(sentence_length=len(sentence))
- sent_bounds = char_span_to_token_span(sentence.start_char, sentence.end_char, mapping)
- sent_start_tok, sent_end_tok = sent_bounds
- def add_subtree(token: SpacyToken, cls: str):
- start_char, end_char = subtree_char_span(token)
- add_char_based_span(spans, start_char, end_char, cls, mapping)
- def add_token(token: SpacyToken, cls: str):
- add_char_based_span(spans, token.idx, token.idx + len(token.text), cls, mapping)
- for tok in sentence:
- if tok.dep_ in SUBJECT_DEPS:
- add_subtree(tok, "role-subject")
- summary.subjects.append(_subtree_text(tok))
- for head in _predicate_heads(sentence):
- start_char, end_char = _predicate_span_bounds(head)
- add_char_based_span(spans, start_char, end_char, "role-predicate", mapping)
- predicate_text = sentence.doc.text[start_char:end_char].strip()
- summary.predicates.append(predicate_text or head.text)
- for tok in sentence:
- if tok.dep_ in DIRECT_OBJECT_DEPS:
- add_subtree(tok, "role-object-do")
- summary.objects.append(_subtree_text(tok))
- break
- io_token = next((tok for tok in sentence if tok.dep_ in INDIRECT_OBJECT_DEPS), None)
- if io_token is None:
- for tok in sentence:
- if tok.dep_ == "pobj" and tok.head.dep_ == "prep" and tok.head.lemma_.lower() in {"to", "for"}:
- io_token = tok
- break
- if io_token:
- add_subtree(io_token, "role-object-io")
- summary.objects.append(_subtree_text(io_token))
- for tok in sentence:
- if tok.dep_ in COMPLEMENT_DEPS:
- add_subtree(tok, "role-complement")
- summary.complements.append(_subtree_text(tok))
- break
- for tok in sentence:
- lowered = tok.text.lower()
- if tok.dep_ in {"cc", "mark", "preconj"} or tok.pos_ in {"CCONJ", "SCONJ"}:
- add_token(tok, "role-connector")
- summary.connectors.append(lowered)
- if tok.dep_ == "det" or tok.pos_ == "DET":
- add_token(tok, "role-determiner")
- if tok.dep_ in {"amod", "poss", "compound", "nummod"}:
- add_token(tok, "role-modifier")
- adverbial_ranges = set()
- for tok in sentence:
- if tok.dep_ in ADVERBIAL_DEPS:
- adverbial_ranges.add(subtree_char_span(tok))
- for start_char, end_char in adverbial_ranges:
- add_char_based_span(spans, start_char, end_char, "role-adverbial", mapping)
- for tok in sentence:
- if tok.dep_ == "appos":
- add_subtree(tok, "role-apposition")
- if sent_start_tok >= 0 and sent_end_tok >= 0:
- stack = []
- for idx in range(sent_start_tok, sent_end_tok):
- token = tokens[idx]
- if token.text == "(":
- stack.append(idx)
- elif token.text == ")" and stack:
- add_span(spans, stack.pop(), idx + 1, "role-parenthetical")
- comma_token_idxs = [
- i
- for i in range(sent_start_tok, sent_end_tok)
- if tokens[i].kind == "punct" and tokens[i].text == ","
- ]
- for idx, first_comma in enumerate(comma_token_idxs):
- if idx + 1 >= len(comma_token_idxs):
- break
- second_comma = comma_token_idxs[idx + 1]
- start_char = tokens[first_comma].start
- end_char = tokens[second_comma].end
- span = sentence.doc.char_span(start_char, end_char, alignment_mode="expand")
- if span and any(tok.tag_ == "VBG" for tok in span):
- add_span(spans, first_comma, second_comma + 1, "role-absolute")
- annotate_constituents(
- sentence,
- spans,
- mapping,
- sentence.start_char,
- sentence.end_char,
- summary,
- )
- _add_fixed_phrases(sentence, mapping, spans, summary)
- return spans, summary
- def _label_residual_token(token: SpacyToken) -> Optional[str]:
- dep_label = RESIDUAL_DEP_LABELS.get(token.dep_)
- if dep_label:
- return dep_label
- return RESIDUAL_POS_LABELS.get(token.pos_)
- def _collect_residual_roles(
- sentence: SpacySpan,
- tokens: List[Token],
- spans: List[Span],
- sent_bounds: Tuple[int, int],
- summary: SentenceSummary,
- mapping: Dict[int, int],
- ) -> None:
- sent_start, sent_end = sent_bounds
- if sent_start < 0 or sent_end < 0 or sent_start >= sent_end:
- return
- coverage = [False] * (sent_end - sent_start)
- for span in spans:
- lo = max(span.start_token, sent_start)
- hi = min(span.end_token, sent_end)
- for idx in range(lo, hi):
- coverage[idx - sent_start] = True
- doc = sentence.doc
- for offset, covered in enumerate(coverage):
- if covered:
- continue
- token = tokens[sent_start + offset]
- if token.kind != "word":
- continue
- span = doc.char_span(token.start, token.end, alignment_mode="expand")
- if not span or not span.text.strip():
- continue
- label = _label_residual_token(span[0])
- if label and label not in summary.residual_roles:
- summary.residual_roles.append(label)
- if label:
- add_char_based_span(
- spans,
- token.start,
- token.end,
- "role-residual",
- mapping,
- attrs={"data-role": label},
- )
- def _classify_sentence_complexity(summary: SentenceSummary) -> Tuple[str, bool]:
- clause_count = len(summary.clauses)
- connector_count = len(summary.connectors)
- word_count = summary.sentence_length
- if clause_count >= 2:
- return "多重复杂句", True
- if clause_count == 1:
- return "主从复合句", True
- if connector_count >= 2:
- return "并列复合句", True
- if word_count >= 25:
- return "长句", True
- return "简单句", False
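- # Illustrative classifications: two clauses -> ("多重复杂句", True); one clause
- # -> ("主从复合句", True); no clauses but two connectors -> ("并列复合句", True);
- # a 30-word clause-free sentence -> ("长句", True); otherwise ("简单句", False).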
- def _translate_clause_functions(functions: List[str]) -> List[str]:
- translated = []
- for item in functions:
- label = CLAUSE_FUNCTION_LABELS.get(item, item)
- if label not in translated:
- translated.append(label)
- return translated
- def build_sentence_note(summary: SentenceSummary) -> Tuple[str, bool]:
- note_parts: List[str] = []
- clause_label = "无"
- if summary.clauses:
- counts = Counter(summary.clauses)
- clause_label = "、".join(
- f"{name}×{count}" if count > 1 else name for name, count in counts.items()
- )
- functions = _translate_clause_functions(summary.clause_functions)
- connectors = list(dict.fromkeys(summary.connectors))
- residual = summary.residual_roles
- subjects_seq = list(dict.fromkeys(summary.subjects))
- predicates_seq = list(dict.fromkeys(summary.predicates))
- objects_seq = list(dict.fromkeys(summary.objects))
- complements_seq = list(dict.fromkeys(summary.complements))
- subjects = "、".join(subjects_seq) if subjects_seq else "未识别"
- predicates = "、".join(predicates_seq) if predicates_seq else "未识别"
- objects = "、".join(objects_seq) if objects_seq else "无"
- complements = "、".join(complements_seq) if complements_seq else "无"
- note_parts.append(f"主语:{subjects}")
- note_parts.append(f"谓语:{predicates}")
- note_parts.append(f"宾语:{objects}")
- if complements != "无":
- note_parts.append(f"补语:{complements}")
- note_parts.append(f"从句:{clause_label}")
- if functions:
- note_parts.append(f"从句功能:{'、'.join(functions)}")
- connector_text = "、".join(connectors) if connectors else "未检测到典型连接词"
- note_parts.append(f"连接词:{connector_text}")
- if residual:
- note_parts.append(f"未高亮:{'、'.join(residual)}")
- complexity_label, is_complex = _classify_sentence_complexity(summary)
- note_parts.insert(0, f"句型:{complexity_label}")
- note_parts.append(f"词数:{summary.sentence_length}")
- return ";".join(note_parts), is_complex
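- # Illustrative note for a simple sentence (fields depend on the parse):
- #   '句型:简单句;主语:The cat;谓语:sat;宾语:无;从句:无;连接词:未检测到典型连接词;词数:4'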
- def render_with_spans(tokens: List[Token], spans: List[Span]) -> str:
- spans = sorted(spans, key=lambda s: (s.start_token, -s.end_token))
- out_parts: List[str] = []
- active_stack: List[Span] = []
- span_queue = list(spans)
- current_idx = 0
- def open_span(span: Span):
- attrs = ""
- if span.attrs:
- attrs = " " + " ".join(
- f"{k}='" + html.escape(v, quote=True) + "'" for k, v in span.attrs.items()
- )
- out_parts.append(f"<span class='{span.cls}'{attrs}>")
- def close_span():
- out_parts.append("</span>")
- while current_idx < len(tokens):
- opening = [sp for sp in span_queue if sp.start_token == current_idx]
- for sp in opening:
- open_span(sp)
- active_stack.append(sp)
- span_queue.remove(sp)
- token = tokens[current_idx]
- out_parts.append(html.escape(token.text))
- current_idx += 1
- while active_stack and active_stack[-1].end_token == current_idx:
- active_stack.pop()
- close_span()
- while active_stack:
- active_stack.pop()
- close_span()
- return "".join(out_parts)
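- # Rendering sketch (illustrative; assumes properly nested, non-crossing spans):
- #   >>> toks = tokenize_preserve("a cat")
- #   >>> render_with_spans(toks, [Span(0, 3, "sentence-scope"), Span(2, 3, "role-subject")])
- #   "<span class='sentence-scope'>a <span class='role-subject'>cat</span></span>"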
- def _run_pipeline_without_benepar(text: str) -> "spacy.tokens.Doc":
- """Run the spaCy pipeline skipping benepar, for robust fallback."""
- assert NLP is not None
- doc = NLP.make_doc(text)
- for name, proc in NLP.pipeline:
- if name == "benepar":
- continue
- doc = proc(doc)
- return doc
- def highlight_text_with_spacy(text: str, paragraph_meta: Optional[List[Dict[str, str]]] = None) -> str:
- if NLP is None:
- raise RuntimeError(f"spaCy pipeline unavailable: {NLP_LOAD_ERROR}")
- tokens = tokenize_preserve(text)
- if not tokens:
- return ""
- mapping = build_char_to_token_map(tokens)
- # Robust doc creation: if benepar causes any error, skip it and fallback.
- try:
- doc = NLP(text)
- except Exception as exc:
- _ensure_benepar_warning(
- f"Benepar failed during processing: {exc}. Falling back to dependency-based spans."
- )
- doc = _run_pipeline_without_benepar(text)
- paragraph_ranges = _split_paragraph_ranges(text)
- paragraph_counters = [0 for _ in paragraph_ranges]
- paragraph_idx = 0
- paragraph_spans: List[Span] = []
- paragraph_attrs = paragraph_meta if paragraph_meta and len(paragraph_meta) == len(paragraph_ranges) else None
- for idx, (start, end) in enumerate(paragraph_ranges):
- attrs = None
- if paragraph_attrs:
- attrs = paragraph_attrs[idx] or None
- add_char_based_span(paragraph_spans, start, end, "paragraph-scope", mapping, attrs=attrs)
- spans: List[Span] = list(paragraph_spans)
- for sent in doc.sents:
- while paragraph_idx < len(paragraph_ranges) and paragraph_ranges[paragraph_idx][1] <= sent.start_char:
- paragraph_idx += 1
- current_idx = min(paragraph_idx, len(paragraph_ranges) - 1)
- paragraph_counters[current_idx] += 1
- sentence_label = _circled_number(paragraph_counters[current_idx])
- sentence_spans, summary = annotate_sentence(tokens, sent, mapping)
- sent_bounds = char_span_to_token_span(sent.start_char, sent.end_char, mapping)
- sent_start, sent_end = sent_bounds
- if sent_start >= 0 and sent_end >= 0:
- _collect_residual_roles(sent, tokens, sentence_spans, sent_bounds, summary, mapping)
- helper_note, is_complex = build_sentence_note(summary)
- attrs = {
- "data-sid": sentence_label,
- "data-note": helper_note,
- "data-complex": "1" if is_complex else "0",
- }
- sentence_spans.append(Span(start_token=sent_start, end_token=sent_end, cls="sentence-scope", attrs=attrs))
- spans.extend(sentence_spans)
- return render_with_spans(tokens, spans)
- app = FastAPI(title="Grammar Highlight API (spaCy + benepar)")
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- @app.post("/analyze", response_model=AnalyzeResponse)
- async def analyze(req: AnalyzeRequest):
- text = req.text
- if text is None or not text.strip():
- raise HTTPException(status_code=400, detail="Text is required")
- try:
- sanitized_fragment = highlight_text_with_spacy(text)
- helper_state = "on" if SENTENCE_HELPER_ENABLED else "off"
- return AnalyzeResponse(
- highlighted_html=f"{STYLE_BLOCK}<div class='analysis' data-helper='{helper_state}'>{sanitized_fragment}</div>"
- )
- except RuntimeError as exc:
- raise HTTPException(status_code=500, detail=str(exc)) from exc
- except Exception as exc: # pragma: no cover - defensive
- raise HTTPException(status_code=500, detail=f"Analysis failed: {exc}") from exc
- @app.get("/health")
- async def health():
- status = "ok" if NLP is not None else "failed"
- detail = None if NLP is not None else str(NLP_LOAD_ERROR)
- payload = {"status": status}
- if detail:
- payload["detail"] = detail
- if BENE_PAR_WARNING:
- payload["warning"] = BENE_PAR_WARNING
- payload["benepar_attached"] = HAS_BENEPAR
- return payload
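- # e.g. `curl http://localhost:8000/health` might return (illustrative; the
- # port depends on how the server is launched):
- #   {"status": "ok", "benepar_attached": true}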
- @app.get("/proxy", response_class=HTMLResponse)
- async def proxy(url: Optional[str] = None, show_images: bool = False):
- if not url:
- return HTMLResponse(_render_proxy_page(show_images=show_images))
- try:
- normalized_url, title, page_text, images, code_blocks, paragraph_meta = await _fetch_remote_plaintext(url)
- highlighted_fragment = highlight_text_with_spacy(page_text, paragraph_meta=paragraph_meta or None)
- if code_blocks:
- highlighted_fragment = _inject_proxy_codeblocks(highlighted_fragment, code_blocks)
- image_notice = None
- if images:
- if show_images:
- highlighted_fragment = _inject_proxy_images(highlighted_fragment, images)
- else:
- highlighted_fragment = _strip_proxy_image_markers(highlighted_fragment)
- image_notice = (
- f"检测到 {len(images)} 张正文图片,为提速默认隐藏。勾选“显示图片”后重新抓取即可加载原图。"
- )
- html_body = _render_proxy_page(
- url_value=normalized_url,
- message="分析完成,结果如下。",
- highlight_fragment=highlighted_fragment,
- source_url=normalized_url,
- source_title=title,
- show_images=show_images,
- image_notice=image_notice,
- )
- return HTMLResponse(html_body)
- except ValueError as exc:
- body = _render_proxy_page(url_value=url or "", message=str(exc), is_error=True, show_images=show_images)
- return HTMLResponse(body, status_code=400)
- except httpx.HTTPError as exc:
- # Provide a clearer message for common HTTP errors from the remote site.
- msg = None
- if isinstance(exc, httpx.HTTPStatusError) and exc.response is not None:
- status = exc.response.status_code
- if status == 403:
- msg = (
- "抓取页面失败:目标站点返回 403 Forbidden(禁止访问)。"
- "该网站很可能禁止自动抓取或代理访问,目前无法通过本工具获取正文,"
- "可以尝试在浏览器中打开并手动复制需要的内容。"
- )
- else:
- msg = f"抓取页面失败:目标站点返回 HTTP {status}。"
- if msg is None:
- msg = f"抓取页面失败:{exc}"
- body = _render_proxy_page(
- url_value=url or "",
- message=msg,
- is_error=True,
- show_images=show_images,
- )
- return HTMLResponse(body, status_code=502)
- except Exception as exc:
- body = _render_proxy_page(
- url_value=url or "",
- message=f"代理分析失败:{exc}",
- is_error=True,
- show_images=show_images,
- )
- return HTMLResponse(body, status_code=500)
- @app.get("/", response_class=HTMLResponse)
- async def ui():
- return """<!DOCTYPE html>
- <html lang=\"zh-CN\">
- <head>
- <meta charset=\"UTF-8\" />
- <meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
- <title>Grammar Highlighter</title>
- <style>
- body { font-family: system-ui, -apple-system, sans-serif; margin: 2rem; line-height: 1.6; }
- textarea { width: 100%; min-height: 140px; font-size: 1rem; padding: 0.75rem; border: 1px solid #d0d7de; border-radius: 0.5rem; }
- button { margin-top: 0.75rem; padding: 0.6rem 1.4rem; font-size: 1rem; cursor: pointer; border: none; border-radius: 999px; background: #1f7a8c; color: #fff; }
- button + button { margin-left: 0.5rem; background: #6b7280; }
- button:disabled { opacity: 0.6; cursor: wait; }
- #result { margin-top: 1.5rem; border-top: 1px solid #e5e7eb; padding-top: 1rem; min-height: 2rem; }
- #status { margin-left: 0.75rem; color: #3b82f6; }
- .err { color: #b00020; }
- .muted { color: #6b7280; font-size: 0.9rem; }
- .tts-controls { margin-top: 0.75rem; display: flex; align-items: center; gap: 0.75rem; flex-wrap: wrap; }
- .tts-controls button { margin-top: 0; background: #f97316; }
- .tts-status { font-size: 0.95rem; color: #475569; }
- </style>
- </head>
- <body>
- <h1>Grammar Highlighter (spaCy + benepar)</h1>
- <textarea id=\"text\" placeholder=\"Type the English text you want to analyze...\"></textarea>
- <div>
- <button type=\"button\" id=\"submit\">Analyze</button>
- <button type=\"button\" id=\"clear\">清空输入</button>
- <span id=\"status\"></span>
- </div>
- <div class=\"tts-controls\">
- <button type=\"button\" id=\"tts\">朗读高亮文本</button>
- <button type=\"button\" id=\"tts-selection\">朗读选中文本</button>
- <span class=\"tts-status\" id=\"tts-status\"></span>
- </div>
- <div id=\"result\"></div>
- <script>
- const btn = document.getElementById('submit');
- const btnClear = document.getElementById('clear');
- const textarea = document.getElementById('text');
- const statusEl = document.getElementById('status');
- const ttsBtn = document.getElementById('tts');
- const ttsSelectionBtn = document.getElementById('tts-selection');
- const ttsStatus = document.getElementById('tts-status');
- const result = document.getElementById('result');
- const TTS_ENDPOINT = 'http://141.140.15.30:8028/generate';
- let currentAudio = null;
- let queuedAudios = [];
- let streamingFinished = false;
- function resetUI() {
- result.innerHTML = '';
- statusEl.textContent = '';
- statusEl.classList.remove('err');
- ttsStatus.textContent = '';
- setTtsButtonsDisabled(false);
- resetAudioPlayback();
- }
- btn.addEventListener('click', async () => {
- resetUI();
- const value = textarea.value.trim();
- if (!value) {
- statusEl.textContent = '请输入要分析的英文文本。';
- statusEl.classList.add('err');
- return;
- }
- btn.disabled = true;
- statusEl.textContent = 'Analyzing ...';
- try {
- const response = await fetch('/analyze', {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ text: value })
- });
- if (!response.ok) {
- const error = await response.json().catch(() => ({ detail: 'Request failed' }));
- throw new Error(error.detail || 'Request failed');
- }
- const data = await response.json();
- result.innerHTML = data.highlighted_html || '';
- statusEl.textContent = '';
- } catch (err) {
- statusEl.textContent = '错误:' + (err.message || 'Unknown error');
- statusEl.classList.add('err');
- } finally {
- btn.disabled = false;
- }
- });
- btnClear.addEventListener('click', () => {
- textarea.value = '';
- resetUI();
- textarea.focus();
- });
- function extractHighlightedText() {
- const highlightRoot = result.querySelector('.analysis');
- return highlightRoot ? highlightRoot.textContent.trim() : '';
- }
- function setTtsButtonsDisabled(disabled) {
- if (ttsBtn) {
- ttsBtn.disabled = disabled;
- }
- if (ttsSelectionBtn) {
- ttsSelectionBtn.disabled = disabled;
- }
- }
- function resetAudioPlayback() {
- queuedAudios = [];
- streamingFinished = false;
- if (currentAudio) {
- currentAudio.pause();
- currentAudio = null;
- }
- }
- function markStreamingFinished() {
- streamingFinished = true;
- if (!currentAudio && !queuedAudios.length) {
- ttsStatus.textContent = '播放完成';
- }
- }
- function playNextAudioChunk() {
- if (!queuedAudios.length) {
- currentAudio = null;
- if (streamingFinished) {
- ttsStatus.textContent = '播放完成';
- } else {
- ttsStatus.textContent = '等待更多语音...';
- }
- return;
- }
- const chunk = queuedAudios.shift();
- ttsStatus.textContent = '播放中...';
- currentAudio = new Audio('data:audio/wav;base64,' + chunk);
- currentAudio.onended = playNextAudioChunk;
- currentAudio.onerror = () => {
- ttsStatus.textContent = '播放失败';
- currentAudio = null;
- };
- currentAudio.play().catch(err => {
- ttsStatus.textContent = '自动播放被阻止:' + err.message;
- currentAudio = null;
- });
- }
- function enqueueAudioChunk(chunk) {
- queuedAudios.push(chunk);
- if (!currentAudio) {
- playNextAudioChunk();
- }
- }
- function parseTtsLine(line) {
- try {
- const parsed = JSON.parse(line);
- if (parsed && parsed.audio) {
- enqueueAudioChunk(parsed.audio);
- return true;
- }
- } catch (err) {
- console.warn('无法解析TTS响应行', err);
- }
- return false;
- }
- async function consumeTtsResponse(response) {
- let chunkCount = 0;
- const handleLine = rawLine => {
- const trimmed = rawLine.replace(/\\r/g, '').trim();
- if (!trimmed) return;
- if (parseTtsLine(trimmed)) {
- chunkCount += 1;
- }
- };
- if (response.body && response.body.getReader) {
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let buffer = '';
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- buffer += decoder.decode(value, { stream: true });
- let newlineIndex;
- while ((newlineIndex = buffer.indexOf('\\n')) >= 0) {
- const line = buffer.slice(0, newlineIndex);
- buffer = buffer.slice(newlineIndex + 1);
- handleLine(line);
- }
- }
- buffer += decoder.decode();
- if (buffer) {
- handleLine(buffer);
- }
- } else {
- const payload = await response.text();
- payload.split('\\n').forEach(handleLine);
- }
- return chunkCount;
- }
- function getSelectedPageText() {
- const selection = window.getSelection ? window.getSelection() : null;
- return selection ? selection.toString().trim() : '';
- }
- async function streamTtsRequest(text) {
- const response = await fetch(TTS_ENDPOINT, {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ text })
- });
- if (!response.ok) {
- throw new Error('接口响应错误');
- }
- const chunkCount = await consumeTtsResponse(response);
- if (!chunkCount) {
- throw new Error('接口未返回音频数据');
- }
- markStreamingFinished();
- }
- function createTtsRequest(textResolver, emptyMessage) {
- return async () => {
- const text = textResolver();
- if (!text) {
- ttsStatus.textContent = emptyMessage;
- return;
- }
- setTtsButtonsDisabled(true);
- ttsStatus.textContent = '请求语音...';
- resetAudioPlayback();
- try {
- await streamTtsRequest(text);
- } catch (err) {
- ttsStatus.textContent = 'TTS 出错:' + (err && err.message ? err.message : err);
- resetAudioPlayback();
- } finally {
- setTtsButtonsDisabled(false);
- }
- };
- }
- if (ttsBtn) {
- ttsBtn.addEventListener('click', createTtsRequest(extractHighlightedText, '请先生成高亮结果'));
- }
- if (ttsSelectionBtn) {
- ttsSelectionBtn.addEventListener('click', createTtsRequest(getSelectedPageText, '请先选择要朗读的文本'));
- }
- </script>
- </body>
- </html>"""
- PROXY_PAGE_TEMPLATE = Template(
- """<!DOCTYPE html>
- <html lang=\"zh-CN\">
- <head>
- <meta charset=\"UTF-8\" />
- <meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
- <title>Grammar Proxy Highlighter</title>
- <style>
- body { font-family: system-ui, -apple-system, \"Segoe UI\", sans-serif; margin: 0 auto; max-width: 860px; padding: 1.5rem; line-height: 1.65; }
- h1 { font-size: 1.45rem; margin-bottom: 1rem; }
- form { display: flex; flex-wrap: wrap; gap: 0.5rem; margin-bottom: 0.75rem; }
- input[type=\"url\"] { flex: 1 1 260px; padding: 0.65rem; font-size: 1rem; border-radius: 0.5rem; border: 1px solid #d0d7de; }
- button { padding: 0.65rem 1.4rem; border: none; border-radius: 999px; background: #2563eb; color: #fff; font-size: 1rem; cursor: pointer; }
- .show-images-toggle { display: inline-flex; align-items: center; gap: 0.35rem; font-size: 0.9rem; color: #475569; }
- .show-images-toggle input { width: auto; }
- .tts-controls { margin-top: 0.5rem; display: flex; align-items: center; flex-wrap: wrap; gap: 0.75rem; }
- .tts-controls button { background: #f97316; }
- .tts-status { font-size: 0.95rem; color: #475569; }
- .status { margin-top: 0.25rem; font-size: 0.95rem; }
- .status.err { color: #b00020; }
- .status.ok { color: #059669; }
- section.result { margin-top: 1.4rem; padding-top: 1rem; border-top: 1px solid #e5e7eb; }
- section.result .source { font-size: 0.95rem; margin-bottom: 0.5rem; color: #475569; word-break: break-word; }
- section.result .source a { color: inherit; text-decoration: underline; }
- section.result img { display:block; margin:0.75rem auto; height:auto; max-width:min(100%,800px); }
- .image-hint { font-size:0.9rem; color:#6b7280; margin:0.5rem 0 0; }
- .clear-floating { position: fixed; left: 0; right: 0; bottom: 0; padding: 0.55rem 1.5rem; border-radius: 0; border-top: 1px solid #e5e7eb; background: rgba(249,250,251,0.96); display: flex; justify-content: center; z-index: 40; }
- .clear-floating button { padding: 0.55rem 1.8rem; border-radius: 999px; background: #6b7280; color: #fff; font-size: 0.95rem; }
- .clear-floating button:hover { filter: brightness(1.05); }
- @media (prefers-reduced-motion: reduce) { .clear-floating { scroll-behavior: auto; } }
- @media (max-width: 640px) { body { padding-bottom: 3.2rem; } }
- </style>
- $style_block
- </head>
- <body>
- <h1>网页代理高亮</h1>
- <form method=\"get\" action=\"/proxy\" class=\"url-form\">
- <input type=\"url\" name=\"url\" value=\"$url_value\" placeholder=\"https://example.com/article\" required />
- <button type=\"submit\">抓取并高亮</button>
- <label class=\"show-images-toggle\">
- <input type=\"checkbox\" name=\"show_images\" value=\"1\" $show_images_checked />
- <span>显示图片(默认关闭以提升速度)</span>
- </label>
- </form>
- $status_block
- <div class=\"tts-controls\">
- <button type=\"button\" id=\"proxy-tts-btn\" disabled>朗读高亮文本</button>
- <button type=\"button\" id=\"proxy-tts-selection\">朗读选中文本</button>
- <span class=\"tts-status\" id=\"proxy-tts-status\"></span>
- </div>
- $result_block
- <div class=\"clear-floating\">
- <button type=\"button\" id=\"proxy-reset\">清空并重置</button>
- </div>
- <script>
- (function() {
- var resetBtn = document.getElementById('proxy-reset');
- if (resetBtn) {
- resetBtn.addEventListener('click', function() {
- // Simple approach: navigate back to /proxy without parameters, resetting the page state
- window.location.href = '/proxy';
- });
- }
- var ttsBtn = document.getElementById('proxy-tts-btn');
- var ttsSelectionBtn = document.getElementById('proxy-tts-selection');
- var ttsStatus = document.getElementById('proxy-tts-status');
- var TTS_ENDPOINT = 'http://141.140.15.30:8028/generate';
- var currentAudio = null;
- var queuedAudios = [];
- var streamingFinished = false;
- function extractProxyText() {
- var container = document.querySelector('section.result .analysis');
- return container ? container.textContent.trim() : '';
- }
- function setTtsButtonsDisabled(disabled) {
- if (ttsBtn) {
- ttsBtn.disabled = disabled;
- }
- if (ttsSelectionBtn) {
- ttsSelectionBtn.disabled = disabled;
- }
- }
- function resetAudioPlayback() {
- queuedAudios = [];
- streamingFinished = false;
- if (currentAudio) {
- currentAudio.pause();
- currentAudio = null;
- }
- }
- function markStreamingFinished() {
- streamingFinished = true;
- if (!currentAudio && !queuedAudios.length) {
- ttsStatus.textContent = '播放完成';
- }
- }
- function playNextAudioChunk() {
- if (!queuedAudios.length) {
- currentAudio = null;
- if (streamingFinished) {
- ttsStatus.textContent = '播放完成';
- } else {
- ttsStatus.textContent = '等待更多语音...';
- }
- return;
- }
- var chunk = queuedAudios.shift();
- ttsStatus.textContent = '播放中...';
- currentAudio = new Audio('data:audio/wav;base64,' + chunk);
- currentAudio.onended = playNextAudioChunk;
- currentAudio.onerror = function() {
- ttsStatus.textContent = '播放失败';
- currentAudio = null;
- };
- currentAudio.play().catch(function(err) {
- ttsStatus.textContent = '自动播放被阻止:' + err.message;
- currentAudio = null;
- });
- }
- function enqueueAudioChunk(chunk) {
- queuedAudios.push(chunk);
- if (!currentAudio) {
- playNextAudioChunk();
- }
- }
- function parseTtsLine(line) {
- try {
- var parsed = JSON.parse(line);
- if (parsed && parsed.audio) {
- enqueueAudioChunk(parsed.audio);
- return true;
- }
- } catch (err) {
- console.warn('无法解析TTS响应行', err);
- }
- return false;
- }
- async function consumeTtsResponse(response) {
- var chunkCount = 0;
- var handleLine = function(rawLine) {
- var trimmed = rawLine.replace(/\\r/g, '').trim();
- if (!trimmed) return;
- if (parseTtsLine(trimmed)) {
- chunkCount += 1;
- }
- };
- if (response.body && response.body.getReader) {
- var reader = response.body.getReader();
- var decoder = new TextDecoder();
- var buffer = '';
- while (true) {
- var readResult = await reader.read();
- if (readResult.done) {
- break;
- }
- buffer += decoder.decode(readResult.value, { stream: true });
- var newlineIndex;
- while ((newlineIndex = buffer.indexOf('\\n')) >= 0) {
- var line = buffer.slice(0, newlineIndex);
- buffer = buffer.slice(newlineIndex + 1);
- handleLine(line);
- }
- }
- buffer += decoder.decode();
- if (buffer) {
- handleLine(buffer);
- }
- } else {
- var payload = await response.text();
- payload.split('\\n').forEach(handleLine);
- }
- return chunkCount;
- }
- function getSelectedPageText() {
- var selection = window.getSelection ? window.getSelection() : null;
- return selection ? selection.toString().trim() : '';
- }
- async function streamTtsRequest(text) {
- var response = await fetch(TTS_ENDPOINT, {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ text: text })
- });
- if (!response.ok) {
- throw new Error('接口响应错误');
- }
- var chunkCount = await consumeTtsResponse(response);
- if (!chunkCount) {
- throw new Error('接口未返回音频数据');
- }
- markStreamingFinished();
- }
- function createTtsRequest(textResolver, emptyMessage) {
- return async function() {
- var text = textResolver();
- if (!text) {
- ttsStatus.textContent = emptyMessage;
- return;
- }
- setTtsButtonsDisabled(true);
- ttsStatus.textContent = '请求语音...';
- resetAudioPlayback();
- try {
- await streamTtsRequest(text);
- } catch (err) {
- ttsStatus.textContent = 'TTS 出错:' + (err && err.message ? err.message : err);
- resetAudioPlayback();
- } finally {
- setTtsButtonsDisabled(false);
- }
- };
- }
- if (ttsBtn) {
- ttsBtn.addEventListener('click', createTtsRequest(extractProxyText, '暂无可朗读内容'));
- var hasText = !!extractProxyText();
- ttsBtn.disabled = !hasText;
- if (!hasText) {
- ttsStatus.textContent = '高亮完成后可朗读';
- }
- }
- if (ttsSelectionBtn) {
- ttsSelectionBtn.addEventListener('click', createTtsRequest(getSelectedPageText, '请先选择要朗读的文本'));
- }
- })();
- </script>
- </body>
- </html>"""
- )
- ALLOWED_URL_SCHEMES = {"http", "https"}
- MAX_REMOTE_HTML_BYTES = 1_000_000
- REMOTE_FETCH_TIMEOUT = 10.0
- REMOTE_FETCH_HEADERS = {
- # Use a browser-like user agent and common headers so that sites which
- # block generic HTTP clients are more likely to return normal content.
- "User-Agent": (
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
- "AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/124.0.0.0 Safari/537.36"
- ),
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
- "Accept-Language": "en-US,en;q=0.9",
- # Let httpx / the underlying HTTP stack negotiate an encoding it can
- # actually decode. If we unconditionally advertise "br" but the runtime
- # does not have brotli support installed, some sites will respond with
- # brotli-compressed payloads that end up as mojibake or decoding errors.
- #
- # Most modern servers default to gzip or identity when the header is
- # absent, which are both handled fine by httpx.
- # "Accept-Encoding": "gzip, deflate, br",
- "Connection": "keep-alive",
- "Upgrade-Insecure-Requests": "1",
- # A few anti-bot setups check these request headers; keeping them close
- # to real desktop Chrome values slightly improves compatibility, even
- # though they are not a guarantee against 403 responses.
- "Sec-Fetch-Site": "none",
- "Sec-Fetch-Mode": "navigate",
- "Sec-Fetch-User": "?1",
- "Sec-Fetch-Dest": "document",
- }
- SIMPLE_FETCH_HEADERS = {
- # Minimal browser-like headers for the fallback "simple request" path.
- "User-Agent": (
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
- "AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/124.0.0.0 Safari/537.36"
- ),
- "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
- "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
- "Connection": "close",
- }
- def _inject_proxy_images(html_fragment: str, images: List[Dict[str, str]]) -> str:
- """Replace stable image placeholders with <img> tags in the highlighted HTML."""
- result = html_fragment
- for idx, img in enumerate(images):
- marker = img.get("marker") or f"__GHIMG_{idx}__"
- src = html.escape(img.get("src", "") or "", quote=True)
- if not src:
- continue
- alt = html.escape(img.get("alt", "") or "", quote=True)
- title = html.escape(img.get("title", "") or "", quote=True)
- attrs = [f"src='{src}'"]
- if alt:
- attrs.append(f"alt='{alt}'")
- if title:
- attrs.append(f"title='{title}'")
- # Preserve simple width/height hints when they look safe. Most modern
- # pages rely on CSS for sizing, but explicit attributes can help keep
- # code snippets or diagrams close to their original scale.
- def _safe_dim(value: Optional[str]) -> Optional[str]:
- if not value:
- return None
- value = value.strip()
- if re.fullmatch(r"\d+(?:\.\d+)?(px|%)?", value):
- return value
- return None
- width = _safe_dim(img.get("width"))
- height = _safe_dim(img.get("height"))
- if width:
- attrs.append(f"width='{html.escape(width, quote=True)}'")
- if height:
- attrs.append(f"height='{html.escape(height, quote=True)}'")
- img_tag = "<img " + " ".join(attrs) + " />"
- # Simple textual replacement is sufficient because placeholders
- # are emitted as plain word tokens without HTML meta characters.
- result = result.replace(marker, img_tag)
- return result
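- # Sketch (illustrative): an entry {"marker": "__GHIMG_0__", "src": "https://x/y.png",
- # "alt": "fig"} turns its marker into "<img src='https://x/y.png' alt='fig' />".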
- IMG_MARKER_RE = re.compile(r"__GHIMG_\d+__")
- def _strip_proxy_image_markers(html_fragment: str) -> str:
- """Remove residual image placeholders when images are hidden."""
- if IMG_MARKER_RE.search(html_fragment) is None:
- return html_fragment
- return IMG_MARKER_RE.sub("", html_fragment)
- def _inject_proxy_codeblocks(html_fragment: str, code_blocks: List[Dict[str, str]]) -> str:
- """Replace code placeholders with <pre><code> blocks, preserving formatting."""
- result = html_fragment
- for idx, block in enumerate(code_blocks):
- marker = block.get("marker") or f"__GHCODE_{idx}__"
- raw = block.get("text") or ""
- if not raw.strip():
- continue
- # Escape HTML but keep newlines so that <pre> preserves formatting.
- code_html = html.escape(raw, quote=False)
- pre_tag = f"<pre><code>{code_html}</code></pre>"
- result = result.replace(marker, pre_tag)
- return result
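- # Sketch (illustrative):
- #   >>> _inject_proxy_codeblocks("see __GHCODE_0__", [{"marker": "__GHCODE_0__", "text": "x < 1"}])
- #   'see <pre><code>x &lt; 1</code></pre>'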
- class SimpleHTMLStripper(HTMLParser):
- def __init__(self):
- super().__init__()
- # Accumulate visible text into paragraph-like blocks while skipping
- # navigation / sidebars / ads etc. We do this with a small HTML
- # structure-aware state machine instead of flattening everything.
- self._blocks: List[Dict[str, Any]] = []
- self._current_parts: List[str] = []
- # Track when we are inside potentially main content containers
- # like <article> or <main>.
- self._article_depth = 0
- # Track whether we are inside a preformatted code block so that we
- # can preserve indentation and line breaks instead of collapsing
- # whitespace as normal text.
- self._in_pre = False
- self._in_code = False
- self._current_code_chunks: List[str] = []
- self._code_blocks: List[Dict[str, str]] = []
- # Stack of flags indicating which open tags should be skipped.
- # When any active flag is True, textual data is ignored.
- self._skip_stack: List[bool] = []
- self._skip_depth = 0
- self._title_chunks: List[str] = []
- self._in_title = False
- self._h1_chunks: List[str] = []
- self._h1_main_chunks: List[str] = []
- self._in_h1 = False
- # Collected inline images from the main content, in document order.
- # Each image is represented as a small dict with sanitized attributes.
- self._images: List[Dict[str, str]] = []
- # Active list containers (<ul>/<ol>) and current <li> nesting state.
- self._list_stack: List[Dict[str, Any]] = []
- self._list_item_stack: List[Dict[str, Any]] = []
- # Keywords commonly used in class/id attributes for non-article areas
- _NOISE_KEYWORDS = {
- "sidebar",
- "side-bar",
- "aside",
- "nav",
- "menu",
- "breadcrumb",
- "breadcrumbs",
- "pagination",
- "pager",
- "comment",
- "comments",
- "reply",
- "advert",
- "ad-",
- "ads",
- "sponsor",
- "promo",
- "promotion",
- "related",
- "recommend",
- "share",
- "social",
- "subscribe",
- "signup",
- "login",
- "popup",
- "modal",
- "banner",
- "cookie",
- "notification",
- "toolbar",
- "footer",
- "header-bar",
- }
- # Tags whose textual content is almost never part of the main article.
- _ALWAYS_SKIP_TAGS = {
- "script",
- "style",
- "noscript",
- "nav",
- "aside",
- "footer",
- "form",
- "svg",
- "iframe",
- "button",
- "input",
- "textarea",
- "select",
- "option",
- "label",
- }
- # Void elements never emit a matching end tag, so they must never
- # touch the skip stack or the skip depth counter; otherwise a single
- # <input> or <br> would leave the bookkeeping permanently unbalanced.
- _VOID_TAGS = {
- "area",
- "base",
- "br",
- "col",
- "embed",
- "hr",
- "img",
- "input",
- "link",
- "meta",
- "param",
- "source",
- "track",
- "wbr",
- }
- # Structural container tags where noise classes/roles are meaningful.
- # For purely inline tags we avoid applying aggressive noise heuristics
- # so that important inline text (e.g. spans in the first sentence) is
- # not accidentally dropped.
- _STRUCTURAL_NOISE_TAGS = {
- "div",
- "section",
- "aside",
- "nav",
- "header",
- "footer",
- "main",
- "article",
- "ul",
- "ol",
- "li",
- }
- # Block-level tags that naturally mark paragraph boundaries.
- _BLOCK_TAGS = {
- "p",
- "li",
- "blockquote",
- "h1",
- "h2",
- "h3",
- "h4",
- "h5",
- "h6",
- "pre",
- "table",
- "tr",
- }
- # Keywords for containers that are likely to hold the main article body.
- # Used to decide which regions count as "main content" for both text
- # and inline images.
- _CONTENT_KEYWORDS = {
- "content",
- "main-content",
- "article-body",
- "post-body",
- "post-content",
- "entry-content",
- "story-body",
- "blog-post",
- "markdown-body",
- "readable-content",
- }
- # Keywords on image-related class/id/src that usually indicate avatars,
- # logo icons, decorative banners, etc., which we want to drop from the
- # extracted main content.
- _IMAGE_NOISE_KEYWORDS = {
- "avatar",
- "author",
- "logo",
- "icon",
- "favicon",
- "badge",
- "banner",
- "thumb",
- "thumbnail",
- "profile",
- "cover",
- "background",
- "sprite",
- "emoji",
- "reaction",
- }
- _TEXT_NOISE_KEYWORDS = {
- "menu",
- "menus",
- "navigation",
- "nav",
- "目录",
- "目錄",
- "导航",
- "導航",
- "菜单",
- "菜單",
- "广告",
- "廣告",
- "ad",
- "ads",
- "sponsor",
- "sponsored",
- "上一篇",
- "下一篇",
- "返回顶部",
- "返回頂部",
- "分享",
- "分享至",
- "相关推荐",
- "相关阅读",
- "相關閱讀",
- "recommended",
- "related posts",
- "login",
- "signup",
- }
- _TEXT_NOISE_PREFIXES = (
- "目录",
- "目錄",
- "导航",
- "導航",
- "菜单",
- "菜單",
- "广告",
- "廣告",
- "上一篇",
- "下一篇",
- "上一页",
- "下一页",
- "返回目录",
- "返回目錄",
- "返回顶部",
- "返回頂部",
- "分享",
- "相关",
- "相關",
- "recommended",
- "login",
- "signup",
- )
- def _finish_paragraph(self) -> None:
- """Flush current buffered tokens into a paragraph list."""
- if not self._current_parts:
- return
- # For regular paragraphs we still collapse excessive internal
- # whitespace, but we keep logical breaks between paragraphs
- # themselves so that the downstream highlighter can reconstruct
- # paragraph structure.
- text = " ".join(self._current_parts)
- text = re.sub(r"\s+", " ", text).strip()
- self._current_parts = []
- if not text:
- return
- if self._looks_like_noise_paragraph(text):
- return
- block_kind = "paragraph"
- list_kind: Optional[str] = None
- list_depth = 0
- list_index: Optional[int] = None
- if self._list_item_stack:
- list_ctx = self._list_item_stack[-1]
- block_kind = "list-item"
- list_kind = list_ctx.get("list_type") or "ul"
- depth_value = list_ctx.get("depth", 1)
- try:
- depth_int = int(depth_value)
- except (TypeError, ValueError):
- depth_int = 1
- list_depth = min(max(depth_int, 1), 5)
- if list_kind == "ol":
- idx = list_ctx.get("index")
- if isinstance(idx, int):
- list_index = idx
- self._blocks.append(
- {
- "text": text,
- "is_main": self._article_depth > 0,
- "kind": block_kind,
- "list_kind": list_kind,
- "list_depth": list_depth,
- "list_index": list_index,
- }
- )
- def _looks_like_noise_paragraph(self, text: str) -> bool:
- normalized = text.strip()
- if not normalized:
- return True
- lowered = normalized.lower()
- compact = re.sub(r"\s+", "", lowered)
- for prefix in self._TEXT_NOISE_PREFIXES:
- if lowered.startswith(prefix.lower()):
- if len(normalized) <= 80:
- return True
- if len(normalized) <= 80:
- for keyword in self._TEXT_NOISE_KEYWORDS:
- if keyword.isascii():
- # Match ASCII keywords on word boundaries so that a bare
- # "ad" does not swallow words like "read" or "load".
- if re.search(rf"(?<![a-z0-9]){re.escape(keyword)}(?![a-z0-9])", lowered):
- return True
- elif keyword in lowered or keyword in compact:
- return True
- # Skip very short bullet-like crumbs that mostly consist of symbols.
- if len(normalized) <= 6 and sum(ch.isalnum() for ch in normalized) <= 1:
- return True
- return False
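- # Illustrative behaviour: short crumbs such as "上一篇:第三章" or
- # "Related posts" are treated as noise, while a long paragraph that
- # merely mentions navigation is kept, because the keyword checks only
- # apply to fragments of 80 characters or fewer.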
- @staticmethod
- def _parse_ordered_start(raw_value: Optional[str]) -> int:
- if raw_value is None:
- return 1
- value = raw_value.strip()
- if not value:
- return 1
- try:
- parsed = int(value)
- return parsed if parsed >= 1 else 1
- except ValueError:
- return 1
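- # Illustrative values: _parse_ordered_start("3") -> 3,
- # _parse_ordered_start("-2") -> 1, _parse_ordered_start(None) -> 1.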
- def handle_starttag(self, tag, attrs):
- lowered = tag.lower()
- # Paragraph boundary before starting a new block element or <br>.
- if lowered in self._BLOCK_TAGS or lowered == "br":
- if self._skip_depth == 0:
- self._finish_paragraph()
- # Entering a <pre> region – treat it as a dedicated code block.
- if lowered == "pre" and self._skip_depth == 0:
- self._finish_paragraph()
- self._in_pre = True
- self._current_code_chunks = []
- # Decide whether this element should be skipped entirely.
- attr_dict = {k.lower(): (v or "") for k, v in attrs}
- role = attr_dict.get("role", "").lower()
- classes_ids = (attr_dict.get("class", "") + " " + attr_dict.get("id", "")).lower()
- is_noise_attr = False
- # Only treat class/id keywords as layout "noise" on structural
- # containers (div/section/nav/etc). Inline tags with "comment"
- # in their class (like mdspan-comment on Towards Data Science)
- # should not be discarded, otherwise we lose the first words
- # of sentences.
- if lowered in self._STRUCTURAL_NOISE_TAGS:
- is_noise_attr = any(key in classes_ids for key in self._NOISE_KEYWORDS)
- if role in {"navigation", "banner", "contentinfo", "complementary"}:
- is_noise_attr = True
- skip_this = lowered in self._ALWAYS_SKIP_TAGS or is_noise_attr
- # Void elements (<img>, <br>, <input>, ...) never produce a matching
- # end tag; keep them off the skip stack and depth counter so that
- # the bookkeeping stays balanced.
- if skip_this and lowered not in self._VOID_TAGS:
- self._skip_depth += 1
- if lowered not in self._VOID_TAGS:
- self._skip_stack.append(skip_this)
- # Track when we are inside an article-like container; only count if not skipped.
- if self._skip_depth == 0 and lowered in {"article", "main", "section", "div"}:
- # Treat semantic containers and common "main content" classes as
- # part of the article area so that we keep their text and inline
- # media but still avoid sidebars / nav.
- if lowered in {"article", "main"} or any(
- key in classes_ids for key in self._CONTENT_KEYWORDS
- ) or role == "main":
- self._article_depth += 1
- if self._skip_depth == 0 and lowered in {"ul", "ol"}:
- start = 1
- if lowered == "ol":
- start = self._parse_ordered_start(attr_dict.get("start"))
- self._list_stack.append(
- {
- "type": lowered,
- "start": start,
- "next_index": start,
- }
- )
- if lowered == "li" and self._skip_depth == 0:
- list_ctx = self._list_stack[-1] if self._list_stack else None
- depth = len(self._list_stack) if self._list_stack else 1
- list_type = list_ctx.get("type") if list_ctx else "ul"
- index = None
- if list_ctx and list_ctx["type"] == "ol":
- index = list_ctx["next_index"]
- list_ctx["next_index"] = index + 1
- li_value = attr_dict.get("value")
- if li_value and list_ctx and list_ctx["type"] == "ol":
- try:
- value_idx = int(li_value)
- index = value_idx
- list_ctx["next_index"] = value_idx + 1
- except ValueError:
- pass
- self._list_item_stack.append(
- {
- "list_type": list_type,
- "index": index,
- "depth": depth,
- }
- )
- if lowered == "title" and self._skip_depth == 0:
- self._in_title = True
- if lowered == "h1" and self._skip_depth == 0:
- self._in_h1 = True
- if lowered == "code" and self._skip_depth == 0 and self._in_pre:
- # Nested <code> inside <pre> – keep track but we don't need
- # separate buffering beyond the enclosing pre block.
- self._in_code = True
- # Inline image handling: only keep <img> elements that are inside the
- # main article content (tracked via _article_depth) and that do not
- # look like avatars / logos / decorative icons. We insert a stable
- # placeholder token into the text stream so that the /proxy renderer
- # can later replace it with a real <img> tag while preserving the
- # grammar highlighting.
- if lowered == "img" and self._skip_depth == 0 and self._article_depth > 0:
- src = attr_dict.get("src", "").strip()
- if src:
- alt = attr_dict.get("alt", "") or ""
- title = attr_dict.get("title", "") or ""
- width = (attr_dict.get("width") or "").strip()
- height = (attr_dict.get("height") or "").strip()
- img_classes_ids = classes_ids + " " + src.lower()
- if any(key in img_classes_ids for key in self._IMAGE_NOISE_KEYWORDS):
- return
- marker = f"__GHIMG_{len(self._images)}__"
- img_info: Dict[str, str] = {
- "marker": marker,
- "src": src,
- "alt": alt,
- "title": title,
- }
- if width:
- img_info["width"] = width
- if height:
- img_info["height"] = height
- self._images.append(img_info)
- # Treat the image as an inline token within the current
- # paragraph. Paragraph finishing logic will ensure it
- # stays grouped with surrounding text.
- self._current_parts.append(marker)
- def handle_endtag(self, tag):
- lowered = tag.lower()
- # Void elements never pushed skip state in handle_starttag, so any
- # synthetic end tag for them (e.g. from <img />) must be ignored.
- if lowered in self._VOID_TAGS:
- return
- if lowered == "code" and self._in_code:
- self._in_code = False
- if lowered == "pre" and self._in_pre:
- self._in_pre = False
- # Finalize the current code block into a single placeholder
- # token so that it passes through the grammar highlighter
- # untouched, and can later be restored as a <pre><code> block.
- code_text = "".join(self._current_code_chunks)
- self._current_code_chunks = []
- if code_text.strip() and self._skip_depth == 0:
- marker = f"__GHCODE_{len(self._code_blocks)}__"
- self._code_blocks.append({"marker": marker, "text": code_text})
- # We append the marker to the paragraph parts so that
- # get_text() emits it in the right position.
- self._current_parts.append(marker)
- # Closing a block element ends the current paragraph.
- if lowered in self._BLOCK_TAGS and self._skip_depth == 0:
- self._finish_paragraph()
- if lowered == "li" and self._skip_depth == 0 and self._list_item_stack:
- self._list_item_stack.pop()
- if lowered in {"ul", "ol"} and self._skip_depth == 0 and self._list_stack:
- self._list_stack.pop()
- if lowered == "title":
- self._in_title = False
- if lowered == "h1":
- self._in_h1 = False
- if lowered in {"article", "main", "section"} and self._skip_depth == 0 and self._article_depth > 0:
- self._article_depth -= 1
- if self._skip_stack:
- skip_this = self._skip_stack.pop()
- if skip_this and self._skip_depth > 0:
- self._skip_depth -= 1
- def handle_data(self, data):
- if self._skip_depth > 0:
- return
- if self._in_pre or self._in_code:
- # Preserve code blocks exactly as they appear, including
- # newlines and indentation.
- self._current_code_chunks.append(data)
- return
- stripped = data.strip()
- if not stripped:
- return
- if self._in_title:
- self._title_chunks.append(stripped)
- return
- # Regular visible text
- self._current_parts.append(stripped)
- if self._in_h1:
- self._h1_chunks.append(stripped)
- if self._article_depth > 0:
- self._h1_main_chunks.append(stripped)
- def get_text(self) -> str:
- # Flush any trailing paragraph.
- self._finish_paragraph()
- blocks = self._selected_blocks()
- if not blocks:
- return ""
- return "\n\n".join(block["text"] for block in blocks)
- def _selected_blocks(self) -> List[Dict[str, Any]]:
- if not self._blocks:
- return []
- main_blocks = [block for block in self._blocks if block.get("is_main")]
- return main_blocks if main_blocks else self._blocks
- def get_blocks(self) -> List[Dict[str, Any]]:
- blocks = self._selected_blocks()
- return [dict(block) for block in blocks]
- def get_title(self) -> str:
- # Prefer <h1> heading (especially inside <article>/<main>) as the
- # primary title; fall back to <title>.
- if self._h1_main_chunks:
- raw = " ".join(self._h1_main_chunks)
- elif self._h1_chunks:
- raw = " ".join(self._h1_chunks)
- elif self._title_chunks:
- raw = " ".join(self._title_chunks)
- else:
- return ""
- return re.sub(r"\s+", " ", raw).strip()
- def get_images(self) -> List[Dict[str, str]]:
- """Return the list of captured inline images in document order."""
- return list(self._images)
- def get_code_blocks(self) -> List[Dict[str, str]]:
- """Return captured code blocks (from <pre>/<code>) in document order."""
- return list(self._code_blocks)
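- # Minimal usage sketch for the stripper (the HTML literal is
- # illustrative; real input arrives via _fetch_remote_plaintext below):
- #   stripper = SimpleHTMLStripper()
- #   stripper.feed("<article><p>Hi <img src='/a.png'></p></article>")
- #   stripper.get_text()    # -> "Hi __GHIMG_0__"
- #   stripper.get_images()  # -> [{"marker": "__GHIMG_0__", "src": "/a.png", ...}]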
- def _normalize_target_url(raw_url: str) -> str:
- candidate = (raw_url or "").strip()
- if not candidate:
- raise ValueError("请输入要抓取的 URL。")
- parsed = urlparse(candidate if "://" in candidate else f"https://{candidate}")
- if parsed.scheme not in ALLOWED_URL_SCHEMES:
- raise ValueError("仅支持 http/https 协议链接。")
- if not parsed.netloc:
- raise ValueError("URL 缺少域名部分。")
- sanitized = parsed._replace(fragment="")
- return urlunparse(sanitized)
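- # e.g. _normalize_target_url("example.com/post#heading") returns
- # "https://example.com/post": a default https scheme is added and the
- # fragment is dropped.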
- def _fallback_html_to_text(html_body: str) -> str:
- """Very simple HTML-to-text fallback used when structured extraction fails.
- This does not attempt to distinguish main content from navigation, but it
- guarantees we return *something* for pages whose structure confuses the
- SimpleHTMLStripper heuristics (e.g. some mirror sites).
- """
- # Drop script/style/noscript content outright.
- cleaned = re.sub(
- r"(?is)<(script|style|noscript)[^>]*>.*?</\1>",
- " ",
- html_body,
- )
- # Convert common block separators into newlines.
- cleaned = re.sub(r"(?i)<br\s*/?>", "\n", cleaned)
- cleaned = re.sub(r"(?i)</p\s*>", "\n\n", cleaned)
- cleaned = re.sub(r"(?i)</(div|section|article|li|h[1-6])\s*>", "\n\n", cleaned)
- # Remove all remaining tags.
- cleaned = re.sub(r"(?is)<[^>]+>", " ", cleaned)
- cleaned = html.unescape(cleaned)
- # Normalize whitespace but keep paragraph-level blank lines.
- cleaned = cleaned.replace("\r", "")
- # Collapse runs of spaces/tabs inside lines.
- cleaned = re.sub(r"[ \t\f\v]+", " ", cleaned)
- # Collapse 3+ blank lines into just 2.
- cleaned = re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned)
- cleaned = cleaned.strip()
- return cleaned
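- # e.g. _fallback_html_to_text("<p>Hello <b>world</b></p>") -> "Hello world";
- # block-level closers become blank lines, everything else is flattened.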
- def _build_paragraph_metadata(blocks: List[Dict[str, Any]]) -> List[Dict[str, str]]:
- """Convert stripped block info into span attributes for downstream rendering."""
- if not blocks:
- return []
- paragraph_meta: List[Dict[str, str]] = []
- for block in blocks:
- attrs: Dict[str, str] = {}
- if block.get("kind") == "list-item" and block.get("list_kind"):
- attrs["data-list-kind"] = str(block["list_kind"])
- depth = block.get("list_depth")
- if depth:
- attrs["data-list-depth"] = str(depth)
- if block.get("list_kind") == "ol" and block.get("list_index") is not None:
- attrs["data-list-index"] = str(block["list_index"])
- paragraph_meta.append(attrs)
- return paragraph_meta
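- # e.g. a block {"kind": "list-item", "list_kind": "ol", "list_depth": 2,
- # "list_index": 3} maps to {"data-list-kind": "ol",
- # "data-list-depth": "2", "data-list-index": "3"}; plain paragraphs
- # map to an empty attribute dict.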
- def _decode_html_bytes(raw_content: bytes, encoding_hint: Optional[str]) -> str:
- encoding_candidates: List[str] = []
- if encoding_hint:
- encoding_candidates.append(encoding_hint)
- encoding_candidates.extend(["utf-8", "latin-1"])
- last_exc: Optional[Exception] = None
- for enc in encoding_candidates:
- try:
- html_body = raw_content.decode(enc, errors="replace")
- break
- except Exception as exc: # pragma: no cover - defensive
- last_exc = exc
- else: # pragma: no cover - extremely unlikely
- raise RuntimeError(f"无法解码远程页面内容: {last_exc}")
- if len(html_body) > MAX_REMOTE_HTML_BYTES:
- html_body = html_body[:MAX_REMOTE_HTML_BYTES]
- return html_body
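- # Note: "latin-1" can decode any byte sequence and errors="replace"
- # never raises for a known codec, so the defensive else branch above
- # should be unreachable in practice. The final truncation counts
- # characters, a cheap post-decode approximation of the byte limit.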
- async def _download_html_via_httpx(url: str) -> str:
- async with httpx.AsyncClient(timeout=REMOTE_FETCH_TIMEOUT, follow_redirects=True) as client:
- response = await client.get(url, headers=REMOTE_FETCH_HEADERS)
- # Check the status first so that error pages are never decoded.
- response.raise_for_status()
- return _decode_html_bytes(response.content, response.encoding)
- async def _download_html_via_stdlib(url: str) -> str:
- def _sync_fetch() -> Tuple[bytes, Optional[str]]:
- req = urllib_request.Request(url, headers=SIMPLE_FETCH_HEADERS)
- opener = urllib_request.build_opener(urllib_request.ProxyHandler({}))
- with opener.open(req, timeout=REMOTE_FETCH_TIMEOUT) as resp:
- data = resp.read(MAX_REMOTE_HTML_BYTES + 1)
- headers = getattr(resp, "headers", None)
- encoding_hint = None
- if headers is not None:
- get_charset = getattr(headers, "get_content_charset", None)
- if callable(get_charset):
- encoding_hint = get_charset()
- if not encoding_hint:
- content_type = headers.get("Content-Type", "")
- match = re.search(r"charset=([\w-]+)", content_type or "", re.IGNORECASE)
- if match:
- encoding_hint = match.group(1)
- return data, encoding_hint
- raw_content, encoding_hint = await asyncio.to_thread(_sync_fetch)
- return _decode_html_bytes(raw_content, encoding_hint)
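- # Fetch strategy: try httpx first, and only fall back to the stdlib
- # opener (which deliberately bypasses any configured proxies) when the
- # server answers with a bot-blocking / rate-limit status or the request
- # fails at the transport level.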
- async def _download_html_with_fallback(url: str) -> str:
- first_exc: Optional[Exception] = None
- try:
- return await _download_html_via_httpx(url)
- except httpx.HTTPStatusError as exc:
- status = exc.response.status_code if exc.response is not None else None
- if status not in {401, 403, 407, 451, 429}:
- raise
- first_exc = exc
- except httpx.HTTPError as exc:
- first_exc = exc
- try:
- return await _download_html_via_stdlib(url)
- except (urllib_error.URLError, urllib_error.HTTPError, TimeoutError) as fallback_exc:
- if first_exc:
- raise first_exc from fallback_exc
- raise
- async def _fetch_remote_plaintext(
- url: str,
- ) -> Tuple[str, str, str, List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]]]:
- normalized = _normalize_target_url(url)
- html_body = await _download_html_with_fallback(normalized)
- stripper = SimpleHTMLStripper()
- stripper.feed(html_body)
- title = stripper.get_title() or normalized
- images = stripper.get_images()
- code_blocks = stripper.get_code_blocks()
- plain_text = stripper.get_text()
- block_info = stripper.get_blocks()
- if not plain_text:
- plain_text = _fallback_html_to_text(html_body)
- if not plain_text:
- raise ValueError("未能从该页面提取正文。")
- # Fallback text no longer contains structured placeholders, so any
- # collected media/code markers would be invalid.
- images = []
- code_blocks = []
- block_info = []
- paragraph_meta = _build_paragraph_metadata(block_info)
- return normalized, title, plain_text, images, code_blocks, paragraph_meta
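- # The tuple is (normalized_url, title, plain_text, images, code_blocks,
- # paragraph_meta); when structured extraction fails we fall back to flat
- # text and drop all placeholder metadata, since its markers no longer
- # appear in the text.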
- def _render_proxy_page(
- *,
- url_value: str = "",
- message: Optional[str] = None,
- is_error: bool = False,
- highlight_fragment: Optional[str] = None,
- source_url: Optional[str] = None,
- source_title: Optional[str] = None,
- show_images: bool = False,
- image_notice: Optional[str] = None,
- ) -> str:
- helper_state = "on" if SENTENCE_HELPER_ENABLED else "off"
- status_block = ""
- if message:
- cls = "status err" if is_error else "status ok"
- status_block = f"<p class='{cls}'>{html.escape(message)}</p>"
- style_block = STYLE_BLOCK if highlight_fragment else ""
- result_block = ""
- if highlight_fragment and source_url:
- safe_url = html.escape(source_url, quote=True)
- safe_title = html.escape(source_title or source_url)
- image_hint = ""
- if image_notice:
- image_hint = f"<p class='image-hint'>{html.escape(image_notice)}</p>"
- result_block = (
- "<section class='result'>"
- f"<div class='source'>原页面:<a href='{safe_url}' target='_blank' rel='noopener'>{safe_title}</a></div>"
- f"<div class='analysis' data-helper='{helper_state}'>{highlight_fragment}</div>"
- f"{image_hint}"
- "</section>"
- )
- show_images_checked = "checked" if show_images else ""
- return PROXY_PAGE_TEMPLATE.substitute(
- style_block=style_block,
- url_value=html.escape(url_value or "", quote=True),
- status_block=status_block,
- result_block=result_block,
- show_images_checked=show_images_checked,
- )