"""
)
# URL schemes we are willing to fetch; everything else is rejected up front.
ALLOWED_URL_SCHEMES = {"http", "https"}
# Hard cap on how much decoded HTML we keep, to bound memory usage.
MAX_REMOTE_HTML_BYTES = 1_000_000
# Per-request timeout (seconds) used by both the httpx and urllib fetch paths.
REMOTE_FETCH_TIMEOUT = 10.0
REMOTE_FETCH_HEADERS = {
    # Use a browser-like user agent and common headers so that sites which
    # block generic HTTP clients are more likely to return normal content.
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    # Let httpx / the underlying HTTP stack negotiate an encoding it can
    # actually decode. If we unconditionally advertise "br" but the runtime
    # does not have brotli support installed, some sites will respond with
    # brotli-compressed payloads that end up as mojibake or decoding errors.
    #
    # Most modern servers default to gzip or identity when the header is
    # absent, which are both handled fine by httpx.
    # "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    # A few anti-bot setups check these request headers; keeping them close
    # to real desktop Chrome values slightly improves compatibility, even
    # though they are not a guarantee against 403 responses.
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-User": "?1",
    "Sec-Fetch-Dest": "document",
}
SIMPLE_FETCH_HEADERS = {
    # Minimal browser-like headers for the fallback "simple request" path.
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    "Connection": "close",
}
def _inject_proxy_images(html_fragment: str, images: List[Dict[str, str]]) -> str:
"""Replace stable image placeholders with tags in the highlighted HTML."""
result = html_fragment
for idx, img in enumerate(images):
marker = img.get("marker") or f"__GHIMG_{idx}__"
src = html.escape(img.get("src", "") or "", quote=True)
if not src:
continue
alt = html.escape(img.get("alt", "") or "", quote=True)
title = html.escape(img.get("title", "") or "", quote=True)
attrs = [f"src='{src}'"]
if alt:
attrs.append(f"alt='{alt}'")
if title:
attrs.append(f"title='{title}'")
# Preserve simple width/height hints when they look safe. Most modern
# pages rely on CSS for sizing, but explicit attributes can help keep
# code snippets or diagrams close to their original scale.
def _safe_dim(value: Optional[str]) -> Optional[str]:
if not value:
return None
value = value.strip()
if re.fullmatch(r"\d+(?:\.\d+)?(px|%)?", value):
return value
return None
width = _safe_dim(img.get("width"))
height = _safe_dim(img.get("height"))
if width:
attrs.append(f"width='{html.escape(width, quote=True)}'")
if height:
attrs.append(f"height='{html.escape(height, quote=True)}'")
img_tag = ""
# Simple textual replacement is sufficient because placeholders
# are emitted as plain word tokens without HTML meta characters.
result = result.replace(marker, img_tag)
return result
IMG_MARKER_RE = re.compile(r"__GHIMG_\d+__")
def _strip_proxy_image_markers(html_fragment: str) -> str:
"""Remove residual image placeholders when images are hidden."""
if IMG_MARKER_RE.search(html_fragment) is None:
return html_fragment
return IMG_MARKER_RE.sub("", html_fragment)
def _inject_proxy_codeblocks(html_fragment: str, code_blocks: List[Dict[str, str]]) -> str:
"""Replace code placeholders with
blocks, preserving formatting."""
result = html_fragment
for idx, block in enumerate(code_blocks):
marker = block.get("marker") or f"__GHCODE_{idx}__"
raw = block.get("text") or ""
if not raw.strip():
continue
# Escape HTML but keep newlines so that
"
result = result.replace(marker, pre_tag)
return result
class SimpleHTMLStripper(HTMLParser):
    """Structure-aware HTML stripper for extracting readable article content.

    Accumulates paragraph-like text blocks (with list metadata), inline
    images from the main content, <pre>/<code> blocks as opaque placeholder
    tokens, and the page title, while skipping navigation, sidebars, ads and
    similar boilerplate via tag/class/id/role heuristics.
    """

    def __init__(self):
        super().__init__()
        # Accumulate visible text into paragraph-like blocks while skipping
        # navigation / sidebars / ads etc. We do this with a small HTML
        # structure-aware state machine instead of flattening everything.
        self._blocks: List[Dict[str, Any]] = []
        self._current_parts: List[str] = []
        # Track when we are inside potentially main content containers
        # like <article> or <main>.
        self._article_depth = 0
        # Track whether we are inside a preformatted code block so that we
        # can preserve indentation and line breaks instead of collapsing
        # whitespace as normal text.
        self._in_pre = False
        self._in_code = False
        self._current_code_chunks: List[str] = []
        self._code_blocks: List[Dict[str, str]] = []
        # Stack of flags indicating which open tags should be skipped.
        # When any active flag is True, textual data is ignored.
        self._skip_stack: List[bool] = []
        self._skip_depth = 0
        self._title_chunks: List[str] = []
        self._in_title = False
        self._h1_chunks: List[str] = []
        self._h1_main_chunks: List[str] = []
        self._in_h1 = False
        # Collected inline images from the main content, in document order.
        # Each image is represented as a small dict with sanitized attributes.
        self._images: List[Dict[str, str]] = []
        # Active list containers (<ul>/<ol>) and current <li> nesting state.
        self._list_stack: List[Dict[str, Any]] = []
        self._list_item_stack: List[Dict[str, Any]] = []

    # Keywords commonly used in class/id attributes for non-article areas.
    _NOISE_KEYWORDS = {
        "sidebar",
        "side-bar",
        "aside",
        "nav",
        "menu",
        "breadcrumb",
        "breadcrumbs",
        "pagination",
        "pager",
        "comment",
        "comments",
        "reply",
        "advert",
        "ad-",
        "ads",
        "sponsor",
        "promo",
        "promotion",
        "related",
        "recommend",
        "share",
        "social",
        "subscribe",
        "signup",
        "login",
        "popup",
        "modal",
        "banner",
        "cookie",
        "notification",
        "toolbar",
        "footer",
        "header-bar",
    }
    # Tags whose textual content is almost never part of the main article.
    _ALWAYS_SKIP_TAGS = {
        "script",
        "style",
        "noscript",
        "nav",
        "aside",
        "footer",
        "form",
        "svg",
        "iframe",
        "button",
        "input",
        "textarea",
        "select",
        "option",
        "label",
    }
    # Structural container tags where noise classes/roles are meaningful.
    # For purely inline tags we avoid applying aggressive noise heuristics
    # so that important inline text (e.g. spans in the first sentence) is
    # not accidentally dropped.
    _STRUCTURAL_NOISE_TAGS = {
        "div",
        "section",
        "aside",
        "nav",
        "header",
        "footer",
        "main",
        "article",
        "ul",
        "ol",
        "li",
    }
    # Block-level tags that naturally mark paragraph boundaries.
    _BLOCK_TAGS = {
        "p",
        "li",
        "blockquote",
        "h1",
        "h2",
        "h3",
        "h4",
        "h5",
        "h6",
        "pre",
        "table",
        "tr",
    }
    # Keywords for containers that are likely to hold the main article body.
    # Used to decide which regions count as "main content" for both text
    # and inline images.
    _CONTENT_KEYWORDS = {
        "content",
        "main-content",
        "article-body",
        "post-body",
        "post-content",
        "entry-content",
        "story-body",
        "blog-post",
        "markdown-body",
        "readable-content",
    }
    # Keywords on image-related class/id/src that usually indicate avatars,
    # logo icons, decorative banners, etc., which we want to drop from the
    # extracted main content.
    _IMAGE_NOISE_KEYWORDS = {
        "avatar",
        "author",
        "logo",
        "icon",
        "favicon",
        "badge",
        "banner",
        "thumb",
        "thumbnail",
        "profile",
        "cover",
        "background",
        "sprite",
        "emoji",
        "reaction",
    }
    # Short phrases (English and Chinese) that flag navigation / ad crumbs
    # inside otherwise-extracted text blocks.
    _TEXT_NOISE_KEYWORDS = {
        "menu",
        "menus",
        "navigation",
        "nav",
        "目录",
        "目錄",
        "导航",
        "導航",
        "菜单",
        "菜單",
        "广告",
        "廣告",
        "ad",
        "ads",
        "sponsor",
        "sponsored",
        "上一篇",
        "下一篇",
        "返回顶部",
        "返回頂部",
        "分享",
        "分享至",
        "相关推荐",
        "相关阅读",
        "相關閱讀",
        "recommended",
        "related posts",
        "login",
        "signup",
    }
    # Prefixes that mark short noise paragraphs (TOC, prev/next links, ...).
    _TEXT_NOISE_PREFIXES = (
        "目录",
        "目錄",
        "导航",
        "導航",
        "菜单",
        "菜單",
        "广告",
        "廣告",
        "上一篇",
        "下一篇",
        "上一页",
        "下一页",
        "返回目录",
        "返回目錄",
        "返回顶部",
        "返回頂部",
        "分享",
        "相关",
        "相關",
        "recommended",
        "login",
        "signup",
    )

    def _finish_paragraph(self) -> None:
        """Flush current buffered tokens into a paragraph list."""
        if not self._current_parts:
            return
        # For regular paragraphs we still collapse excessive internal
        # whitespace, but we keep logical breaks between paragraphs
        # themselves so that the downstream highlighter can reconstruct
        # paragraph structure.
        text = " ".join(self._current_parts)
        text = re.sub(r"\s+", " ", text).strip()
        self._current_parts = []
        if not text:
            return
        if self._looks_like_noise_paragraph(text):
            return
        block_kind = "paragraph"
        list_kind: Optional[str] = None
        list_depth = 0
        list_index: Optional[int] = None
        if self._list_item_stack:
            list_ctx = self._list_item_stack[-1]
            block_kind = "list-item"
            list_kind = list_ctx.get("list_type") or "ul"
            depth_value = list_ctx.get("depth", 1)
            try:
                depth_int = int(depth_value)
            except (TypeError, ValueError):
                depth_int = 1
            # Clamp rendered nesting depth to a sane 1..5 range.
            list_depth = min(max(depth_int, 1), 5)
            if list_kind == "ol":
                idx = list_ctx.get("index")
                if isinstance(idx, int):
                    list_index = idx
        self._blocks.append(
            {
                "text": text,
                "is_main": self._article_depth > 0,
                "kind": block_kind,
                "list_kind": list_kind,
                "list_depth": list_depth,
                "list_index": list_index,
            }
        )

    def _looks_like_noise_paragraph(self, text: str) -> bool:
        """Heuristically detect short navigation/ad crumbs to be dropped."""
        normalized = text.strip()
        if not normalized:
            return True
        lowered = normalized.lower()
        compact = re.sub(r"\s+", "", lowered)
        for prefix in self._TEXT_NOISE_PREFIXES:
            if lowered.startswith(prefix.lower()):
                if len(normalized) <= 80:
                    return True
        if len(normalized) <= 80:
            # NOTE(review): substring matching means short keywords like "ad"
            # also hit inside words such as "read"/"load" for short paragraphs
            # — confirm this aggressiveness is intended.
            for keyword in self._TEXT_NOISE_KEYWORDS:
                if keyword in lowered or keyword in compact:
                    return True
        # Skip very short bullet-like crumbs that mostly consist of symbols.
        if len(normalized) <= 6 and sum(ch.isalnum() for ch in normalized) <= 1:
            return True
        return False

    @staticmethod
    def _parse_ordered_start(raw_value: Optional[str]) -> int:
        """Parse the <ol start="..."> attribute, defaulting to 1 on any problem."""
        if raw_value is None:
            return 1
        value = raw_value.strip()
        if not value:
            return 1
        try:
            parsed = int(value)
            return parsed if parsed >= 1 else 1
        except ValueError:
            return 1

    def handle_starttag(self, tag, attrs):
        """Update extraction state for an opening (or void) tag."""
        lowered = tag.lower()
        # Paragraph boundary before starting a new block element or <br>.
        if lowered in self._BLOCK_TAGS or lowered == "br":
            if self._skip_depth == 0:
                self._finish_paragraph()
        # Entering a <pre> region - treat it as a dedicated code block.
        if lowered == "pre" and self._skip_depth == 0:
            self._finish_paragraph()
            self._in_pre = True
            self._current_code_chunks = []
        # Decide whether this element should be skipped entirely.
        attr_dict = {k.lower(): (v or "") for k, v in attrs}
        role = attr_dict.get("role", "").lower()
        classes_ids = (attr_dict.get("class", "") + " " + attr_dict.get("id", "")).lower()
        is_noise_attr = False
        # Only treat class/id keywords as layout "noise" on structural
        # containers (div/section/nav/etc). Inline tags with "comment"
        # in their class (like mdspan-comment on Towards Data Science)
        # should not be discarded, otherwise we lose the first words
        # of sentences.
        if lowered in self._STRUCTURAL_NOISE_TAGS:
            is_noise_attr = any(key in classes_ids for key in self._NOISE_KEYWORDS)
        if role in {"navigation", "banner", "contentinfo", "complementary"}:
            is_noise_attr = True
        skip_this = lowered in self._ALWAYS_SKIP_TAGS or is_noise_attr
        if skip_this:
            self._skip_depth += 1
        self._skip_stack.append(skip_this)
        # Track when we are inside an article-like container; only count if not skipped.
        if self._skip_depth == 0 and lowered in {"article", "main", "section", "div"}:
            # Treat semantic containers and common "main content" classes as
            # part of the article area so that we keep their text and inline
            # media but still avoid sidebars / nav.
            if lowered in {"article", "main"} or any(
                key in classes_ids for key in self._CONTENT_KEYWORDS
            ) or role == "main":
                self._article_depth += 1
        if self._skip_depth == 0 and lowered in {"ul", "ol"}:
            start = 1
            if lowered == "ol":
                start = self._parse_ordered_start(attr_dict.get("start"))
            self._list_stack.append(
                {
                    "type": lowered,
                    "start": start,
                    "next_index": start,
                }
            )
        if lowered == "li" and self._skip_depth == 0:
            list_ctx = self._list_stack[-1] if self._list_stack else None
            depth = len(self._list_stack) if self._list_stack else 1
            list_type = list_ctx.get("type") if list_ctx else "ul"
            index = None
            if list_ctx and list_ctx["type"] == "ol":
                index = list_ctx["next_index"]
                list_ctx["next_index"] = index + 1
            # An explicit <li value="..."> overrides the running counter.
            li_value = attr_dict.get("value")
            if li_value and list_ctx and list_ctx["type"] == "ol":
                try:
                    value_idx = int(li_value)
                    index = value_idx
                    list_ctx["next_index"] = value_idx + 1
                except ValueError:
                    pass
            self._list_item_stack.append(
                {
                    "list_type": list_type,
                    "index": index,
                    "depth": depth,
                }
            )
        if lowered == "title" and self._skip_depth == 0:
            self._in_title = True
        if lowered == "h1" and self._skip_depth == 0:
            self._in_h1 = True
        if lowered == "code" and self._skip_depth == 0 and self._in_pre:
            # Nested <code> inside <pre> - keep track but we don't need
            # separate buffering beyond the enclosing pre block.
            self._in_code = True
        # Inline image handling: only keep <img> elements that are inside the
        # main article content (tracked via _article_depth) and that do not
        # look like avatars / logos / decorative icons. We insert a stable
        # placeholder token into the text stream so that the /proxy renderer
        # can later replace it with a real <img> tag while preserving the
        # grammar highlighting.
        if lowered == "img" and self._skip_depth == 0 and self._article_depth > 0:
            src = attr_dict.get("src", "").strip()
            if src:
                alt = attr_dict.get("alt", "") or ""
                title = attr_dict.get("title", "") or ""
                width = (attr_dict.get("width") or "").strip()
                height = (attr_dict.get("height") or "").strip()
                img_classes_ids = classes_ids + " " + src.lower()
                if any(key in img_classes_ids for key in self._IMAGE_NOISE_KEYWORDS):
                    return
                marker = f"__GHIMG_{len(self._images)}__"
                img_info: Dict[str, str] = {
                    "marker": marker,
                    "src": src,
                    "alt": alt,
                    "title": title,
                }
                if width:
                    img_info["width"] = width
                if height:
                    img_info["height"] = height
                self._images.append(img_info)
                # Treat the image as an inline token within the current
                # paragraph. Paragraph finishing logic will ensure it
                # stays grouped with surrounding text.
                self._current_parts.append(marker)

    def handle_endtag(self, tag):
        """Close out state that handle_starttag opened for the matching tag."""
        lowered = tag.lower()
        if lowered == "code" and self._in_code:
            self._in_code = False
        if lowered == "pre" and self._in_pre:
            self._in_pre = False
            # Finalize the current code block into a single placeholder
            # token so that it passes through the grammar highlighter
            # untouched, and can later be restored as a <pre> block.
            code_text = "".join(self._current_code_chunks)
            self._current_code_chunks = []
            if code_text.strip() and self._skip_depth == 0:
                marker = f"__GHCODE_{len(self._code_blocks)}__"
                self._code_blocks.append({"marker": marker, "text": code_text})
                # We append the marker to the paragraph parts so that
                # get_text() emits it in the right position.
                self._current_parts.append(marker)
        # Closing a block element ends the current paragraph.
        if lowered in self._BLOCK_TAGS and self._skip_depth == 0:
            self._finish_paragraph()
        if lowered == "li" and self._skip_depth == 0 and self._list_item_stack:
            self._list_item_stack.pop()
        if lowered in {"ul", "ol"} and self._skip_depth == 0 and self._list_stack:
            self._list_stack.pop()
        if lowered == "title":
            self._in_title = False
        if lowered == "h1":
            self._in_h1 = False
        # NOTE(review): a closing </section> decrements the depth even when the
        # matching opening tag did not increment it (a plain <section> without
        # a content keyword never increments) — confirm this asymmetry is
        # intended; it can prematurely end the "main content" region.
        if lowered in {"article", "main", "section"} and self._skip_depth == 0 and self._article_depth > 0:
            self._article_depth -= 1
        if self._skip_stack:
            skip_this = self._skip_stack.pop()
            if skip_this and self._skip_depth > 0:
                self._skip_depth -= 1

    def handle_data(self, data):
        """Route character data to the code buffer, title buffer, or paragraph."""
        if self._skip_depth > 0:
            return
        if self._in_pre or self._in_code:
            # Preserve code blocks exactly as they appear, including
            # newlines and indentation.
            self._current_code_chunks.append(data)
            return
        stripped = data.strip()
        if not stripped:
            return
        if self._in_title:
            self._title_chunks.append(stripped)
            return
        # Regular visible text
        self._current_parts.append(stripped)
        if self._in_h1:
            self._h1_chunks.append(stripped)
            if self._article_depth > 0:
                self._h1_main_chunks.append(stripped)

    def get_text(self) -> str:
        """Return the extracted text as blank-line-separated paragraphs."""
        # Flush any trailing paragraph.
        self._finish_paragraph()
        blocks = self._selected_blocks()
        if not blocks:
            return ""
        return "\n\n".join(block["text"] for block in blocks)

    def _selected_blocks(self) -> List[Dict[str, Any]]:
        """Prefer main-content blocks; fall back to everything if none matched."""
        if not self._blocks:
            return []
        main_blocks = [block for block in self._blocks if block.get("is_main")]
        return main_blocks if main_blocks else self._blocks

    def get_blocks(self) -> List[Dict[str, Any]]:
        """Return copies of the selected blocks (text + list metadata)."""
        blocks = self._selected_blocks()
        return [dict(block) for block in blocks]

    def get_title(self) -> str:
        """Return a best-effort page title with collapsed whitespace."""
        # Prefer the <h1> heading (especially one inside <article>/<main>) as
        # the primary title; fall back to <title>.
        if self._h1_main_chunks:
            raw = " ".join(self._h1_main_chunks)
        elif self._h1_chunks:
            raw = " ".join(self._h1_chunks)
        elif self._title_chunks:
            raw = " ".join(self._title_chunks)
        else:
            return ""
        return re.sub(r"\s+", " ", raw).strip()

    def get_images(self) -> List[Dict[str, str]]:
        """Return the list of captured inline images in document order."""
        return list(self._images)

    def get_code_blocks(self) -> List[Dict[str, str]]:
        """Return captured code blocks (from <pre>/<code>) in document order."""
        return list(self._code_blocks)
def _normalize_target_url(raw_url: str) -> str:
    """Validate *raw_url* and return it as an absolute http(s) URL.

    A missing scheme defaults to https, and any fragment is dropped.
    Raises ValueError for empty input, unsupported schemes, or a missing host.
    """
    candidate = (raw_url or "").strip()
    if not candidate:
        raise ValueError("请输入要抓取的 URL。")
    if "://" not in candidate:
        candidate = f"https://{candidate}"
    parsed = urlparse(candidate)
    if parsed.scheme not in ALLOWED_URL_SCHEMES:
        raise ValueError("仅支持 http/https 协议链接。")
    if not parsed.netloc:
        raise ValueError("URL 缺少域名部分。")
    return urlunparse(parsed._replace(fragment=""))
def _fallback_html_to_text(html_body: str) -> str:
"""Very simple HTML-to-text fallback used when structured extraction fails.
This does not attempt to distinguish main content from navigation, but it
guarantees we return *something* for pages whose structure confuses the
SimpleHTMLStripper heuristics (e.g. some mirror sites).
"""
# Drop script/style/noscript content outright.
cleaned = re.sub(
r"(?is)<(script|style|noscript)[^>]*>.*?\1>",
" ",
html_body,
)
# Convert common block separators into newlines.
cleaned = re.sub(r"(?i) ", "\n", cleaned)
cleaned = re.sub(r"(?i)", "\n\n", cleaned)
cleaned = re.sub(r"(?i)(div|section|article|li|h[1-6])\s*>", "\n\n", cleaned)
# Remove all remaining tags.
cleaned = re.sub(r"(?is)<[^>]+>", " ", cleaned)
cleaned = html.unescape(cleaned)
# Normalize whitespace but keep paragraph-level blank lines.
cleaned = cleaned.replace("\r", "")
# Collapse runs of spaces/tabs inside lines.
cleaned = re.sub(r"[ \t\f\v]+", " ", cleaned)
# Collapse 3+ blank lines into just 2.
cleaned = re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned)
cleaned = cleaned.strip()
return cleaned
def _build_paragraph_metadata(blocks: List[Dict[str, Any]]) -> List[Dict[str, str]]:
"""Convert stripped block info into span attributes for downstream rendering."""
if not blocks:
return []
paragraph_meta: List[Dict[str, str]] = []
for block in blocks:
attrs: Dict[str, str] = {}
if block.get("kind") == "list-item" and block.get("list_kind"):
attrs["data-list-kind"] = str(block["list_kind"])
depth = block.get("list_depth")
if depth:
attrs["data-list-depth"] = str(depth)
if block.get("list_kind") == "ol" and block.get("list_index") is not None:
attrs["data-list-index"] = str(block["list_index"])
paragraph_meta.append(attrs)
return paragraph_meta
def _decode_html_bytes(raw_content: bytes, encoding_hint: Optional[str]) -> str:
    """Decode fetched bytes, trying the server hint first, then utf-8, latin-1.

    Decoding uses errors="replace", so a candidate only fails on an unknown
    codec name (LookupError from the hint); latin-1 always succeeds.
    The decoded text is truncated to MAX_REMOTE_HTML_BYTES characters.
    """
    candidates: List[str] = ([encoding_hint] if encoding_hint else []) + ["utf-8", "latin-1"]
    last_exc: Optional[Exception] = None
    html_body: Optional[str] = None
    for enc in candidates:
        try:
            html_body = raw_content.decode(enc, errors="replace")
        except Exception as exc:  # pragma: no cover - defensive
            last_exc = exc
            continue
        break
    if html_body is None:  # pragma: no cover - extremely unlikely
        raise RuntimeError(f"无法解码远程页面内容: {last_exc}")
    if len(html_body) > MAX_REMOTE_HTML_BYTES:
        html_body = html_body[:MAX_REMOTE_HTML_BYTES]
    return html_body
async def _download_html_via_httpx(url: str) -> str:
    """Fetch *url* with httpx and return the decoded (possibly truncated) body.

    Follows redirects and sends browser-like headers. Raises
    httpx.HTTPStatusError for non-2xx responses.
    """
    async with httpx.AsyncClient(timeout=REMOTE_FETCH_TIMEOUT, follow_redirects=True) as client:
        response = await client.get(url, headers=REMOTE_FETCH_HEADERS)
        # Fail fast on HTTP errors before decoding: the original decoded the
        # body first, wasting work on 4xx/5xx responses and letting a decode
        # failure mask the real HTTP status error.
        response.raise_for_status()
        return _decode_html_bytes(response.content, response.encoding)
async def _download_html_via_stdlib(url: str) -> str:
    """Fallback fetch using urllib in a worker thread, bypassing env proxies."""

    def _blocking_fetch() -> Tuple[bytes, Optional[str]]:
        request = urllib_request.Request(url, headers=SIMPLE_FETCH_HEADERS)
        # An explicit empty ProxyHandler ignores any proxy environment vars.
        opener = urllib_request.build_opener(urllib_request.ProxyHandler({}))
        with opener.open(request, timeout=REMOTE_FETCH_TIMEOUT) as resp:
            payload = resp.read(MAX_REMOTE_HTML_BYTES + 1)
            charset: Optional[str] = None
            headers = getattr(resp, "headers", None)
            if headers is not None:
                # Prefer the parsed charset API, then fall back to scraping
                # the raw Content-Type header.
                get_charset = getattr(headers, "get_content_charset", None)
                if callable(get_charset):
                    charset = get_charset()
                if not charset:
                    content_type = headers.get("Content-Type", "")
                    match = re.search(r"charset=([\w-]+)", content_type or "", re.IGNORECASE)
                    if match:
                        charset = match.group(1)
            return payload, charset

    payload, charset = await asyncio.to_thread(_blocking_fetch)
    return _decode_html_bytes(payload, charset)
async def _download_html_with_fallback(url: str) -> str:
    """Fetch *url*, preferring httpx and falling back to urllib when blocked.

    The urllib fallback only runs for transport errors or for status codes
    that typically indicate bot blocking / rate limiting; other HTTP status
    errors propagate immediately.
    """
    primary_error: Optional[Exception] = None
    try:
        return await _download_html_via_httpx(url)
    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code if exc.response is not None else None
        if status not in {401, 403, 407, 451, 429}:
            raise
        primary_error = exc
    except httpx.HTTPError as exc:
        primary_error = exc
    try:
        return await _download_html_via_stdlib(url)
    except (urllib_error.URLError, urllib_error.HTTPError, TimeoutError) as fallback_exc:
        # Surface the original httpx failure as the primary cause.
        if primary_error:
            raise primary_error from fallback_exc
        raise
async def _fetch_remote_plaintext(
    url: str,
) -> Tuple[str, str, str, List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]]]:
    """Download a page and extract its readable content.

    Returns (normalized_url, title, plain_text, images, code_blocks,
    paragraph_meta). Raises ValueError when no body text can be extracted.
    """
    normalized = _normalize_target_url(url)
    html_body = await _download_html_with_fallback(normalized)

    stripper = SimpleHTMLStripper()
    stripper.feed(html_body)
    title = stripper.get_title() or normalized
    images = stripper.get_images()
    code_blocks = stripper.get_code_blocks()
    plain_text = stripper.get_text()
    block_info = stripper.get_blocks()

    if not plain_text:
        # Structured extraction came up empty - try the crude regex fallback.
        plain_text = _fallback_html_to_text(html_body)
        if not plain_text:
            raise ValueError("未能从该页面提取正文。")
        # Fallback text no longer contains structured placeholders, so any
        # collected media/code markers would be invalid.
        images = []
        code_blocks = []
        block_info = []

    paragraph_meta = _build_paragraph_metadata(block_info)
    return normalized, title, plain_text, images, code_blocks, paragraph_meta
def _render_proxy_page(
*,
url_value: str = "",
message: Optional[str] = None,
is_error: bool = False,
highlight_fragment: Optional[str] = None,
source_url: Optional[str] = None,
source_title: Optional[str] = None,
show_images: bool = False,
image_notice: Optional[str] = None,
) -> str:
helper_state = "on" if SENTENCE_HELPER_ENABLED else "off"
status_block = ""
if message:
cls = "status err" if is_error else "status ok"
status_block = f"
{html.escape(message)}
"
style_block = STYLE_BLOCK if highlight_fragment else ""
result_block = ""
if highlight_fragment and source_url:
safe_url = html.escape(source_url, quote=True)
safe_title = html.escape(source_title or source_url)
image_hint = ""
if image_notice:
image_hint = f"