内容抓取模块
DataFetcher & EnhancedContentFetcher 技术实现
📋 模块概述
内容抓取模块负责从多种数据源获取AI资讯内容,支持RSS/Atom订阅源和HTML网页抓取,采用并发处理和智能策略提升抓取效率。
核心文件
core/fetcher.py- 基础抓取器,支持RSS和HTMLcore/fetcher_enhanced.py- 增强版抓取器,集成Playwright
🎯 主要功能
1. 多源数据抓取
RSS/Atom 订阅源
- 使用
feedparser解析RSS/Atom格式 - 自动提取标题、链接、发布时间、内容摘要
- 支持各种RSS变体格式
import feedparser
feed = feedparser.parse(url)
for entry in feed.entries:
item = {
'title': entry.title,
'link': entry.link,
'published': entry.published,
'summary': entry.summary
}
HTML 网页抓取
- 使用
requests获取HTML内容 - 使用
trafilatura提取正文 - 自动处理编码和特殊字符
import trafilatura
response = requests.get(url)
html = response.text
content = trafilatura.extract(html)
2. 增强版抓取策略
智能双重方案
系统会自动选择最佳抓取方案:
- 快速方案:使用 trafilatura 直接提取(适用于静态页面)
- 浏览器方案:使用 Playwright 渲染后提取(适用于JS渲染页面)
- 策略缓存:记住每个域名的最佳方案,避免重复尝试
抓取成功率提升
| 方案 | 成功率 | 速度 | 适用场景 |
|---|---|---|---|
| 仅 Trafilatura | ~40% | 快 | 静态HTML页面 |
| 智能混合方案 | ~90%+ | 中等 | 所有类型页面 |
3. 并发处理
使用 ThreadPoolExecutor 实现多线程并发抓取:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(self.fetch_source, source): source
for source in sources
}
for future in as_completed(futures, timeout=900):
result = future.result()
性能提升
- 20个数据源串行抓取:约 200 秒
- 20个数据源并发抓取(5线程):约 50 秒
- 性能提升:4倍
🔧 技术实现
DataFetcher 类
class DataFetcher:
def __init__(self, timeout=900, date_filter=None):
self.timeout = timeout
self.date_filter = date_filter
self.session = requests.Session()
def fetch_all_concurrent(self, sources, max_workers=5):
"""并发抓取所有数据源"""
results = {}
with ThreadPoolExecutor(max_workers) as executor:
futures = {
executor.submit(self.fetch_source, s): s
for s in sources
}
for future in as_completed(futures, timeout):
source = futures[future]
try:
result = future.result()
results[source['name']] = result
except Exception as e:
logger.error(f"抓取失败: {e}")
return results
EnhancedContentFetcher 类
class EnhancedContentFetcher:
def __init__(self, fast_fetcher, enable_browser=True):
self.fast_fetcher = fast_fetcher
self.playwright_fetcher = PlaywrightFetcher()
self.strategy = SmartFetchStrategy()
async def fetch_full_content(self, url):
"""智能选择抓取方案"""
# 检查缓存策略
if self.strategy.should_use_browser(url):
return await self._fetch_with_browser(url)
# 尝试快速方案
content = self.fast_fetcher.fetch_full_content(url)
# 检查是否需要升级到浏览器方案
if self.strategy.should_use_browser(url, content):
return await self._fetch_with_browser(url)
return content
⚙️ 配置选项
抓取超时设置
在 config.yaml 中配置:
# 单个数据源的抓取超时时间(秒)
fetch_timeout: 900 # 默认15分钟
并发线程数
在 main.py 中调整:
fetcher = DataFetcher(max_workers=5) # 默认5个线程
# 根据网络和CPU调整:
# - 网络好:可增加到 8-10
# - 网络差:减少到 3-5
# - CPU弱:减少到 3
日期过滤
只抓取指定日期范围的文章:
date_filter:
days_ago: 5 # 只抓取最近5天的文章
# 或指定具体日期
start_date: "2024-01-01"
end_date: "2024-12-31"
📊 支持的数据源类型
| 类型 | 示例 | 特点 |
|---|---|---|
| RSS 2.0 | OpenAI Blog | 标准RSS格式,解析简单 |
| Atom | Anthropic News | Atom格式,支持丰富元数据 |
| HTML | 自定义网页 | 需要配置选择器 |
| Hacker News | Algolia API | JSON API,支持搜索 |
| Subreddit RSS | Reddit RSS格式 |
🐛 错误处理
超时处理
单个数据源超时不影响其他源:
try:
result = future.result(timeout=900)
except TimeoutError:
logger.warning(f"数据源超时: {source}")
continue # 继续处理下一个
网络错误
自动重试机制:
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=30)
break
except requests.RequestException as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数退避
else:
raise
解析错误
容错处理,记录日志但不中断:
try:
content = trafilatura.extract(html)
except Exception as e:
logger.error(f