📋 模块概述

内容抓取模块负责从多种数据源获取AI资讯内容,支持RSS/Atom订阅源和HTML网页抓取,采用并发处理和智能策略提升抓取效率。

核心文件

  • core/fetcher.py - 基础抓取器,支持RSS和HTML
  • core/fetcher_enhanced.py - 增强版抓取器,集成Playwright

🎯 主要功能

1. 多源数据抓取

RSS/Atom 订阅源

  • 使用 feedparser 解析RSS/Atom格式
  • 自动提取标题、链接、发布时间、内容摘要
  • 支持各种RSS变体格式
import feedparser

feed = feedparser.parse(url)
for entry in feed.entries:
    item = {
        'title': entry.title,
        'link': entry.link,
        'published': entry.published,
        'summary': entry.summary
    }

HTML 网页抓取

  • 使用 requests 获取HTML内容
  • 使用 trafilatura 提取正文
  • 自动处理编码和特殊字符
import trafilatura

response = requests.get(url)
html = response.text
content = trafilatura.extract(html)

2. 增强版抓取策略

智能双重方案

系统会自动选择最佳抓取方案:

  • 快速方案:使用 trafilatura 直接提取(适用于静态页面)
  • 浏览器方案:使用 Playwright 渲染后提取(适用于JS渲染页面)
  • 策略缓存:记住每个域名的最佳方案,避免重复尝试

抓取成功率提升

方案 成功率 速度 适用场景
仅 Trafilatura ~40% 静态HTML页面
智能混合方案 ~90%+ 中等 所有类型页面

3. 并发处理

使用 ThreadPoolExecutor 实现多线程并发抓取:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(self.fetch_source, source): source 
        for source in sources
    }
    for future in as_completed(futures, timeout=900):
        result = future.result()

性能提升

  • 20个数据源串行抓取:约 200 秒
  • 20个数据源并发抓取(5线程):约 50 秒
  • 性能提升:4倍

🔧 技术实现

DataFetcher 类

class DataFetcher:
    def __init__(self, timeout=900, date_filter=None):
        self.timeout = timeout
        self.date_filter = date_filter
        self.session = requests.Session()
    
    def fetch_all_concurrent(self, sources, max_workers=5):
        """并发抓取所有数据源"""
        results = {}
        with ThreadPoolExecutor(max_workers) as executor:
            futures = {
                executor.submit(self.fetch_source, s): s 
                for s in sources
            }
            for future in as_completed(futures, timeout):
                source = futures[future]
                try:
                    result = future.result()
                    results[source['name']] = result
                except Exception as e:
                    logger.error(f"抓取失败: {e}")
        return results

EnhancedContentFetcher 类

class EnhancedContentFetcher:
    def __init__(self, fast_fetcher, enable_browser=True):
        self.fast_fetcher = fast_fetcher
        self.playwright_fetcher = PlaywrightFetcher()
        self.strategy = SmartFetchStrategy()
    
    async def fetch_full_content(self, url):
        """智能选择抓取方案"""
        # 检查缓存策略
        if self.strategy.should_use_browser(url):
            return await self._fetch_with_browser(url)
        
        # 尝试快速方案
        content = self.fast_fetcher.fetch_full_content(url)
        
        # 检查是否需要升级到浏览器方案
        if self.strategy.should_use_browser(url, content):
            return await self._fetch_with_browser(url)
        
        return content

⚙️ 配置选项

抓取超时设置

config.yaml 中配置:

# 单个数据源的抓取超时时间(秒)
fetch_timeout: 900  # 默认15分钟

并发线程数

main.py 中调整:

fetcher = DataFetcher(max_workers=5)  # 默认5个线程

# 根据网络和CPU调整:
# - 网络好:可增加到 8-10
# - 网络差:减少到 3-5
# - CPU弱:减少到 3

日期过滤

只抓取指定日期范围的文章:

date_filter:
  days_ago: 5  # 只抓取最近5天的文章
  # 或指定具体日期
  start_date: "2024-01-01"
  end_date: "2024-12-31"

📊 支持的数据源类型

类型 示例 特点
RSS 2.0 OpenAI Blog 标准RSS格式,解析简单
Atom Anthropic News Atom格式,支持丰富元数据
HTML 自定义网页 需要配置选择器
Hacker News Algolia API JSON API,支持搜索
Reddit Subreddit RSS Reddit RSS格式

🐛 错误处理

超时处理

单个数据源超时不影响其他源:

try:
    result = future.result(timeout=900)
except TimeoutError:
    logger.warning(f"数据源超时: {source}")
    continue  # 继续处理下一个

网络错误

自动重试机制:

for attempt in range(max_retries):
    try:
        response = requests.get(url, timeout=30)
        break
    except requests.RequestException as e:
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)  # 指数退避
        else:
            raise

解析错误

容错处理,记录日志但不中断:

try:
    content = trafilatura.extract(html)
except Exception as e:
    logger.error(f