Python 3.x Web Crawling: From Zero to Real-World Projects
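Web crawling is a powerful technique for automatically collecting and analyzing data from the internet. This guide starts from zero and takes you step by step through Python crawling techniques, until you can complete real-world projects on your own.
Contents
- Python Crawler Basics
- Crawler Tools and Libraries
- HTTP Basics and Data Retrieval
- Parsing Web Page Data
- Anti-Crawling Mechanisms and Countermeasures
- Selenium and Dynamic Page Scraping
- Getting Started with the Scrapy Framework
- Data Storage
- Project: E-commerce Data Crawler
- Project: News Aggregation Crawler
- Legal and Ethical Issues in Crawling
- Advanced Techniques and Optimization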
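1. Python Crawler Basics
1.1 Setting Up the Python Environment
First make sure Python 3.x is installed (3.8 or later is recommended).
Installing Python:
- Windows: download and run the installer from the official Python website (python.org)
- macOS: use Homebrew:
brew install python
- Linux: most distributions ship with Python preinstalled; otherwise install it with your package manager
Check the installation:
python --version
# or
python3 --version
Create a virtual environment:
python -m venv crawler_env
Activate the virtual environment:
- Windows:
crawler_env\Scripts\activate
- macOS/Linux:
source crawler_env/bin/activate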
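1.2 Basic Concepts of Web Crawling
What is a web crawler:
A web crawler is a program that automatically browses the web according to a set of rules and extracts information from it.
The crawling workflow:
- Send a network request and fetch the page content
- Parse the page and extract the useful information
- Store the data
- Crawl related pages further as needed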
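The four steps above form a loop. The following is a minimal sketch of that loop using only the standard library (`html.parser` instead of the BeautifulSoup parser used later). The `fetch` argument is an assumption made for illustration: it is any callable that maps a URL to HTML text, which means the loop can be exercised against an in-memory "site" without touching the network.

```python
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urljoin


class LinkCollector(HTMLParser):
    """Collects every <a href> value and the text inside <title>."""

    def __init__(self):
        super().__init__()
        self.links = []
        self.title = ""
        self._in_title = False

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)
        elif tag == "title":
            self._in_title = True

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self.title += data


def crawl(start_url, fetch, max_pages=10):
    """Breadth-first crawl: request -> parse -> store -> follow links."""
    queue = deque([start_url])
    seen = {start_url}
    results = {}                      # step 3: storage (an in-memory dict here)
    while queue and len(results) < max_pages:
        url = queue.popleft()
        html = fetch(url)             # step 1: request (None means "failed")
        if html is None:
            continue
        parser = LinkCollector()      # step 2: parse
        parser.feed(html)
        results[url] = parser.title.strip()
        for href in parser.links:     # step 4: follow related pages
            absolute = urljoin(url, href)
            if absolute not in seen:
                seen.add(absolute)
                queue.append(absolute)
    return results


# A fake three-page site standing in for real HTTP requests
pages = {
    "http://site.test/": "<title>Home</title><a href='/a'>A</a><a href='/b'>B</a>",
    "http://site.test/a": "<title>Page A</title><a href='/'>Home</a>",
    "http://site.test/b": "<title>Page B</title>",
}
print(crawl("http://site.test/", pages.get))
# {'http://site.test/': 'Home', 'http://site.test/a': 'Page A', 'http://site.test/b': 'Page B'}
```

In a real crawler, `fetch` would wrap `requests.get(...).text`. The `seen` set keeps the crawler from revisiting pages that link back to each other.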
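Typical use cases:
- Data analysis and mining
- Search engine indexing
- Price monitoring
- Content aggregation
- Scientific research
1.3 Legality and Precautions
- Respect the site's robots.txt rules
- Throttle your crawl rate so you do not overload the server
- Do not scrape sensitive or private data
- Use the data only for personal study and research
- Comply with local laws and regulations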
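The standard library can check robots.txt rules for you. In the sketch below, the robots.txt content is an inline example (an assumption). Against a real site you would call `rp.set_url("https://example.com/robots.txt")` followed by `rp.read()` instead of `parse()`.

```python
from urllib.robotparser import RobotFileParser

# Example robots.txt content (illustrative only)
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2
"""

rp = RobotFileParser()
rp.parse(ROBOTS_TXT.splitlines())

# "MyCrawler" is a placeholder user-agent name
print(rp.can_fetch("MyCrawler", "https://example.com/index.html"))    # True
print(rp.can_fetch("MyCrawler", "https://example.com/private/data"))  # False
print(rp.crawl_delay("MyCrawler"))                                     # 2
```

When the site declares a `Crawl-delay`, it is a reasonable lower bound for the pause between your requests.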
2. Crawler Tools and Libraries
2.1 Installing the Basic Libraries
pip install requests beautifulsoup4 lxml pandas
2.2 Common Crawling Libraries
requests: a simple, easy-to-use HTTP library
import requests
response = requests.get('https://www.example.com')
print(response.status_code)  # 200 means success
print(response.text)  # the page content
urllib: the URL-handling package in the Python standard library
import urllib.request
with urllib.request.urlopen('https://www.example.com') as response:
    html = response.read()  # raw bytes; use html.decode('utf-8') to get a str
    print(html)
BeautifulSoup: a parsing library for extracting data from HTML or XML documents
from bs4 import BeautifulSoup
soup = BeautifulSoup('<html><body><p>Hello World</p></body></html>', 'html.parser')
print(soup.p.text)  # Output: Hello World
lxml: a fast XML and HTML parsing library
from lxml import etree
html = etree.HTML('<html><body><p>Hello World</p></body></html>')
result = html.xpath('//p/text()')
print(result)  # Output: ['Hello World']
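2.3 Development Tools and Browser Extensions
Development tools:
- PyCharm: a full-featured Python IDE
- VS Code: a lightweight editor, used together with the Python extension
- Jupyter Notebook: interactive development and testing
Browser tools and extensions:
- Chrome DevTools (built into Chrome): inspect page structure and network requests
- XPath Helper: build and test XPath expressions
- JSON Formatter: pretty-print JSON data
- User-Agent Switcher: simulate visits from different devices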
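3. HTTP Basics and Data Retrieval
3.1 HTTP Protocol Basics
HTTP request methods:
- GET: request a resource; parameters go in the URL
- POST: submit data to the server; the data goes in the request body
- PUT: upload (create or replace) a resource
- DELETE: delete a resource
- HEAD: like GET, but returns only the headers
- OPTIONS: ask which methods the server supports
HTTP status codes:
- 1xx: informational
- 2xx: success (e.g. 200 OK)
- 3xx: redirection (e.g. 301 Moved Permanently)
- 4xx: client error (e.g. 404 Not Found)
- 5xx: server error (e.g. 500 Internal Server Error)
HTTP headers:
- User-Agent: identifies the client software
- Content-Type: the media type of the body
- Cookie: cookies the client sends back to the server (e.g. a session ID)
- Referer: the page the request came from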
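To see how the method and headers end up in a request, you can build one with the standard library's `urllib.request.Request` and inspect it without sending it. The URLs and the `MyCrawler/1.0` user-agent are placeholders.

```python
from urllib.request import Request

# Build (but do not send) a GET request with custom headers
req = Request(
    "https://www.example.com/page",
    headers={
        "User-Agent": "Mozilla/5.0 (compatible; MyCrawler/1.0)",
        "Referer": "https://www.example.com/",
    },
)
print(req.get_method())              # GET (no body supplied)
# urllib normalizes header names with str.capitalize(), hence "User-agent"
print(req.get_header("User-agent"))  # Mozilla/5.0 (compatible; MyCrawler/1.0)

# Supplying a request body switches the default method to POST
post_req = Request("https://www.example.com/login", data=b"user=test")
print(post_req.get_method())         # POST
```

Passing the same `Request` to `urllib.request.urlopen()` would actually send it.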
3.2 Fetching Data with the Requests Library
Basic GET request:
import requests
response = requests.get('https://api.github.com/events')
print(response.status_code)
print(response.headers['content-type'])
print(response.encoding)
print(response.json())  # parse the JSON response body
GET request with parameters:
payload = {'key1': 'value1', 'key2': 'value2'}
response = requests.get('https://httpbin.org/get', params=payload)
print(response.url)  # https://httpbin.org/get?key1=value1&key2=value2
Custom request headers:
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get('https://api.github.com', headers=headers)
POST request (form-encoded body):
payload = {'username': 'test', 'password': 'testing'}
response = requests.post('https://httpbin.org/post', data=payload)
print(response.json())
Sending JSON data:
url = 'https://httpbin.org/post'
payload = {'name': 'John', 'age': 30}
# json= serializes the dict and sets Content-Type: application/json
response = requests.post(url, json=payload)
print(response.json())
Handling cookies:
# Read a cookie: /cookies/set answers with a redirect that carries Set-Cookie,
# so disable redirects to inspect that response directly
response = requests.get('https://httpbin.org/cookies/set/sessioncookie/123456789', allow_redirects=False)
print(response.cookies['sessioncookie'])
# Send a cookie
cookies = {'sessioncookie': '123456789'}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)
print(response.json())
Keeping state with a Session object:
session = requests.Session()
session.get('https://httpbin.org/cookies/set/sessioncookie/123456789')
response = session.get('https://httpbin.org/cookies')
print(response.json())  # the session sends the stored cookie automatically
Setting a timeout:
try:
    response = requests.get('https://httpbin.org/delay/10', timeout=5)
except requests.exceptions.Timeout:
    print("Request timed out")
Error handling:
try:
    response = requests.get('https://nonexistent-domain.com', timeout=10)
    response.raise_for_status()  # raises HTTPError for 4xx and 5xx status codes
except requests.exceptions.HTTPError as err:
    print(f"HTTP error: {err}")
except requests.exceptions.ConnectionError:
    print("Connection error")
except requests.exceptions.Timeout:
    print("Request timed out")
except requests.exceptions.RequestException as err:
    print(f"Error: {err}")
3.3 Hands-on: A Simple Web Crawler
Let's write a small program that fetches each site's title and description:
import requests
from bs4 import BeautifulSoup
import time
def simple_crawler(url):
    # Use a browser-like User-Agent header
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        # Send the request
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # raise on 4xx/5xx responses
        # Parse the page content
        soup = BeautifulSoup(response.text, 'html.parser')
        # Extract the title
        title = soup.title.string if soup.title else "No title found"
        # Extract the description (if there is a meta description tag)
        description = "No description found"
        description_tag = soup.find('meta', attrs={'name': 'description'})
        if description_tag and 'content' in description_tag.attrs:
            description = description_tag['content']
        # Return the result
        return {
            'url': url,
            'title': title,
            'description': description,
            'status': 'success'
        }
    except requests.exceptions.HTTPError as err:
        return {'url': url, 'status': f"HTTP Error: {err}"}
    except requests.exceptions.ConnectionError:
        return {'url': url, 'status': "Connection Error"}
    except requests.exceptions.Timeout:
        return {'url': url, 'status': "Timeout Error"}
    except requests.exceptions.RequestException as err:
        return {'url': url, 'status': f"Error: {err}"}
# Try the function
urls_to_crawl = [
    'https://www.python.org',
    'https://www.github.com',
    'https://www.wikipedia.org'
]
for url in urls_to_crawl:
    result = simple_crawler(url)
    print(f"\nCrawling: {result['url']}")
    if result['status'] == 'success':
        print(f"Title: {result['title']}")
        print(f"Description: {result['description']}")
    else:
        print(f"Failed: {result['status']}")
    # Pause for 1 second so requests are not sent too quickly
    time.sleep(1)
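`simple_crawler` mixes fetching and parsing, so the only way to test it is against live sites. One option is to pull the extraction into a pure function that takes HTML and returns a dict. The sketch below does that with the standard library's `html.parser` (swapped in for BeautifulSoup so it has no dependencies). `TitleDescriptionParser` and `extract_title_and_description` are hypothetical names.

```python
from html.parser import HTMLParser


class TitleDescriptionParser(HTMLParser):
    """Extracts <title> text and <meta name="description" content="...">."""

    def __init__(self):
        super().__init__()
        self.title = None
        self.description = None
        self._in_title = False

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == "title":
            self._in_title = True
            self.title = ""
        elif tag == "meta" and (attrs.get("name") or "").lower() == "description":
            self.description = attrs.get("content")

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self.title += data


def extract_title_and_description(html):
    """Pure function: HTML text in, dict out (no network access)."""
    parser = TitleDescriptionParser()
    parser.feed(html)
    return {
        "title": (parser.title or "").strip() or "No title found",
        "description": parser.description or "No description found",
    }


sample = """<html><head><title> Example Domain </title>
<meta name="Description" content="A sample page"></head><body></body></html>"""
print(extract_title_and_description(sample))
# {'title': 'Example Domain', 'description': 'A sample page'}
```

With this split, the crawler only has to fetch `response.text` and pass it to the parser. The parsing logic can then be unit-tested on saved HTML snippets.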
4. Parsing Web Page Data
4.1 BeautifulSoup in Depth
BeautifulSoup is a Python library for extracting data from HTML and XML documents.
Installation and basic usage (install it with pip install beautifulsoup4):
from bs4 import BeautifulSoup
# Create a BeautifulSoup object from a string
html_doc = """
<html>
<head><title>Page Title</title></head>
<body>
<p class="story">Once upon a time there were three people:
<a href="http://example.com/elsie" class="sister" id="link1">Elsie</a>,
<a href="http://example.com/lacie" class="sister" id="link2">Lacie</a> and
<a href="http://example.com/tillie" class="sister" id="link3">Tillie</a>;
they lived at the bottom of a well.</p>
</body>
</html>
"""
soup = BeautifulSoup(html_doc, 'html.parser')  # or use the 'lxml' parser
Navigating the document tree:
# Get the title
print(soup.title)  # <title>Page Title</title>
print(soup.title.string)  # Page Title
# Get the paragraph
print(soup.p)
# Get a link
print(soup.a)  # only the first <a> tag
# Get all links
print(soup.find_all('a'))
# Get the element with a specific id
print(soup.find(id="link3"))
Searching the document:
# Use find_all() to find every <a> tag
links = soup.find_all('a')
for link in links:
    print(link.get('href'))
# Use a CSS selector
links = soup.select('p a.sister')  # every <a class="sister"> inside a <p>
for link in links:
    print(link['href'])
# Search by attribute value
print(soup.find_all('a', class_='sister'))  # note the trailing underscore in class_
# Search by text content
import re
print(soup.find_all(string=re.compile("Tillie")))  # string= replaces the older text= argument
Modifying the document:
# Change a tag's content
soup.title.string = "New Title"
# Add a new tag
soup.body.append(soup.new_tag('div', id='new_div'))
new_div = soup.find(id='new_div')
new_div.string = "This is the newly added div"
# Print the modified HTML
print(soup.prettify())
4.2 XPath and the lxml Library
XPath is a language for locating information in XML documents, and it works on HTML documents as well.
Installation and basic usage (install it with pip install lxml):
from lxml import etree
html = """
<html>
<head><title>Page Title</title></head>
<body>
<div class="content">
<ul>
<li class="item-0"><a href="link1.html">First</a></li>
<li class="item-1"><a href="link2.html">Second</a></li>
<li class="item-2"><a href="link3.html">Third</a></li>
<li class="item-3"><a href="link4.html">Fourth</a></li>
<li class="item-4"><a href="link5.html">Fifth</a></li>
</ul>
</div>
</body>
</html>
"""
# Parse the HTML
tree = etree.HTML(html)
XPath selectors:
# Select every <li> element
li_elements = tree.xpath('//li')
print(f"Found {len(li_elements)} li elements")
# Select the href attribute of every <a> tag
links = tree.xpath('//a/@href')
print(links)  # ['link1.html', 'link2.html', 'link3.html', 'link4.html', 'link5.html']
# Select the text of every <a> tag
text = tree.xpath('//a/text()')
print(text)  # ['First', 'Second', 'Third', 'Fourth', 'Fifth']
# Select elements with a specific class
items = tree.xpath('//li[@class="item-0"]')
for item in items:
    link = item.xpath('./a/text()')[0]
    href = item.xpath('./a/@href')[0]
    print(f"{link} -> {href}")
# Select by position (XPath positions start at 1)
third_item = tree.xpath('//li[3]')  # each <li> that is the third <li> child of its parent
print(etree.tostring(third_item[0], encoding='utf-8').decode('utf-8'))
Combining XPath expressions:
# Select <li> elements whose class contains 'item-'
items = tree.xpath('//li[contains(@class, "item-")]')
print(f"Matching elements: {len(items)}")
# Use the union operator | to combine the results of several paths
items = tree.xpath('//li[@class="item-0"] | //li[@class="item-1"]')
print(f"Matching elements: {len(items)}")
# Select the parent node
ul = tree.xpath('//li[@class="item-0"]/..')
print(etree.tostring(ul[0], encoding='utf-8').decode('utf-8'))
# Select all ancestor nodes
ancestors = tree.xpath('//li[@class="item-0"]/ancestor::*')
for ancestor in ancestors:
    print(ancestor.tag)  # html, body, div, ul
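If lxml is not available, the standard library's `xml.etree.ElementTree` supports a limited XPath subset. This is a swapped-in alternative with real restrictions: it only parses well-formed XML, and it cannot return attributes or `text()` as results, so you read those from the element. A small sketch on a shortened version of the sample list:

```python
import xml.etree.ElementTree as ET

# ElementTree needs well-formed XML, unlike lxml's lenient etree.HTML()
doc = """<html><body><div class="content"><ul>
<li class="item-0"><a href="link1.html">First</a></li>
<li class="item-1"><a href="link2.html">Second</a></li>
<li class="item-2"><a href="link3.html">Third</a></li>
</ul></div></body></html>"""

root = ET.fromstring(doc)

# './/a' plays the role of '//a'; attributes are read with .get()
print([a.get("href") for a in root.findall(".//a")])  # ['link1.html', 'link2.html', 'link3.html']

# Attribute predicates are supported...
print(root.find(".//li[@class='item-1']/a").text)      # Second

# ...and so are 1-based positional predicates
print(root.find(".//li[3]/a").text)                    # Third
```

Anything beyond this subset, such as `contains()`, `ancestor::`, or the `|` union, needs lxml.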
4.3 Extracting Data with Regular Expressions
Regular expressions are a powerful tool for matching and processing text.
Basic usage:
import re
text = "My phone number is 123-456-7890, and another one is (123) 456-7891"
# Find all phone numbers
phone_pattern = r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}'
phones = re.findall(phone_pattern, text)
print(phones)  # ['123-456-7890', '(123) 456-7891']
# Extract information from HTML with regular expressions
html = '<div class="price">¥299.00</div><div class="title">iPhone phone case</div>'
# Extract the price
price = re.search(r'<div class="price">¥([\d.]+)</div>', html)
if price:
    print(f"Price: ¥{price.group(1)}")  # Price: ¥299.00
# Extract the title
title = re.search(r'<div class="title">(.*?)</div>', html)
if title:
    print(f"Product: {title.group(1)}")  # Product: iPhone phone case
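The title pattern above uses `.*?` rather than `.*`, and that difference matters as soon as a page contains more than one matching element. The sketch below shows greedy and non-greedy matching side by side, and adds a named group as an optional readability aid. The HTML strings are made-up samples.

```python
import re

html = '<div class="title">Phone case</div><div class="title">Charger</div>'

# Greedy: .* runs to the LAST </div>, swallowing the second element
greedy = re.search(r'<div class="title">(.*)</div>', html).group(1)
print(greedy)  # Phone case</div><div class="title">Charger

# Non-greedy: .*? stops at the FIRST </div> after each opening tag
print(re.findall(r'<div class="title">(.*?)</div>', html))  # ['Phone case', 'Charger']

# Named groups make the extraction code self-documenting
m = re.search(r'<div class="price">¥(?P<integer>\d+)\.(?P<fraction>\d{2})</div>',
              '<div class="price">¥299.00</div>')
print(m.group("integer"), m.group("fraction"))  # 299 00
```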
Combining requests with regular expressions:
import requests
import re
url = 'https://www.python.org/'
response = requests.get(url, timeout=10)
# Extract all absolute links (relative links such as href="/about/" are not matched)
links = re.findall(r'href="(https?://.*?)"', response.text)
for link in links[:5]:  # show only the first 5
    print(link)
# Extract all image URLs
images = re.findall(r'src="(.*?\.(?:png|jpg|jpeg|gif))"', response.text)
for image in images[:5]:  # show only the first 5
    print(image)
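The link pattern above skips relative links. A common fix is to capture every `href`, resolve it against the page URL with `urllib.parse.urljoin`, strip `#fragment` parts, and de-duplicate. `extract_links` is a hypothetical helper sketched for illustration, and the sample HTML is made up.

```python
import re
from urllib.parse import urldefrag, urljoin


def extract_links(html, base_url):
    """Return unique absolute http(s) links in document order."""
    seen = set()
    links = []
    for href in re.findall(r'href="(.*?)"', html):
        # Resolve relative paths, then drop any #fragment
        absolute, _fragment = urldefrag(urljoin(base_url, href))
        if absolute.startswith(("http://", "https://")) and absolute not in seen:
            seen.add(absolute)
            links.append(absolute)
    return links


page = ('<a href="/about/">About</a> <a href="https://docs.python.org/3/">Docs</a> '
        '<a href="events.html#top">Events</a> <a href="/about/">About again</a> '
        '<a href="mailto:someone@example.com">Mail</a>')
print(extract_links(page, "https://www.python.org/community/"))
# ['https://www.python.org/about/', 'https://docs.python.org/3/',
#  'https://www.python.org/community/events.html']
```

Pass `response.url` as `base_url` when working with a real response, so relative links resolve against the final URL after any redirects.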
4.4 Hands-on: Extracting Product Data from JD.com
The following example crawls product listings from JD.com:
import requests
from bs4 import BeautifulSoup
import time
import json
import random
def get_jd_product_info(keyword, num_pages=1):
    """
    Crawl product information from JD.com
    :param keyword: search keyword
    :param num_pages: number of result pages to crawl
    :return: list of product dicts
    """
    all_products = []
    for page in range(1, num_pages + 1):
        # Build the request; requests percent-encodes the query parameters
        url = 'https://search.jd.com/Search'
        params = {'keyword': keyword, 'page': page}
        # Set the request headers
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://www.jd.com/'
        }
        try:
            # Send the request
            response = requests.get(url, params=params, headers=headers, timeout=10)
            response.raise_for_status()
            # Parse the HTML
            soup = BeautifulSoup(response.text, 'html.parser')
            # Get the product list
            products = soup.select('li.gl-item')
            print(f"Page {page}: found {len(products)} products")
            # Iterate over the products
            for product in products:
                try:
                    # Product ID (SKU)
                    product_id = product.get('data-sku')
                    # Price
                    price_element = product.select_one('div.p-price i')
                    price = price_element.text if price_element else 'N/A'
                    # Title
                    title_element = product.select_one('div.p-name em')
                    title = title_element.text.strip() if title_element else 'N/A'
                    # Shop name
                    shop_element = product.select_one('div.p-shop span a')
                    shop = shop_element.text.strip() if shop_element else 'N/A'
                    # Number of reviews
                    commit_element = product.select_one('div.p-commit strong a')
                    commits = commit_element.text if commit_element else '0'
                    # Link to the product detail page
                    link = f"https://item.jd.com/{product_id}.html" if product_id else 'N/A'
                    # Build the product dict
                    product_info = {
                        'id': product_id,
                        'title': title,
                        'price': price,
                        'shop': shop,
                        'commits': commits,
                        'link': link
                    }
                    all_products.append(product_info)
                except Exception as e:
                    print(f"Error while extracting product info: {e}")
                    continue
            # Random delay so requests are not sent too quickly
            time.sleep(random.uniform(1, 3))
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
            break
    return all_products
# Example usage
if __name__ == "__main__":
    keyword = "笔记本电脑"  # "laptop" in Chinese
    num_pages = 2
    print(f"Start crawling JD products: {keyword}")
    products = get_jd_product_info(keyword, num_pages)
    # Print the results
    print(f"\nCrawled {len(products)} products in total:")
    for product in products[:5]:  # show only the first 5
        print(f"\nID: {product['id']}")
        print(f"Title: {product['title']}")
        print(f"Price: {product['price']}")
        print(f"Shop: {product['shop']}")
        print(f"Reviews: {product['commits']}")
        print(f"Link: {product['link']}")
    # Save as a JSON file
    with open(f'jd_{keyword}.json', 'w', encoding='utf-8') as f:
        json.dump(products, f, ensure_ascii=False, indent=4)
    print(f"\nData saved to jd_{keyword}.json")
Note: this example is for learning purposes only. In practice you will probably need to adjust the CSS selectors, because the site's structure changes over time.
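5. Anti-Crawling Mechanisms and Countermeasures
5.1 Common Anti-Crawling Mechanisms
1. IP restrictions:
- Sending many requests in a short time can get your IP banned
- The site may respond with 403 Forbidden
2. User-Agent detection:
- The request headers are inspected to identify crawlers
- Requests that do not look like they come from a common browser are rejected
3. Cookie and session verification:
- A valid cookie is required to access the content
- Session tracking checks whether the visitor is a real browser
4. CAPTCHAs:
- Image CAPTCHAs, slider puzzles, click-to-select challenges, etc.
- They block automated access
5. Dynamically generated content:
- Page content is rendered with JavaScript
- A browser environment is needed to obtain the full content
6. Honeypot traps:
- Hidden links that normal users never click
- A crawler that visits them gets flagged
7. Rate limiting:
- The number of requests per unit of time is capped
- Exceeding the cap may mean waiting or getting banned (often signaled with 429 Too Many Requests)
5.2 Countermeasures
5.2.1 Modifying Request Headers
import requests
import random
# Pool of User-Agent strings to rotate through
user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
]
def get_random_ua():
    """Return a random User-Agent string"""
    return random.choice(user_agents)
# Example request
url = 'https://www.example.com'
headers = {
    'User-Agent': get_random_ua(),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Referer': 'https://www.google.com/',
    'Upgrade-Insecure-Requests': '1',
    'DNT': '1',  # Do Not Track
}
try:
    response = requests.get(url, headers=headers, timeout=10)
    print(f"Status code: {response.status_code}")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")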
5.2.2 Adding Delays Between Requests
import requests
import time
import random
urls = [
    'https://www.example.com/page1',
    'https://www.example.com/page2',
    'https://www.example.com/page3'
]
for url in urls:
    try:
        # Random delay of 1 to 5 seconds
        delay = random.uniform(1, 5)
        print(f"Waiting {delay:.2f} seconds...")
        time.sleep(delay)
        response = requests.get(url, timeout=10)
        print(f"URL: {url}, status code: {response.status_code}")
    except Exception as e:
        print(f"Error: {e}")
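A fixed random sleep before every request also slows down requests to different hosts, and it does not account for time already spent parsing. A slightly more targeted approach is a per-host throttle that waits only as long as needed to keep a minimum interval per host. `HostThrottle` is a hypothetical class sketched here. Its clock and sleep functions are injectable so the logic can be tested without actually waiting.

```python
import time
from urllib.parse import urlsplit


class HostThrottle:
    """Keep at least `min_interval` seconds between requests to the same host."""

    def __init__(self, min_interval=2.0, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self.clock = clock
        self.sleep = sleep
        self._last_request = {}  # host -> time of its last request

    def wait(self, url):
        """Sleep if url's host was hit too recently; return the seconds slept."""
        host = urlsplit(url).netloc
        now = self.clock()
        last = self._last_request.get(host)
        waited = 0.0
        if last is not None:
            waited = max(0.0, self.min_interval - (now - last))
            if waited:
                self.sleep(waited)
        self._last_request[host] = now + waited
        return waited


class FakeClock:
    """Simulated time, so the example runs instantly."""

    def __init__(self):
        self.t = 0.0

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.t += seconds


fake = FakeClock()
throttle = HostThrottle(min_interval=2.0, clock=fake.now, sleep=fake.sleep)
print(throttle.wait("https://a.example/1"))  # 0.0  first request to a.example
print(throttle.wait("https://b.example/1"))  # 0.0  different host, no wait
print(throttle.wait("https://a.example/2"))  # 2.0  same host, too soon
```

In the loop above, calling `throttle.wait(url)` right before `requests.get(url, ...)` would replace the unconditional `time.sleep(delay)`.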
5.2.3 Rotating Proxy IPs
import requests
import time
from itertools import cycle
# Proxy list (placeholders only; real use requires working proxies)
proxies = [
    "http://proxy1.example.com:8080",
    "http://proxy2.example.com:8080",
    "http://proxy3.example.com:8080"
]
# Create an iterator that cycles through the proxies
proxy_cycle = cycle(proxies)
url = 'https://httpbin.org/ip'
for _ in range(3):
    # Get the next proxy
    proxy = next(proxy_cycle)
    print(f"Using proxy: {proxy}")
    try:
        response = requests.get(
            url,
            proxies={"http": proxy, "https": proxy},
            timeout=10
        )
        print(f"Status code: {response.status_code}")
        print(f"IP address: {response.json()['origin']}")
    except Exception as e:
        print(f"Request failed: {e}")
    time.sleep(1)
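`itertools.cycle` keeps handing out a proxy even after it has stopped working. A small pool that retires proxies after repeated failures avoids wasting requests on dead proxies. `ProxyPool` is a hypothetical helper, not part of requests, and the proxy URLs are placeholders.

```python
from collections import deque


class ProxyPool:
    """Round-robin proxy pool that retires proxies after repeated failures."""

    def __init__(self, proxies, max_failures=2):
        self._queue = deque(proxies)
        self._failures = {p: 0 for p in proxies}
        self.max_failures = max_failures

    def get(self):
        """Return the next live proxy and rotate it to the back of the queue."""
        if not self._queue:
            raise RuntimeError("no usable proxies left")
        proxy = self._queue[0]
        self._queue.rotate(-1)
        return proxy

    def report_success(self, proxy):
        self._failures[proxy] = 0  # a success resets the failure count

    def report_failure(self, proxy):
        self._failures[proxy] += 1
        if self._failures[proxy] >= self.max_failures and proxy in self._queue:
            self._queue.remove(proxy)


pool = ProxyPool(["http://p1:8080", "http://p2:8080", "http://p3:8080"])
print([pool.get() for _ in range(4)])
# ['http://p1:8080', 'http://p2:8080', 'http://p3:8080', 'http://p1:8080']
pool.report_failure("http://p2:8080")
pool.report_failure("http://p2:8080")  # second consecutive failure: p2 is retired
print([pool.get() for _ in range(3)])
# ['http://p3:8080', 'http://p1:8080', 'http://p3:8080']
```

In the request loop, you would call `pool.report_success(proxy)` after a good response and `pool.report_failure(proxy)` in the `except requests.exceptions.RequestException` branch.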
5.2.4 Keeping Cookies with a Session
import requests
# Create a Session object
session = requests.Session()
# Custom headers, sent with every request from this session
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})
# Log in to the site
login_url = 'https://example.com/login'
login_data = {
    'username': 'your_username',
    'password': 'your_password'
}
response = session.post(login_url, data=login_data)
print(f"Login status: {response.status_code}")
# Pages that require login: the session automatically sends the cookies received earlier
protected_url = 'https://example.com/protected-page'
response = session.get(protected_url)
print(f"Access status: {response.status_code}")
print(response.text[:100])  # first 100 characters
5.2.5 Retrying Requests
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def get_with_retry(url, retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504)):
    """GET request with automatic retries"""
    session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,  # wait longer after each failed attempt
        status_forcelist=status_forcelist,  # also retry on these status codes
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session.get(url, timeout=10)
try:
    # This URL always returns 500, so the retries run out and requests raises RetryError
    response = get_with_retry('https://httpbin.org/status/500')
    print(f"Status code: {response.status_code}")
except Exception as e:
    print(f"Request failed: {e}")
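`backoff_factor` makes urllib3 wait progressively longer between attempts. The exact formula differs slightly between urllib3 versions, so check the documentation for the version you have installed. The sketch below is a hand-rolled version of the same idea, not urllib3's implementation. It uses capped exponential backoff with optional "full jitter", a common variant that spreads retries from many clients apart. `backoff_delays` and `call_with_retries` are hypothetical names.

```python
import random
import time


def backoff_delays(retries, base=0.5, cap=30.0, jitter=True, rng=random.random):
    """Delay before each retry: base * 2**attempt, capped at `cap`.

    With jitter=True each delay is scaled by a random factor in [0, 1).
    """
    delays = []
    for attempt in range(retries):
        delay = min(cap, base * (2 ** attempt))
        if jitter:
            delay *= rng()
        delays.append(delay)
    return delays


def call_with_retries(func, retries=3, retry_on=(ConnectionError, TimeoutError),
                      sleep=time.sleep, **backoff_kwargs):
    """Call func(); on a retryable exception, wait and try again."""
    for delay in backoff_delays(retries, **backoff_kwargs):
        try:
            return func()
        except retry_on:
            sleep(delay)
    return func()  # final attempt: any exception now propagates


print(backoff_delays(5, base=0.5, cap=4.0, jitter=False))  # [0.5, 1.0, 2.0, 4.0, 4.0]

attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(call_with_retries(flaky, sleep=lambda s: None, jitter=False))  # ok
print(len(attempts))  # 3
```

The built-in `ConnectionError` used here is not the same class as `requests.exceptions.ConnectionError`. When wrapping requests calls, pass `retry_on=(requests.exceptions.ConnectionError, requests.exceptions.Timeout)`.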
5.3 Crawler Detection and Evasion
5.3.1 Mimicking a Browser Fingerprint
import requests
url = "https://www.example.com"
# A full set of browser-like headers
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    # requests can only decode "br" if the brotli package is installed; otherwise drop it
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Cache-Control": "max-age=0",
    "Sec-Ch-Ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"91\", \"Google Chrome\";v=\"91\"",
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1"
}
try:
    response = requests.get(url, headers=headers, timeout=10)
    print(f"Status code: {response.status_code}")
except Exception as e:
    print(f"Request failed: {e}")
5.3.2 Handling HTTP Basic Authentication
import requests
from requests.auth import HTTPBasicAuth
url = 'https://httpbin.org/basic-auth/user/passwd'
# Authenticate with valid Basic credentials
response = requests.get(url, auth=HTTPBasicAuth('user', 'passwd'))
print(f"Status code: {response.status_code}")
print(response.json())
# Shorter form: pass a (username, password) tuple
response = requests.get(url, auth=('user', 'passwd'))
print(f"Status code: {response.status_code}")
print(response.json())
5.3.3 Handling Simple JavaScript Redirects
import requests
import re
from urllib.parse import urljoin
def follow_js_redirect(url):
    response = requests.get(url, timeout=10)
    # Look for a redirect of the form window.location.href = "..."
    redirect_pattern = r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]'
    match = re.search(redirect_pattern, response.text)
    if match:
        # The target may be a relative URL, so resolve it against the current page
        redirect_url = urljoin(response.url, match.group(1))
        print(f"Found JS redirect: {redirect_url}")
        # Follow the redirect
        return requests.get(redirect_url, timeout=10)
    return response
# Example usage
url = 'https://example.com/page-with-js-redirect'
response = follow_js_redirect(url)
print(f"Final status code: {response.status_code}")
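5.4 Advanced Anti-Crawling Techniques and Countermeasures
5.4.1 Fingerprinting and WebDriver Detection
Modern websites use a variety of techniques to identify crawlers:
- Canvas fingerprinting: identifies browsers by tiny differences in how they render graphics
- WebRTC leaks: expose the real IP address instead of the proxy's
- Browser feature checks: verify that the expected browser APIs and features are all present
- WebDriver detection: directly detects automation tools such as Selenium (for example through the navigator.webdriver property)
Countermeasures:
- Use dedicated anti-detection tools such as undetected-chromedriver
- Use newer automation tools such as Playwright, which can be harder to detect
- Use browser extensions to hide automation traces (though some sites detect the extensions themselves)
# Use undetected_chromedriver to avoid detection (pip install undetected-chromedriver)
import undetected_chromedriver as uc
import time
# Create a Chrome instance patched to hide automation traces
driver = uc.Chrome()
try:
    # Visit a site that tests browser fingerprints
    driver.get('https://bot.sannysoft.com/')
    # Wait for the page to load
    time.sleep(5)
    # Save a screenshot of the results
    driver.save_screenshot('bot_test_result.png')
finally:
    # Close the browser
    driver.quit()
5.4.2 Handling CAPTCHAs
Image CAPTCHAs can be recognized with OCR or with a dedicated CAPTCHA-solving service.
# Recognize a simple CAPTCHA with pytesseract (the Tesseract program must be installed)
import requests
from PIL import Image
import pytesseract
import io
# Use one session so the CAPTCHA and the login form share the same cookies
session = requests.Session()
# Fetch the CAPTCHA image
response = session.get('https://example.com/captcha.php', timeout=10)
captcha_image = Image.open(io.BytesIO(response.content))
# Preprocess the image
captcha_image = captcha_image.convert('L')  # convert to grayscale
threshold = 127
captcha_image = captcha_image.point(lambda p: 255 if p > threshold else 0)  # binarize
# Run OCR; --psm 8 treats the image as a single word
captcha_text = pytesseract.image_to_string(captcha_image, config='--psm 8')
captcha_text = ''.join(filter(str.isalnum, captcha_text))  # keep only letters and digits
print(f"Recognized CAPTCHA: {captcha_text}")
# Submit the form with the recognized CAPTCHA
data = {
    'username': 'your_username',
    'password': 'your_password',
    'captcha': captcha_text
}
response = session.post('https://example.com/login', data=data, timeout=10)
print(f"Submit status: {response.status_code}")
Complex CAPTCHAs (slider puzzles, click-to-select challenges) usually require Selenium combined with dedicated recognition and interaction-simulation techniques.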
6. Selenium and Dynamic Page Scraping
6.1 Selenium Basics
Selenium is a browser automation tool that can simulate real user interaction with web pages.
Install the dependencies (webdriver-manager downloads a matching ChromeDriver; since Selenium 4.6 the bundled Selenium Manager can also do this automatically):
pip install selenium webdriver-manager
Basic usage:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
import time
# Download and configure ChromeDriver automatically
service = Service(ChromeDriverManager().install())
# Create a Chrome browser instance
driver = webdriver.Chrome(service=service)
# Visit the site
driver.get('https://www.python.org')
# Get the title
print(f"Page title: {driver.title}")
# Find elements
search_input = driver.find_element(By.ID, 'id-search-field')
search_input.send_keys('pycon')
submit_button = driver.find_element(By.ID, 'submit')
submit_button.click()
# Wait for the page to load
time.sleep(2)
# Get the search results
search_results = driver.find_elements(By.CSS_SELECTOR, '.list-recent-events li')
print(f"Found {len(search_results)} search results:")
for result in search_results[:3]:  # show only the first 3
    print(result.text)
# Take a screenshot
driver.save_screenshot('python_search.png')
# Close the browser
driver.quit()
6.2 Locating and Interacting with Elements
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
# Set up Chrome
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)
# Open the target site (Google may show a consent page or change its markup)
driver.get('https://www.google.com')
# 1. Find an element by its name attribute
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Selenium WebDriver')
search_box.send_keys(Keys.RETURN)  # press Enter
time.sleep(2)
# 2. Find elements by class with a CSS selector
results = driver.find_elements(By.CSS_SELECTOR, '.g')
print(f"Found {len(results)} search results")
# 3. Find an element by XPath (@class="g" only matches when the class attribute is exactly "g")
first_result = driver.find_element(By.XPATH, '//div[@class="g"]//h3')
print(f"First result title: {first_result.text}")
# 4. Find a link by part of its text
link = driver.find_element(By.PARTIAL_LINK_TEXT, 'Selenium')
print(f"Found link: {link.text}")
# 5. Interacting with elements
# 5.1 Click
link.click()
time.sleep(2)
# 5.2 Go back to the previous page
driver.back()
time.sleep(1)
# 5.3 Fill in a form (re-locate the box: old references go stale after navigation)
search_box = driver.find_element(By.NAME, 'q')
search_box.clear()  # clear the input box
search_box.send_keys('Python programming')
search_box.submit()  # submit the enclosing form
time.sleep(2)
# 5.4 Read attributes
search_box = driver.find_element(By.NAME, 'q')
print(f"Search box value: {search_box.get_attribute('value')}")
print(f"Search box name: {search_box.get_attribute('name')}")
# 5.5 Check element state
print(f"Search box displayed: {search_box.is_displayed()}")
print(f"Search box enabled: {search_box.is_enabled()}")
# Close the browser
driver.quit()
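6.3 Waiting Strategies
Selenium offers two waiting strategies: explicit waits and implicit waits. The Selenium documentation warns against mixing them, because the combination can cause unpredictable wait times. The example below uses both only to demonstrate each one.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
# Set up Chrome
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)
# Implicit wait: maximum wait time applied to every element lookup
driver.implicitly_wait(10)  # in seconds
# Open the site
driver.get('https://www.python.org')
# This lookup benefits from the implicit wait
search_box = driver.find_element(By.ID, 'id-search-field')
search_box.send_keys('pycon')
search_box.submit()
# Explicit wait: wait until a specific condition is met
try:
    # Wait until the search results have loaded
    element = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, '.list-recent-events'))
    )
    print("Search results loaded")
    # The results can now be processed safely
    results = driver.find_elements(By.CSS_SELECTOR, '.list-recent-events li')
    print(f"Found {len(results)} results")
except TimeoutException as e:
    print(f"Timed out waiting: {e}")
# Custom wait condition (EC.text_to_be_present_in_element covers this case too)
def text_contains(locator, text):
    def check(driver):
        try:
            element_text = driver.find_element(*locator).text
            return text in element_text
        except NoSuchElementException:
            return False
    return check
try:
    # Wait until the element's text contains the given string
    WebDriverWait(driver, 10).until(
        text_contains((By.CSS_SELECTOR, '.list-recent-events h3'), 'PyCon')
    )
    print("Found PyCon-related results")
except TimeoutException as e:
    print(f"No PyCon-related results found: {e}")
# Close the browser
driver.quit()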
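Conceptually, `WebDriverWait(driver, 10).until(condition)` is a polling loop. It calls the condition every poll interval (Selenium's default `poll_frequency` is 0.5 s), treats certain exceptions such as a missing element as "not ready yet", and gives up when the timeout expires. The plain-Python sketch below shows that idea. It is not Selenium's source code, and `wait_until` and `WaitTimeout` are hypothetical names.

```python
import time


class WaitTimeout(Exception):
    """Raised when the condition is still false after the timeout."""


def wait_until(condition, timeout=10.0, poll_interval=0.5,
               ignored=(LookupError,), clock=time.monotonic, sleep=time.sleep):
    """Call condition() until it returns a truthy value, then return that value."""
    deadline = clock() + timeout
    while True:
        try:
            value = condition()
            if value:
                return value
        except ignored:
            pass  # e.g. "element not found yet": keep polling
        if clock() >= deadline:
            raise WaitTimeout(f"condition not met within {timeout} s")
        sleep(poll_interval)


# Simulate an element that appears on the third poll
calls = {"n": 0}


def element_ready():
    calls["n"] += 1
    if calls["n"] < 3:
        raise LookupError("element not in the DOM yet")
    return "<element>"


print(wait_until(element_ready, timeout=5, poll_interval=0.01))  # <element>
print(calls["n"])  # 3
```

Returning the condition's value rather than `True` mirrors how `until()` hands back the located element, which is why the example above can assign its result to `element`.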
6.4 Advanced Page Interactions
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import Select
import time
# Set up Chrome
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)
try:
    # 1. Drop-down menus
    driver.get('https://www.w3schools.com/tags/tryit.asp?filename=tryhtml_select')
    # Switch into the iframe that contains the drop-down
    driver.switch_to.frame('iframeResult')
    # Locate the drop-down
    dropdown = Select(driver.find_element(By.ID, 'cars'))
    # Select options in three different ways
    dropdown.select_by_visible_text('Audi')
    time.sleep(1)
    dropdown.select_by_value('volvo')
    time.sleep(1)
    dropdown.select_by_index(2)  # third option (indexes start at 0)
    time.sleep(1)
    # List all options
    options = dropdown.options
    print("Drop-down options:")
    for option in options:
        print(f" - {option.text}")
    # Switch back to the main document
    driver.switch_to.default_content()
    # 2. Mouse hover
    driver.get('https://www.w3schools.com')
    # Locate the element to hover over (IDs on third-party sites may change)
    tutorials_button = driver.find_element(By.ID, 'navbtn_tutorials')
    # Create an ActionChains object
    actions = ActionChains(driver)
    # Perform the hover
    actions.move_to_element(tutorials_button).perform()
    time.sleep(2)
    # 3. Drag and drop
    driver.get('https://www.w3schools.com/html/html5_draganddrop.asp')
    # Wait for the elements to load
    time.sleep(2)
    # Locate the source and target elements
    source = driver.find_element(By.ID, 'drag1')
    target = driver.find_element(By.ID, 'div2')
    # Perform the drag and drop (ActionChains often cannot trigger native
    # HTML5 drag-and-drop events; such pages may need a JavaScript workaround)
    actions = ActionChains(driver)
    actions.drag_and_drop(source, target).perform()
    time.sleep(2)
    # 4. JavaScript dialogs
    driver.get('https://www.w3schools.com/js/tryit.asp?filename=tryjs_alert')
    # Switch into the iframe that contains the button
    driver.switch_to.frame('iframeResult')
    # Click the button to trigger the alert
    driver.find_element(By.CSS_SELECTOR, 'button').click()
    # Switch to the alert
    alert = driver.switch_to.alert
    # Read the alert text
    print(f"Alert text: {alert.text}")
    # Accept the alert (click OK)
    alert.accept()
    # For a confirm dialog, dismiss() clicks Cancel
    # alert.dismiss()
    # For a prompt dialog, send_keys() types text into it
    # alert.send_keys('Hello World')
    # 5. Switching tabs or windows
    driver.get('https://www.w3schools.com')
    # Open a new tab
    driver.execute_script("window.open('https://www.python.org', '_blank');")
    # Get all window handles
    handles = driver.window_handles
    # Switch to the new tab
    driver.switch_to.window(handles[1])
    print(f"Current page title: {driver.title}")
    # Switch back to the original tab
    driver.switch_to.window(handles[0])
    print(f"Current page title: {driver.title}")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Close the browser
    driver.quit()
6.5 Headless Browsers
Headless mode runs Selenium without a graphical interface, which saves resources and usually speeds things up.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
import time
# Configure Chrome options
chrome_options = Options()
chrome_options.add_argument("--headless")  # enable headless mode
chrome_options.add_argument("--disable-gpu")  # disable GPU acceleration
chrome_options.add_argument("--window-size=1920,1200")  # set the window size
# Create the browser instance
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
try:
    # Visit the site
    driver.get("https://www.python.org")
    print(f"Page title: {driver.title}")
    # Search
    search_box = driver.find_element(By.NAME, "q")
    search_box.send_keys("headless chrome")
    search_box.submit()
    # Wait for the search results
    time.sleep(2)
    # Get the results
    results = driver.find_elements(By.CSS_SELECTOR, ".list-recent-events li")
    print(f"Found {len(results)} results")
    # Take a screenshot
    driver.save_screenshot("headless_screenshot.png")
    print("Screenshot saved")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Close the browser
    driver.quit()
6.6 Hands-on: Scraping a Dynamically Loaded Site with Selenium
The following example uses Selenium to scrape Zhihu's trending questions (the hot list may require a logged-in account, and its CSS class names may change):
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import json
import random
def scrape_zhihu_hot_questions(num_questions=10):
    """
    Scrape Zhihu's trending questions with Selenium
    :param num_questions: number of questions to collect
    :return: list of questions
    """
    # Configure Chrome
    chrome_options = Options()
    chrome_options.add_argument("--disable-notifications")  # disable notifications
    chrome_options.add_argument("--disable-extensions")  # disable extensions
    chrome_options.add_argument("--disable-popup-blocking")  # turn off Chrome's pop-up blocker
    chrome_options.add_argument("--start-maximized")  # maximize the window
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")  # makes automation harder to detect
    # Pick a random User-Agent
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0"
    ]
    chrome_options.add_argument(f"--user-agent={random.choice(user_agents)}")
    # Initialize the WebDriver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    questions = []
    seen_titles = set()  # titles already collected, for de-duplication
    try:
        # Open the Zhihu hot list
        driver.get("https://www.zhihu.com/hot")
        # Wait for the page to finish loading
        print("Waiting for the page to load...")
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".HotList-item"))
        )
        # Scroll the page to load more content
        print("Scrolling to load more content...")
        last_height = driver.execute_script("return document.body.scrollHeight")
        while len(questions) < num_questions:
            # Scroll to the bottom
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            # Wait for new content to load
            time.sleep(random.uniform(1, 2))
            # Collect the hot-list items loaded so far
            hot_items = driver.find_elements(By.CSS_SELECTOR, ".HotList-item")
            # Process the loaded items
            for item in hot_items:
                if len(questions) >= num_questions:
                    break
                try:
                    # Rank
                    rank_element = item.find_element(By.CSS_SELECTOR, ".HotList-itemIndex")
                    rank = rank_element.text.strip()
                    # Title
                    title_element = item.find_element(By.CSS_SELECTOR, ".HotList-itemTitle")
                    title = title_element.text.strip()
                    # Popularity ("heat")
                    hot_element = item.find_element(By.CSS_SELECTOR, ".HotList-itemMetrics")
                    hot = hot_element.text.strip()
                    # Link
                    link_element = item.find_element(By.CSS_SELECTOR, ".HotList-itemTitle a")
                    link = link_element.get_attribute("href")
                    # Skip questions that were already collected
                    if title not in seen_titles:
                        seen_titles.add(title)
                        questions.append({
                            "rank": rank,
                            "title": title,
                            "hot": hot,
                            "link": link
                        })
                        print(f"Scraped {len(questions)}/{num_questions}: {title}")
                except Exception as e:
                    print(f"Error while processing a question: {e}")
                    continue
            # Stop when the page height stops growing (no more content to load)
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height
            # Random delay
            time.sleep(random.uniform(0.5, 1.5))
        print(f"Scraped {len(questions)} questions in total")
    except Exception as e:
        print(f"Error during scraping: {e}")
    finally:
        # Close the browser
        driver.quit()
    return questions
# Run the crawler
if __name__ == "__main__":
    print("Start scraping Zhihu trending questions...")
    hot_questions = scrape_zhihu_hot_questions(20)  # scrape 20 trending questions
    # Save to a JSON file
    with open("zhihu_hot_questions.json", "w", encoding="utf-8") as f:
        json.dump(hot_questions, f, ensure_ascii=False, indent=4)
    print("Done, data saved to zhihu_hot_questions.json")
    # Show part of the results
    print("\nTop 5 trending questions:")
    for q in hot_questions[:5]:
        print(f"{q['rank']}. {q['title']} ({q['hot']})")
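7. Getting Started with the Scrapy Framework
7.1 Introducing and Installing Scrapy
Scrapy is an application framework for crawling websites and extracting structured data from them.
Install Scrapy:
pip install scrapy
Check the installation:
scrapy version
7.2 Creating a Project and Its Basic Components
Create a new Scrapy project:
scrapy startproject quotes_spider
cd quotes_spider
The project structure looks like this:
quotes_spider/
    scrapy.cfg            # deployment configuration file
    quotes_spider/        # the project's Python module
        __init__.py
        items.py          # item (data structure) definitions
        middlewares.py    # middlewares
        pipelines.py      # item pipelines (data processing)
        settings.py       # project settings
        spiders/          # directory for spiders
            __init__.py
Create a spider:
scrapy genspider quotes quotes.toscrape.com
This creates the file quotes.py in the spiders/ directory:
# quotes_spider/quotes_spider/spiders/quotes.py
import scrapy
class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']
    def parse(self, response):
        pass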
7.3 A Basic Spider Implementation
Edit quotes.py to implement the crawling logic:
import scrapy
class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']
    def parse(self, response):
        # Select every quote block
        quotes = response.css('div.quote')
        # Process each quote
        for quote in quotes:
            text = quote.css('span.text::text').get()
            author = quote.css('small.author::text').get()
            tags = quote.css('div.tags a.tag::text').getall()
            # Yield the result
            yield {
                'text': text,
                'author': author,
                'tags': tags
            }
        # Find the next-page link and follow it
        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, self.parse)
Run the spider:
scrapy crawl quotes -o quotes.json
7.4 Items and Item Pipelines
Defining an Item:
Edit items.py to define the data structure:
# quotes_spider/quotes_spider/items.py
import scrapy
class QuoteItem(scrapy.Item):
    text = scrapy.Field()
    author = scrapy.Field()
    tags = scrapy.Field()
Using the Item:
Modify the spider so that it yields QuoteItem objects:
# quotes_spider/quotes_spider/spiders/quotes.py
import scrapy
from quotes_spider.items import QuoteItem
class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']
    def parse(self, response):
        quotes = response.css('div.quote')
        for quote in quotes:
            item = QuoteItem()
            item['text'] = quote.css('span.text::text').get()
            item['author'] = quote.css('small.author::text').get()
            item['tags'] = quote.css('div.tags a.tag::text').getall()
            yield item
        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, self.parse)
Creating pipelines:
Edit pipelines.py to process and store the data:
# quotes_spider/quotes_spider/pipelines.py
import json
from itemadapter import ItemAdapter
class JsonWriterPipeline:
    """Writes all items into a single JSON array file."""
    def open_spider(self, spider):
        # Open the file as UTF-8 explicitly, because ensure_ascii=False writes raw non-ASCII characters
        self.file = open('quotes_pipeline.json', 'w', encoding='utf-8')
        self.file.write('[')
        self.first_item = True
    def close_spider(self, spider):
        self.file.write(']')
        self.file.close()
    def process_item(self, item, spider):
        # Convert the item to a dict, then to a JSON string
        line = json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False)
        # Prepend a comma separator to every element except the first
        if self.first_item:
            self.first_item = False
        else:
            line = ',\n' + line
        self.file.write(line)
        return item
class RemoveTagsPipeline:
    def process_item(self, item, spider):
        # Keep only certain tags, e.g. literature-related ones
        literary_tags = ['love', 'life', 'inspirational', 'poetry', 'literature']
        filtered_tags = [tag for tag in item['tags'] if tag in literary_tags]
        if filtered_tags:
            item['tags'] = filtered_tags
        else:
            item['tags'] = ['unclassified']
        return item
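Enabling the pipelines:
Edit settings.py to enable your pipelines. Items pass through the pipelines from the lowest number to the highest. By convention, the numbers are kept in the 0-1000 range:
# quotes_spider/quotes_spider/settings.py
# ...other settings...
# Configure the project's item pipelines
ITEM_PIPELINES = {
    'quotes_spider.pipelines.RemoveTagsPipeline': 300,
    'quotes_spider.pipelines.JsonWriterPipeline': 800,
}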
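Because of this ordering rule, RemoveTagsPipeline (300) cleans the tags before JsonWriterPipeline (800) writes the item. The pure-Python sketch below imitates only that ordering and does not need Scrapy. run_pipelines and RecordingPipeline are hypothetical helpers, not Scrapy APIs. The dict is deliberately listed out of order to show that the priority numbers, not the insertion order, decide the sequence.

```python
def run_pipelines(item, pipelines, spider=None):
    """Pass an item through pipeline objects in ascending priority, like ITEM_PIPELINES."""
    for pipeline, _priority in sorted(pipelines.items(), key=lambda kv: kv[1]):
        item = pipeline.process_item(item, spider)
    return item


class RemoveTagsPipeline:
    # Same filtering rule as the pipeline in pipelines.py
    literary_tags = ['love', 'life', 'inspirational', 'poetry', 'literature']

    def process_item(self, item, spider):
        filtered = [t for t in item['tags'] if t in self.literary_tags]
        item['tags'] = filtered or ['unclassified']
        return item


class RecordingPipeline:
    """Hypothetical stand-in for JsonWriterPipeline: records what it receives."""

    def __init__(self):
        self.seen = []

    def process_item(self, item, spider):
        self.seen.append(dict(item))
        return item


writer = RecordingPipeline()
pipelines = {writer: 800, RemoveTagsPipeline(): 300}  # listed out of order on purpose
run_pipelines({'text': 'x', 'tags': ['love', 'humor']}, pipelines)
print(writer.seen)  # [{'text': 'x', 'tags': ['love']}]
```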
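7.5 Middlewares and Request Processing
Spider middleware:
Edit middlewares.py to define a custom spider middleware:
# quotes_spider/quotes_spider/middlewares.py
from itemadapter import ItemAdapter, is_item
from scrapy import signals
class CustomSpiderMiddleware:
    @classmethod
    def from_crawler(cls, crawler):
        middleware = cls()
        crawler.signals.connect(middleware.spider_opened, signal=signals.spider_opened)
        return middleware
    def process_spider_input(self, response, spider):
        # Called for each response before it reaches the spider; returning None lets it continue
        return None
    def process_spider_output(self, response, result, spider):
        # Post-process everything the spider yields
        for element in result:
            if is_item(element):
                # Example: keep only quotes that carry the 'love' tag
                tags = ItemAdapter(element).get('tags') or []
                if 'love' in tags:
                    yield element
            else:
                # Pass requests (such as the next-page request) through unchanged
                yield element
    def process_spider_exception(self, response, exception, spider):
        # Handle exceptions raised by the spider; returning None lets other middlewares handle them
        return None
    def process_start_requests(self, start_requests, spider):
        # Process the start requests (passed through unchanged here)
        for request in start_requests:
            yield request
    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)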
Downloader middleware:
A custom downloader middleware that rotates the User-Agent header:
# quotes_spider/quotes_spider/middlewares.py (continued)
import random
class RandomUserAgentMiddleware:
    def __init__(self, user_agents):
        self.user_agents = user_agents
    @classmethod
    def from_crawler(cls, crawler):
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
        ]
        return cls(user_agents)
    def process_request(self, request, spider):
        # Pick a random User-Agent for every outgoing request
        request.headers['User-Agent'] = random.choice(self.user_agents)
        return None
Enabling the middlewares:
Edit settings.py to enable the custom middlewares:
# quotes_spider/quotes_spider/settings.py
# ...other settings...
# Enable the spider middleware
SPIDER_MIDDLEWARES = {
    'quotes_spider.middlewares.CustomSpiderMiddleware': 543,
}
# Enable the downloader middleware
DOWNLOADER_MIDDLEWARES = {
    'quotes_spider.middlewares.RandomUserAgentMiddleware': 400,
}
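Downloader middleware priorities work in two directions. process_request() hooks run from the lowest number to the highest, and process_response() hooks run in the reverse order. In current Scrapy versions, the built-in UserAgentMiddleware sits at 500 and only sets the header when it is missing. Placing RandomUserAgentMiddleware at 400 therefore lets the random value win. The sketch below imitates both rules in plain Python. hook_order is a hypothetical helper, and the UA strings are placeholders.

```python
import random


def hook_order(middlewares):
    """List hook calls for one request/response cycle, given {name: priority}."""
    ordered = [name for name, _ in sorted(middlewares.items(), key=lambda kv: kv[1])]
    calls = [f'{name}.process_request' for name in ordered]           # ascending
    calls.append('download')
    calls += [f'{name}.process_response' for name in reversed(ordered)]  # descending
    return calls


order = hook_order({'UserAgentMiddleware': 500, 'RandomUserAgentMiddleware': 400})
print(order)

# Header effect: ours (400) assigns first; the built-in one (500) only fills a missing value
USER_AGENTS = ['UA-1', 'UA-2', 'UA-3']  # hypothetical placeholder strings
headers = {}
headers['User-Agent'] = random.choice(USER_AGENTS)       # RandomUserAgentMiddleware
headers.setdefault('User-Agent', 'default-scrapy-ua')    # built-in behaviour (setdefault)
print(headers['User-Agent'] in USER_AGENTS)  # True
```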
7.6 Using Selectors
CSS selectors:
# Using CSS selectors in a spider
def parse(self, response):
    # Select all quotes
    quotes = response.css('div.quote')
    # Select the first quote
    first_quote = quotes[0]
    # Extract text
    text = first_quote.css('span.text::text').get()
    # Extract an attribute
    href = response.css('li.next a::attr(href)').get()
    # Extract multiple text values
    all_tags = first_quote.css('a.tag::text').getall()
XPath selectors:
# Using XPath selectors in a spider
def parse(self, response):
    # Select all quotes
    quotes = response.xpath('//div[@class="quote"]')
    # Select the first quote
    first_quote = quotes[0]
    # Extract text (the leading ./ makes the path relative to first_quote)
    text = first_quote.xpath('./span[@class="text"]/text()').get()
    # Extract an attribute
    href = response.xpath('//li[@class="next"]/a/@href').get()
    # Extract multiple text values
    all_tags = first_quote.xpath('./div[@class="tags"]/a/text()').getall()
Combining both:
def parse(self, response):
    # Find the quotes with a CSS selector, then extract their contents with XPath or CSS
    quotes = response.css('div.quote')
    for quote in quotes:
        text = quote.xpath('./span[@class="text"]/text()').get()
        author = quote.css('small.author::text').get()
        yield {
            'text': text,
            'author': author
        }
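You can experiment with XPath-style paths without running a spider. Python's standard xml.etree.ElementTree supports a limited XPath subset, including [@attr='value'] predicates. The sketch below assumes a small, well-formed snippet modeled on the quote markup. Real pages are often not valid XML, which is one reason Scrapy relies on its own lxml-based selectors instead.

```python
import xml.etree.ElementTree as ET

# Hypothetical, well-formed snippet modeled on the quote page markup
SNIPPET = """
<html><body>
  <div class="quote">
    <span class="text">“A quote.”</span>
    <small class="author">Someone</small>
    <div class="tags"><a class="tag">life</a><a class="tag">love</a></div>
  </div>
  <ul class="pager"><li class="next"><a href="/page/2/">Next</a></li></ul>
</body></html>
"""

root = ET.fromstring(SNIPPET)
for quote in root.findall(".//div[@class='quote']"):
    text = quote.find("span[@class='text']").text          # path relative to the quote div
    author = quote.find("small[@class='author']").text
    tags = [a.text for a in quote.findall("div[@class='tags']/a")]
    print(text, author, tags)

next_href = root.find(".//li[@class='next']/a").get('href')
print(next_href)  # /page/2/
```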
7.7 Handling Forms and Login
Example: crawling a site that requires login
# quotes_spider/quotes_spider/spiders/login_spider.py
import scrapy
from scrapy.http import FormRequest
class LoginSpider(scrapy.Spider):
    name = 'login_spider'
    start_urls = ['http://quotes.toscrape.com/login']
    def parse(self, response):
        # Read the CSRF token from the hidden form field
        # (FormRequest.from_response also pre-fills hidden fields on its own)
        csrf_token = response.css('input[name="csrf_token"]::attr(value)').get()
        # Submit the login form
        return FormRequest.from_response(
            response,
            formdata={
                'csrf_token': csrf_token,
                'username': 'your_username',
                'password': 'your_password'
            },
            callback=self.after_login
        )
    def after_login(self, response):
        # A logout link only appears for logged-in users
        if 'logout' in response.text.lower():
            self.log('Login succeeded!')
            # Start crawling content that requires login.
            # protected-quotes is a placeholder URL: replace it with a real page that needs login
            yield scrapy.Request(
                'http://quotes.toscrape.com/protected-quotes',
                callback=self.parse_protected
            )
        else:
            self.log('Login failed.')
    def parse_protected(self, response):
        # Scrape content that is only available after logging in
        quotes = response.css('div.quote')
        for quote in quotes:
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('small.author::text').get(),
                'tags': quote.css('div.tags a.tag::text').getall()
            }
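FormRequest.from_response() reads the form on the page and pre-fills its fields, including hidden ones such as csrf_token, before applying your formdata. The stdlib sketch below illustrates only the hidden-field part, using html.parser. HiddenInputParser and build_login_payload are hypothetical helpers, and the form HTML and token value are made up for the example.

```python
from html.parser import HTMLParser


class HiddenInputParser(HTMLParser):
    """Collect name/value pairs of <input type="hidden"> elements."""

    def __init__(self):
        super().__init__()
        self.fields = {}

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == 'input' and attrs.get('type') == 'hidden' and attrs.get('name'):
            self.fields[attrs['name']] = attrs.get('value') or ''


def build_login_payload(html, username, password):
    parser = HiddenInputParser()
    parser.feed(html)
    payload = dict(parser.fields)                          # hidden fields first (e.g. csrf_token)
    payload.update(username=username, password=password)  # then the user-supplied values
    return payload


FORM = '''<form action="/login" method="post">
<input type="hidden" name="csrf_token" value="abc123">
<input type="text" name="username"><input type="password" name="password">
</form>'''
print(build_login_payload(FORM, 'your_username', 'your_password'))
# {'csrf_token': 'abc123', 'username': 'your_username', 'password': 'your_password'}
```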
7.8 Using Item Loaders
Item Loaders provide a more flexible way to populate Item objects. Input processors clean each extracted value, and output processors turn the collected list of values into the final field value:
# quotes_spider/quotes_spider/loaders.py
# The processors live in the itemloaders package; the old scrapy.loader.processors path is deprecated
from itemloaders.processors import Join, MapCompose, TakeFirst
from scrapy.loader import ItemLoader
from quotes_spider.items import QuoteItem
class QuoteLoader(ItemLoader):
    default_item_class = QuoteItem
    # Default output processor: take the first non-empty value
    default_output_processor = TakeFirst()
    # Text: strip whitespace, then the surrounding curly or straight quotation marks
    text_in = MapCompose(str.strip, lambda x: x.strip('“”"\''))
    # Author: strip whitespace
    author_in = MapCompose(str.strip)
    # Tags: strip whitespace and lowercase
    tags_in = MapCompose(str.strip, str.lower)
    # Join the tags with commas. Note that this makes 'tags' a string,
    # while RemoveTagsPipeline from 7.4 expects a list
    tags_out = Join(',')
Using the Item Loader in the spider:
# quotes_spider/quotes_spider/spiders/quotes.py
import scrapy
from quotes_spider.loaders import QuoteLoader
class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']
    def parse(self, response):
        quotes = response.css('div.quote')
        for quote in quotes:
            loader = QuoteLoader(selector=quote)
            # Add the fields
            loader.add_css('text', 'span.text::text')
            loader.add_css('author', 'small.author::text')
            loader.add_css('tags', 'a.tag::text')
            # Build and emit the item
            yield loader.load_item()
        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, self.parse)
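The three processors used in QuoteLoader behave roughly like the plain functions below:

- MapCompose applies each function to every value and drops None results.
- TakeFirst returns the first value that is neither None nor an empty string.
- Join concatenates the values with a separator.

This is a simplified sketch for intuition (map_compose, take_first and join are hypothetical names). It is not the itemloaders implementation; for example, the real MapCompose also flattens list results.

```python
def map_compose(*functions):
    """Apply each function to every value in turn, dropping None results."""
    def process(values):
        for func in functions:
            values = [r for r in (func(v) for v in values) if r is not None]
        return values
    return process


def take_first(values):
    """Return the first value that is not None and not an empty string."""
    for value in values:
        if value is not None and value != '':
            return value
    return None


def join(separator=' '):
    return lambda values: separator.join(values)


text_in = map_compose(str.strip, lambda x: x.strip('“”"\''))
tags_in = map_compose(str.strip, str.lower)

print(take_first(text_in(['  “Hello, world.”  '])))      # Hello, world.
print(join(',')(tags_in([' Change ', 'Deep-Thoughts'])))  # change,deep-thoughts
```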
7.9 Hands-On: Building a Book Spider
This spider lives in its own project, books_spider, and crawls the practice site books.toscrape.com:
# books_spider/books_spider/spiders/books.py
import scrapy
from books_spider.items import BookItem
class BooksSpider(scrapy.Spider):
    name = 'books'
    allowed_domains = ['books.toscrape.com']
    start_urls = ['http://books.toscrape.com/']
    def parse(self, response):
        # Collect the detail-page links of all books on the listing page
        books = response.css('article.product_pod h3 a::attr(href)').getall()
        # Request each book's detail page
        for book_url in books:
            # Build the absolute URL from the relative link
            book_url = response.urljoin(book_url)
            yield scrapy.Request(book_url, callback=self.parse_book)
        # Handle pagination
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            next_page_url = response.urljoin(next_page)
            yield scrapy.Request(next_page_url, callback=self.parse)
    def parse_book(self, response):
        # Create a BookItem
        book = BookItem()
        # Extract the book details
        book['title'] = response.css('div.product_main h1::text').get()
        book['price'] = response.css('p.price_color::text').get()
        book['category'] = response.css('ul.breadcrumb li:nth-child(3) a::text').get()
        # The availability text is split around an <i> icon, so join the non-empty pieces
        availability = [t.strip() for t in response.css('p.availability::text').getall()]
        book['availability'] = ' '.join(t for t in availability if t)
        # The class attribute looks like "star-rating Three"; keep the rating word
        rating_class = response.css('p.star-rating::attr(class)').get()
        book['rating'] = rating_class.split()[-1] if rating_class else None
        book['description'] = response.css('div#product_description + p::text').get()
        book['url'] = response.url
        # Extract the cover image URL
        image_rel_url = response.css('div.item.active img::attr(src)').get()
        book['image_url'] = response.urljoin(image_rel_url) if image_rel_url else None
        yield book
Creating the BookItem:
# books_spider/books_spider/items.py
import scrapy
class BookItem(scrapy.Item):
    title = scrapy.Field()
    price = scrapy.Field()
    category = scrapy.Field()
    availability = scrapy.Field()
    rating = scrapy.Field()
    description = scrapy.Field()
    url = scrapy.Field()
    image_url = scrapy.Field()
Creating the data-cleaning pipeline:
# books_spider/books_spider/pipelines.py
import re
from itemadapter import ItemAdapter
class BooksPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # Clean the price and keep only the number
        price = adapter.get('price')
        if price:
            # Extract the numeric part, e.g. 51.77 from "£51.77"
            price_value = re.search(r'\d+(?:\.\d+)?', price)
            if price_value:
                adapter['price'] = float(price_value.group())
        # Normalize the rating
        rating = adapter.get('rating')
        if rating:
            # Convert the star-rating word (One, Two, Three, Four, Five) to a number
            rating_map = {
                'One': 1,
                'Two': 2,
                'Three': 3,
                'Four': 4,
                'Five': 5
            }
            adapter['rating'] = rating_map.get(rating, 0)
        # Process the stock information
        availability = adapter.get('availability')
        if availability:
            # Extract the count, e.g. 19 from "In stock (19 available)"
            stock_match = re.search(r'(\d+) available', availability)
            if stock_match:
                adapter['availability'] = int(stock_match.group(1))
            else:
                adapter['availability'] = 0
        return item
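The cleaning rules in BooksPipeline are plain string processing. You can therefore pull them out into small functions and check them without running a crawl. Below is a sketch of that refactoring; the function names are hypothetical, and the sample strings follow the formats quoted in the comments above.

```python
import re

RATING_MAP = {'One': 1, 'Two': 2, 'Three': 3, 'Four': 4, 'Five': 5}


def parse_price(text):
    """'£51.77' -> 51.77; returns None if no number is found."""
    match = re.search(r'\d+(?:\.\d+)?', text or '')
    return float(match.group()) if match else None


def parse_rating(word):
    """'Three' -> 3; unknown words map to 0, as in the pipeline."""
    return RATING_MAP.get(word, 0)


def parse_stock(text):
    """'In stock (19 available)' -> 19; 0 when no count is present."""
    match = re.search(r'(\d+) available', text or '')
    return int(match.group(1)) if match else 0


print(parse_price('£51.77'), parse_rating('Three'), parse_stock('In stock (19 available)'))
# 51.77 3 19
```

Inside process_item(), each branch would then reduce to a single call such as adapter['price'] = parse_price(price).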
Activating the pipeline:
# books_spider/books_spider/settings.py
ITEM_PIPELINES = {
    'books_spider.pipelines.BooksPipeline': 300,
}
Running the spider:
cd books_spider
scrapy crawl books -o books.json
8. Data Storage
8.1 Saving to Different Formats
JSON:
import json
# Save data as JSON
def save_to_json(data, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps Chinese characters readable instead of \u escapes
        json.dump(data, f, ensure_ascii=False, indent=4)
# Load data from JSON
def load_from_json(filename):
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)
# Usage example
data = [{'name': '张三', 'age': 30}, {'name': '李四', 'age': 25}]
save_to_json(data, 'people.json')
loaded_data = load_from_json('people.json')
CSV:
import csv
# Save data as CSV
def save_to_csv(data, filename, fieldnames=None):
    if not fieldnames and data:
        # Derive the column names from the first record
        fieldnames = list(data[0].keys())
    # newline='' prevents blank lines between rows on Windows;
    # use encoding='utf-8-sig' instead if the file will be opened in Excel
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
# Load data from CSV
def load_from_csv(filename):
    with open(filename, 'r', newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        return list(reader)
# Usage example
data = [{'name': '张三', 'age': '30'}, {'name': '李四', 'age': '25'}]
save_to_csv(data, 'people.csv')
loaded_data = load_from_csv('people.csv')
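CSV stores only text. Whatever types you write, csv.DictReader returns every value as a string, so you must convert numeric fields back after loading. The minimal demonstration below keeps everything in memory with io.StringIO instead of a file, and shows the explicit conversion step.

```python
import csv
import io

rows = [{'name': '张三', 'age': 30}, {'name': '李四', 'age': 25}]

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=['name', 'age'])
writer.writeheader()
writer.writerows(rows)

buffer.seek(0)
loaded = list(csv.DictReader(buffer))
print(loaded[0]['age'], type(loaded[0]['age']).__name__)  # 30 str

# Convert numeric columns back explicitly
for row in loaded:
    row['age'] = int(row['age'])
print(loaded)
```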
Excel (pandas writes .xlsx files through the openpyxl package, so install it too: pip install openpyxl):
import pandas as pd
# Save data as Excel
def save_to_excel(data, filename, sheet_name='Sheet1'):
    df = pd.DataFrame(data)
    df.to_excel(filename, sheet_name=sheet_name, index=False)
# Load data from Excel
def load_from_excel(filename, sheet_name='Sheet1'):
    df = pd.read_excel(filename, sheet_name=sheet_name)
    return df.to_dict('records')
# Usage example
data = [{'name': '张三', 'age': 30}, {'name': '李四', 'age': 25}]
save_to_excel(data, 'people.xlsx')
loaded_data = load_from_excel('people.xlsx')
XML:
import xml.etree.ElementTree as ET
from xml.dom import minidom
# Save data as XML (dict keys become tag names, so they must be valid XML names)
def save_to_xml(data, filename, root_element='data', item_element='item'):
    root = ET.Element(root_element)
    for item in data:
        item_elem = ET.SubElement(root, item_element)
        for key, value in item.items():
            child = ET.SubElement(item_elem, key)
            child.text = str(value)
    # Pretty-print the XML
    rough_string = ET.tostring(root, 'utf-8')
    reparsed = minidom.parseString(rough_string)
    pretty_xml = reparsed.toprettyxml(indent="  ")
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(pretty_xml)
# Load data from XML (all values come back as strings)
def load_from_xml(filename, item_element='item'):
    tree = ET.parse(filename)
    root = tree.getroot()
    data = []
    for item_elem in root.findall(item_element):
        item_data = {}
        for child in item_elem:
            item_data[child.tag] = child.text
        data.append(item_data)
    return data
# Usage example
data = [{'name': '张三', 'age': 30}, {'name': '李四', 'age': 25}]
save_to_xml(data, 'people.xml')
loaded_data = load_from_xml('people.xml')
8.2 Database Storage
SQLite:
import sqlite3
# Connect to an SQLite database (the file is created if it does not exist)
def create_connection(db_file):
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        return conn
    except sqlite3.Error as e:
        print(e)
    return conn
# Create a table
def create_table(conn, create_table_sql):
    try:
        c = conn.cursor()
        c.execute(create_table_sql)
    except sqlite3.Error as e:
        print(e)
# Insert multiple records (table_name and fields are formatted into the SQL, so they must be trusted names)
def insert_many_records(conn, table_name, records, fields):
    placeholders = ', '.join(['?' for _ in fields])
    fields_str = ', '.join(fields)
    sql = f'INSERT INTO {table_name} ({fields_str}) VALUES ({placeholders})'
    try:
        c = conn.cursor()
        # Convert the list of dicts into value tuples in field order
        values = [tuple(record.get(field) for field in fields) for record in records]
        c.executemany(sql, values)
        conn.commit()
        return c.rowcount
    except sqlite3.Error as e:
        print(e)
        return 0
# Query all records
def select_all_records(conn, table_name):
    sql = f'SELECT * FROM {table_name}'
    try:
        c = conn.cursor()
        c.execute(sql)
        columns = [column[0] for column in c.description]
        return [dict(zip(columns, row)) for row in c.fetchall()]
    except sqlite3.Error as e:
        print(e)
        return []
# Usage example
def main():
    database = "pythonsqlite.db"
    # SQL statement that creates the people table
    sql_create_people_table = """
    CREATE TABLE IF NOT EXISTS people (
        id integer PRIMARY KEY,
        name text NOT NULL,
        age integer
    );
    """
    # Open the connection
    conn = create_connection(database)
    if conn is not None:
        # Create the table
        create_table(conn, sql_create_people_table)
        # Prepare the data
        people = [
            {'name': '张三', 'age': 30},
            {'name': '李四', 'age': 25},
            {'name': '王五', 'age': 40}
        ]
        # Insert the data
        fields = ['name', 'age']
        rows_affected = insert_many_records(conn, 'people', people, fields)
        print(f"Inserted {rows_affected} records.")
        # Query all records
        all_people = select_all_records(conn, 'people')
        print("Query results:")
        for person in all_people:
            print(f"ID: {person['id']}, name: {person['name']}, age: {person['age']}")
        # Close the connection
        conn.close()
    else:
        print("Could not create the database connection.")
if __name__ == '__main__':
    main()
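Crawlers often revisit the same pages, so running insert_many_records repeatedly stores duplicate rows. One common SQLite remedy is a UNIQUE constraint combined with INSERT OR IGNORE, or INSERT OR REPLACE if you want to overwrite instead. The sketch below uses an in-memory database and a hypothetical products table keyed by url. It counts real insertions through connection.total_changes, so ignored rows are not counted.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('''
    CREATE TABLE products (
        id INTEGER PRIMARY KEY,
        url TEXT NOT NULL UNIQUE,
        title TEXT
    )
''')


def save_products(conn, products):
    """Insert products, silently skipping rows whose url already exists."""
    before = conn.total_changes
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            'INSERT OR IGNORE INTO products (url, title) VALUES (:url, :title)',
            products,
        )
    return conn.total_changes - before


batch = [{'url': 'https://example.com/p/1', 'title': 'A'},
         {'url': 'https://example.com/p/2', 'title': 'B'}]
print(save_products(conn, batch))  # 2
print(save_products(conn, batch))  # 0 (both rows already stored)
```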
MySQL/MariaDB (requires pip install mysql-connector-python):
import mysql.connector
from mysql.connector import Error
# Connect to a MySQL database
def create_connection(host_name, user_name, user_password, db_name=None):
    connection = None
    try:
        connection = mysql.connector.connect(
            host=host_name,
            user=user_name,
            password=user_password,
            database=db_name
        )
        print("Connected to MySQL")
    except Error as e:
        print(f"MySQL connection error: {e}")
    return connection
# Create a database
def create_database(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print("Database created")
    except Error as e:
        print(f"Error creating database: {e}")
# Execute a write query
def execute_query(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        connection.commit()
        print("Query executed")
    except Error as e:
        print(f"Query error: {e}")
# Execute a read query (rows are returned as dicts)
def execute_read_query(connection, query):
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute(query)
        return cursor.fetchall()
    except Error as e:
        print(f"Error reading data: {e}")
        return None
# Usage example
def main():
    # Connection parameters
    host = "localhost"
    user = "root"
    password = "your_password"
    # Connect to the MySQL server (no database selected yet)
    connection = create_connection(host, user, password)
    if connection is not None:
        # Create the database
        create_database_query = "CREATE DATABASE IF NOT EXISTS web_crawler"
        create_database(connection, create_database_query)
        # Close this connection
        connection.close()
    # Connect to the newly created database
    connection = create_connection(host, user, password, "web_crawler")
    if connection is not None:
        # Create the table
        create_table_query = """
        CREATE TABLE IF NOT EXISTS products (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            price DECIMAL(10, 2),
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
        execute_query(connection, create_table_query)
        # Insert data
        insert_data_query = """
        INSERT INTO products (name, price, description)
        VALUES
        ('笔记本电脑', 5999.99, '高性能笔记本电脑,适合办公和游戏'),
        ('智能手机', 3999.00, '最新款智能手机,支持5G网络'),
        ('蓝牙耳机', 299.50, '无线蓝牙耳机,音质清晰');
        """
        execute_query(connection, insert_data_query)
        # Query the data
        select_query = "SELECT * FROM products"
        products = execute_read_query(connection, select_query)
        if products:
            print("All products:")
            for product in products:
                print(f"ID: {product['id']}")
                print(f"Name: {product['name']}")
                print(f"Price: {product['price']}")
                print(f"Description: {product['description']}")
                print(f"Created at: {product['created_at']}")
                print("-" * 30)
        # Close the connection
        connection.close()
        print("MySQL connection closed")
if __name__ == "__main__":
    main()
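The example above writes the values directly into the SQL text. For scraped data, it is safer to pass the values separately so the driver escapes them; otherwise, a quote character inside a product description can break the statement. mysql-connector-python uses %s placeholders with cursor.executemany(). The sketch below builds the statement and the parameter tuples from a list of dicts. build_insert is a hypothetical helper. The placeholder is a parameter only so the sketch can be checked against sqlite3, which uses ? instead.

```python
import sqlite3


def build_insert(table, fields, records, placeholder='%s'):
    """Return (sql, params) for cursor.executemany(); table/fields must be trusted names."""
    columns = ', '.join(fields)
    marks = ', '.join([placeholder] * len(fields))
    sql = f'INSERT INTO {table} ({columns}) VALUES ({marks})'
    params = [tuple(record.get(f) for f in fields) for record in records]
    return sql, params


products = [
    {'name': '笔记本电脑', 'price': 5999.99, 'description': 'It\'s "fast"'},
    {'name': '蓝牙耳机', 'price': 299.50, 'description': None},
]
sql, params = build_insert('products', ['name', 'price', 'description'], products)
print(sql)  # INSERT INTO products (name, price, description) VALUES (%s, %s, %s)
# With MySQL: cursor.executemany(sql, params); connection.commit()

# Quick local check with sqlite3 (placeholder '?')
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE products (name TEXT, price REAL, description TEXT)')
lite_sql, lite_params = build_insert('products', ['name', 'price', 'description'], products, '?')
conn.executemany(lite_sql, lite_params)
print(conn.execute('SELECT COUNT(*) FROM products').fetchone()[0])  # 2
```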
MongoDB (requires pip install pymongo):
from pymongo import MongoClient
# Connect to MongoDB
def connect_mongodb(host='localhost', port=27017, db_name='web_crawler'):
    try:
        client = MongoClient(host, port, serverSelectionTimeoutMS=5000)
        # MongoClient connects lazily; ping the server so connection errors surface here
        client.admin.command('ping')
        db = client[db_name]
        print(f"Connected to MongoDB: {db_name}")
        return db
    except Exception as e:
        print(f"MongoDB connection error: {e}")
        return None
# Insert multiple documents
def insert_many_documents(db, collection_name, documents):
    try:
        collection = db[collection_name]
        result = collection.insert_many(documents)
        return len(result.inserted_ids)
    except Exception as e:
        print(f"Error inserting documents: {e}")
        return 0
# Query all documents (optionally with a filter and a projection)
def find_all_documents(db, collection_name, query=None, projection=None):
    try:
        collection = db[collection_name]
        return list(collection.find(query, projection))
    except Exception as e:
        print(f"Error querying documents: {e}")
        return []
# Query documents that match a filter
def find_documents(db, collection_name, query):
    try:
        collection = db[collection_name]
        return list(collection.find(query))
    except Exception as e:
        print(f"Error querying documents: {e}")
        return []
# Update documents
def update_documents(db, collection_name, query, update):
    try:
        collection = db[collection_name]
        result = collection.update_many(query, update)
        return result.modified_count
    except Exception as e:
        print(f"Error updating documents: {e}")
        return 0
# Delete documents
def delete_documents(db, collection_name, query):
    try:
        collection = db[collection_name]
        result = collection.delete_many(query)
        return result.deleted_count
    except Exception as e:
        print(f"Error deleting documents: {e}")
        return 0
# Usage example
def main():
    # Connect to MongoDB
    db = connect_mongodb()
    # pymongo Database objects do not support truth-value testing, so compare with None
    if db is not None:
        # Prepare the data
        products = [
            {
                "name": "笔记本电脑",
                "brand": "联想",
                "price": 5999.99,
                "specs": {
                    "cpu": "Intel i5",
                    "ram": "8GB",
                    "storage": "512GB SSD"
                },
                "in_stock": True
            },
            {
                "name": "智能手机",
                "brand": "小米",
                "price": 2999.00,
                "specs": {
                    "screen": "6.5英寸",
                    "camera": "4800万像素",
                    "battery": "4500mAh"
                },
                "in_stock": True
            },
            {
                "name": "智能手表",
                "brand": "华为",
                "price": 1499.00,
                "specs": {
                    "screen": "1.3英寸",
                    "battery": "14天续航",
                    "water_resistant": True
                },
                "in_stock": False
            }
        ]
        # Insert the data
        collection_name = "products"
        count = insert_many_documents(db, collection_name, products)
        print(f"Inserted {count} documents")
        # Query all documents
        all_products = find_all_documents(db, collection_name)
        print("\nAll products:")
        for product in all_products:
            print(f"ID: {product['_id']}")
            print(f"Name: {product['name']}")
            print(f"Brand: {product['brand']}")
            print(f"Price: {product['price']}")
            print("-" * 30)
        # Query with a filter
        in_stock_products = find_documents(db, collection_name, {"in_stock": True})
        print(f"\nProducts in stock: {len(in_stock_products)}")
        # Update documents
        update_count = update_documents(
            db,
            collection_name,
            {"name": "智能手表"},
            {"$set": {"in_stock": True, "price": 1299.00}}
        )
        print(f"Updated {update_count} documents")
        # Query the watch again
        watches = find_documents(db, collection_name, {"name": "智能手表"})
        if watches:
            print("\nWatch after the update:")
            watch = watches[0]
            print(f"Name: {watch['name']}")
            print(f"Price: {watch['price']}")
            print(f"In stock: {watch['in_stock']}")
        # Delete documents
        delete_count = delete_documents(db, collection_name, {"brand": "华为"})
        print(f"Deleted {delete_count} documents")
        # Count the remaining documents
        remaining = find_all_documents(db, collection_name)
        print(f"Remaining documents: {len(remaining)}")
if __name__ == "__main__":
    main()
8.3 Cloud Storage and Hosted Data Services
Saving to Google Sheets (requires pip install gspread and a service-account JSON key from Google Cloud):
import gspread
def save_to_google_sheets(data, sheet_name):
    # Authorize with the service-account key file
    # (gspread.service_account replaces the deprecated oauth2client flow)
    gc = gspread.service_account(filename='your-credentials.json')
    try:
        # Try to open an existing spreadsheet
        sheet = gc.open(sheet_name)
    except gspread.exceptions.SpreadsheetNotFound:
        # Otherwise create a new one
        sheet = gc.create(sheet_name)
        # The service account owns the new file; share it with your own Google account
        sheet.share('your-email@gmail.com', perm_type='user', role='writer')
    # Use the first worksheet
    worksheet = sheet.get_worksheet(0)
    # Nothing to write
    if not data:
        return
    # Column headers come from the keys of the first record
    headers = list(data[0].keys())
    # Clear the worksheet, then write the header and all rows in a single request
    worksheet.clear()
    rows = [headers] + [[item.get(header, '') for header in headers] for item in data]
    worksheet.append_rows(rows)
    print(f"Data saved to Google Sheets: {sheet_name}")
    print(f"Link: {sheet.url}")
# Usage example
data = [
    {'name': '张三', 'age': 30, 'city': '北京'},
    {'name': '李四', 'age': 25, 'city': '上海'},
    {'name': '王五', 'age': 40, 'city': '广州'}
]
save_to_google_sheets(data, 'User Data')
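The Sheets API receives plain cell values, so it helps to convert the list of dicts into a header row plus one list per record before calling append_rows(). The function above takes its headers from the first record only. The hypothetical records_to_rows below takes the union of all keys instead, so records with extra fields keep their columns. It also turns None into an empty cell and converts other non-primitive values (dates, decimals) to strings.

```python
def records_to_rows(records):
    """Convert a list of dicts into [header, row1, row2, ...] for a spreadsheet."""
    headers = []
    for record in records:
        for key in record:
            if key not in headers:
                headers.append(key)  # keep first-seen column order
    rows = [headers]
    for record in records:
        row = []
        for key in headers:
            value = record.get(key, '')
            if value is None:
                value = ''
            elif not isinstance(value, (str, int, float, bool)):
                value = str(value)  # e.g. datetime or Decimal
            row.append(value)
        rows.append(row)
    return rows


print(records_to_rows([{'name': '张三', 'age': 30}, {'name': '李四', 'city': '上海'}]))
# [['name', 'age', 'city'], ['张三', 30, ''], ['李四', '', '上海']]
```

Inside save_to_google_sheets, worksheet.append_rows(records_to_rows(data)) would then replace the header/row construction.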
Saving to AWS S3 (requires pip install boto3 and configured AWS credentials):
import csv
import io
import json
import boto3
from botocore.exceptions import BotoCoreError, ClientError
def save_to_s3(data, bucket_name, file_name, file_format='json'):
    """
    Save data to AWS S3.
    :param data: the data to save (a list of dicts)
    :param bucket_name: S3 bucket name
    :param file_name: object key (file name) in the bucket
    :param file_format: file format ('json' or 'csv')
    :return: True on success, False on failure
    """
    # Serialize in memory instead of going through a temporary file
    if file_format == 'json':
        body = json.dumps(data, ensure_ascii=False, indent=4)
    elif file_format == 'csv':
        buffer = io.StringIO()
        fieldnames = list(data[0].keys()) if data else []
        writer = csv.DictWriter(buffer, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
        body = buffer.getvalue()
    else:
        print(f"Unsupported file format: {file_format}")
        return False
    try:
        # Create the S3 client and upload the UTF-8 encoded content
        s3_client = boto3.client('s3')
        s3_client.put_object(Bucket=bucket_name, Key=file_name, Body=body.encode('utf-8'))
        print(f"Data uploaded to S3: s3://{bucket_name}/{file_name}")
        return True
    except (BotoCoreError, ClientError) as e:
        print(f"Error uploading to S3: {e}")
        return False
# Usage example
data = [
    {'id': 1, 'name': '笔记本电脑', 'price': 5999.99},
    {'id': 2, 'name': '智能手机', 'price': 3999.00},
    {'id': 3, 'name': '蓝牙耳机', 'price': 299.50}
]
# Upload as JSON
save_to_s3(data, 'your-bucket-name', 'products.json')
# Upload as CSV
save_to_s3(data, 'your-bucket-name', 'products.csv', 'csv')
Using Firebase Realtime Database (requires pip install firebase-admin):
import datetime
import firebase_admin
from firebase_admin import credentials
from firebase_admin import db
def initialize_firebase(credentials_path, database_url):
    """Initialize the Firebase app"""
    try:
        cred = credentials.Certificate(credentials_path)
        firebase_admin.initialize_app(cred, {
            'databaseURL': database_url
        })
        print("Firebase initialized")
        return True
    except Exception as e:
        print(f"Firebase initialization failed: {e}")
        return False
def save_to_firebase(data, ref_path):
    """
    Save data to Firebase Realtime Database.
    :param data: the data to save (a list of dicts)
    :param ref_path: reference path in Firebase
    """
    try:
        # Get a database reference
        ref = db.reference(ref_path)
        # Timestamp shared by all records in this batch
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Save each record under its own child key
        for i, item in enumerate(data):
            item_data = dict(item)
            item_data['timestamp'] = timestamp
            ref.child(f"item_{i}").set(item_data)
        print(f"Data saved to Firebase: {ref_path}")
        return True
    except Exception as e:
        print(f"Saving to Firebase failed: {e}")
        return False
def read_from_firebase(ref_path):
    """
    Read data from Firebase Realtime Database.
    :param ref_path: reference path in Firebase
    :return: the stored records as a list
    """
    try:
        # Get a database reference
        ref = db.reference(ref_path)
        # Fetch everything under the path
        data = ref.get()
        if data:
            # ref.get() returns a dict keyed by child name; keep only the values
            return list(data.values())
        return []
    except Exception as e:
        print(f"Reading from Firebase failed: {e}")
        return []
# Usage example
# Initialize Firebase (only once per process; calling initialize_app again raises an error)
initialize_firebase(
    'path/to/your/firebase-credentials.json',
    'https://your-project-id.firebaseio.com/'
)
# Prepare the data
products = [
    {'name': '笔记本电脑', 'price': 5999.99, 'brand': '联想'},
    {'name': '智能手机', 'price': 3999.00, 'brand': '小米'},
    {'name': '蓝牙耳机', 'price': 299.50, 'brand': '华为'}
]
# Save the data
save_to_firebase(products, 'products')
# Read the data back
retrieved_products = read_from_firebase('products')
print("\nData read from Firebase:")
for product in retrieved_products:
    print(f"Name: {product['name']}")
    print(f"Price: {product['price']}")
    print(f"Brand: {product['brand']}")
    print(f"Timestamp: {product['timestamp']}")
    print("-" * 30)
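9. Hands-On Project: E-commerce Data Crawler
Let's build a complete e-commerce crawler project that scrapes product data from JD.com (京东) and stores it in a database.
9.1 Project Design and Requirements
Project goals:
- Scrape product information for specific JD categories
- Collect the title, price, review count, shop name, and other details
- Store the data in a MongoDB database
- Support incremental updates and deduplication
- Provide data analysis and visualization
Directory structure:
jd_spider/
├── spider/
│   ├── __init__.py
│   ├── config.py       # configuration
│   ├── crawler.py      # main crawler class
│   ├── db.py           # database operations
│   └── utils.py        # utility functions
├── analysis/
│   ├── __init__.py
│   ├── analyzer.py     # data analysis
│   └── visualize.py    # data visualization
├── main.py             # entry point
└── requirements.txt    # dependencies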
9.2 Configuration and Dependencies
requirements.txt:
requests==2.31.0
beautifulsoup4==4.12.2
selenium==4.15.2
pymongo==4.6.0
pandas==2.1.3
matplotlib==3.8.2
seaborn==0.13.0
webdriver-manager==4.0.1
fake-useragent==1.3.0
python-dotenv==1.0.0
config.py:
import os
from dotenv import load_dotenv
# Load environment variables (from a .env file if present)
load_dotenv()
# Crawler settings
CRAWLER_CONFIG = {
    'MAX_PAGES': int(os.getenv('MAX_PAGES', 5)),
    'DELAY_MIN': float(os.getenv('DELAY_MIN', 1.0)),
    'DELAY_MAX': float(os.getenv('DELAY_MAX', 3.0)),
    'TIMEOUT': int(os.getenv('TIMEOUT', 10)),
    'RETRY_TIMES': int(os.getenv('RETRY_TIMES', 3)),
    'USER_AGENT_ROTATE': os.getenv('USER_AGENT_ROTATE', 'True').lower() == 'true',
}
# Database settings
DB_CONFIG = {
    'MONGO_URI': os.getenv('MONGO_URI', 'mongodb://localhost:27017/'),
    'DB_NAME': os.getenv('DB_NAME', 'jd_products'),
    'COLLECTION_NAME': os.getenv('COLLECTION_NAME', 'products'),
}
# Proxy settings
PROXY_CONFIG = {
    'USE_PROXY': os.getenv('USE_PROXY', 'False').lower() == 'true',
    'PROXY_API': os.getenv('PROXY_API', ''),
}
# Selenium settings
SELENIUM_CONFIG = {
    'USE_SELENIUM': os.getenv('USE_SELENIUM', 'False').lower() == 'true',
    'HEADLESS': os.getenv('HEADLESS', 'True').lower() == 'true',
}
# Logging settings
LOG_CONFIG = {
    'LEVEL': os.getenv('LOG_LEVEL', 'INFO'),
    'FILE': os.getenv('LOG_FILE', 'jd_spider.log'),
}
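config.py treats a flag as enabled only when its value is exactly "true" (case-insensitive). As a result, USE_SELENIUM=1 or USE_SELENIUM=yes silently count as False. If you want to accept the common spellings and reject typos loudly, a small helper like the hypothetical env_bool below could replace those comparisons, for example 'USE_SELENIUM': env_bool('USE_SELENIUM', False).

```python
import os

TRUE_VALUES = {'1', 'true', 'yes', 'on'}
FALSE_VALUES = {'0', 'false', 'no', 'off', ''}


def env_bool(name, default=False):
    """Read a boolean environment variable, accepting 1/0, true/false, yes/no, on/off."""
    raw = os.getenv(name)
    if raw is None:
        return default
    value = raw.strip().lower()
    if value in TRUE_VALUES:
        return True
    if value in FALSE_VALUES:
        return False
    raise ValueError(f'{name} must be a boolean-like value, got {raw!r}')


os.environ['USE_SELENIUM'] = 'yes'
print(env_bool('USE_SELENIUM'))  # True
```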
9.3 Database Operations
db.py:
import logging
import pymongo
from spider.config import DB_CONFIG
class Database:
    def __init__(self):
        """Initialize the database handles (connect() opens the connection)"""
        self.client = None
        self.db = None
        self.collection = None
        # Use the same logger that setup_logger() configures
        self.logger = logging.getLogger('jd_spider')
    def connect(self):
        """Connect to MongoDB"""
        try:
            self.client = pymongo.MongoClient(DB_CONFIG['MONGO_URI'])
            self.db = self.client[DB_CONFIG['DB_NAME']]
            self.collection = self.db[DB_CONFIG['COLLECTION_NAME']]
            # Create a unique index on product_id (if it does not exist yet)
            if 'product_id_1' not in self.collection.index_information():
                self.collection.create_index([('product_id', pymongo.ASCENDING)], unique=True)
            self.logger.info(f"Connected to MongoDB: {DB_CONFIG['DB_NAME']}.{DB_CONFIG['COLLECTION_NAME']}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to MongoDB: {e}")
            return False
    def close(self):
        """Close the database connection"""
        if self.client is not None:
            self.client.close()
            self.logger.info("MongoDB connection closed")
    def save_product(self, product):
        """Save a single product; update it if it already exists"""
        # pymongo Collection objects do not support truth-value testing, so compare with None
        if self.collection is None:
            self.logger.error("Database not connected")
            return False
        try:
            # Use product_id as the unique key
            result = self.collection.update_one(
                {'product_id': product['product_id']},
                {'$set': product},
                upsert=True
            )
            if result.upserted_id:
                self.logger.debug(f"New product: {product['title']}")
                return True
            elif result.modified_count > 0:
                self.logger.debug(f"Updated product: {product['title']}")
                return True
            else:
                self.logger.debug(f"Product already exists and is unchanged: {product['title']}")
                return False
        except Exception as e:
            self.logger.error(f"Failed to save product: {e}")
            return False
    def save_products(self, products):
        """Save a batch of products"""
        if self.collection is None:
            self.logger.error("Database not connected")
            return 0
        saved_count = 0
        for product in products:
            if self.save_product(product):
                saved_count += 1
        self.logger.info(f"Saved {saved_count}/{len(products)} products")
        return saved_count
    def get_product_count(self):
        """Return the total number of products"""
        if self.collection is None:
            self.logger.error("Database not connected")
            return 0
        return self.collection.count_documents({})
    def get_all_products(self, limit=None):
        """Return all products (optionally limited)"""
        if self.collection is None:
            self.logger.error("Database not connected")
            return []
        cursor = self.collection.find({})
        if limit:
            cursor = cursor.limit(limit)
        return list(cursor)
    def get_products_by_category(self, category):
        """Return the products of one category"""
        if self.collection is None:
            self.logger.error("Database not connected")
            return []
        return list(self.collection.find({'category': category}))
    def get_price_range(self):
        """Return the (minimum, maximum) price"""
        if self.collection is None:
            self.logger.error("Database not connected")
            return (0, 0)
        pipeline = [
            {
                '$group': {
                    '_id': None,
                    'min_price': {'$min': '$price_number'},
                    'max_price': {'$max': '$price_number'}
                }
            }
        ]
        result = list(self.collection.aggregate(pipeline))
        if result:
            return (result[0]['min_price'], result[0]['max_price'])
        return (0, 0)
    def get_top_shops(self, limit=10):
        """Return the top N shops by number of products"""
        if self.collection is None:
            self.logger.error("Database not connected")
            return []
        pipeline = [
            {'$group': {'_id': '$shop', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
            {'$limit': limit}
        ]
        return list(self.collection.aggregate(pipeline))
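save_product() already prevents duplicates in the database through the unique product_id index and upsert=True. The same product can also appear several times within one crawl, for example on adjacent search pages. In that case, you can collapse the batch before writing so that each product_id is upserted only once. Below is a sketch with a hypothetical dedupe_by_key helper. Keeping the last occurrence, and skipping records without an id, are assumptions of this sketch.

```python
def dedupe_by_key(records, key='product_id'):
    """Keep one record per key (the last one seen), in first-seen order."""
    unique = {}
    for record in records:
        value = record.get(key)
        if value is None:
            continue  # records without an id cannot be upserted reliably; skip them
        unique[value] = record  # later duplicates overwrite earlier ones; position stays
    return list(unique.values())


batch = [
    {'product_id': '100', 'price_number': 10.0},
    {'product_id': '200', 'price_number': 20.0},
    {'product_id': '100', 'price_number': 9.5},
    {'title': 'no id'},
]
print(dedupe_by_key(batch))
# [{'product_id': '100', 'price_number': 9.5}, {'product_id': '200', 'price_number': 20.0}]
```

Usage would be db.save_products(dedupe_by_key(products)).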
9.4 Crawler Implementation
utils.py:
import logging
import random
import time
import re
from fake_useragent import UserAgent
from spider.config import CRAWLER_CONFIG, LOG_CONFIG
# Configure logging
def setup_logger():
    logger = logging.getLogger('jd_spider')
    # Avoid adding duplicate handlers if this function is called more than once
    if logger.handlers:
        return logger
    logger.setLevel(getattr(logging, LOG_CONFIG['LEVEL']))
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(getattr(logging, LOG_CONFIG['LEVEL']))
    # File handler
    file_handler = logging.FileHandler(LOG_CONFIG['FILE'], encoding='utf-8')
    file_handler.setLevel(getattr(logging, LOG_CONFIG['LEVEL']))
    # Formatter
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    # Attach the handlers
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    return logger
# Sleep for a random delay and return its length in seconds
def random_delay():
    delay = random.uniform(CRAWLER_CONFIG['DELAY_MIN'], CRAWLER_CONFIG['DELAY_MAX'])
    time.sleep(delay)
    return delay
# Return a random User-Agent string
def get_random_ua():
    try:
        ua = UserAgent()
        return ua.random
    except Exception:
        # If fake-useragent fails, fall back to a predefined list
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
        ]
        return random.choice(user_agents)
# Extract the first number in a string (0 if there is none)
def extract_number(text):
    if not text:
        return 0
    matches = re.findall(r'\d+\.?\d*', text)
    if matches:
        return float(matches[0])
    return 0
# Clean up text
def clean_text(text):
    if not text:
        return ""
    # Collapse runs of whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    # Remove special characters (keep word characters, spaces, Chinese characters and . , -)
    text = re.sub(r'[^\w\s\u4e00-\u9fff.,-]', '', text)
    return text
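config.py defines RETRY_TIMES. One way to apply such a setting is a small retry helper with exponential backoff, which could live next to random_delay() in utils.py. The sketch below is a hypothetical helper, not part of the original project. The sleep function is injectable so the example runs instantly. In the crawler, it might wrap a request as with_retries(lambda: self.session.get(url, timeout=CRAWLER_CONFIG['TIMEOUT']), retries=CRAWLER_CONFIG['RETRY_TIMES'], exceptions=(requests.RequestException,)).

```python
import time


def with_retries(func, retries=3, base_delay=1.0, exceptions=(Exception,), sleep=time.sleep):
    """Call func(); on failure retry up to `retries` more times with exponential backoff."""
    for attempt in range(retries + 1):
        try:
            return func()
        except exceptions:
            if attempt == retries:
                raise  # out of attempts: propagate the last error
            sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...


# Example: a function that fails twice, then succeeds
calls = {'n': 0}


def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError('temporary failure')
    return 'ok'


delays = []
print(with_retries(flaky, retries=3, sleep=delays.append))  # ok
print(delays)  # [1.0, 2.0]
```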
crawler.py:
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import quote
import logging
import time
import re
import json
from spider.config import CRAWLER_CONFIG, SELENIUM_CONFIG
from spider.utils import random_delay, get_random_ua, extract_number, clean_text, setup_logger
class JDCrawler:
    def __init__(self):
        """Initialize the JD crawler"""
        self.logger = logging.getLogger('jd_spider')
        self.session = requests.Session()
        self.driver = None
        self.setup_session()
        if SELENIUM_CONFIG['USE_SELENIUM']:
            self.setup_selenium()
    def setup_session(self):
        """Configure the requests session"""
        # Set the default request headers
        self.session.headers.update({
            'User-Agent': get_random_ua(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Referer': 'https://www.jd.com/',
        })
    def setup_selenium(self):
        """Configure the Selenium WebDriver"""
        chrome_options = Options()
        if SELENIUM_CONFIG['HEADLESS']:
            chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument(f'user-agent={get_random_ua()}')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        # Disable image loading to speed up page loads
        chrome_prefs = {"profile.managed_default_content_settings.images": 2}
        chrome_options.add_experimental_option("prefs", chrome_prefs)
        service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=service, options=chrome_options)
        self.driver.set_page_load_timeout(CRAWLER_CONFIG['TIMEOUT'])
        self.logger.info("Selenium WebDriver initialized")
    def close(self):
        """Release resources"""
        if self.driver:
            self.driver.quit()
            self.logger.info("Selenium WebDriver closed")
    def search_products(self, keyword, max_pages=None):
        """
        Search for products.
        :param keyword: search keyword
        :param max_pages: maximum number of result pages to crawl
        :return: list of products
        """
        if max_pages is None:
            max_pages = CRAWLER_CONFIG['MAX_PAGES']
        all_products = []
        for page in range(1, max_pages + 1):
            try:
                self.logger.info(f"Crawling page {page} for keyword '{keyword}'")
                # Build the search URL (the keyword is URL-encoded)
                url = f'https://search.jd.com/Search?keyword={quote(keyword)}&page={page}'
                if SELENIUM_CONFIG['USE_SELENIUM']:
                    products_on_page = self._parse_with_selenium(url)
                else:
                    products_on_page = self._parse_with_requests(url)
                if not products_on_page:
                    self.logger.warning(f"No products found on page {page}; stopping")
                    break
                self.logger.info(f"Found {len(products_on_page)} products on page {page}")
                all_products.extend(products_on_page)
                # Random delay between pages
                delay = random_delay()
                self.logger.debug(f"Slept for {delay:.2f} s")
            except Exception as e:
                self.logger.error(f"Error while crawling page {page}: {e}")
                break
        self.logger.info(f"Crawled {len(all_products)} products in total")
        return all_products
    def _parse_with_requests(self, url):
        """Fetch and parse a page with requests"""
        try:
            # Rotate the User-Agent before each request
            if CRAWLER_CONFIG['USER_AGENT_ROTATE']:
                self.session.headers.update({'User-Agent': get_random_ua()})
            response = self.session.get(url, timeout=CRAWLER_CONFIG['TIMEOUT'])
            response.raise_for_status()
            # Parse the HTML with BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            # Extract the product list
            return self._extract_products_from_soup(soup)
        except requests.RequestException as e:
            self.logger.error(f"Request error: {e}")
            return []
    def _parse_with_selenium(self, url):
        """Load and parse a page with Selenium"""
        try:
            self.driver.get(url)
            # Wait until the product list has loaded
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "div.gl-i-wrap"))
            )
            # Scroll down so lazily loaded items are rendered
            self._scroll_page()
            # Get the rendered page source
            html = self.driver.page_source
            soup = BeautifulSoup(html, 'html.parser')
            # Extract the product list
            return self._extract_products_from_soup(soup)
        except Exception as e:
            self.logger.error(f"Selenium parsing error: {e}")
            return []
    def _scroll_page(self):
        """Scroll the page so that all content loads"""
        try:
            # Get the current page height
last_height = self.driver.execute_script("return document.body.scrollHeight")
while True:
# 滚动到页面底部
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# 等待页面加载
time.sleep(2)
# 获取新的页面高度
new_height = self.driver.execute_script("return document.body.scrollHeight")
# 如果高度没有变化,说明已经到底部了
if new_height == last_height:
break
last_height = new_height
except Exception as e:
self.logger.error(f"滚动页面时出错: {e}")
def _extract_products_from_soup(self, soup):
"""从BeautifulSoup对象中提取商品信息"""
products = []
# 查找所有商品项
product_items = soup.select('li.gl-item')
for item in product_items:
try:
# 提取商品ID
product_id = item.get('data-sku')
if not product_id:
continue
# 提取商品价格
price_element = item.select_one('div.p-price strong i')
price = price_element.text if price_element else 'N/A'
price_number = extract_number(price)
# 提取商品标题
title_element = item.select_one('div.p-name em')
title = title_element.get_text(' ', strip=True) if title_element else 'N/A'
title = clean_text(title)
# 提取店铺名称
shop_element = item.select_one('div.p-shop span a')
shop = shop_element.get_text(strip=True) if shop_element else 'N/A'
shop = clean_text(shop)
# 提取评论数
commit_element = item.select_one('div.p-commit strong a')
commit_text = commit_element.text if commit_element else '0'
commits = extract_number(commit_text)
# 提取商品链接
link = f"https://item.jd.com/{product_id}.html"
# 获取当前时间
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
# 构建商品信息
product_info = {
'product_id': product_id,
'title': title,
'price': price,
'price_number': price_number,
'shop': shop,
'commits': int(commits),
'link': link,
'timestamp': timestamp
}
products.append(product_info)
except Exception as e:
self.logger.error(f"解析商品时出错: {e}")
continue
return products
def get_product_details(self, product_id):
"""获取商品详细信息"""
try:
url = f"https://item.jd.com/{product_id}.html"
if SELENIUM_CONFIG['USE_SELENIUM']:
self.driver.get(url)
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.ID, "detail"))
)
html = self.driver.page_source
else:
response = self.session.get(url, timeout=CRAWLER_CONFIG['TIMEOUT'])
response.raise_for_status()
html = response.text
soup = BeautifulSoup(html, 'html.parser')
# 提取商品详情
details = {}
# 标题
title_element = soup.select_one('.sku-name')
details['title'] = title_element.get_text(strip=True) if title_element else 'N/A'
# 价格 (通过JS加载,可能无法直接获取)
details['price'] = 'N/A'
# 品牌
brand_element = soup.select_one('#parameter-brand li a')
details['brand'] = brand_element.get_text(strip=True) if brand_element else 'N/A'
# 商品参数
params = {}
param_elements = soup.select('#parameter2 li')
for param in param_elements:
text = param.get_text(strip=True)
if ':' in text:
key, value = text.split(':', 1)
params[key.strip()] = value.strip()
details['parameters'] = params
# 商品描述
desc_element = soup.select_one('.news')
details['description'] = desc_element.get_text(strip=True) if desc_element else 'N/A'
# 获取评论数据
details['comments'] = self._get_product_comments(product_id)
# 添加商品ID和URL
details['product_id'] = product_id
details['url'] = url
return details
except Exception as e:
self.logger.error(f"获取商品详情时出错 (ID: {product_id}): {e}")
return None
def _get_product_comments(self, product_id, page=1, page_size=10):
"""获取商品评论数据"""
try:
# 构建评论API URL
url = f"https://club.jd.com/comment/productPageComments.action?productId={product_id}&score=0&sortType=5&page={page}&pageSize={page_size}"
# 设置Referer和头信息
headers = {
'Referer': f'https://item.jd.com/{product_id}.html',
'User-Agent': get_random_ua()
}
response = requests.get(url, headers=headers, timeout=CRAWLER_CONFIG['TIMEOUT'])
response.raise_for_status()
# 解析JSON响应
comment_data = response.json()
return {
'count': comment_data.get('productCommentSummary', {}).get('commentCount', 0),
'good_count': comment_data.get('productCommentSummary', {}).get('goodCount', 0),
'poor_count': comment_data.get('productCommentSummary', {}).get('poorCount', 0),
'average_score': comment_data.get('productCommentSummary', {}).get('averageScore', 0),
'comments': comment_data.get('comments', [])[:5] # 只保留前5条评论
}
except Exception as e:
self.logger.error(f"获取商品评论时出错 (ID: {product_id}): {e}")
return {
'count': 0,
'good_count': 0,
'poor_count': 0,
'average_score': 0,
'comments': []
}
def get_category_products(self, category_id, max_pages=None):
"""爬取特定分类下的商品"""
if max_pages is None:
max_pages = CRAWLER_CONFIG['MAX_PAGES']
all_products = []
for page in range(1, max_pages + 1):
try:
self.logger.info(f"正在爬取分类 ID '{category_id}' 的第 {page} 页")
# 构建分类URL (页码*2-1为京东的分页规则)
url = f'https://list.jd.com/list.html?cat={category_id}&page={2*page-1}'
if SELENIUM_CONFIG['USE_SELENIUM']:
products_on_page = self._parse_with_selenium(url)
else:
products_on_page = self._parse_with_requests(url)
if not products_on_page:
self.logger.warning(f"第 {page} 页没有找到商品,停止爬取")
break
# 添加分类信息
for product in products_on_page:
product['category_id'] = category_id
self.logger.info(f"第 {page} 页找到 {len(products_on_page)} 个商品")
all_products.extend(products_on_page)
# 随机延迟
delay = random_delay()
self.logger.debug(f"随机延迟 {delay:.2f} 秒")
except Exception as e:
self.logger.error(f"爬取分类第 {page} 页时出错: {e}")
break
self.logger.info(f"总共爬取了 {len(all_products)} 个商品")
return all_products
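On JD listing pages, review counts are usually shown in abbreviated form such as "2万+". This is an assumption based on how the site typically renders them, so check it against the live markup. `extract_number` lives in `utils.py`, which is not shown here, so it is unknown whether it handles the 万 unit. If it only extracts the leading number, "2万+" would be stored as 2. The sketch below is a minimal, dependency-free helper under that assumption. Both names are hypothetical: `parse_count` expands 万 (10^4) and 亿 (10^8), and `jd_page_param` encodes the 2*page-1 rule stated in the `get_category_products` comment.

```python
import re

# Chinese magnitude units: 万 = 10^4, 亿 = 10^8
_UNITS = {"万": 10_000, "亿": 100_000_000}
_COUNT_RE = re.compile(r"(\d+(?:\.\d+)?)\s*([万亿]?)")


def parse_count(text):
    """Convert display strings such as '2万+', '1.5万+', '500+' or '1,234' to an int.

    Hypothetical helper: returns 0 for empty input or when no number is found.
    """
    if not text:
        return 0
    match = _COUNT_RE.search(text.replace(",", ""))
    if not match:
        return 0
    number = float(match.group(1))
    multiplier = _UNITS.get(match.group(2), 1)  # no unit -> multiplier 1
    # round() guards against float rounding artifacts before converting to int
    return int(round(number * multiplier))


def jd_page_param(page):
    """Map a 1-based logical page to the odd 'page' value (1, 3, 5, ...)
    described in the get_category_products comment."""
    if page < 1:
        raise ValueError("page must be >= 1")
    return 2 * page - 1


if __name__ == "__main__":
    for sample in ["2万+", "1.5万+", "500+", "1,234", ""]:
        print(repr(sample), "->", parse_count(sample))
    print([jd_page_param(p) for p in range(1, 4)])
```

If the format assumption holds, `'commits': parse_count(commit_text)` in `_extract_products_from_soup` could replace the `extract_number` plus `int()` pair.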
9.5 Data Analysis
analyzer.py:
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from spider.db import Database
class ProductAnalyzer:
def __init__(self):
"""初始化数据分析器"""
self.logger = logging.getLogger('jd_spider')
self.db = Database()
self.db.connect()
self.data = None
def load_data(self, force_reload=False):
"""从数据库加载数据"""
if self.data is None or force_reload:
products = self.db.get_all_products()
if products:
self.data = pd.DataFrame(products)
# 转换价格为数值类型
if 'price_number' in self.data.columns:
self.data['price_number'] = pd.to_numeric(self.data['price_number'], errors='coerce')
# 转换时间戳
if 'timestamp' in self.data.columns:
self.data['timestamp'] = pd.to_datetime(self.data['timestamp'], errors='coerce')
self.logger.info(f"已加载 {len(self.data)} 条商品数据")
else:
self.logger.warning("没有数据可加载")
self.data = pd.DataFrame()
return self.data
def get_basic_stats(self):
"""获取基本统计信息"""
if self.data is None or self.data.empty:
self.load_data()
if self.data.empty:
return {}
stats = {
'total_products': len(self.data),
'unique_shops': self.data['shop'].nunique(),
'avg_price': self.data['price_number'].mean(),
'median_price': self.data['price_number'].median(),
'min_price': self.data['price_number'].min(),
'max_price': self.data['price_number'].max(),
'price_std': self.data['price_number'].std(),
'avg_commits': self.data['commits'].mean(),
}
# 添加最新和最早爬取日期
if 'timestamp' in self.data.columns:
stats['latest_date'] = self.data['timestamp'].max()
stats['earliest_date'] = self.data['timestamp'].min()
return stats
def get_price_distribution(self, bins=10):
"""获取价格分布"""
if self.data is None or self.data.empty:
self.load_data()
if self.data.empty:
return {}
# 计算价格区间
price_bins = pd.cut(self.data['price_number'], bins=bins)
price_dist = price_bins.value_counts().sort_index()
# 转换为dict
price_ranges = [str(interval) for interval in price_dist.index]
counts = price_dist.values.tolist()
return {
'price_ranges': price_ranges,
'counts': counts,
'bin_edges': price_bins.cat.categories.tolist()
}
def get_top_shops(self, limit=10):
"""获取店铺商品数量排名"""
if self.data is None or self.data.empty:
self.load_data()
if self.data.empty:
return []
shop_counts = self.data['shop'].value_counts().head(limit)
return [
{'shop': shop, 'count': count}
for shop, count in shop_counts.items()
]
def get_commit_vs_price(self):
"""分析评论数与价格的关系"""
if self.data is None or self.data.empty:
self.load_data()
if self.data.empty:
return {}
# 确保数据类型正确
self.data['commits'] = pd.to_numeric(self.data['commits'], errors='coerce')
# 计算评论数和价格的相关性
correlation = self.data['commits'].corr(self.data['price_number'])
# 为可视化准备数据
data_for_plot = self.data[['price_number', 'commits']].copy()
# 为了更好的可视化,可以对评论数进行对数转换
data_for_plot['log_commits'] = np.log1p(data_for_plot['commits'])
return {
'correlation': correlation,
'data': data_for_plot.to_dict(orient='records'),
'price_vs_commits': self.data[['price_number', 'commits']].values.tolist()
}
def analyze_titles(self):
"""分析商品标题"""
if self.data is None or self.data.empty:
self.load_data()
if self.data.empty:
return {}
# 标题长度分析
self.data['title_length'] = self.data['title'].astype(str).apply(len)
# 标题关键词分析 (简化版,真实场景可能需要NLP库)
title_words = []
for title in self.data['title']:
if isinstance(title, str):
words = title.split()
title_words.extend(words)
# 计算词频
word_counts = pd.Series(title_words).value_counts().head(20)
return {
'avg_title_length': self.data['title_length'].mean(),
'max_title_length': self.data['title_length'].max(),
'min_title_length': self.data['title_length'].min(),
'title_length_distribution': self.data['title_length'].value_counts().sort_index().to_dict(),
'top_words': word_counts.to_dict()
}
def close(self):
"""关闭数据库连接"""
if self.db:
self.db.close()
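`analyze_titles` splits titles on whitespace. However, JD titles are mostly unsegmented Chinese, so each "word" tends to be a whole phrase. This is also why `visualize.py` uses jieba. A dependency-free approximation is to count character bigrams with `collections.Counter`. This is an assumption for illustration, not what the original does. The second helper spells out the Pearson coefficient that `Series.corr` computes by default, and it skips missing pairs the way pandas drops NaN pairs. Both function names are hypothetical.

```python
import math
from collections import Counter


def title_bigrams(titles, top_n=20):
    """Count 2-character substrings inside whitespace-separated title tokens.

    A rough stand-in for real word segmentation (e.g. jieba);
    returns (bigram, count) pairs, most common first.
    """
    counter = Counter()
    for title in titles:
        if not isinstance(title, str):
            continue  # skip None / NaN values, as analyze_titles does
        for token in title.split():
            counter.update(token[i:i + 2] for i in range(len(token) - 1))
    return counter.most_common(top_n)


def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences, ignoring pairs containing None."""
    pairs = [(x, y) for x, y in zip(xs, ys) if x is not None and y is not None]
    n = len(pairs)
    if n < 2:
        return float("nan")
    mean_x = sum(x for x, _ in pairs) / n
    mean_y = sum(y for _, y in pairs) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in pairs)
    var_x = sum((x - mean_x) ** 2 for x, _ in pairs)
    var_y = sum((y - mean_y) ** 2 for _, y in pairs)
    if var_x == 0 or var_y == 0:
        return float("nan")  # undefined when one series is constant
    return cov / math.sqrt(var_x * var_y)
```

For example, `title_bigrams(["华为手机 新款", "小米手机"], 1)` surfaces "手机" as the shared term, which a whitespace split would miss.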
visualize.py:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from matplotlib.font_manager import FontProperties
import logging
import os
from analysis.analyzer import ProductAnalyzer  # package path, since main.py imports analysis.visualize
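# Configure a Chinese font. FontProperties does not check that the file exists,
# so test the path explicitly instead of relying on an exception
font_path = r'C:\Windows\Fonts\simhei.ttf'
if os.path.exists(font_path):
font = FontProperties(fname=font_path)
else:
# On Linux/macOS fall back to the default font (Chinese labels may render as boxes)
font = FontProperties()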
class DataVisualizer:
def __init__(self, analyzer=None, output_dir='./output'):
"""初始化数据可视化器"""
self.logger = logging.getLogger('jd_spider')
if analyzer is None:
self.analyzer = ProductAnalyzer()
else:
self.analyzer = analyzer
self.output_dir = output_dir
# 创建输出目录
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
# 设置可视化样式
sns.set(style="whitegrid")
plt.rcParams['figure.figsize'] = (12, 8)
def plot_price_distribution(self, bins=20, save=True):
"""绘制价格分布直方图"""
data = self.analyzer.load_data()
if data.empty:
self.logger.warning("没有数据可用于可视化")
return None
# 创建图形
plt.figure(figsize=(12, 8))
# 绘制价格分布直方图
sns.histplot(data=data, x='price_number', bins=bins, kde=True)
plt.title('商品价格分布', fontproperties=font, fontsize=16)
plt.xlabel('价格 (元)', fontproperties=font, fontsize=12)
plt.ylabel('商品数量', fontproperties=font, fontsize=12)
# 添加一些统计信息文本
avg_price = data['price_number'].mean()
median_price = data['price_number'].median()
plt.axvline(avg_price, color='r', linestyle='--', label=f'平均价: {avg_price:.2f}元')
plt.axvline(median_price, color='g', linestyle='--', label=f'中位价: {median_price:.2f}元')
plt.legend(prop=font)
# 保存图像
if save:
file_path = os.path.join(self.output_dir, 'price_distribution.png')
plt.savefig(file_path, dpi=300, bbox_inches='tight')
self.logger.info(f"价格分布图已保存至: {file_path}")
return plt.gcf()
def plot_top_shops(self, limit=10, save=True):
"""绘制店铺商品数量排名"""
top_shops = self.analyzer.get_top_shops(limit)
if not top_shops:
self.logger.warning("没有数据可用于可视化")
return None
# 转换数据格式
shops = [item['shop'] for item in top_shops]
counts = [item['count'] for item in top_shops]
# 创建图形
plt.figure(figsize=(12, 8))
# 水平条形图
bars = plt.barh(range(len(shops)), counts, align='center')
plt.yticks(range(len(shops)), shops, fontproperties=font)
# 添加数据标签
for bar in bars:
width = bar.get_width()
plt.text(width + 0.3, bar.get_y() + bar.get_height()/2, f'{width:.0f}',
ha='left', va='center')
plt.title('商品数量Top店铺', fontproperties=font, fontsize=16)
plt.xlabel('商品数量', fontproperties=font, fontsize=12)
plt.ylabel('店铺名称', fontproperties=font, fontsize=12)
# 保存图像
if save:
file_path = os.path.join(self.output_dir, 'top_shops.png')
plt.savefig(file_path, dpi=300, bbox_inches='tight')
self.logger.info(f"店铺排名图已保存至: {file_path}")
return plt.gcf()
def plot_price_vs_commits(self, save=True):
"""绘制价格与评论数关系散点图"""
data = self.analyzer.load_data()
if data.empty:
self.logger.warning("没有数据可用于可视化")
return None
# 创建图形
plt.figure(figsize=(12, 8))
# 对评论数进行对数变换,使散点图更清晰
data['log_commits'] = np.log1p(data['commits'])
# 绘制散点图
sns.scatterplot(data=data, x='price_number', y='log_commits', alpha=0.6)
plt.title('商品价格与评论数关系', fontproperties=font, fontsize=16)
plt.xlabel('价格 (元)', fontproperties=font, fontsize=12)
plt.ylabel('评论数 (对数)', fontproperties=font, fontsize=12)
# 添加相关性信息
correlation = data['price_number'].corr(data['commits'])
plt.annotate(f'相关系数: {correlation:.3f}',
xy=(0.05, 0.95), xycoords='axes fraction',
fontproperties=font, fontsize=12,
bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="grey", alpha=0.8))
# 保存图像
if save:
file_path = os.path.join(self.output_dir, 'price_vs_commits.png')
plt.savefig(file_path, dpi=300, bbox_inches='tight')
self.logger.info(f"价格评论关系图已保存至: {file_path}")
return plt.gcf()
def plot_title_word_cloud(self, save=True):
"""绘制标题词云"""
try:
from wordcloud import WordCloud
import jieba # 用于中文分词
data = self.analyzer.load_data()
if data.empty:
self.logger.warning("没有数据可用于可视化")
return None
# 合并所有标题
all_titles = ' '.join(data['title'].astype(str))
# 中文分词
words = ' '.join(jieba.cut(all_titles))
# 创建词云
wordcloud = WordCloud(
font_path=font.get_file() if hasattr(font, 'get_file') else None,
width=1000, height=600,
background_color='white',
max_words=100
).generate(words)
# 绘制词云
plt.figure(figsize=(12, 8))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title('商品标题词云', fontproperties=font, fontsize=16)
# 保存图像
if save:
file_path = os.path.join(self.output_dir, 'title_word_cloud.png')
plt.savefig(file_path, dpi=300, bbox_inches='tight')
self.logger.info(f"标题词云已保存至: {file_path}")
return plt.gcf()
except ImportError:
self.logger.error("无法绘制词云,请安装 wordcloud 和 jieba 包")
return None
def generate_all_visualizations(self):
"""生成所有可视化图表"""
self.plot_price_distribution()
self.plot_top_shops()
self.plot_price_vs_commits()
self.plot_title_word_cloud()
self.logger.info("所有可视化图表已生成")
def close(self):
"""关闭分析器"""
if self.analyzer:
self.analyzer.close()
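`FontProperties(fname=...)` only records the path. A more portable setup therefore searches a list of candidate font files. The paths below are examples and assumptions. The SimHei path is the one used above, while the macOS and Debian/Ubuntu entries depend on the OS version and installed packages. `find_cjk_font` is a hypothetical helper.

```python
import os

# Example candidate locations (assumptions; adjust for your system)
DEFAULT_CJK_FONT_CANDIDATES = [
    r"C:\Windows\Fonts\simhei.ttf",  # Windows, as used in visualize.py
    "/System/Library/Fonts/STHeiti Light.ttc",  # macOS (varies by version)
    "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",  # Debian/Ubuntu fonts-noto-cjk
]


def find_cjk_font(candidates=None):
    """Return the first candidate path that is an existing file, or None."""
    if candidates is None:
        candidates = DEFAULT_CJK_FONT_CANDIDATES
    for path in candidates:
        if os.path.isfile(path):
            return path
    return None
```

The result can feed both places that need a font. Use `FontProperties(fname=path)` when `path` is not `None`, and pass `WordCloud(font_path=path)` in `plot_title_word_cloud`, so the word cloud does not fall back to a default font that may lack Chinese glyphs.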
9.6 Main Program
main.py:
import argparse
import sys
import logging
import os
import json
from spider.utils import setup_logger
from spider.crawler import JDCrawler
from spider.db import Database
from analysis.analyzer import ProductAnalyzer
from analysis.visualize import DataVisualizer
def main():
"""主函数"""
# 设置日志
logger = setup_logger()
# 解析命令行参数
parser = argparse.ArgumentParser(description='京东商品爬虫')
parser.add_argument('--mode', '-m', choices=['crawl', 'analyze', 'both'], default='both',
help='运行模式: 爬取、分析或两者都执行')
parser.add_argument('--keyword', '-k', type=str, help='搜索关键词')
parser.add_argument('--category', '-c', type=str, help='商品分类ID')
parser.add_argument('--pages', '-p', type=int, default=5, help='爬取页数')
parser.add_argument('--output', '-o', type=str, default='./output', help='输出目录')
parser.add_argument('--visualize', '-v', action='store_true', help='生成可视化图表')
parser.add_argument('--details', '-d', action='store_true', help='爬取商品详情')
args = parser.parse_args()
# 创建输出目录
if not os.path.exists(args.output):
os.makedirs(args.output)
# 初始化数据库
db = Database()
if not db.connect():
logger.error("无法连接到数据库,程序退出")
sys.exit(1)
# 初始化爬虫
if args.mode in ['crawl', 'both']:
crawler = JDCrawler()
try:
products = []
# 根据参数确定爬取方式
if args.keyword:
logger.info(f"开始爬取关键词: '{args.keyword}'")
products = crawler.search_products(args.keyword, args.pages)
elif args.category:
logger.info(f"开始爬取分类ID: '{args.category}'")
products = crawler.get_category_products(args.category, args.pages)
else:
logger.error("必须指定关键词(-k)或分类ID(-c)")
sys.exit(1)
logger.info(f"爬取完成,获取 {len(products)} 个商品")
# 保存数据到数据库
saved_count = db.save_products(products)
logger.info(f"成功保存 {saved_count} 个商品到数据库")
# 爬取商品详情(如果指定)
if args.details and products:
logger.info("开始爬取商品详情...")
detail_targets = products[:10]  # Only fetch details for the first 10 products
for i, product in enumerate(detail_targets):
logger.info(f"爬取商品详情 {i+1}/{len(detail_targets)}: {product['title'][:30]}...")
details = crawler.get_product_details(product['product_id'])
if details:
# 保存详情到JSON文件
details_file = os.path.join(args.output, f"product_{product['product_id']}.json")
with open(details_file, 'w', encoding='utf-8') as f:
json.dump(details, f, ensure_ascii=False, indent=4)
logger.info(f"商品详情已保存到: {details_file}")
logger.info("商品详情爬取完成")
except KeyboardInterrupt:
logger.info("用户中断,停止爬取")
except Exception as e:
logger.error(f"爬取过程中出错: {e}")
finally:
# 关闭爬虫
crawler.close()
# 数据分析
if args.mode in ['analyze', 'both']:
analyzer = ProductAnalyzer()
# 加载数据
data = analyzer.load_data()
if data.empty:
logger.warning("没有数据可分析")
else:
logger.info("开始分析数据...")
# 获取基本统计信息
stats = analyzer.get_basic_stats()
# 打印统计信息
logger.info(f"商品总数: {stats.get('total_products', 0)}")
logger.info(f"店铺数量: {stats.get('unique_shops', 0)}")
logger.info(f"平均价格: {stats.get('avg_price', 0):.2f}元")
logger.info(f"价格范围: {stats.get('min_price', 0):.2f} - {stats.get('max_price', 0):.2f}元")
logger.info(f"平均评论数: {stats.get('avg_commits', 0):.1f}")
# 获取店铺排名
top_shops = analyzer.get_top_shops(10)
logger.info("\nTop 10 店铺:")
for i, shop in enumerate(top_shops):
logger.info(f"{i+1}. {shop['shop']}: {shop['count']}个商品")
# 保存分析结果到JSON
analysis_file = os.path.join(args.output, "analysis_results.json")
with open(analysis_file, 'w', encoding='utf-8') as f:
json.dump({
'basic_stats': stats,
'top_shops': top_shops,
'price_distribution': analyzer.get_price_distribution(),
'title_analysis': analyzer.analyze_titles()
}, f, ensure_ascii=False, indent=4, default=str)
logger.info(f"分析结果已保存到: {analysis_file}")
# 可视化
if args.visualize:
logger.info("开始生成可视化图表...")
visualizer = DataVisualizer(analyzer, args.output)
visualizer.generate_all_visualizations()
visualizer.close()
logger.info(f"可视化图表已保存到目录: {args.output}")
# 关闭数据库连接
db.close()
logger.info("程序执行完毕")
if __name__ == "__main__":
main()
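`main()` checks by hand that `--keyword` or `--category` was given. When both are passed, it silently prefers the keyword. argparse can reject that combination itself through `add_mutually_exclusive_group()`. Below is a sketch with a reduced option set. `build_parser` and `validate` are hypothetical names, and the rule that analyze mode needs no target is taken from `main()`.

```python
import argparse


def build_parser():
    """Reduced version of main.py's parser with -k/-c declared mutually exclusive."""
    parser = argparse.ArgumentParser(description="京东商品爬虫")
    parser.add_argument("--mode", "-m", choices=["crawl", "analyze", "both"], default="both")
    parser.add_argument("--pages", "-p", type=int, default=5)
    target = parser.add_mutually_exclusive_group()
    target.add_argument("--keyword", "-k", type=str, help="搜索关键词")
    target.add_argument("--category", "-c", type=str, help="商品分类ID")
    return parser


def validate(parser, args):
    """Crawling modes need a target; parser.error() prints usage and exits with status 2."""
    if args.mode in ("crawl", "both") and not (args.keyword or args.category):
        parser.error("crawl/both mode requires --keyword or --category")
    return args
```

Typical use is `args = validate(parser, parser.parse_args())`. The duplicate-target error then comes from argparse before any crawler or database is created.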
9.7 Usage
- Create the project directory structure (macOS/Linux shell):
mkdir -p jd_spider/spider jd_spider/analysis
touch jd_spider/spider/__init__.py jd_spider/analysis/__init__.py
- Copy the code into the corresponding files:
jd_spider/spider/config.py
jd_spider/spider/utils.py
jd_spider/spider/db.py
jd_spider/spider/crawler.py
jd_spider/analysis/analyzer.py
jd_spider/analysis/visualize.py
jd_spider/main.py
- Install the dependencies:
pip install -r requirements.txt
- Usage examples:
# Crawl and analyze laptop data (5 pages)
python main.py --keyword "笔记本电脑" --pages 5 --visualize
# Crawl only
python main.py --mode crawl --keyword "智能手表" --pages 3
# Crawl products from specific category IDs
python main.py --category "670,671,672" --pages 2
# Analysis and visualization only
python main.py --mode analyze --visualize
# Also fetch product details
python main.py --keyword "耳机" --pages 2 --details
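`mkdir -p` and `touch` are Unix shell commands, and the Windows command prompt does not provide them. A cross-platform alternative is a small pathlib script. The layout mirrors the file list above, and `create_skeleton` is a hypothetical name.

```python
from pathlib import Path

# Layout from the file list above: two packages plus the module files
PACKAGES = ["spider", "analysis"]
MODULES = [
    "spider/config.py",
    "spider/utils.py",
    "spider/db.py",
    "spider/crawler.py",
    "analysis/analyzer.py",
    "analysis/visualize.py",
    "main.py",
]


def create_skeleton(root="jd_spider"):
    """Create the jd_spider layout; contents of existing files are not modified."""
    root_path = Path(root)
    for package in PACKAGES:
        package_dir = root_path / package
        package_dir.mkdir(parents=True, exist_ok=True)  # same effect as `mkdir -p`
        (package_dir / "__init__.py").touch(exist_ok=True)  # same effect as `touch`
    for relative in MODULES:
        (root_path / relative).touch(exist_ok=True)
    return root_path
```

Call `create_skeleton()` from the directory that should contain `jd_spider/`. Forward slashes in the relative paths also work on Windows because pathlib normalizes them.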
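10. Hands-on Project: News Aggregation Crawler
Next, we build a news aggregation crawler that fetches the latest news from several news sites and adds categorization and search on top.
10.1 Project Design and Requirements
Project goals:
- Crawl the latest news from multiple news sites
- Automatically categorize and aggregate similar news
- Provide search
- Generate a simple web interface to display the results
Directory structure:
news_spider/
├── spiders/
│ ├── __init__.py
│ ├── base_spider.py # Spider base class
│ ├── sina_spider.py # Sina News spider
│ ├── sohu_spider.py # Sohu News spider
│ └── tencent_spider.py # Tencent News spider
├── utils/
│ ├── __init__.py
│ ├── db.py # Database operations
│ ├── cleaner.py # Text cleaning
│ └── classifier.py # News classification
├── web/
│ ├── __init__.py
│ ├── app.py # Web application
│ ├── static/ # Static assets
│ └── templates/ # HTML templates
├── config.py # Configuration
├── crawler.py # Crawler controller
├── main.py # Entry point
└── requirements.txt # Dependencies
10.2 Spider Base Class and Shared Components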
config.py:
# news_spider/config.py
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 爬虫配置
SPIDER_CONFIG = {
'INTERVAL': int(os.getenv('SPIDER_INTERVAL', 3600)), # 爬取间隔(秒)
'REQUEST_DELAY': float(os.getenv('REQUEST_DELAY', 1.0)), # 请求间隔(秒)
'MAX_PAGES': int(os.getenv('MAX_PAGES', 3)), # 每个分类爬取的最大页数
'USER_AGENT_ROTATE': os.getenv('USER_AGENT_ROTATE', 'True').lower() == 'true',
'HEADERS': {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
}
}
# 启用的爬虫
ENABLED_SPIDERS = [
'sina',
'sohu',
'tencent'
]
# 数据库配置
DB_CONFIG = {
'MONGO_URI': os.getenv('MONGO_URI', 'mongodb://localhost:27017/'),
'DB_NAME': os.getenv('DB_NAME', 'news_spider'),
'COLLECTION': os.getenv('COLLECTION', 'news'),
}
# 分类配置
CATEGORIES = {
'tech': ['科技', '互联网', 'IT', '数码', '技术', '软件', '编程', '人工智能', 'AI'],
'finance': ['财经', '金融', '股市', '理财', '经济', '投资', '基金', '债券'],
'sports': ['体育', '足球', '篮球', 'NBA', 'CBA', '棋牌', '乒乓球', '羽毛球'],
'entertainment': ['娱乐', '电影', '电视剧', '明星', '综艺', '音乐', '艺人'],
'society': ['社会', '法制', '法治', '案件', '警方', '事件', '热点'],
'world': ['国际', '世界', '全球', '海外', '外交', '环球', '各国'],
'military': ['军事', '国防', '武器', '军队', '战争', '导弹', '航母', '战机'],
'health': ['健康', '医疗', '医院', '疾病', '养生', '保健', '医生', '药品'],
'edu': ['教育', '学校', '大学', '高考', '考试', '学习', '培训', '留学'],
'car': ['汽车', '车型', '自动驾驶', '电动车', '新能源车', '交通'],
'house': ['房产', '房地产', '楼市', '房价', '装修', '买房', '租房'],
'travel': ['旅游', '旅行', '景点', '度假', '酒店', '民宿', '出游'],
}
# 网站配置
SITES = {
'sina': {
'name': '新浪新闻',
'url': 'https://news.sina.com.cn/',
},
'sohu': {
'name': '搜狐新闻',
'url': 'https://news.sohu.com/',
},
'tencent': {
'name': '腾讯新闻',
'url': 'https://news.qq.com/',
},
}
# Web应用配置
WEB_CONFIG = {
'PORT': int(os.getenv('WEB_PORT', 5000)),
'HOST': os.getenv('WEB_HOST', '127.0.0.1'),
'DEBUG': os.getenv('WEB_DEBUG', 'True').lower() == 'true',
'ITEMS_PER_PAGE': int(os.getenv('ITEMS_PER_PAGE', 20)),
}
# 日志配置
LOG_CONFIG = {
'LEVEL': os.getenv('LOG_LEVEL', 'INFO'),
'FILE': os.getenv('LOG_FILE', 'news_spider.log'),
}
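The config reads booleans with `.lower() == 'true'`, so values such as `1` or `yes` quietly become `False`. A slightly stricter helper accepts the common spellings and fails loudly on anything else. `env_bool` and `env_int` are hypothetical names. With them, `'DEBUG': env_bool('WEB_DEBUG', True)` would replace the inline comparison. Since `WEB_DEBUG` defaults to `True`, override it outside local development.

```python
import os

_TRUE_VALUES = {"1", "true", "yes", "on"}
_FALSE_VALUES = {"0", "false", "no", "off", ""}


def env_bool(name, default=False):
    """Read a boolean environment variable, accepting common spellings."""
    raw = os.getenv(name)
    if raw is None:
        return default
    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    raise ValueError(f"{name} must be a boolean-like value, got {raw!r}")


def env_int(name, default):
    """Read an integer environment variable, using the default when unset or blank."""
    raw = os.getenv(name)
    if raw is None or raw.strip() == "":
        return default
    return int(raw)
```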
utils/db.py:
# news_spider/utils/db.py
import pymongo
from datetime import datetime
import logging
from config import DB_CONFIG
class NewsDatabase:
def __init__(self):
"""初始化数据库连接"""
self.client = None
self.db = None
self.collection = None
self.logger = logging.getLogger('news_spider')
def connect(self):
"""连接到MongoDB"""
try:
self.client = pymongo.MongoClient(DB_CONFIG['MONGO_URI'])
self.db = self.client[DB_CONFIG['DB_NAME']]
self.collection = self.db[DB_CONFIG['COLLECTION']]
# 创建索引
if 'url_1' not in self.collection.index_information():
self.collection.create_index([('url', pymongo.ASCENDING)], unique=True)
if 'publish_time_-1' not in self.collection.index_information():  # a DESCENDING index is named 'publish_time_-1'
self.collection.create_index([('publish_time', pymongo.DESCENDING)])
if 'category_1' not in self.collection.index_information():
self.collection.create_index([('category', pymongo.ASCENDING)])
if 'source_1' not in self.collection.index_information():
self.collection.create_index([('source', pymongo.ASCENDING)])
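# Text index for search (note: MongoDB text indexes do not segment Chinese words, so Chinese keyword search only matches whole space- or punctuation-delimited runs)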
if 'text_index' not in self.collection.index_information():
self.collection.create_index([
('title', 'text'),
('summary', 'text'),
('content', 'text')
], name='text_index')
self.logger.info(f"成功连接到MongoDB: {DB_CONFIG['DB_NAME']}.{DB_CONFIG['COLLECTION']}")
return True
except Exception as e:
self.logger.error(f"连接MongoDB失败: {e}")
return False
def close(self):
"""关闭数据库连接"""
if self.client:
self.client.close()
self.logger.info("MongoDB连接已关闭")
def save_news(self, news_item):
"""保存单个新闻,如果存在则更新"""
if self.collection is None:  # PyMongo collections do not support truth-value testing
self.logger.error("数据库未连接")
return False
# 设置爬取时间
news_item['crawl_time'] = datetime.now()
try:
# 使用URL作为唯一标识
result = self.collection.update_one(
{'url': news_item['url']},
{'$set': news_item},
upsert=True
)
if result.upserted_id:
self.logger.debug(f"新增新闻: {news_item['title']}")
return True
elif result.modified_count > 0:
self.logger.debug(f"更新新闻: {news_item['title']}")
return True
else:
self.logger.debug(f"新闻已存在且无变化: {news_item['title']}")
return False
except Exception as e:
self.logger.error(f"保存新闻失败: {e}")
return False
def save_many_news(self, news_items):
"""批量保存新闻"""
if self.collection is None:
self.logger.error("数据库未连接")
return 0
saved_count = 0
for item in news_items:
if self.save_news(item):
saved_count += 1
self.logger.info(f"已保存 {saved_count}/{len(news_items)} 条新闻")
return saved_count
def get_news_count(self):
"""获取新闻总数"""
if self.collection is None:
self.logger.error("数据库未连接")
return 0
return self.collection.count_documents({})
def get_latest_news(self, limit=20, skip=0, category=None, source=None):
"""获取最新新闻"""
if self.collection is None:
self.logger.error("数据库未连接")
return []
# 构建查询条件
query = {}
if category:
query['category'] = category
if source:
query['source'] = source
# 查询并按发布时间排序
cursor = self.collection.find(query).sort('publish_time', pymongo.DESCENDING).skip(skip).limit(limit)
return list(cursor)
def search_news(self, keyword, limit=20, skip=0, category=None, source=None):
"""搜索新闻"""
if self.collection is None:
self.logger.error("数据库未连接")
return []
# 构建查询条件
query = {'$text': {'$search': keyword}}
if category:
query['category'] = category
if source:
query['source'] = source
# 文本搜索查询
cursor = self.collection.find(
query,
{'score': {'$meta': 'textScore'}} # 添加相关性分数
).sort([
('score', {'$meta': 'textScore'}), # 按相关性排序
('publish_time', pymongo.DESCENDING) # 其次按时间排序
]).skip(skip).limit(limit)
return list(cursor)
def get_category_stats(self):
"""获取各分类新闻数量"""
if self.collection is None:
self.logger.error("数据库未连接")
return {}
pipeline = [
{'$group': {'_id': '$category', 'count': {'$sum': 1}}},
{'$sort': {'count': -1}}
]
result = list(self.collection.aggregate(pipeline))
return {item['_id']: item['count'] for item in result}
def get_source_stats(self):
"""获取各来源新闻数量"""
if self.collection is None:
self.logger.error("数据库未连接")
return {}
pipeline = [
{'$group': {'_id': '$source', 'count': {'$sum': 1}}},
{'$sort': {'count': -1}}
]
result = list(self.collection.aggregate(pipeline))
return {item['_id']: item['count'] for item in result}
def get_timespan_stats(self):
"""获取每日新闻数量趋势"""
if self.collection is None:
self.logger.error("数据库未连接")
return {}
pipeline = [
{'$project': {
'date': {'$dateToString': {'format': '%Y-%m-%d', 'date': '$publish_time'}}
}},
{'$group': {'_id': '$date', 'count': {'$sum': 1}}},
{'$sort': {'_id': 1}}
]
result = list(self.collection.aggregate(pipeline))
return {item['_id']: item['count'] for item in result}
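To check what `get_timespan_stats` returns without a running MongoDB, the pipeline can be mirrored in plain Python:

- `$project` with `$dateToString` formats `publish_time` as `YYYY-MM-DD`.
- `$group` counts documents per date string.
- `$sort` on `_id` orders the dates ascending.

The sketch assumes `publish_time` values are naive datetimes. PyMongo stores these as UTC, and `$dateToString` formats in UTC by default, so the date strings should agree. `daily_counts` is a hypothetical helper. One difference is noted in a comment: documents without a datetime `publish_time` are skipped here, whereas MongoDB would group them under a `null` key.

```python
from collections import Counter
from datetime import datetime


def daily_counts(news_items):
    """Plain-Python mirror of get_timespan_stats' aggregation pipeline.

    $project + $dateToString -> 'YYYY-MM-DD' string per document
    $group                   -> number of documents per date string
    $sort {'_id': 1}         -> dates in ascending order
    """
    counter = Counter()
    for item in news_items:
        publish_time = item.get("publish_time")
        # MongoDB would put documents with a missing date under a null key;
        # this sketch simply skips them.
        if isinstance(publish_time, datetime):
            counter[publish_time.strftime("%Y-%m-%d")] += 1
    # 'YYYY-MM-DD' strings sort chronologically, so sorting the keys matches $sort
    return dict(sorted(counter.items()))
```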
utils/cleaner.py:
# news_spider/utils/cleaner.py
import re
import html
from bs4 import BeautifulSoup
import logging
class TextCleaner:
def __init__(self):
"""初始化文本清洗器"""
self.logger = logging.getLogger('news_spider')
def clean_html(self, html_content):
"""清洗HTML内容,提取纯文本"""
if not html_content:
return ""
try:
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 移除脚本和样式元素
for script in soup(["script", "style", "iframe"]):
script.extract()
# 获取文本
text = soup.get_text(separator=' ')
# 清理文本
return self.clean_text(text)
except Exception as e:
self.logger.error(f"清洗HTML内容时出错: {e}")
return ""
def clean_text(self, text):
"""清洗文本内容"""
if not text:
return ""
try:
# 解码HTML实体
text = html.unescape(text)
# 移除多余空白字符
text = re.sub(r'\s+', ' ', text).strip()
# 移除特殊字符和控制字符
text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
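# Remove boilerplate labels commonly found on news pages.
# Label patterns stop at the first whitespace so they cannot swallow the article body,
# and accept both ASCII and full-width colons; '图片来源' runs before '来源' so it is removed whole
patterns_to_remove = [
r'责任编辑[:：]\s*\S+',
r'图片来源[:：]\s*\S+',
r'来源[:：]\s*\S+',
r'作者[:：]\s*\S+',
r'原标题[:：]\s*\S+',
r'编辑[:：]\s*\S+',
r'记者[:：]\s*\S+',
r'©[\w\s]+版权所有',
r'点击查看更多',
r'相关阅读',
r'更多精彩内容',
r'收藏本文',
r'进入[\w\s]+专题',
r'扫描二维码'
]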
for pattern in patterns_to_remove:
text = re.sub(pattern, '', text)
return text.strip()
except Exception as e:
self.logger.error(f"清洗文本内容时出错: {e}")
return text
def extract_summary(self, content, max_length=200):
"""从内容中提取摘要"""
if not content:
return ""
# 清理内容
clean_content = self.clean_text(content)
# 截取指定长度
if len(clean_content) > max_length:
# Look back up to 50 characters for a sentence-ending mark so the summary stays within max_length
for i in range(max_length - 1, max(max_length - 50, 0), -1):
if clean_content[i] in ('.', '。', '!', '！', '?', '？'):
return clean_content[:i+1]
# No suitable sentence boundary: truncate and append an ellipsis
summary = clean_content[:max_length].strip()
summary += '...'
return summary
else:
return clean_content
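`clean_html` depends on BeautifulSoup. When only the visible text is needed, the standard library's `html.parser.HTMLParser` can do the same job. It skips everything inside `<script>`, `<style>` and `<iframe>`, collects the remaining text nodes, and uses `convert_charrefs=True` to decode entities such as `&amp;`. This is a minimal sketch with hypothetical names. Unlike BeautifulSoup, it does not repair badly broken markup.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect text nodes while skipping <script>, <style> and <iframe> contents."""

    SKIP_TAGS = {"script", "style", "iframe"}

    def __init__(self):
        super().__init__(convert_charrefs=True)  # decode &amp; and friends automatically
        self.parts = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP_TAGS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0:
            self.parts.append(data)


def html_to_text(html_content):
    """Return whitespace-normalized visible text from an HTML fragment."""
    if not html_content:
        return ""
    parser = _TextExtractor()
    parser.feed(html_content)
    parser.close()  # flush any buffered text
    return " ".join(" ".join(parser.parts).split())
```

The returned string can still go through `clean_text` for the label and control-character cleanup.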
utils/classifier.py:
# news_spider/utils/classifier.py
import logging
import re
from config import CATEGORIES
class NewsClassifier:
def __init__(self):
"""初始化新闻分类器"""
self.logger = logging.getLogger('news_spider')
self.categories = CATEGORIES
def classify(self, title, content=None):
"""
根据标题和内容分类新闻
:param title: 新闻标题
:param content: 新闻内容(可选)
:return: 类别名称,如 'tech', 'finance' 等
"""
if not title:
return 'other'
# 组合标题和内容(如果有)
text = title
if content:
text = title + ' ' + content[:500] # 只使用内容的前500个字符
# 计算各类别的匹配分数
scores = {}
for category, keywords in self.categories.items():
score = 0
for keyword in keywords:
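# Count keyword occurrences in the text; re.escape keeps keywords literal
count = len(re.findall(re.escape(keyword), text, re.IGNORECASE))
# Occurrences in the title are weighted more heavily
title_count = len(re.findall(re.escape(keyword), title, re.IGNORECASE))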
# 计算得分 (标题出现权重为3,内容出现权重为1)
score += title_count * 3 + (count - title_count)
scores[category] = score
# 找出得分最高的类别
if scores:
max_category = max(scores.items(), key=lambda x: x[1])
# 如果最高分大于0,则返回该类别
if max_category[1] > 0:
return max_category[0]
# 默认类别为other
return 'other'
def get_category_display_name(self, category):
"""获取分类的显示名称"""
category_display_names = {
'tech': '科技',
'finance': '财经',
'sports': '体育',
'entertainment': '娱乐',
'society': '社会',
'world': '国际',
'military': '军事',
'health': '健康',
'edu': '教育',
'car': '汽车',
'house': '房产',
'travel': '旅游',
'other': '其他'
}
return category_display_names.get(category, '其他')
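To make the weighting concrete: a keyword found once in the title contributes 3 points, and each further hit in the first 500 characters of the body adds 1. Below is a self-contained sketch of the same rule. It uses case-insensitive `str.count` on a small hypothetical config, and the function names are hypothetical too. One caveat applies to both versions: short Latin keywords such as `AI` and `IT` also match inside English words (`IT` in "with"), which can inflate scores on mixed-language text. Ties go to the category listed first in the dict, because `max` returns the first maximal item.

```python
def score_categories(title, content, categories, content_chars=500):
    """Score each category: a title hit counts 3, each extra body hit counts 1.

    Mirrors the weighting in NewsClassifier.classify, using case-insensitive
    str.count instead of regular expressions.
    """
    title_lower = title.lower()
    text = (title + " " + content[:content_chars]) if content else title
    text_lower = text.lower()
    scores = {}
    for category, keywords in categories.items():
        score = 0
        for keyword in keywords:
            needle = keyword.lower()
            title_count = title_lower.count(needle)
            text_count = text_lower.count(needle)  # includes the title hits
            score += title_count * 3 + (text_count - title_count)
        scores[category] = score
    return scores


def best_category(scores):
    """Return the highest-scoring category, or 'other' when nothing matched."""
    if not scores:
        return "other"
    category, score = max(scores.items(), key=lambda item: item[1])
    return category if score > 0 else "other"
```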
10.3 Spider Implementation
spiders/base_spider.py:
# news_spider/spiders/base_spider.py
import requests
import logging
import random
import time
from datetime import datetime, timedelta  # timedelta is needed for "X分钟前"/"X小时前" parsing
from abc import ABC, abstractmethod
from utils.cleaner import TextCleaner
from utils.classifier import NewsClassifier
from config import SPIDER_CONFIG
class BaseSpider(ABC):
def __init__(self, name, source_name=None, base_url=None):
"""
初始化爬虫基类
:param name: 爬虫名称
:param source_name: 新闻来源名称
:param base_url: 网站基础URL
"""
self.name = name
self.source_name = source_name or name
self.base_url = base_url
self.logger = logging.getLogger('news_spider')
self.cleaner = TextCleaner()
self.classifier = NewsClassifier()
self.session = requests.Session()
# 设置请求头
self.session.headers.update(SPIDER_CONFIG['HEADERS'])
def get_random_headers(self):
"""获取随机请求头"""
if SPIDER_CONFIG['USER_AGENT_ROTATE']:
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
'Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1'
]
return {'User-Agent': random.choice(user_agents)}
return {}
def request_page(self, url, method='get', params=None, data=None, timeout=10, retry=3):
"""
请求页面
:param url: 请求URL
:param method: 请求方法,默认get
:param params: 请求参数
:param data: 请求数据
:param timeout: 超时时间
:param retry: 重试次数
:return: 响应对象或None
"""
        # Random delay before each request: 0.5x to 1.5x the configured REQUEST_DELAY
        time.sleep(SPIDER_CONFIG['REQUEST_DELAY'] * random.uniform(0.5, 1.5))
for i in range(retry):
try:
headers = self.get_random_headers()
if method.lower() == 'get':
response = self.session.get(url, params=params, headers=headers, timeout=timeout)
elif method.lower() == 'post':
response = self.session.post(url, params=params, data=data, headers=headers, timeout=timeout)
else:
raise ValueError(f"不支持的请求方法: {method}")
                response.raise_for_status()  # raises HTTPError for 4xx and 5xx status codes
# 设置编码
if response.encoding == 'ISO-8859-1':
response.encoding = response.apparent_encoding
return response
except requests.exceptions.RequestException as e:
self.logger.warning(f"请求失败 ({i+1}/{retry}): {url} - {e}")
if i == retry - 1:
self.logger.error(f"请求失败次数达到上限: {url}")
return None
time.sleep(1) # 延迟后重试
return None
def parse_date(self, date_str):
"""
解析日期字符串为datetime对象
:param date_str: 日期字符串
:return: datetime对象或当前时间
"""
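        try:
            # Normalize surrounding whitespace before matching
            date_str = date_str.strip()
            # Try the common absolute date formats
            formats = [
                '%Y-%m-%d %H:%M:%S',
                '%Y-%m-%d %H:%M',
                '%Y年%m月%d日 %H:%M:%S',
                '%Y年%m月%d日 %H:%M',
                '%Y年%m月%d日',
                '%Y/%m/%d %H:%M:%S',
                '%Y/%m/%d %H:%M',
                '%Y/%m/%d',
                '%m-%d %H:%M',
                '%m月%d日 %H:%M'
            ]
            for fmt in formats:
                try:
                    parsed = datetime.strptime(date_str, fmt)
                except ValueError:
                    continue
                # Formats without %Y make strptime default to 1900; use the current year
                if '%Y' not in fmt:
                    parsed = parsed.replace(year=datetime.now().year)
                return parsed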
# 处理特殊格式,如"X小时前"、"X分钟前"等
if '分钟前' in date_str:
minutes = int(date_str.replace('分钟前', '').strip())
return datetime.now().replace(microsecond=0) - timedelta(minutes=minutes)
if '小时前' in date_str:
hours = int(date_str.replace('小时前', '').strip())
return datetime.now().replace(microsecond=0) - timedelta(hours=hours)
if '昨天' in date_str:
time_part = date_str.replace('昨天', '').strip()
if ':' in time_part:
hour, minute = map(int, time_part.split(':'))
yesterday = datetime.now().replace(microsecond=0) - timedelta(days=1)
return yesterday.replace(hour=hour, minute=minute, second=0)
else:
return datetime.now().replace(microsecond=0) - timedelta(days=1)
# 如果都无法识别,记录警告并返回当前时间
self.logger.warning(f"无法解析日期字符串: '{date_str}',使用当前时间")
return datetime.now().replace(microsecond=0)
except Exception as e:
self.logger.error(f"解析日期出错: {e}")
return datetime.now().replace(microsecond=0)
def create_news_item(self, url, title, content=None, html_content=None, summary=None,
publish_time=None, author=None, source=None, category=None, tags=None, image_url=None):
"""
创建新闻项
:param url: 新闻URL
:param title: 新闻标题
:param content: 新闻内容(纯文本)
:param html_content: 新闻HTML内容
:param summary: 新闻摘要
:param publish_time: 发布时间
:param author: 作者
:param source: 来源
:param category: 分类
:param tags: 标签列表
:param image_url: 图片URL
:return: 新闻项字典
"""
# 清洗标题
title = self.cleaner.clean_text(title)
# 从HTML内容提取纯文本(如果提供了HTML内容但没有纯文本)
if not content and html_content:
content = self.cleaner.clean_html(html_content)
elif content:
content = self.cleaner.clean_text(content)
# 生成摘要(如果没有提供)
if not summary and content:
summary = self.cleaner.extract_summary(content)
elif summary:
summary = self.cleaner.clean_text(summary)
# 分类新闻(如果没有提供分类)
if not category:
category = self.classifier.classify(title, content)
# 默认来源为爬虫名称
if not source:
source = self.source_name
# 默认发布时间为当前时间
if not publish_time:
publish_time = datetime.now().replace(microsecond=0)
elif isinstance(publish_time, str):
publish_time = self.parse_date(publish_time)
# 创建新闻项
news_item = {
'url': url,
'title': title,
'content': content,
'summary': summary,
'html_content': html_content,
'publish_time': publish_time,
'author': author,
'source': source,
'category': category,
'tags': tags or [],
'image_url': image_url,
'spider': self.name,
'crawl_time': datetime.now().replace(microsecond=0)
}
return news_item
@abstractmethod
def crawl(self):
"""爬取新闻,需要子类实现"""
pass
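Relative timestamps such as `5分钟前` ("5 minutes ago") and `昨天 08:30` ("yesterday 08:30") depend on the current time, which makes `parse_date` awkward to test. A sketch with the same kind of rules that takes `now` as a parameter (the helper `parse_news_time` is hypothetical, not part of the project, and covers only a subset of the formats) can be checked against a fixed clock. It also fills in the current year for formats such as `%m-%d %H:%M`, which `strptime` would otherwise date to 1900, and returns `None` instead of the current time when nothing matches.

```python
from datetime import datetime, timedelta

# Absolute formats tried in order; the last two carry no year
FORMATS = ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M', '%Y年%m月%d日 %H:%M',
           '%Y年%m月%d日', '%m-%d %H:%M', '%m月%d日 %H:%M']

def parse_news_time(date_str, now):
    """Parse absolute or relative Chinese timestamps; return None if unrecognised."""
    date_str = date_str.strip()
    for fmt in FORMATS:
        try:
            parsed = datetime.strptime(date_str, fmt)
        except ValueError:
            continue
        if '%Y' not in fmt:
            parsed = parsed.replace(year=now.year)  # strptime defaults to 1900
        return parsed
    if date_str.endswith('分钟前'):   # "N minutes ago"
        return now - timedelta(minutes=int(date_str[:-3]))
    if date_str.endswith('小时前'):   # "N hours ago"
        return now - timedelta(hours=int(date_str[:-3]))
    if date_str.startswith('昨天'):   # "yesterday [HH:MM]"
        yesterday = now - timedelta(days=1)
        time_part = date_str[2:].strip()
        if time_part:
            hour, minute = map(int, time_part.split(':'))
            return yesterday.replace(hour=hour, minute=minute, second=0, microsecond=0)
        return yesterday
    return None

now = datetime(2024, 3, 15, 12, 0, 0)
print(parse_news_time('2024年03月10日 08:30', now))  # 2024-03-10 08:30:00
print(parse_news_time('03-14 09:05', now))           # 2024-03-14 09:05:00
print(parse_news_time('5分钟前', now))                # 2024-03-15 11:55:00
print(parse_news_time('昨天 08:30', now))             # 2024-03-14 08:30:00
```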
spiders/sina_spider.py:
# news_spider/spiders/sina_spider.py
from bs4 import BeautifulSoup
import re
import json
import time
from datetime import datetime
from spiders.base_spider import BaseSpider
from config import CATEGORIES, SITES
class SinaSpider(BaseSpider):
def __init__(self):
"""初始化新浪新闻爬虫"""
super().__init__(
name='sina',
source_name=SITES['sina']['name'],
base_url=SITES['sina']['url']
)
# 新浪新闻各分类URL
self.category_urls = {
'society': 'https://news.sina.com.cn/society/',
'tech': 'https://tech.sina.com.cn/',
'finance': 'https://finance.sina.com.cn/',
'sports': 'https://sports.sina.com.cn/',
'entertainment': 'https://ent.sina.com.cn/',
'military': 'https://mil.sina.com.cn/',
'world': 'https://news.sina.com.cn/world/',
}
def crawl(self):
"""爬取新浪新闻"""
self.logger.info("开始爬取新浪新闻")
all_news = []
# 爬取各分类的新闻
for category, url in self.category_urls.items():
try:
self.logger.info(f"爬取新浪新闻分类: {category}")
news_list = self.crawl_category(category, url)
all_news.extend(news_list)
# 避免请求过快
time.sleep(1)
except Exception as e:
self.logger.error(f"爬取新浪新闻分类 {category} 出错: {e}")
self.logger.info(f"新浪新闻爬取完成,共获取 {len(all_news)} 条新闻")
return all_news
def crawl_category(self, category, url, max_pages=1):
"""
爬取指定分类的新闻
:param category: 分类名称
:param url: 分类URL
:param max_pages: 最大爬取页数
:return: 新闻列表
"""
news_list = []
# 爬取分类首页
response = self.request_page(url)
if not response:
return news_list
# 解析新闻列表
soup = BeautifulSoup(response.text, 'html.parser')
# 尝试从首页提取新闻链接
news_links = []
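        # Collect candidate news links
        for a in soup.find_all('a', href=True):
            link = a['href'].strip()
            # Turn protocol-relative links (//news.sina.com.cn/...) into absolute https URLs
            if link.startswith('//'):
                link = 'https:' + link
            # Skip links that do not look like Sina article URLs
            if not self.is_news_link(link):
                continue
            # Add each link only once, keeping page order
            if link not in news_links:
                news_links.append(link)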
# 爬取新闻详情
for link in news_links[:20]: # 限制每个分类最多爬取20条
try:
news_item = self.crawl_news_detail(link, category)
if news_item:
news_list.append(news_item)
time.sleep(0.5) # 避免请求过快
except Exception as e:
self.logger.error(f"爬取新闻详情出错: {link} - {e}")
return news_list
def is_news_link(self, url):
"""判断是否为新闻链接"""
# 新浪新闻链接通常为 http(s)://xxx.sina.com.cn/xxx/xxx-xxxxxxxx.shtml 格式
patterns = [
r'https?://[a-z]+\.sina\.com\.cn/.*\d{4}-\d{2}-\d{2}.*\.s?html',
r'https?://[a-z]+\.sina\.com\.cn/.*/\d{4}-\d{2}-\d{2}/doc-[a-z0-9]+\.s?html'
]
for pattern in patterns:
if re.match(pattern, url):
return True
return False
def crawl_news_detail(self, url, category=None):
"""
爬取新闻详情
:param url: 新闻URL
:param category: 分类(可选)
:return: 新闻项或None
"""
# 请求新闻详情页
response = self.request_page(url)
if not response:
return None
# 解析HTML
soup = BeautifulSoup(response.text, 'html.parser')
try:
# 提取标题
title_element = soup.select_one('.main-title, h1.title, .article-header h1')
if not title_element:
return None
title = title_element.text.strip()
# 提取发布时间
time_element = soup.select_one('.date, .pub-time, .article-header .date, .time-source .time')
publish_time = datetime.now()
if time_element:
time_text = time_element.text.strip()
publish_time = self.parse_date(time_text)
# 提取作者
author_element = soup.select_one('.author, .show_author, .article-header .source')
author = author_element.text.strip() if author_element else None
# 提取内容
content_element = soup.select_one('#artibody, .article-content, .article-body')
if not content_element:
return None
html_content = str(content_element)
content = self.cleaner.clean_html(html_content)
# 提取图片
image_element = soup.select_one('#artibody img, .article-content img')
image_url = image_element['src'] if image_element and image_element.has_attr('src') else None
# 生成摘要
summary = self.cleaner.extract_summary(content)
# 创建新闻项
news_item = self.create_news_item(
url=url,
title=title,
content=content,
html_content=html_content,
summary=summary,
publish_time=publish_time,
author=author,
category=category,
image_url=image_url
)
return news_item
except Exception as e:
self.logger.error(f"解析新闻详情出错: {url} - {e}")
return None
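Link collection in `crawl_category` boils down to three steps: normalise each `href`, keep those that match an article pattern, and de-duplicate while preserving page order. The sketch below uses `urllib.parse.urljoin` to resolve relative and protocol-relative links against the listing-page URL (an addition; `SinaSpider` itself does not resolve relative links). The article pattern is the second regex from `is_news_link`, and the function name and sample URLs are invented.

```python
import re
from urllib.parse import urljoin

# Second pattern from SinaSpider.is_news_link: .../YYYY-MM-DD/doc-<id>.shtml
ARTICLE_RE = re.compile(
    r'https?://[a-z]+\.sina\.com\.cn/.*/\d{4}-\d{2}-\d{2}/doc-[a-z0-9]+\.s?html')

def collect_article_links(page_url, hrefs, limit=20):
    """Resolve, filter and de-duplicate hrefs, keeping the page order."""
    seen = set()
    links = []
    for href in hrefs:
        url = urljoin(page_url, href.strip())  # handles "//host/..." and "/path"
        if not ARTICLE_RE.match(url) or url in seen:
            continue
        seen.add(url)
        links.append(url)
        if len(links) >= limit:
            break
    return links

hrefs = [
    '//news.sina.com.cn/s/2024-03-15/doc-inamabcd1234.shtml',
    '/s/2024-03-15/doc-inamabcd1234.shtml',      # same article, relative form
    'https://news.sina.com.cn/roll/',             # listing page, not an article
    'javascript:void(0)',
]
print(collect_article_links('https://news.sina.com.cn/society/', hrefs))
# ['https://news.sina.com.cn/s/2024-03-15/doc-inamabcd1234.shtml']
```

Using a `set` for membership keeps the duplicate check constant-time, while the list preserves the order in which links appear on the page.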
spiders/sohu_spider.py:
# news_spider/spiders/sohu_spider.py
from bs4 import BeautifulSoup
import re
import json
import time
from datetime import datetime
from spiders.base_spider import BaseSpider
from config import CATEGORIES, SITES
class SohuSpider(BaseSpider):
def __init__(self):
"""初始化搜狐新闻爬虫"""
super().__init__(
name='sohu',
source_name=SITES['sohu']['name'],
base_url=SITES['sohu']['url']
)
# 搜狐新闻各分类URL
self.category_urls = {
'society': 'https://news.sohu.com/c/8/',
'tech': 'https://it.sohu.com/',
'finance': 'https://business.sohu.com/',
'sports': 'https://sports.sohu.com/',
'entertainment': 'https://yule.sohu.com/',
'military': 'https://mil.sohu.com/',
'world': 'https://news.sohu.com/c/9/',
'health': 'https://health.sohu.com/',
'travel': 'https://travel.sohu.com/',
'car': 'https://auto.sohu.com/',
}
def crawl(self):
"""爬取搜狐新闻"""
self.logger.info("开始爬取搜狐新闻")
all_news = []
# 爬取各分类的新闻
for category, url in self.category_urls.items():
try:
self.logger.info(f"爬取搜狐新闻分类: {category}")
news_list = self.crawl_category(category, url)
all_news.extend(news_list)
# 避免请求过快
time.sleep(1)
except Exception as e:
self.logger.error(f"爬取搜狐新闻分类 {category} 出错: {e}")
self.logger.info(f"搜狐新闻爬取完成,共获取 {len(all_news)} 条新闻")
return all_news
def crawl_category(self, category, url, max_pages=1):
"""
爬取指定分类的新闻
:param category: 分类名称
:param url: 分类URL
:param max_pages: 最大爬取页数
:return: 新闻列表
"""
news_list = []
# 爬取分类首页
response = self.request_page(url)
if not response:
return news_list
# 解析新闻列表
soup = BeautifulSoup(response.text, 'html.parser')
# 尝试提取新闻链接
news_links = []
# 查找新闻链接
for a in soup.find_all('a', href=True):
link = a['href']
# 确保是完整URL
if link.startswith('//'):
link = 'https:' + link
elif not link.startswith('http'):
continue
# 排除非搜狐新闻链接
if not self.is_news_link(link):
continue
# 添加到新闻链接列表
if link not in news_links:
news_links.append(link)
# 爬取新闻详情
for link in news_links[:20]: # 限制每个分类最多爬取20条
try:
news_item = self.crawl_news_detail(link, category)
if news_item:
news_list.append(news_item)
time.sleep(0.5) # 避免请求过快
except Exception as e:
self.logger.error(f"爬取新闻详情出错: {link} - {e}")
return news_list
def is_news_link(self, url):
"""判断是否为搜狐新闻链接"""
# 搜狐新闻链接通常为 http(s)://www.sohu.com/a/xxxxxxxxx_xxxxx 格式
patterns = [
r'https?://(www\.)?sohu\.com/a/\d+_\d+',
r'https?://[a-z]+\.sohu\.com/a/\d+_\d+'
]
for pattern in patterns:
if re.match(pattern, url):
return True
return False
def crawl_news_detail(self, url, category=None):
"""
爬取新闻详情
:param url: 新闻URL
:param category: 分类(可选)
:return: 新闻项或None
"""
# 请求新闻详情页
response = self.request_page(url)
if not response:
return None
# 解析HTML
soup = BeautifulSoup(response.text, 'html.parser')
try:
# 尝试从页面中获取新闻数据
news_data = None
for script in soup.select('script'):
if script.string and '_ARTICLE_INFO' in script.string:
match = re.search(r'var\s+_ARTICLE_INFO\s*=\s*({.+?});', script.string, re.DOTALL)
if match:
try:
news_data = json.loads(match.group(1))
break
except json.JSONDecodeError:
continue
# 如果找到JSON数据,使用它来提取信息
if news_data:
title = news_data.get('title', '')
# 尝试获取发布时间
publish_time_str = news_data.get('publicTime', '')
publish_time = self.parse_date(publish_time_str) if publish_time_str else datetime.now()
# 获取作者信息
author = news_data.get('authorName', '')
# 提取图片链接
image_url = None
if 'cover' in news_data:
image_url = news_data['cover']
# 内容可能在HTML中,而不是JSON中
content_element = soup.select_one('article')
if not content_element:
content_element = soup.select_one('.article')
if content_element:
html_content = str(content_element)
content = self.cleaner.clean_html(html_content)
else:
return None
else:
# 若无法从JSON中获取,则尝试从HTML中提取
title_element = soup.select_one('h1')
if not title_element:
return None
title = title_element.text.strip()
# 提取发布时间
time_element = soup.select_one('.article-info span, .time')
publish_time = datetime.now()
if time_element:
time_text = time_element.text.strip()
publish_time = self.parse_date(time_text)
# 提取作者
author_element = soup.select_one('.author span, .user-info h4')
author = author_element.text.strip() if author_element else None
# 提取内容
content_element = soup.select_one('article, .article')
if not content_element:
return None
html_content = str(content_element)
content = self.cleaner.clean_html(html_content)
# 提取图片
image_element = soup.select_one('article img, .article-info img')
image_url = image_element['src'] if image_element and image_element.has_attr('src') else None
# 生成摘要
summary = self.cleaner.extract_summary(content)
# 创建新闻项
news_item = self.create_news_item(
url=url,
title=title,
content=content,
html_content=html_content,
summary=summary,
publish_time=publish_time,
author=author,
category=category,
image_url=image_url
)
return news_item
except Exception as e:
self.logger.error(f"解析新闻详情出错: {url} - {e}")
return None
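The Sohu and Tencent detail parsers pull a JSON object out of an inline `<script>` with a non-greedy regex, which stops early if a `};` appears inside a string value. A sturdier alternative is sketched below: locate the assignment with a regex, then let `json.JSONDecoder.raw_decode` consume exactly one JSON value from that position. It still assumes the embedded object is strict JSON (double-quoted keys, no trailing commas); JavaScript object literals that are not valid JSON still fail and yield `None`. The helper name `extract_js_object` is hypothetical.

```python
import json
import re

def extract_js_object(script_text, var_pattern):
    """Return the JSON object assigned by `var_pattern`, or None if absent/invalid."""
    # The trailing \s* leaves match.end() on the first character of the value,
    # because raw_decode does not skip leading whitespace
    match = re.search(var_pattern + r'\s*=\s*', script_text)
    if not match:
        return None
    decoder = json.JSONDecoder()
    try:
        # raw_decode parses one JSON value starting at the given index and
        # ignores whatever follows it (the ";" and any later statements)
        obj, _end = decoder.raw_decode(script_text, match.end())
    except json.JSONDecodeError:
        return None
    return obj if isinstance(obj, dict) else None

script = 'var _ARTICLE_INFO = {"title": "a};b", "authorName": "x"}; var other = 1;'
info = extract_js_object(script, r'var\s+_ARTICLE_INFO')
print(info['title'])  # a};b
```

The same helper would cover the Tencent pages with `extract_js_object(response.text, r'window\.DATA')`, assuming `window.DATA` holds strict JSON there as well.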
spiders/tencent_spider.py:
# news_spider/spiders/tencent_spider.py
from bs4 import BeautifulSoup
import re
import json
import time
from datetime import datetime
from spiders.base_spider import BaseSpider
from config import CATEGORIES, SITES
class TencentSpider(BaseSpider):
def __init__(self):
"""初始化腾讯新闻爬虫"""
super().__init__(
name='tencent',
source_name=SITES['tencent']['name'],
base_url=SITES['tencent']['url']
)
# 腾讯新闻各分类URL
self.category_urls = {
'tech': 'https://new.qq.com/ch/tech/',
'finance': 'https://new.qq.com/ch/finance/',
'sports': 'https://new.qq.com/ch/sports/',
'entertainment': 'https://new.qq.com/ch/ent/',
'world': 'https://new.qq.com/ch/world/',
'military': 'https://new.qq.com/ch/milite/',
'health': 'https://new.qq.com/ch/health/',
'car': 'https://new.qq.com/ch/auto/',
'house': 'https://new.qq.com/ch/house/',
}
def crawl(self):
"""爬取腾讯新闻"""
self.logger.info("开始爬取腾讯新闻")
all_news = []
# 爬取各分类的新闻
for category, url in self.category_urls.items():
try:
self.logger.info(f"爬取腾讯新闻分类: {category}")
news_list = self.crawl_category(category, url)
all_news.extend(news_list)
# 避免请求过快
time.sleep(1)
except Exception as e:
self.logger.error(f"爬取腾讯新闻分类 {category} 出错: {e}")
self.logger.info(f"腾讯新闻爬取完成,共获取 {len(all_news)} 条新闻")
return all_news
def crawl_category(self, category, url, max_pages=1):
"""
爬取指定分类的新闻
:param category: 分类名称
:param url: 分类URL
:param max_pages: 最大爬取页数
:return: 新闻列表
"""
news_list = []
# 爬取分类首页
response = self.request_page(url)
if not response:
return news_list
# 解析网页数据
# 腾讯新闻使用了大量的JavaScript,所以我们需要从HTML中提取JSON数据
try:
            # Look for the inline script that assigns the page data to window.DATA;
            # capture the object literal up to the first "}" that is followed by ";"
            pattern = re.compile(r'window\.DATA\s*=\s*(\{.*?\})\s*;', re.DOTALL)
match = pattern.search(response.text)
if match:
try:
json_str = match.group(1)
data = json.loads(json_str)
# 提取新闻列表
if 'list' in data:
# 处理每条新闻
for item in data['list']:
# 提取新闻URL
news_url = item.get('url', '')
if not news_url:
continue
# 确保URL格式正确
if not news_url.startswith('http'):
news_url = 'https:' + news_url if news_url.startswith('//') else 'https://' + news_url
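                            # Fetch the article detail page
                            news_item = self.crawl_news_detail(news_url, category)
                            if news_item:
                                news_list.append(news_item)
                                # Same cap as the HTML fallback: at most 20 articles per category
                                if len(news_list) >= 20:
                                    break
                            # Throttle requests
                            time.sleep(0.5)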
except json.JSONDecodeError:
self.logger.error(f"解析JSON数据失败")
# 如果无法从JSON中提取,则尝试从HTML中直接提取链接
if not news_list:
soup = BeautifulSoup(response.text, 'html.parser')
# 查找新闻链接
for a in soup.find_all('a', href=True):
link = a['href']
# 确保是完整URL
if link.startswith('//'):
link = 'https:' + link
elif not link.startswith('http'):
continue
# 仅处理腾讯新闻链接
if self.is_news_link(link) and link not in [item['url'] for item in news_list]:
# 爬取新闻详情
news_item = self.crawl_news_detail(link, category)
if news_item:
news_list.append(news_item)
# 限制每个分类最多爬取20条
if len(news_list) >= 20:
break
# 避免请求过快
time.sleep(0.5)
except Exception as e:
self.logger.error(f"爬取分类页面出错: {url} - {e}")
return news_list
def is_news_link(self, url):
"""判断是否为腾讯新闻链接"""
# 腾讯新闻链接格式多样,这里做简单判断
patterns = [
r'https?://new\.qq\.com/omn/\w+/\w+\.html',
r'https?://new\.qq\.com/rain/a/\w+',
r'https?://view\.inews\.qq\.com/\w+/\w+'
]
for pattern in patterns:
if re.match(pattern, url):
return True
return False
def crawl_news_detail(self, url, category=None):
"""
爬取新闻详情
:param url: 新闻URL
:param category: 分类(可选)
:return: 新闻项或None
"""
# 请求新闻详情页
response = self.request_page(url)
if not response:
return None
# 解析HTML
soup = BeautifulSoup(response.text, 'html.parser')
try:
# 尝试从页面中获取新闻数据
news_data = None
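            # Look for the inline script that assigns the article data to window.DATA;
            # capture the object literal up to the first "}" that is followed by ";"
            pattern = re.compile(r'window\.DATA\s*=\s*(\{.*?\})\s*;', re.DOTALL)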
for script in soup.find_all('script'):
if script.string:
match = pattern.search(script.string)
if match:
try:
news_data = json.loads(match.group(1))
break
except json.JSONDecodeError:
continue
# 如果找到JSON数据,使用它来提取信息
if news_data and 'article_info' in news_data:
article_info = news_data['article_info']
title = article_info.get('title', '')
# 尝试获取发布时间
publish_time_str = article_info.get('publish_time', '')
publish_time = self.parse_date(publish_time_str) if publish_time_str else datetime.now()
# 获取作者信息
author = article_info.get('author', '')
# 提取内容
content = ''
if 'content' in article_info:
# 内容可能是HTML或者纯文本
html_content = article_info['content']
content = self.cleaner.clean_html(html_content)
# 提取图片链接
image_url = None
if 'cover' in article_info:
image_url = article_info['cover']
else:
# 若无法从JSON中获取,则尝试从HTML中提取
title_element = soup.select_one('h1')
if not title_element:
return None
title = title_element.text.strip()
# 提取发布时间
time_element = soup.select_one('.time, .article-time, .pubtime, .date')
publish_time = datetime.now()
if time_element:
time_text = time_element.text.strip()
publish_time = self.parse_date(time_text)
# 提取作者
author_element = soup.select_one('.author, .src, .article-source')
author = author_element.text.strip() if author_element else None
# 提取内容
content_element = soup.select_one('.content-article, .article-content')
if content_element:
html_content = str(content_element)
content = self.cleaner.clean_html(html_content)
else:
return None
# 提取图片
image_element = soup.select_one('.content-article img')
image_url = image_element['src'] if image_element and image_element.has_attr('src') else None
# 生成摘要
summary = self.cleaner.extract_summary(content)
# 创建新闻项
news_item = self.create_news_item(
url=url,
title=title,
content=content,
summary=summary,
publish_time=publish_time,
author=author,
category=category,
image_url=image_url
)
return news_item
except Exception as e:
self.logger.error(f"解析新闻详情出错: {url} - {e}")
return None
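In the JSON branch of `TencentSpider.crawl_category`, each entry of `data['list']` supplies a `url` that may be absolute, protocol-relative (`//new.qq.com/...`) or missing its scheme. A small sketch of that normalisation, with de-duplication and the 20-item cap that the HTML fallback applies, is shown below. It assumes the `list`/`url` structure the spider expects; the function names and sample data are invented.

```python
def normalize_url(url):
    """Give protocol-relative or scheme-less URLs an https:// prefix."""
    url = url.strip()
    if url.startswith('//'):
        return 'https:' + url
    if not url.startswith(('http://', 'https://')):
        return 'https://' + url
    return url

def urls_from_list_data(data, limit=20):
    """Collect unique, normalised article URLs from data['list'] (order kept)."""
    urls = []
    for item in data.get('list', []):
        raw = item.get('url', '')
        if not raw:
            continue  # entries without a URL are skipped
        url = normalize_url(raw)
        if url not in urls:
            urls.append(url)
        if len(urls) >= limit:
            break
    return urls

data = {'list': [
    {'url': '//new.qq.com/rain/a/20240315A01ABC00'},
    {'url': 'new.qq.com/rain/a/20240315A01ABC00'},   # duplicate without scheme
    {'title': 'entry without a url'},
    {'url': 'https://new.qq.com/rain/a/20240315A02DEF00'},
]}
print(urls_from_list_data(data))
# ['https://new.qq.com/rain/a/20240315A01ABC00', 'https://new.qq.com/rain/a/20240315A02DEF00']
```

Collecting the URLs first and fetching details afterwards also lets the spider log how many candidates it found before any detail request is made.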
10.4 Crawler Controller
crawler.py:
# news_spider/crawler.py
import importlib
import threading
import time
import logging
import schedule
from datetime import datetime
from utils.db import NewsDatabase
from config import ENABLED_SPIDERS, SPIDER_CONFIG
class NewsCrawler:
def __init__(self):
"""初始化新闻爬虫控制器"""
self.logger = logging.getLogger('news_spider')
self.db = NewsDatabase()
self.db.connect()
self.spiders = {}
self.load_spiders()
def load_spiders(self):
"""加载所有启用的爬虫"""
for spider_name in ENABLED_SPIDERS:
try:
# 动态导入爬虫模块
module_name = f'spiders.{spider_name}_spider'
module = importlib.import_module(module_name)
# 获取类名(假设类名为XxxSpider,如SinaSpider)
class_name = ''.join(word.capitalize() for word in spider_name.split('_')) + 'Spider'
# 获取爬虫类并实例化
spider_class = getattr(module, class_name)
self.spiders[spider_name] = spider_class()
self.logger.info(f"已加载爬虫: {spider_name}")
except Exception as e:
self.logger.error(f"加载爬虫 {spider_name} 失败: {e}")
def run_spider(self, spider_name):
"""
运行指定的爬虫
:param spider_name: 爬虫名称
:return: 爬取的新闻数量
"""
if spider_name not in self.spiders:
self.logger.error(f"爬虫 {spider_name} 不存在")
return 0
try:
self.logger.info(f"开始运行爬虫: {spider_name}")
start_time = time.time()
# 爬取新闻
news_items = self.spiders[spider_name].crawl()
# 保存到数据库
saved_count = self.db.save_many_news(news_items)
end_time = time.time()
duration = end_time - start_time
self.logger.info(f"爬虫 {spider_name} 完成: 爬取 {len(news_items)} 条新闻,成功保存 {saved_count} 条,用时 {duration:.2f} 秒")
return saved_count
except Exception as e:
self.logger.error(f"运行爬虫 {spider_name} 出错: {e}")
return 0
def run_all_spiders(self):
"""运行所有爬虫"""
total_saved = 0
start_time = time.time()
self.logger.info(f"开始运行所有爬虫,时间: {datetime.now()}")
for spider_name in self.spiders:
saved = self.run_spider(spider_name)
total_saved += saved
end_time = time.time()
duration = end_time - start_time
self.logger.info(f"所有爬虫运行完成: 总共保存 {total_saved} 条新闻,总用时 {duration:.2f} 秒")
return total_saved
def schedule_crawl(self):
"""启动定时爬取任务"""
interval = SPIDER_CONFIG['INTERVAL'] # 爬取间隔(秒)
self.logger.info(f"设置定时爬取任务,间隔: {interval}秒")
# 先运行一次
self.run_all_spiders()
# 设置定时任务
schedule.every(interval).seconds.do(self.run_all_spiders)
# 持续运行定时任务
while True:
schedule.run_pending()
time.sleep(1)
def close(self):
"""关闭资源"""
if self.db:
self.db.close()
self.logger.info("爬虫控制器已关闭")
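`run_all_spiders` runs the spiders one after another, and `crawler.py` imports `threading` without using it. If concurrency is wanted, one option (an alternative, not what the controller above does) is `concurrent.futures.ThreadPoolExecutor`. The sketch below uses plain callables in place of real spiders and reproduces the class-name convention from `load_spiders`; `people_daily` is a hypothetical spider name. Each spider in the project owns its own `requests.Session`, but whether the real `NewsDatabase` can be shared across threads depends on its implementation, which is not shown here.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger('news_spider')

def spider_class_name(spider_name):
    """'sina' -> 'SinaSpider', 'people_daily' -> 'PeopleDailySpider' (as in load_spiders)."""
    return ''.join(word.capitalize() for word in spider_name.split('_')) + 'Spider'

def run_concurrently(jobs, max_workers=3):
    """Run {name: callable} jobs in a thread pool; a failing job counts as 0 saved."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(job): name for name, job in jobs.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:  # one broken spider must not stop the others
                logger.error("spider %s failed: %s", name, exc)
                results[name] = 0
    return results

def broken():
    raise RuntimeError('boom')

print(spider_class_name('people_daily'))  # PeopleDailySpider
print(sorted(run_concurrently({'sina': lambda: 3, 'sohu': lambda: 2, 'bad': broken}).items()))
# [('bad', 0), ('sina', 3), ('sohu', 2)]
```

Keeping `max_workers` small matters here: each spider already sleeps between requests, and running many at once multiplies the load on the news sites.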
10.5 Web Interface
web/app.py:
# news_spider/web/app.py
from flask import Flask, render_template, request, jsonify, redirect, url_for, abort
import os
import logging
from datetime import datetime, timedelta
from utils.db import NewsDatabase
from utils.classifier import NewsClassifier
from config import WEB_CONFIG, CATEGORIES, SITES
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
# 初始化日志
logger = logging.getLogger('news_spider')
# 初始化数据库和分类器
db = NewsDatabase()
db.connect()
classifier = NewsClassifier()
@app.template_filter('format_date')
def format_date(value):
"""
格式化日期过滤器
:param value: datetime对象
:return: 格式化的日期字符串
"""
if not value:
return ''
now = datetime.now()
diff = now - value
if diff < timedelta(minutes=1):
return '刚刚'
elif diff < timedelta(hours=1):
return f'{int(diff.total_seconds() // 60)}分钟前'
elif diff < timedelta(days=1):
return f'{int(diff.total_seconds() // 3600)}小时前'
elif diff < timedelta(days=30):
return f'{diff.days}天前'
else:
return value.strftime('%Y-%m-%d')
@app.route('/')
def index():
"""首页"""
    page = max(request.args.get('page', 1, type=int), 1)  # page >= 1, so skip is never negative
category = request.args.get('category', None)
source = request.args.get('source', None)
items_per_page = WEB_CONFIG['ITEMS_PER_PAGE']
# 获取最新新闻
skip = (page - 1) * items_per_page
news = db.get_latest_news(limit=items_per_page, skip=skip, category=category, source=source)
# 获取统计信息
stats = {
'total': db.get_news_count(),
'categories': db.get_category_stats(),
'sources': db.get_source_stats()
}
# 准备分类和来源信息
categories = {k: classifier.get_category_display_name(k) for k in CATEGORIES.keys()}
categories['other'] = '其他'
sources = SITES
return render_template(
'index.html',
news=news,
stats=stats,
page=page,
items_per_page=items_per_page,
total_items=stats['total'],
category=category,
source=source,
categories=categories,
sources=sources
)
@app.route('/search')
def search():
"""搜索页面"""
keyword = request.args.get('q', '')
    page = max(request.args.get('page', 1, type=int), 1)  # page >= 1, so skip is never negative
category = request.args.get('category', None)
source = request.args.get('source', None)
items_per_page = WEB_CONFIG['ITEMS_PER_PAGE']
if not keyword:
return redirect(url_for('index'))
# 搜索新闻
skip = (page - 1) * items_per_page
news = db.search_news(keyword, limit=items_per_page, skip=skip, category=category, source=source)
# 获取统计信息
stats = {
        'total': len(news),  # counts only the results on this page, so the pager never offers a next page
'categories': db.get_category_stats(),
'sources': db.get_source_stats()
}
# 准备分类和来源信息
categories = {k: classifier.get_category_display_name(k) for k in CATEGORIES.keys()}
categories['other'] = '其他'
sources = SITES
return render_template(
'search.html',
news=news,
keyword=keyword,
stats=stats,
page=page,
items_per_page=items_per_page,
total_items=stats['total'],
category=category,
source=source,
categories=categories,
sources=sources
)
@app.route('/news/<news_id>')
def news_detail(news_id):
"""新闻详情页"""
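    # A URL segment is always a str, while MongoDB's default _id is an ObjectId:
    # try the ObjectId form first, then fall back to the raw string
    from bson import ObjectId  # bson is installed together with pymongo
    news = None
    if ObjectId.is_valid(news_id):
        news = db.collection.find_one({'_id': ObjectId(news_id)})
    if news is None:
        news = db.collection.find_one({'_id': news_id})
    if not news:
        abort(404)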
# 获取相同分类的推荐新闻
related_news = db.get_latest_news(limit=5, category=news.get('category'))
# 排除当前新闻
related_news = [n for n in related_news if str(n['_id']) != news_id]
return render_template(
'detail.html',
news=news,
related_news=related_news[:4] # 最多显示4条相关新闻
)
@app.route('/api/stats')
def api_stats():
"""统计数据API"""
# 获取统计信息
stats = {
'total': db.get_news_count(),
'categories': db.get_category_stats(),
'sources': db.get_source_stats(),
'timespan': db.get_timespan_stats()
}
return jsonify(stats)
@app.errorhandler(404)
def page_not_found(e):
"""404页面"""
return render_template('404.html'), 404
@app.errorhandler(500)
def server_error(e):
"""500页面"""
logger.error(f"服务器错误: {e}")
return render_template('500.html'), 500
def start_web_app():
"""启动Web应用"""
app.run(
host=WEB_CONFIG['HOST'],
port=WEB_CONFIG['PORT'],
debug=WEB_CONFIG['DEBUG']
)
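The `format_date` filter compares against `datetime.now()`, so its output changes with the clock. Passing `now` in explicitly (a hypothetical refactor named `humanize_time`, with the same thresholds) makes the rules easy to check. Note that a timestamp in the future gives a negative difference and therefore also renders as `刚刚` ("just now").

```python
from datetime import datetime, timedelta

def humanize_time(value, now):
    """Same thresholds as the format_date filter, with `now` injected."""
    if not value:
        return ''
    diff = now - value
    if diff < timedelta(minutes=1):
        return '刚刚'                                        # just now
    if diff < timedelta(hours=1):
        return f'{int(diff.total_seconds() // 60)}分钟前'     # N minutes ago
    if diff < timedelta(days=1):
        return f'{int(diff.total_seconds() // 3600)}小时前'   # N hours ago
    if diff < timedelta(days=30):
        return f'{diff.days}天前'                             # N days ago
    return value.strftime('%Y-%m-%d')

now = datetime(2024, 1, 1, 12, 0, 0)
for delta in (timedelta(seconds=30), timedelta(minutes=5), timedelta(hours=3),
              timedelta(days=2), timedelta(days=40)):
    print(humanize_time(now - delta, now))
# 刚刚, 5分钟前, 3小时前, 2天前, 2023-11-22 (one per line)
```

With this split, the Flask filter itself would reduce to `return humanize_time(value, datetime.now())`.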
web/templates/index.html:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>新闻聚合 - 最新资讯</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css">
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-dark">
<div class="container">
<a class="navbar-brand" href="/">新闻聚合</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link active" href="/">首页</a>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="categoryDropdown" role="button" data-bs-toggle="dropdown">
分类
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="/">全部</a></li>
{% for cat_key, cat_name in categories.items() %}
<li><a class="dropdown-item" href="/?category={{ cat_key }}">{{ cat_name }}</a></li>
{% endfor %}
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="sourceDropdown" role="button" data-bs-toggle="dropdown">
来源
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="/">全部</a></li>
{% for source_key, source_data in sources.items() %}
<li><a class="dropdown-item" href="/?source={{ source_data.name }}">{{ source_data.name }}</a></li>
{% endfor %}
</ul>
</li>
</ul>
<form class="d-flex" action="/search">
<input class="form-control me-2" type="search" name="q" placeholder="搜索新闻" required>
<button class="btn btn-outline-light" type="submit">搜索</button>
</form>
</div>
</div>
</nav>
<div class="container my-4">
{% if category %}
<div class="alert alert-info">
当前分类: {{ categories.get(category, '全部') }}
<a href="/" class="float-end">查看全部</a>
</div>
{% endif %}
{% if source %}
<div class="alert alert-info">
来源: {{ source }}
<a href="/" class="float-end">查看全部</a>
</div>
{% endif %}
<div class="row">
<div class="col-md-8">
<h2>最新新闻</h2>
{% if news %}
<div class="list-group news-list">
{% for item in news %}
<a href="/news/{{ item._id }}" class="list-group-item list-group-item-action">
<div class="d-flex w-100 align-items-center mb-1">
{% if item.image_url %}
<div class="news-image me-3">
<img src="{{ item.image_url }}" alt="{{ item.title }}" class="img-thumbnail">
</div>
{% endif %}
<div class="news-content">
<h5 class="mb-1">{{ item.title }}</h5>
<p class="mb-1">{{ item.summary }}</p>
<small>
<span class="badge bg-secondary">{{ item.source }}</span>
<span class="badge bg-info">{{ categories.get(item.category, '其他') }}</span>
<span class="text-muted">{{ item.publish_time|format_date }}</span>
</small>
</div>
</div>
</a>
{% endfor %}
</div>
<!-- 分页 -->
<nav class="mt-4">
<ul class="pagination justify-content-center">
{% set total_pages = (total_items / items_per_page)|round(0, 'ceil')|int %}
{% if page > 1 %}
<li class="page-item">
<a class="page-link" href="?page={{ page - 1 }}{% if category %}&category={{ category }}{% endif %}{% if source %}&source={{ source }}{% endif %}">上一页</a>
</li>
{% else %}
<li class="page-item disabled">
<span class="page-link">上一页</span>
</li>
{% endif %}
{% for i in range(1, total_pages + 1) %}
{% if i == page %}
<li class="page-item active">
<span class="page-link">{{ i }}</span>
</li>
{% elif i <= 3 or i >= total_pages - 2 or (i >= page - 1 and i <= page + 1) %}
<li class="page-item">
<a class="page-link" href="?page={{ i }}{% if category %}&category={{ category }}{% endif %}{% if source %}&source={{ source }}{% endif %}">{{ i }}</a>
</li>
{% elif i == 4 or i == total_pages - 3 %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
{% endfor %}
{% if page < total_pages %}
<li class="page-item">
<a class="page-link" href="?page={{ page + 1 }}{% if category %}&category={{ category }}{% endif %}{% if source %}&source={{ source }}{% endif %}">下一页</a>
</li>
{% else %}
<li class="page-item disabled">
<span class="page-link">下一页</span>
</li>
{% endif %}
</ul>
</nav>
{% else %}
<div class="alert alert-warning">
没有找到相关新闻
</div>
{% endif %}
</div>
<div class="col-md-4">
<div class="card mb-4">
<div class="card-header">
统计信息
</div>
<div class="card-body">
<p>总新闻数: {{ stats.total }}</p>
<h6>分类统计</h6>
<ul class="list-group">
{% for cat, count in stats.categories.items() %}
<li class="list-group-item d-flex justify-content-between align-items-center">
<a href="/?category={{ cat }}">{{ categories.get(cat, '其他') }}</a>
<span class="badge bg-primary rounded-pill">{{ count }}</span>
</li>
{% endfor %}
</ul>
<h6 class="mt-3">来源统计</h6>
<ul class="list-group">
{% for src, count in stats.sources.items() %}
<li class="list-group-item d-flex justify-content-between align-items-center">
<a href="/?source={{ src }}">{{ src }}</a>
<span class="badge bg-primary rounded-pill">{{ count }}</span>
</li>
{% endfor %}
</ul>
</div>
</div>
</div>
</div>
</div>
<footer class="bg-dark text-white py-4 mt-4">
<div class="container">
<div class="row">
<div class="col-md-6">
<h5>新闻聚合</h5>
<p>一个使用Python爬虫技术的新闻聚合平台,汇集多个主流新闻网站的最新资讯。</p>
</div>
<div class="col-md-6 text-md-end">
<p>© 2023 新闻聚合</p>
<p>仅作学习和研究使用,请勿用于商业目的。</p>
</div>
</div>
</div>
</footer>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
<script src="{{ url_for('static', filename='js/script.js') }}"></script>
</body>
</html>
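The page-number loop in `index.html` shows pages 1 to 3, the last three pages and the neighbours of the current page, with `...` placed at position 4 and at `total_pages - 3`. When the current page is near either end (for example page 1 of 10), both of those positions are hidden, so two `...` items appear side by side. A sketch of computing the list in Python instead, with a single ellipsis per gap, is shown below; the function name `page_numbers` is hypothetical.

```python
def page_numbers(page, total_pages):
    """Visible page numbers with '...' marking each gap, e.g. [1, 2, 3, '...', 8, 9, 10]."""
    visible = {1, 2, 3, total_pages - 2, total_pages - 1, total_pages,
               page - 1, page, page + 1}
    pages = sorted(p for p in visible if 1 <= p <= total_pages)
    result = []
    for p in pages:
        # result[-1] is always a number here, because every '...' is followed by p
        if result and p - result[-1] > 1:
            result.append('...')  # at least one page skipped
        result.append(p)
    return result

print(page_numbers(1, 10))  # [1, 2, 3, '...', 8, 9, 10]
print(page_numbers(5, 10))  # [1, 2, 3, 4, 5, 6, '...', 8, 9, 10]
print(page_numbers(2, 4))   # [1, 2, 3, 4]
```

The view would then pass something like `pages=page_numbers(page, total_pages)` to `render_template`, and the template loop would render `'...'` entries as disabled items and numbers as links.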
web/templates/detail.html:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{ news.title }} - 新闻聚合</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css">
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-dark">
<div class="container">
<a class="navbar-brand" href="/">新闻聚合</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link" href="/">首页</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/?category={{ news.category }}">{{ news.category }}</a>
</li>
</ul>
<form class="d-flex" action="/search">
<input class="form-control me-2" type="search" name="q" placeholder="搜索新闻" required>
<button class="btn btn-outline-light" type="submit">搜索</button>
</form>
</div>
</div>
</nav>
<div class="container my-4">
<div class="row">
<div class="col-md-8">
<article>
<header class="mb-4">
<h1 class="fw-bolder mb-1">{{ news.title }}</h1>
<div class="text-muted fst-italic mb-2">
发布于 {{ news.publish_time|format_date }}
{% if news.author %}
由 {{ news.author }}
{% endif %}
</div>
<span class="badge bg-secondary me-2">{{ news.source }}</span>
<span class="badge bg-primary">{{ news.category }}</span>
</header>
{% if news.image_url %}
<figure class="mb-4">
<img class="img-fluid rounded" src="{{ news.image_url }}" alt="{{ news.title }}">
</figure>
{% endif %}
<section class="mb-5 news-content">
{% if news.html_content %}
{{ news.html_content|safe }}
{% else %}
<p>{{ news.content }}</p>
{% endif %}
</section>
</article>
<div class="card mb-4">
<div class="card-header">新闻来源</div>
<div class="card-body">
<p class="mb-0">
原文链接: <a href="{{ news.url }}" target="_blank">{{ news.url }}</a>
</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card mb-4">
<div class="card-header">相关推荐</div>
<div class="card-body">
{% if related_news %}
<ul class="list-unstyled related-news">
{% for item in related_news %}
<li class="mb-3">
<a href="/news/{{ item._id }}">{{ item.title }}</a>
<div class="text-muted small">{{ item.publish_time|format_date }}</div>
</li>
{% endfor %}
</ul>
{% else %}
<p class="card-text">暂无相关推荐</p>
{% endif %}
</div>
</div>
<div class="card mb-4">
<div class="card-header">标签</div>
<div class="card-body">
{% if news.tags %}
<div class="tags-cloud">
{% for tag in news.tags %}
<a href="/search?q={{ tag }}" class="btn btn-sm btn-outline-secondary me-1 mb-1">{{ tag }}</a>
{% endfor %}
</div>
{% else %}
<p class="card-text">暂无标签</p>
{% endif %}
</div>
</div>
</div>
</div>
</div>
<footer class="bg-dark text-white py-4 mt-4">
<div class="container">
<div class="row">
<div class="col-md-6">
<h5>新闻聚合</h5>
<p>一个使用Python爬虫技术的新闻聚合平台,汇集多个主流新闻网站的最新资讯。</p>
</div>
<div class="col-md-6 text-md-end">
<p>© 2023 新闻聚合</p>
<p>仅作学习和研究使用,请勿用于商业目的。</p>
</div>
</div>
</div>
</footer>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
<script src="{{ url_for('static', filename='js/script.js') }}"></script>
</body>
</html>
web/static/css/style.css:
/* Basic styling */
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background-color: #f8f9fa;
}
/* News list styling */
.news-list .list-group-item {
border-left: none;
border-right: none;
padding: 1rem;
transition: background-color 0.2s;
}
.news-list .list-group-item:hover {
background-color: #f5f5f5;
}
.news-image {
width: 120px;
min-width: 120px;
height: 80px;
overflow: hidden;
}
.news-image img {
width: 100%;
height: 100%;
object-fit: cover;
}
/* News content styling */
.news-content {
line-height: 1.6;
font-size: 1.1rem;
}
.news-content img {
max-width: 100%;
height: auto;
margin: 1rem 0;
}
.news-content a {
color: #007bff;
}
/* Related news styling */
.related-news li {
border-bottom: 1px solid #eee;
padding-bottom: 0.5rem;
}
.related-news li:last-child {
border-bottom: none;
}
/* Tags cloud */
.tags-cloud {
display: flex;
flex-wrap: wrap;
}
/* Media queries for responsive design */
@media (max-width: 768px) {
.news-image {
width: 100px;
height: 70px;
}
.news-list .list-group-item h5 {
font-size: 1rem;
}
.news-list .list-group-item p {
font-size: 0.85rem;
}
}
@media (max-width: 576px) {
.news-content {
font-size: 1rem;
}
.d-flex.w-100.align-items-center {
flex-direction: column;
align-items: flex-start !important;
}
.news-image {
width: 100%;
height: 150px;
margin-bottom: 0.75rem;
}
.news-content {
width: 100%;
}
}
10.6 Main Program
main.py:
# news_spider/main.py
import sys
import time
import logging
import argparse
import threading

from crawler import NewsCrawler
from web.app import start_web_app
from utils.db import NewsDatabase
from config import LOG_CONFIG


def setup_logger():
    """Configure the 'news_spider' logger with a console handler and a file handler."""
    logger = logging.getLogger('news_spider')
    level = getattr(logging, LOG_CONFIG['LEVEL'])
    logger.setLevel(level)

    # Remove existing handlers so repeated calls do not duplicate every log line
    if logger.handlers:
        logger.handlers.clear()

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    console_handler.setFormatter(formatter)

    # File handler (UTF-8 so non-ASCII text such as Chinese headlines is written correctly)
    file_handler = logging.FileHandler(LOG_CONFIG['FILE'], encoding='utf-8')
    file_handler.setLevel(level)
    file_handler.setFormatter(formatter)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    return logger


def main():
    """Entry point: parse the arguments, then run the crawler and/or the web app."""
    logger = setup_logger()

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='News aggregation crawler')
    parser.add_argument('--crawl', action='store_true', help='run all spiders once')
    parser.add_argument('--web', action='store_true', help='start the web application')
    parser.add_argument('--schedule', action='store_true', help='start scheduled crawling')
    parser.add_argument('--spider', type=str, help='comma-separated spiders to run (e.g. sina,sohu,tencent)')
    args = parser.parse_args()

    # With no options at all, start the web application by default
    if not (args.crawl or args.web or args.schedule or args.spider):
        args.web = True

    # Check the database connection before doing anything else
    db = NewsDatabase()
    if not db.connect():
        logger.error("Cannot connect to the database; please check the configuration")
        sys.exit(1)
    logger.info(f"The database already contains {db.get_news_count()} news items")
    db.close()

    # Create the crawler controller only when it is needed
    crawler = None
    if args.crawl or args.schedule or args.spider:
        crawler = NewsCrawler()

    try:
        if args.spider:
            # Run only the named spiders
            for spider_name in args.spider.split(','):
                crawler.run_spider(spider_name.strip())
        elif args.crawl:
            # Run every spider
            crawler.run_all_spiders()

        # Scheduled crawling runs in a daemon thread, so it ends with the main thread
        if args.schedule:
            logger.info("Starting scheduled crawling")
            schedule_thread = threading.Thread(target=crawler.schedule_crawl, daemon=True)
            schedule_thread.start()

        if args.web:
            logger.info("Starting the web application")
            if args.schedule:
                logger.info("The web application and scheduled crawling are running together")
            start_web_app()  # blocks until the web server stops
        elif args.schedule:
            # No web app: keep the main thread alive without a CPU-burning busy loop
            while True:
                time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Interrupted by the user")
    finally:
        # Always release the crawler's resources
        if crawler:
            crawler.close()


if __name__ == "__main__":
    main()
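10.7 Usage
- Create the project directory structure:
mkdir -p news_spider/spiders news_spider/utils news_spider/web/static/css news_spider/web/static/js news_spider/web/templates
touch news_spider/spiders/__init__.py news_spider/utils/__init__.py news_spider/web/__init__.py
- Copy the code into the corresponding files:
Copy each code file shown above into its matching directory.
- Install the dependencies:
Create a requirements.txt file with the following content:
requests==2.31.0
beautifulsoup4==4.12.2
pymongo==4.6.0
Flask==2.3.3
schedule==1.2.0
python-dotenv==1.0.0
Then install them:
pip install -r requirements.txt
- Run the program:
  - Crawl news only:
    python main.py --crawl
  - Crawl specific news sources:
    python main.py --spider sina,sohu
  - Start scheduled crawling:
    python main.py --schedule
  - Start the web application only:
    python main.py --web
  - Run scheduled crawling and the web application together:
    python main.py --schedule --web
- Open the web interface:
Visit http://127.0.0.1:5000 in a browser to see the news aggregator.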
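11. Legal and Ethical Issues in Web Crawling
11.1 Legal Risks of Crawling
Legal risks:
- Harm to commercial interests: scraping a commercial website's data without permission may be treated as infringing on its commercial interests
- Copyright infringement: copying or republishing other people's content in bulk may violate copyright law
- Privacy violations: collecting, storing, and processing personal data may breach privacy regulations
- Terms-of-service violations: many websites have terms of service that explicitly prohibit crawlers
- Computer crime: in some circumstances, certain kinds of crawling can be treated as unauthorized access to a computer system
Cases:
- hiQ Labs v. LinkedIn: the U.S. Court of Appeals for the Ninth Circuit held that scraping publicly accessible data likely does not violate the Computer Fraud and Abuse Act (CFAA), although hiQ was later found to have breached LinkedIn's User Agreement
- Facebook v. Power Ventures: the court held that continuing to access the site after receiving an explicit cease-and-desist letter violated the CFAA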
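11.2 Ethical Guidelines
Ethical guidelines for crawlers:
- Respect robots.txt: always check and obey the site's robots.txt file
- Crawl at a reasonable rate: limit the request frequency so you do not overload the server
- Identify yourself clearly: state your crawler's name and contact details in the request headers
- Be transparent about data use: be clear about how you will use the scraped data
- Respect copyright: obtain appropriate permission before using or redistributing content
- Collect the minimum: gather only the data you need and nothing more
- Protect privacy: do not crawl or store personally identifiable information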
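In practice, the "identify yourself clearly" guideline comes down to two request headers. The following is a minimal sketch. The bot name, version, info URL, and e-mail address are hypothetical placeholders, and the `build_bot_headers` helper is illustrative:

```python
def build_bot_headers(name: str, version: str, info_url: str, email: str) -> dict:
    """Return request headers that identify a crawler and how to reach its operator."""
    return {
        # Common convention for bots: "Name/version (+URL describing the bot)"
        "User-Agent": f"{name}/{version} (+{info_url})",
        # The HTTP "From" header (RFC 9110) holds the e-mail address of the
        # person responsible for the requests
        "From": email,
    }


# Hypothetical identity: replace every value with your own
headers = build_bot_headers("MyNewsBot", "1.0", "https://example.com/bot-info", "bot-admin@example.com")
print(headers["User-Agent"])  # MyNewsBot/1.0 (+https://example.com/bot-info)
```

You can pass the dictionary to each call with `requests.get(url, headers=headers)`, or set it once on a `requests.Session` with `session.headers.update(headers)`.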
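11.3 Respecting robots.txt
What robots.txt is:
robots.txt is a plain-text file in a website's root directory that tells crawlers which paths they may and may not crawl. It is a voluntary convention (standardized as RFC 9309), not an access-control mechanism, so it only works when crawlers choose to honor it.
Example robots.txt:
User-agent: *            # applies to all crawlers
Disallow: /private/      # do not crawl the /private/ directory
Disallow: /tmp/          # do not crawl the /tmp/ directory
Allow: /public/          # the /public/ directory may be crawled

User-agent: Googlebot    # applies only to Google's crawler
Disallow: /no-google/    # Google may not crawl /no-google/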
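You can check the rules in the example robots.txt offline with the standard library's `urllib.robotparser`. Its `parse()` method accepts the file's lines directly, so nothing is downloaded. The bot names and paths below are illustrative only.

The last check shows a subtlety: a crawler that has its own group follows only that group. Googlebot therefore obeys just `Disallow: /no-google/` and is not bound by the `*` group's `Disallow: /private/`.

```python
from urllib.robotparser import RobotFileParser

# The example robots.txt above, inlined (comments after '#' are ignored by the parser)
ROBOTS_TXT = """\
User-agent: *            # applies to all crawlers
Disallow: /private/
Disallow: /tmp/
Allow: /public/

User-agent: Googlebot    # applies only to Google's crawler
Disallow: /no-google/
"""

parser = RobotFileParser()
# parse() takes an iterable of lines and marks the rules as loaded
parser.parse(ROBOTS_TXT.splitlines())

base = "https://www.example.com"
print(parser.can_fetch("MyBot/1.0", base + "/private/data.html"))    # False
print(parser.can_fetch("MyBot/1.0", base + "/public/index.html"))    # True
print(parser.can_fetch("Googlebot", base + "/no-google/page.html"))  # False
print(parser.can_fetch("Googlebot", base + "/private/data.html"))    # True
```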
Checking robots.txt in Python:
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse


def can_fetch(url, user_agent="MyBot/1.0"):
    """
    Check whether robots.txt allows crawling a given URL.
    :param url: the URL to check
    :param user_agent: the crawler's user-agent string
    :return: True if crawling is allowed, otherwise False
    """
    parsed_url = urlparse(url)
    robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
    try:
        robot_parser = RobotFileParser()
        robot_parser.set_url(robots_url)
        # read() downloads and parses robots.txt; an HTTP 401/403 makes it
        # disallow everything, other 4xx responses (e.g. 404) allow everything
        robot_parser.read()
        return robot_parser.can_fetch(user_agent, url)
    except Exception as e:
        print(f"Error while checking robots.txt: {e}")
        # If robots.txt cannot be retrieved at all, the cautious default is not to crawl
        return False


# Example
url = "https://www.example.com/page.html"
if can_fetch(url, "MySpider/1.0"):
    print(f"Allowed to crawl: {url}")
else:
    print(f"Not allowed to crawl: {url}")
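11.4 Polite Crawling Strategies
Best practices for polite crawling:
- Identify yourself: give truthful information and contact details in the User-Agent
- Limit the request rate: use delays and cap concurrency
- Use caching: avoid requesting the same resource repeatedly
- Handle errors gracefully: after an error, retry with increasing delays (backoff) rather than immediately
- Follow the terms: obey the site's explicit terms of use
- Respect robots.txt: always check robots.txt
- Study the site's structure: understand how the site is organized to avoid unnecessary requests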
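The "retry with backoff" practice can be sketched as follows. The function names and the default base, factor, cap, and jitter values are illustrative assumptions, not values the document prescribes. Catching every `Exception` keeps the sketch short; a real crawler would retry only transient failures such as timeouts or HTTP 429/503.

```python
import random
import time


def backoff_delays(retries: int, base: float = 1.0, factor: float = 2.0, cap: float = 60.0):
    """Exponential backoff: base, base*factor, base*factor**2, ..., each capped at `cap` seconds."""
    return [min(cap, base * factor ** attempt) for attempt in range(retries)]


def call_with_backoff(func, retries: int = 4, base: float = 1.0, jitter: float = 0.1):
    """Call func(); after each failure sleep for a growing delay, then try again.

    The last exception is re-raised once all retries have failed. A small random
    jitter keeps many clients from retrying in lockstep.
    """
    delays = backoff_delays(retries, base=base)
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise
            delay = delays[attempt]
            time.sleep(delay + random.uniform(0, delay * jitter))


print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

As a usage example, `call_with_backoff(lambda: requests.get(url, timeout=10))` retries a request up to four times. The waits grow from 1 to 8 seconds, plus jitter.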
Request rate-limiting example:
import time
import random
import requests
from urllib.parse import urlparse


class RateLimitedRequester:
    def __init__(self, requests_per_minute=20, timeout=10):
        """
        Initialize the requester with a per-domain request budget.
        :param requests_per_minute: maximum number of requests per minute to each domain
        :param timeout: default request timeout in seconds
        """
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute
        self.timeout = timeout
        self.domain_last_request = {}  # time of the last request to each domain

    def get(self, url, **kwargs):
        """
        Send a GET request while respecting the per-domain rate limit.
        :param url: the URL to request
        :param kwargs: extra arguments passed to requests.get
        :return: a requests.Response object
        """
        domain = urlparse(url).netloc
        # If the previous request to this domain was too recent, sleep for the remainder
        if domain in self.domain_last_request:
            elapsed = time.time() - self.domain_last_request[domain]
            if elapsed < self.min_interval:
                sleep_time = self.min_interval - elapsed
                # Add a little random jitter so the requests are not perfectly regular
                sleep_time += random.uniform(0, self.min_interval * 0.1)
                print(f"Sleeping {sleep_time:.2f} s to respect the rate limit...")
                time.sleep(sleep_time)
        kwargs.setdefault("timeout", self.timeout)
        try:
            return requests.get(url, **kwargs)
        finally:
            # Record the request time even if the request failed
            self.domain_last_request[domain] = time.time()


# Usage example
requester = RateLimitedRequester(requests_per_minute=10)  # at most 10 requests per minute
urls = [
    "https://www.example.com/page1",
    "https://www.example.com/page2",
    "https://www.example.com/page3",
    # ...more URLs...
]
for url in urls:
    try:
        print(f"Requesting: {url}")
        response = requester.get(url)
        print(f"Status code: {response.status_code}")
        # Process the response...
    except Exception as e:
        print(f"Request failed: {e}")
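RateLimitedRequester enforces a fixed minimum gap between requests. For comparison, here is a different technique that the document does not use: a token bucket. It refills at an average rate but lets a few requests go out back-to-back. The class name and parameters below are illustrative. The injectable `clock` exists only so the logic can be tested without sleeping.

```python
import time


class TokenBucket:
    """Token-bucket limiter: `rate` tokens are added per second, up to `capacity`.

    Each request spends one token; when none is left, the caller must wait.
    """

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full, which permits an initial burst
        self.clock = clock
        self.last = clock()

    def _refill(self) -> None:
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self) -> bool:
        """Take one token if one is available; return False instead of blocking."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def acquire(self) -> None:
        """Block until a token is available (needs the real clock, not a fake one)."""
        while not self.try_acquire():
            time.sleep((1 - self.tokens) / self.rate)
```

For example, `TokenBucket(rate=10 / 60, capacity=3)` averages 10 requests per minute and allows bursts of up to 3. Call `acquire()` before each request, and keep one bucket per domain, just as RateLimitedRequester tracks each domain separately.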
11.5 A Crawl-Policy Analysis Tool
The following simple tool collects a website's crawler-related signals (robots.txt, Crawl-delay, the terms of service, and meta robots tags) and combines them into a rough crawl-friendliness score:
import requests
from bs4 import BeautifulSoup
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse, urljoin


class WebsiteCrawlPolicyAnalyzer:
    def __init__(self, url, user_agent="PythonCrawlAnalyzer/1.0", timeout=10):
        """
        Initialize the crawl-policy analyzer.
        :param url: URL of the website to analyze
        :param user_agent: user-agent string
        :param timeout: timeout in seconds for each HTTP request
        """
        self.url = url
        self.user_agent = user_agent
        self.timeout = timeout
        parsed = urlparse(url)
        self.domain = parsed.netloc
        self.base_url = f"{parsed.scheme}://{self.domain}"
        self.robots_url = f"{self.base_url}/robots.txt"
        self.tos_url = None  # terms-of-service URL, discovered during the analysis
        self.robot_parser = RobotFileParser()
        self.robots_loaded = False  # set once robots.txt has been downloaded and parsed
        self.headers = {"User-Agent": user_agent}

    def _get(self, url):
        """Send a GET request with the analyzer's headers and timeout."""
        return requests.get(url, headers=self.headers, timeout=self.timeout)

    def analyze(self):
        """Run the full analysis and return the results."""
        results = {
            "robots_txt": self.analyze_robots_txt(),  # must run first: it loads robots.txt
            "crawl_delay": self.get_crawl_delay(),
            "terms_of_service": self.find_terms_of_service(),
            "meta_robots": self.check_meta_robots(),
        }
        # Add an overall assessment
        results["assessment"] = self.overall_assessment(results)
        return results

    def analyze_robots_txt(self):
        """Download and analyze robots.txt."""
        try:
            response = self._get(self.robots_url)
            if response.status_code != 200:
                return {"exists": False, "error": f"HTTP {response.status_code}"}
            content = response.text
            # Parse the downloaded text directly instead of fetching robots.txt a second time
            self.robot_parser.parse(content.splitlines())
            self.robots_loaded = True
            paths = self._get_disallowed_paths(content)
            return {
                "exists": True,
                # "Disallow: /" in a group that applies to us blocks the whole site
                "fully_disallowed": "/" in paths,
                # Whether the home page may be crawled
                "index_allowed": self.robot_parser.can_fetch(self.user_agent, self.base_url + "/"),
                "disallowed_paths": [p for p in paths if p != "/"][:10],  # at most 10 examples
                "raw_content": content,
            }
        except Exception as e:
            print(f"Error while analyzing robots.txt: {e}")
            return {"exists": False, "error": str(e)}

    def _get_disallowed_paths(self, content):
        """
        Collect the Disallow paths from groups that apply to our user agent or to *.
        Simplification: the rules of every matching group are merged, and a group
        with several User-agent lines is judged by its last one.
        """
        disallowed = []
        user_agent = self.user_agent.lower()
        applies = False
        for line in content.splitlines():
            line = line.split("#", 1)[0].strip()  # drop comments
            field, _, value = line.partition(":")
            field, value = field.strip().lower(), value.strip()
            if field == "user-agent":
                agent = value.lower()
                # Does this group apply to our user agent (or to every agent)?
                applies = agent == "*" or (bool(agent) and user_agent.startswith(agent))
            elif field == "disallow" and applies and value:  # an empty Disallow allows everything
                disallowed.append(value)
        return disallowed

    def get_crawl_delay(self):
        """Return the Crawl-delay that applies to our user agent, or None if unset."""
        if not self.robots_loaded:
            return None
        # crawl_delay() (Python 3.6+) checks our own group first, then the * group
        return self.robot_parser.crawl_delay(self.user_agent)

    def find_terms_of_service(self):
        """Try to find the terms-of-service page and look for crawler-related clauses."""
        try:
            # Fetch the home page
            response = self._get(self.base_url)
            if response.status_code != 200:
                return {"found": False, "reason": "cannot access the home page"}
            soup = BeautifulSoup(response.text, 'html.parser')
            # Look for links whose text suggests terms of service (the Chinese terms match Chinese sites)
            tos_terms = ['terms', 'conditions', 'terms of service', 'terms of use',
                         '服务条款', '使用条款', '用户协议']
            tos_links = []
            for a in soup.find_all('a', href=True):
                text = a.get_text().lower()
                if any(term in text for term in tos_terms):
                    tos_links.append(urljoin(self.base_url, a['href']))
            if not tos_links:
                return {"found": False, "reason": "no terms-of-service link found"}
            # Check the first candidate link
            self.tos_url = tos_links[0]
            tos_response = self._get(self.tos_url)
            if tos_response.status_code != 200:
                return {"found": False, "reason": "cannot access the terms-of-service page"}
            tos_text = BeautifulSoup(tos_response.text, 'html.parser').get_text().lower()
            # Look for crawler-related keywords (plain substring match, so 'bot' also matches 'robot')
            keywords = ['scrape', 'crawl', 'spider', 'bot', 'automatic', 'data mining',
                        '爬虫', '抓取', '自动化', '数据挖掘']
            scraping_related = []
            for keyword in keywords:
                pos = tos_text.find(keyword)
                if pos != -1:
                    # Extract about 100 characters of context on each side of the keyword
                    context = tos_text[max(0, pos - 100):pos + 100].replace('\n', ' ').strip()
                    scraping_related.append(f"...{context}...")
            return {
                "found": True,
                "url": self.tos_url,
                "mentions_scraping": len(scraping_related) > 0,
                "scraping_contexts": scraping_related[:3],  # at most 3 excerpts
            }
        except Exception as e:
            return {"found": False, "reason": f"error while analyzing the terms of service: {e}"}

    def check_meta_robots(self):
        """Check the meta robots tags on the analyzed page."""
        try:
            response = self._get(self.url)
            if response.status_code != 200:
                return {"found": False, "error": f"cannot access URL: HTTP {response.status_code}"}
            soup = BeautifulSoup(response.text, 'html.parser')
            # Look for meta robots tags
            meta_robots = soup.find('meta', attrs={'name': 'robots'})
            meta_googlebot = soup.find('meta', attrs={'name': 'googlebot'})
            results = {"found": False}
            if meta_robots:
                content = meta_robots.get('content', '').lower()
                results = {
                    "found": True,
                    "content": content,
                    "noindex": 'noindex' in content,
                    "nofollow": 'nofollow' in content,
                }
            if meta_googlebot:
                results["googlebot"] = {"content": meta_googlebot.get('content', '').lower()}
            return results
        except Exception as e:
            return {"found": False, "error": str(e)}

    def overall_assessment(self, results):
        """
        Score how crawl-friendly the site appears, from 0 (crawling forbidden)
        to 10 (no restrictions), starting from a neutral 5.
        """
        score = 5
        factors = []
        # robots.txt
        robots = results["robots_txt"]
        if robots["exists"]:
            if robots["fully_disallowed"]:
                score -= 5
                factors.append("robots.txt disallows crawling the entire site")
            elif not robots["index_allowed"]:
                score -= 3
                factors.append("robots.txt disallows crawling the home page")
            else:
                score += 1
                factors.append("robots.txt allows crawling the home page")
            if results["crawl_delay"]:
                score -= 1
                factors.append(f"a crawl-rate limit is set: {results['crawl_delay']} s per request")
        else:
            score += 2
            factors.append("the site has no robots.txt file (or it could not be retrieved)")
        # meta robots
        meta = results["meta_robots"]
        if meta.get("found"):
            if meta.get("noindex"):
                score -= 2
                factors.append("the page uses a noindex meta tag")
            if meta.get("nofollow"):
                score -= 2
                factors.append("the page uses a nofollow meta tag")
        # Terms of service
        tos = results["terms_of_service"]
        if tos.get("found") and tos.get("mentions_scraping"):
            score -= 2
            factors.append("the terms of service mention crawling or scraping")
        # Clamp the score to the 0-10 range
        score = max(0, min(10, score))
        # Turn the score into a recommendation
        if score <= 2:
            recommendation = "Strongly advised not to crawl: the site clearly prohibits crawlers"
        elif score <= 5:
            recommendation = "Crawl with caution: strictly follow robots.txt and keep the crawl rate low"
        elif score <= 8:
            recommendation = "Crawling seems acceptable, but follow good practice and avoid excessive requests"
        else:
            recommendation = "No obvious restrictions, but still crawl at a moderate rate and respect the site's resources"
        return {"score": score, "factors": factors, "recommendation": recommendation}


def analyze_website_crawl_policy(url):
    """Analyze a website's crawl policy and print a report."""
    results = WebsiteCrawlPolicyAnalyzer(url).analyze()
    print(f"\nCrawl-policy analysis: {url}\n{'-' * 50}")

    print("\n[robots.txt]")
    robots = results["robots_txt"]
    if robots["exists"]:
        print(f"- Entire site disallowed: {'yes' if robots['fully_disallowed'] else 'no'}")
        print(f"- Home page allowed: {'yes' if robots['index_allowed'] else 'no'}")
        if robots["disallowed_paths"]:
            print("- Examples of disallowed paths:")
            for path in robots["disallowed_paths"]:
                print(f"  * {path}")
    else:
        print("- The site has no robots.txt file (or it could not be retrieved)")

    print("\n[Crawl delay]")
    if results["crawl_delay"]:
        print(f"- Crawl-delay: {results['crawl_delay']} s")
    else:
        print("- No Crawl-delay set")

    print("\n[Terms of service]")
    tos = results["terms_of_service"]
    if tos["found"]:
        print(f"- URL: {tos['url']}")
        print(f"- Mentions crawling: {'yes' if tos['mentions_scraping'] else 'no'}")
        if tos["mentions_scraping"]:
            print("- Relevant excerpts:")
            for context in tos["scraping_contexts"]:
                print(f"  * {context}")
    else:
        print(f"- Terms-of-service page not found: {tos['reason']}")

    print("\n[Meta robots]")
    meta = results["meta_robots"]
    if meta.get("found"):
        print(f"- Meta robots: {meta['content']}")
        print(f"- noindex: {'yes' if meta.get('noindex') else 'no'}")
        print(f"- nofollow: {'yes' if meta.get('nofollow') else 'no'}")
    else:
        print("- The page has no meta robots tag")

    print("\n[Overall assessment]")
    assessment = results["assessment"]
    print(f"- Crawl-friendliness score: {assessment['score']}/10")
    print("- Factors:")
    for factor in assessment["factors"]:
        print(f"  * {factor}")
    print(f"- Recommendation: {assessment['recommendation']}")


# Usage example
if __name__ == "__main__":
    analyze_website_crawl_policy("https://www.example.com")
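The analyzer reads only Crawl-delay. However, `urllib.robotparser` also exposes `request_rate()` and, since Python 3.8, `site_maps()`. Crawl-delay, Request-rate, and Sitemap are widely used extensions outside RFC 9309, and not every site or crawler honors them.

The sketch below turns whichever hint is present into a delay between requests. The helper `polite_interval`, its preference for Crawl-delay over Request-rate, and the sample robots.txt are all assumptions for illustration.

```python
from urllib.robotparser import RobotFileParser


def polite_interval(parser: RobotFileParser, user_agent: str, default: float = 1.0) -> float:
    """Seconds to wait between requests: Crawl-delay if set, else Request-rate, else `default`."""
    delay = parser.crawl_delay(user_agent)  # CPython only recognizes whole-number delays
    if delay is not None:
        return float(delay)
    rate = parser.request_rate(user_agent)  # RequestRate(requests, seconds) or None
    if rate is not None and rate.requests > 0:
        return rate.seconds / rate.requests
    return default


# A hypothetical robots.txt, parsed offline
ROBOTS_TXT = """\
User-agent: *
Crawl-delay: 5
Request-rate: 10/60
Disallow: /search
Sitemap: https://www.example.com/sitemap.xml
"""

parser = RobotFileParser()
parser.parse(ROBOTS_TXT.splitlines())
print(polite_interval(parser, "MyBot/1.0"))  # 5.0
print(parser.site_maps())                    # ['https://www.example.com/sitemap.xml']
```

The returned interval can feed a limiter such as RateLimitedRequester, using `requests_per_minute = 60 / interval`.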
12. Advanced Techniques and Optimization
12.1 Design Patterns for Efficient Crawlers
12.1.1 The Producer-Consumer Pattern
This pattern splits the crawler into two groups of threads connected by thread-safe queues. Producers download pages and discover new URLs; consumers parse the downloaded pages and extract data. Each group can then be sized independently, and neither stage has to wait for the other:
import queue
import random
import threading
import time
from urllib.parse import urldefrag, urljoin, urlparse

import requests
from bs4 import BeautifulSoup


class ProducerConsumerCrawler:
    def __init__(self, seed_urls, max_depth=2, num_producers=1, num_consumers=4):
        """
        Initialize the producer-consumer crawler.
        :param seed_urls: list of seed URLs (depth 0)
        :param max_depth: maximum crawl depth; links are followed only from pages above this depth
        :param num_producers: number of producer threads (download + link discovery)
        :param num_consumers: number of consumer threads (parsing + data extraction)
        """
        self.frontier = queue.Queue()  # (url, depth) pairs waiting to be downloaded
        self.pages = queue.Queue()     # (url, soup) pairs waiting to be parsed
        self.seen_urls = set()         # every URL ever queued, to avoid duplicates
        self.seen_lock = threading.Lock()
        self.result_data = []
        self.result_lock = threading.Lock()
        self.max_depth = max_depth
        self.num_producers = num_producers
        self.num_consumers = num_consumers
        self.should_stop = threading.Event()
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        for url in seed_urls:
            self._enqueue(url, 0)

    def _enqueue(self, url, depth):
        """Queue a URL for download unless it has been seen already (thread-safe)."""
        with self.seen_lock:
            if url in self.seen_urls:
                return
            self.seen_urls.add(url)
        self.frontier.put((url, depth))

    def producer(self):
        """Producer thread: download pages, queue new links, and hand pages to the consumers."""
        while not self.should_stop.is_set():
            try:
                url, depth = self.frontier.get(timeout=1)
            except queue.Empty:
                continue
            try:
                response = requests.get(url, headers=self.headers, timeout=5)
                if response.status_code == 200:
                    soup = BeautifulSoup(response.text, 'html.parser')
                    # Follow links only while below the depth limit
                    if depth < self.max_depth:
                        for a_tag in soup.find_all('a', href=True):
                            link = urldefrag(urljoin(url, a_tag['href'])).url  # absolute, no #fragment
                            if urlparse(link).scheme in ('http', 'https'):
                                self._enqueue(link, depth + 1)
                    # Pass the parsed page on, so consumers never download it a second time
                    self.pages.put((url, soup))
            except Exception as e:
                print(f"Error while processing URL: {url} - {e}")
            finally:
                # Called only after the links and the page are queued,
                # which keeps the completion check in _wait_until_idle() correct
                self.frontier.task_done()
            # Random delay so requests are not sent too quickly
            time.sleep(random.uniform(0.5, 1.5))

    def consumer(self):
        """Consumer thread: extract data from downloaded pages."""
        while not self.should_stop.is_set():
            try:
                url, soup = self.pages.get(timeout=1)
            except queue.Empty:
                continue
            try:
                title = soup.title.get_text(strip=True) if soup.title else "No Title"
                # Example content extraction (adjust the selectors to the target site)
                content = ""
                article = soup.find('article') or soup.find('div', class_='content')
                if article:
                    content = '\n'.join(p.get_text(strip=True) for p in article.find_all('p'))
                data_item = {
                    'url': url,
                    'title': title,
                    'content': content[:200] + "..." if len(content) > 200 else content,
                    'time': time.strftime('%Y-%m-%d %H:%M:%S'),
                }
                # The lock protects the shared result list
                with self.result_lock:
                    self.result_data.append(data_item)
                print(f"Processed: {url} - {title}")
            except Exception as e:
                print(f"Error while parsing page: {url} - {e}")
            finally:
                self.pages.task_done()

    def _wait_until_idle(self, idle_event):
        """Set idle_event once every queued URL and every downloaded page has been processed."""
        # When frontier.join() returns, every page has already been put on self.pages
        self.frontier.join()
        self.pages.join()
        idle_event.set()

    def crawl(self, max_items=100, timeout=60):
        """
        Start crawling.
        :param max_items: maximum number of items to collect
        :param timeout: maximum crawl time in seconds
        :return: list of crawled items
        """
        threads = [threading.Thread(target=self.producer, daemon=True) for _ in range(self.num_producers)]
        threads += [threading.Thread(target=self.consumer, daemon=True) for _ in range(self.num_consumers)]
        for t in threads:
            t.start()
        idle = threading.Event()
        threading.Thread(target=self._wait_until_idle, args=(idle,), daemon=True).start()
        start_time = time.time()
        try:
            # Wake up twice a second to check the stop conditions
            while not idle.wait(0.5):
                if len(self.result_data) >= max_items:
                    print(f"Reached the item limit: {max_items}")
                    break
                if time.time() - start_time > timeout:
                    print(f"Crawl timed out after {timeout} s")
                    break
            if idle.is_set():
                print("All queued URLs have been processed")
        except KeyboardInterrupt:
            print("Crawl interrupted by the user")
        finally:
            # Tell the worker threads to stop and give each a moment to exit
            self.should_stop.set()
            for t in threads:
                t.join(1.0)
        results = self.result_data[:max_items]
        print(f"Crawl finished: {len(results)} items collected, {len(self.seen_urls)} URLs discovered")
        return results


# Usage example
if __name__ == "__main__":
    seed_urls = [
        "https://news.ycombinator.com/",
        "https://www.theverge.com/",
    ]
    crawler = ProducerConsumerCrawler(
        seed_urls=seed_urls,
        max_depth=2,
        num_producers=2,
        num_consumers=4,
    )
    results = crawler.crawl(max_items=20, timeout=60)
    print("\nSample results:")
    for i, item in enumerate(results[:5]):
        print(f"\n[{i+1}] {item['title']}")
        print(f"URL: {item['url']}")
        print(f"Content preview: {item['content'][:100]}...")
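Without networking, the pattern reduces to a bounded queue between threads. The sketch below uses a different shutdown technique from the stop flag above: sentinel values. The producer puts one `None` per consumer after the real work, and each consumer exits when it receives one. The function name and the upper-casing "work" are illustrative stand-ins for downloading and parsing pages.

```python
import queue
import threading


def run_pipeline(items, num_consumers=3):
    """Minimal producer-consumer pipeline using queue.Queue and None sentinels."""
    work = queue.Queue(maxsize=10)  # bounded, so a fast producer cannot exhaust memory
    results = []
    results_lock = threading.Lock()

    def producer():
        for item in items:
            work.put(item)          # blocks while the queue is full
        for _ in range(num_consumers):
            work.put(None)          # one sentinel per consumer means "no more work"

    def consumer():
        while True:
            item = work.get()
            if item is None:        # sentinel received: stop this consumer
                break
            with results_lock:
                results.append(item.upper())  # stand-in for parsing a page

    threads = [threading.Thread(target=producer)]
    threads += [threading.Thread(target=consumer) for _ in range(num_consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(results)  # consumers finish in arbitrary order


print(run_pipeline(["b", "a", "c"]))  # ['A', 'B', 'C']
```

Sentinels end the threads deterministically as soon as the work runs out. The stop flag used in the crawler is better suited to crawls that must also stop on limits such as `max_items` or a timeout.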
12.1.2 Asynchronous Crawler
With Python's asyncio, a single thread can keep many requests in flight at once, which makes an efficient asynchronous crawler possible (this example also uses the third-party aiohttp library):
import asyncio
import logging
import random
from urllib.parse import urldefrag, urljoin, urlparse

import aiohttp
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class AsyncCrawler:
    def __init__(self, seed_urls, max_depth=2, max_urls_per_domain=20, max_concurrency=10):
        """
        Initialize the asynchronous crawler.
        Create it inside a running event loop (e.g. in an async main()), because on
        Python < 3.10 asyncio.Queue and asyncio.Semaphore bind to the current loop.
        :param seed_urls: list of seed URLs
        :param max_depth: maximum crawl depth
        :param max_urls_per_domain: maximum number of URLs queued per domain
        :param max_concurrency: maximum number of simultaneous requests
        """
        self.seed_urls = seed_urls
        self.max_depth = max_depth
        self.max_urls_per_domain = max_urls_per_domain
        self.visited_urls = set()
        self.url_queue = asyncio.Queue()
        self.results = []
        self.domain_counts = {}  # number of URLs queued per domain
        self.semaphore = asyncio.Semaphore(max_concurrency)  # limits concurrent requests

    async def fetch(self, url, session, depth):
        """Fetch one page, record its data, and queue its links."""
        try:
            async with self.semaphore:
                # Random delay so requests are not sent too quickly
                await asyncio.sleep(random.uniform(0.5, 1.5))
                async with session.get(url) as response:
                    if response.status != 200:
                        logger.warning(f"Fetch failed: {url} - status {response.status}")
                        return None
                    # Only process HTML responses
                    content_type = response.headers.get('Content-Type', '').lower()
                    if 'text/html' not in content_type:
                        logger.warning(f"Skipping non-HTML content: {url} - {content_type}")
                        return None
                    html = await response.text()
            # Parse outside the semaphore, so the slot is free for another request
            soup = BeautifulSoup(html, 'html.parser')
            title = soup.title.get_text(strip=True) if soup.title else "No Title"
            content = ""
            article = soup.find('article') or soup.find('div', class_='content')
            if article:
                content = '\n'.join(p.get_text(strip=True) for p in article.find_all('p'))
            # Queue new links while below the depth limit
            if depth < self.max_depth:
                await self.extract_links(soup, url, depth + 1)
            self.results.append({
                'url': url,
                'title': title,
                'content_preview': content[:200] + '...' if len(content) > 200 else content,
                'depth': depth,
            })
            logger.info(f"Crawled: {url} - {title}")
            return html
        except aiohttp.ClientError as e:
            logger.error(f"Request error: {url} - {e}")
        except asyncio.TimeoutError:
            logger.error(f"Request timed out: {url}")
        except Exception as e:
            logger.error(f"Crawl error: {url} - {e}")
        return None

    async def extract_links(self, soup, base_url, new_depth):
        """Extract the links on a page and queue the new ones."""
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            # Skip in-page anchors and JavaScript links
            if href.startswith('#') or href.startswith('javascript:'):
                continue
            # Build an absolute URL without the #fragment
            full_url = urldefrag(urljoin(base_url, href)).url
            parsed = urlparse(full_url)
            # Only follow HTTP and HTTPS links
            if parsed.scheme not in ('http', 'https'):
                continue
            if full_url in self.visited_urls:
                continue
            # Enforce the per-domain limit on the domain the link points to
            domain = parsed.netloc
            if self.domain_counts.get(domain, 0) >= self.max_urls_per_domain:
                continue
            self.visited_urls.add(full_url)
            self.domain_counts[domain] = self.domain_counts.get(domain, 0) + 1
            await self.url_queue.put((full_url, new_depth))

    async def worker(self, session):
        """Worker task: take URLs from the queue and process them until cancelled."""
        while True:
            url, depth = await self.url_queue.get()
            try:
                await self.fetch(url, session, depth)
            finally:
                # Always mark the item done, otherwise url_queue.join() never returns
                self.url_queue.task_done()

    async def crawl(self, num_workers=10, timeout=60):
        """
        Start crawling.
        :param num_workers: number of worker tasks
        :param timeout: overall crawl timeout in seconds
        """
        # Queue the seed URLs
        for url in self.seed_urls:
            self.visited_urls.add(url)
            await self.url_queue.put((url, 0))
        async with aiohttp.ClientSession(
            headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'},
            timeout=aiohttp.ClientTimeout(total=15),  # 15 s limit for each request
        ) as session:
            tasks = [asyncio.create_task(self.worker(session)) for _ in range(num_workers)]
            try:
                # join() returns once every queued URL, including newly discovered ones,
                # has been processed. Polling url_queue.empty() instead would stop too
                # early, while the first pages are still being downloaded.
                await asyncio.wait_for(self.url_queue.join(), timeout=timeout)
            except asyncio.TimeoutError:
                logger.info(f"Crawl timed out after {timeout} s")
            finally:
                # The workers loop forever, so cancel them and wait until they have stopped
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
        logger.info(f"Crawl finished: {len(self.results)} pages fetched, {len(self.visited_urls)} URLs queued")
        return self.results


async def main():
    """Main coroutine."""
    seed_urls = [
        "https://news.ycombinator.com/",
        "https://www.theverge.com/",
    ]
    crawler = AsyncCrawler(
        seed_urls=seed_urls,
        max_depth=2,
        max_urls_per_domain=10,
    )
    results = await crawler.crawl(num_workers=5, timeout=30)
    # Print some of the results
    print("\nResults:")
    for i, result in enumerate(results[:5], 1):
        print(f"\n[{i}] {result['title']}")
        print(f"URL: {result['url']}")
        print(f"Depth: {result['depth']}")
        print(f"Content preview: {result['content_preview']}")
    print(f"\nCrawled {len(results)} pages in total")


if __name__ == "__main__":
    # Run the async main coroutine
    asyncio.run(main())
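The core of the asyncio version can be checked without network access: a shared queue, worker tasks, `Queue.join()` to detect completion, and a `Semaphore` to cap concurrency. In this sketch a dictionary stands in for the link graph and `asyncio.sleep` for HTTP latency. The function and variable names are hypothetical. `peak` only records how many simulated downloads ran at once, to show that the semaphore capped them.

```python
import asyncio


async def crawl_simulated(links, start, max_concurrency=3, num_workers=5):
    """Asyncio worker pool over a simulated link graph (no network).

    `links` maps each page to the pages it links to. Returns the sorted list of
    visited pages and the peak number of simultaneous "downloads".
    """
    queue = asyncio.Queue()
    semaphore = asyncio.Semaphore(max_concurrency)
    seen = {start}
    visited = []
    in_flight = 0
    peak = 0

    async def worker():
        nonlocal in_flight, peak
        while True:
            page = await queue.get()
            try:
                async with semaphore:
                    in_flight += 1
                    peak = max(peak, in_flight)
                    await asyncio.sleep(0.01)  # simulated network latency
                    in_flight -= 1
                visited.append(page)
                for link in links.get(page, []):
                    if link not in seen:  # queue each page only once
                        seen.add(link)
                        queue.put_nowait(link)
            finally:
                queue.task_done()

    queue.put_nowait(start)
    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    await queue.join()  # every discovered page has been processed
    for w in workers:
        w.cancel()      # workers loop forever, so cancel them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(visited), peak


graph = {"/": ["/a", "/b", "/c", "/d"], "/a": ["/b", "/e"], "/e": ["/"]}
pages, peak = asyncio.run(crawl_simulated(graph, "/"))
print(pages)  # ['/', '/a', '/b', '/c', '/d', '/e']
```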
12.2 Performance Optimization Tips
12.2.1 Connection Pooling and Session Reuse
Reusing a session and its connection pool can significantly improve crawler performance. Requests to the same host then share existing TCP (and TLS) connections instead of opening a new one every time:
import time
import requests
from concurrent.futures import ThreadPoolExecutor
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from bs4 import BeautifulSoup


class OptimizedCrawler:
    def __init__(self, max_workers=10, timeout=10, max_retries=3):
        """
        Initialize the optimized crawler.
        :param max_workers: maximum number of worker threads
        :param timeout: request timeout in seconds
        :param max_retries: maximum number of retries
        """
        self.max_workers = max_workers
        self.timeout = timeout
        # One shared Session reuses connections across requests. requests does not
        # document Session as thread-safe; sharing it for simple GETs is common
        # practice, but one Session per thread is the cautious choice.
        self.session = requests.Session()
        # Retry on rate limiting and server errors, with exponential backoff
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET"],  # idempotent methods only; urllib3 < 1.26 calls this method_whitelist
        )
        # The adapter owns the connection pools
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=20,  # number of per-host connection pools to cache
            pool_maxsize=50,      # maximum number of connections kept in each pool
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        # Default request headers
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml",
            "Accept-Language": "en-US,en;q=0.9",
        })
        # Thread pool
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def close(self):
        """Release the session and the thread pool."""
        self.session.close()
        self.executor.shutdown()

    def fetch_url(self, url):
        """
        Fetch a URL.
        :param url: the URL to fetch
        :return: (url, status_code, text) on success, or (url, error_message, None) on failure
        """
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=self.timeout)
            elapsed = time.time() - start_time
            print(f"Fetched {url} - status: {response.status_code} - time: {elapsed:.2f} s")
            return (url, response.status_code, response.text)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return (url, str(e), None)

    def batch_fetch(self, urls):
        """
        Fetch several URLs concurrently.
        :param urls: list of URLs
        :return: list of results, in the same order as urls
        """
        # Submit every task to the thread pool
        futures = [self.executor.submit(self.fetch_url, url) for url in urls]
        results = []
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Error getting a task result: {e}")
        return results

    def extract_data(self, html, url):
        """
        Extract data from HTML.
        :param html: HTML content
        :param url: URL of the page
        :return: dictionary of extracted data
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')
            # Title
            title = soup.title.get_text(strip=True) if soup.title else "No Title"
            # Description
            description = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and 'content' in meta_desc.attrs:
                description = meta_desc.attrs['content']
            # Links
            links = [a['href'] for a in soup.find_all('a', href=True)]
            return {
                'url': url,
                'title': title,
                'description': description,
                'links': links[:10],  # keep only the first 10 links
            }
        except Exception as e:
            print(f"Error extracting data: {url} - {e}")
            return {
                'url': url,
                'error': str(e),
            }

    def crawl_and_extract(self, urls):
        """
        Fetch the URLs and extract data from them.
        :param urls: list of URLs
        :return: list of extracted data
        """
        fetch_results = self.batch_fetch(urls)
        data = []
        for url, status, html in fetch_results:
            # Only extract from successful responses
            if status == 200 and html:
                data.append(self.extract_data(html, url))
        return data


# Usage example
if __name__ == "__main__":
    urls = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.wikipedia.org",
        "https://www.reddit.com",
        "https://news.ycombinator.com",
    ]
    crawler = OptimizedCrawler(max_workers=3, timeout=5)
    try:
        print("Starting crawl...")
        start_time = time.time()
        results = crawler.crawl_and_extract(urls)
        elapsed = time.time() - start_time
        print(f"\nCrawl finished in {elapsed:.2f} s, {len(results)} pages extracted")
        # Print the results (failed extractions contain only 'url' and 'error')
        for i, result in enumerate(results, 1):
            print(f"\n[{i}] {result.get('title', 'extraction failed')}")
            print(f"URL: {result['url']}")
            if result.get('description'):
                print(f"Description: {result['description'][:100]}...")
            if result.get('links'):
                print(f"Some links: {', '.join(result['links'][:3])}")
    finally:
        # Release resources
        crawler.close()
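The speed-up from `requests.Session` comes from HTTP/1.1 persistent (keep-alive) connections. The standard-library sketch below (http.client plus a local http.server, no requests) makes the reuse visible: the server echoes the client's source port, which identifies the TCP connection. The handler and function names are hypothetical, and the local server exists only for this demonstration.

```python
import http.client
import http.server
import threading


class PortEchoHandler(http.server.BaseHTTPRequestHandler):
    """Replies with the client's source port, which identifies its TCP connection."""
    protocol_version = "HTTP/1.1"  # enables keep-alive on the server side

    def do_GET(self):
        body = str(self.client_address[1]).encode()
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))  # lets the client reuse the connection
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):  # silence per-request logging
        pass


def source_ports(reuse: bool, n: int = 3) -> list:
    """Send n GET requests to a local server; return the client port seen for each one."""
    server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), PortEchoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    host, port = server.server_address
    ports = []
    try:
        conn = http.client.HTTPConnection(host, port)
        for _ in range(n):
            if not reuse:
                conn.close()
                conn = http.client.HTTPConnection(host, port)  # fresh TCP connection
            conn.request("GET", "/")
            ports.append(int(conn.getresponse().read()))  # read fully before the next request
        conn.close()
    finally:
        server.shutdown()
        server.server_close()
    return ports


print(len(set(source_ports(reuse=True))))   # 1: all requests shared one connection
print(len(set(source_ports(reuse=False))))  # one connection (source port) per request
```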
12.2.2 数据压缩与内存优化
处理大量数据时,压缩和内存优化非常重要:
```python
import gzip
import json
import os
import shutil
import tempfile
import time

import psutil                          # third-party: pip install psutil
import requests
from bs4 import BeautifulSoup
from memory_profiler import profile    # third-party: pip install memory-profiler


class MemoryEfficientCrawler:
    def __init__(self, use_compression=True, batch_size=100):
        """
        Initialize a memory-efficient crawler.
        :param use_compression: whether to gzip-compress the batch files
        :param batch_size: number of records per batch file
        """
        self.use_compression = use_compression
        self.batch_size = batch_size
        self.temp_dir = tempfile.mkdtemp()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def log_memory_usage(self, label):
        """Print the resident memory (RSS) of the current process."""
        process = psutil.Process(os.getpid())
        mem_info = process.memory_info()
        print(f"Memory usage [{label}]: {mem_info.rss / 1024 / 1024:.2f} MB")

    def fetch_url(self, url):
        """Fetch a URL; return its HTML, or None on any error."""
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()  # treat 4xx/5xx responses as failures
            return response.text
        except requests.RequestException as e:
            print(f"Error fetching URL: {url} - {e}")
            return None

    def extract_data(self, html, url):
        """Extract the title and paragraph text from HTML."""
        if not html:
            return None
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title.text.strip() if soup.title else "No Title"
        # Join the paragraphs once instead of concatenating strings in a loop
        main_text = "\n".join(p.text for p in soup.find_all('p'))
        return {
            'url': url,
            'title': title,
            'text': main_text,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }

    def _batch_path(self, batch_num):
        """Path of the temporary file that holds batch number batch_num."""
        filename = os.path.join(self.temp_dir, f"batch_{batch_num}.json")
        return filename + '.gz' if self.use_compression else filename

    def save_batch(self, batch, batch_num):
        """Write one batch of records to a temporary file."""
        if not batch:
            return
        opener = gzip.open if self.use_compression else open
        with opener(self._batch_path(batch_num), 'wt', encoding='utf-8') as f:
            json.dump(batch, f)

    def load_batch(self, batch_num):
        """Load one batch from its temporary file (empty list if it is missing)."""
        opener = gzip.open if self.use_compression else open
        try:
            with opener(self._batch_path(batch_num), 'rt', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    @profile  # memory_profiler prints a line-by-line memory report for this method
    def crawl(self, urls):
        """Crawl a list of URLs, spilling extracted records to disk in batches."""
        self.log_memory_usage("start")
        result_count = 0
        batch_count = 0
        current_batch = []
        for i, url in enumerate(urls):
            print(f"Processing {i + 1}/{len(urls)}: {url}")
            html = self.fetch_url(url)
            data = self.extract_data(html, url)
            if data:
                current_batch.append(data)
                result_count += 1
            # Once the batch is full, write it to disk and start a new one
            if len(current_batch) >= self.batch_size:
                self.save_batch(current_batch, batch_count)
                batch_count += 1
                current_batch = []
                self.log_memory_usage(f"batch {batch_count}")
        # Save the last, partially filled batch
        if current_batch:
            self.save_batch(current_batch, batch_count)
            batch_count += 1
        self.log_memory_usage("crawl finished")
        # Reload the batches in order. This puts every record back in memory;
        # for very large crawls, process each batch as it is loaded instead.
        all_results = []
        for i in range(batch_count):
            all_results.extend(self.load_batch(i))
            # Delete each batch file after reading it to free disk space
            try:
                os.remove(self._batch_path(i))
            except OSError:
                pass
        self.log_memory_usage("processing finished")
        print(f"Done: processed {len(urls)} URLs, extracted {result_count} records")
        return all_results

    def cleanup(self):
        """Remove the temporary directory and anything left in it."""
        try:
            shutil.rmtree(self.temp_dir)
            print(f"Removed temporary directory: {self.temp_dir}")
        except OSError as e:
            print(f"Error removing temporary files: {e}")


# Usage example
if __name__ == "__main__":
    # Test URLs
    test_urls = [
        "https://en.wikipedia.org/wiki/Python_(programming_language)",
        "https://en.wikipedia.org/wiki/Web_scraping",
        "https://en.wikipedia.org/wiki/Data_mining",
        "https://en.wikipedia.org/wiki/Natural_language_processing",
        "https://en.wikipedia.org/wiki/Machine_learning"
    ]
    # A small batch_size so that several batch files get written
    crawler = MemoryEfficientCrawler(use_compression=True, batch_size=2)
    try:
        results = crawler.crawl(test_urls)
        print(f"\nGot {len(results)} results:")
        for i, result in enumerate(results[:3], 1):  # print only the first 3
            print(f"\n[{i}] {result['title']}")
            print(f"URL: {result['url']}")
            text_preview = result['text'][:100].replace('\n', ' ')
            print(f"Text preview: {text_preview}...")
    finally:
        # Clean up temporary files
        crawler.cleanup()
```
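The `crawl` method above still rebuilds `all_results` in memory at the end. If downstream code can handle one record at a time, a generator that reads a gzip-compressed JSON Lines file (one JSON object per line) keeps only the current record in memory. The following is a sketch of that alternative. `write_records` and `iter_records` are illustrative names and are not part of MemoryEfficientCrawler.

```python
import gzip
import json
import os
import tempfile


def write_records(path, records):
    """Append records to a gzip-compressed JSON Lines file; return how many were written."""
    count = 0
    # Mode 'at' appends a new gzip member; gzip.open reads multi-member files as one stream
    with gzip.open(path, 'at', encoding='utf-8') as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + '\n')
            count += 1
    return count


def iter_records(path):
    """Yield records one at a time without loading the whole file."""
    with gzip.open(path, 'rt', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "results.jsonl.gz")
    write_records(path, [{'url': 'https://example.com/1', 'title': 'A'}])
    write_records(path, [{'url': 'https://example.com/2', 'title': 'B'}])
    for record in iter_records(path):
        print(record['title'])
```

Each batch can be appended as soon as it is full, and the final consumer iterates `iter_records(path)` instead of receiving a list.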
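12.2.3 Caching Strategy
An efficient cache avoids fetching the same page again and again. The crawler below stores each successful response on disk with a timestamp and reuses it until it expires: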
```python
import hashlib
import logging
import os
import pickle
import time

import requests
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class CachedCrawler:
    def __init__(self, cache_dir='.cache', expiry_hours=24):
        """
        Initialize a crawler with an on-disk cache.
        :param cache_dir: cache directory
        :param expiry_hours: cache lifetime in hours
        """
        self.cache_dir = cache_dir
        self.expiry_seconds = expiry_hours * 3600
        os.makedirs(cache_dir, exist_ok=True)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        # Cache statistics
        self.cache_hits = 0
        self.cache_misses = 0

    def _get_cache_key(self, url):
        """Use the MD5 hex digest of the URL as its cache key."""
        return hashlib.md5(url.encode('utf-8')).hexdigest()

    def _get_cache_path(self, cache_key):
        """Path of the cache file for a key."""
        return os.path.join(self.cache_dir, f"{cache_key}.pkl")

    def _save_to_cache(self, cache_key, data):
        """Store data in the cache together with the time it was saved."""
        cache_path = self._get_cache_path(cache_key)
        try:
            cache_data = {'timestamp': time.time(), 'data': data}
            with open(cache_path, 'wb') as f:
                pickle.dump(cache_data, f)
            return True
        except Exception as e:
            logger.error(f"Failed to save cache entry: {e}")
            return False

    def _load_from_cache(self, cache_key):
        """Load data from the cache; return None if it is missing or expired."""
        cache_path = self._get_cache_path(cache_key)
        if not os.path.exists(cache_path):
            return None
        try:
            # Only unpickle files this program wrote: pickle can execute arbitrary code
            with open(cache_path, 'rb') as f:
                cache_data = pickle.load(f)
            if time.time() - cache_data['timestamp'] > self.expiry_seconds:
                logger.debug(f"Cache entry expired: {cache_key}")
                return None
            return cache_data['data']
        except Exception as e:
            logger.error(f"Failed to load cache entry: {e}")
            return None

    def fetch_url(self, url, bypass_cache=False):
        """
        Fetch a URL, serving it from the cache when possible.
        :param url: URL to fetch
        :param bypass_cache: if True, ignore the cache and always re-fetch
        :return: (status_code, content) tuple; (0, None) on a network error
        """
        cache_key = self._get_cache_key(url)
        if not bypass_cache:
            cached_data = self._load_from_cache(cache_key)
            if cached_data is not None:
                self.cache_hits += 1
                logger.info(f"Cache hit: {url}")
                return cached_data
        # Cache miss, or bypass requested: send the request
        self.cache_misses += 1
        logger.info(f"Fetching URL: {url}")
        try:
            response = self.session.get(url, timeout=10)
            data = (response.status_code, response.text)
            # Cache successful responses only
            if response.status_code == 200:
                self._save_to_cache(cache_key, data)
            return data
        except requests.RequestException as e:
            logger.error(f"Request failed: {url} - {e}")
            return (0, None)

    def extract_data(self, html):
        """Extract the title, description, content preview and links from HTML."""
        if not html:
            return None
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title.text.strip() if soup.title else "No Title"
        # Meta description
        description = ""
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc and 'content' in meta_desc.attrs:
            description = meta_desc['content']
        # Main content (simple heuristic: the first <main>, <article> or div.content)
        content = ""
        main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
        if main_content:
            content = '\n'.join(p.text for p in main_content.find_all('p'))
        # Absolute links only
        links = [a['href'] for a in soup.find_all('a', href=True) if a['href'].startswith('http')]
        return {
            'title': title,
            'description': description,
            'content_preview': content[:200] + '...' if len(content) > 200 else content,
            'links': links[:10]  # at most 10 links
        }

    def crawl_with_cache(self, urls, force_refresh=False):
        """
        Crawl several URLs, using the cache where possible.
        :param urls: list of URLs
        :param force_refresh: if True, bypass the cache and re-fetch everything
        :return: list of results
        """
        results = []
        for url in urls:
            status_code, html = self.fetch_url(url, bypass_cache=force_refresh)
            if status_code == 200 and html:
                data = self.extract_data(html)
                if data:
                    data['url'] = url
                    data['status_code'] = status_code
                    results.append(data)
            else:
                logger.warning(f"Could not fetch content: {url} - status: {status_code}")
        # Cumulative cache statistics for this crawler instance
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total_requests * 100 if total_requests > 0 else 0
        logger.info(f"Cache stats: {self.cache_hits} hits, {self.cache_misses} misses, "
                    f"hit rate {hit_rate:.2f}%")
        return results

    def clear_cache(self, older_than_hours=None):
        """
        Delete cache files.
        :param older_than_hours: if set, delete only entries older than this many hours
        :return: number of files deleted
        """
        count = 0
        now = time.time()
        for filename in os.listdir(self.cache_dir):
            if not filename.endswith('.pkl'):
                continue
            file_path = os.path.join(self.cache_dir, filename)
            if older_than_hours is not None:
                try:
                    with open(file_path, 'rb') as f:
                        cache_data = pickle.load(f)
                    # Keep entries newer than the threshold
                    if now - cache_data['timestamp'] < older_than_hours * 3600:
                        continue
                except Exception:
                    # Unreadable or corrupt file: fall through and delete it
                    pass
            try:
                os.remove(file_path)
                count += 1
            except OSError as e:
                logger.error(f"Failed to delete cache file: {file_path} - {e}")
        logger.info(f"Deleted {count} cache files")
        return count


# Usage example
if __name__ == "__main__":
    crawler = CachedCrawler(cache_dir='./.url_cache', expiry_hours=48)
    urls = [
        "https://www.python.org",
        "https://docs.python.org/3/",
        "https://pypi.org"
    ]
    # First run: every URL should be a cache miss
    print("\nFirst run (everything fetched from the network):")
    results1 = crawler.crawl_with_cache(urls)
    for i, result in enumerate(results1, 1):
        print(f"\n[{i}] {result['title']}")
        print(f"URL: {result['url']}")
        print(f"Description: {result['description'][:100]}...")
    # Second run: every previously successful URL should be a cache hit
    print("\nSecond run (everything served from the cache):")
    results2 = crawler.crawl_with_cache(urls)
    # Third run: force a refresh
    print("\nThird run (forced cache refresh):")
    results3 = crawler.crawl_with_cache(urls, force_refresh=True)
    # Remove entries older than 24 hours
    print("\nClearing cache entries older than 24 hours:")
    deleted_count = crawler.clear_cache(older_than_hours=24)
    print(f"Deleted: {deleted_count}")
```
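`_get_cache_key` hashes the URL exactly as written. As a result, `https://Example.com/a?b=2&a=1#top` and `https://example.com/a?a=1&b=2` get separate cache entries, even though they usually name the same page. One option is to normalize URLs before hashing. The helper below is a sketch that rests on several simplifying assumptions:

- the scheme and host are case-insensitive;
- fragments never reach the server;
- default ports can be dropped;
- the order of query parameters does not matter;
- user:password credentials in the URL are discarded.

The query-order assumption does not hold for every site, so treat `normalize_url` (a hypothetical helper) as a starting point. The same idea also applies to the visited-URL checks in the distributed crawler that follows.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

DEFAULT_PORTS = {'http': 80, 'https': 443}


def normalize_url(url: str) -> str:
    """Return a canonical form of url for use as a cache or de-duplication key."""
    parts = urlsplit(url.strip())
    scheme = parts.scheme.lower()
    host = (parts.hostname or '').lower()
    if ':' in host:  # IPv6 literal: restore the brackets that .hostname strips
        host = f'[{host}]'
    port = parts.port
    # Drop the port when it is the scheme's default
    netloc = host if port in (None, DEFAULT_PORTS.get(scheme)) else f'{host}:{port}'
    path = parts.path or '/'
    # Sort query parameters; keep_blank_values preserves keys such as "?debug="
    query = urlencode(sorted(parse_qsl(parts.query, keep_blank_values=True)))
    # The fragment (#...) is dropped: browsers never send it to the server
    return urlunsplit((scheme, netloc, path, query, ''))


if __name__ == "__main__":
    print(normalize_url("HTTPS://Example.com:443/a?b=2&a=1#top"))
```

With this helper, `_get_cache_key` could hash `normalize_url(url)` instead of `url`.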
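12.3 Distributed Crawling
12.3.1 A Simple Distributed Crawler with Redis
Several worker processes, on one machine or on several, share a URL queue, a set of visited URLs and a result list stored in Redis: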
```python
import json
import logging
import random
import threading
import time
from urllib.parse import urljoin, urlparse

import redis      # third-party: pip install redis
import requests
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

WORKER_TTL = 60  # seconds without a heartbeat before a worker counts as inactive


class DistributedCrawler:
    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        """
        Initialize a distributed crawler worker.
        :param redis_host: Redis host
        :param redis_port: Redis port
        :param redis_db: Redis database number
        """
        # decode_responses=True makes redis-py return str instead of bytes
        self.redis = redis.Redis(host=redis_host, port=redis_port, db=redis_db,
                                 decode_responses=True)
        # Redis key names
        self.url_queue_key = 'crawler:urls'       # list of pending {"url", "depth"} tasks
        self.seen_key = 'crawler:seen'            # set of every URL ever queued
        self.visited_key = 'crawler:visited'      # set of URLs already processed
        self.result_key = 'crawler:results'       # list of JSON results
        self.worker_activity_key = 'crawler:worker_activity'  # hash: worker ID -> status
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        # Random worker ID (collisions are unlikely but possible)
        self.worker_id = f"worker-{random.randint(1000, 9999)}"
        logger.info(f"Worker ID: {self.worker_id}")

    def enqueue_url(self, url, depth):
        """
        Queue a URL together with its link depth, unless it was queued before.
        SADD returns 1 only for a new member, so when several workers discover
        the same link at the same time, exactly one of them queues it.
        :return: True if the URL was queued
        """
        if self.redis.sadd(self.seen_key, url):
            self.redis.lpush(self.url_queue_key, json.dumps({'url': url, 'depth': depth}))
            return True
        return False

    def add_seed_urls(self, urls):
        """Queue the seed URLs at depth 0."""
        for url in urls:
            if self.enqueue_url(url, 0):
                logger.info(f"Added seed URL: {url}")

    def mark_url_processed(self, url):
        """Record that a URL has been processed."""
        self.redis.sadd(self.visited_key, url)

    def get_url_to_process(self, timeout=0):
        """
        Pop one task from the queue.
        :param timeout: seconds to block waiting (0 = block forever)
        :return: (url, depth) tuple, or None on timeout
        """
        result = self.redis.brpop(self.url_queue_key, timeout=timeout)
        if result:
            task = json.loads(result[1])
            return task['url'], task['depth']
        return None

    def save_result(self, result):
        """Append a result to the shared Redis list."""
        if not result:
            return
        self.redis.lpush(self.result_key, json.dumps(result, ensure_ascii=False))
        # Keep only the newest 10,000 results so the list cannot grow without bound
        self.redis.ltrim(self.result_key, 0, 9999)

    def update_worker_status(self, status):
        """
        Record this worker's status and heartbeat time.
        EXPIRE applies to the whole hash, not to individual fields, so stats()
        decides which workers are active by checking each heartbeat timestamp.
        """
        status_data = {'id': self.worker_id, 'timestamp': time.time(), 'status': status}
        self.redis.hset(self.worker_activity_key, self.worker_id, json.dumps(status_data))
        # Drop the whole hash if no worker reports for WORKER_TTL seconds
        self.redis.expire(self.worker_activity_key, WORKER_TTL)

    def fetch_url(self, url):
        """
        Fetch a URL after a short random delay.
        :return: Response object, or None on a network error
        """
        try:
            # Random delay to avoid requesting too quickly
            time.sleep(random.uniform(1.0, 3.0))
            return self.session.get(url, timeout=10)
        except requests.RequestException as e:
            logger.error(f"Error fetching URL: {url} - {e}")
            return None

    def extract_links(self, html, base_url):
        """
        Extract same-domain links from HTML.
        :param html: HTML content
        :param base_url: URL of the page, used to resolve relative links
        :return: list of absolute URLs without fragments
        """
        if not html:
            return []
        try:
            soup = BeautifulSoup(html, 'html.parser')
            base_domain = urlparse(base_url).netloc
            links = []
            for a in soup.find_all('a', href=True):
                href = a['href'].strip()
                # Skip empty links, in-page anchors and JavaScript links
                if not href or href.startswith('#') or href.startswith('javascript:'):
                    continue
                full_url = urljoin(base_url, href)
                parsed = urlparse(full_url)
                # Keep only HTTP(S) links on the same domain
                if parsed.scheme not in ('http', 'https') or parsed.netloc != base_domain:
                    continue
                links.append(full_url.split('#')[0])  # strip the fragment
            return links
        except Exception as e:
            logger.error(f"Error extracting links: {e}")
            return []

    def extract_data(self, html, url):
        """
        Extract page data from HTML.
        :return: dict of extracted fields, or None on failure
        """
        if not html:
            return None
        try:
            soup = BeautifulSoup(html, 'html.parser')
            title = soup.title.text.strip() if soup.title else "No Title"
            # Meta information
            description = ""
            keywords = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and 'content' in meta_desc.attrs:
                description = meta_desc['content']
            meta_keywords = soup.find('meta', attrs={'name': 'keywords'})
            if meta_keywords and 'content' in meta_keywords.attrs:
                keywords = meta_keywords['content']
            # Main content; fall back to every paragraph on the page
            content = ""
            main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
            if main_content:
                content = '\n'.join(p.text.strip() for p in main_content.find_all('p'))
            if not content:
                content = '\n'.join(p.text.strip() for p in soup.find_all('p'))
            return {
                'url': url,
                'title': title,
                'description': description,
                'keywords': keywords,
                'content': content[:1000] + '...' if len(content) > 1000 else content,
                'crawl_time': time.strftime('%Y-%m-%d %H:%M:%S')
            }
        except Exception as e:
            logger.error(f"Error extracting data: {url} - {e}")
            return None

    def process_url(self, url, depth=0, max_depth=2):
        """
        Process one URL.
        :param url: URL to process
        :param depth: link depth of this URL (seed URLs are depth 0)
        :param max_depth: maximum crawl depth
        """
        if not url:
            return
        self.update_worker_status(f"processing: {url}")
        if self.redis.sismember(self.visited_key, url):
            logger.debug(f"Already processed: {url}")
            return
        logger.info(f"Fetching URL (depth {depth}): {url}")
        response = self.fetch_url(url)
        # Compare with None explicitly: a Response for a 4xx/5xx status is falsy
        if response is None or response.status_code != 200:
            status = response.status_code if response is not None else 'N/A'
            logger.warning(f"Fetch failed: {url} - status: {status}")
            self.mark_url_processed(url)
            return
        data = self.extract_data(response.text, url)
        if data:
            self.save_result(data)
        # Below the depth limit, queue this page's links one level deeper
        if depth < max_depth:
            links = self.extract_links(response.text, url)
            added_count = sum(self.enqueue_url(link, depth + 1) for link in links)
            logger.info(f"Queued {added_count} new links")
        self.mark_url_processed(url)

    def start_worker(self, max_depth=2, timeout=60):
        """
        Run the worker loop.
        :param max_depth: maximum crawl depth
        :param timeout: exit after this many seconds without a new URL
        """
        logger.info(f"Starting worker: {self.worker_id}")
        self.update_worker_status("starting")
        last_url_time = time.time()
        processed_count = 0
        try:
            while True:
                self.update_worker_status(f"active ({processed_count} URLs)")
                # BRPOP blocks for up to 1 second, so the loop does not spin
                task = self.get_url_to_process(timeout=1)
                if task:
                    url, depth = task
                    self.process_url(url, depth=depth, max_depth=max_depth)
                    processed_count += 1
                    last_url_time = time.time()
                elif time.time() - last_url_time > timeout:
                    logger.info(f"No new URLs for {timeout} seconds, exiting")
                    break
        except KeyboardInterrupt:
            logger.info("Interrupted, stopping the crawler")
        except Exception as e:
            logger.error(f"Crawler error: {e}")
        finally:
            self.update_worker_status(f"stopped (processed {processed_count} URLs)")
            logger.info(f"Worker stopped: {self.worker_id}, processed {processed_count} URLs")

    def stats(self):
        """Return crawler-wide statistics."""
        now = time.time()
        workers = []
        for worker_id, worker_data in self.redis.hgetall(self.worker_activity_key).items():
            try:
                worker_info = json.loads(worker_data)
            except ValueError:
                continue
            worker_info['id'] = worker_id
            # Count only workers that reported within the last WORKER_TTL seconds
            if now - worker_info.get('timestamp', 0) <= WORKER_TTL:
                workers.append(worker_info)
        return {
            'pending_urls': self.redis.llen(self.url_queue_key),
            'visited_urls': self.redis.scard(self.visited_key),
            'results': self.redis.llen(self.result_key),
            'active_workers': len(workers),
            'workers': workers
        }

    def get_results(self, limit=10):
        """
        Return the most recent results.
        :param limit: maximum number of results
        :return: list of result dicts
        """
        results = []
        for result_json in self.redis.lrange(self.result_key, 0, limit - 1):
            try:
                results.append(json.loads(result_json))
            except ValueError:
                continue
        return results


# Usage example
if __name__ == "__main__":
    crawler = DistributedCrawler(redis_host='localhost', redis_port=6379)
    # The master node seeds the queue and runs a monitor; other nodes only work
    is_master = input("Is this the master node? (y/n): ").lower() == 'y'
    if is_master:
        seed_urls = [
            "https://www.python.org",
            "https://docs.python.org",
            "https://pypi.org"
        ]
        crawler.add_seed_urls(seed_urls)
        print(f"Initial stats: {crawler.stats()}")

        def monitor_stats():
            """Print crawler statistics every 5 seconds."""
            while True:
                try:
                    stats = crawler.stats()
                    print(f"\nCrawler stats - {time.strftime('%H:%M:%S')}:")
                    print(f"Pending URLs: {stats['pending_urls']}")
                    print(f"Visited URLs: {stats['visited_urls']}")
                    print(f"Results: {stats['results']}")
                    print(f"Active workers: {stats['active_workers']}")
                    # Show the latest results
                    results = crawler.get_results(5)
                    if results:
                        print("\nLatest results:")
                        for i, result in enumerate(results, 1):
                            print(f"{i}. {result['title']} - {result['url']}")
                except Exception as e:
                    print(f"Monitor error: {e}")
                time.sleep(5)

        # Daemon thread: it ends automatically when the main thread exits
        monitor_thread = threading.Thread(target=monitor_stats, daemon=True)
        monitor_thread.start()

    # Every node runs a worker
    max_depth = 2
    timeout = 60
    print(f"Starting worker, max depth: {max_depth}, timeout: {timeout}s")
    crawler.start_worker(max_depth=max_depth, timeout=timeout)

    # Final statistics
    if is_master:
        stats = crawler.stats()
        print("\nFinal stats:")
        print(f"Pending URLs: {stats['pending_urls']}")
        print(f"Visited URLs: {stats['visited_urls']}")
        print(f"Results: {stats['results']}")
```
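`fetch_url` in DistributedCrawler waits a fixed, random 1 to 3 seconds before every request. A common complement is exponential backoff. When a request fails or the server answers 429 or 503, the client waits progressively longer before retrying. Random jitter keeps many workers from retrying in lockstep.

The following is a minimal sketch of the "full jitter" variant. `backoff_delay` and `call_with_backoff` are illustrative helpers, not part of the crawler above. The sleep function is a parameter so the logic can be tested without waiting.

```python
import random
import time


def backoff_delay(attempt, base=1.0, cap=60.0, rng=random):
    """Full-jitter delay for retry number `attempt` (0-based):
    a random time in [0, min(cap, base * 2**attempt)] seconds."""
    return rng.uniform(0, min(cap, base * 2 ** attempt))


def call_with_backoff(func, is_retryable, max_retries=4, base=1.0, cap=60.0, sleep=time.sleep):
    """Call func(); while is_retryable(result) is true, sleep and call again.

    At most max_retries retries are made. The last result is returned
    whether or not it succeeded.
    """
    result = func()
    for attempt in range(max_retries):
        if not is_retryable(result):
            break
        sleep(backoff_delay(attempt, base, cap))
        result = func()
    return result


if __name__ == "__main__":
    # Simulated server: two 503 answers, then success
    responses = iter([503, 503, 200])
    status = call_with_backoff(lambda: next(responses), lambda code: code in (429, 503), base=0.1)
    print("final status:", status)
```

With requests, `func` could be `lambda: session.get(url, timeout=10)` and `is_retryable` could be `lambda r: r.status_code in (429, 503)`. Network exceptions are not caught by this sketch and would need their own handling.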
12.3.2 An API-Based Distributed Crawler with Scrapy and ScrapyRT
This example requires Scrapy and ScrapyRT:

```bash
pip install scrapy scrapyrt
```

1. Create the Scrapy project (genspider creates spiders/website.py, which is replaced in step 3):

```bash
scrapy startproject distributed_crawler
cd distributed_crawler
scrapy genspider website example.com
```

2. Edit items.py:

```python
# distributed_crawler/distributed_crawler/items.py
import scrapy


class WebsiteItem(scrapy.Item):
    url = scrapy.Field()
    title = scrapy.Field()
    description = scrapy.Field()
    keywords = scrapy.Field()
    content = scrapy.Field()
    links = scrapy.Field()
    timestamp = scrapy.Field()
```

3. Write the spider:

```python
# distributed_crawler/distributed_crawler/spiders/website.py
import time
from urllib.parse import urljoin, urlparse

import scrapy

from distributed_crawler.items import WebsiteItem


class WebsiteSpider(scrapy.Spider):
    name = 'website'
    # Per-spider settings; these take precedence over settings.py
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 4,
    }

    def __init__(self, url=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Start URL given on the command line: scrapy crawl website -a url=https://...
        if url:
            self.start_urls = [url]

    def parse(self, response):
        item = WebsiteItem()
        item['url'] = response.url
        item['title'] = response.css('title::text').get()
        # Meta information
        item['description'] = response.css('meta[name="description"]::attr(content)').get()
        item['keywords'] = response.css('meta[name="keywords"]::attr(content)').get()
        # Main content container; if none matches, use every paragraph on the page
        content_selector = (response.css('main') or response.css('article')
                            or response.css('div.content') or response)
        paragraphs = content_selector.css('p::text').getall()
        item['content'] = '\n'.join(p.strip() for p in paragraphs if p.strip())
        # Same-domain HTTP(S) links. The domain comes from response.url, so the
        # filter also works when ScrapyRT schedules the request itself.
        page_domain = urlparse(response.url).netloc
        links = []
        for href in response.css('a::attr(href)').getall():
            full_url = urljoin(response.url, href)
            parsed = urlparse(full_url)
            if parsed.scheme in ('http', 'https') and parsed.netloc == page_domain:
                links.append(full_url)
        item['links'] = links
        item['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
        yield item
        # Optionally keep following links (CRAWL_LINKS is a custom setting in settings.py)
        if self.settings.getbool('CRAWL_LINKS', False):
            for link in links:
                yield scrapy.Request(url=link, callback=self.parse)
```

4. Edit settings.py:

```python
# distributed_crawler/distributed_crawler/settings.py
BOT_NAME = 'distributed_crawler'
SPIDER_MODULES = ['distributed_crawler.spiders']
NEWSPIDER_MODULE = 'distributed_crawler.spiders'

# Crawl settings
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
ROBOTSTXT_OBEY = True
DOWNLOAD_DELAY = 1
CONCURRENT_REQUESTS_PER_DOMAIN = 4

# Enable or disable spider middlewares
# SPIDER_MIDDLEWARES = {
#     'distributed_crawler.middlewares.DistributedCrawlerSpiderMiddleware': 543,
# }

# Retry failed requests. Keep UserAgentMiddleware enabled: it is what applies USER_AGENT above.
RETRY_ENABLED = True
RETRY_TIMES = 2

# Custom setting read by WebsiteSpider: do not follow links by default
CRAWL_LINKS = False
```

5. Write the client. It calls ScrapyRT's crawl.json endpoint, so first start the service from the project directory:

```bash
cd distributed_crawler
scrapyrt -p 9080
```

```python
from concurrent.futures import ThreadPoolExecutor

import requests


class ScrapyRTClient:
    def __init__(self, scrapyrt_url="http://localhost:9080"):
        """
        Initialize the ScrapyRT client.
        :param scrapyrt_url: base URL of the ScrapyRT service
        """
        self.scrapyrt_url = scrapyrt_url

    def crawl_url(self, url, spider_name="website"):
        """
        Crawl one URL through ScrapyRT.
        Link following is controlled by CRAWL_LINKS in settings.py, not by a query parameter.
        :param url: URL to crawl
        :param spider_name: name of the spider to run
        :return: parsed JSON response, or None on failure
        """
        params = {"spider_name": spider_name, "url": url}
        try:
            response = requests.get(f"{self.scrapyrt_url}/crawl.json", params=params, timeout=30)
            if response.status_code == 200:
                return response.json()
            print(f"Crawl failed: {url} - status: {response.status_code}")
            return None
        except requests.RequestException as e:
            print(f"Crawl error: {url} - {e}")
            return None

    def batch_crawl(self, urls, spider_name="website", max_concurrency=5):
        """
        Crawl several URLs concurrently.
        :param urls: list of URLs
        :param spider_name: name of the spider to run
        :param max_concurrency: maximum number of simultaneous API calls
        :return: list of successful responses, in input order
        """
        with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
            responses = pool.map(lambda u: self.crawl_url(u, spider_name), urls)
            return [r for r in responses if r]


# Client usage example
if __name__ == "__main__":
    client = ScrapyRTClient("http://localhost:9080")
    # Crawl a single URL
    result = client.crawl_url("https://www.python.org")
    if result:
        items = result.get('items', [])
        if items:
            item = items[0]
            print(f"Title: {item.get('title')}")
            print(f"Description: {item.get('description')}")
            print(f"Content preview: {item.get('content', '')[:200]}...")
            print(f"Found {len(item.get('links', []))} links")
    # Batch crawl
    urls = [
        "https://www.python.org",
        "https://docs.python.org",
        "https://pypi.org"
    ]
    print("\nBatch crawling...")
    batch_results = client.batch_crawl(urls, max_concurrency=2)
    print(f"Crawled {len(batch_results)} URLs")
    for i, result in enumerate(batch_results, 1):
        items = result.get('items', [])
        if items:
            item = items[0]
            print(f"\n[{i}] {item.get('title')} - {item.get('url')}")
```
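When `CRAWL_LINKS = True`, WebsiteSpider follows every same-domain link it finds, with no limit on depth or on the number of pages. Scrapy has built-in settings for both limits:

- `DEPTH_LIMIT` is enforced by DepthMiddleware.
- `CLOSESPIDER_PAGECOUNT` is enforced by the CloseSpider extension.

The following is a possible addition to settings.py; the values are arbitrary examples, not recommendations.

```python
# Possible addition to distributed_crawler/distributed_crawler/settings.py (example values)

# DepthMiddleware drops requests more than 2 links away from the start URL (0 = no limit)
DEPTH_LIMIT = 2

# The CloseSpider extension closes the spider after this many responses
CLOSESPIDER_PAGECOUNT = 100
```

These limits apply per crawl. Each ScrapyRT request runs its own crawl, so each API call is bounded separately.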
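12.4 Monitoring and Logging
A solid monitoring system is essential for large crawler projects. The monitor below counts requests, errors and downloaded bytes, logs every request to SQLite, and produces text and chart reports: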
import logging
import time
import threading
import json
import os
import sqlite3
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from collections import defaultdict, deque
class CrawlerMonitor:
def __init__(self, db_file='crawler_monitor.db', log_file='crawler.log'):
"""
初始化爬虫监控系统
:param db_file: 数据库文件路径
:param log_file: 日志文件路径
"""
# 设置日志
self.logger = self._setup_logger(log_file)
# 初始化数据库
self.db_file = db_file
self._init_db()
# 监控数据
self.stats = {
'requests': 0,
'success': 0,
'errors': 0,
'bytes_downloaded': 0
}
# 实时监控数据(保留最近一小时的数据,10秒一个点)
self.time_series = {
'timestamps': deque(maxlen=360),
'requests': deque(maxlen=360),
'success': deque(maxlen=360),
'errors': deque(maxlen=360)
}
# 错误计数器
self.error_counts = defaultdict(int)
# 域名请求计数
self.domain_requests = defaultdict(int)
# 锁,用于线程安全
self.lock = threading.Lock()
# 启动监控线程
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.is_running = True
self.monitor_thread.start()
self.logger.info("爬虫监控系统已初始化")
def _setup_logger(self, log_file):
"""设置日志系统"""
logger = logging.getLogger('crawler_monitor')
logger.setLevel(logging.INFO)
# 文件处理器
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 格式化器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 添加处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
def _init_db(self):
"""初始化数据库"""
try:
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
# 创建请求日志表
cursor.execute('''
CREATE TABLE IF NOT EXISTS request_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT,
method TEXT,
status_code INTEGER,
response_time REAL,
content_length INTEGER,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建错误日志表
cursor.execute('''
CREATE TABLE IF NOT EXISTS error_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT,
error_type TEXT,
error_message TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建统计数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
requests INTEGER,
success INTEGER,
errors INTEGER,
bytes_downloaded INTEGER,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
except Exception as e:
self.logger.error(f"初始化数据库出错: {e}")
def log_request(self, url, method="GET", status_code=None, response_time=None, content_length=0):
"""
记录请求
:param url: 请求的URL
:param method: 请求方法
:param status_code: 状态码
:param response_time: 响应时间(秒)
:param content_length: 内容长度(字节)
"""
with self.lock:
# 更新统计数据
self.stats['requests'] += 1
if status_code and 200 <= status_code < 400:
self.stats['success'] += 1
elif status_code:
self.stats['errors'] += 1
self.stats['bytes_downloaded'] += content_length
# 更新域名请求计数
from urllib.parse import urlparse
domain = urlparse(url).netloc
self.domain_requests[domain] += 1
# 记录到数据库
try:
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO request_log (url, method, status_code, response_time, content_length)
VALUES (?, ?, ?, ?, ?)
''', (url, method, status_code, response_time, content_length))
conn.commit()
conn.close()
except Exception as e:
self.logger.error(f"记录请求出错: {e}")
def log_error(self, url, error_type, error_message):
"""
记录错误
:param url: 请求的URL
:param error_type: 错误类型
:param error_message: 错误消息
"""
with self.lock:
# 更新错误计数器
self.error_counts[error_type] += 1
# 记录到数据库
try:
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO error_log (url, error_type, error_message)
VALUES (?, ?, ?)
''', (url, error_type, error_message))
conn.commit()
conn.close()
except Exception as e:
self.logger.error(f"记录错误出错: {e}")
def _monitor_loop(self):
"""监控线程主循环"""
last_save = time.time()
while self.is_running:
try:
# 每10秒记录一次时间序列数据
with self.lock:
current_time = time.time()
self.time_series['timestamps'].append(current_time)
self.time_series['requests'].append(self.stats['requests'])
self.time_series['success'].append(self.stats['success'])
self.time_series['errors'].append(self.stats['errors'])
# 每分钟保存一次统计数据到数据库
if time.time() - last_save >= 60:
with self.lock:
stats_copy = self.stats.copy()
try:
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO stats (requests, success, errors, bytes_downloaded)
VALUES (?, ?, ?, ?)
''', (stats_copy['requests'], stats_copy['success'],
stats_copy['errors'], stats_copy['bytes_downloaded']))
conn.commit()
conn.close()
last_save = time.time()
except Exception as e:
self.logger.error(f"保存统计数据出错: {e}")
time.sleep(10)
except Exception as e:
self.logger.error(f"监控线程出错: {e}")
time.sleep(10)
def get_stats(self):
"""获取当前统计数据"""
with self.lock:
return self.stats.copy()
def get_error_summary(self):
"""获取错误摘要"""
with self.lock:
return dict(self.error_counts)
def get_domain_stats(self, top_n=10):
"""
获取域名统计数据
:param top_n: 返回前N个请求最多的域名
:return: 域名统计字典
"""
with self.lock:
# 按请求数排序
sorted_domains = sorted(self.domain_requests.items(),
key=lambda x: x[1], reverse=True)
# 返回前N个
return dict(sorted_domains[:top_n])
def generate_report(self, output_dir='reports'):
"""
生成报告
:param output_dir: 输出目录
:return: 报告文件路径
"""
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 当前时间
now = datetime.now()
report_date = now.strftime('%Y-%m-%d_%H-%M-%S')
# 准备报告数据
stats = self.get_stats()
error_summary = self.get_error_summary()
domain_stats = self.get_domain_stats()
# 从数据库获取近24小时的统计数据
hourly_stats = self._get_hourly_stats()
# 生成文本报告
report_file = os.path.join(output_dir, f'crawler_report_{report_date}.txt')
with open(report_file, 'w', encoding='utf-8') as f:
f.write(f"爬虫监控报告 - {now.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("="*50 + "\n\n")
f.write("总体统计\n")
f.write("-"*50 + "\n")
f.write(f"总请求数: {stats['requests']}\n")
f.write(f"成功请求: {stats['success']}\n")
f.write(f"失败请求: {stats['errors']}\n")
success_rate = (stats['success'] / stats['requests'] * 100) if stats['requests'] > 0 else 0
f.write(f"成功率: {success_rate:.2f}%\n")
f.write(f"下载数据: {stats['bytes_downloaded'] / 1024 / 1024:.2f} MB\n\n")
f.write("错误统计\n")
f.write("-"*50 + "\n")
for error_type, count in error_summary.items():
f.write(f"{error_type}: {count}\n")
f.write("\n")
f.write("域名统计 (Top 10)\n")
f.write("-"*50 + "\n")
for domain, count in domain_stats.items():
f.write(f"{domain}: {count}\n")
f.write("\n")
f.write("最近24小时统计\n")
f.write("-"*50 + "\n")
for hour, hour_stats in hourly_stats.items():
                f.write(f"{hour}: requests={hour_stats['requests']}, success={hour_stats['success']}, "
                        f"errors={hour_stats['errors']}\n")
        # Generate the charts
        self._generate_charts(output_dir, report_date, hourly_stats)
        return report_file

    def _get_hourly_stats(self):
        """Fetch per-hour statistics for the last 24 hours from the database."""
        hourly_stats = {}
        conn = None
        try:
            conn = sqlite3.connect(self.db_file)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            # Timestamp of 24 hours ago, formatted like the stored timestamps
            twenty_four_hours_ago = (datetime.now() - timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S')
            # Aggregate the statistics per hour
            cursor.execute('''
                SELECT
                    strftime('%Y-%m-%d %H:00:00', timestamp) as hour,
                    SUM(requests) as requests,
                    SUM(success) as success,
                    SUM(errors) as errors,
                    SUM(bytes_downloaded) as bytes_downloaded
                FROM stats
                WHERE timestamp >= ?
                GROUP BY hour
                ORDER BY hour
            ''', (twenty_four_hours_ago,))
            for row in cursor.fetchall():
                hourly_stats[row['hour']] = {
                    'requests': row['requests'],
                    'success': row['success'],
                    'errors': row['errors'],
                    'bytes_downloaded': row['bytes_downloaded']
                }
        except Exception as e:
            self.logger.error(f"Error fetching hourly statistics: {e}")
        finally:
            # Close the connection even when the query fails
            if conn is not None:
                conn.close()
        return hourly_stats

    def _generate_charts(self, output_dir, report_date, hourly_stats):
        """Generate the report charts."""
        try:
            # Chart style
            plt.style.use('ggplot')
            # Chart 1: request statistics
            plt.figure(figsize=(10, 6))
            hours = list(hourly_stats.keys())
            # Named request_counts so it does not shadow the requests library
            request_counts = [hourly_stats[h]['requests'] for h in hours]
            success = [hourly_stats[h]['success'] for h in hours]
            errors = [hourly_stats[h]['errors'] for h in hours]
            plt.plot(hours, request_counts, 'b-', label='Total requests')
            plt.plot(hours, success, 'g-', label='Successful')
            plt.plot(hours, errors, 'r-', label='Errors')
            plt.title('Requests over the last 24 hours')
            plt.xlabel('Time')
            plt.ylabel('Requests')
            plt.xticks(rotation=45)
            plt.legend()
            plt.tight_layout()
            plt.savefig(os.path.join(output_dir, f'requests_{report_date}.png'))
            plt.close()
            # Chart 2: requests per domain
            plt.figure(figsize=(10, 6))
            domain_stats = self.get_domain_stats()
            domains = list(domain_stats.keys())
            counts = list(domain_stats.values())
            plt.bar(domains, counts)
            plt.title('Requests per domain')
            plt.xlabel('Domain')
            plt.ylabel('Requests')
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig(os.path.join(output_dir, f'domains_{report_date}.png'))
            plt.close()
            # Chart 3: error types (only drawn if errors were recorded)
            error_summary = self.get_error_summary()
            if error_summary:
                plt.figure(figsize=(10, 6))
                error_types = list(error_summary.keys())
                error_counts = list(error_summary.values())
                plt.bar(error_types, error_counts)
                plt.title('Error types')
                plt.xlabel('Error type')
                plt.ylabel('Count')
                plt.xticks(rotation=45)
                plt.tight_layout()
                plt.savefig(os.path.join(output_dir, f'errors_{report_date}.png'))
                plt.close()
        except Exception as e:
            self.logger.error(f"Error generating charts: {e}")
            plt.close('all')  # release any figure left open by the failure

    def stop(self):
        """Stop the monitoring system."""
        self.is_running = False
        if self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=2)
        self.logger.info("Crawler monitoring system stopped")


# Usage example
if __name__ == "__main__":
    # Initialize the monitoring system
    monitor = CrawlerMonitor()
    # Simulate crawler requests
    for i in range(100):
        if i % 10 != 0:
            # Simulate a successful request
            monitor.log_request(
                url=f"https://example.com/page{i}",
                status_code=200,
                response_time=0.3 + (i % 5) * 0.1,
                content_length=5000 + (i % 10) * 500
            )
        else:
            # Simulate a failed request
            error_types = ["ConnectionError", "TimeoutError", "HTTPError"]
            monitor.log_request(
                url=f"https://example.com/error{i}",
                status_code=404 if i % 20 == 0 else 500,
                response_time=1.5 + (i % 3) * 0.5,
                content_length=0
            )
            monitor.log_error(
                url=f"https://example.com/error{i}",
                error_type=error_types[i % len(error_types)],
                error_message=f"Simulated error {i}"
            )
        # Simulate requests to different domains
        domains = ["example.com", "test.com", "sample.org", "demo.net"]
        monitor.log_request(
            url=f"https://{domains[i % len(domains)]}/page{i}",
            status_code=200,
            response_time=0.2,
            content_length=2000
        )
        time.sleep(0.05)  # short delay to make the simulation more realistic
    # Generate the report
    report_file = monitor.generate_report()
    print(f"Report generated: {report_file}")
    # Print the current statistics
    stats = monitor.get_stats()
    print("\nCurrent statistics:")
    for key, value in stats.items():
        print(f"{key}: {value}")
    # Per-domain statistics
    domain_stats = monitor.get_domain_stats()
    print("\nDomain statistics:")
    for domain, count in domain_stats.items():
        print(f"{domain}: {count}")
    # Stop the monitoring system
    monitor.stop()
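You can check the hourly aggregation inside `_get_hourly_stats` on its own against an in-memory SQLite database. This is a minimal sketch under stated assumptions:

- The `stats` table layout below is a hypothetical reduction to the columns the query reads.
- Timestamps are assumed to be stored as `YYYY-MM-DD HH:MM:SS` text. The monitor's string comparison `timestamp >= ?` also depends on this format.
- The helper names `hourly_stats` and `make_demo_db` are illustrative only.

```python
import sqlite3

# Same aggregation query as _get_hourly_stats in the monitor above
HOURLY_QUERY = '''
    SELECT strftime('%Y-%m-%d %H:00:00', timestamp) AS hour,
           SUM(requests) AS requests,
           SUM(success) AS success,
           SUM(errors) AS errors,
           SUM(bytes_downloaded) AS bytes_downloaded
    FROM stats
    WHERE timestamp >= ?
    GROUP BY hour
    ORDER BY hour
'''


def hourly_stats(conn, since):
    """Return {hour: {...}} for rows with timestamp >= since (hypothetical helper)."""
    cur = conn.cursor()
    cur.row_factory = sqlite3.Row  # access columns by name without changing conn
    cur.execute(HOURLY_QUERY, (since,))
    return {
        row['hour']: {k: row[k] for k in ('requests', 'success', 'errors', 'bytes_downloaded')}
        for row in cur.fetchall()
    }


def make_demo_db():
    """In-memory stats table with a hypothetical minimal schema."""
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE stats (timestamp TEXT, requests INTEGER, '
                 'success INTEGER, errors INTEGER, bytes_downloaded INTEGER)')
    conn.executemany('INSERT INTO stats VALUES (?, ?, ?, ?, ?)', [
        ('2024-01-01 09:15:00', 10, 9, 1, 5000),
        ('2024-01-01 09:45:00', 5, 5, 0, 2500),
        ('2024-01-01 10:05:00', 8, 6, 2, 4000),
        ('2023-12-31 08:00:00', 99, 99, 0, 100),  # before the window, ignored
    ])
    return conn


if __name__ == "__main__":
    for hour, values in hourly_stats(make_demo_db(), '2024-01-01 00:00:00').items():
        print(hour, values)
```

The two rows from 09:15 and 09:45 collapse into a single `09:00:00` bucket. This is the shape of data that `_generate_charts` plots.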
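Summary and Best Practices
Best practices for crawler efficiency
Optimize the request strategy:
- Use connection pools and reuse sessions
- Implement smart delays and backoff
- Use asynchronous requests to increase throughput
Optimize data processing:
- Use the producer-consumer pattern to separate fetching from processing
- Implement incremental crawling so pages that were already fetched are not crawled again
- Process large volumes of data as streams
Manage resources:
- Keep memory usage under control, especially when handling large amounts of data
- Implement request throttling and priority queues
- Use caching to avoid repeated requests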
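Incremental crawling and priority queues, both listed above, can share one data structure: a URL frontier that remembers every URL it has accepted. Below is a minimal in-memory sketch using `heapq`. The following are illustrative assumptions:

- the class name `UrlFrontier`;
- the priority convention (smaller number is crawled first);
- stripping the `#fragment` as the only URL normalization.

```python
import heapq
from urllib.parse import urldefrag


class UrlFrontier:
    """Priority queue of URLs that skips anything already seen (hypothetical class)."""

    def __init__(self, seen=None):
        self._heap = []
        self._seen = set(seen or ())  # URLs crawled in earlier runs can be preloaded
        self._counter = 0  # tie-breaker: FIFO order within the same priority

    @staticmethod
    def normalize(url):
        # Drop the #fragment so page#a and page#b count as the same page
        return urldefrag(url)[0]

    def push(self, url, priority=10):
        """Queue a URL (lower priority number = crawled sooner). False if duplicate."""
        url = self.normalize(url)
        if url in self._seen:
            return False
        self._seen.add(url)
        heapq.heappush(self._heap, (priority, self._counter, url))
        self._counter += 1
        return True

    def pop(self):
        """Return the next URL to crawl, or None when the queue is empty."""
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)
```

A production crawler would persist the seen set between runs, for example in SQLite or Redis. That way the next incremental run skips pages it has already fetched.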
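Best practices for crawler stability
Error handling:
- Implement thorough exception handling
- Log errors and monitor error patterns
- Add automatic retries
Robust design:
- Support resuming an interrupted crawl from a checkpoint
- Use a distributed architecture to improve reliability
- Handle changes to a site's structure gracefully
Long-running operation:
- Implement health checks and automatic recovery
- Set resource limits to prevent excessive consumption
- Back up data and crawler state regularly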
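The automatic-retry item above is commonly implemented as exponential backoff with jitter. The decorator below is a minimal sketch of that pattern, not code from this tutorial. The name `retry` and its parameters are hypothetical, and `sleep` is injectable only so the behavior can be tested without actually waiting.

```python
import random
import time
from functools import wraps


def retry(max_attempts=3, base_delay=1.0, max_delay=30.0,
          exceptions=(Exception,), sleep=time.sleep):
    """Retry the wrapped function with exponential backoff and full jitter (sketch)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: propagate the last error
                    # Backoff ceiling doubles each attempt: base, 2*base, 4*base, ... capped
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    # Full jitter: random wait so many workers don't retry in lockstep
                    sleep(random.uniform(0, delay))
        return wrapper
    return decorator
```

In practice, restrict `exceptions` to transient failures, for example `requests.exceptions.ConnectionError` and `requests.exceptions.Timeout`. Otherwise, bugs in your own parsing code get retried silently instead of surfacing in the logs.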
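Best practices for crawler ethics and compliance
Follow the rules:
- Respect robots.txt
- Keep the request rate reasonable
- Identify your crawler clearly, for example with a descriptive User-Agent header
Reduce your impact:
- Use conditional requests and caching to reduce bandwidth consumption
- Avoid heavy crawling during peak hours
- Adapt the crawl rate to how the server is responding
Data handling:
- Comply with privacy and data protection regulations
- Collect only the data you need
- Implement data retention and deletion policies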
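Three of the compliance rules above can be combined in one small helper: respecting robots.txt, controlling request frequency, and identifying the crawler. The sketch below uses the standard library's `urllib.robotparser`. Several parts are hypothetical choices:

- the class name `PoliteFetcher`;
- the `USER_AGENT` string;
- the one-second default delay.

The robots.txt content is passed in as lines rather than downloaded, so the example runs offline. The helper only computes how long to wait; the caller does the sleeping and the actual HTTP request.

```python
import time
import urllib.robotparser
from urllib.parse import urlparse

# Hypothetical identifier; point the URL at a page describing your crawler
USER_AGENT = "MyCrawler/1.0 (+https://example.com/bot-info)"


class PoliteFetcher:
    """robots.txt check plus a per-domain minimum delay between requests (sketch)."""

    def __init__(self, robots_lines, default_delay=1.0, clock=time.monotonic):
        self.rp = urllib.robotparser.RobotFileParser()
        self.rp.parse(robots_lines)  # offline: parse robots.txt lines directly
        delay = self.rp.crawl_delay(USER_AGENT)  # None if no Crawl-delay applies
        self.delay = float(delay) if delay is not None else default_delay
        self.clock = clock
        self.last_request = {}  # domain -> clock value of the last request

    def allowed(self, url):
        """True if robots.txt permits USER_AGENT to fetch this URL."""
        return self.rp.can_fetch(USER_AGENT, url)

    def seconds_to_wait(self, url):
        """How long to wait before the next request to this URL's domain."""
        last = self.last_request.get(urlparse(url).netloc)
        if last is None:
            return 0.0
        return max(0.0, self.delay - (self.clock() - last))

    def record_request(self, url):
        """Call right after sending a request to this URL's domain."""
        self.last_request[urlparse(url).netloc] = self.clock()
```

Against a live site, you would call `set_url()` and `read()` on the parser once per site instead of `parse()`, and keep one such helper per domain.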
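This tutorial has taken you from the basics to advanced Python web crawling: how to use a range of tools and libraries, and how to handle the common challenges crawlers face. It has covered the full range, from scraping simple static pages to handling complex dynamic ones, and from single-threaded crawlers to distributed systems. You now have the knowledge and skills needed to build efficient, stable, and ethical web crawlers.
As you gain experience, you can explore more advanced topics such as natural language processing, machine-learning integration, crawler automation, and anti-detection techniques. Remember that great power comes with great responsibility: always use your crawling skills respectfully and reasonably.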