# -*- coding: utf-8 -*- """Grammar highlighter powered by spaCy + benepar constituency parsing.""" import asyncio import html import json import re from collections import Counter from dataclasses import dataclass, field from html.parser import HTMLParser from string import Template from typing import Any, Dict, List, Optional, Tuple from urllib import error as urllib_error, request as urllib_request from urllib.parse import urlparse, urlunparse import benepar import httpx import spacy from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from pydantic import BaseModel, Field from spacy.cli import download as spacy_download from spacy.language import Language from spacy.tokens import Span as SpacySpan, Token as SpacyToken from style_config import STYLE_BLOCK BENE_PAR_WARNING: Optional[str] = None HAS_BENEPAR: bool = False # new: track whether benepar was successfully attached def _ensure_benepar_warning(message: str) -> None: """Record a warning once when benepar annotations are unavailable.""" global BENE_PAR_WARNING if not BENE_PAR_WARNING: BENE_PAR_WARNING = message def _load_spacy_pipeline( model_name: str = "en_core_web_sm", benepar_model: str = "benepar_en3" ) -> Language: global BENE_PAR_WARNING, HAS_BENEPAR BENE_PAR_WARNING = None HAS_BENEPAR = False try: nlp = spacy.load(model_name) except OSError: try: spacy_download(model_name) nlp = spacy.load(model_name) except Exception as exc: # pragma: no cover - install helper raise RuntimeError( f"spaCy model '{model_name}' is required. Install via `python -m spacy download {model_name}`." ) from exc # Ensure we have sentence segmentation available pipe_names = set(nlp.pipe_names) if not ({"parser", "senter", "sentencizer"} & pipe_names): try: nlp.add_pipe("sentencizer") except Exception: pass # if already present or unavailable, ignore # Try to add benepar if "benepar" not in nlp.pipe_names: try: nlp.add_pipe("benepar", config={"model": benepar_model}, last=True) HAS_BENEPAR = True except ValueError: try: benepar.download(benepar_model) nlp.add_pipe("benepar", config={"model": benepar_model}, last=True) HAS_BENEPAR = True except Exception as exc: # pragma: no cover - install helper HAS_BENEPAR = False BENE_PAR_WARNING = ( "Benepar model '{model}' unavailable ({err}). Falling back to dependency-based spans." ).format(model=benepar_model, err=exc) except Exception as exc: HAS_BENEPAR = False BENE_PAR_WARNING = ( "Failed to attach benepar parser to spaCy pipeline. Falling back to dependency-based spans ({err})." ).format(err=exc) else: HAS_BENEPAR = True return nlp try: NLP: Optional[Language] = _load_spacy_pipeline() NLP_LOAD_ERROR: Optional[Exception] = None except Exception as exc: # pragma: no cover - import-time diagnostics NLP = None NLP_LOAD_ERROR = exc class AnalyzeRequest(BaseModel): text: str = Field(..., description="Raw English text to highlight") class AnalyzeResponse(BaseModel): highlighted_html: str @dataclass class Token: text: str start: int end: int kind: str # 'word' | 'space' | 'punct' @dataclass class Span: start_token: int end_token: int cls: str attrs: Optional[Dict[str, str]] = None @dataclass class SentenceSummary: subjects: List[str] = field(default_factory=list) predicates: List[str] = field(default_factory=list) objects: List[str] = field(default_factory=list) complements: List[str] = field(default_factory=list) clauses: List[str] = field(default_factory=list) clause_functions: List[str] = field(default_factory=list) connectors: List[str] = field(default_factory=list) residual_roles: List[str] = field(default_factory=list) sentence_length: int = 0 TOKEN_REGEX = re.compile( r""" (?:\s+) |(?:\d+(?:[\.,]\d+)*) |(?:\w+(?:[-']\w+)*) |(?:.) """, re.VERBOSE | re.UNICODE, ) WORD_LIKE_RE = re.compile(r"\w+(?:[-']\w+)*\Z", re.UNICODE) NUMBER_RE = re.compile(r"\d+(?:[\.,]\d+)*\Z", re.UNICODE) PARAGRAPH_BREAK_RE = re.compile(r"(?:\r?\n[ \t]*){2,}") SUBJECT_DEPS = {"nsubj", "nsubjpass", "csubj", "csubjpass"} DIRECT_OBJECT_DEPS = {"dobj", "obj"} INDIRECT_OBJECT_DEPS = {"iobj", "dative"} COMPLEMENT_DEPS = {"attr", "oprd", "acomp", "ccomp", "xcomp"} ADVERBIAL_DEPS = {"advmod", "npadvmod", "advcl", "obl", "prep", "pcomp"} RELATIVE_PRONOUNS = {"which", "that", "who", "whom", "whose", "where", "when"} SUBORDINATORS_TO_FUNCTION = { "when": "TIME", "while": "TIME", "after": "TIME", "before": "TIME", "until": "TIME", "as": "TIME", "once": "TIME", "since": "TIME", "because": "REASON", "now that": "REASON", "if": "CONDITION", "unless": "CONDITION", "provided": "CONDITION", "provided that": "CONDITION", "although": "CONCESSION", "though": "CONCESSION", "even though": "CONCESSION", "whereas": "CONCESSION", "so that": "RESULT", "so": "RESULT", "lest": "PURPOSE", "in order that": "PURPOSE", } FINITE_VERB_TAGS = {"VBD", "VBP", "VBZ"} NONFINITE_VERB_TAGS = {"VBG", "VBN"} FIXED_MULTIWORD_PHRASES: Tuple[Tuple[re.Pattern, str], ...] = tuple( ( re.compile(pattern, re.IGNORECASE), label, ) for pattern, label in [ (r"\bas well as\b", "as well as"), (r"\brather than\b", "rather than"), (r"\bin addition to\b", "in addition to"), (r"\bin spite of\b", "in spite of"), (r"\baccording to\b", "according to"), (r"\bas soon as\b", "as soon as"), ] ) CLAUSE_FUNCTION_LABELS = { "TIME": "时间", "REASON": "原因", "CONDITION": "条件", "CONCESSION": "让步", "RESULT": "结果", "PURPOSE": "目的", } RESIDUAL_DEP_LABELS = { "det": "限定词", "prep": "介词", "case": "介词标记", "cc": "并列连词", "mark": "从属连词", "poss": "所有格标记", "nummod": "数量修饰语", "aux": "助动词", "prt": "小品词", } RESIDUAL_POS_LABELS = { "ADJ": "形容词修饰语", "ADV": "副词", "NUM": "数词", "PRON": "代词", } def _classify_segment(seg: str) -> str: if not seg: return "punct" if seg.isspace(): return "space" if NUMBER_RE.fullmatch(seg) or WORD_LIKE_RE.fullmatch(seg): return "word" return "punct" def _append_fallback_tokens(text: str, start: int, end: int, tokens: List[Token]) -> None: for idx in range(start, end): ch = text[idx] if ch.isspace(): kind = "space" elif ch.isalnum() or ch == "_": kind = "word" else: kind = "punct" tokens.append(Token(ch, idx, idx + 1, kind)) def tokenize_preserve(text: str) -> List[Token]: tokens: List[Token] = [] if not text: return tokens last_end = 0 for match in TOKEN_REGEX.finditer(text): if match.start() > last_end: _append_fallback_tokens(text, last_end, match.start(), tokens) seg = text[match.start() : match.end()] tokens.append(Token(seg, match.start(), match.end(), _classify_segment(seg))) last_end = match.end() if last_end < len(text): _append_fallback_tokens(text, last_end, len(text), tokens) if not tokens and text: tokens = [Token(text, 0, len(text), "word" if text[0].isalnum() else "punct")] return tokens def build_char_to_token_map(tokens: List[Token]) -> Dict[int, int]: mapping: Dict[int, int] = {} for idx, tok in enumerate(tokens): for pos in range(tok.start, tok.end): mapping[pos] = idx return mapping def char_span_to_token_span( char_start: int, char_end: int, mapping: Dict[int, int] ) -> Tuple[int, int]: if char_end <= char_start: return -1, -1 start_idx = mapping.get(char_start) end_idx = mapping.get(char_end - 1) if start_idx is None or end_idx is None: return -1, -1 return start_idx, end_idx + 1 def add_char_based_span( spans: List[Span], char_start: int, char_end: int, cls: str, mapping: Dict[int, int], attrs: Optional[Dict[str, str]] = None, ) -> None: s_tok, e_tok = char_span_to_token_span(char_start, char_end, mapping) if s_tok < 0 or e_tok < 0: return safe_attrs = None if attrs: safe_attrs = {k: html.escape(v, quote=True) for k, v in attrs.items() if v} spans.append(Span(start_token=s_tok, end_token=e_tok, cls=cls, attrs=safe_attrs)) def add_span(spans: List[Span], start_token: int, end_token: int, cls: str, attrs: Optional[Dict[str, str]] = None): if start_token < 0 or end_token < 0 or end_token <= start_token: return spans.append(Span(start_token=start_token, end_token=end_token, cls=cls, attrs=attrs)) def subtree_char_span(token: SpacyToken) -> Tuple[int, int]: subtree = list(token.subtree) if not subtree: return token.idx, token.idx + len(token.text) return subtree[0].idx, subtree[-1].idx + len(subtree[-1].text) def _subtree_text(token: SpacyToken) -> str: span = token.doc[token.left_edge.i : token.right_edge.i + 1] return span.text def _find_antecedent_word(sentence: SpacySpan, clause_start_char: int) -> Optional[str]: candidate = None for tok in sentence: if tok.idx >= clause_start_char: break if tok.pos_ in {"NOUN", "PROPN", "PRON"}: candidate = tok.text return candidate def _is_nonfinite_clause(span: SpacySpan) -> bool: tags = {tok.tag_ for tok in span if tok.tag_} if tags & FINITE_VERB_TAGS: return False if "TO" in tags or tags & NONFINITE_VERB_TAGS: return True return False def _classify_noun_clause(span: SpacySpan) -> Optional[str]: deps = {tok.dep_ for tok in span} if deps & {"csubj", "csubjpass"}: return "subject" if deps & {"ccomp", "xcomp"}: return "complement" if deps & {"dobj", "obj"}: return "object" return None def _split_paragraph_ranges(text: str) -> List[Tuple[int, int]]: """Return inclusive paragraph ranges, keeping separators intact.""" if not text: return [(0, 0)] ranges: List[Tuple[int, int]] = [] start = 0 for match in PARAGRAPH_BREAK_RE.finditer(text): ranges.append((start, match.start())) start = match.end() ranges.append((start, len(text))) # Ensure at least one range and sorted order if not ranges: ranges = [(0, len(text))] return ranges def _circled_number(value: int) -> str: """Return the circled number style for sentence numbering.""" if value <= 0: return "" if value <= 20: return chr(ord("\u2460") + value - 1) if 21 <= value <= 35: return chr(ord("\u3251") + value - 21) if 36 <= value <= 50: return chr(ord("\u32B1") + value - 36) return f"({value})" def annotate_constituents( sentence: SpacySpan, spans: List[Span], mapping: Dict[int, int], sentence_start_char: int, sentence_end_char: int, summary: Optional[SentenceSummary] = None, ) -> None: # If benepar is not attached or a previous warning indicates fallback, skip. if not HAS_BENEPAR or BENE_PAR_WARNING: _ensure_benepar_warning( "Benepar component missing or unavailable. Using dependency-based spans." ) return # If the extension is not present, skip if not SpacySpan.has_extension("constituents"): _ensure_benepar_warning( "Benepar component missing from spaCy pipeline. Falling back to dependency spans." ) return try: constituents = sentence._.constituents except Exception as exc: # Catch any error while accessing benepar results and fallback safely _ensure_benepar_warning( f"Benepar constituency parse unavailable: {exc}. Falling back to dependency spans." ) return seen_ranges = set() for const in constituents: label = getattr(const, "label_", None) if not label: continue start_char, end_char = const.start_char, const.end_char if start_char == sentence_start_char and end_char == sentence_end_char: continue # skip the entire sentence span itself key = (start_char, end_char, label) is_relative = False if label in {"PP", "ADVP"}: if key in seen_ranges: continue seen_ranges.add(key) add_char_based_span(spans, start_char, end_char, "role-adverbial", mapping) continue if label == "SBAR" and const: first_token = const[0] lowered = first_token.text.lower() if lowered in RELATIVE_PRONOUNS: antecedent = _find_antecedent_word(sentence, start_char) attrs = {"data-modifies": antecedent} if antecedent else None add_char_based_span(spans, start_char, end_char, "clause-relative", mapping, attrs) if summary: summary.clauses.append("定语从句") is_relative = True else: function = SUBORDINATORS_TO_FUNCTION.get(lowered) attrs = {"data-function": function} add_char_based_span(spans, start_char, end_char, "clause-adverbial", mapping, attrs) if summary: summary.clauses.append("状语从句") if function: summary.clause_functions.append(function) continue if label in {"S", "VP"}: if _is_nonfinite_clause(const): add_char_based_span(spans, start_char, end_char, "clause-nonfinite", mapping) if summary: summary.clauses.append("非限定结构") continue if label == "S" and not is_relative: role = _classify_noun_clause(const) if role: attrs = {"data-clause-role": role} add_char_based_span(spans, start_char, end_char, "clause-noun", mapping, attrs) if summary: summary.clauses.append(f"名词性从句({role})") def _predicate_span_bounds(head: SpacyToken) -> Tuple[int, int]: """Return a character range covering predicate head + functional dependents.""" tokens = [head] for child in head.children: if child.dep_ in {"aux", "auxpass", "prt", "cop", "neg"}: tokens.append(child) start_char = min(tok.idx for tok in tokens) end_char = max(tok.idx + len(tok.text) for tok in tokens) return start_char, end_char def _predicate_heads(sentence: SpacySpan) -> List[SpacyToken]: """Collect predicate heads including coordinated verbs.""" candidates: List[SpacyToken] = [] for tok in sentence: if tok.pos_ not in {"VERB", "AUX"} and tok.tag_ not in FINITE_VERB_TAGS: continue if tok.dep_ == "ROOT": candidates.append(tok) continue if tok.dep_ == "conj" and tok.head.pos_ in {"VERB", "AUX"}: candidates.append(tok) continue if tok.dep_ in {"ccomp", "xcomp", "advcl", "acl", "relcl", "parataxis"}: candidates.append(tok) seen = set() ordered: List[SpacyToken] = [] for tok in sorted(candidates, key=lambda t: t.i): if tok.i in seen: continue seen.add(tok.i) ordered.append(tok) return ordered def _add_fixed_phrases( sentence: SpacySpan, mapping: Dict[int, int], spans: List[Span], summary: Optional[SentenceSummary] = None, ) -> None: base = sentence.start_char text = sentence.text for pattern, label in FIXED_MULTIWORD_PHRASES: for match in pattern.finditer(text): start_char = base + match.start() end_char = base + match.end() add_char_based_span( spans, start_char, end_char, "phrase-fixed", mapping, attrs={"data-phrase": label}, ) if summary is not None: summary.connectors.append(label.lower()) def annotate_sentence( tokens: List[Token], sentence: SpacySpan, mapping: Dict[int, int], collect_summary: bool = True, ) -> Tuple[List[Span], Optional[SentenceSummary]]: spans: List[Span] = [] summary = SentenceSummary(sentence_length=len(sentence)) if collect_summary else None sent_bounds = char_span_to_token_span(sentence.start_char, sentence.end_char, mapping) sent_start_tok, sent_end_tok = sent_bounds def add_subtree(token: SpacyToken, cls: str): start_char, end_char = subtree_char_span(token) add_char_based_span(spans, start_char, end_char, cls, mapping) def add_token(token: SpacyToken, cls: str): add_char_based_span(spans, token.idx, token.idx + len(token.text), cls, mapping) for tok in sentence: if tok.dep_ in SUBJECT_DEPS: add_subtree(tok, "role-subject") if summary is not None: summary.subjects.append(_subtree_text(tok)) for head in _predicate_heads(sentence): start_char, end_char = _predicate_span_bounds(head) add_char_based_span(spans, start_char, end_char, "role-predicate", mapping) predicate_text = sentence.doc.text[start_char:end_char].strip() if summary is not None: summary.predicates.append(predicate_text or head.text) for tok in sentence: if tok.dep_ in DIRECT_OBJECT_DEPS: add_subtree(tok, "role-object-do") if summary is not None: summary.objects.append(_subtree_text(tok)) break io_token = next((tok for tok in sentence if tok.dep_ in INDIRECT_OBJECT_DEPS), None) if io_token is None: for tok in sentence: if tok.dep_ == "pobj" and tok.head.dep_ == "prep" and tok.head.lemma_.lower() in {"to", "for"}: io_token = tok break if io_token: add_subtree(io_token, "role-object-io") if summary is not None: summary.objects.append(_subtree_text(io_token)) for tok in sentence: if tok.dep_ in COMPLEMENT_DEPS: add_subtree(tok, "role-complement") if summary is not None: summary.complements.append(_subtree_text(tok)) break for tok in sentence: lowered = tok.text.lower() if tok.dep_ in {"cc", "mark", "preconj"} or tok.pos_ in {"CCONJ", "SCONJ"}: add_token(tok, "role-connector") if summary is not None: summary.connectors.append(lowered) if tok.dep_ == "det" or tok.pos_ == "DET": add_token(tok, "role-determiner") if tok.dep_ in {"amod", "poss", "compound", "nummod"}: add_token(tok, "role-modifier") adverbial_ranges = set() for tok in sentence: if tok.dep_ in ADVERBIAL_DEPS: adverbial_ranges.add(subtree_char_span(tok)) for start_char, end_char in adverbial_ranges: add_char_based_span(spans, start_char, end_char, "role-adverbial", mapping) for tok in sentence: if tok.dep_ == "appos": add_subtree(tok, "role-apposition") if sent_start_tok >= 0 and sent_end_tok >= 0: stack = [] for idx in range(sent_start_tok, sent_end_tok): token = tokens[idx] if token.text == "(": stack.append(idx) elif token.text == ")" and stack: add_span(spans, stack.pop(), idx + 1, "role-parenthetical") comma_token_idxs = [ i for i in range(sent_start_tok, sent_end_tok) if tokens[i].kind == "punct" and tokens[i].text == "," ] for idx, first_comma in enumerate(comma_token_idxs): if idx + 1 >= len(comma_token_idxs): break second_comma = comma_token_idxs[idx + 1] start_char = tokens[first_comma].start end_char = tokens[second_comma].end span = sentence.doc.char_span(start_char, end_char, alignment_mode="expand") if span and any(tok.tag_ == "VBG" for tok in span): add_span(spans, first_comma, second_comma + 1, "role-absolute") annotate_constituents( sentence, spans, mapping, sentence.start_char, sentence.end_char, summary, ) _add_fixed_phrases(sentence, mapping, spans, summary) return spans, summary def _label_residual_token(token: SpacyToken) -> Optional[str]: dep_label = RESIDUAL_DEP_LABELS.get(token.dep_) if dep_label: return dep_label return RESIDUAL_POS_LABELS.get(token.pos_) def _collect_residual_roles( sentence: SpacySpan, tokens: List[Token], spans: List[Span], sent_bounds: Tuple[int, int], summary: Optional[SentenceSummary], mapping: Dict[int, int], ) -> None: sent_start, sent_end = sent_bounds if sent_start < 0 or sent_end < 0 or sent_start >= sent_end: return coverage = [False] * (sent_end - sent_start) for span in spans: lo = max(span.start_token, sent_start) hi = min(span.end_token, sent_end) for idx in range(lo, hi): coverage[idx - sent_start] = True doc = sentence.doc for offset, covered in enumerate(coverage): if covered: continue token = tokens[sent_start + offset] if token.kind != "word": continue span = doc.char_span(token.start, token.end, alignment_mode="expand") if not span or not span.text.strip(): continue label = _label_residual_token(span[0]) if summary is not None and label and label not in summary.residual_roles: summary.residual_roles.append(label) if label: add_char_based_span( spans, token.start, token.end, "role-residual", mapping, attrs={"data-role": label}, ) def _classify_sentence_complexity(summary: SentenceSummary) -> Tuple[str, bool]: clause_count = len(summary.clauses) connector_count = len(summary.connectors) word_count = summary.sentence_length if clause_count >= 2: return "多重复杂句", True if clause_count == 1: return "主从复合句", True if connector_count >= 2: return "并列复合句", True if word_count >= 25: return "长句", True return "简单句", False def _translate_clause_functions(functions: List[str]) -> List[str]: translated = [] for item in functions: label = CLAUSE_FUNCTION_LABELS.get(item, item) if label not in translated: translated.append(label) return translated def build_sentence_note(summary: SentenceSummary) -> Tuple[str, bool]: note_parts: List[str] = [] clause_label = "无" if summary.clauses: counts = Counter(summary.clauses) clause_label = "、".join( f"{name}×{count}" if count > 1 else name for name, count in counts.items() ) functions = _translate_clause_functions(summary.clause_functions) connectors = list(dict.fromkeys(summary.connectors)) residual = summary.residual_roles subjects_seq = list(dict.fromkeys(summary.subjects)) predicates_seq = list(dict.fromkeys(summary.predicates)) objects_seq = list(dict.fromkeys(summary.objects)) complements_seq = list(dict.fromkeys(summary.complements)) subjects = "、".join(subjects_seq) if subjects_seq else "未识别" predicates = "、".join(predicates_seq) if predicates_seq else "未识别" objects = "、".join(objects_seq) if objects_seq else "无" complements = "、".join(complements_seq) if complements_seq else "无" note_parts.append(f"主语:{subjects}") note_parts.append(f"谓语:{predicates}") note_parts.append(f"宾语:{objects}") if complements != "无": note_parts.append(f"补语:{complements}") note_parts.append(f"从句:{clause_label}") if functions: note_parts.append(f"从句功能:{'、'.join(functions)}") connector_text = "、".join(connectors) if connectors else "未检测到典型连接词" note_parts.append(f"连接词:{connector_text}") if residual: note_parts.append(f"未高亮:{'、'.join(residual)}") complexity_label, is_complex = _classify_sentence_complexity(summary) note_parts.insert(0, f"句型:{complexity_label}") note_parts.append(f"词数:{summary.sentence_length}") return ";".join(note_parts), is_complex def render_with_spans(tokens: List[Token], spans: List[Span]) -> str: spans = sorted(spans, key=lambda s: (s.start_token, -s.end_token)) out_parts: List[str] = [] active_stack: List[Span] = [] span_queue = list(spans) current_idx = 0 def open_span(span: Span): attrs = "" if span.attrs: attrs = " " + " ".join( f"{k}='" + html.escape(v, quote=True) + "'" for k, v in span.attrs.items() ) out_parts.append(f"") def close_span(): out_parts.append("") while current_idx < len(tokens): opening = [sp for sp in span_queue if sp.start_token == current_idx] for sp in opening: open_span(sp) active_stack.append(sp) span_queue.remove(sp) token = tokens[current_idx] out_parts.append(html.escape(token.text)) current_idx += 1 while active_stack and active_stack[-1].end_token == current_idx: active_stack.pop() close_span() while active_stack: active_stack.pop() close_span() return "".join(out_parts) def _run_pipeline_without_benepar(text: str) -> "spacy.tokens.Doc": """Run the spaCy pipeline skipping benepar, for robust fallback.""" assert NLP is not None doc = NLP.make_doc(text) for name, proc in NLP.pipeline: if name == "benepar": continue doc = proc(doc) return doc def highlight_text_with_spacy( text: str, paragraph_meta: Optional[List[Dict[str, str]]] = None, include_helper: bool = False, ) -> str: if NLP is None: raise RuntimeError(f"spaCy pipeline unavailable: {NLP_LOAD_ERROR}") tokens = tokenize_preserve(text) if not tokens: return "" mapping = build_char_to_token_map(tokens) # Robust doc creation: if benepar causes any error, skip it and fallback. try: doc = NLP(text) except Exception as exc: _ensure_benepar_warning( f"Benepar failed during processing: {exc}. Falling back to dependency-based spans." ) doc = _run_pipeline_without_benepar(text) paragraph_ranges = _split_paragraph_ranges(text) paragraph_counters = [0 for _ in paragraph_ranges] paragraph_idx = 0 paragraph_spans: List[Span] = [] paragraph_attrs = paragraph_meta if paragraph_meta and len(paragraph_meta) == len(paragraph_ranges) else None for idx, (start, end) in enumerate(paragraph_ranges): attrs = None if paragraph_attrs: attrs = paragraph_attrs[idx] or None add_char_based_span(paragraph_spans, start, end, "paragraph-scope", mapping, attrs=attrs) spans: List[Span] = list(paragraph_spans) for sent in doc.sents: while paragraph_idx < len(paragraph_ranges) and paragraph_ranges[paragraph_idx][1] <= sent.start_char: paragraph_idx += 1 current_idx = min(paragraph_idx, len(paragraph_ranges) - 1) paragraph_counters[current_idx] += 1 sentence_label = _circled_number(paragraph_counters[current_idx]) sentence_spans, summary = annotate_sentence(tokens, sent, mapping, collect_summary=include_helper) sent_bounds = char_span_to_token_span(sent.start_char, sent.end_char, mapping) sent_start, sent_end = sent_bounds if sent_start >= 0 and sent_end >= 0: _collect_residual_roles(sent, tokens, sentence_spans, sent_bounds, summary, mapping) helper_note = "" is_complex = False if include_helper and summary is not None: helper_note, is_complex = build_sentence_note(summary) attrs = { "data-sid": sentence_label, } if include_helper: attrs["data-complex"] = "1" if is_complex else "0" attrs["data-note"] = helper_note sentence_spans.append(Span(start_token=sent_start, end_token=sent_end, cls="sentence-scope", attrs=attrs)) spans.extend(sentence_spans) return render_with_spans(tokens, spans) def _build_analysis_container(fragment: str, include_helper: bool) -> str: helper_state = "on" if include_helper else "off" return f"
{fragment}
" def _build_highlighted_html(fragment: str, include_helper: bool) -> str: return f"{STYLE_BLOCK}{_build_analysis_container(fragment, include_helper)}" def _perform_analysis(text: str, include_helper: bool) -> AnalyzeResponse: sanitized_fragment = highlight_text_with_spacy(text, include_helper=include_helper) highlighted_html = _build_highlighted_html(sanitized_fragment, include_helper) return AnalyzeResponse(highlighted_html=highlighted_html) app = FastAPI(title="Grammar Highlight API (spaCy + benepar)") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.post("/analyze", response_model=AnalyzeResponse) async def analyze(req: AnalyzeRequest): text = req.text if text is None or not text.strip(): raise HTTPException(status_code=400, detail="Text is required") try: return _perform_analysis(text, include_helper=False) except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc except Exception as exc: # pragma: no cover - defensive raise HTTPException(status_code=500, detail=f"Analysis failed: {exc}") from exc @app.post("/analyze/detail", response_model=AnalyzeResponse) async def analyze_with_helper(req: AnalyzeRequest): text = req.text if text is None or not text.strip(): raise HTTPException(status_code=400, detail="Text is required") try: return _perform_analysis(text, include_helper=True) except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc except Exception as exc: # pragma: no cover - defensive raise HTTPException(status_code=500, detail=f"Analysis failed: {exc}") from exc @app.get("/health") async def health(): status = "ok" if NLP is not None else "failed" detail = None if NLP is not None else str(NLP_LOAD_ERROR) payload = {"status": status} if detail: payload["detail"] = detail if BENE_PAR_WARNING: payload["warning"] = BENE_PAR_WARNING payload["benepar_attached"] = HAS_BENEPAR return payload @app.get("/proxy", response_class=HTMLResponse) async def proxy(url: Optional[str] = None, show_images: bool = False): if not url: return HTMLResponse(_render_proxy_page(show_images=show_images)) try: normalized_url, title, page_text, images, code_blocks, paragraph_meta = await _fetch_remote_plaintext(url) highlighted_fragment = highlight_text_with_spacy(page_text, paragraph_meta=paragraph_meta or None) if code_blocks: highlighted_fragment = _inject_proxy_codeblocks(highlighted_fragment, code_blocks) image_notice = None if images: if show_images: highlighted_fragment = _inject_proxy_images(highlighted_fragment, images) else: highlighted_fragment = _strip_proxy_image_markers(highlighted_fragment) image_notice = ( f"检测到 {len(images)} 张正文图片,为提速默认隐藏。勾选“显示图片”后重新抓取即可加载原图。" ) html_body = _render_proxy_page( url_value=normalized_url, message="分析完成,结果如下。", highlight_fragment=highlighted_fragment, source_url=normalized_url, source_title=title, show_images=show_images, image_notice=image_notice, source_plaintext=page_text, ) return HTMLResponse(html_body) except ValueError as exc: body = _render_proxy_page(url_value=url or "", message=str(exc), is_error=True, show_images=show_images) return HTMLResponse(body, status_code=400) except httpx.HTTPError as exc: # Provide a clearer message for common HTTP errors from the remote site. msg = None if isinstance(exc, httpx.HTTPStatusError) and exc.response is not None: status = exc.response.status_code if status == 403: msg = ( "抓取页面失败:目标站点返回 403 Forbidden(禁止访问)。" "该网站很可能禁止自动抓取或代理访问,目前无法通过本工具获取正文," "可以尝试在浏览器中打开并手动复制需要的内容。" ) else: msg = f"抓取页面失败:目标站点返回 HTTP {status}。" if msg is None: msg = f"抓取页面失败:{exc}" body = _render_proxy_page( url_value=url or "", message=msg, is_error=True, show_images=show_images, ) return HTMLResponse(body, status_code=502) except Exception as exc: body = _render_proxy_page( url_value=url or "", message=f"代理分析失败:{exc}", is_error=True, show_images=show_images, ) return HTMLResponse(body, status_code=500) @app.get("/", response_class=HTMLResponse) async def ui(): return """ Grammar Highlighter

