|
|
@@ -0,0 +1,2275 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""Grammar highlighter powered by spaCy + benepar constituency parsing."""
|
|
|
+
|
|
|
+import asyncio
|
|
|
+import html
|
|
|
+import re
|
|
|
+from collections import Counter
|
|
|
+from dataclasses import dataclass, field
|
|
|
+from html.parser import HTMLParser
|
|
|
+from string import Template
|
|
|
+from typing import Any, Dict, List, Optional, Tuple
|
|
|
+from urllib import error as urllib_error, request as urllib_request
|
|
|
+from urllib.parse import urlparse, urlunparse
|
|
|
+
|
|
|
+import benepar
|
|
|
+import httpx
|
|
|
+import spacy
|
|
|
+from fastapi import FastAPI, HTTPException
|
|
|
+from fastapi.middleware.cors import CORSMiddleware
|
|
|
+from fastapi.responses import HTMLResponse
|
|
|
+from pydantic import BaseModel, Field
|
|
|
+from spacy.cli import download as spacy_download
|
|
|
+from spacy.language import Language
|
|
|
+from spacy.tokens import Span as SpacySpan, Token as SpacyToken
|
|
|
+from style_config import SENTENCE_HELPER_ENABLED, STYLE_BLOCK
|
|
|
+
|
|
|
+BENE_PAR_WARNING: Optional[str] = None
|
|
|
+HAS_BENEPAR: bool = False  # Tracks whether the benepar component was successfully attached.
|
|
|
+
|
|
|
+
|
|
|
+def _ensure_benepar_warning(message: str) -> None:
|
|
|
+ """Record a warning once when benepar annotations are unavailable."""
|
|
|
+ global BENE_PAR_WARNING
|
|
|
+ if not BENE_PAR_WARNING:
|
|
|
+ BENE_PAR_WARNING = message
|
|
|
+
|
|
|
+
|
|
|
+def _load_spacy_pipeline(
|
|
|
+ model_name: str = "en_core_web_sm", benepar_model: str = "benepar_en3"
|
|
|
+) -> Language:
|
|
|
+ global BENE_PAR_WARNING, HAS_BENEPAR
|
|
|
+ BENE_PAR_WARNING = None
|
|
|
+ HAS_BENEPAR = False
|
|
|
+ try:
|
|
|
+ nlp = spacy.load(model_name)
|
|
|
+ except OSError:
|
|
|
+ try:
|
|
|
+ spacy_download(model_name)
|
|
|
+ nlp = spacy.load(model_name)
|
|
|
+ except Exception as exc: # pragma: no cover - install helper
|
|
|
+ raise RuntimeError(
|
|
|
+ f"spaCy model '{model_name}' is required. Install via `python -m spacy download {model_name}`."
|
|
|
+ ) from exc
|
|
|
+
|
|
|
+ # Ensure we have sentence segmentation available
|
|
|
+ pipe_names = set(nlp.pipe_names)
|
|
|
+ if not ({"parser", "senter", "sentencizer"} & pipe_names):
|
|
|
+ try:
|
|
|
+ nlp.add_pipe("sentencizer")
|
|
|
+ except Exception:
|
|
|
+ pass # if already present or unavailable, ignore
|
|
|
+
|
|
|
+ # Try to add benepar
|
|
|
+ if "benepar" not in nlp.pipe_names:
|
|
|
+ try:
|
|
|
+ nlp.add_pipe("benepar", config={"model": benepar_model}, last=True)
|
|
|
+ HAS_BENEPAR = True
|
|
|
+ except ValueError:
|
|
|
+ try:
|
|
|
+ benepar.download(benepar_model)
|
|
|
+ nlp.add_pipe("benepar", config={"model": benepar_model}, last=True)
|
|
|
+ HAS_BENEPAR = True
|
|
|
+ except Exception as exc: # pragma: no cover - install helper
|
|
|
+ HAS_BENEPAR = False
|
|
|
+ BENE_PAR_WARNING = (
|
|
|
+ "Benepar model '{model}' unavailable ({err}). Falling back to dependency-based spans."
|
|
|
+ ).format(model=benepar_model, err=exc)
|
|
|
+ except Exception as exc:
|
|
|
+ HAS_BENEPAR = False
|
|
|
+ BENE_PAR_WARNING = (
|
|
|
+ "Failed to attach benepar parser to spaCy pipeline. Falling back to dependency-based spans ({err})."
|
|
|
+ ).format(err=exc)
|
|
|
+ else:
|
|
|
+ HAS_BENEPAR = True
|
|
|
+
|
|
|
+ return nlp
|
|
|
+
|
|
|
+
|
|
|
+try:
|
|
|
+ NLP: Optional[Language] = _load_spacy_pipeline()
|
|
|
+ NLP_LOAD_ERROR: Optional[Exception] = None
|
|
|
+except Exception as exc: # pragma: no cover - import-time diagnostics
|
|
|
+ NLP = None
|
|
|
+ NLP_LOAD_ERROR = exc
|
|
|
+
|
|
|
+
|
|
|
+class AnalyzeRequest(BaseModel):
|
|
|
+ text: str = Field(..., description="Raw English text to highlight")
|
|
|
+
|
|
|
+
|
|
|
+class AnalyzeResponse(BaseModel):
|
|
|
+ highlighted_html: str
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class Token:
|
|
|
+ text: str
|
|
|
+ start: int
|
|
|
+ end: int
|
|
|
+ kind: str # 'word' | 'space' | 'punct'
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class Span:
|
|
|
+ start_token: int
|
|
|
+ end_token: int
|
|
|
+ cls: str
|
|
|
+ attrs: Optional[Dict[str, str]] = None
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class SentenceSummary:
|
|
|
+ subjects: List[str] = field(default_factory=list)
|
|
|
+ predicates: List[str] = field(default_factory=list)
|
|
|
+ objects: List[str] = field(default_factory=list)
|
|
|
+ complements: List[str] = field(default_factory=list)
|
|
|
+ clauses: List[str] = field(default_factory=list)
|
|
|
+ clause_functions: List[str] = field(default_factory=list)
|
|
|
+ connectors: List[str] = field(default_factory=list)
|
|
|
+ residual_roles: List[str] = field(default_factory=list)
|
|
|
+ sentence_length: int = 0
|
|
|
+
|
|
|
+
|
|
|
+TOKEN_REGEX = re.compile(
|
|
|
+ r"""
|
|
|
+ (?:\s+)
|
|
|
+ |(?:\d+(?:[\.,]\d+)*)
|
|
|
+ |(?:\w+(?:[-']\w+)*)
|
|
|
+ |(?:.)
|
|
|
+ """,
|
|
|
+ re.VERBOSE | re.UNICODE,
|
|
|
+)
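+# The alternatives above match, in order: runs of whitespace, numbers with
+# internal separators (e.g. "3.14", "1,000"), word-like tokens with internal
+# hyphens/apostrophes (e.g. "mother-in-law", "don't"), and finally any single
+# remaining character, so every character of the input ends up in some token.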
|
|
|
+
|
|
|
+WORD_LIKE_RE = re.compile(r"\w+(?:[-']\w+)*\Z", re.UNICODE)
|
|
|
+NUMBER_RE = re.compile(r"\d+(?:[\.,]\d+)*\Z", re.UNICODE)
|
|
|
+PARAGRAPH_BREAK_RE = re.compile(r"(?:\r?\n[ \t]*){2,}")
|
|
|
+
|
|
|
+
|
|
|
+SUBJECT_DEPS = {"nsubj", "nsubjpass", "csubj", "csubjpass"}
|
|
|
+DIRECT_OBJECT_DEPS = {"dobj", "obj"}
|
|
|
+INDIRECT_OBJECT_DEPS = {"iobj", "dative"}
|
|
|
+COMPLEMENT_DEPS = {"attr", "oprd", "acomp", "ccomp", "xcomp"}
|
|
|
+ADVERBIAL_DEPS = {"advmod", "npadvmod", "advcl", "obl", "prep", "pcomp"}
|
|
|
+RELATIVE_PRONOUNS = {"which", "that", "who", "whom", "whose", "where", "when"}
|
|
|
+SUBORDINATORS_TO_FUNCTION = {
|
|
|
+ "when": "TIME",
|
|
|
+ "while": "TIME",
|
|
|
+ "after": "TIME",
|
|
|
+ "before": "TIME",
|
|
|
+ "until": "TIME",
|
|
|
+ "as": "TIME",
|
|
|
+ "once": "TIME",
|
|
|
+ "since": "TIME",
|
|
|
+ "because": "REASON",
|
|
|
+ "now that": "REASON",
|
|
|
+ "if": "CONDITION",
|
|
|
+ "unless": "CONDITION",
|
|
|
+ "provided": "CONDITION",
|
|
|
+ "provided that": "CONDITION",
|
|
|
+ "although": "CONCESSION",
|
|
|
+ "though": "CONCESSION",
|
|
|
+ "even though": "CONCESSION",
|
|
|
+ "whereas": "CONCESSION",
|
|
|
+ "so that": "RESULT",
|
|
|
+ "so": "RESULT",
|
|
|
+ "lest": "PURPOSE",
|
|
|
+ "in order that": "PURPOSE",
|
|
|
+}
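+# Penn Treebank verb tags: VBD/VBP/VBZ are finite forms, while VBG/VBN are
+# participles or gerunds and signal non-finite clauses.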
|
|
|
+FINITE_VERB_TAGS = {"VBD", "VBP", "VBZ"}
|
|
|
+NONFINITE_VERB_TAGS = {"VBG", "VBN"}
|
|
|
+FIXED_MULTIWORD_PHRASES: Tuple[Tuple[re.Pattern, str], ...] = tuple(
|
|
|
+ (
|
|
|
+ re.compile(pattern, re.IGNORECASE),
|
|
|
+ label,
|
|
|
+ )
|
|
|
+ for pattern, label in [
|
|
|
+ (r"\bas well as\b", "as well as"),
|
|
|
+ (r"\brather than\b", "rather than"),
|
|
|
+ (r"\bin addition to\b", "in addition to"),
|
|
|
+ (r"\bin spite of\b", "in spite of"),
|
|
|
+ (r"\baccording to\b", "according to"),
|
|
|
+ (r"\bas soon as\b", "as soon as"),
|
|
|
+ ]
|
|
|
+)
|
|
|
+CLAUSE_FUNCTION_LABELS = {
|
|
|
+ "TIME": "时间",
|
|
|
+ "REASON": "原因",
|
|
|
+ "CONDITION": "条件",
|
|
|
+ "CONCESSION": "让步",
|
|
|
+ "RESULT": "结果",
|
|
|
+ "PURPOSE": "目的",
|
|
|
+}
|
|
|
+RESIDUAL_DEP_LABELS = {
|
|
|
+ "det": "限定词",
|
|
|
+ "prep": "介词",
|
|
|
+ "case": "介词标记",
|
|
|
+ "cc": "并列连词",
|
|
|
+ "mark": "从属连词",
|
|
|
+ "poss": "所有格标记",
|
|
|
+ "nummod": "数量修饰语",
|
|
|
+ "aux": "助动词",
|
|
|
+ "prt": "小品词",
|
|
|
+}
|
|
|
+RESIDUAL_POS_LABELS = {
|
|
|
+ "ADJ": "形容词修饰语",
|
|
|
+ "ADV": "副词",
|
|
|
+ "NUM": "数词",
|
|
|
+ "PRON": "代词",
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+def _classify_segment(seg: str) -> str:
|
|
|
+ if not seg:
|
|
|
+ return "punct"
|
|
|
+ if seg.isspace():
|
|
|
+ return "space"
|
|
|
+ if NUMBER_RE.fullmatch(seg) or WORD_LIKE_RE.fullmatch(seg):
|
|
|
+ return "word"
|
|
|
+ return "punct"
|
|
|
+
|
|
|
+
|
|
|
+def _append_fallback_tokens(text: str, start: int, end: int, tokens: List[Token]) -> None:
|
|
|
+ for idx in range(start, end):
|
|
|
+ ch = text[idx]
|
|
|
+ if ch.isspace():
|
|
|
+ kind = "space"
|
|
|
+ elif ch.isalnum() or ch == "_":
|
|
|
+ kind = "word"
|
|
|
+ else:
|
|
|
+ kind = "punct"
|
|
|
+ tokens.append(Token(ch, idx, idx + 1, kind))
|
|
|
+
|
|
|
+
|
|
|
+def tokenize_preserve(text: str) -> List[Token]:
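+    """Split text into word/space/punct tokens without losing any characters.
+
+    Concatenating the token texts reproduces the input exactly, e.g.
+    "Hi, there" tokenizes to ["Hi", ",", " ", "there"].
+    """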
|
|
|
+ tokens: List[Token] = []
|
|
|
+ if not text:
|
|
|
+ return tokens
|
|
|
+
|
|
|
+ last_end = 0
|
|
|
+ for match in TOKEN_REGEX.finditer(text):
|
|
|
+ if match.start() > last_end:
|
|
|
+ _append_fallback_tokens(text, last_end, match.start(), tokens)
|
|
|
+        seg = match.group(0)
|
|
|
+ tokens.append(Token(seg, match.start(), match.end(), _classify_segment(seg)))
|
|
|
+ last_end = match.end()
|
|
|
+
|
|
|
+ if last_end < len(text):
|
|
|
+ _append_fallback_tokens(text, last_end, len(text), tokens)
|
|
|
+
|
|
|
+ if not tokens and text:
|
|
|
+ tokens = [Token(text, 0, len(text), "word" if text[0].isalnum() else "punct")]
|
|
|
+ return tokens
|
|
|
+
|
|
|
+
|
|
|
+def build_char_to_token_map(tokens: List[Token]) -> Dict[int, int]:
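+    """Map every character offset to the index of the token covering it."""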
|
|
|
+ mapping: Dict[int, int] = {}
|
|
|
+ for idx, tok in enumerate(tokens):
|
|
|
+ for pos in range(tok.start, tok.end):
|
|
|
+ mapping[pos] = idx
|
|
|
+ return mapping
|
|
|
+
|
|
|
+
|
|
|
+def char_span_to_token_span(
|
|
|
+ char_start: int, char_end: int, mapping: Dict[int, int]
|
|
|
+) -> Tuple[int, int]:
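+    """Convert a character range into a half-open [start, end) token range.
+
+    Returns (-1, -1) when either boundary falls outside the tokenized text.
+    """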
|
|
|
+ if char_end <= char_start:
|
|
|
+ return -1, -1
|
|
|
+ start_idx = mapping.get(char_start)
|
|
|
+ end_idx = mapping.get(char_end - 1)
|
|
|
+ if start_idx is None or end_idx is None:
|
|
|
+ return -1, -1
|
|
|
+ return start_idx, end_idx + 1
|
|
|
+
|
|
|
+
|
|
|
+def add_char_based_span(
|
|
|
+ spans: List[Span],
|
|
|
+ char_start: int,
|
|
|
+ char_end: int,
|
|
|
+ cls: str,
|
|
|
+ mapping: Dict[int, int],
|
|
|
+ attrs: Optional[Dict[str, str]] = None,
|
|
|
+) -> None:
|
|
|
+ s_tok, e_tok = char_span_to_token_span(char_start, char_end, mapping)
|
|
|
+ if s_tok < 0 or e_tok < 0:
|
|
|
+ return
|
|
|
+    safe_attrs = None
+    if attrs:
+        # Drop empty values but keep the text raw; render_with_spans escapes
+        # attribute values exactly once when emitting the HTML.
+        safe_attrs = {k: v for k, v in attrs.items() if v}
|
|
|
+ spans.append(Span(start_token=s_tok, end_token=e_tok, cls=cls, attrs=safe_attrs))
|
|
|
+
|
|
|
+
|
|
|
+def add_span(spans: List[Span], start_token: int, end_token: int, cls: str, attrs: Optional[Dict[str, str]] = None):
|
|
|
+ if start_token < 0 or end_token < 0 or end_token <= start_token:
|
|
|
+ return
|
|
|
+ spans.append(Span(start_token=start_token, end_token=end_token, cls=cls, attrs=attrs))
|
|
|
+
|
|
|
+
|
|
|
+def subtree_char_span(token: SpacyToken) -> Tuple[int, int]:
|
|
|
+ subtree = list(token.subtree)
|
|
|
+ if not subtree:
|
|
|
+ return token.idx, token.idx + len(token.text)
|
|
|
+ return subtree[0].idx, subtree[-1].idx + len(subtree[-1].text)
|
|
|
+
|
|
|
+
|
|
|
+def _subtree_text(token: SpacyToken) -> str:
|
|
|
+ span = token.doc[token.left_edge.i : token.right_edge.i + 1]
|
|
|
+ return span.text
|
|
|
+
|
|
|
+
|
|
|
+def _find_antecedent_word(sentence: SpacySpan, clause_start_char: int) -> Optional[str]:
|
|
|
+ candidate = None
|
|
|
+ for tok in sentence:
|
|
|
+ if tok.idx >= clause_start_char:
|
|
|
+ break
|
|
|
+ if tok.pos_ in {"NOUN", "PROPN", "PRON"}:
|
|
|
+ candidate = tok.text
|
|
|
+ return candidate
|
|
|
+
|
|
|
+
|
|
|
+def _is_nonfinite_clause(span: SpacySpan) -> bool:
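+    """Heuristically treat a clause as non-finite when it contains no finite
+    verb tag but does contain "to" (TO) or a participle (VBG/VBN)."""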
|
|
|
+ tags = {tok.tag_ for tok in span if tok.tag_}
|
|
|
+ if tags & FINITE_VERB_TAGS:
|
|
|
+ return False
|
|
|
+ if "TO" in tags or tags & NONFINITE_VERB_TAGS:
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+def _classify_noun_clause(span: SpacySpan) -> Optional[str]:
|
|
|
+ deps = {tok.dep_ for tok in span}
|
|
|
+ if deps & {"csubj", "csubjpass"}:
|
|
|
+ return "subject"
|
|
|
+ if deps & {"ccomp", "xcomp"}:
|
|
|
+ return "complement"
|
|
|
+ if deps & {"dobj", "obj"}:
|
|
|
+ return "object"
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def _split_paragraph_ranges(text: str) -> List[Tuple[int, int]]:
|
|
|
+ """Return inclusive paragraph ranges, keeping separators intact."""
|
|
|
+ if not text:
|
|
|
+ return [(0, 0)]
|
|
|
+ ranges: List[Tuple[int, int]] = []
|
|
|
+ start = 0
|
|
|
+ for match in PARAGRAPH_BREAK_RE.finditer(text):
|
|
|
+ ranges.append((start, match.start()))
|
|
|
+ start = match.end()
|
|
|
+ ranges.append((start, len(text)))
|
|
|
+    # Defensive: the final append above guarantees at least one range.
|
|
|
+ if not ranges:
|
|
|
+ ranges = [(0, len(text))]
|
|
|
+ return ranges
|
|
|
+
|
|
|
+
|
|
|
+def _circled_number(value: int) -> str:
|
|
|
+ """Return the circled number style for sentence numbering."""
|
|
|
+ if value <= 0:
|
|
|
+ return ""
|
|
|
+ if value <= 20:
|
|
|
+ return chr(ord("\u2460") + value - 1)
|
|
|
+ if 21 <= value <= 35:
|
|
|
+ return chr(ord("\u3251") + value - 21)
|
|
|
+ if 36 <= value <= 50:
|
|
|
+ return chr(ord("\u32B1") + value - 36)
|
|
|
+ return f"({value})"
|
|
|
+
|
|
|
+
|
|
|
+def annotate_constituents(
|
|
|
+ sentence: SpacySpan,
|
|
|
+ spans: List[Span],
|
|
|
+ mapping: Dict[int, int],
|
|
|
+ sentence_start_char: int,
|
|
|
+ sentence_end_char: int,
|
|
|
+ summary: Optional[SentenceSummary] = None,
|
|
|
+) -> None:
|
|
|
+ # If benepar is not attached or a previous warning indicates fallback, skip.
|
|
|
+ if not HAS_BENEPAR or BENE_PAR_WARNING:
|
|
|
+ _ensure_benepar_warning(
|
|
|
+ "Benepar component missing or unavailable. Using dependency-based spans."
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ # If the extension is not present, skip
|
|
|
+ if not SpacySpan.has_extension("constituents"):
|
|
|
+ _ensure_benepar_warning(
|
|
|
+ "Benepar component missing from spaCy pipeline. Falling back to dependency spans."
|
|
|
+ )
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ constituents = sentence._.constituents
|
|
|
+ except Exception as exc:
|
|
|
+ # Catch any error while accessing benepar results and fallback safely
|
|
|
+ _ensure_benepar_warning(
|
|
|
+ f"Benepar constituency parse unavailable: {exc}. Falling back to dependency spans."
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ seen_ranges = set()
|
|
|
+ for const in constituents:
|
|
|
+ label = getattr(const, "label_", None)
|
|
|
+ if not label:
|
|
|
+ continue
|
|
|
+ start_char, end_char = const.start_char, const.end_char
|
|
|
+ if start_char == sentence_start_char and end_char == sentence_end_char:
|
|
|
+ continue # skip the entire sentence span itself
|
|
|
+
|
|
|
+ key = (start_char, end_char, label)
|
|
|
+ is_relative = False
|
|
|
+
|
|
|
+ if label in {"PP", "ADVP"}:
|
|
|
+ if key in seen_ranges:
|
|
|
+ continue
|
|
|
+ seen_ranges.add(key)
|
|
|
+ add_char_based_span(spans, start_char, end_char, "role-adverbial", mapping)
|
|
|
+ continue
|
|
|
+
|
|
|
+ if label == "SBAR" and const:
|
|
|
+ first_token = const[0]
|
|
|
+ lowered = first_token.text.lower()
|
|
|
+ if lowered in RELATIVE_PRONOUNS:
|
|
|
+ antecedent = _find_antecedent_word(sentence, start_char)
|
|
|
+ attrs = {"data-modifies": antecedent} if antecedent else None
|
|
|
+ add_char_based_span(spans, start_char, end_char, "clause-relative", mapping, attrs)
|
|
|
+ if summary:
|
|
|
+ summary.clauses.append("定语从句")
|
|
|
+ is_relative = True
|
|
|
+ else:
|
|
|
+                # Several subordinators in the table are multiword ("even though",
+                # "in order that"), so try the longest prefix first.
+                function = None
+                prefix_words = [t.text.lower() for t in const[:3]]
+                for n in range(len(prefix_words), 0, -1):
+                    function = SUBORDINATORS_TO_FUNCTION.get(" ".join(prefix_words[:n]))
+                    if function:
+                        break
|
|
|
+ attrs = {"data-function": function}
|
|
|
+ add_char_based_span(spans, start_char, end_char, "clause-adverbial", mapping, attrs)
|
|
|
+ if summary:
|
|
|
+ summary.clauses.append("状语从句")
|
|
|
+ if function:
|
|
|
+ summary.clause_functions.append(function)
|
|
|
+ continue
|
|
|
+
|
|
|
+ if label in {"S", "VP"}:
|
|
|
+ if _is_nonfinite_clause(const):
|
|
|
+ add_char_based_span(spans, start_char, end_char, "clause-nonfinite", mapping)
|
|
|
+ if summary:
|
|
|
+ summary.clauses.append("非限定结构")
|
|
|
+ continue
|
|
|
+ if label == "S" and not is_relative:
|
|
|
+ role = _classify_noun_clause(const)
|
|
|
+ if role:
|
|
|
+ attrs = {"data-clause-role": role}
|
|
|
+ add_char_based_span(spans, start_char, end_char, "clause-noun", mapping, attrs)
|
|
|
+ if summary:
|
|
|
+ summary.clauses.append(f"名词性从句({role})")
|
|
|
+
|
|
|
+
|
|
|
+def _predicate_span_bounds(head: SpacyToken) -> Tuple[int, int]:
|
|
|
+ """Return a character range covering predicate head + functional dependents."""
|
|
|
+ tokens = [head]
|
|
|
+ for child in head.children:
|
|
|
+ if child.dep_ in {"aux", "auxpass", "prt", "cop", "neg"}:
|
|
|
+ tokens.append(child)
|
|
|
+ start_char = min(tok.idx for tok in tokens)
|
|
|
+ end_char = max(tok.idx + len(tok.text) for tok in tokens)
|
|
|
+ return start_char, end_char
|
|
|
+
|
|
|
+
|
|
|
+def _predicate_heads(sentence: SpacySpan) -> List[SpacyToken]:
|
|
|
+ """Collect predicate heads including coordinated verbs."""
|
|
|
+ candidates: List[SpacyToken] = []
|
|
|
+ for tok in sentence:
|
|
|
+ if tok.pos_ not in {"VERB", "AUX"} and tok.tag_ not in FINITE_VERB_TAGS:
|
|
|
+ continue
|
|
|
+ if tok.dep_ == "ROOT":
|
|
|
+ candidates.append(tok)
|
|
|
+ continue
|
|
|
+ if tok.dep_ == "conj" and tok.head.pos_ in {"VERB", "AUX"}:
|
|
|
+ candidates.append(tok)
|
|
|
+ continue
|
|
|
+ if tok.dep_ in {"ccomp", "xcomp", "advcl", "acl", "relcl", "parataxis"}:
|
|
|
+ candidates.append(tok)
|
|
|
+ seen = set()
|
|
|
+ ordered: List[SpacyToken] = []
|
|
|
+ for tok in sorted(candidates, key=lambda t: t.i):
|
|
|
+ if tok.i in seen:
|
|
|
+ continue
|
|
|
+ seen.add(tok.i)
|
|
|
+ ordered.append(tok)
|
|
|
+ return ordered
|
|
|
+
|
|
|
+
|
|
|
+def _add_fixed_phrases(
|
|
|
+ sentence: SpacySpan, mapping: Dict[int, int], spans: List[Span], summary: SentenceSummary
|
|
|
+) -> None:
|
|
|
+ base = sentence.start_char
|
|
|
+ text = sentence.text
|
|
|
+ for pattern, label in FIXED_MULTIWORD_PHRASES:
|
|
|
+ for match in pattern.finditer(text):
|
|
|
+ start_char = base + match.start()
|
|
|
+ end_char = base + match.end()
|
|
|
+ add_char_based_span(
|
|
|
+ spans,
|
|
|
+ start_char,
|
|
|
+ end_char,
|
|
|
+ "phrase-fixed",
|
|
|
+ mapping,
|
|
|
+ attrs={"data-phrase": label},
|
|
|
+ )
|
|
|
+ summary.connectors.append(label.lower())
|
|
|
+
|
|
|
+
|
|
|
+def annotate_sentence(
|
|
|
+ tokens: List[Token],
|
|
|
+ sentence: SpacySpan,
|
|
|
+ mapping: Dict[int, int],
|
|
|
+) -> Tuple[List[Span], SentenceSummary]:
|
|
|
+ spans: List[Span] = []
|
|
|
+ summary = SentenceSummary(sentence_length=len(sentence))
|
|
|
+ sent_bounds = char_span_to_token_span(sentence.start_char, sentence.end_char, mapping)
|
|
|
+ sent_start_tok, sent_end_tok = sent_bounds
|
|
|
+
|
|
|
+ def add_subtree(token: SpacyToken, cls: str):
|
|
|
+ start_char, end_char = subtree_char_span(token)
|
|
|
+ add_char_based_span(spans, start_char, end_char, cls, mapping)
|
|
|
+
|
|
|
+ def add_token(token: SpacyToken, cls: str):
|
|
|
+ add_char_based_span(spans, token.idx, token.idx + len(token.text), cls, mapping)
|
|
|
+
|
|
|
+ for tok in sentence:
|
|
|
+ if tok.dep_ in SUBJECT_DEPS:
|
|
|
+ add_subtree(tok, "role-subject")
|
|
|
+ summary.subjects.append(_subtree_text(tok))
|
|
|
+
|
|
|
+ for head in _predicate_heads(sentence):
|
|
|
+ start_char, end_char = _predicate_span_bounds(head)
|
|
|
+ add_char_based_span(spans, start_char, end_char, "role-predicate", mapping)
|
|
|
+ predicate_text = sentence.doc.text[start_char:end_char].strip()
|
|
|
+ summary.predicates.append(predicate_text or head.text)
|
|
|
+
|
|
|
+ for tok in sentence:
|
|
|
+ if tok.dep_ in DIRECT_OBJECT_DEPS:
|
|
|
+ add_subtree(tok, "role-object-do")
|
|
|
+ summary.objects.append(_subtree_text(tok))
|
|
|
+ break
|
|
|
+
|
|
|
+ io_token = next((tok for tok in sentence if tok.dep_ in INDIRECT_OBJECT_DEPS), None)
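+    # Fallback: with no dative/iobj dependency, treat the object of a "to"/"for"
+    # preposition as the indirect object (e.g. "gave the book to Mary").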
|
|
|
+ if io_token is None:
|
|
|
+ for tok in sentence:
|
|
|
+ if tok.dep_ == "pobj" and tok.head.dep_ == "prep" and tok.head.lemma_.lower() in {"to", "for"}:
|
|
|
+ io_token = tok
|
|
|
+ break
|
|
|
+ if io_token:
|
|
|
+ add_subtree(io_token, "role-object-io")
|
|
|
+ summary.objects.append(_subtree_text(io_token))
|
|
|
+
|
|
|
+ for tok in sentence:
|
|
|
+ if tok.dep_ in COMPLEMENT_DEPS:
|
|
|
+ add_subtree(tok, "role-complement")
|
|
|
+ summary.complements.append(_subtree_text(tok))
|
|
|
+ break
|
|
|
+
|
|
|
+ for tok in sentence:
|
|
|
+ lowered = tok.text.lower()
|
|
|
+ if tok.dep_ in {"cc", "mark", "preconj"} or tok.pos_ in {"CCONJ", "SCONJ"}:
|
|
|
+ add_token(tok, "role-connector")
|
|
|
+ summary.connectors.append(lowered)
|
|
|
+ if tok.dep_ == "det" or tok.pos_ == "DET":
|
|
|
+ add_token(tok, "role-determiner")
|
|
|
+ if tok.dep_ in {"amod", "poss", "compound", "nummod"}:
|
|
|
+ add_token(tok, "role-modifier")
|
|
|
+
|
|
|
+ adverbial_ranges = set()
|
|
|
+ for tok in sentence:
|
|
|
+ if tok.dep_ in ADVERBIAL_DEPS:
|
|
|
+ adverbial_ranges.add(subtree_char_span(tok))
|
|
|
+ for start_char, end_char in adverbial_ranges:
|
|
|
+ add_char_based_span(spans, start_char, end_char, "role-adverbial", mapping)
|
|
|
+
|
|
|
+ for tok in sentence:
|
|
|
+ if tok.dep_ == "appos":
|
|
|
+ add_subtree(tok, "role-apposition")
|
|
|
+
|
|
|
+ if sent_start_tok >= 0 and sent_end_tok >= 0:
|
|
|
+ stack = []
|
|
|
+ for idx in range(sent_start_tok, sent_end_tok):
|
|
|
+ token = tokens[idx]
|
|
|
+ if token.text == "(":
|
|
|
+ stack.append(idx)
|
|
|
+ elif token.text == ")" and stack:
|
|
|
+ add_span(spans, stack.pop(), idx + 1, "role-parenthetical")
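+
+        # Heuristic: a comma-delimited stretch containing a VBG verb form is
+        # flagged as a likely absolute construction (e.g. ", weather permitting,").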
|
|
|
+
|
|
|
+ comma_token_idxs = [
|
|
|
+ i
|
|
|
+ for i in range(sent_start_tok, sent_end_tok)
|
|
|
+ if tokens[i].kind == "punct" and tokens[i].text == ","
|
|
|
+ ]
|
|
|
+ for idx, first_comma in enumerate(comma_token_idxs):
|
|
|
+ if idx + 1 >= len(comma_token_idxs):
|
|
|
+ break
|
|
|
+ second_comma = comma_token_idxs[idx + 1]
|
|
|
+ start_char = tokens[first_comma].start
|
|
|
+ end_char = tokens[second_comma].end
|
|
|
+ span = sentence.doc.char_span(start_char, end_char, alignment_mode="expand")
|
|
|
+ if span and any(tok.tag_ == "VBG" for tok in span):
|
|
|
+ add_span(spans, first_comma, second_comma + 1, "role-absolute")
|
|
|
+
|
|
|
+ annotate_constituents(
|
|
|
+ sentence,
|
|
|
+ spans,
|
|
|
+ mapping,
|
|
|
+ sentence.start_char,
|
|
|
+ sentence.end_char,
|
|
|
+ summary,
|
|
|
+ )
|
|
|
+ _add_fixed_phrases(sentence, mapping, spans, summary)
|
|
|
+
|
|
|
+ return spans, summary
|
|
|
+
|
|
|
+
|
|
|
+def _label_residual_token(token: SpacyToken) -> Optional[str]:
|
|
|
+ dep_label = RESIDUAL_DEP_LABELS.get(token.dep_)
|
|
|
+ if dep_label:
|
|
|
+ return dep_label
|
|
|
+ return RESIDUAL_POS_LABELS.get(token.pos_)
|
|
|
+
|
|
|
+
|
|
|
+def _collect_residual_roles(
|
|
|
+ sentence: SpacySpan,
|
|
|
+ tokens: List[Token],
|
|
|
+ spans: List[Span],
|
|
|
+ sent_bounds: Tuple[int, int],
|
|
|
+ summary: SentenceSummary,
|
|
|
+ mapping: Dict[int, int],
|
|
|
+) -> None:
|
|
|
+ sent_start, sent_end = sent_bounds
|
|
|
+ if sent_start < 0 or sent_end < 0 or sent_start >= sent_end:
|
|
|
+ return
|
|
|
+ coverage = [False] * (sent_end - sent_start)
|
|
|
+ for span in spans:
|
|
|
+ lo = max(span.start_token, sent_start)
|
|
|
+ hi = min(span.end_token, sent_end)
|
|
|
+ for idx in range(lo, hi):
|
|
|
+ coverage[idx - sent_start] = True
|
|
|
+ doc = sentence.doc
|
|
|
+ for offset, covered in enumerate(coverage):
|
|
|
+ if covered:
|
|
|
+ continue
|
|
|
+ token = tokens[sent_start + offset]
|
|
|
+ if token.kind != "word":
|
|
|
+ continue
|
|
|
+ span = doc.char_span(token.start, token.end, alignment_mode="expand")
|
|
|
+ if not span or not span.text.strip():
|
|
|
+ continue
|
|
|
+ label = _label_residual_token(span[0])
|
|
|
+ if label and label not in summary.residual_roles:
|
|
|
+ summary.residual_roles.append(label)
|
|
|
+ if label:
|
|
|
+ add_char_based_span(
|
|
|
+ spans,
|
|
|
+ token.start,
|
|
|
+ token.end,
|
|
|
+ "role-residual",
|
|
|
+ mapping,
|
|
|
+ attrs={"data-role": label},
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def _classify_sentence_complexity(summary: SentenceSummary) -> Tuple[str, bool]:
|
|
|
+ clause_count = len(summary.clauses)
|
|
|
+ connector_count = len(summary.connectors)
|
|
|
+ word_count = summary.sentence_length
|
|
|
+ if clause_count >= 2:
|
|
|
+ return "多重复杂句", True
|
|
|
+ if clause_count == 1:
|
|
|
+ return "主从复合句", True
|
|
|
+ if connector_count >= 2:
|
|
|
+ return "并列复合句", True
|
|
|
+ if word_count >= 25:
|
|
|
+ return "长句", True
|
|
|
+ return "简单句", False
|
|
|
+
|
|
|
+
|
|
|
+def _translate_clause_functions(functions: List[str]) -> List[str]:
|
|
|
+ translated = []
|
|
|
+ for item in functions:
|
|
|
+ label = CLAUSE_FUNCTION_LABELS.get(item, item)
|
|
|
+ if label not in translated:
|
|
|
+ translated.append(label)
|
|
|
+ return translated
|
|
|
+
|
|
|
+
|
|
|
+def build_sentence_note(summary: SentenceSummary) -> Tuple[str, bool]:
|
|
|
+ note_parts: List[str] = []
|
|
|
+ clause_label = "无"
|
|
|
+ if summary.clauses:
|
|
|
+ counts = Counter(summary.clauses)
|
|
|
+ clause_label = "、".join(
|
|
|
+ f"{name}×{count}" if count > 1 else name for name, count in counts.items()
|
|
|
+ )
|
|
|
+ functions = _translate_clause_functions(summary.clause_functions)
|
|
|
+ connectors = list(dict.fromkeys(summary.connectors))
|
|
|
+ residual = summary.residual_roles
|
|
|
+ subjects_seq = list(dict.fromkeys(summary.subjects))
|
|
|
+ predicates_seq = list(dict.fromkeys(summary.predicates))
|
|
|
+ objects_seq = list(dict.fromkeys(summary.objects))
|
|
|
+ complements_seq = list(dict.fromkeys(summary.complements))
|
|
|
+ subjects = "、".join(subjects_seq) if subjects_seq else "未识别"
|
|
|
+ predicates = "、".join(predicates_seq) if predicates_seq else "未识别"
|
|
|
+ objects = "、".join(objects_seq) if objects_seq else "无"
|
|
|
+ complements = "、".join(complements_seq) if complements_seq else "无"
|
|
|
+ note_parts.append(f"主语:{subjects}")
|
|
|
+ note_parts.append(f"谓语:{predicates}")
|
|
|
+ note_parts.append(f"宾语:{objects}")
|
|
|
+ if complements != "无":
|
|
|
+ note_parts.append(f"补语:{complements}")
|
|
|
+ note_parts.append(f"从句:{clause_label}")
|
|
|
+ if functions:
|
|
|
+ note_parts.append(f"从句功能:{'、'.join(functions)}")
|
|
|
+ connector_text = "、".join(connectors) if connectors else "未检测到典型连接词"
|
|
|
+ note_parts.append(f"连接词:{connector_text}")
|
|
|
+ if residual:
|
|
|
+ note_parts.append(f"未高亮:{'、'.join(residual)}")
|
|
|
+ complexity_label, is_complex = _classify_sentence_complexity(summary)
|
|
|
+ note_parts.insert(0, f"句型:{complexity_label}")
|
|
|
+ note_parts.append(f"词数:{summary.sentence_length}")
|
|
|
+ return ";".join(note_parts), is_complex
|
|
|
+
|
|
|
+
|
|
|
+def render_with_spans(tokens: List[Token], spans: List[Span]) -> str:
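+    """Render tokens as HTML, opening and closing span tags at token boundaries.
+
+    Spans are sorted so earlier starts open first and, at equal starts, longer
+    spans open first; properly nested spans therefore produce well-formed HTML.
+    Overlapping non-nested spans may render with extended boundaries.
+    """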
|
|
|
+ spans = sorted(spans, key=lambda s: (s.start_token, -s.end_token))
|
|
|
+ out_parts: List[str] = []
|
|
|
+ active_stack: List[Span] = []
|
|
|
+ span_queue = list(spans)
|
|
|
+ current_idx = 0
|
|
|
+
|
|
|
+ def open_span(span: Span):
|
|
|
+ attrs = ""
|
|
|
+ if span.attrs:
|
|
|
+ attrs = " " + " ".join(
|
|
|
+ f"{k}='" + html.escape(v, quote=True) + "'" for k, v in span.attrs.items()
|
|
|
+ )
|
|
|
+ out_parts.append(f"<span class='{span.cls}'{attrs}>")
|
|
|
+
|
|
|
+ def close_span():
|
|
|
+ out_parts.append("</span>")
|
|
|
+
|
|
|
+ while current_idx < len(tokens):
|
|
|
+ opening = [sp for sp in span_queue if sp.start_token == current_idx]
|
|
|
+ for sp in opening:
|
|
|
+ open_span(sp)
|
|
|
+ active_stack.append(sp)
|
|
|
+ span_queue.remove(sp)
|
|
|
+
|
|
|
+ token = tokens[current_idx]
|
|
|
+ out_parts.append(html.escape(token.text))
|
|
|
+ current_idx += 1
|
|
|
+
|
|
|
+ while active_stack and active_stack[-1].end_token == current_idx:
|
|
|
+ active_stack.pop()
|
|
|
+ close_span()
|
|
|
+
|
|
|
+ while active_stack:
|
|
|
+ active_stack.pop()
|
|
|
+ close_span()
|
|
|
+
|
|
|
+ return "".join(out_parts)
|
|
|
+
|
|
|
+
|
|
|
+def _run_pipeline_without_benepar(text: str) -> "spacy.tokens.Doc":
|
|
|
+ """Run the spaCy pipeline skipping benepar, for robust fallback."""
|
|
|
+ assert NLP is not None
|
|
|
+ doc = NLP.make_doc(text)
|
|
|
+ for name, proc in NLP.pipeline:
|
|
|
+ if name == "benepar":
|
|
|
+ continue
|
|
|
+ doc = proc(doc)
|
|
|
+ return doc
|
|
|
+
|
|
|
+
|
|
|
+def highlight_text_with_spacy(text: str, paragraph_meta: Optional[List[Dict[str, str]]] = None) -> str:
|
|
|
+ if NLP is None:
|
|
|
+ raise RuntimeError(f"spaCy pipeline unavailable: {NLP_LOAD_ERROR}")
|
|
|
+ tokens = tokenize_preserve(text)
|
|
|
+ if not tokens:
|
|
|
+ return ""
|
|
|
+ mapping = build_char_to_token_map(tokens)
|
|
|
+
|
|
|
+ # Robust doc creation: if benepar causes any error, skip it and fallback.
|
|
|
+ try:
|
|
|
+ doc = NLP(text)
|
|
|
+ except Exception as exc:
|
|
|
+ _ensure_benepar_warning(
|
|
|
+ f"Benepar failed during processing: {exc}. Falling back to dependency-based spans."
|
|
|
+ )
|
|
|
+ doc = _run_pipeline_without_benepar(text)
|
|
|
+
|
|
|
+ paragraph_ranges = _split_paragraph_ranges(text)
|
|
|
+    paragraph_counters = [0] * len(paragraph_ranges)
|
|
|
+ paragraph_idx = 0
|
|
|
+ paragraph_spans: List[Span] = []
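+    # Only trust caller-supplied paragraph metadata when it aligns one-to-one
+    # with the paragraph ranges detected from the raw text.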
|
|
|
+ paragraph_attrs = paragraph_meta if paragraph_meta and len(paragraph_meta) == len(paragraph_ranges) else None
|
|
|
+ for idx, (start, end) in enumerate(paragraph_ranges):
|
|
|
+ attrs = None
|
|
|
+ if paragraph_attrs:
|
|
|
+ attrs = paragraph_attrs[idx] or None
|
|
|
+ add_char_based_span(paragraph_spans, start, end, "paragraph-scope", mapping, attrs=attrs)
|
|
|
+
|
|
|
+ spans: List[Span] = list(paragraph_spans)
|
|
|
+
|
|
|
+ for sent in doc.sents:
|
|
|
+ while paragraph_idx < len(paragraph_ranges) and paragraph_ranges[paragraph_idx][1] <= sent.start_char:
|
|
|
+ paragraph_idx += 1
|
|
|
+ current_idx = min(paragraph_idx, len(paragraph_ranges) - 1)
|
|
|
+ paragraph_counters[current_idx] += 1
|
|
|
+ sentence_label = _circled_number(paragraph_counters[current_idx])
|
|
|
+
|
|
|
+ sentence_spans, summary = annotate_sentence(tokens, sent, mapping)
|
|
|
+ sent_bounds = char_span_to_token_span(sent.start_char, sent.end_char, mapping)
|
|
|
+ sent_start, sent_end = sent_bounds
|
|
|
+ if sent_start >= 0 and sent_end >= 0:
|
|
|
+ _collect_residual_roles(sent, tokens, sentence_spans, sent_bounds, summary, mapping)
|
|
|
+ helper_note, is_complex = build_sentence_note(summary)
|
|
|
+ attrs = {
|
|
|
+ "data-sid": sentence_label,
|
|
|
+ "data-note": helper_note,
|
|
|
+ "data-complex": "1" if is_complex else "0",
|
|
|
+ }
|
|
|
+ sentence_spans.append(Span(start_token=sent_start, end_token=sent_end, cls="sentence-scope", attrs=attrs))
|
|
|
+ spans.extend(sentence_spans)
|
|
|
+ return render_with_spans(tokens, spans)
|
|
|
+
|
|
|
+
|
|
|
+app = FastAPI(title="Grammar Highlight API (spaCy + benepar)")
|
|
|
+app.add_middleware(
|
|
|
+ CORSMiddleware,
|
|
|
+ allow_origins=["*"],
|
|
|
+ allow_credentials=True,
|
|
|
+ allow_methods=["*"],
|
|
|
+ allow_headers=["*"],
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+@app.post("/analyze", response_model=AnalyzeResponse)
|
|
|
+async def analyze(req: AnalyzeRequest):
|
|
|
+ text = req.text
|
|
|
+ if text is None or not text.strip():
|
|
|
+ raise HTTPException(status_code=400, detail="Text is required")
|
|
|
+ try:
|
|
|
+ sanitized_fragment = highlight_text_with_spacy(text)
|
|
|
+ helper_state = "on" if SENTENCE_HELPER_ENABLED else "off"
|
|
|
+ return AnalyzeResponse(
|
|
|
+ highlighted_html=f"{STYLE_BLOCK}<div class='analysis' data-helper='{helper_state}'>{sanitized_fragment}</div>"
|
|
|
+ )
|
|
|
+ except RuntimeError as exc:
|
|
|
+ raise HTTPException(status_code=500, detail=str(exc)) from exc
|
|
|
+ except Exception as exc: # pragma: no cover - defensive
|
|
|
+ raise HTTPException(status_code=500, detail=f"Analysis failed: {exc}") from exc
|
|
|
+
|
|
|
+
|
|
|
+@app.get("/health")
|
|
|
+async def health():
|
|
|
+ status = "ok" if NLP is not None else "failed"
|
|
|
+ detail = None if NLP is not None else str(NLP_LOAD_ERROR)
|
|
|
+ payload = {"status": status}
|
|
|
+ if detail:
|
|
|
+ payload["detail"] = detail
|
|
|
+ if BENE_PAR_WARNING:
|
|
|
+ payload["warning"] = BENE_PAR_WARNING
|
|
|
+ payload["benepar_attached"] = HAS_BENEPAR
|
|
|
+ return payload
|
|
|
+
|
|
|
+
|
|
|
+@app.get("/proxy", response_class=HTMLResponse)
|
|
|
+async def proxy(url: Optional[str] = None, show_images: bool = False):
|
|
|
+ if not url:
|
|
|
+ return HTMLResponse(_render_proxy_page(show_images=show_images))
|
|
|
+ try:
|
|
|
+ normalized_url, title, page_text, images, code_blocks, paragraph_meta = await _fetch_remote_plaintext(url)
|
|
|
+ highlighted_fragment = highlight_text_with_spacy(page_text, paragraph_meta=paragraph_meta or None)
|
|
|
+ if code_blocks:
|
|
|
+ highlighted_fragment = _inject_proxy_codeblocks(highlighted_fragment, code_blocks)
|
|
|
+ image_notice = None
|
|
|
+ if images:
|
|
|
+ if show_images:
|
|
|
+ highlighted_fragment = _inject_proxy_images(highlighted_fragment, images)
|
|
|
+ else:
|
|
|
+ highlighted_fragment = _strip_proxy_image_markers(highlighted_fragment)
|
|
|
+ image_notice = (
|
|
|
+ f"检测到 {len(images)} 张正文图片,为提速默认隐藏。勾选“显示图片”后重新抓取即可加载原图。"
|
|
|
+ )
|
|
|
+ html_body = _render_proxy_page(
|
|
|
+ url_value=normalized_url,
|
|
|
+ message="分析完成,结果如下。",
|
|
|
+ highlight_fragment=highlighted_fragment,
|
|
|
+ source_url=normalized_url,
|
|
|
+ source_title=title,
|
|
|
+ show_images=show_images,
|
|
|
+ image_notice=image_notice,
|
|
|
+ )
|
|
|
+ return HTMLResponse(html_body)
|
|
|
+ except ValueError as exc:
|
|
|
+ body = _render_proxy_page(url_value=url or "", message=str(exc), is_error=True, show_images=show_images)
|
|
|
+ return HTMLResponse(body, status_code=400)
|
|
|
+ except httpx.HTTPError as exc:
|
|
|
+ # Provide a clearer message for common HTTP errors from the remote site.
|
|
|
+ msg = None
|
|
|
+ if isinstance(exc, httpx.HTTPStatusError) and exc.response is not None:
|
|
|
+ status = exc.response.status_code
|
|
|
+ if status == 403:
|
|
|
+ msg = (
|
|
|
+ "抓取页面失败:目标站点返回 403 Forbidden(禁止访问)。"
|
|
|
+ "该网站很可能禁止自动抓取或代理访问,目前无法通过本工具获取正文,"
|
|
|
+ "可以尝试在浏览器中打开并手动复制需要的内容。"
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ msg = f"抓取页面失败:目标站点返回 HTTP {status}。"
|
|
|
+ if msg is None:
|
|
|
+ msg = f"抓取页面失败:{exc}"
|
|
|
+ body = _render_proxy_page(
|
|
|
+ url_value=url or "",
|
|
|
+ message=msg,
|
|
|
+ is_error=True,
|
|
|
+ show_images=show_images,
|
|
|
+ )
|
|
|
+ return HTMLResponse(body, status_code=502)
|
|
|
+ except Exception as exc:
|
|
|
+ body = _render_proxy_page(
|
|
|
+ url_value=url or "",
|
|
|
+ message=f"代理分析失败:{exc}",
|
|
|
+ is_error=True,
|
|
|
+ show_images=show_images,
|
|
|
+ )
|
|
|
+ return HTMLResponse(body, status_code=500)
|
|
|
+
|
|
|
+
|
|
|
+@app.get("/", response_class=HTMLResponse)
|
|
|
+async def ui():
|
|
|
+ return """<!DOCTYPE html>
|
|
|
+<html lang=\"zh-CN\">
|
|
|
+<head>
|
|
|
+<meta charset=\"UTF-8\" />
|
|
|
+<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
|
|
|
+<title>Grammar Highlighter</title>
|
|
|
+<style>
|
|
|
+body { font-family: system-ui, -apple-system, sans-serif; margin: 2rem; line-height: 1.6; }
|
|
|
+textarea { width: 100%; min-height: 140px; font-size: 1rem; padding: 0.75rem; border: 1px solid #d0d7de; border-radius: 0.5rem; }
|
|
|
+button { margin-top: 0.75rem; padding: 0.6rem 1.4rem; font-size: 1rem; cursor: pointer; border: none; border-radius: 999px; background: #1f7a8c; color: #fff; }
|
|
|
+button + button { margin-left: 0.5rem; background: #6b7280; }
|
|
|
+button:disabled { opacity: 0.6; cursor: wait; }
|
|
|
+#result { margin-top: 1.5rem; border-top: 1px solid #e5e7eb; padding-top: 1rem; min-height: 2rem; }
|
|
|
+#status { margin-left: 0.75rem; color: #3b82f6; }
|
|
|
+.err { color: #b00020; }
|
|
|
+.muted { color: #6b7280; font-size: 0.9rem; }
|
|
|
+.tts-controls { margin-top: 0.75rem; display: flex; align-items: center; gap: 0.75rem; flex-wrap: wrap; }
|
|
|
+.tts-controls button { margin-top: 0; background: #f97316; }
|
|
|
+.tts-status { font-size: 0.95rem; color: #475569; }
|
|
|
+</style>
|
|
|
+</head>
|
|
|
+<body>
|
|
|
+<h1>Grammar Highlighter (spaCy + benepar)</h1>
|
|
|
+<textarea id=\"text\" placeholder=\"Type the English text you want to analyze...\"></textarea>
|
|
|
+<div>
|
|
|
+<button type=\"button\" id=\"submit\">Analyze</button>
|
|
|
+<button type=\"button\" id=\"clear\">清空输入</button>
|
|
|
+<span id=\"status\"></span>
|
|
|
+</div>
|
|
|
+<div class=\"tts-controls\">
|
|
|
+<button type=\"button\" id=\"tts\">朗读高亮文本</button>
|
|
|
+<button type=\"button\" id=\"tts-selection\">朗读选中文本</button>
|
|
|
+<span class=\"tts-status\" id=\"tts-status\"></span>
|
|
|
+</div>
|
|
|
+<div id=\"result\"></div>
|
|
|
+
|
|
|
+<script>
|
|
|
+const btn = document.getElementById('submit');
|
|
|
+const btnClear = document.getElementById('clear');
|
|
|
+const textarea = document.getElementById('text');
|
|
|
+const statusEl = document.getElementById('status');
|
|
|
+const ttsBtn = document.getElementById('tts');
|
|
|
+const ttsSelectionBtn = document.getElementById('tts-selection');
|
|
|
+const ttsStatus = document.getElementById('tts-status');
|
|
|
+const result = document.getElementById('result');
|
|
|
+const TTS_ENDPOINT = 'http://141.140.15.30:8028/generate';
|
|
|
+let currentAudio = null;
|
|
|
+let queuedAudios = [];
|
|
|
+let streamingFinished = false;
|
|
|
+
|
|
|
+function resetUI() {
|
|
|
+ result.innerHTML = '';
|
|
|
+ statusEl.textContent = '';
|
|
|
+ statusEl.classList.remove('err');
|
|
|
+ ttsStatus.textContent = '';
|
|
|
+ setTtsButtonsDisabled(false);
|
|
|
+ resetAudioPlayback();
|
|
|
+}
|
|
|
+
|
|
|
+btn.addEventListener('click', async () => {
|
|
|
+ resetUI();
|
|
|
+ const value = textarea.value.trim();
|
|
|
+ if (!value) {
|
|
|
+ statusEl.textContent = '请输入要分析的英文文本。';
|
|
|
+ statusEl.classList.add('err');
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ btn.disabled = true;
|
|
|
+ statusEl.textContent = 'Analyzing ...';
|
|
|
+
|
|
|
+ try {
|
|
|
+ const response = await fetch('/analyze', {
|
|
|
+ method: 'POST',
|
|
|
+ headers: { 'Content-Type': 'application/json' },
|
|
|
+ body: JSON.stringify({ text: value })
|
|
|
+ });
|
|
|
+
|
|
|
+ if (!response.ok) {
|
|
|
+ const error = await response.json().catch(() => ({ detail: 'Request failed' }));
|
|
|
+ throw new Error(error.detail || 'Request failed');
|
|
|
+ }
|
|
|
+
|
|
|
+ const data = await response.json();
|
|
|
+ result.innerHTML = data.highlighted_html || '';
|
|
|
+ statusEl.textContent = '';
|
|
|
+ } catch (err) {
|
|
|
+ statusEl.textContent = '错误:' + (err.message || 'Unknown error');
|
|
|
+ statusEl.classList.add('err');
|
|
|
+ } finally {
|
|
|
+ btn.disabled = false;
|
|
|
+ }
|
|
|
+});
|
|
|
+
|
|
|
+btnClear.addEventListener('click', () => {
|
|
|
+ textarea.value = '';
|
|
|
+ resetUI();
|
|
|
+ textarea.focus();
|
|
|
+});
|
|
|
+
|
|
|
+function extractHighlightedText() {
|
|
|
+ const highlightRoot = result.querySelector('.analysis');
|
|
|
+ return highlightRoot ? highlightRoot.textContent.trim() : '';
|
|
|
+}
|
|
|
+
|
|
|
+function setTtsButtonsDisabled(disabled) {
|
|
|
+ if (ttsBtn) {
|
|
|
+ ttsBtn.disabled = disabled;
|
|
|
+ }
|
|
|
+ if (ttsSelectionBtn) {
|
|
|
+ ttsSelectionBtn.disabled = disabled;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+function resetAudioPlayback() {
|
|
|
+ queuedAudios = [];
|
|
|
+ streamingFinished = false;
|
|
|
+ if (currentAudio) {
|
|
|
+ currentAudio.pause();
|
|
|
+ currentAudio = null;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+function markStreamingFinished() {
|
|
|
+ streamingFinished = true;
|
|
|
+ if (!currentAudio && !queuedAudios.length) {
|
|
|
+ ttsStatus.textContent = '播放完成';
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+function playNextAudioChunk() {
|
|
|
+ if (!queuedAudios.length) {
|
|
|
+ currentAudio = null;
|
|
|
+ if (streamingFinished) {
|
|
|
+ ttsStatus.textContent = '播放完成';
|
|
|
+ } else {
|
|
|
+ ttsStatus.textContent = '等待更多语音...';
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ const chunk = queuedAudios.shift();
|
|
|
+ ttsStatus.textContent = '播放中...';
|
|
|
+ currentAudio = new Audio('data:audio/wav;base64,' + chunk);
|
|
|
+ currentAudio.onended = playNextAudioChunk;
|
|
|
+ currentAudio.onerror = () => {
|
|
|
+ ttsStatus.textContent = '播放失败';
|
|
|
+ currentAudio = null;
|
|
|
+ };
|
|
|
+ currentAudio.play().catch(err => {
|
|
|
+ ttsStatus.textContent = '自动播放被阻止:' + err.message;
|
|
|
+ currentAudio = null;
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+function enqueueAudioChunk(chunk) {
|
|
|
+ queuedAudios.push(chunk);
|
|
|
+ if (!currentAudio) {
|
|
|
+ playNextAudioChunk();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+function parseTtsLine(line) {
|
|
|
+ try {
|
|
|
+ const parsed = JSON.parse(line);
|
|
|
+ if (parsed && parsed.audio) {
|
|
|
+ enqueueAudioChunk(parsed.audio);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } catch (err) {
|
|
|
+ console.warn('无法解析TTS响应行', err);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+}
|
|
|
+
|
|
|
+async function consumeTtsResponse(response) {
|
|
|
+ let chunkCount = 0;
|
|
|
+ const handleLine = rawLine => {
|
|
|
+ const trimmed = rawLine.replace(/\\r/g, '').trim();
|
|
|
+ if (!trimmed) return;
|
|
|
+ if (parseTtsLine(trimmed)) {
|
|
|
+ chunkCount += 1;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ if (response.body && response.body.getReader) {
|
|
|
+ const reader = response.body.getReader();
|
|
|
+ const decoder = new TextDecoder();
|
|
|
+ let buffer = '';
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ const { value, done } = await reader.read();
|
|
|
+ if (done) break;
|
|
|
+ buffer += decoder.decode(value, { stream: true });
|
|
|
+ let newlineIndex;
|
|
|
+ while ((newlineIndex = buffer.indexOf('\\n')) >= 0) {
|
|
|
+ const line = buffer.slice(0, newlineIndex);
|
|
|
+ buffer = buffer.slice(newlineIndex + 1);
|
|
|
+ handleLine(line);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ buffer += decoder.decode();
|
|
|
+ if (buffer) {
|
|
|
+ handleLine(buffer);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ const payload = await response.text();
|
|
|
+ payload.split('\\n').forEach(handleLine);
|
|
|
+ }
|
|
|
+ return chunkCount;
|
|
|
+}
|
|
|
+
|
|
|
+function getSelectedPageText() {
|
|
|
+ const selection = window.getSelection ? window.getSelection() : null;
|
|
|
+ return selection ? selection.toString().trim() : '';
|
|
|
+}
|
|
|
+
|
|
|
+async function streamTtsRequest(text) {
|
|
|
+ const response = await fetch(TTS_ENDPOINT, {
|
|
|
+ method: 'POST',
|
|
|
+ headers: { 'Content-Type': 'application/json' },
|
|
|
+ body: JSON.stringify({ text })
|
|
|
+ });
|
|
|
+ if (!response.ok) {
|
|
|
+ throw new Error('接口响应错误');
|
|
|
+ }
|
|
|
+ const chunkCount = await consumeTtsResponse(response);
|
|
|
+ if (!chunkCount) {
|
|
|
+ throw new Error('接口未返回音频数据');
|
|
|
+ }
|
|
|
+ markStreamingFinished();
|
|
|
+}
|
|
|
+
|
|
|
+function createTtsRequest(textResolver, emptyMessage) {
|
|
|
+ return async () => {
|
|
|
+ const text = textResolver();
|
|
|
+ if (!text) {
|
|
|
+ ttsStatus.textContent = emptyMessage;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ setTtsButtonsDisabled(true);
|
|
|
+ ttsStatus.textContent = '请求语音...';
|
|
|
+ resetAudioPlayback();
|
|
|
+ try {
|
|
|
+ await streamTtsRequest(text);
|
|
|
+ } catch (err) {
|
|
|
+ ttsStatus.textContent = 'TTS 出错:' + (err && err.message ? err.message : err);
|
|
|
+ resetAudioPlayback();
|
|
|
+ } finally {
|
|
|
+ setTtsButtonsDisabled(false);
|
|
|
+ }
|
|
|
+ };
|
|
|
+}
|
|
|
+
|
|
|
+if (ttsBtn) {
|
|
|
+ ttsBtn.addEventListener('click', createTtsRequest(extractHighlightedText, '请先生成高亮结果'));
|
|
|
+}
|
|
|
+if (ttsSelectionBtn) {
|
|
|
+ ttsSelectionBtn.addEventListener('click', createTtsRequest(getSelectedPageText, '请先选择要朗读的文本'));
|
|
|
+}
|
|
|
+</script>
|
|
|
+</body>
|
|
|
+</html>"""
|
|
|
+PROXY_PAGE_TEMPLATE = Template(
|
|
|
+ """<!DOCTYPE html>
|
|
|
+<html lang=\"zh-CN\">
|
|
|
+<head>
|
|
|
+<meta charset=\"UTF-8\" />
|
|
|
+<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
|
|
|
+<title>Grammar Proxy Highlighter</title>
|
|
|
+<style>
|
|
|
+body { font-family: system-ui, -apple-system, \"Segoe UI\", sans-serif; margin: 0 auto; max-width: 860px; padding: 1.5rem; line-height: 1.65; }
|
|
|
+h1 { font-size: 1.45rem; margin-bottom: 1rem; }
|
|
|
+form { display: flex; flex-wrap: wrap; gap: 0.5rem; margin-bottom: 0.75rem; }
|
|
|
+input[type=\"url\"] { flex: 1 1 260px; padding: 0.65rem; font-size: 1rem; border-radius: 0.5rem; border: 1px solid #d0d7de; }
|
|
|
+button { padding: 0.65rem 1.4rem; border: none; border-radius: 999px; background: #2563eb; color: #fff; font-size: 1rem; cursor: pointer; }
|
|
|
+.show-images-toggle { display: inline-flex; align-items: center; gap: 0.35rem; font-size: 0.9rem; color: #475569; }
|
|
|
+.show-images-toggle input { width: auto; }
|
|
|
+.tts-controls { margin-top: 0.5rem; display: flex; align-items: center; flex-wrap: wrap; gap: 0.75rem; }
|
|
|
+.tts-controls button { background: #f97316; }
|
|
|
+.tts-status { font-size: 0.95rem; color: #475569; }
|
|
|
+.status { margin-top: 0.25rem; font-size: 0.95rem; }
|
|
|
+.status.err { color: #b00020; }
|
|
|
+.status.ok { color: #059669; }
|
|
|
+section.result { margin-top: 1.4rem; padding-top: 1rem; border-top: 1px solid #e5e7eb; }
|
|
|
+section.result .source { font-size: 0.95rem; margin-bottom: 0.5rem; color: #475569; word-break: break-word; }
|
|
|
+section.result .source a { color: inherit; text-decoration: underline; }
|
|
|
+section.result img { display:block; margin:0.75rem auto; height:auto; max-width:min(100%,800px); }
|
|
|
+.image-hint { font-size:0.9rem; color:#6b7280; margin:0.5rem 0 0; }
|
|
|
+.clear-floating { position: fixed; left: 0; right: 0; bottom: 0; padding: 0.55rem 1.5rem; border-radius: 0; border-top: 1px solid #e5e7eb; background: rgba(249,250,251,0.96); display: flex; justify-content: center; z-index: 40; }
|
|
|
+.clear-floating button { padding: 0.55rem 1.8rem; border-radius: 999px; background: #6b7280; color: #fff; font-size: 0.95rem; }
|
|
|
+.clear-floating button:hover { filter: brightness(1.05); }
|
|
|
+@media (prefers-reduced-motion: reduce) { .clear-floating { scroll-behavior: auto; } }
|
|
|
+@media (max-width: 640px) { body { padding-bottom: 3.2rem; } }
|
|
|
+</style>
|
|
|
+$style_block
|
|
|
+</head>
|
|
|
+<body>
|
|
|
+<h1>网页代理高亮</h1>
|
|
|
+<form method=\"get\" action=\"/proxy\" class=\"url-form\">
|
|
|
+<input type=\"url\" name=\"url\" value=\"$url_value\" placeholder=\"https://example.com/article\" required />
|
|
|
+<button type=\"submit\">抓取并高亮</button>
|
|
|
+<label class=\"show-images-toggle\">
|
|
|
+ <input type=\"checkbox\" name=\"show_images\" value=\"1\" $show_images_checked />
|
|
|
+ <span>显示图片(默认关闭以提升速度)</span>
|
|
|
+</label>
|
|
|
+</form>
|
|
|
+$status_block
|
|
|
+<div class=\"tts-controls\">
|
|
|
+ <button type=\"button\" id=\"proxy-tts-btn\" disabled>朗读高亮文本</button>
|
|
|
+ <button type=\"button\" id=\"proxy-tts-selection\">朗读选中文本</button>
|
|
|
+ <span class=\"tts-status\" id=\"proxy-tts-status\"></span>
|
|
|
+</div>
|
|
|
+$result_block
|
|
|
+<div class=\"clear-floating\">
|
|
|
+ <button type=\"button\" id=\"proxy-reset\">清空并重置</button>
|
|
|
+</div>
|
|
|
+<script>
|
|
|
+(function() {
|
|
|
+ var resetBtn = document.getElementById('proxy-reset');
|
|
|
+ if (resetBtn) {
|
|
|
+ resetBtn.addEventListener('click', function() {
|
|
|
+      // Simple approach: go back to /proxy with no query parameters, which resets the page state.
|
|
|
+ window.location.href = '/proxy';
|
|
|
+ });
|
|
|
+ }
|
|
|
+ var ttsBtn = document.getElementById('proxy-tts-btn');
|
|
|
+ var ttsSelectionBtn = document.getElementById('proxy-tts-selection');
|
|
|
+ var ttsStatus = document.getElementById('proxy-tts-status');
|
|
|
+ var TTS_ENDPOINT = 'http://141.140.15.30:8028/generate';
|
|
|
+ var currentAudio = null;
|
|
|
+ var queuedAudios = [];
|
|
|
+ var streamingFinished = false;
|
|
|
+
|
|
|
+ function extractProxyText() {
|
|
|
+ var container = document.querySelector('section.result .analysis');
|
|
|
+ return container ? container.textContent.trim() : '';
|
|
|
+ }
|
|
|
+
|
|
|
+ function setTtsButtonsDisabled(disabled) {
|
|
|
+ if (ttsBtn) {
|
|
|
+ ttsBtn.disabled = disabled;
|
|
|
+ }
|
|
|
+ if (ttsSelectionBtn) {
|
|
|
+ ttsSelectionBtn.disabled = disabled;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ function resetAudioPlayback() {
|
|
|
+ queuedAudios = [];
|
|
|
+ streamingFinished = false;
|
|
|
+ if (currentAudio) {
|
|
|
+ currentAudio.pause();
|
|
|
+ currentAudio = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ function markStreamingFinished() {
|
|
|
+ streamingFinished = true;
|
|
|
+ if (!currentAudio && !queuedAudios.length) {
|
|
|
+ ttsStatus.textContent = '播放完成';
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ function playNextAudioChunk() {
|
|
|
+ if (!queuedAudios.length) {
|
|
|
+ currentAudio = null;
|
|
|
+ if (streamingFinished) {
|
|
|
+ ttsStatus.textContent = '播放完成';
|
|
|
+ } else {
|
|
|
+ ttsStatus.textContent = '等待更多语音...';
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ var chunk = queuedAudios.shift();
|
|
|
+ ttsStatus.textContent = '播放中...';
|
|
|
+ currentAudio = new Audio('data:audio/wav;base64,' + chunk);
|
|
|
+ currentAudio.onended = playNextAudioChunk;
|
|
|
+ currentAudio.onerror = function() {
|
|
|
+ ttsStatus.textContent = '播放失败';
|
|
|
+ currentAudio = null;
|
|
|
+ };
|
|
|
+ currentAudio.play().catch(function(err) {
|
|
|
+ ttsStatus.textContent = '自动播放被阻止:' + err.message;
|
|
|
+ currentAudio = null;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ function enqueueAudioChunk(chunk) {
|
|
|
+ queuedAudios.push(chunk);
|
|
|
+ if (!currentAudio) {
|
|
|
+ playNextAudioChunk();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ function parseTtsLine(line) {
|
|
|
+ try {
|
|
|
+ var parsed = JSON.parse(line);
|
|
|
+ if (parsed && parsed.audio) {
|
|
|
+ enqueueAudioChunk(parsed.audio);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } catch (err) {
|
|
|
+ console.warn('无法解析TTS响应行', err);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ async function consumeTtsResponse(response) {
|
|
|
+ var chunkCount = 0;
|
|
|
+ var handleLine = function(rawLine) {
|
|
|
+ var trimmed = rawLine.replace(/\\r/g, '').trim();
|
|
|
+ if (!trimmed) return;
|
|
|
+ if (parseTtsLine(trimmed)) {
|
|
|
+ chunkCount += 1;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ if (response.body && response.body.getReader) {
|
|
|
+ var reader = response.body.getReader();
|
|
|
+ var decoder = new TextDecoder();
|
|
|
+ var buffer = '';
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ var readResult = await reader.read();
|
|
|
+ if (readResult.done) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ buffer += decoder.decode(readResult.value, { stream: true });
|
|
|
+ var newlineIndex;
|
|
|
+ while ((newlineIndex = buffer.indexOf('\\n')) >= 0) {
|
|
|
+ var line = buffer.slice(0, newlineIndex);
|
|
|
+ buffer = buffer.slice(newlineIndex + 1);
|
|
|
+ handleLine(line);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ buffer += decoder.decode();
|
|
|
+ if (buffer) {
|
|
|
+ handleLine(buffer);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ var payload = await response.text();
|
|
|
+ payload.split('\\n').forEach(handleLine);
|
|
|
+ }
|
|
|
+ return chunkCount;
|
|
|
+ }
|
|
|
+
|
|
|
+ function getSelectedPageText() {
|
|
|
+ var selection = window.getSelection ? window.getSelection() : null;
|
|
|
+ return selection ? selection.toString().trim() : '';
|
|
|
+ }
|
|
|
+
|
|
|
+ async function streamTtsRequest(text) {
|
|
|
+ var response = await fetch(TTS_ENDPOINT, {
|
|
|
+ method: 'POST',
|
|
|
+ headers: { 'Content-Type': 'application/json' },
|
|
|
+ body: JSON.stringify({ text: text })
|
|
|
+ });
|
|
|
+ if (!response.ok) {
|
|
|
+ throw new Error('接口响应错误');
|
|
|
+ }
|
|
|
+ var chunkCount = await consumeTtsResponse(response);
|
|
|
+ if (!chunkCount) {
|
|
|
+ throw new Error('接口未返回音频数据');
|
|
|
+ }
|
|
|
+ markStreamingFinished();
|
|
|
+ }
|
|
|
+
|
|
|
+ function createTtsRequest(textResolver, emptyMessage) {
|
|
|
+ return async function() {
|
|
|
+ var text = textResolver();
|
|
|
+ if (!text) {
|
|
|
+ ttsStatus.textContent = emptyMessage;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ setTtsButtonsDisabled(true);
|
|
|
+ ttsStatus.textContent = '请求语音...';
|
|
|
+ resetAudioPlayback();
|
|
|
+ try {
|
|
|
+ await streamTtsRequest(text);
|
|
|
+ } catch (err) {
|
|
|
+ ttsStatus.textContent = 'TTS 出错:' + (err && err.message ? err.message : err);
|
|
|
+ resetAudioPlayback();
|
|
|
+ } finally {
|
|
|
+ setTtsButtonsDisabled(false);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ if (ttsBtn) {
|
|
|
+ ttsBtn.addEventListener('click', createTtsRequest(extractProxyText, '暂无可朗读内容'));
|
|
|
+ var hasText = !!extractProxyText();
|
|
|
+ ttsBtn.disabled = !hasText;
|
|
|
+ if (!hasText) {
|
|
|
+ ttsStatus.textContent = '高亮完成后可朗读';
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (ttsSelectionBtn) {
|
|
|
+ ttsSelectionBtn.addEventListener('click', createTtsRequest(getSelectedPageText, '请先选择要朗读的文本'));
|
|
|
+ }
|
|
|
+})();
|
|
|
+</script>
|
|
|
+</body>
|
|
|
+</html>"""
|
|
|
+)
|
|
|
+
|
|
|
+ALLOWED_URL_SCHEMES = {"http", "https"}
|
|
|
+MAX_REMOTE_HTML_BYTES = 1_000_000
|
|
|
+REMOTE_FETCH_TIMEOUT = 10.0
|
|
|
+REMOTE_FETCH_HEADERS = {
|
|
|
+ # Use a browser-like user agent and common headers so that sites which
|
|
|
+ # block generic HTTP clients are more likely to return normal content.
|
|
|
+ "User-Agent": (
|
|
|
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
|
|
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
|
|
|
+ "Chrome/124.0.0.0 Safari/537.36"
|
|
|
+ ),
|
|
|
+ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
|
|
+ "Accept-Language": "en-US,en;q=0.9",
|
|
|
+ # Let httpx / the underlying HTTP stack negotiate an encoding it can
|
|
|
+ # actually decode. If we unconditionally advertise "br" but the runtime
|
|
|
+ # does not have brotli support installed, some sites will respond with
|
|
|
+    # brotli-compressed payloads that end up as garbled text (mojibake) or decoding errors.
|
|
|
+ #
|
|
|
+ # Most modern servers default to gzip or identity when the header is
|
|
|
+ # absent, which are both handled fine by httpx.
|
|
|
+ # "Accept-Encoding": "gzip, deflate, br",
|
|
|
+ "Connection": "keep-alive",
|
|
|
+ "Upgrade-Insecure-Requests": "1",
|
|
|
+    # A few anti-bot setups check these request headers; keeping them close
|
|
|
+ # to real desktop Chrome values slightly improves compatibility, even
|
|
|
+ # though they are not a guarantee against 403 responses.
|
|
|
+ "Sec-Fetch-Site": "none",
|
|
|
+ "Sec-Fetch-Mode": "navigate",
|
|
|
+ "Sec-Fetch-User": "?1",
|
|
|
+ "Sec-Fetch-Dest": "document",
|
|
|
+}
|
|
|
+SIMPLE_FETCH_HEADERS = {
|
|
|
+ # Minimal browser-like headers for the fallback "simple request" path.
|
|
|
+ "User-Agent": (
|
|
|
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
|
|
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
|
|
|
+ "Chrome/124.0.0.0 Safari/537.36"
|
|
|
+ ),
|
|
|
+ "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
|
|
|
+ "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
|
|
+ "Connection": "close",
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+def _inject_proxy_images(html_fragment: str, images: List[Dict[str, str]]) -> str:
|
|
|
+ """Replace stable image placeholders with <img> tags in the highlighted HTML."""
|
|
|
+ result = html_fragment
|
|
|
+ for idx, img in enumerate(images):
|
|
|
+ marker = img.get("marker") or f"__GHIMG_{idx}__"
|
|
|
+ src = html.escape(img.get("src", "") or "", quote=True)
|
|
|
+ if not src:
|
|
|
+ continue
|
|
|
+ alt = html.escape(img.get("alt", "") or "", quote=True)
|
|
|
+ title = html.escape(img.get("title", "") or "", quote=True)
|
|
|
+ attrs = [f"src='{src}'"]
|
|
|
+ if alt:
|
|
|
+ attrs.append(f"alt='{alt}'")
|
|
|
+ if title:
|
|
|
+ attrs.append(f"title='{title}'")
|
|
|
+ # Preserve simple width/height hints when they look safe. Most modern
|
|
|
+ # pages rely on CSS for sizing, but explicit attributes can help keep
|
|
|
+ # code snippets or diagrams close to their original scale.
|
|
|
+ def _safe_dim(value: Optional[str]) -> Optional[str]:
|
|
|
+ if not value:
|
|
|
+ return None
|
|
|
+ value = value.strip()
|
|
|
+ if re.fullmatch(r"\d+(?:\.\d+)?(px|%)?", value):
|
|
|
+ return value
|
|
|
+ return None
|
|
|
+
|
|
|
+ width = _safe_dim(img.get("width"))
|
|
|
+ height = _safe_dim(img.get("height"))
|
|
|
+ if width:
|
|
|
+ attrs.append(f"width='{html.escape(width, quote=True)}'")
|
|
|
+ if height:
|
|
|
+ attrs.append(f"height='{html.escape(height, quote=True)}'")
|
|
|
+ img_tag = "<img " + " ".join(attrs) + " />"
|
|
|
+ # Simple textual replacement is sufficient because placeholders
|
|
|
+ # are emitted as plain word tokens without HTML meta characters.
|
|
|
+ result = result.replace(marker, img_tag)
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+IMG_MARKER_RE = re.compile(r"__GHIMG_\d+__")
|
|
|
+
|
|
|
+
|
|
|
+def _strip_proxy_image_markers(html_fragment: str) -> str:
|
|
|
+ """Remove residual image placeholders when images are hidden."""
|
|
|
+ if IMG_MARKER_RE.search(html_fragment) is None:
|
|
|
+ return html_fragment
|
|
|
+ return IMG_MARKER_RE.sub("", html_fragment)
|
|
|
+
|
|
|
+
|
|
|
+def _inject_proxy_codeblocks(html_fragment: str, code_blocks: List[Dict[str, str]]) -> str:
|
|
|
+ """Replace code placeholders with <pre><code> blocks, preserving formatting."""
|
|
|
+ result = html_fragment
|
|
|
+ for idx, block in enumerate(code_blocks):
|
|
|
+ marker = block.get("marker") or f"__GHCODE_{idx}__"
|
|
|
+ raw = block.get("text") or ""
|
|
|
+ if not raw.strip():
|
|
|
+ continue
|
|
|
+ # Escape HTML but keep newlines so that <pre> preserves formatting.
|
|
|
+ code_html = html.escape(raw, quote=False)
|
|
|
+ pre_tag = f"<pre><code>{code_html}</code></pre>"
|
|
|
+ result = result.replace(marker, pre_tag)
|
|
|
+ return result
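+
+ # Illustrative example (hypothetical marker and text): the raw code is
+ # HTML-escaped, but newlines survive inside <pre>:
+ #
+ # _inject_proxy_codeblocks(
+ #     "<p>__GHCODE_0__</p>",
+ #     [{"marker": "__GHCODE_0__", "text": "if x < 1:\n    pass"}],
+ # )
+ # -> "<p><pre><code>if x &lt; 1:\n    pass</code></pre></p>"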
|
|
|
+
|
|
|
+class SimpleHTMLStripper(HTMLParser):
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__()
|
|
|
+ # Accumulate visible text into paragraph-like blocks while skipping
|
|
|
+ # navigation / sidebars / ads etc. We do this with a small HTML
|
|
|
+ # structure-aware state machine instead of flattening everything.
|
|
|
+ self._blocks: List[Dict[str, Any]] = []
|
|
|
+ self._current_parts: List[str] = []
|
|
|
+
|
|
|
+ # Track when we are inside likely main-content containers such as
+ # <article> or <main>. A parallel stack records, per open container,
+ # whether it bumped the depth, so the matching end tag can undo exactly
+ # what its start tag did (plain <section>s and "content" <div>s would
+ # otherwise desync the counter).
+ self._article_depth = 0
+ self._content_container_stack: List[bool] = []
|
|
|
+
|
|
|
+ # Track whether we are inside a preformatted code block so that we
|
|
|
+ # can preserve indentation and line breaks instead of collapsing
|
|
|
+ # whitespace as normal text.
|
|
|
+ self._in_pre = False
|
|
|
+ self._in_code = False
|
|
|
+ self._current_code_chunks: List[str] = []
|
|
|
+ self._code_blocks: List[Dict[str, str]] = []
|
|
|
+
|
|
|
+ # Stack of flags indicating which open tags should be skipped.
|
|
|
+ # When any active flag is True, textual data is ignored.
|
|
|
+ self._skip_stack: List[bool] = []
|
|
|
+ self._skip_depth = 0
|
|
|
+
|
|
|
+ self._title_chunks: List[str] = []
|
|
|
+ self._in_title = False
|
|
|
+ self._h1_chunks: List[str] = []
|
|
|
+ self._h1_main_chunks: List[str] = []
|
|
|
+ self._in_h1 = False
|
|
|
+
|
|
|
+ # Collected inline images from the main content, in document order.
|
|
|
+ # Each image is represented as a small dict with sanitized attributes.
|
|
|
+ self._images: List[Dict[str, str]] = []
|
|
|
+ # Active list containers (<ul>/<ol>) and current <li> nesting state.
|
|
|
+ self._list_stack: List[Dict[str, Any]] = []
|
|
|
+ self._list_item_stack: List[Dict[str, Any]] = []
|
|
|
+
|
|
|
+ # Keywords commonly used in class/id attributes for non-article areas
|
|
|
+ _NOISE_KEYWORDS = {
|
|
|
+ "sidebar",
|
|
|
+ "side-bar",
|
|
|
+ "aside",
|
|
|
+ "nav",
|
|
|
+ "menu",
|
|
|
+ "breadcrumb",
|
|
|
+ "breadcrumbs",
|
|
|
+ "pagination",
|
|
|
+ "pager",
|
|
|
+ "comment",
|
|
|
+ "comments",
|
|
|
+ "reply",
|
|
|
+ "advert",
|
|
|
+ "ad-",
|
|
|
+ "ads",
|
|
|
+ "sponsor",
|
|
|
+ "promo",
|
|
|
+ "promotion",
|
|
|
+ "related",
|
|
|
+ "recommend",
|
|
|
+ "share",
|
|
|
+ "social",
|
|
|
+ "subscribe",
|
|
|
+ "signup",
|
|
|
+ "login",
|
|
|
+ "popup",
|
|
|
+ "modal",
|
|
|
+ "banner",
|
|
|
+ "cookie",
|
|
|
+ "notification",
|
|
|
+ "toolbar",
|
|
|
+ "footer",
|
|
|
+ "header-bar",
|
|
|
+ }
|
|
|
+
|
|
|
+ # Tags whose textual content is almost never part of the main article.
|
|
|
+ _ALWAYS_SKIP_TAGS = {
|
|
|
+ "script",
|
|
|
+ "style",
|
|
|
+ "noscript",
|
|
|
+ "nav",
|
|
|
+ "aside",
|
|
|
+ "footer",
|
|
|
+ "form",
|
|
|
+ "svg",
|
|
|
+ "iframe",
|
|
|
+ "button",
|
|
|
+ "input",
|
|
|
+ "textarea",
|
|
|
+ "select",
|
|
|
+ "option",
|
|
|
+ "label",
|
|
|
+ }
+
+ # HTML void elements never emit a matching end tag, so they must stay out
+ # of the start/end skip-stack bookkeeping in handle_starttag/handle_endtag;
+ # otherwise a bare <input> (which is in _ALWAYS_SKIP_TAGS) would leak
+ # _skip_depth and silently drop all following text.
+ _VOID_TAGS = {
+ "area",
+ "base",
+ "br",
+ "col",
+ "embed",
+ "hr",
+ "img",
+ "input",
+ "link",
+ "meta",
+ "param",
+ "source",
+ "track",
+ "wbr",
+ }
|
|
|
+
|
|
|
+ # Structural container tags where noise classes/roles are meaningful.
|
|
|
+ # For purely inline tags we avoid applying aggressive noise heuristics
|
|
|
+ # so that important inline text (e.g. spans in the first sentence) is
|
|
|
+ # not accidentally dropped.
|
|
|
+ _STRUCTURAL_NOISE_TAGS = {
|
|
|
+ "div",
|
|
|
+ "section",
|
|
|
+ "aside",
|
|
|
+ "nav",
|
|
|
+ "header",
|
|
|
+ "footer",
|
|
|
+ "main",
|
|
|
+ "article",
|
|
|
+ "ul",
|
|
|
+ "ol",
|
|
|
+ "li",
|
|
|
+ }
|
|
|
+
|
|
|
+ # Block-level tags that naturally mark paragraph boundaries.
|
|
|
+ _BLOCK_TAGS = {
|
|
|
+ "p",
|
|
|
+ "li",
|
|
|
+ "blockquote",
|
|
|
+ "h1",
|
|
|
+ "h2",
|
|
|
+ "h3",
|
|
|
+ "h4",
|
|
|
+ "h5",
|
|
|
+ "h6",
|
|
|
+ "pre",
|
|
|
+ "table",
|
|
|
+ "tr",
|
|
|
+ }
|
|
|
+
|
|
|
+ # Keywords for containers that are likely to hold the main article body.
|
|
|
+ # Used to decide which regions count as "main content" for both text
|
|
|
+ # and inline images.
|
|
|
+ _CONTENT_KEYWORDS = {
|
|
|
+ "content",
|
|
|
+ "main-content",
|
|
|
+ "article-body",
|
|
|
+ "post-body",
|
|
|
+ "post-content",
|
|
|
+ "entry-content",
|
|
|
+ "story-body",
|
|
|
+ "blog-post",
|
|
|
+ "markdown-body",
|
|
|
+ "readable-content",
|
|
|
+ }
|
|
|
+
|
|
|
+ # Keywords on image-related class/id/src that usually indicate avatars,
|
|
|
+ # logo icons, decorative banners, etc., which we want to drop from the
|
|
|
+ # extracted main content.
|
|
|
+ _IMAGE_NOISE_KEYWORDS = {
|
|
|
+ "avatar",
|
|
|
+ "author",
|
|
|
+ "logo",
|
|
|
+ "icon",
|
|
|
+ "favicon",
|
|
|
+ "badge",
|
|
|
+ "banner",
|
|
|
+ "thumb",
|
|
|
+ "thumbnail",
|
|
|
+ "profile",
|
|
|
+ "cover",
|
|
|
+ "background",
|
|
|
+ "sprite",
|
|
|
+ "emoji",
|
|
|
+ "reaction",
|
|
|
+ }
|
|
|
+ _TEXT_NOISE_KEYWORDS = {
|
|
|
+ "menu",
|
|
|
+ "menus",
|
|
|
+ "navigation",
|
|
|
+ "nav",
|
|
|
+ "目录",
|
|
|
+ "目錄",
|
|
|
+ "导航",
|
|
|
+ "導航",
|
|
|
+ "菜单",
|
|
|
+ "菜單",
|
|
|
+ "广告",
|
|
|
+ "廣告",
|
|
|
+ "ad",
|
|
|
+ "ads",
|
|
|
+ "sponsor",
|
|
|
+ "sponsored",
|
|
|
+ "上一篇",
|
|
|
+ "下一篇",
|
|
|
+ "返回顶部",
|
|
|
+ "返回頂部",
|
|
|
+ "分享",
|
|
|
+ "分享至",
|
|
|
+ "相关推荐",
|
|
|
+ "相关阅读",
|
|
|
+ "相關閱讀",
|
|
|
+ "recommended",
|
|
|
+ "related posts",
|
|
|
+ "login",
|
|
|
+ "signup",
|
|
|
+ }
|
|
|
+ _TEXT_NOISE_PREFIXES = (
|
|
|
+ "目录",
|
|
|
+ "目錄",
|
|
|
+ "导航",
|
|
|
+ "導航",
|
|
|
+ "菜单",
|
|
|
+ "菜單",
|
|
|
+ "广告",
|
|
|
+ "廣告",
|
|
|
+ "上一篇",
|
|
|
+ "下一篇",
|
|
|
+ "上一页",
|
|
|
+ "下一页",
|
|
|
+ "返回目录",
|
|
|
+ "返回目錄",
|
|
|
+ "返回顶部",
|
|
|
+ "返回頂部",
|
|
|
+ "分享",
|
|
|
+ "相关",
|
|
|
+ "相關",
|
|
|
+ "recommended",
|
|
|
+ "login",
|
|
|
+ "signup",
|
|
|
+ )
|
|
|
+
|
|
|
+ def _finish_paragraph(self) -> None:
|
|
|
+ """Flush current buffered tokens into a paragraph list."""
|
|
|
+ if not self._current_parts:
|
|
|
+ return
|
|
|
+ # For regular paragraphs we still collapse excessive internal
|
|
|
+ # whitespace, but we keep logical breaks between paragraphs
|
|
|
+ # themselves so that the downstream highlighter can reconstruct
|
|
|
+ # paragraph structure.
|
|
|
+ text = " ".join(self._current_parts)
|
|
|
+ text = re.sub(r"\s+", " ", text).strip()
|
|
|
+ self._current_parts = []
|
|
|
+ if not text:
|
|
|
+ return
|
|
|
+ if self._looks_like_noise_paragraph(text):
|
|
|
+ return
|
|
|
+ block_kind = "paragraph"
|
|
|
+ list_kind: Optional[str] = None
|
|
|
+ list_depth = 0
|
|
|
+ list_index: Optional[int] = None
|
|
|
+ if self._list_item_stack:
|
|
|
+ list_ctx = self._list_item_stack[-1]
|
|
|
+ block_kind = "list-item"
|
|
|
+ list_kind = list_ctx.get("list_type") or "ul"
|
|
|
+ depth_value = list_ctx.get("depth", 1)
|
|
|
+ try:
|
|
|
+ depth_int = int(depth_value)
|
|
|
+ except (TypeError, ValueError):
|
|
|
+ depth_int = 1
|
|
|
+ list_depth = min(max(depth_int, 1), 5)
|
|
|
+ if list_kind == "ol":
|
|
|
+ idx = list_ctx.get("index")
|
|
|
+ if isinstance(idx, int):
|
|
|
+ list_index = idx
|
|
|
+ self._blocks.append(
|
|
|
+ {
|
|
|
+ "text": text,
|
|
|
+ "is_main": self._article_depth > 0,
|
|
|
+ "kind": block_kind,
|
|
|
+ "list_kind": list_kind,
|
|
|
+ "list_depth": list_depth,
|
|
|
+ "list_index": list_index,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ def _looks_like_noise_paragraph(self, text: str) -> bool:
|
|
|
+ normalized = text.strip()
|
|
|
+ if not normalized:
|
|
|
+ return True
|
|
|
+ lowered = normalized.lower()
|
|
|
+ compact = re.sub(r"\s+", "", lowered)
|
|
|
+ for prefix in self._TEXT_NOISE_PREFIXES:
|
|
|
+ if lowered.startswith(prefix.lower()):
|
|
|
+ if len(normalized) <= 80:
|
|
|
+ return True
|
|
|
+ if len(normalized) <= 80:
|
|
|
+ for keyword in self._TEXT_NOISE_KEYWORDS:
|
|
|
+ if keyword in lowered or keyword in compact:
|
|
|
+ return True
|
|
|
+ # Skip very short bullet-like crumbs that mostly consist of symbols.
|
|
|
+ if len(normalized) <= 6 and sum(ch.isalnum() for ch in normalized) <= 1:
|
|
|
+ return True
|
|
|
+ return False
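+
+ # e.g. "上一篇:前一篇文章" (a short prev/next crumb) -> True, "→" (a
+ # symbol-only crumb) -> True, while an ordinary full-length paragraph
+ # falls through to False.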
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _parse_ordered_start(raw_value: Optional[str]) -> int:
|
|
|
+ if raw_value is None:
|
|
|
+ return 1
|
|
|
+ value = raw_value.strip()
|
|
|
+ if not value:
|
|
|
+ return 1
|
|
|
+ try:
|
|
|
+ parsed = int(value)
|
|
|
+ return parsed if parsed >= 1 else 1
|
|
|
+ except ValueError:
|
|
|
+ return 1
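+
+ # e.g. _parse_ordered_start("7") -> 7, _parse_ordered_start("0") -> 1,
+ # _parse_ordered_start("abc") -> 1, _parse_ordered_start(None) -> 1.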
|
|
|
+
|
|
|
+ def handle_starttag(self, tag, attrs):
|
|
|
+ lowered = tag.lower()
|
|
|
+ # Paragraph boundary before starting a new block element or <br>.
|
|
|
+ if lowered in self._BLOCK_TAGS or lowered == "br":
|
|
|
+ if self._skip_depth == 0:
|
|
|
+ self._finish_paragraph()
|
|
|
+
|
|
|
+ # Entering a <pre> region – treat it as a dedicated code block.
|
|
|
+ if lowered == "pre" and self._skip_depth == 0:
|
|
|
+ self._finish_paragraph()
|
|
|
+ self._in_pre = True
|
|
|
+ self._current_code_chunks = []
|
|
|
+
|
|
|
+ # Decide whether this element should be skipped entirely.
|
|
|
+ attr_dict = {k.lower(): (v or "") for k, v in attrs}
|
|
|
+ role = attr_dict.get("role", "").lower()
|
|
|
+ classes_ids = (attr_dict.get("class", "") + " " + attr_dict.get("id", "")).lower()
|
|
|
+
|
|
|
+ is_noise_attr = False
|
|
|
+ # Only treat class/id keywords as layout "noise" on structural
|
|
|
+ # containers (div/section/nav/etc). Inline tags with "comment"
|
|
|
+ # in their class (like mdspan-comment on Towards Data Science)
|
|
|
+ # should not be discarded, otherwise we lose the first words
|
|
|
+ # of sentences.
|
|
|
+ if lowered in self._STRUCTURAL_NOISE_TAGS:
|
|
|
+ is_noise_attr = any(key in classes_ids for key in self._NOISE_KEYWORDS)
|
|
|
+ if role in {"navigation", "banner", "contentinfo", "complementary"}:
|
|
|
+ is_noise_attr = True
|
|
|
+
|
|
|
+ skip_this = lowered in self._ALWAYS_SKIP_TAGS or is_noise_attr
+ # Void elements never receive an end-tag event, so pushing them here
+ # would desync the skip stack; they carry no text content anyway.
+ if lowered not in self._VOID_TAGS:
+ if skip_this:
+ self._skip_depth += 1
+ self._skip_stack.append(skip_this)
|
|
|
+
|
|
|
+ # Track when we are inside an article-like container; only count if not skipped.
+ if self._skip_depth == 0 and lowered in {"article", "main", "section", "div"}:
+ # Treat semantic containers and common "main content" classes as
+ # part of the article area so that we keep their text and inline
+ # media but still avoid sidebars / nav.
+ bumped = lowered in {"article", "main"} or any(
+ key in classes_ids for key in self._CONTENT_KEYWORDS
+ ) or role == "main"
+ if bumped:
+ self._article_depth += 1
+ # Remember the decision so the matching end tag can undo exactly this.
+ self._content_container_stack.append(bumped)
|
|
|
+
|
|
|
+ if self._skip_depth == 0 and lowered in {"ul", "ol"}:
|
|
|
+ start = 1
|
|
|
+ if lowered == "ol":
|
|
|
+ start = self._parse_ordered_start(attr_dict.get("start"))
|
|
|
+ self._list_stack.append(
|
|
|
+ {
|
|
|
+ "type": lowered,
|
|
|
+ "start": start,
|
|
|
+ "next_index": start,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ if lowered == "li" and self._skip_depth == 0:
|
|
|
+ list_ctx = self._list_stack[-1] if self._list_stack else None
|
|
|
+ depth = len(self._list_stack) if self._list_stack else 1
|
|
|
+ list_type = list_ctx.get("type") if list_ctx else "ul"
|
|
|
+ index = None
|
|
|
+ if list_ctx and list_ctx["type"] == "ol":
|
|
|
+ index = list_ctx["next_index"]
|
|
|
+ list_ctx["next_index"] = index + 1
|
|
|
+ li_value = attr_dict.get("value")
|
|
|
+ if li_value and list_ctx and list_ctx["type"] == "ol":
|
|
|
+ try:
|
|
|
+ value_idx = int(li_value)
|
|
|
+ index = value_idx
|
|
|
+ list_ctx["next_index"] = value_idx + 1
|
|
|
+ except ValueError:
|
|
|
+ pass
|
|
|
+ self._list_item_stack.append(
|
|
|
+ {
|
|
|
+ "list_type": list_type,
|
|
|
+ "index": index,
|
|
|
+ "depth": depth,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ if lowered == "title" and self._skip_depth == 0:
|
|
|
+ self._in_title = True
|
|
|
+ if lowered == "h1" and self._skip_depth == 0:
|
|
|
+ self._in_h1 = True
|
|
|
+
|
|
|
+ if lowered == "code" and self._skip_depth == 0 and self._in_pre:
|
|
|
+ # Nested <code> inside <pre> – keep track but we don't need
|
|
|
+ # separate buffering beyond the enclosing pre block.
|
|
|
+ self._in_code = True
|
|
|
+
|
|
|
+ # Inline image handling: only keep <img> elements that are inside the
|
|
|
+ # main article content (tracked via _article_depth) and that do not
|
|
|
+ # look like avatars / logos / decorative icons. We insert a stable
|
|
|
+ # placeholder token into the text stream so that the /proxy renderer
|
|
|
+ # can later replace it with a real <img> tag while preserving the
|
|
|
+ # grammar highlighting.
|
|
|
+ if lowered == "img" and self._skip_depth == 0 and self._article_depth > 0:
|
|
|
+ src = attr_dict.get("src", "").strip()
|
|
|
+ if src:
|
|
|
+ alt = attr_dict.get("alt", "") or ""
|
|
|
+ title = attr_dict.get("title", "") or ""
|
|
|
+ width = (attr_dict.get("width") or "").strip()
|
|
|
+ height = (attr_dict.get("height") or "").strip()
|
|
|
+ img_classes_ids = classes_ids + " " + src.lower()
|
|
|
+ if any(key in img_classes_ids for key in self._IMAGE_NOISE_KEYWORDS):
|
|
|
+ return
|
|
|
+ marker = f"__GHIMG_{len(self._images)}__"
|
|
|
+ img_info: Dict[str, str] = {
|
|
|
+ "marker": marker,
|
|
|
+ "src": src,
|
|
|
+ "alt": alt,
|
|
|
+ "title": title,
|
|
|
+ }
|
|
|
+ if width:
|
|
|
+ img_info["width"] = width
|
|
|
+ if height:
|
|
|
+ img_info["height"] = height
|
|
|
+ self._images.append(img_info)
|
|
|
+ # Treat the image as an inline token within the current
|
|
|
+ # paragraph. Paragraph finishing logic will ensure it
|
|
|
+ # stays grouped with surrounding text.
|
|
|
+ self._current_parts.append(marker)
|
|
|
+
|
|
|
+ def handle_endtag(self, tag):
|
|
|
+ lowered = tag.lower()
|
|
|
+ if lowered == "code" and self._in_code:
|
|
|
+ self._in_code = False
|
|
|
+
|
|
|
+ if lowered == "pre" and self._in_pre:
|
|
|
+ self._in_pre = False
|
|
|
+ # Finalize the current code block into a single placeholder
|
|
|
+ # token so that it passes through the grammar highlighter
|
|
|
+ # untouched, and can later be restored as a <pre><code> block.
|
|
|
+ code_text = "".join(self._current_code_chunks)
|
|
|
+ self._current_code_chunks = []
|
|
|
+ if code_text.strip() and self._skip_depth == 0:
|
|
|
+ marker = f"__GHCODE_{len(self._code_blocks)}__"
|
|
|
+ self._code_blocks.append({"marker": marker, "text": code_text})
|
|
|
+ # We append the marker to the paragraph parts so that
|
|
|
+ # get_text() emits it in the right position.
|
|
|
+ self._current_parts.append(marker)
|
|
|
+
|
|
|
+ # Closing a block element ends the current paragraph.
|
|
|
+ if lowered in self._BLOCK_TAGS and self._skip_depth == 0:
|
|
|
+ self._finish_paragraph()
|
|
|
+
|
|
|
+ if lowered == "li" and self._skip_depth == 0 and self._list_item_stack:
|
|
|
+ self._list_item_stack.pop()
|
|
|
+ if lowered in {"ul", "ol"} and self._skip_depth == 0 and self._list_stack:
|
|
|
+ self._list_stack.pop()
|
|
|
+
|
|
|
+ if lowered == "title":
|
|
|
+ self._in_title = False
|
|
|
+ if lowered == "h1":
|
|
|
+ self._in_h1 = False
|
|
|
+
|
|
|
+ if lowered in {"article", "main", "section"} and self._skip_depth == 0 and self._article_depth > 0:
|
|
|
+ self._article_depth -= 1
|
|
|
+
|
|
|
+ # Mirror handle_starttag: void elements were never pushed, so synthetic
+ # end events for them (e.g. from <img/>) must not pop either.
+ if self._skip_stack and lowered not in self._VOID_TAGS:
+ skip_this = self._skip_stack.pop()
+ if skip_this and self._skip_depth > 0:
+ self._skip_depth -= 1
|
|
|
+
|
|
|
+ def handle_data(self, data):
|
|
|
+ if self._skip_depth > 0:
|
|
|
+ return
|
|
|
+ if self._in_pre or self._in_code:
|
|
|
+ # Preserve code blocks exactly as they appear, including
|
|
|
+ # newlines and indentation.
|
|
|
+ self._current_code_chunks.append(data)
|
|
|
+ return
|
|
|
+ stripped = data.strip()
|
|
|
+ if not stripped:
|
|
|
+ return
|
|
|
+ if self._in_title:
|
|
|
+ self._title_chunks.append(stripped)
|
|
|
+ return
|
|
|
+
|
|
|
+ # Regular visible text
|
|
|
+ self._current_parts.append(stripped)
|
|
|
+ if self._in_h1:
|
|
|
+ self._h1_chunks.append(stripped)
|
|
|
+ if self._article_depth > 0:
|
|
|
+ self._h1_main_chunks.append(stripped)
|
|
|
+
|
|
|
+ def get_text(self) -> str:
|
|
|
+ # Flush any trailing paragraph.
|
|
|
+ self._finish_paragraph()
|
|
|
+ blocks = self._selected_blocks()
|
|
|
+ if not blocks:
|
|
|
+ return ""
|
|
|
+ return "\n\n".join(block["text"] for block in blocks)
|
|
|
+
|
|
|
+ def _selected_blocks(self) -> List[Dict[str, Any]]:
|
|
|
+ if not self._blocks:
|
|
|
+ return []
|
|
|
+ main_blocks = [block for block in self._blocks if block.get("is_main")]
|
|
|
+ return main_blocks if main_blocks else self._blocks
|
|
|
+
|
|
|
+ def get_blocks(self) -> List[Dict[str, Any]]:
|
|
|
+ blocks = self._selected_blocks()
|
|
|
+ return [dict(block) for block in blocks]
|
|
|
+
|
|
|
+ def get_title(self) -> str:
|
|
|
+ # Prefer <h1> heading (especially inside <article>/<main>) as the
|
|
|
+ # primary title; fall back to <title>.
|
|
|
+ if self._h1_main_chunks:
|
|
|
+ raw = " ".join(self._h1_main_chunks)
|
|
|
+ elif self._h1_chunks:
|
|
|
+ raw = " ".join(self._h1_chunks)
|
|
|
+ elif self._title_chunks:
|
|
|
+ raw = " ".join(self._title_chunks)
|
|
|
+ else:
|
|
|
+ return ""
|
|
|
+ return re.sub(r"\s+", " ", raw).strip()
|
|
|
+
|
|
|
+ def get_images(self) -> List[Dict[str, str]]:
|
|
|
+ """Return the list of captured inline images in document order."""
|
|
|
+ return list(self._images)
|
|
|
+
|
|
|
+ def get_code_blocks(self) -> List[Dict[str, str]]:
|
|
|
+ """Return captured code blocks (from <pre>/<code>) in document order."""
|
|
|
+ return list(self._code_blocks)
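+
+ # Rough usage sketch of the stripper (illustrative input, not a test):
+ #
+ # stripper = SimpleHTMLStripper()
+ # stripper.feed("<main><h1>Title</h1><p>First para.</p>"
+ #               "<nav>skip me</nav><pre>x = 1</pre></main>")
+ # stripper.get_title()       -> "Title"
+ # stripper.get_text()        -> "Title\n\nFirst para.\n\n__GHCODE_0__"
+ # stripper.get_code_blocks() -> [{"marker": "__GHCODE_0__", "text": "x = 1"}]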
|
|
|
+
|
|
|
+
|
|
|
+def _normalize_target_url(raw_url: str) -> str:
|
|
|
+ candidate = (raw_url or "").strip()
|
|
|
+ if not candidate:
|
|
|
+ raise ValueError("请输入要抓取的 URL。")
|
|
|
+ parsed = urlparse(candidate if "://" in candidate else f"https://{candidate}")
|
|
|
+ if parsed.scheme not in ALLOWED_URL_SCHEMES:
|
|
|
+ raise ValueError("仅支持 http/https 协议链接。")
|
|
|
+ if not parsed.netloc:
|
|
|
+ raise ValueError("URL 缺少域名部分。")
|
|
|
+ sanitized = parsed._replace(fragment="")
|
|
|
+ return urlunparse(sanitized)
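+
+ # e.g. _normalize_target_url("example.com/a?b=1#frag")
+ # -> "https://example.com/a?b=1" (scheme defaulted, fragment dropped),
+ # while "ftp://example.com" raises ValueError, assuming
+ # ALLOWED_URL_SCHEMES is the usual {"http", "https"}.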
|
|
|
+
|
|
|
+
|
|
|
+def _fallback_html_to_text(html_body: str) -> str:
|
|
|
+ """Very simple HTML-to-text fallback used when structured extraction fails.
|
|
|
+
|
|
|
+ This does not attempt to distinguish main content from navigation, but it
|
|
|
+ guarantees we return *something* for pages whose structure confuses the
|
|
|
+ SimpleHTMLStripper heuristics (e.g. some mirror sites).
|
|
|
+ """
|
|
|
+ # Drop script/style/noscript content outright.
|
|
|
+ cleaned = re.sub(
|
|
|
+ r"(?is)<(script|style|noscript)[^>]*>.*?</\1>",
|
|
|
+ " ",
|
|
|
+ html_body,
|
|
|
+ )
|
|
|
+ # Convert common block separators into newlines.
|
|
|
+ cleaned = re.sub(r"(?i)<br\s*/?>", "\n", cleaned)
|
|
|
+ cleaned = re.sub(r"(?i)</p\s*>", "\n\n", cleaned)
|
|
|
+ cleaned = re.sub(r"(?i)</(div|section|article|li|h[1-6])\s*>", "\n\n", cleaned)
|
|
|
+ # Remove all remaining tags.
|
|
|
+ cleaned = re.sub(r"(?is)<[^>]+>", " ", cleaned)
|
|
|
+ cleaned = html.unescape(cleaned)
|
|
|
+ # Normalize whitespace but keep paragraph-level blank lines.
|
|
|
+ cleaned = cleaned.replace("\r", "")
|
|
|
+ # Collapse runs of spaces/tabs inside lines.
|
|
|
+ cleaned = re.sub(r"[ \t\f\v]+", " ", cleaned)
|
|
|
+ # Collapse 3+ blank lines into just 2.
|
|
|
+ cleaned = re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned)
|
|
|
+ cleaned = cleaned.strip()
|
|
|
+ return cleaned
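+
+ # Small worked example (illustrative):
+ #
+ # _fallback_html_to_text("<div><script>x()</script><p>Hi<br>there</p></div>")
+ # -> "Hi\nthere"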
|
|
|
+
|
|
|
+
|
|
|
+def _build_paragraph_metadata(blocks: List[Dict[str, Any]]) -> List[Dict[str, str]]:
|
|
|
+ """Convert stripped block info into span attributes for downstream rendering."""
|
|
|
+ if not blocks:
|
|
|
+ return []
|
|
|
+ paragraph_meta: List[Dict[str, str]] = []
|
|
|
+ for block in blocks:
|
|
|
+ attrs: Dict[str, str] = {}
|
|
|
+ if block.get("kind") == "list-item" and block.get("list_kind"):
|
|
|
+ attrs["data-list-kind"] = str(block["list_kind"])
|
|
|
+ depth = block.get("list_depth")
|
|
|
+ if depth:
|
|
|
+ attrs["data-list-depth"] = str(depth)
|
|
|
+ if block.get("list_kind") == "ol" and block.get("list_index") is not None:
|
|
|
+ attrs["data-list-index"] = str(block["list_index"])
|
|
|
+ paragraph_meta.append(attrs)
|
|
|
+ return paragraph_meta
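+
+ # For example, a block parsed from the second item of a nested <ol>,
+ # {"kind": "list-item", "list_kind": "ol", "list_depth": 2, "list_index": 2},
+ # yields {"data-list-kind": "ol", "data-list-depth": "2",
+ # "data-list-index": "2"}; a plain paragraph yields an empty dict.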
|
|
|
+
|
|
|
+
|
|
|
+ def _decode_html_bytes(raw_content: bytes, encoding_hint: Optional[str]) -> str:
+ # Apply the size cap in bytes, before decoding, so the limit matches the
+ # name of MAX_REMOTE_HTML_BYTES (the old cap counted decoded characters).
+ if len(raw_content) > MAX_REMOTE_HTML_BYTES:
+ raw_content = raw_content[:MAX_REMOTE_HTML_BYTES]
+ encoding_candidates: List[str] = []
+ if encoding_hint:
+ encoding_candidates.append(encoding_hint)
+ encoding_candidates.extend(["utf-8", "latin-1"])
+ last_exc: Optional[Exception] = None
+ for enc in encoding_candidates:
+ try:
+ # With errors="replace", only an unknown codec name (LookupError)
+ # pushes us to the next candidate; undecodable bytes become U+FFFD
+ # instead of raising.
+ return raw_content.decode(enc, errors="replace")
+ except LookupError as exc: # pragma: no cover - defensive
+ last_exc = exc
+ raise RuntimeError(f"Failed to decode remote page content: {last_exc}") # pragma: no cover
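+
+ # e.g. _decode_html_bytes("café".encode("utf-8"), None) -> "café", and a
+ # bogus hint simply falls through to the next candidate:
+ # _decode_html_bytes(b"abc", "not-a-codec") -> "abc" (decoded as utf-8).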
|
|
|
+
|
|
|
+
|
|
|
+async def _download_html_via_httpx(url: str) -> str:
|
|
|
+ async with httpx.AsyncClient(timeout=REMOTE_FETCH_TIMEOUT, follow_redirects=True) as client:
+ response = await client.get(url, headers=REMOTE_FETCH_HEADERS)
+ # Fail fast on HTTP errors before decoding; decoding an error page's
+ # body would be wasted work.
+ response.raise_for_status()
+ return _decode_html_bytes(response.content, response.encoding)
|
|
|
+
|
|
|
+
|
|
|
+async def _download_html_via_stdlib(url: str) -> str:
|
|
|
+ def _sync_fetch() -> Tuple[bytes, Optional[str]]:
|
|
|
+ req = urllib_request.Request(url, headers=SIMPLE_FETCH_HEADERS)
|
|
|
+ opener = urllib_request.build_opener(urllib_request.ProxyHandler({}))
|
|
|
+ with opener.open(req, timeout=REMOTE_FETCH_TIMEOUT) as resp:
|
|
|
+ data = resp.read(MAX_REMOTE_HTML_BYTES + 1)
|
|
|
+ headers = getattr(resp, "headers", None)
|
|
|
+ encoding_hint = None
|
|
|
+ if headers is not None:
|
|
|
+ get_charset = getattr(headers, "get_content_charset", None)
|
|
|
+ if callable(get_charset):
|
|
|
+ encoding_hint = get_charset()
|
|
|
+ if not encoding_hint:
|
|
|
+ content_type = headers.get("Content-Type", "")
|
|
|
+ match = re.search(r"charset=([\w-]+)", content_type or "", re.IGNORECASE)
|
|
|
+ if match:
|
|
|
+ encoding_hint = match.group(1)
|
|
|
+ return data, encoding_hint
|
|
|
+
|
|
|
+ raw_content, encoding_hint = await asyncio.to_thread(_sync_fetch)
|
|
|
+ return _decode_html_bytes(raw_content, encoding_hint)
|
|
|
+
|
|
|
+
|
|
|
+async def _download_html_with_fallback(url: str) -> str:
|
|
|
+ first_exc: Optional[Exception] = None
|
|
|
+ try:
|
|
|
+ return await _download_html_via_httpx(url)
|
|
|
+ except httpx.HTTPStatusError as exc:
|
|
|
+ status = exc.response.status_code if exc.response is not None else None
|
|
|
+ if status not in {401, 403, 407, 451, 429}:
|
|
|
+ raise
|
|
|
+ first_exc = exc
|
|
|
+ except httpx.HTTPError as exc:
|
|
|
+ first_exc = exc
|
|
|
+
|
|
|
+ try:
|
|
|
+ return await _download_html_via_stdlib(url)
|
|
|
+ except (urllib_error.URLError, urllib_error.HTTPError, TimeoutError) as fallback_exc:
|
|
|
+ if first_exc:
|
|
|
+ raise first_exc from fallback_exc
|
|
|
+ raise
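+
+ # Typical call site (sketch): the helper is awaited from request handlers,
+ # but it can also be exercised standalone:
+ #
+ # html_body = asyncio.run(_download_html_with_fallback("https://example.com"))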
|
|
|
+
|
|
|
+
|
|
|
+async def _fetch_remote_plaintext(
|
|
|
+ url: str,
|
|
|
+) -> Tuple[str, str, str, List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]]]:
|
|
|
+ normalized = _normalize_target_url(url)
|
|
|
+ html_body = await _download_html_with_fallback(normalized)
|
|
|
+ stripper = SimpleHTMLStripper()
|
|
|
+ stripper.feed(html_body)
|
|
|
+ title = stripper.get_title() or normalized
|
|
|
+ images = stripper.get_images()
|
|
|
+ code_blocks = stripper.get_code_blocks()
|
|
|
+ plain_text = stripper.get_text()
|
|
|
+ block_info = stripper.get_blocks()
|
|
|
+ if not plain_text:
|
|
|
+ plain_text = _fallback_html_to_text(html_body)
|
|
|
+ if not plain_text:
|
|
|
+ raise ValueError("未能从该页面提取正文。")
|
|
|
+ # Fallback text no longer contains structured placeholders, so any
|
|
|
+ # collected media/code markers would be invalid.
|
|
|
+ images = []
|
|
|
+ code_blocks = []
|
|
|
+ block_info = []
|
|
|
+ paragraph_meta = _build_paragraph_metadata(block_info)
|
|
|
+ return normalized, title, plain_text, images, code_blocks, paragraph_meta
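+
+ # The returned tuple unpacks as (sketch):
+ #
+ # url, title, text, images, code_blocks, para_meta = await _fetch_remote_plaintext(
+ #     "https://example.com/post"
+ # )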
|
|
|
+
|
|
|
+
|
|
|
+def _render_proxy_page(
|
|
|
+ *,
|
|
|
+ url_value: str = "",
|
|
|
+ message: Optional[str] = None,
|
|
|
+ is_error: bool = False,
|
|
|
+ highlight_fragment: Optional[str] = None,
|
|
|
+ source_url: Optional[str] = None,
|
|
|
+ source_title: Optional[str] = None,
|
|
|
+ show_images: bool = False,
|
|
|
+ image_notice: Optional[str] = None,
|
|
|
+) -> str:
|
|
|
+ helper_state = "on" if SENTENCE_HELPER_ENABLED else "off"
|
|
|
+ status_block = ""
|
|
|
+ if message:
|
|
|
+ cls = "status err" if is_error else "status ok"
|
|
|
+ status_block = f"<p class='{cls}'>{html.escape(message)}</p>"
|
|
|
+
|
|
|
+ style_block = STYLE_BLOCK if highlight_fragment else ""
|
|
|
+ result_block = ""
|
|
|
+ if highlight_fragment and source_url:
|
|
|
+ safe_url = html.escape(source_url, quote=True)
|
|
|
+ safe_title = html.escape(source_title or source_url)
|
|
|
+ image_hint = ""
|
|
|
+ if image_notice:
|
|
|
+ image_hint = f"<p class='image-hint'>{html.escape(image_notice)}</p>"
|
|
|
+ result_block = (
|
|
|
+ "<section class='result'>"
|
|
|
+ f"<div class='source'>原页面:<a href='{safe_url}' target='_blank' rel='noopener'>{safe_title}</a></div>"
|
|
|
+ f"<div class='analysis' data-helper='{helper_state}'>{highlight_fragment}</div>"
|
|
|
+ f"{image_hint}"
|
|
|
+ "</section>"
|
|
|
+ )
|
|
|
+
|
|
|
+ show_images_checked = "checked" if show_images else ""
|
|
|
+ return PROXY_PAGE_TEMPLATE.substitute(
|
|
|
+ style_block=style_block,
|
|
|
+ url_value=html.escape(url_value or "", quote=True),
|
|
|
+ status_block=status_block,
|
|
|
+ result_block=result_block,
|
|
|
+ show_images_checked=show_images_checked,
|
|
|
+ )
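+
+ # Sketch: rendering the empty landing page (PROXY_PAGE_TEMPLATE is the
+ # Template defined elsewhere in this module):
+ #
+ # landing_html = _render_proxy_page(url_value="", message=None)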
|