Grammar Highlighter (spaCy + benepar)

""" PROXY_PAGE_TEMPLATE = Template( """ Grammar Proxy Highlighter $style_block

网页代理高亮

$status_block
$result_block $source_text_script
""" ) ALLOWED_URL_SCHEMES = {"http", "https"} MAX_REMOTE_HTML_BYTES = 1_000_000 REMOTE_FETCH_TIMEOUT = 10.0 REMOTE_FETCH_HEADERS = { # Use a browser-like user agent and common headers so that sites which # block generic HTTP clients are more likely to return normal content. "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", # Let httpx / the underlying HTTP stack negotiate an encoding it can # actually decode. If we unconditionally advertise "br" but the runtime # does not have brotli support installed, some sites will respond with # brotli-compressed payloads that end up as乱码 or decoding errors. # # Most modern servers default to gzip or identity when the header is # absent, which are both handled fine by httpx. # "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", # A few anti‑bot setups check these request headers; keeping them close # to real desktop Chrome values slightly improves compatibility, even # though they are not a guarantee against 403 responses. "Sec-Fetch-Site": "none", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-User": "?1", "Sec-Fetch-Dest": "document", } SIMPLE_FETCH_HEADERS = { # Minimal browser-like headers for the fallback "simple request" path. "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Connection": "close", } def _inject_proxy_images(html_fragment: str, images: List[Dict[str, str]]) -> str: """Replace stable image placeholders with tags in the highlighted HTML.""" result = html_fragment for idx, img in enumerate(images): marker = img.get("marker") or f"__GHIMG_{idx}__" src = html.escape(img.get("src", "") or "", quote=True) if not src: continue alt = html.escape(img.get("alt", "") or "", quote=True) title = html.escape(img.get("title", "") or "", quote=True) attrs = [f"src='{src}'"] if alt: attrs.append(f"alt='{alt}'") if title: attrs.append(f"title='{title}'") # Preserve simple width/height hints when they look safe. Most modern # pages rely on CSS for sizing, but explicit attributes can help keep # code snippets or diagrams close to their original scale. def _safe_dim(value: Optional[str]) -> Optional[str]: if not value: return None value = value.strip() if re.fullmatch(r"\d+(?:\.\d+)?(px|%)?", value): return value return None width = _safe_dim(img.get("width")) height = _safe_dim(img.get("height")) if width: attrs.append(f"width='{html.escape(width, quote=True)}'") if height: attrs.append(f"height='{html.escape(height, quote=True)}'") img_tag = "" # Simple textual replacement is sufficient because placeholders # are emitted as plain word tokens without HTML meta characters. result = result.replace(marker, img_tag) return result IMG_MARKER_RE = re.compile(r"__GHIMG_\d+__") def _strip_proxy_image_markers(html_fragment: str) -> str: """Remove residual image placeholders when images are hidden.""" if IMG_MARKER_RE.search(html_fragment) is None: return html_fragment return IMG_MARKER_RE.sub("", html_fragment) def _inject_proxy_codeblocks(html_fragment: str, code_blocks: List[Dict[str, str]]) -> str: """Replace code placeholders with
 blocks, preserving formatting."""
    result = html_fragment
    for idx, block in enumerate(code_blocks):
        marker = block.get("marker") or f"__GHCODE_{idx}__"
        raw = block.get("text") or ""
        if not raw.strip():
            continue
        # Escape HTML but keep newlines so that 
 preserves formatting.
        code_html = html.escape(raw, quote=False)
        pre_tag = f"
{code_html}
" result = result.replace(marker, pre_tag) return result class SimpleHTMLStripper(HTMLParser): def __init__(self): super().__init__() # Accumulate visible text into paragraph-like blocks while skipping # navigation / sidebars / ads etc. We do this with a small HTML # structure–aware state machine instead of flattening everything. self._blocks: List[Dict[str, Any]] = [] self._current_parts: List[str] = [] # Track when we are inside potentially main content containers # like
or
. self._article_depth = 0 # Track whether we are inside a preformatted code block so that we # can preserve indentation and line breaks instead of collapsing # whitespace as normal text. self._in_pre = False self._in_code = False self._current_code_chunks: List[str] = [] self._code_blocks: List[Dict[str, str]] = [] # Stack of flags indicating which open tags should be skipped. # When any active flag is True, textual data is ignored. self._skip_stack: List[bool] = [] self._skip_depth = 0 self._title_chunks: List[str] = [] self._in_title = False self._h1_chunks: List[str] = [] self._h1_main_chunks: List[str] = [] self._in_h1 = False # Collected inline images from the main content, in document order. # Each image is represented as a small dict with sanitized attributes. self._images: List[Dict[str, str]] = [] # Active list containers (
    /
      ) and current
    1. nesting state. self._list_stack: List[Dict[str, Any]] = [] self._list_item_stack: List[Dict[str, Any]] = [] # Keywords commonly used in class/id attributes for non‑article areas _NOISE_KEYWORDS = { "sidebar", "side-bar", "aside", "nav", "menu", "breadcrumb", "breadcrumbs", "pagination", "pager", "comment", "comments", "reply", "advert", "ad-", "ads", "sponsor", "promo", "promotion", "related", "recommend", "share", "social", "subscribe", "signup", "login", "popup", "modal", "banner", "cookie", "notification", "toolbar", "footer", "header-bar", } # Tags whose textual content is almost never part of the main article. _ALWAYS_SKIP_TAGS = { "script", "style", "noscript", "nav", "aside", "footer", "form", "svg", "iframe", "button", "input", "textarea", "select", "option", "label", } # Structural container tags where noise classes/roles are meaningful. # For purely inline tags we avoid applying aggressive noise heuristics # so that important inline text (e.g. spans in the first sentence) is # not accidentally dropped. _STRUCTURAL_NOISE_TAGS = { "div", "section", "aside", "nav", "header", "footer", "main", "article", "ul", "ol", "li", } # Block-level tags that naturally mark paragraph boundaries. _BLOCK_TAGS = { "p", "li", "blockquote", "h1", "h2", "h3", "h4", "h5", "h6", "pre", "table", "tr", } # Keywords for containers that are likely to hold the main article body. # Used to decide which regions count as "main content" for both text # and inline images. _CONTENT_KEYWORDS = { "content", "main-content", "article-body", "post-body", "post-content", "entry-content", "story-body", "blog-post", "markdown-body", "readable-content", } # Keywords on image-related class/id/src that usually indicate avatars, # logo icons, decorative banners, etc., which we want to drop from the # extracted main content. _IMAGE_NOISE_KEYWORDS = { "avatar", "author", "logo", "icon", "favicon", "badge", "banner", "thumb", "thumbnail", "profile", "cover", "background", "sprite", "emoji", "reaction", } _TEXT_NOISE_KEYWORDS = { "menu", "menus", "navigation", "nav", "目录", "目錄", "导航", "導航", "菜单", "菜單", "广告", "廣告", "ad", "ads", "sponsor", "sponsored", "上一篇", "下一篇", "返回顶部", "返回頂部", "分享", "分享至", "相关推荐", "相关阅读", "相關閱讀", "recommended", "related posts", "login", "signup", } _TEXT_NOISE_PREFIXES = ( "目录", "目錄", "导航", "導航", "菜单", "菜單", "广告", "廣告", "上一篇", "下一篇", "上一页", "下一页", "返回目录", "返回目錄", "返回顶部", "返回頂部", "分享", "相关", "相關", "recommended", "login", "signup", ) def _finish_paragraph(self) -> None: """Flush current buffered tokens into a paragraph list.""" if not self._current_parts: return # For regular paragraphs we still collapse excessive internal # whitespace, but we keep logical breaks between paragraphs # themselves so that the downstream highlighter can reconstruct # paragraph structure. text = " ".join(self._current_parts) text = re.sub(r"\s+", " ", text).strip() self._current_parts = [] if not text: return if self._looks_like_noise_paragraph(text): return block_kind = "paragraph" list_kind: Optional[str] = None list_depth = 0 list_index: Optional[int] = None if self._list_item_stack: list_ctx = self._list_item_stack[-1] block_kind = "list-item" list_kind = list_ctx.get("list_type") or "ul" depth_value = list_ctx.get("depth", 1) try: depth_int = int(depth_value) except (TypeError, ValueError): depth_int = 1 list_depth = min(max(depth_int, 1), 5) if list_kind == "ol": idx = list_ctx.get("index") if isinstance(idx, int): list_index = idx self._blocks.append( { "text": text, "is_main": self._article_depth > 0, "kind": block_kind, "list_kind": list_kind, "list_depth": list_depth, "list_index": list_index, } ) def _looks_like_noise_paragraph(self, text: str) -> bool: normalized = text.strip() if not normalized: return True lowered = normalized.lower() compact = re.sub(r"\s+", "", lowered) for prefix in self._TEXT_NOISE_PREFIXES: if lowered.startswith(prefix.lower()): if len(normalized) <= 80: return True if len(normalized) <= 80: for keyword in self._TEXT_NOISE_KEYWORDS: if keyword in lowered or keyword in compact: return True # Skip very short bullet-like crumbs that mostly consist of symbols. if len(normalized) <= 6 and sum(ch.isalnum() for ch in normalized) <= 1: return True return False @staticmethod def _parse_ordered_start(raw_value: Optional[str]) -> int: if raw_value is None: return 1 value = raw_value.strip() if not value: return 1 try: parsed = int(value) return parsed if parsed >= 1 else 1 except ValueError: return 1 def handle_starttag(self, tag, attrs): lowered = tag.lower() # Paragraph boundary before starting a new block element or
      . if lowered in self._BLOCK_TAGS or lowered == "br": if self._skip_depth == 0: self._finish_paragraph() # Entering a
       region – treat it as a dedicated code block.
              if lowered == "pre" and self._skip_depth == 0:
                  self._finish_paragraph()
                  self._in_pre = True
                  self._current_code_chunks = []
      
              # Decide whether this element should be skipped entirely.
              attr_dict = {k.lower(): (v or "") for k, v in attrs}
              role = attr_dict.get("role", "").lower()
              classes_ids = (attr_dict.get("class", "") + " " + attr_dict.get("id", "")).lower()
      
              is_noise_attr = False
              # Only treat class/id keywords as layout "noise" on structural
              # containers (div/section/nav/etc). Inline tags with "comment"
              # in their class (like mdspan-comment on Towards Data Science)
              # should not be discarded, otherwise we lose the first words
              # of sentences.
              if lowered in self._STRUCTURAL_NOISE_TAGS:
                  is_noise_attr = any(key in classes_ids for key in self._NOISE_KEYWORDS)
                  if role in {"navigation", "banner", "contentinfo", "complementary"}:
                      is_noise_attr = True
      
              skip_this = lowered in self._ALWAYS_SKIP_TAGS or is_noise_attr
              if skip_this:
                  self._skip_depth += 1
              self._skip_stack.append(skip_this)
      
              # Track when we are inside an article-like container; only count if not skipped.
              if self._skip_depth == 0 and lowered in {"article", "main", "section", "div"}:
                  # Treat semantic containers and common "main content" classes as
                  # part of the article area so that we keep their text and inline
                  # media but still avoid sidebars / nav.
                  if lowered in {"article", "main"} or any(
                      key in classes_ids for key in self._CONTENT_KEYWORDS
                  ) or role == "main":
                      self._article_depth += 1
      
              if self._skip_depth == 0 and lowered in {"ul", "ol"}:
                  start = 1
                  if lowered == "ol":
                      start = self._parse_ordered_start(attr_dict.get("start"))
                  self._list_stack.append(
                      {
                          "type": lowered,
                          "start": start,
                          "next_index": start,
                      }
                  )
      
              if lowered == "li" and self._skip_depth == 0:
                  list_ctx = self._list_stack[-1] if self._list_stack else None
                  depth = len(self._list_stack) if self._list_stack else 1
                  list_type = list_ctx.get("type") if list_ctx else "ul"
                  index = None
                  if list_ctx and list_ctx["type"] == "ol":
                      index = list_ctx["next_index"]
                      list_ctx["next_index"] = index + 1
                  li_value = attr_dict.get("value")
                  if li_value and list_ctx and list_ctx["type"] == "ol":
                      try:
                          value_idx = int(li_value)
                          index = value_idx
                          list_ctx["next_index"] = value_idx + 1
                      except ValueError:
                          pass
                  self._list_item_stack.append(
                      {
                          "list_type": list_type,
                          "index": index,
                          "depth": depth,
                      }
                  )
      
              if lowered == "title" and self._skip_depth == 0:
                  self._in_title = True
              if lowered == "h1" and self._skip_depth == 0:
                  self._in_h1 = True
      
              if lowered == "code" and self._skip_depth == 0 and self._in_pre:
                  # Nested  inside 
       – keep track but we don't need
                  # separate buffering beyond the enclosing pre block.
                  self._in_code = True
      
              # Inline image handling: only keep  elements that are inside the
              # main article content (tracked via _article_depth) and that do not
              # look like avatars / logos / decorative icons. We insert a stable
              # placeholder token into the text stream so that the /proxy renderer
              # can later replace it with a real  tag while preserving the
              # grammar highlighting.
              if lowered == "img" and self._skip_depth == 0 and self._article_depth > 0:
                  src = attr_dict.get("src", "").strip()
                  if src:
                      alt = attr_dict.get("alt", "") or ""
                      title = attr_dict.get("title", "") or ""
                      width = (attr_dict.get("width") or "").strip()
                      height = (attr_dict.get("height") or "").strip()
                      img_classes_ids = classes_ids + " " + src.lower()
                      if any(key in img_classes_ids for key in self._IMAGE_NOISE_KEYWORDS):
                          return
                      marker = f"__GHIMG_{len(self._images)}__"
                      img_info: Dict[str, str] = {
                          "marker": marker,
                          "src": src,
                          "alt": alt,
                          "title": title,
                      }
                      if width:
                          img_info["width"] = width
                      if height:
                          img_info["height"] = height
                      self._images.append(img_info)
                      # Treat the image as an inline token within the current
                      # paragraph. Paragraph finishing logic will ensure it
                      # stays grouped with surrounding text.
                      self._current_parts.append(marker)
      
          def handle_endtag(self, tag):
              lowered = tag.lower()
              if lowered == "code" and self._in_code:
                  self._in_code = False
      
              if lowered == "pre" and self._in_pre:
                  self._in_pre = False
                  # Finalize the current code block into a single placeholder
                  # token so that it passes through the grammar highlighter
                  # untouched, and can later be restored as a 
       block.
                  code_text = "".join(self._current_code_chunks)
                  self._current_code_chunks = []
                  if code_text.strip() and self._skip_depth == 0:
                      marker = f"__GHCODE_{len(self._code_blocks)}__"
                      self._code_blocks.append({"marker": marker, "text": code_text})
                      # We append the marker to the paragraph parts so that
                      # get_text() emits it in the right position.
                      self._current_parts.append(marker)
      
              # Closing a block element ends the current paragraph.
              if lowered in self._BLOCK_TAGS and self._skip_depth == 0:
                  self._finish_paragraph()
      
              if lowered == "li" and self._skip_depth == 0 and self._list_item_stack:
                  self._list_item_stack.pop()
              if lowered in {"ul", "ol"} and self._skip_depth == 0 and self._list_stack:
                  self._list_stack.pop()
      
              if lowered == "title":
                  self._in_title = False
              if lowered == "h1":
                  self._in_h1 = False
      
              if lowered in {"article", "main", "section"} and self._skip_depth == 0 and self._article_depth > 0:
                  self._article_depth -= 1
      
              if self._skip_stack:
                  skip_this = self._skip_stack.pop()
                  if skip_this and self._skip_depth > 0:
                      self._skip_depth -= 1
      
          def handle_data(self, data):
              if self._skip_depth > 0:
                  return
              if self._in_pre or self._in_code:
                  # Preserve code blocks exactly as they appear, including
                  # newlines and indentation.
                  self._current_code_chunks.append(data)
                  return
              stripped = data.strip()
              if not stripped:
                  return
              if self._in_title:
                  self._title_chunks.append(stripped)
                  return
      
              # Regular visible text
              self._current_parts.append(stripped)
              if self._in_h1:
                  self._h1_chunks.append(stripped)
                  if self._article_depth > 0:
                      self._h1_main_chunks.append(stripped)
      
          def get_text(self) -> str:
              # Flush any trailing paragraph.
              self._finish_paragraph()
              blocks = self._selected_blocks()
              if not blocks:
                  return ""
              return "\n\n".join(block["text"] for block in blocks)
      
          def _selected_blocks(self) -> List[Dict[str, Any]]:
              if not self._blocks:
                  return []
              main_blocks = [block for block in self._blocks if block.get("is_main")]
              return main_blocks if main_blocks else self._blocks
      
          def get_blocks(self) -> List[Dict[str, Any]]:
              blocks = self._selected_blocks()
              return [dict(block) for block in blocks]
      
          def get_title(self) -> str:
              # Prefer 

      heading (especially inside
      /
      ) as the # primary title; fall back to . if self._h1_main_chunks: raw = " ".join(self._h1_main_chunks) elif self._h1_chunks: raw = " ".join(self._h1_chunks) elif self._title_chunks: raw = " ".join(self._title_chunks) else: return "" return re.sub(r"\s+", " ", raw).strip() def get_images(self) -> List[Dict[str, str]]: """Return the list of captured inline images in document order.""" return list(self._images) def get_code_blocks(self) -> List[Dict[str, str]]: """Return captured code blocks (from <pre>/<code>) in document order.""" return list(self._code_blocks) def _normalize_target_url(raw_url: str) -> str: candidate = (raw_url or "").strip() if not candidate: raise ValueError("请输入要抓取的 URL。") parsed = urlparse(candidate if "://" in candidate else f"https://{candidate}") if parsed.scheme not in ALLOWED_URL_SCHEMES: raise ValueError("仅支持 http/https 协议链接。") if not parsed.netloc: raise ValueError("URL 缺少域名部分。") sanitized = parsed._replace(fragment="") return urlunparse(sanitized) def _fallback_html_to_text(html_body: str) -> str: """Very simple HTML-to-text fallback used when structured extraction fails. This does not attempt to distinguish main content from navigation, but it guarantees we return *something* for pages whose structure confuses the SimpleHTMLStripper heuristics (e.g. some mirror sites). """ # Drop script/style/noscript content outright. cleaned = re.sub( r"(?is)<(script|style|noscript)[^>]*>.*?</\1>", " ", html_body, ) # Convert common block separators into newlines. cleaned = re.sub(r"(?i)<br\s*/?>", "\n", cleaned) cleaned = re.sub(r"(?i)</p\s*>", "\n\n", cleaned) cleaned = re.sub(r"(?i)</(div|section|article|li|h[1-6])\s*>", "\n\n", cleaned) # Remove all remaining tags. cleaned = re.sub(r"(?is)<[^>]+>", " ", cleaned) cleaned = html.unescape(cleaned) # Normalize whitespace but keep paragraph-level blank lines. cleaned = cleaned.replace("\r", "") # Collapse runs of spaces/tabs inside lines. cleaned = re.sub(r"[ \t\f\v]+", " ", cleaned) # Collapse 3+ blank lines into just 2. cleaned = re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned) cleaned = cleaned.strip() return cleaned def _build_paragraph_metadata(blocks: List[Dict[str, Any]]) -> List[Dict[str, str]]: """Convert stripped block info into span attributes for downstream rendering.""" if not blocks: return [] paragraph_meta: List[Dict[str, str]] = [] for block in blocks: attrs: Dict[str, str] = {} if block.get("kind") == "list-item" and block.get("list_kind"): attrs["data-list-kind"] = str(block["list_kind"]) depth = block.get("list_depth") if depth: attrs["data-list-depth"] = str(depth) if block.get("list_kind") == "ol" and block.get("list_index") is not None: attrs["data-list-index"] = str(block["list_index"]) paragraph_meta.append(attrs) return paragraph_meta def _decode_html_bytes(raw_content: bytes, encoding_hint: Optional[str]) -> str: encoding_candidates: List[str] = [] if encoding_hint: encoding_candidates.append(encoding_hint) encoding_candidates.extend(["utf-8", "latin-1"]) last_exc: Optional[Exception] = None for enc in encoding_candidates: try: html_body = raw_content.decode(enc, errors="replace") break except Exception as exc: # pragma: no cover - defensive last_exc = exc else: # pragma: no cover - extremely unlikely raise RuntimeError(f"无法解码远程页面内容: {last_exc}") if len(html_body) > MAX_REMOTE_HTML_BYTES: html_body = html_body[:MAX_REMOTE_HTML_BYTES] return html_body async def _download_html_via_httpx(url: str) -> str: async with httpx.AsyncClient(timeout=REMOTE_FETCH_TIMEOUT, follow_redirects=True) as client: response = await client.get(url, headers=REMOTE_FETCH_HEADERS) html_body = _decode_html_bytes(response.content, response.encoding) response.raise_for_status() return html_body async def _download_html_via_stdlib(url: str) -> str: def _sync_fetch() -> Tuple[bytes, Optional[str]]: req = urllib_request.Request(url, headers=SIMPLE_FETCH_HEADERS) opener = urllib_request.build_opener(urllib_request.ProxyHandler({})) with opener.open(req, timeout=REMOTE_FETCH_TIMEOUT) as resp: data = resp.read(MAX_REMOTE_HTML_BYTES + 1) headers = getattr(resp, "headers", None) encoding_hint = None if headers is not None: get_charset = getattr(headers, "get_content_charset", None) if callable(get_charset): encoding_hint = get_charset() if not encoding_hint: content_type = headers.get("Content-Type", "") match = re.search(r"charset=([\w-]+)", content_type or "", re.IGNORECASE) if match: encoding_hint = match.group(1) return data, encoding_hint raw_content, encoding_hint = await asyncio.to_thread(_sync_fetch) return _decode_html_bytes(raw_content, encoding_hint) async def _download_html_with_fallback(url: str) -> str: first_exc: Optional[Exception] = None try: return await _download_html_via_httpx(url) except httpx.HTTPStatusError as exc: status = exc.response.status_code if exc.response is not None else None if status not in {401, 403, 407, 451, 429}: raise first_exc = exc except httpx.HTTPError as exc: first_exc = exc try: return await _download_html_via_stdlib(url) except (urllib_error.URLError, urllib_error.HTTPError, TimeoutError) as fallback_exc: if first_exc: raise first_exc from fallback_exc raise async def _fetch_remote_plaintext( url: str, ) -> Tuple[str, str, str, List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]]]: normalized = _normalize_target_url(url) html_body = await _download_html_with_fallback(normalized) stripper = SimpleHTMLStripper() stripper.feed(html_body) title = stripper.get_title() or normalized images = stripper.get_images() code_blocks = stripper.get_code_blocks() plain_text = stripper.get_text() block_info = stripper.get_blocks() if not plain_text: plain_text = _fallback_html_to_text(html_body) if not plain_text: raise ValueError("未能从该页面提取正文。") # Fallback text no longer contains structured placeholders, so any # collected media/code markers would be invalid. images = [] code_blocks = [] block_info = [] paragraph_meta = _build_paragraph_metadata(block_info) return normalized, title, plain_text, images, code_blocks, paragraph_meta def _render_proxy_page( *, url_value: str = "", message: Optional[str] = None, is_error: bool = False, highlight_fragment: Optional[str] = None, helper_enabled: bool = False, source_url: Optional[str] = None, source_title: Optional[str] = None, show_images: bool = False, image_notice: Optional[str] = None, source_plaintext: Optional[str] = None, ) -> str: helper_state = "on" if helper_enabled else "off" status_block = "" if message: cls = "status err" if is_error else "status ok" status_block = f"<p class='{cls}'>{html.escape(message)}</p>" style_block = STYLE_BLOCK if highlight_fragment else "" result_block = "" source_script = "" if highlight_fragment and source_url: safe_url = html.escape(source_url, quote=True) safe_title = html.escape(source_title or source_url) image_hint = "" if image_notice: image_hint = f"<p class='image-hint'>{html.escape(image_notice)}</p>" if source_plaintext: source_script = f"<script>window.__proxySourceText = {json.dumps(source_plaintext)}</script>" result_block = ( "<section class='result'>" f"<div class='source'>原页面:<a href='{safe_url}' target='_blank' rel='noopener'>{safe_title}</a></div>" f"<div class='analysis' data-helper='{helper_state}'>{highlight_fragment}</div>" f"{image_hint}" "</section>" ) show_images_checked = "checked" if show_images else "" return PROXY_PAGE_TEMPLATE.substitute( style_block=style_block, url_value=html.escape(url_value or "", quote=True), status_block=status_block, result_block=result_block, show_images_checked=show_images_checked, source_text_script=source_script, )