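Python 3.x Web Crawling: From Zero to Real-World Projects

Web crawling is a powerful technique for automatically collecting and analyzing data from the internet. This guide starts from the very basics and walks you step by step through crawling with Python, until you can build real-world projects on your own.

Contents

  1. Python Crawling Basics
  2. Crawling Tools and Libraries
  3. HTTP Basics and Fetching Data
  4. Parsing Web Page Data
  5. Anti-Crawling Mechanisms and Countermeasures
  6. Selenium and Scraping Dynamic Pages
  7. Getting Started with the Scrapy Framework
  8. Data Storage
  9. Project: E-commerce Data Crawler
  10. Project: News Aggregation Crawler
  11. Legal and Ethical Aspects of Crawling
  12. Advanced Techniques and Optimization

1. Python Crawling Basics

1.1 Setting Up the Python Environment

First, make sure Python 3.x is installed (3.8+ recommended).

Installing Python:

  • Windows: download and install it from the official Python website (python.org)
  • macOS: use Homebrew: brew install python
  • Linux: most distributions ship with Python preinstalled; otherwise install it with your package manager

Check the installation:

python --version
# or
python3 --version

Create a virtual environment:

python -m venv crawler_env

Activate the virtual environment:

  • Windows: crawler_env\Scripts\activate
  • macOS/Linux: source crawler_env/bin/activate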

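1.2 Basic Concepts of Web Crawling

What is a web crawler:
A web crawler is a program that automatically browses the internet and extracts information according to a defined set of rules.

How a crawler works:

  1. Send a network request and fetch the page content
  2. Parse the page and extract the useful information
  3. Store the data
  4. Crawl related pages further as needed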
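The four steps above can be sketched as a small breadth-first loop. This is a minimal illustration using only the standard library, not a production crawler: fetch is a hypothetical callable you supply (in practice it would wrap requests.get), the "store" step simply records each page title in a dict, and links are extracted with html.parser instead of BeautifulSoup.

```python
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse


class PageParser(HTMLParser):
    """Collects the <title> text and every href value from one page."""

    def __init__(self):
        super().__init__()
        self.title = ""
        self.links = []
        self._in_title = False

    def handle_starttag(self, tag, attrs):
        if tag == "title":
            self._in_title = True
        elif tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self.title += data


def crawl(start_url, fetch, max_pages=10):
    """Breadth-first crawl that stays on the start URL's host.

    fetch(url) must return an HTML string, or None on failure (hypothetical hook).
    """
    host = urlparse(start_url).netloc
    queue = deque([start_url])
    seen = {start_url}
    results = {}  # step 3, "store": url -> page title
    while queue and len(results) < max_pages:
        url = queue.popleft()
        html = fetch(url)                  # step 1: send the request
        if html is None:
            continue
        parser = PageParser()              # step 2: parse the page
        parser.feed(html)
        results[url] = parser.title.strip()
        for href in parser.links:          # step 4: follow related pages
            absolute = urljoin(url, href)
            if urlparse(absolute).netloc == host and absolute not in seen:
                seen.add(absolute)
                queue.append(absolute)
    return results


# Usage with made-up in-memory pages instead of real HTTP requests
pages = {
    "https://example.com/": '<title>Home</title><a href="/a">A</a><a href="https://other.org/">x</a>',
    "https://example.com/a": '<title>Page A</title><a href="/">home</a>',
}
print(crawl("https://example.com/", pages.get))
# {'https://example.com/': 'Home', 'https://example.com/a': 'Page A'}
```

The seen set keeps the crawler from revisiting pages, and the host check keeps it from wandering off to other sites.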

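Common use cases:

  • Data analysis and mining
  • Search engine indexing
  • Price monitoring
  • Content aggregation
  • Scientific research

1.3 Legality and Precautions

  • Respect the site's robots.txt rules
  • Throttle your crawl rate so you do not put excessive load on the server
  • Do not crawl sensitive or private data
  • Use the data only for personal study and research
  • Comply with local laws and regulations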
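Python's standard library can check robots.txt rules for you through urllib.robotparser. The sketch below parses a made-up rule set from a list of lines so it runs offline; against a live site you would call set_url() with the site's robots.txt URL and then read() instead of parse().

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content, used only for illustration
robots_lines = [
    "User-agent: *",
    "Disallow: /private/",
    "Crawl-delay: 2",
]

rp = RobotFileParser()
rp.parse(robots_lines)

print(rp.can_fetch("MyCrawler", "https://example.com/index.html"))   # True
print(rp.can_fetch("MyCrawler", "https://example.com/private/data"))  # False
print(rp.crawl_delay("MyCrawler"))  # 2
```

Checking can_fetch() before each request, and sleeping for crawl_delay() seconds when it is set, covers the first two precautions in the list.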

2. Crawling Tools and Libraries

2.1 Installing the Basic Libraries

pip install requests beautifulsoup4 lxml pandas

2.2 Common Crawling Libraries

requests: a simple, easy-to-use HTTP library

import requests

response = requests.get('https://www.example.com')
print(response.status_code)  # 200 means success
print(response.text)  # print the page content

urllib: the URL-handling module in Python's standard library

import urllib.request

with urllib.request.urlopen('https://www.example.com') as response:
    html = response.read().decode('utf-8')  # read() returns bytes; decode them to str
    print(html)

BeautifulSoup: a parsing library for extracting data from HTML or XML documents

from bs4 import BeautifulSoup

soup = BeautifulSoup('<html><body><p>Hello World</p></body></html>', 'html.parser')
print(soup.p.text)  # Output: Hello World

lxml: a fast XML and HTML parsing library

from lxml import etree

html = etree.HTML('<html><body><p>Hello World</p></body></html>')
result = html.xpath('//p/text()')
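print(result)  # Output: ['Hello World']

2.3 Development Tools and Browser Extensions

Development tools:

  • PyCharm: a full-featured Python IDE
  • VS Code: a lightweight editor, used together with the Python extension
  • Jupyter Notebook: interactive development and testing

Browser extensions and tools:

  • Chrome DevTools (built into Chrome): inspect page structure and network requests
  • XPath Helper: build and test XPath expressions
  • JSON Formatter: pretty-print JSON data
  • User-Agent Switcher: simulate visits from different browsers and devices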

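3. HTTP Basics and Fetching Data

3.1 HTTP Protocol Basics

HTTP request methods:

  • GET: requests the specified resource; parameters go in the URL
  • POST: submits data to the server; the data goes in the request body
  • PUT: uploads (creates or replaces) a resource
  • DELETE: deletes a resource
  • HEAD: like GET, but returns only the headers
  • OPTIONS: asks which methods the server supports

HTTP status codes:

  • 1xx: informational
  • 2xx: success (e.g. 200 OK)
  • 3xx: redirection (e.g. 301 Moved Permanently)
  • 4xx: client error (e.g. 404 Not Found)
  • 5xx: server error (e.g. 500 Internal Server Error)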
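The first digit of a status code determines its class. To turn a numeric code into that class plus its standard reason phrase, the standard library's http.HTTPStatus enum helps; describe_status below is a hypothetical helper written for illustration.

```python
from http import HTTPStatus


def describe_status(code):
    """Return (category, reason phrase) for an HTTP status code."""
    categories = {1: "informational", 2: "success", 3: "redirection",
                  4: "client error", 5: "server error"}
    category = categories.get(code // 100, "unknown")
    try:
        phrase = HTTPStatus(code).phrase
    except ValueError:  # the code is not defined in the http module
        phrase = ""
    return category, phrase


for code in (200, 301, 404, 500):
    print(code, describe_status(code))
```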

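HTTP headers:

  • User-Agent: identifies the client software
  • Content-Type: the media type of the request or response body
  • Cookie: sends back cookies previously set by the server (e.g. a session ID)
  • Referer: the page from which the request originated

3.2 Fetching Data with Requests

Basic GET request: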

import requests

response = requests.get('https://api.github.com/events')
print(response.status_code)
print(response.headers['content-type'])
print(response.encoding)
print(response.json())  # parse the JSON response body automatically

GET request with query parameters:

payload = {'key1': 'value1', 'key2': 'value2'}
response = requests.get('https://httpbin.org/get', params=payload)
print(response.url)  # shows https://httpbin.org/get?key1=value1&key2=value2
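The params dict is turned into a URL query string; requests encodes it in essentially the same way as the standard library's urllib.parse.urlencode. Running urlencode yourself is a quick, offline way to see what will be sent, especially with non-ASCII keywords such as Chinese search terms:

```python
from urllib.parse import parse_qs, urlencode, urlsplit

payload = {'key1': 'value1', 'key2': 'value2'}
query = urlencode(payload)
print(f"https://httpbin.org/get?{query}")
# https://httpbin.org/get?key1=value1&key2=value2

# Non-ASCII values are encoded as UTF-8 and percent-escaped
print(urlencode({'keyword': '笔记本'}))
# keyword=%E7%AC%94%E8%AE%B0%E6%9C%AC

# Decoding goes the other way
print(parse_qs(urlsplit("https://httpbin.org/get?key1=value1&key2=value2").query))
# {'key1': ['value1'], 'key2': ['value2']}
```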

Custom request headers:

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get('https://api.github.com', headers=headers)

POST request (data= sends the dict as form-encoded data):

payload = {'username': 'test', 'password': 'testing'}
response = requests.post('https://httpbin.org/post', data=payload)
print(response.json())

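Sending JSON data (json= serializes the dict and sets the Content-Type: application/json header automatically, so importing json is unnecessary):
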
url = 'https://httpbin.org/post'
payload = {'name': 'John', 'age': 30}
response = requests.post(url, json=payload)
print(response.json())

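Handling cookies:

# Read a cookie set by the server. This httpbin endpoint sets the cookie and then
# redirects; allow_redirects=False keeps the response that carries Set-Cookie
response = requests.get('https://httpbin.org/cookies/set/sessioncookie/123456789', allow_redirects=False)
print(response.cookies['sessioncookie'])

# Send cookies
cookies = {'sessioncookie': '123456789'}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)
print(response.json())

Using a Session object to persist state (cookies are kept across requests):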

session = requests.Session()
session.get('https://httpbin.org/cookies/set/sessioncookie/123456789')
response = session.get('https://httpbin.org/cookies')
print(response.json())

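Timeouts:

try:
    # /delay/10 responds after 10 seconds, so the 5-second timeout is triggered
    response = requests.get('https://httpbin.org/delay/10', timeout=5)
except requests.exceptions.Timeout:
    print("Request timed out")

Error handling: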

try:
    response = requests.get('https://nonexistent-domain.com', timeout=10)
    response.raise_for_status()  # raises HTTPError for 4xx and 5xx status codes
except requests.exceptions.HTTPError as err:
    print(f"HTTP error: {err}")
except requests.exceptions.ConnectionError:
    print("Connection error")
except requests.exceptions.Timeout:
    print("Request timed out")
except requests.exceptions.RequestException as err:
    print(f"An error occurred: {err}")

3.3 Practice: A Simple Web Crawler

The following small program fetches the title and description of each website:

import requests
from bs4 import BeautifulSoup
import time

def simple_crawler(url):
    # Set headers that mimic a browser
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    
    try:
        # Send the request
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # raise an exception for 4xx/5xx responses
        
        # Parse the page content
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # Extract the title
        title = soup.title.string if soup.title else "No title found"
        
        # Extract the description (if the page has a <meta name="description"> tag)
        description = "No description found"
        description_tag = soup.find('meta', attrs={'name': 'description'})
        if description_tag and 'content' in description_tag.attrs:
            description = description_tag['content']
        
        # Return the result
        return {
            'url': url,
            'title': title,
            'description': description,
            'status': 'success'
        }
        
    except requests.exceptions.HTTPError as err:
        return {'url': url, 'status': f"HTTP Error: {err}"}
    except requests.exceptions.ConnectionError:
        return {'url': url, 'status': "Connection Error"}
    except requests.exceptions.Timeout:
        return {'url': url, 'status': "Timeout Error"}
    except requests.exceptions.RequestException as err:
        return {'url': url, 'status': f"Error: {err}"}

# Test the function
urls_to_crawl = [
    'https://www.python.org',
    'https://www.github.com',
    'https://www.wikipedia.org'
]

for url in urls_to_crawl:
    result = simple_crawler(url)
    print(f"\nCrawling: {result['url']}")
    
    if result['status'] == 'success':
        print(f"Title: {result['title']}")
        print(f"Description: {result['description']}")
    else:
        print(f"Failed: {result['status']}")
    
    # Pause for 1 second to avoid sending requests too quickly
    time.sleep(1)

4. Parsing Web Page Data

4.1 BeautifulSoup in Depth

BeautifulSoup is a Python library for extracting data from HTML and XML documents.

Basic usage:

from bs4 import BeautifulSoup

# Create a BeautifulSoup object from a string
html_doc = """
<html>
<head><title>Page Title</title></head>
<body>
<p class="story">Once upon a time there were three little sisters:
<a href="http://example.com/elsie" class="sister" id="link1">Elsie</a>,
<a href="http://example.com/lacie" class="sister" id="link2">Lacie</a> and
<a href="http://example.com/tillie" class="sister" id="link3">Tillie</a>;
and they lived at the bottom of a well.</p>
</body>
</html>
"""

soup = BeautifulSoup(html_doc, 'html.parser')  # or use the 'lxml' parser

Navigating the document tree:

# Get the title
print(soup.title)  # <title>Page Title</title>
print(soup.title.string)  # Page Title

# Get the paragraph
print(soup.p)

# Get a link
print(soup.a)  # returns only the first <a> tag

# Get all links
print(soup.find_all('a'))

# Get the element with a specific id
print(soup.find(id="link3"))

Searching the document:

# Use find_all() to find every <a> tag
links = soup.find_all('a')
for link in links:
    print(link.get('href'))

# Use a CSS selector
links = soup.select('p a.sister')  # all <a> tags with class "sister" inside a <p>
for link in links:
    print(link['href'])

# Find by attribute value
print(soup.find_all('a', class_='sister'))  # note the trailing underscore: class is a Python keyword

# Find by text content (string= replaces the older text= argument)
import re
print(soup.find_all(string=re.compile("Tillie")))

Modifying the document:

# Change a tag's text
soup.title.string = "New Title"

# Add a new tag
new_div = soup.new_tag('div', id='new_div')
new_div.string = "This is the newly added div"
soup.body.append(new_div)

# Print the modified HTML
print(soup.prettify())

4.2 XPath and the lxml Library

XPath is a language for locating information in XML documents, and it works on HTML documents as well.

Basic usage:

from lxml import etree

html = """
<html>
<head><title>Page Title</title></head>
<body>
<div class="content">
    <ul>
        <li class="item-0"><a href="link1.html">First</a></li>
        <li class="item-1"><a href="link2.html">Second</a></li>
        <li class="item-2"><a href="link3.html">Third</a></li>
        <li class="item-3"><a href="link4.html">Fourth</a></li>
        <li class="item-4"><a href="link5.html">Fifth</a></li>
    </ul>
</div>
</body>
</html>
"""

# Parse the HTML
tree = etree.HTML(html)

XPath selectors:

# Select all <li> elements
li_elements = tree.xpath('//li')
print(f"Found {len(li_elements)} li elements")

# Select the href attribute of every <a> tag
links = tree.xpath('//a/@href')
print(links)  # ['link1.html', 'link2.html', 'link3.html', 'link4.html', 'link5.html']

# Select the text of every <a> tag
text = tree.xpath('//a/text()')
print(text)  # ['First', 'Second', 'Third', 'Fourth', 'Fifth']

# Select elements with a specific class
items = tree.xpath('//li[@class="item-0"]')
for item in items:
    link = item.xpath('./a/text()')[0]
    href = item.xpath('./a/@href')[0]
    print(f"{link} -> {href}")

# Select by position (XPath positions start at 1)
third_item = tree.xpath('//li[3]')  # each li that is the third li child of its parent
print(etree.tostring(third_item[0], encoding='utf-8').decode('utf-8'))

Combining XPath expressions:

# Select li elements whose class contains 'item-'
items = tree.xpath('//li[contains(@class, "item-")]')
print(f"Matching elements: {len(items)}")

# The union operator | merges the results of two expressions
items = tree.xpath('//li[@class="item-0"] | //li[@class="item-1"]')
print(f"Matching elements: {len(items)}")
# The same selection with "or" inside a single predicate
items = tree.xpath('//li[@class="item-0" or @class="item-1"]')

# Select the parent node
ul = tree.xpath('//li[@class="item-0"]/..')
print(etree.tostring(ul[0], encoding='utf-8').decode('utf-8'))

# Select ancestor nodes
ancestors = tree.xpath('//li[@class="item-0"]/ancestor::*')
for ancestor in ancestors:
    print(ancestor.tag)
4.3 Extracting Data with Regular Expressions

Regular expressions are a powerful tool for matching and processing text.

Basic usage:

import re

text = "My phone number is 123-456-7890, and another one is (123) 456-7891"

# Find all phone numbers
phone_pattern = r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}'
phones = re.findall(phone_pattern, text)
print(phones)  # ['123-456-7890', '(123) 456-7891']

# Use regular expressions to extract information from HTML
html = '<div class="price">¥299.00</div><div class="title">iPhone Case</div>'

# Extract the price
price = re.search(r'<div class="price">¥([\d.]+)</div>', html)
if price:
    print(f"Price: ¥{price.group(1)}")  # Price: ¥299.00

# Extract the title
title = re.search(r'<div class="title">(.*?)</div>', html)
if title:
    print(f"Product: {title.group(1)}")  # Product: iPhone Case
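The title pattern above uses the non-greedy quantifier .*? rather than the greedy .*. The difference only shows once the HTML contains more than one matching element; this sketch compares the two on a made-up fragment.

```python
import re

html = '<div class="title">Phone Case</div><div class="title">Charger</div>'

# Non-greedy: .*? stops at the first </div>, giving one match per div
print(re.findall(r'<div class="title">(.*?)</div>', html))
# ['Phone Case', 'Charger']

# Greedy: .* runs on to the last </div>, swallowing both divs in a single match
print(re.findall(r'<div class="title">(.*)</div>', html))
# ['Phone Case</div><div class="title">Charger']
```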

Using regular expressions together with requests:

import requests
import re

url = 'https://www.python.org/'
response = requests.get(url, timeout=10)

# Extract all absolute links
links = re.findall(r'href="(https?://.*?)"', response.text)
for link in links[:5]:  # show only the first 5
    print(link)

# Extract all image paths ([^"] keeps each match inside a single attribute value)
images = re.findall(r'src="([^"]+?\.(?:png|jpg|jpeg|gif))"', response.text)
for image in images[:5]:  # show only the first 5
    print(image)
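Extracted src values are often relative paths (for example /static/...), which cannot be requested on their own. urllib.parse.urljoin resolves them against the page URL; a sketch using a made-up HTML fragment rather than the live page:

```python
import re
from urllib.parse import urljoin

base_url = 'https://www.python.org/'
# Made-up fragment: two relative image paths and one absolute one
html = ('<img src="/static/img/logo.png">'
        '<img src="img/banner.jpg">'
        '<img src="https://cdn.example.com/pic.gif">')

images = re.findall(r'src="([^"]+?\.(?:png|jpg|jpeg|gif))"', html)
absolute = [urljoin(base_url, src) for src in images]  # absolute URLs pass through unchanged
for url in absolute:
    print(url)
```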

4.4 Practice: Extracting JD.com Product Data

Here is an example that scrapes product information from JD.com search results:

import requests
from bs4 import BeautifulSoup
import time
import json
import random

def get_jd_product_info(keyword, num_pages=1):
    """
    Scrape product information from JD.com search results
    :param keyword: search keyword
    :param num_pages: number of pages to scrape
    :return: list of product info dicts
    """
    all_products = []
    
    for page in range(1, num_pages + 1):
        # Build the request; requests percent-encodes params (including Chinese keywords)
        url = 'https://search.jd.com/Search'
        params = {'keyword': keyword, 'page': page}
        
        # Set the request headers
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://www.jd.com/'
        }
        
        try:
            # Send the request
            response = requests.get(url, params=params, headers=headers, timeout=10)
            response.raise_for_status()
            
            # Parse the HTML
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Get the product list
            products = soup.select('li.gl-item')
            
            print(f"Page {page}: found {len(products)} products")
            
            # Iterate over the products
            for product in products:
                # Extract the product info
                try:
                    # Product ID
                    product_id = product.get('data-sku')
                    
                    # Price
                    price_element = product.select_one('div.p-price i')
                    price = price_element.text if price_element else 'N/A'
                    
                    # Title
                    title_element = product.select_one('div.p-name em')
                    title = title_element.text.strip() if title_element else 'N/A'
                    
                    # Shop name
                    shop_element = product.select_one('div.p-shop span a')
                    shop = shop_element.text.strip() if shop_element else 'N/A'
                    
                    # Number of reviews
                    commit_element = product.select_one('div.p-commit strong a')
                    commits = commit_element.text if commit_element else '0'
                    
                    # Product detail page link
                    link = f"https://item.jd.com/{product_id}.html" if product_id else 'N/A'
                    
                    # Build the product info dict
                    product_info = {
                        'id': product_id,
                        'title': title,
                        'price': price,
                        'shop': shop,
                        'commits': commits,
                        'link': link
                    }
                    
                    all_products.append(product_info)
                    
                except Exception as e:
                    print(f"Error while extracting product info: {e}")
                    continue
            
            # Add a random delay to avoid sending requests too quickly
            time.sleep(random.uniform(1, 3))
            
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
            break
    
    return all_products

# Example usage
if __name__ == "__main__":
    keyword = "笔记本电脑"  # "laptop"
    num_pages = 2
    
    print(f"Start scraping JD.com products: {keyword}")
    products = get_jd_product_info(keyword, num_pages)
    
    # Print the results
    print(f"\nScraped {len(products)} products in total:")
    for product in products[:5]:  # show only the first 5
        print(f"\nID: {product['id']}")
        print(f"Title: {product['title']}")
        print(f"Price: {product['price']}")
        print(f"Shop: {product['shop']}")
        print(f"Reviews: {product['commits']}")
        print(f"Link: {product['link']}")
    
    # Save as a JSON file
    with open(f'jd_{keyword}.json', 'w', encoding='utf-8') as f:
        json.dump(products, f, ensure_ascii=False, indent=4)
    
    print(f"\nData saved to jd_{keyword}.json")

Note: this example is for learning purposes only. The site structure may change, so in practice you may need to adjust the CSS selectors; the site may also require you to log in before it shows search results.
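Fields scraped this way are raw display strings. Review counts, for instance, may appear in forms such as "5000+" or "2万+" (万 = 10,000); these formats are an assumption, so check them against the live page. parse_count below is a hypothetical helper that normalizes such strings into integers before you save or analyze them.

```python
import re


def parse_count(text):
    """Convert display strings such as '5000+', '2万+' or '1.5万+' to an int.

    Handles the 万 (10,000) and 亿 (100,000,000) suffixes and thousands
    separators; returns None when no number is found.
    """
    match = re.search(r'(\d+(?:\.\d+)?)\s*(万|亿)?', text.replace(',', ''))
    if not match:
        return None
    number = float(match.group(1))
    multiplier = {'万': 10_000, '亿': 100_000_000}.get(match.group(2), 1)
    return int(round(number * multiplier))


for raw in ['5000+', '2万+', '1.5万+', '1,200', 'N/A']:
    print(raw, '->', parse_count(raw))
```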

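5. Anti-Crawling Mechanisms and Countermeasures

5.1 Common Anti-Crawling Mechanisms

1. IP restrictions:

  • Sending a large number of requests in a short time can get your IP banned
  • The site may respond with 403 Forbidden (or 429 Too Many Requests)

2. User-Agent detection:

  • Inspects request headers to identify crawlers
  • Rejects requests that do not come from a common browser

3. Cookie and session validation:

  • A valid cookie is required for access
  • Session tracking checks whether the client behaves like a real browser

4. CAPTCHAs:

  • Image CAPTCHAs, slider puzzles, click-to-select challenges, etc.
  • Block automated access

5. Dynamically generated content:

  • Page content is rendered with JavaScript
  • A browser environment is needed to get the complete content

6. Honeypot traps:

  • Hidden links that normal users never click
  • A crawler that follows these links gets flagged

7. Rate limiting:

  • Caps the number of requests per unit of time
  • Exceeding the cap may mean having to wait, or getting banned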
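To see why bursts of requests get blocked, here is a simplified sliding-window limiter of the kind a server might apply per IP address. It is an illustrative sketch, not any particular site's implementation: the class name and limits are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests per client within any `window` seconds."""

    def __init__(self, limit, window):
        self.limit = limit
        self.window = window
        self.history = defaultdict(deque)  # client -> timestamps of accepted requests

    def allow(self, client, now):
        timestamps = self.history[client]
        # Forget requests that have fallen out of the window
        while timestamps and now - timestamps[0] >= self.window:
            timestamps.popleft()
        if len(timestamps) >= self.limit:
            return False  # a real server might answer 429 or 403 here
        timestamps.append(now)
        return True


limiter = SlidingWindowLimiter(limit=3, window=10)
# Five requests from one IP within two seconds, then one after the window has passed
for t in [0, 0.5, 1, 1.5, 2, 10.5]:
    print(t, limiter.allow("1.2.3.4", t))
```

The first three requests pass, the next two are rejected, and the request at 10.5 s passes again because the earliest timestamps have left the window. Spreading requests out, as in the countermeasures below, keeps a crawler under such limits.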

5.2 Countermeasures

5.2.1 Modifying Request Headers

import requests
import random

# User-Agent strings to pick from at random
user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
]

def get_random_ua():
    """Return a random User-Agent"""
    return random.choice(user_agents)

# Example request
url = 'https://www.example.com'
headers = {
    'User-Agent': get_random_ua(),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Referer': 'https://www.google.com/',
    'Upgrade-Insecure-Requests': '1',
    'DNT': '1',  # Do Not Track
}

try:
    response = requests.get(url, headers=headers, timeout=10)
    print(f"Status code: {response.status_code}")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")

5.2.2 Adding Request Delays

import requests
import time
import random

urls = [
    'https://www.example.com/page1',
    'https://www.example.com/page2',
    'https://www.example.com/page3'
]

for url in urls:
    try:
        # Random delay of 1 to 5 seconds
        delay = random.uniform(1, 5)
        print(f"Waiting {delay:.2f} seconds...")
        time.sleep(delay)
        
        response = requests.get(url, timeout=10)
        print(f"URL: {url}, status code: {response.status_code}")
        
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
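A random sleep before every request works, but when a crawler visits several sites you usually only need to space out requests to the same host. Below is a minimal per-host throttle sketch; HostThrottle and its parameters are hypothetical names, and the clock and sleep functions are injectable so it can be tested without actually waiting.

```python
import random
import time
from urllib.parse import urlparse


class HostThrottle:
    """Keep at least min_delay (+ random jitter) seconds between requests to one host."""

    def __init__(self, min_delay=1.0, jitter=0.0, clock=time.monotonic, sleep=time.sleep):
        self.min_delay = min_delay
        self.jitter = jitter
        self.clock = clock
        self.sleep = sleep
        self.last_request = {}  # host -> time of the previous request

    def wait(self, url):
        """Sleep if needed before requesting url; return the seconds slept."""
        host = urlparse(url).netloc
        delay = self.min_delay + random.uniform(0, self.jitter)
        previous = self.last_request.get(host)
        waited = 0.0
        if previous is not None:
            waited = max(0.0, previous + delay - self.clock())
            if waited > 0:
                self.sleep(waited)
        self.last_request[host] = self.clock()
        return waited


# Usage with requests (not executed here):
# throttle = HostThrottle(min_delay=1, jitter=4)
# for url in urls:
#     throttle.wait(url)
#     response = requests.get(url, timeout=10)
```

Requests to different hosts do not wait for each other, so a crawler covering many sites stays polite without slowing down unnecessarily.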

5.2.3 Rotating Proxy IPs

import requests
import time
from itertools import cycle

# Proxy list (placeholders only; in practice you need working proxies)
proxies = [
    "http://proxy1.example.com:8080",
    "http://proxy2.example.com:8080",
    "http://proxy3.example.com:8080"
]

# Create an iterator that cycles through the proxies endlessly
proxy_cycle = cycle(proxies)

url = 'https://httpbin.org/ip'

for _ in range(3):
    # Get the next proxy
    proxy = next(proxy_cycle)
    print(f"Using proxy: {proxy}")
    
    try:
        response = requests.get(
            url, 
            proxies={"http": proxy, "https": proxy},
            timeout=10
        )
        print(f"Status code: {response.status_code}")
        print(f"IP address: {response.json()['origin']}")
    except Exception as e:
        print(f"Request failed: {e}")
    
    time.sleep(1)
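cycle() keeps handing out a proxy even after it has failed. A slightly more robust sketch tracks consecutive failures and drops a proxy once it reaches a threshold; ProxyPool and max_failures are hypothetical names, and the addresses in the test are placeholders.

```python
class ProxyPool:
    """Round-robin proxy pool that drops proxies after repeated consecutive failures."""

    def __init__(self, proxies, max_failures=3):
        self.proxies = list(proxies)
        self.max_failures = max_failures
        self.failures = {p: 0 for p in self.proxies}
        self._index = 0

    def get(self):
        if not self.proxies:
            raise RuntimeError("no usable proxies left")
        proxy = self.proxies[self._index % len(self.proxies)]
        self._index += 1
        return proxy

    def report_success(self, proxy):
        self.failures[proxy] = 0  # reset the streak after a good response

    def report_failure(self, proxy):
        self.failures[proxy] += 1
        if self.failures[proxy] >= self.max_failures and proxy in self.proxies:
            self.proxies.remove(proxy)


# Usage with requests (not executed here):
# pool = ProxyPool(proxies, max_failures=2)
# proxy = pool.get()
# try:
#     r = requests.get(url, proxies={"http": proxy, "https": proxy}, timeout=10)
#     pool.report_success(proxy)
# except requests.exceptions.RequestException:
#     pool.report_failure(proxy)
```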

5.2.4 Persisting Cookies with a Session

import requests

# Create a Session object
session = requests.Session()

# Set default headers for every request in this session
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})

# Log in to the site (placeholder URL and credentials)
login_url = 'https://example.com/login'
login_data = {
    'username': 'your_username',
    'password': 'your_password'
}

response = session.post(login_url, data=login_data, timeout=10)
print(f"Login status: {response.status_code}")

# Visit a page that requires login; the session automatically sends the cookies it received
protected_url = 'https://example.com/protected-page'
response = session.get(protected_url, timeout=10)
print(f"Access status: {response.status_code}")
print(response.text[:100])  # show the first 100 characters

5.2.5 Retrying Requests

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def get_with_retry(url, retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504), timeout=10):
    """GET request that retries on connection errors, read errors and the listed status codes"""
    session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    
    return session.get(url, timeout=timeout)

try:
    # This endpoint always returns 500, so once the retries are used up
    # requests raises requests.exceptions.RetryError
    response = get_with_retry('https://httpbin.org/status/500')
    print(f"Status code: {response.status_code}")
except Exception as e:
    print(f"Request failed: {e}")
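The Retry object hides the waiting logic. To make the idea visible, here is a hand-written retry loop with exponential backoff and random jitter. It is a generic sketch, not urllib3's exact backoff formula, and retry_with_backoff is a hypothetical name; it accepts any callable, so it can wrap requests.get or anything else that may fail temporarily.

```python
import random
import time


def retry_with_backoff(func, retries=3, base_delay=0.5, max_delay=10.0,
                       exceptions=(Exception,), sleep=time.sleep):
    """Call func(); after a failure wait base_delay * 2**attempt (+ jitter) and retry.

    Re-raises the last exception once all `retries` retries are used up.
    """
    for attempt in range(retries + 1):
        try:
            return func()
        except exceptions:
            if attempt == retries:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))  # jitter spreads out retries
```

Usage with requests might look like retry_with_backoff(lambda: requests.get(url, timeout=10), exceptions=(requests.exceptions.RequestException,)). Note that requests.get does not raise on a 500 response by itself; to retry on status codes, call response.raise_for_status() inside the wrapped function.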

5.3 Crawler Detection and Evasion

5.3.1 Mimicking a Browser's Headers

import requests

url = "https://www.example.com"

# A header set resembling a real Chrome browser (headers are only one part of a browser fingerprint)
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    # Only add "br" if the brotli package is installed; otherwise requests cannot decode the body
    "Accept-Encoding": "gzip, deflate",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Cache-Control": "max-age=0",
    "Sec-Ch-Ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"91\", \"Google Chrome\";v=\"91\"",
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1"
}

try:
    response = requests.get(url, headers=headers, timeout=10)
    print(f"Status code: {response.status_code}")
except Exception as e:
    print(f"Request failed: {e}")

5.3.2 HTTP Basic Authentication

import requests
from requests.auth import HTTPBasicAuth

url = 'https://httpbin.org/basic-auth/user/passwd'

# Use HTTP Basic authentication
response = requests.get(url, auth=HTTPBasicAuth('user', 'passwd'))
print(f"Status code: {response.status_code}")
print(response.json())

# A shorter form: pass a (username, password) tuple
response = requests.get(url, auth=('user', 'passwd'))
print(f"Status code: {response.status_code}")
print(response.json())
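Under the hood, Basic authentication simply sends an Authorization header containing "username:password" encoded in Base64. That is an encoding, not encryption, so use it only over HTTPS. A sketch that builds the header by hand (basic_auth_header is a hypothetical helper); for ASCII credentials, passing headers={'Authorization': basic_auth_header('user', 'passwd')} should be equivalent to auth=('user', 'passwd').

```python
import base64


def basic_auth_header(username, password):
    """Build the Authorization header value for HTTP Basic authentication."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


print(basic_auth_header("user", "passwd"))  # Basic dXNlcjpwYXNzd2Q=
```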

5.3.3 Handling Simple JavaScript Redirects

import requests
import re
from urllib.parse import urljoin

def follow_js_redirect(url):
    response = requests.get(url, timeout=10)
    
    # Look for a JavaScript redirect
    redirect_pattern = r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]'
    match = re.search(redirect_pattern, response.text)
    
    if match:
        # The target may be a relative path, so resolve it against the current URL
        redirect_url = urljoin(response.url, match.group(1))
        print(f"Found JS redirect: {redirect_url}")
        
        # Follow the redirect
        return requests.get(redirect_url, timeout=10)
    
    return response

# Example usage (placeholder URL)
url = 'https://example.com/page-with-js-redirect'
response = follow_js_redirect(url)
print(f"Final status code: {response.status_code}")
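The pattern above only recognizes window.location.href = "...". A slightly broader sketch also handles location = "...", location.replace(...) and location.assign(...), and resolves relative targets with urljoin. find_js_redirect and the pattern list are hypothetical and illustrative, not exhaustive; redirects built dynamically in JavaScript still need a real browser.

```python
import re
from urllib.parse import urljoin

# A few common forms of a JavaScript redirect (not exhaustive)
JS_REDIRECT_PATTERNS = [
    r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]',
    r'(?:window\.)?location\s*=\s*[\'"]([^\'"]+)[\'"]',
    r'location\.(?:replace|assign)\(\s*[\'"]([^\'"]+)[\'"]\s*\)',
]


def find_js_redirect(html, base_url):
    """Return the absolute redirect target found in html, or None."""
    for pattern in JS_REDIRECT_PATTERNS:
        match = re.search(pattern, html)
        if match:
            return urljoin(base_url, match.group(1))
    return None


print(find_js_redirect('<script>window.location.href = "/next";</script>',
                       'https://example.com/page'))
# https://example.com/next
```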

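5.4 Advanced Anti-Crawling and Countermeasures

5.4.1 Fingerprinting and WebDriver Detection

Modern websites use a variety of techniques to identify crawlers:

  1. Canvas fingerprinting: identifies a client by tiny differences in how its browser renders graphics
  2. WebRTC leaks: reveal the real IP address rather than the proxy's
  3. Browser feature detection: checks whether the browser's APIs and features are complete
  4. WebDriver detection: directly detects automation tools such as Selenium (for example through the navigator.webdriver property)

Countermeasures:

  • Use dedicated anti-detection tools such as undetected-chromedriver
  • Use newer automation tools such as Playwright, which may be harder to detect (though not undetectable)
  • Use browser extensions to hide automation traits (although some sites detect extensions)

# Use undetected_chromedriver to evade detection
import undetected_chromedriver as uc
import time

# Create a Chrome instance patched to avoid common detection checks
driver = uc.Chrome()

# Visit a test site
driver.get('https://bot.sannysoft.com/')  # a site that tests browser fingerprints

# Wait for the page to load
time.sleep(5)

# Save a screenshot of the results
driver.save_screenshot('bot_test_result.png')

# Close the browser
driver.quit()

5.4.2 Handling CAPTCHAs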

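For image CAPTCHAs you can use OCR, or a dedicated CAPTCHA-recognition service.

# Recognize a simple CAPTCHA with pytesseract (the Tesseract OCR engine must be installed)
import requests
from PIL import Image
import pytesseract
import io

# Use one session so the CAPTCHA request and the login form share the same cookies
session = requests.Session()

# Fetch the CAPTCHA image
response = session.get('https://example.com/captcha.php', timeout=10)
captcha_image = Image.open(io.BytesIO(response.content))

# Preprocess the image
captcha_image = captcha_image.convert('L')  # convert to grayscale
threshold = 127
captcha_image = captcha_image.point(lambda p: 255 if p > threshold else 0)  # binarize

# Run OCR (--psm 8 treats the image as a single word)
captcha_text = pytesseract.image_to_string(captcha_image, config='--psm 8')
captcha_text = ''.join(filter(str.isalnum, captcha_text))  # keep only letters and digits

print(f"Recognized CAPTCHA: {captcha_text}")

# Submit the form with the recognized CAPTCHA
data = {
    'username': 'your_username',
    'password': 'your_password',
    'captcha': captcha_text
}

response = session.post('https://example.com/login', data=data, timeout=10)
print(f"Submit status: {response.status_code}")

For complex CAPTCHAs (such as slider or click-to-select challenges), you usually need Selenium combined with specialized recognition and simulation techniques.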

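6. Selenium and Scraping Dynamic Pages

6.1 Selenium Basics

Selenium is a browser automation tool that can simulate real user actions on web pages.

Install the dependencies:

pip install selenium webdriver-manager

(Selenium 4.6 and later also include Selenium Manager, which can download a matching driver automatically, so webdriver-manager is optional.)

Basic usage: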

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
import time

# Download and configure ChromeDriver automatically
service = Service(ChromeDriverManager().install())

# Create a Chrome browser instance
driver = webdriver.Chrome(service=service)

# Visit the site
driver.get('https://www.python.org')

# Get the title
print(f"Page title: {driver.title}")

# Find elements
search_input = driver.find_element(By.ID, 'id-search-field')
search_input.send_keys('pycon')

submit_button = driver.find_element(By.ID, 'submit')
submit_button.click()

# Wait for the page to load (a fixed sleep is simple; the explicit waits in 6.3 are more reliable)
time.sleep(2)

# Get the search results
search_results = driver.find_elements(By.CSS_SELECTOR, '.list-recent-events li')
print(f"Found {len(search_results)} search results:")

for result in search_results[:3]:  # show only the first 3
    print(result.text)

# Take a screenshot
driver.save_screenshot('python_search.png')

# Close the browser
driver.quit()

6.2 Locating and Interacting with Elements

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time

# Set up Chrome
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)

# Open the target site (Google may show a consent page or CAPTCHA, and its markup changes often)
driver.get('https://www.google.com')

# 1. Locate an element by its name attribute
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Selenium WebDriver')
search_box.send_keys(Keys.RETURN)  # press Enter

time.sleep(2)

# 2. Locate elements with a CSS selector (.g = elements with class "g")
results = driver.find_elements(By.CSS_SELECTOR, '.g')
print(f"Found {len(results)} search results")

# 3. Locate an element with XPath (@class="g" matches only if the class attribute is exactly "g")
first_result = driver.find_element(By.XPATH, '//div[@class="g"]//h3')
print(f"First result title: {first_result.text}")

# 4. Locate a link by (partial) link text
link = driver.find_element(By.PARTIAL_LINK_TEXT, 'Selenium')
print(f"Found link: {link.text}")

# 5. Interacting with elements
# 5.1 Click
link.click()
time.sleep(2)

# 5.2 Go back to the previous page
driver.back()
time.sleep(1)

# 5.3 Fill in a form
search_box = driver.find_element(By.NAME, 'q')
search_box.clear()  # clear the input box
search_box.send_keys('Python programming')
search_box.submit()  # submit the enclosing form

time.sleep(2)

# 5.4 Read attributes
search_box = driver.find_element(By.NAME, 'q')
print(f"Input value: {search_box.get_attribute('value')}")
print(f"Input name: {search_box.get_attribute('name')}")

# 5.5 Check the element's state
print(f"Input displayed: {search_box.is_displayed()}")
print(f"Input enabled: {search_box.is_enabled()}")

# Close the browser
driver.quit()

6.3 Wait Strategies

Selenium provides two wait strategies: explicit waits and implicit waits.

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

# Set up Chrome
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)

# Implicit wait: a maximum wait time applied to every element lookup
# (Selenium's documentation advises against mixing implicit and explicit waits;
# both appear here only for demonstration)
driver.implicitly_wait(10)  # in seconds

# Open the site
driver.get('https://www.python.org')

# Find an element (the implicit wait applies)
search_box = driver.find_element(By.ID, 'id-search-field')
search_box.send_keys('pycon')
search_box.submit()

# Explicit wait: wait until a specific condition is met
try:
    # Wait for the search results to load
    element = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, '.list-recent-events'))
    )
    print("Search results loaded")
    
    # The results can now be processed safely
    results = driver.find_elements(By.CSS_SELECTOR, '.list-recent-events li')
    print(f"Found {len(results)} results")
    
except Exception as e:
    print(f"Wait timed out: {e}")

# Custom wait condition: a callable that receives the driver and returns True when done
def text_contains(locator, text):
    def check(driver):
        try:
            element_text = driver.find_element(*locator).text
            return text in element_text
        except Exception:
            return False
    return check

try:
    # Wait until the first result heading contains the given text
    WebDriverWait(driver, 10).until(
        text_contains((By.CSS_SELECTOR, '.list-recent-events h3'), 'PyCon')
    )
    print("Found PyCon-related results")
except Exception as e:
    print(f"No PyCon-related results found: {e}")

# Close the browser
driver.quit()
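An explicit wait is essentially a polling loop: evaluate a condition repeatedly until it returns something truthy or time runs out. The sketch below reimplements that idea in plain Python to show the mechanism; wait_until and WaitTimeout are hypothetical names, not Selenium APIs. It polls every 0.5 s like WebDriverWait's default, but treats any exception from the condition as "not ready yet", whereas WebDriverWait by default ignores only NoSuchElementException.

```python
import time


class WaitTimeout(Exception):
    """Raised when the condition is not met before the deadline."""


def wait_until(condition, timeout=10.0, poll=0.5, clock=time.monotonic, sleep=time.sleep):
    """Call condition() every `poll` seconds until it returns a truthy value.

    Returns that value, or raises WaitTimeout after `timeout` seconds.
    """
    deadline = clock() + timeout
    while True:
        try:
            value = condition()
            if value:
                return value
        except Exception:
            pass  # e.g. the element does not exist yet
        if clock() >= deadline:
            raise WaitTimeout(f"condition not met within {timeout} s")
        sleep(poll)
```

With Selenium, a condition such as lambda: driver.find_element(By.CSS_SELECTOR, '.list-recent-events') would play the role of presence_of_element_located; in real code, prefer WebDriverWait itself.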

6.4 Handling Advanced Page Interactions

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import Select
import time

# Set up Chrome
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)

try:
    # 1. Drop-down menus
    driver.get('https://www.w3schools.com/tags/tryit.asp?filename=tryhtml_select')
    
    # Switch into the iframe that contains the drop-down
    driver.switch_to.frame('iframeResult')
    
    # Find the drop-down
    dropdown = Select(driver.find_element(By.ID, 'cars'))
    
    # Select options
    dropdown.select_by_visible_text('Audi')
    time.sleep(1)
    
    dropdown.select_by_value('volvo')
    time.sleep(1)
    
    dropdown.select_by_index(2)  # select the third option (indexes start at 0)
    time.sleep(1)
    
    # Get all options
    options = dropdown.options
    print("Drop-down options:")
    for option in options:
        print(f" - {option.text}")
    
    # Switch back to the main document
    driver.switch_to.default_content()
    
    # 2. Mouse hover
    driver.get('https://www.w3schools.com')
    
    # Find the element to hover over (this id depends on the site's current markup)
    tutorials_button = driver.find_element(By.ID, 'navbtn_tutorials')
    
    # Create an ActionChains object
    actions = ActionChains(driver)
    
    # Perform the hover
    actions.move_to_element(tutorials_button).perform()
    
    time.sleep(2)
    
    # 3. Drag and drop
    driver.get('https://www.w3schools.com/html/html5_draganddrop.asp')
    
    # Wait for the elements to load
    time.sleep(2)
    
    # Find the source and target elements
    source = driver.find_element(By.ID, 'drag1')
    target = driver.find_element(By.ID, 'div2')
    
    # Perform the drag and drop
    # (ActionChains may not trigger native HTML5 drag-and-drop events, so pages
    # like this one can require a JavaScript-based workaround instead)
    actions = ActionChains(driver)
    actions.drag_and_drop(source, target).perform()
    
    time.sleep(2)
    
    # 4. JavaScript dialogs
    driver.get('https://www.w3schools.com/js/tryit.asp?filename=tryjs_alert')
    
    # Switch into the iframe that contains the button
    driver.switch_to.frame('iframeResult')
    
    # Click the button to trigger the alert
    driver.find_element(By.CSS_SELECTOR, 'button').click()
    
    # Switch to the alert
    alert = driver.switch_to.alert
    
    # Read the alert text
    print(f"Alert text: {alert.text}")
    
    # Accept the alert (click OK)
    alert.accept()
    
    # For a confirm dialog, dismiss() clicks Cancel
    # alert.dismiss()
    
    # For a prompt dialog, send_keys() types text into it
    # alert.send_keys('Hello World')
    
    # 5. Switching tabs or windows
    driver.get('https://www.w3schools.com')
    
    # Open a new tab
    driver.execute_script("window.open('https://www.python.org', '_blank');")
    
    # Get all window handles
    handles = driver.window_handles
    
    # Switch to the new tab
    driver.switch_to.window(handles[1])
    print(f"Current page title: {driver.title}")
    
    # Switch back to the original tab
    driver.switch_to.window(handles[0])
    print(f"Current page title: {driver.title}")
    
except Exception as e:
    print(f"An error occurred: {e}")
    
finally:
    # Close the browser
    driver.quit()

6.5 Headless Browsers

Headless mode runs Selenium without a graphical interface, which saves resources and can make scraping faster.

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
import time

# Configure Chrome options
chrome_options = Options()
chrome_options.add_argument("--headless")  # enable headless mode
chrome_options.add_argument("--disable-gpu")  # disable GPU acceleration
chrome_options.add_argument("--window-size=1920,1200")  # set the window size

# Create the browser instance
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)

try:
    # Visit the site
    driver.get("https://www.python.org")
    print(f"Page title: {driver.title}")
    
    # Search
    search_box = driver.find_element(By.NAME, "q")
    search_box.send_keys("headless chrome")
    search_box.submit()
    
    # Wait for the search results
    time.sleep(2)
    
    # Get the results
    results = driver.find_elements(By.CSS_SELECTOR, ".list-recent-events li")
    print(f"Found {len(results)} results")
    
    # Take a screenshot
    driver.save_screenshot("headless_screenshot.png")
    print("Screenshot saved")
    
except Exception as e:
    print(f"An error occurred: {e}")
    
finally:
    # Close the browser
    driver.quit()

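6.6 Practice: Scraping a Dynamically Loaded Site with Selenium

The following example uses Selenium to scrape Zhihu's trending questions (Zhihu may require login to view its hot list, and the CSS class names below may change):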

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import json
import random

def scrape_zhihu_hot_questions(num_questions=10):
    """
    使用Selenium爬取知乎热门问题
    :param num_questions: 要爬取的问题数量
    :return: 问题列表
    """
    # 配置Chrome
    chrome_options = Options()
    chrome_options.add_argument("--disable-notifications")  # 禁用通知
    chrome_options.add_argument("--disable-extensions")  # 禁用扩展
    chrome_options.add_argument("--disable-popup-blocking")  # 禁用弹窗
    chrome_options.add_argument("--start-maximized")  # 最大化窗口
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")  # 不易被检测
    
    # 添加随机User-Agent
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0"
    ]
    chrome_options.add_argument(f"--user-agent={random.choice(user_agents)}")
    
    # 初始化WebDriver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    
    questions = []
    
    try:
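        # Open the Zhihu hot list (it may redirect anonymous visitors to a login page, and class
        # names such as .HotList-item may change; check them in DevTools before running)
        driver.get("https://www.zhihu.com/hot")
        
        # Wait until the first hot-list item appears
        print("Waiting for the page to load...")
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".HotList-item"))
        )
        
        # Scroll the page to load more content
        print("Scrolling to load more content...")
        last_height = driver.execute_script("return document.body.scrollHeight")
        
        while len(questions) < num_questions:
            # Scroll to the bottom
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            
            # Give new content time to load
            time.sleep(random.uniform(1, 2))
            
            # Collect all hot-list items currently on the page
            hot_items = driver.find_elements(By.CSS_SELECTOR, ".HotList-item")
            
            # Process the loaded items (earlier items are seen again after every scroll)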
            for item in hot_items:
                if len(questions) >= num_questions:
                    break
                    
                try:
                    # Rank
                    rank_element = item.find_element(By.CSS_SELECTOR, ".HotList-itemIndex")
                    rank = rank_element.text.strip()
                    
                    # Title
                    title_element = item.find_element(By.CSS_SELECTOR, ".HotList-itemTitle")
                    title = title_element.text.strip()
                    
                    # Heat (popularity metric)
                    hot_element = item.find_element(By.CSS_SELECTOR, ".HotList-itemMetrics")
                    hot = hot_element.text.strip()
                    
                    # Link
                    link_element = item.find_element(By.CSS_SELECTOR, ".HotList-itemTitle a")
                    link = link_element.get_attribute("href")
                    
                    # Skip questions already collected on an earlier scroll
                    if not any(q["title"] == title for q in questions):
                        question_data = {
                            "rank": rank,
                            "title": title,
                            "hot": hot,
                            "link": link
                        }
                        questions.append(question_data)
                        print(f"Scraped {len(questions)}/{num_questions}: {title}")
                
                except Exception as e:
                    print(f"Error while processing a question: {e}")
                    continue
            
            # Stop once the page height stops growing (no more content to load)
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height
            
            # Random delay
            time.sleep(random.uniform(0.5, 1.5))
        
        print(f"Scraped {len(questions)} questions in total")
        
    except Exception as e:
        print(f"Error during scraping: {e}")
    
    finally:
        # Close the browser
        driver.quit()
    
    return questions

# Run the scraper
if __name__ == "__main__":
    print("Scraping Zhihu hot questions...")
    hot_questions = scrape_zhihu_hot_questions(20)  # scrape 20 hot questions
    
    # Save to a JSON file
    with open("zhihu_hot_questions.json", "w", encoding="utf-8") as f:
        json.dump(hot_questions, f, ensure_ascii=False, indent=4)
    
    print("Done. Data saved to zhihu_hot_questions.json")
    
    # Show part of the results
    print("\nTop 5 hot questions:")
    for q in hot_questions[:5]:
        print(f"{q['rank']}. {q['title']} ({q['hot']})")
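The duplicate check in the scraper scans the whole `questions` list for every item on every scroll, so its cost grows with the square of the number of questions. A minimal sketch of the usual alternative, assuming the title is a good enough identity key: keep the keys already seen in a `set`, which gives constant-time membership tests. `append_unique` is a hypothetical helper, not part of the original scraper.

```python
def append_unique(items, seen, new_items, key="title", limit=None):
    """Append dicts from new_items whose key value has not been seen yet.

    items: list collecting the results (modified in place)
    seen: set of key values already collected (modified in place)
    limit: optional maximum length of items
    Returns the number of items actually appended.
    """
    added = 0
    for entry in new_items:
        if limit is not None and len(items) >= limit:
            break
        value = entry.get(key)
        if value in seen:
            continue  # duplicate: already collected on an earlier scroll
        seen.add(value)
        items.append(entry)
        added += 1
    return added


# Each scroll re-reads every item on the page, including the old ones
questions, seen_titles = [], set()
first_scroll = [{"title": "A"}, {"title": "B"}]
second_scroll = [{"title": "A"}, {"title": "B"}, {"title": "C"}]
append_unique(questions, seen_titles, first_scroll)
append_unique(questions, seen_titles, second_scroll, limit=3)
print([q["title"] for q in questions])  # ['A', 'B', 'C']
```

Inside the Selenium loop you would build a dict per element and pass the batch to this helper instead of scanning the list by hand.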

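7. Getting Started with Scrapy

7.1 Introduction and Installation

Scrapy is an application framework for crawling websites and extracting structured data from them. It takes care of request scheduling, concurrent downloads, and exporting results, so a spider mainly describes which pages to visit and what to extract.

Install Scrapy:

pip install scrapy

Verify the installation:

scrapy version

7.2 Creating a Project and Its Basic Components

Create a new Scrapy project:

scrapy startproject quotes_spider
cd quotes_spider

The project layout looks like this: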

quotes_spider/
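    scrapy.cfg            # deployment configuration
    quotes_spider/        # the project's Python module
        __init__.py
        items.py          # item (data structure) definitions
        middlewares.py    # middlewares
        pipelines.py      # item pipelines (data processing)
        settings.py       # project settings
        spiders/          # directory for spiders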
            __init__.py

Generate a spider:

scrapy genspider quotes quotes.toscrape.com

This creates quotes.py in the spiders/ directory:

# quotes_spider/quotes_spider/spiders/quotes.py
import scrapy

class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        pass

7.3 A Basic Spider

Edit quotes.py to implement the crawl:

import scrapy

class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        # Select every quote block on the page
        quotes = response.css('div.quote')
        
        # Extract the fields of each quote
        for quote in quotes:
            text = quote.css('span.text::text').get()
            author = quote.css('small.author::text').get()
            tags = quote.css('div.tags a.tag::text').getall()
            
            # Yield the result as an item
            yield {
                'text': text,
                'author': author,
                'tags': tags
            }
        
        # Find the next-page link and follow it (response.follow accepts relative URLs)
        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, self.parse)

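Run the spider:

scrapy crawl quotes -o quotes.json

The -o option appends to an existing file, so running the command twice leaves quotes.json as invalid JSON; in Scrapy 2.1 and later, -O overwrites the file instead.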

7.4 Items and Item Pipelines

Defining an Item

Edit items.py to define the data structure:

# quotes_spider/quotes_spider/items.py
import scrapy

class QuoteItem(scrapy.Item):
    text = scrapy.Field()
    author = scrapy.Field()
    tags = scrapy.Field()

Using the Item

Update the spider to fill in a QuoteItem:

# quotes_spider/quotes_spider/spiders/quotes.py
import scrapy
from quotes_spider.items import QuoteItem

class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        quotes = response.css('div.quote')
        
        for quote in quotes:
            item = QuoteItem()
            item['text'] = quote.css('span.text::text').get()
            item['author'] = quote.css('small.author::text').get()
            item['tags'] = quote.css('div.tags a.tag::text').getall()
            
            yield item
        
        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, self.parse)

Creating Pipelines

Edit pipelines.py to process and store the data:

# quotes_spider/quotes_spider/pipelines.py
import json
from itemadapter import ItemAdapter

class JsonWriterPipeline:
    def open_spider(self, spider):
        self.file = open('quotes_pipeline.json', 'w', encoding='utf-8')  # UTF-8 because ensure_ascii=False writes non-ASCII characters
        self.file.write('[')
        self.first_item = True

    def close_spider(self, spider):
        self.file.write(']')
        self.file.close()

    def process_item(self, item, spider):
        # Convert the item to a dict and serialize it
        line = json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False)
        
        # Prepend a comma separator to every element except the first
        if self.first_item:
            self.first_item = False
        else:
            line = ',\n' + line
        
        self.file.write(line)
        return item

class RemoveTagsPipeline:
    def process_item(self, item, spider):
        # Keep only certain tags, e.g. literature-related ones
        literary_tags = ['love', 'life', 'inspirational', 'poetry', 'literature']
        filtered_tags = [tag for tag in item['tags'] if tag in literary_tags]
        
        if filtered_tags:
            item['tags'] = filtered_tags
        else:
            item['tags'] = ['unclassified']
        
        return item
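The comma bookkeeping in `JsonWriterPipeline` exists only because a JSON array must be valid as a whole. A common alternative is JSON Lines (one JSON object per line), which needs no separators and stays readable even if the crawl is interrupted. A minimal sketch, assuming the hypothetical class name `JsonLinesWriterPipeline` and output path `quotes.jl`; it converts items with `dict(item)`, which accepts both plain dicts and `scrapy.Item` objects:

```python
import json


class JsonLinesWriterPipeline:
    """Write each item as one JSON object per line (JSON Lines)."""

    def __init__(self, path="quotes.jl"):
        self.path = path
        self.file = None

    def open_spider(self, spider):
        self.file = open(self.path, "w", encoding="utf-8")

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        # dict() works for plain dicts and scrapy.Item instances alike
        self.file.write(json.dumps(dict(item), ensure_ascii=False) + "\n")
        return item
```

If you do not need custom processing, Scrapy's built-in feed exports can produce the same format directly, for example `scrapy crawl quotes -o quotes.jl`.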

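Enabling the Pipelines

Enable your pipelines in settings.py. The numbers (conventionally in the 0-1000 range) set the order: lower values run first, so RemoveTagsPipeline (300) filters the tags before JsonWriterPipeline (800) writes the item:

# quotes_spider/quotes_spider/settings.py
# ...other settings...

# Configure the item pipelines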
ITEM_PIPELINES = {
   'quotes_spider.pipelines.RemoveTagsPipeline': 300,
   'quotes_spider.pipelines.JsonWriterPipeline': 800,
}
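To make the ordering rule concrete, here is a toy model of how an `ITEM_PIPELINES`-style mapping is applied: sort the stages by priority, then pass the item through each one in turn. It is a simplified illustration with stand-in functions, not Scrapy's engine; in real Scrapy a pipeline drops an item by raising `scrapy.exceptions.DropItem`, which the toy imitates by returning `None`.

```python
def run_pipelines(item, pipelines):
    """Apply pipelines in ascending priority order.

    pipelines: dict mapping a callable(item) -> item to its priority number.
    A callable returning None stands in for "drop this item".
    """
    for stage in sorted(pipelines, key=pipelines.get):
        item = stage(item)
        if item is None:
            return None  # dropped: later stages never see it
    return item


def remove_tags(item):
    # Stand-in for RemoveTagsPipeline
    literary = {"love", "life", "inspirational", "poetry", "literature"}
    kept = [t for t in item["tags"] if t in literary]
    return {**item, "tags": kept or ["unclassified"]}


written = []


def json_writer(item):
    # Stand-in for JsonWriterPipeline: records what would be written
    written.append(item)
    return item


run_pipelines({"text": "q", "tags": ["books", "life"]},
              {json_writer: 800, remove_tags: 300})
print(written)  # [{'text': 'q', 'tags': ['life']}]
```

Swapping the priorities (for example giving the writer 100) would write the item before its tags are filtered.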

7.5 Middlewares and Request Processing

Spider Middleware

Edit middlewares.py to define a custom spider middleware:

# quotes_spider/quotes_spider/middlewares.py
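from itemadapter import ItemAdapter, is_item
from scrapy import signals

class CustomSpiderMiddleware:
    @classmethod
    def from_crawler(cls, crawler):
        middleware = cls()
        crawler.signals.connect(middleware.spider_opened, signal=signals.spider_opened)
        return middleware

    def process_spider_input(self, response, spider):
        # Called for each response before it reaches the spider; returning None lets it continue
        return None

    def process_spider_output(self, response, result, spider):
        # Post-process everything the spider yields
        for element in result:
            if is_item(element):
                # Example filter: keep only quotes tagged 'love' (all other items are dropped)
                if 'love' in ItemAdapter(element).get('tags', []):
                    yield element
            else:
                # Pass Request objects through unchanged so pagination keeps working
                yield element

    def process_spider_exception(self, response, exception, spider):
        # Handle exceptions raised by the spider; returning None lets other middlewares handle them
        pass

    def process_start_requests(self, start_requests, spider):
        # Process the spider's start requests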
        for request in start_requests:
            yield request

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

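Downloader Middleware

A custom downloader middleware that sets a random User-Agent on every request (it can live in the same middlewares.py):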

# quotes_spider/quotes_spider/middlewares.py
import random

class RandomUserAgentMiddleware:
    def __init__(self, user_agents):
        self.user_agents = user_agents

    @classmethod
    def from_crawler(cls, crawler):
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
        ]
        return cls(user_agents)

    def process_request(self, request, spider):
        # Pick a random User-Agent for this request
        request.headers['User-Agent'] = random.choice(self.user_agents)
        return None
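Hard-coding the list inside `from_crawler` means editing code whenever it changes. A sketch of a variant that reads the list from the project settings instead; `USER_AGENT_LIST` is a hypothetical setting you would define yourself in settings.py, and `crawler.settings.getlist()` is Scrapy's accessor for list-valued settings. The class name and fallback list are illustrative only.

```python
import random


class SettingsUserAgentMiddleware:
    """Rotate User-Agent strings taken from a (hypothetical) USER_AGENT_LIST setting."""

    DEFAULT_USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    ]

    def __init__(self, user_agents):
        # Fall back to the defaults if the setting is missing or empty
        self.user_agents = list(user_agents) or self.DEFAULT_USER_AGENTS

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.getlist("USER_AGENT_LIST"))

    def process_request(self, request, spider):
        request.headers["User-Agent"] = random.choice(self.user_agents)
        return None  # continue normal downloading
```

With this version, settings.py would hold something like `USER_AGENT_LIST = ['...', '...']`, and the middleware is enabled in `DOWNLOADER_MIDDLEWARES` the same way as `RandomUserAgentMiddleware`.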

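Enabling the Middlewares

Enable the custom middlewares in settings.py:

# quotes_spider/quotes_spider/settings.py
# ...other settings...

# Enable the spider middleware
SPIDER_MIDDLEWARES = {
   'quotes_spider.middlewares.CustomSpiderMiddleware': 543,
}

# Enable the downloader middleware (400 runs before Scrapy's built-in UserAgentMiddleware at 500, which only sets the header when it is missing)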
DOWNLOADER_MIDDLEWARES = {
   'quotes_spider.middlewares.RandomUserAgentMiddleware': 400,
}

7.6 Using Selectors

CSS Selectors

# Using CSS selectors in a spider
def parse(self, response):
    # Select all quotes
    quotes = response.css('div.quote')
    
    # Take the first quote (raises IndexError if the page has none)
    first_quote = quotes[0]
    
    # Extract text
    text = first_quote.css('span.text::text').get()
    
    # Extract an attribute
    href = response.css('li.next a::attr(href)').get()
    
    # Extract several text values
    all_tags = first_quote.css('a.tag::text').getall()

XPath Selectors

# Using XPath selectors in a spider
def parse(self, response):
    # Select all quotes (@class="quote" matches only when the class attribute is exactly "quote")
    quotes = response.xpath('//div[@class="quote"]')
    
    # Take the first quote
    first_quote = quotes[0]
    
    # Extract text
    text = first_quote.xpath('./span[@class="text"]/text()').get()
    
    # Extract an attribute
    href = response.xpath('//li[@class="next"]/a/@href').get()
    
    # Extract several text values
    all_tags = first_quote.xpath('./div[@class="tags"]/a/text()').getall()

Combining Both

def parse(self, response):
    # Find the quotes with a CSS selector, then extract fields with XPath or CSS as convenient
    quotes = response.css('div.quote')
    
    for quote in quotes:
        text = quote.xpath('./span[@class="text"]/text()').get()
        author = quote.css('small.author::text').get()
        
        yield {
            'text': text,
            'author': author
        }

7.7 Handling Forms and Login

Example: crawling a site that requires login

# quotes_spider/quotes_spider/spiders/login_spider.py
import scrapy
from scrapy.http import FormRequest

class LoginSpider(scrapy.Spider):
    name = 'login_spider'
    start_urls = ['http://quotes.toscrape.com/login']
    
    def parse(self, response):
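        # Read the CSRF token (from_response also copies hidden form fields like this one automatically)
        csrf_token = response.css('input[name="csrf_token"]::attr(value)').get()
        
        # Submit the login form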
        return FormRequest.from_response(
            response,
            formdata={
                'csrf_token': csrf_token,
                'username': 'your_username',
                'password': 'your_password'
            },
            callback=self.after_login
        )
    
    def after_login(self, response):
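        # Check whether the login succeeded (a logged-in page contains a logout link)
        if 'logout' in response.text:
            self.log('Login succeeded!')
            
            # Start crawling the pages that require login
            # '/protected-quotes' is a placeholder path for illustration; use a real protected page of your target site
            yield scrapy.Request(
                'http://quotes.toscrape.com/protected-quotes',
                callback=self.parse_protected
            )
        else:
            self.log('Login failed.')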
    
    def parse_protected(self, response):
        # Scrape content that is only accessible after logging in
        quotes = response.css('div.quote')
        
        for quote in quotes:
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('small.author::text').get(),
                'tags': quote.css('div.tags a.tag::text').getall()
            }
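`FormRequest.from_response` reads the `<form>` in the response and pre-fills its fields, which is why hidden inputs such as `csrf_token` are submitted even if you do not pass them. A rough stdlib-only sketch of that idea, using `html.parser`; it is a simplified illustration, not Scrapy's implementation (which also handles selects, buttons, and choosing among several forms), and the helper names are hypothetical.

```python
from html.parser import HTMLParser


class FormFieldParser(HTMLParser):
    """Collect name/value pairs of <input> elements inside the first <form>."""

    def __init__(self):
        super().__init__()
        self.in_form = False
        self.done = False
        self.fields = {}

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == "form" and not self.done:
            self.in_form = True
        elif tag == "input" and self.in_form and attrs.get("name"):
            self.fields[attrs["name"]] = attrs.get("value") or ""

    def handle_endtag(self, tag):
        if tag == "form" and self.in_form:
            self.in_form = False
            self.done = True  # only the first form is considered


def prefilled_form_data(html, overrides):
    """Merge the form's existing fields with user-supplied values."""
    parser = FormFieldParser()
    parser.feed(html)
    return {**parser.fields, **overrides}


page = """
<form action="/login" method="post">
  <input type="hidden" name="csrf_token" value="abc123">
  <input type="text" name="username">
  <input type="password" name="password">
</form>
"""
print(prefilled_form_data(page, {"username": "u", "password": "p"}))
# {'csrf_token': 'abc123', 'username': 'u', 'password': 'p'}
```

The user-supplied values win over whatever the form contained, mirroring how `formdata` overrides pre-filled fields.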

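7.8 Using Item Loaders

Item Loaders offer a more flexible way to populate Item objects: input processors (named field_in) clean each value as it is added, and output processors (field_out) turn the collected values into the final field value when load_item() is called: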

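# quotes_spider/quotes_spider/loaders.py
from itemloaders.processors import Join, MapCompose, TakeFirst  # replaces the deprecated scrapy.loader.processors
from scrapy.loader import ItemLoader
from quotes_spider.items import QuoteItem

class QuoteLoader(ItemLoader):
    default_item_class = QuoteItem
    
    # Default output processor: keep the first non-empty value
    default_output_processor = TakeFirst()
    
    # Text: strip whitespace, then the surrounding quote marks
    # (quotes.toscrape.com uses curly quotes \u201c \u201d, so strip those as well as straight quotes)
    text_in = MapCompose(str.strip, lambda x: x.strip('\u201c\u201d"\''))
    
    # Author: strip whitespace
    author_in = MapCompose(str.strip)
    
    # Tags: strip and lowercase
    tags_in = MapCompose(str.strip, str.lower)
    tags_out = Join(',')  # one comma-separated string, not a list (RemoveTagsPipeline expects a list)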
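To see what the processor chain does to the raw strings, here is a plain-Python imitation of `MapCompose`, `TakeFirst`, and `Join`. These are simplified stand-ins written for illustration (for example, the real `MapCompose` also flattens list results), not the `itemloaders` implementations:

```python
def map_compose(*functions):
    """Apply each function to every value in turn, dropping None results."""
    def process(values):
        for func in functions:
            values = [result for result in (func(v) for v in values) if result is not None]
        return values
    return process


def take_first(values):
    """Return the first value that is neither None nor an empty string."""
    for value in values:
        if value is not None and value != "":
            return value
    return None


def join(separator=" "):
    """Join all values into one string."""
    def process(values):
        return separator.join(values)
    return process


# Input processor, then output processor, as for the text field
text_in = map_compose(str.strip, lambda x: x.strip("\u201c\u201d\"'"))
raw_text = ["  \u201cThe world as we have created it\u201d  "]
print(take_first(text_in(raw_text)))  # The world as we have created it

# Same idea for the tags field
tags_in = map_compose(str.strip, str.lower)
print(join(",")(tags_in([" Change ", "Deep-Thoughts"])))  # change,deep-thoughts
```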

Using the Item Loader in the spider:

# quotes_spider/quotes_spider/spiders/quotes.py
import scrapy
from quotes_spider.loaders import QuoteLoader

class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        quotes = response.css('div.quote')
        
        for quote in quotes:
            loader = QuoteLoader(selector=quote)
            
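            # Add fields from CSS selectors (values pass through the input processors)
            loader.add_css('text', 'span.text::text')
            loader.add_css('author', 'small.author::text')
            loader.add_css('tags', 'a.tag::text')
            
            # Apply the output processors and build the Item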
            yield loader.load_item()
        
        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, self.parse)

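7.9 Hands-On: Building a Book Spider

This example lives in its own project; create it first with scrapy startproject books_spider, then add the following spider: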

# books_spider/books_spider/spiders/books.py
import scrapy
from books_spider.items import BookItem

class BooksSpider(scrapy.Spider):
    name = 'books'
    allowed_domains = ['books.toscrape.com']
    start_urls = ['http://books.toscrape.com/']

    def parse(self, response):
        # Collect the links to all books on the listing page
        books = response.css('article.product_pod h3 a::attr(href)').getall()
        
        # Request each book's detail page
        for book_url in books:
            # Build the absolute URL
            book_url = response.urljoin(book_url)
            yield scrapy.Request(book_url, callback=self.parse_book)
        
        # Handle pagination
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            next_page_url = response.urljoin(next_page)
            yield scrapy.Request(next_page_url, callback=self.parse)

    def parse_book(self, response):
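        # Create a BookItem
        book = BookItem()
        
        # Extract the book details
        book['title'] = response.css('div.product_main h1::text').get()
        book['price'] = response.css('p.price_color::text').get()
        book['category'] = response.css('ul.breadcrumb li:nth-child(3) a::text').get()
        # The availability text is split around an <i> icon, so join all text nodes rather than relying on index [1]
        book['availability'] = ''.join(response.css('p.availability::text').getall()).strip()
        # The rating is encoded in the class attribute, e.g. class="star-rating Three"
        rating_class = response.css('div.product_main p.star-rating::attr(class)').get()
        book['rating'] = rating_class.split()[-1] if rating_class else None
        book['description'] = response.css('div#product_description + p::text').get()
        book['url'] = response.url
        
        # Extract the image URL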
        image_rel_url = response.css('div.item.active img::attr(src)').get()
        book['image_url'] = response.urljoin(image_rel_url) if image_rel_url else None
        
        yield book
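`response.urljoin()` resolves a link against the URL of the page it came from (or the page's `<base>` tag, if present). That matters on books.toscrape.com because links are relative to the current page: hrefs on the homepage start with `catalogue/`, while hrefs on `catalogue/page-2.html` do not. The same resolution rule can be checked with the standard library's `urllib.parse.urljoin`; the hrefs below follow that pattern, and the image file name is a made-up example.

```python
from urllib.parse import urljoin

pages = [
    # (URL of the current page, relative href found on it)
    ("http://books.toscrape.com/", "catalogue/page-2.html"),
    ("http://books.toscrape.com/catalogue/page-2.html", "page-3.html"),
    ("http://books.toscrape.com/catalogue/page-2.html", "a-light-in-the-attic_1000/index.html"),
    # Image paths on detail pages climb back up with "../"
    ("http://books.toscrape.com/catalogue/a-light-in-the-attic_1000/index.html",
     "../../media/cache/example.jpg"),
]

for base, href in pages:
    print(urljoin(base, href))
```

Concatenating strings such as `'http://books.toscrape.com/' + href` would produce broken URLs from the second page onward, which is why the spider resolves every link relative to the response.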

Define BookItem:

# books_spider/books_spider/items.py
import scrapy

class BookItem(scrapy.Item):
    title = scrapy.Field()
    price = scrapy.Field()
    category = scrapy.Field()
    availability = scrapy.Field()
    rating = scrapy.Field()
    description = scrapy.Field()
    url = scrapy.Field()
    image_url = scrapy.Field()

Create a data-cleaning pipeline:

# books_spider/books_spider/pipelines.py
import re
from itemadapter import ItemAdapter

class BooksPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        
        # Clean the price: keep only the number
        price = adapter.get('price')
        if price:
            # Extract the numeric part, e.g. "£51.77" -> 51.77
            price_value = re.search(r'[0-9.]+', price)
            if price_value:
                adapter['price'] = float(price_value.group())
        
        # Normalize the rating
        rating = adapter.get('rating')
        if rating:
            # Map the star text (One, Two, Three, Four, Five) to a number; unknown values become 0
            rating_map = {
                'One': 1,
                'Two': 2,
                'Three': 3,
                'Four': 4,
                'Five': 5
            }
            adapter['rating'] = rating_map.get(rating, 0)
        
        # Parse the stock information
        availability = adapter.get('availability')
        if availability:
            # Extract the count, e.g. "In stock (19 available)" -> 19
            stock_match = re.search(r'(\d+) available', availability)
            if stock_match:
                adapter['availability'] = int(stock_match.group(1))
            else:
                adapter['availability'] = 0
        
        return item

Activate the pipeline:

# books_spider/books_spider/settings.py
ITEM_PIPELINES = {
   'books_spider.pipelines.BooksPipeline': 300,
}

Run the spider:

cd books_spider
scrapy crawl books -o books.json

8. Data Storage

8.1 Saving in Different Formats

JSON:

import json

# Save data as JSON
def save_to_json(data, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

# Load data from JSON
def load_from_json(filename):
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)

# Example usage
data = [{'name': '张三', 'age': 30}, {'name': '李四', 'age': 25}]
save_to_json(data, 'people.json')
loaded_data = load_from_json('people.json')
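A single JSON array has to be rewritten in full every time, which is awkward for a crawler that saves results as it goes. A sketch of the JSON Lines alternative, one object per line, so new records can be appended without rereading the file; `append_to_jsonl` and `load_from_jsonl` are hypothetical helper names:

```python
import json


def append_to_jsonl(records, filename):
    """Append records to a JSON Lines file, one JSON object per line."""
    with open(filename, "a", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")


def load_from_jsonl(filename):
    """Read every non-empty line back as a dict."""
    with open(filename, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

A crawler would call `append_to_jsonl(page_results, 'people.jsonl')` after each page; `load_from_jsonl('people.jsonl')` then returns everything collected so far, even if an earlier run stopped halfway.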

CSV:

import csv

# Save data as CSV
def save_to_csv(data, filename, fieldnames=None):
    if not fieldnames and data:
        # Infer the column names from the first record
        fieldnames = data[0].keys()
    
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

# Load data from CSV (every value is read back as a string)
def load_from_csv(filename):
    with open(filename, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        return list(reader)

# Example usage
data = [{'name': '张三', 'age': '30'}, {'name': '李四', 'age': '25'}]
save_to_csv(data, 'people.csv')
loaded_data = load_from_csv('people.csv')
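Files written with plain `utf-8`, as above, often show garbled Chinese text when double-clicked open in Excel on Windows, because Excel does not reliably detect UTF-8 without a byte order mark. A commonly used workaround, sketched here, is the `utf-8-sig` codec, which writes that BOM and strips it again on reading. The sketch also converts `age` back to `int`, since CSV returns strings; that conversion is specific to this sample data, and both function names are hypothetical.

```python
import csv


def save_to_csv_for_excel(data, filename):
    """Write a CSV with a UTF-8 BOM so Excel detects the encoding."""
    fieldnames = list(data[0].keys()) if data else []
    with open(filename, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)


def load_people(filename):
    """Read the CSV back and convert the 'age' column to int."""
    with open(filename, "r", newline="", encoding="utf-8-sig") as f:
        return [{**row, "age": int(row["age"])} for row in csv.DictReader(f)]
```

Reading such a file with plain `utf-8` would leave the BOM in the first column name (`'\ufeffname'`), so use `utf-8-sig` on both sides.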

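Excel (pandas reads and writes .xlsx files through the openpyxl package: pip install openpyxl):

import pandas as pd

# Save data to an Excel file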
def save_to_excel(data, filename, sheet_name='Sheet1'):
    df = pd.DataFrame(data)
    df.to_excel(filename, sheet_name=sheet_name, index=False)

# Load data from an Excel file
def load_from_excel(filename, sheet_name='Sheet1'):
    df = pd.read_excel(filename, sheet_name=sheet_name)
    return df.to_dict('records')

# Example usage
data = [{'name': '张三', 'age': 30}, {'name': '李四', 'age': 25}]
save_to_excel(data, 'people.xlsx')
loaded_data = load_from_excel('people.xlsx')

XML:

import xml.etree.ElementTree as ET
from xml.dom import minidom

# Save data as XML (dict keys become tag names, so they must be valid XML names)
def save_to_xml(data, filename, root_element='data', item_element='item'):
    root = ET.Element(root_element)
    
    for item in data:
        item_elem = ET.SubElement(root, item_element)
        for key, value in item.items():
            child = ET.SubElement(item_elem, key)
            child.text = str(value)
    
    # Pretty-print the XML
    rough_string = ET.tostring(root, 'utf-8')
    reparsed = minidom.parseString(rough_string)
    pretty_xml = reparsed.toprettyxml(indent="  ")
    
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(pretty_xml)

# Load data from XML (all values come back as strings)
def load_from_xml(filename, item_element='item'):
    tree = ET.parse(filename)
    root = tree.getroot()
    
    data = []
    for item_elem in root.findall(item_element):
        item_data = {}
        for child in item_elem:
            item_data[child.tag] = child.text
        data.append(item_data)
    
    return data

# Example usage
data = [{'name': '张三', 'age': 30}, {'name': '李四', 'age': 25}]
save_to_xml(data, 'people.xml')
loaded_data = load_from_xml('people.xml')

8.2 Database Storage

SQLite:

import sqlite3

# Connect to an SQLite database (the file is created if it does not exist)
def create_connection(db_file):
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        return conn
    except sqlite3.Error as e:
        print(e)
    return conn

# Create a table
def create_table(conn, create_table_sql):
    try:
        c = conn.cursor()
        c.execute(create_table_sql)
    except sqlite3.Error as e:
        print(e)

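# Insert multiple records (table and field names are formatted into the SQL, so pass only trusted names; values go through ? placeholders)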
def insert_many_records(conn, table_name, records, fields):
    placeholders = ', '.join(['?' for _ in fields])
    fields_str = ', '.join(fields)
    sql = f'INSERT INTO {table_name} ({fields_str}) VALUES ({placeholders})'
    
    try:
        c = conn.cursor()
        # Convert the list of dicts into one list of values per record, in field order
        values = [[record.get(field) for field in fields] for record in records]
        c.executemany(sql, values)
        conn.commit()
        return c.rowcount
    except sqlite3.Error as e:
        print(e)
        return 0

# Query all rows as a list of dicts
def select_all_records(conn, table_name):
    sql = f'SELECT * FROM {table_name}'
    try:
        c = conn.cursor()
        c.execute(sql)
        columns = [column[0] for column in c.description]
        result = []
        for row in c.fetchall():
            result.append(dict(zip(columns, row)))
        return result
    except sqlite3.Error as e:
        print(e)
        return []

# Example usage
def main():
    database = "pythonsqlite.db"
    
    # SQL statement that creates the people table
    sql_create_people_table = """
    CREATE TABLE IF NOT EXISTS people (
        id integer PRIMARY KEY,
        name text NOT NULL,
        age integer
    );
    """
    
    # Open the connection
    conn = create_connection(database)
    
    if conn is not None:
        # Create the table
        create_table(conn, sql_create_people_table)
        
        # Prepare the data
        people = [
            {'name': '张三', 'age': 30},
            {'name': '李四', 'age': 25},
            {'name': '王五', 'age': 40}
        ]
        
        # Insert the data
        fields = ['name', 'age']
        rows_affected = insert_many_records(conn, 'people', people, fields)
        print(f"Inserted {rows_affected} records.")
        
        # Query all records
        all_people = select_all_records(conn, 'people')
        print("Query results:")
        for person in all_people:
            print(f"ID: {person['id']}, Name: {person['name']}, Age: {person['age']}")
        
        # Close the connection
        conn.close()
    else:
        print("Could not create the database connection.")

if __name__ == '__main__':
    main()
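Crawlers are often re-run, and the `people` example above inserts the same rows again on every run. A minimal sketch of one way to make inserts idempotent in SQLite, assuming a `url` column that uniquely identifies each scraped record (the `pages` table and its columns are hypothetical): declare the column `UNIQUE` and use `INSERT OR IGNORE`, which skips rows that would violate the constraint. `Connection.total_changes` is used to count the rows actually inserted.

```python
import sqlite3


def save_pages(conn, pages):
    """Insert scraped pages, skipping any URL that is already stored.

    Returns the number of rows actually inserted.
    """
    conn.execute(
        """CREATE TABLE IF NOT EXISTS pages (
               id INTEGER PRIMARY KEY,
               url TEXT NOT NULL UNIQUE,
               title TEXT
           )"""
    )
    before = conn.total_changes
    conn.executemany(
        "INSERT OR IGNORE INTO pages (url, title) VALUES (?, ?)",
        [(p["url"], p.get("title")) for p in pages],
    )
    conn.commit()
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
batch = [{"url": "http://example.com/a", "title": "A"},
         {"url": "http://example.com/b", "title": "B"}]
print(save_pages(conn, batch))  # 2
print(save_pages(conn, batch))  # 0: both URLs are already stored
```

If you would rather refresh the stored title on a repeat visit, SQLite's `INSERT OR REPLACE` is the related variant, at the cost of deleting and re-inserting the row.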

MySQL/MariaDB (requires the mysql-connector-python package: pip install mysql-connector-python):

import mysql.connector
from mysql.connector import Error

# Connect to a MySQL database
def create_connection(host_name, user_name, user_password, db_name=None):
    connection = None
    try:
        connection = mysql.connector.connect(
            host=host_name,
            user=user_name,
            password=user_password,
            database=db_name
        )
        print("Connected to MySQL successfully")
    except Error as e:
        print(f"MySQL connection error: {e}")
    
    return connection

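# Create a database
def create_database(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print("Database created successfully")
    except Error as e:
        print(f"Error creating database: {e}")

# Execute a write query (INSERT, UPDATE, CREATE TABLE, ...) and commit it
def execute_query(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        connection.commit()
        print("Query executed successfully")
    except Error as e:
        print(f"Error executing query: {e}")

# Execute a read query; dictionary=True returns each row as a dict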
def execute_read_query(connection, query):
    cursor = connection.cursor(dictionary=True)
    result = None
    try:
        cursor.execute(query)
        result = cursor.fetchall()
        return result
    except Error as e:
        print(f"Error reading data: {e}")
        return None

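# Example usage
def main():
    # Connection parameters
    host = "localhost"
    user = "root"
    password = "your_password"
    
    # Connect to the MySQL server (no database selected yet)
    connection = create_connection(host, user, password)
    
    if connection:
        # Create the database with utf8mb4 so Chinese text (and emoji) can be stored
        create_database_query = "CREATE DATABASE IF NOT EXISTS web_crawler CHARACTER SET utf8mb4"
        create_database(connection, create_database_query)
        
        # Close the server-level connection
        connection.close()
        
        # Reconnect, this time to the newly created database
        connection = create_connection(host, user, password, "web_crawler")
        
        if connection:
            # Create the table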
            create_table_query = """
            CREATE TABLE IF NOT EXISTS products (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(255) NOT NULL,
                price DECIMAL(10, 2),
                description TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            """
            execute_query(connection, create_table_query)
            
            # Insert data
            insert_data_query = """
            INSERT INTO products (name, price, description)
            VALUES 
                ('笔记本电脑', 5999.99, '高性能笔记本电脑,适合办公和游戏'),
                ('智能手机', 3999.00, '最新款智能手机,支持5G网络'),
                ('蓝牙耳机', 299.50, '无线蓝牙耳机,音质清晰');
            """
            execute_query(connection, insert_data_query)
            
            # Query the data
            select_query = "SELECT * FROM products"
            products = execute_read_query(connection, select_query)
            
            if products:
                print("All products:")
                for product in products:
                    print(f"ID: {product['id']}")
                    print(f"Name: {product['name']}")
                    print(f"Price: {product['price']}")
                    print(f"Description: {product['description']}")
                    print(f"Created at: {product['created_at']}")
                    print("-" * 30)
            
            # Close the connection
            connection.close()
            print("MySQL connection closed")

if __name__ == "__main__":
    main()

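MongoDB (requires the pymongo package: pip install pymongo):

from pymongo import MongoClient

# Connect to MongoDB
def connect_mongodb(host='localhost', port=27017, db_name='web_crawler'):
    try:
        # MongoClient connects lazily, so send a ping to confirm the server is reachable
        client = MongoClient(host, port, serverSelectionTimeoutMS=5000)
        client.admin.command('ping')
        db = client[db_name]
        print(f"Connected to MongoDB: {db_name}")
        return db
    except Exception as e:
        print(f"MongoDB connection error: {e}")
        return None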

# Insert multiple documents (insert_many also adds an _id field to each dict)
def insert_many_documents(db, collection_name, documents):
    try:
        collection = db[collection_name]
        result = collection.insert_many(documents)
        return len(result.inserted_ids)
    except Exception as e:
        print(f"Error inserting documents: {e}")
        return 0

# Query documents (all documents when query is None)
def find_all_documents(db, collection_name, query=None, projection=None):
    try:
        collection = db[collection_name]
        return list(collection.find(query, projection))
    except Exception as e:
        print(f"Error querying documents: {e}")
        return []

# Query documents that match a filter
def find_documents(db, collection_name, query):
    try:
        collection = db[collection_name]
        return list(collection.find(query))
    except Exception as e:
        print(f"Error querying documents: {e}")
        return []

# Update all matching documents
def update_documents(db, collection_name, query, update):
    try:
        collection = db[collection_name]
        result = collection.update_many(query, update)
        return result.modified_count
    except Exception as e:
        print(f"Error updating documents: {e}")
        return 0

# Delete all matching documents
def delete_documents(db, collection_name, query):
    try:
        collection = db[collection_name]
        result = collection.delete_many(query)
        return result.deleted_count
    except Exception as e:
        print(f"Error deleting documents: {e}")
        return 0

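# Example usage
def main():
    # Connect to MongoDB
    db = connect_mongodb()
    
    if db is not None:  # Database objects do not support truth-value testing, so compare with None
        # Prepare the data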
        products = [
            {
                "name": "笔记本电脑",
                "brand": "联想",
                "price": 5999.99,
                "specs": {
                    "cpu": "Intel i5",
                    "ram": "8GB",
                    "storage": "512GB SSD"
                },
                "in_stock": True
            },
            {
                "name": "智能手机",
                "brand": "小米",
                "price": 2999.00,
                "specs": {
                    "screen": "6.5英寸",
                    "camera": "4800万像素",
                    "battery": "4500mAh"
                },
                "in_stock": True
            },
            {
                "name": "智能手表",
                "brand": "华为",
                "price": 1499.00,
                "specs": {
                    "screen": "1.3英寸",
                    "battery": "14天续航",
                    "water_resistant": True
                },
                "in_stock": False
            }
        ]
        
        # Insert the data
        collection_name = "products"
        count = insert_many_documents(db, collection_name, products)
        print(f"Inserted {count} documents")
        
        # Query all documents
        all_products = find_all_documents(db, collection_name)
        print("\nAll products:")
        for product in all_products:
            print(f"ID: {product['_id']}")
            print(f"Name: {product['name']}")
            print(f"Brand: {product['brand']}")
            print(f"Price: {product['price']}")
            print("-" * 30)
        
        # Query with a filter
        in_stock_products = find_documents(db, collection_name, {"in_stock": True})
        print(f"\nProducts in stock: {len(in_stock_products)}")
        
        # Update documents
        update_count = update_documents(
            db,
            collection_name,
            {"name": "智能手表"},
            {"$set": {"in_stock": True, "price": 1299.00}}
        )
        print(f"Updated {update_count} documents")
        
        # Query the watch again
        watches = find_documents(db, collection_name, {"name": "智能手表"})
        if watches:
            print("\nWatch after the update:")
            watch = watches[0]
            print(f"Name: {watch['name']}")
            print(f"Price: {watch['price']}")
            print(f"In stock: {watch['in_stock']}")
        
        # Delete documents
        delete_count = delete_documents(db, collection_name, {"brand": "华为"})
        print(f"Deleted {delete_count} documents")
        
        # Count the remaining documents
        remaining = find_all_documents(db, collection_name)
        print(f"Remaining documents: {len(remaining)}")

if __name__ == "__main__":
    main()

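8.3 Cloud Storage and Hosted Data Services

Saving to Google Sheets (requires the gspread package and a Google Cloud service-account key file):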

import gspread

def save_to_google_sheets(data, sheet_name):
    # Authorize with a service-account key file; gspread.service_account() replaces the
    # deprecated oauth2client ServiceAccountCredentials + gspread.authorize() flow
    gc = gspread.service_account(filename='your-credentials.json')
    
    try:
        # Try to open an existing spreadsheet
        sheet = gc.open(sheet_name)
    except gspread.exceptions.SpreadsheetNotFound:
        # Create it if it does not exist
        sheet = gc.create(sheet_name)
        # Share it with your own Google account; otherwise only the service account can see it
        sheet.share('your-email@gmail.com', perm_type='user', role='writer')
    
    # Use the first worksheet
    worksheet = sheet.get_worksheet(0)
    
    # Nothing to write
    if not data:
        return
    
    # Column names come from the first record
    headers = list(data[0].keys())
    
    # Clear the worksheet, then write the header and all data rows in one request
    # (one append_row call per row quickly runs into the Sheets API rate limits)
    rows = [[item.get(header, '') for header in headers] for item in data]
    worksheet.clear()
    worksheet.append_rows([headers] + rows)
    
    print(f"Data saved to Google Sheets: {sheet_name}")
    print(f"Link: {sheet.url}")

# Example usage
data = [
    {'name': '张三', 'age': 30, 'city': '北京'},
    {'name': '李四', 'age': 25, 'city': '上海'},
    {'name': '王五', 'age': 40, 'city': '广州'}
]

save_to_google_sheets(data, 'User Data')

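Saving to AWS S3 (requires boto3 and configured AWS credentials):

import csv
import io
import json

import boto3
from botocore.exceptions import BotoCoreError, ClientError

def save_to_s3(data, bucket_name, file_name, file_format='json'):
    """
    Save data to AWS S3.
    :param data: list of dicts to save
    :param bucket_name: S3 bucket name
    :param file_name: object key to write
    :param file_format: file format ('json' or 'csv')
    :return: True on success, False on failure
    """
    # Serialize in memory instead of a temporary file (which had a platform-dependent encoding and was never deleted)
    if file_format == 'json':
        body = json.dumps(data, ensure_ascii=False, indent=4)
        content_type = 'application/json'
    elif file_format == 'csv':
        buffer = io.StringIO()
        fieldnames = list(data[0].keys()) if data else []
        writer = csv.DictWriter(buffer, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
        body = buffer.getvalue()
        content_type = 'text/csv'
    else:
        print(f"Unsupported file format: {file_format}")
        return False
    
    # Create the S3 client
    s3_client = boto3.client('s3')
    
    try:
        # Upload the UTF-8 encoded content as a single object
        s3_client.put_object(
            Bucket=bucket_name,
            Key=file_name,
            Body=body.encode('utf-8'),
            ContentType=content_type,
        )
        print(f"Data uploaded to S3: s3://{bucket_name}/{file_name}")
        return True
    
    except (BotoCoreError, ClientError) as e:
        # BotoCoreError covers client-side problems such as missing credentials
        print(f"Error uploading to S3: {e}")
        return False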

# Usage example
data = [
    {'id': 1, 'name': '笔记本电脑', 'price': 5999.99},
    {'id': 2, 'name': '智能手机', 'price': 3999.00},
    {'id': 3, 'name': '蓝牙耳机', 'price': 299.50}
]

# Upload as JSON
save_to_s3(data, 'your-bucket-name', 'products.json')

# Upload as CSV
save_to_s3(data, 'your-bucket-name', 'products.csv', 'csv')

Using the Firebase Realtime Database:

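import datetime

import firebase_admin
from firebase_admin import credentials
from firebase_admin import db

def initialize_firebase(credentials_path, database_url):
    """Initialize the Firebase connection (safe to call more than once)"""
    try:
        # initialize_app() raises ValueError if the default app already exists,
        # so only initialize when get_app() cannot find one
        firebase_admin.get_app()
        return True
    except ValueError:
        pass
    
    try:
        cred = credentials.Certificate(credentials_path)
        firebase_admin.initialize_app(cred, {
            'databaseURL': database_url
        })
        print("Firebase initialized successfully")
        return True
    except Exception as e:
        print(f"Firebase initialization failed: {e}")
        return False

def save_to_firebase(data, ref_path):
    """
    Save data to the Firebase Realtime Database
    :param data: the data to save (a list of dicts)
    :param ref_path: reference path in Firebase
    :return: True on success, False on failure
    """
    try:
        # Get a database reference
        ref = db.reference(ref_path)
        
        # One timestamp shared by every item in this batch
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        
        # Build all children first, then write them with a single update() call.
        # The keys item_0, item_1, ... are positional, so a later batch
        # overwrites earlier items stored at the same index.
        payload = {}
        for i, item in enumerate(data):
            item_data = dict(item)
            item_data['timestamp'] = timestamp
            payload[f"item_{i}"] = item_data
        if payload:
            ref.update(payload)
        
        print(f"Data saved to Firebase: {ref_path}")
        return True
    
    except Exception as e:
        print(f"Failed to save to Firebase: {e}")
        return False

def read_from_firebase(ref_path):
    """
    Read data from the Firebase Realtime Database
    :param ref_path: reference path in Firebase
    :return: the data as a list (empty on failure)
    """
    try:
        # Get a database reference
        ref = db.reference(ref_path)
        
        # Fetch everything under this path
        data = ref.get()
        
        if not data:
            return []
        # Children with sequential integer keys can come back as a list
        # (with None for missing indexes); other keys come back as a dict
        if isinstance(data, list):
            return [item for item in data if item is not None]
        return list(data.values())
        
    except Exception as e:
        print(f"Failed to read from Firebase: {e}")
        return []

# Usage example
# Initialize Firebase (only needs to happen once per process)
initialize_firebase(
    'path/to/your/firebase-credentials.json',
    'https://your-project-id.firebaseio.com/'
)

# Prepare the data
products = [
    {'name': '笔记本电脑', 'price': 5999.99, 'brand': '联想'},
    {'name': '智能手机', 'price': 3999.00, 'brand': '小米'},
    {'name': '蓝牙耳机', 'price': 299.50, 'brand': '华为'}
]

# Save the data
save_to_firebase(products, 'products')

# Read it back
retrieved_products = read_from_firebase('products')
print("\nData read from Firebase:")
for product in retrieved_products:
    print(f"Name: {product['name']}")
    print(f"Price: {product['price']}")
    print(f"Brand: {product['brand']}")
    print(f"Timestamp: {product['timestamp']}")
    print("-" * 30)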
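save_to_firebase() keys children as item_0, item_1, and so on. Each new batch therefore overwrites whatever was stored at the same position. If your records carry a stable identifier, keying by it makes repeated saves idempotent. The `sku` field below is an assumption; the sample data above has no such field. Realtime Database keys may not contain `.`, `$`, `#`, `[`, `]`, `/` or ASCII control characters, so this sketch replaces them with `_`. The hypothetical to_firebase_payload returns a dict that could be passed to `db.reference('products').update(payload)` in a single write.

```python
import re

# Characters that are not allowed in Realtime Database keys
_FORBIDDEN = re.compile(r'[.$#\[\]/\x00-\x1f\x7f]')

def to_firebase_payload(items, id_field, timestamp):
    """Build a {key: record} dict for one ref.update() call, keyed by id_field."""
    payload = {}
    for item in items:
        key = _FORBIDDEN.sub('_', str(item[id_field]))
        # Copy the record and stamp it, leaving the caller's dict untouched
        payload[key] = {**item, 'timestamp': timestamp}
    return payload


products = [
    {'sku': '1001', 'name': '笔记本电脑', 'price': 5999.99},
    {'sku': 'A1.v2', 'name': '蓝牙耳机', 'price': 299.50},
]
payload = to_firebase_payload(products, 'sku', '2024-01-01 12:00:00')
print(sorted(payload))  # ['1001', 'A1_v2']
```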

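9. Hands-on Project: E-commerce Data Crawler

Let's build a complete e-commerce crawler project that scrapes product data from JD.com (京东) and stores it in a database.

9.1 Project Design and Requirements

Project goals:

  • Crawl product information for specific JD categories
  • Collect each product's title, price, review count, shop name, and similar fields
  • Store the data in a MongoDB database
  • Support incremental updates and deduplication
  • Analyze and visualize the data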
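In this project the database layer (9.3) handles deduplication: it keeps a unique index on product_id and upserts each product. It can also help to collapse duplicates within a single crawl before writing, for example if listing pages overlap. The sketch below keeps the most recent record per product. It assumes every record carries product_id and a timestamp in the format the crawler writes. dedupe_latest is a hypothetical helper, not one of the project files.

```python
def dedupe_latest(products, key='product_id', ts_field='timestamp'):
    """Keep one record per key, preferring the record with the latest timestamp.

    Timestamps are compared as strings, which orders correctly for the
    zero-padded '%Y-%m-%d %H:%M:%S' format used in this project.
    Output order follows the first appearance of each key.
    """
    latest = {}
    for product in products:
        pid = product[key]
        current = latest.get(pid)
        if current is None or product[ts_field] >= current[ts_field]:
            latest[pid] = product
    return list(latest.values())


batch = [
    {'product_id': '1001', 'price_number': 5999.0, 'timestamp': '2024-05-01 10:00:00'},
    {'product_id': '1002', 'price_number': 3999.0, 'timestamp': '2024-05-01 10:00:05'},
    {'product_id': '1001', 'price_number': 5799.0, 'timestamp': '2024-05-02 09:30:00'},
]
for p in dedupe_latest(batch):
    print(p['product_id'], p['price_number'])
# 1001 5799.0
# 1002 3999.0
```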

Directory structure:

jd_spider/
├── spider/
│   ├── __init__.py
│   ├── config.py      # configuration
│   ├── crawler.py     # main crawler class
│   ├── db.py          # database operations
│   └── utils.py       # utility functions
├── analysis/
│   ├── __init__.py
│   ├── analyzer.py    # data analysis
│   └── visualize.py   # data visualization
├── main.py            # entry point
└── requirements.txt   # dependencies

9.2 Configuration and Dependencies

requirements.txt:

requests==2.31.0
beautifulsoup4==4.12.2
selenium==4.15.2
pymongo==4.6.0
pandas==2.1.3
matplotlib==3.8.2
seaborn==0.13.0
webdriver-manager==4.0.1
fake-useragent==1.3.0
python-dotenv==1.0.0

config.py:

import os
from dotenv import load_dotenv

# Load variables from a .env file (if present) into the environment
load_dotenv()

# Crawler settings
CRAWLER_CONFIG = {
    'MAX_PAGES': int(os.getenv('MAX_PAGES', 5)),
    'DELAY_MIN': float(os.getenv('DELAY_MIN', 1.0)),
    'DELAY_MAX': float(os.getenv('DELAY_MAX', 3.0)),
    'TIMEOUT': int(os.getenv('TIMEOUT', 10)),
    'RETRY_TIMES': int(os.getenv('RETRY_TIMES', 3)),
    'USER_AGENT_ROTATE': os.getenv('USER_AGENT_ROTATE', 'True').lower() == 'true',
}

# Database settings
DB_CONFIG = {
    'MONGO_URI': os.getenv('MONGO_URI', 'mongodb://localhost:27017/'),
    'DB_NAME': os.getenv('DB_NAME', 'jd_products'),
    'COLLECTION_NAME': os.getenv('COLLECTION_NAME', 'products'),
}

# Proxy settings
PROXY_CONFIG = {
    'USE_PROXY': os.getenv('USE_PROXY', 'False').lower() == 'true',
    'PROXY_API': os.getenv('PROXY_API', ''),
}

# Selenium settings
SELENIUM_CONFIG = {
    'USE_SELENIUM': os.getenv('USE_SELENIUM', 'False').lower() == 'true',
    'HEADLESS': os.getenv('HEADLESS', 'True').lower() == 'true',
}

# Logging settings
LOG_CONFIG = {
    'LEVEL': os.getenv('LOG_LEVEL', 'INFO'),
    'FILE': os.getenv('LOG_FILE', 'jd_spider.log'),
}
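config.py parses its on/off switches with `os.getenv(...).lower() == 'true'`. Values such as `1`, `yes` or `on` in `.env` therefore silently count as False. For more forgiving parsing, a small helper like the hypothetical env_bool below could replace those expressions, e.g. `'USE_PROXY': env_bool('USE_PROXY', False)`. The set of accepted "true" spellings is an assumption; adjust it to taste.

```python
import os

# Spellings treated as "true" (an assumption; extend as needed)
_TRUTHY = {'1', 'true', 'yes', 'on'}

def env_bool(name, default=False):
    """Read a boolean environment variable, accepting 1/true/yes/on (any case)."""
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in _TRUTHY


os.environ['JD_DEMO_FLAG'] = 'Yes'
print(env_bool('JD_DEMO_FLAG'))  # True
```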

9.3 Database Operations

db.py:

import logging

import pymongo
from spider.config import DB_CONFIG

class Database:
    def __init__(self):
        """Create an unconnected database wrapper (call connect() before use)"""
        self.client = None
        self.db = None
        self.collection = None
        # Log through the project's 'jd_spider' logger so records reach the
        # handlers configured by setup_logger()
        self.logger = logging.getLogger('jd_spider')
    
    def _connected(self):
        """Return True if connect() has succeeded.

        PyMongo Collection objects do not support truth-value testing
        (`if not self.collection` raises an error), so compare with None.
        """
        if self.collection is None:
            self.logger.error("Database is not connected")
            return False
        return True
    
    def connect(self):
        """Connect to MongoDB"""
        try:
            self.client = pymongo.MongoClient(DB_CONFIG['MONGO_URI'])
            # MongoClient connects lazily; ping so an unreachable server is reported here
            self.client.admin.command('ping')
            self.db = self.client[DB_CONFIG['DB_NAME']]
            self.collection = self.db[DB_CONFIG['COLLECTION_NAME']]
            
            # Unique index on product_id (a no-op if the same index already exists)
            self.collection.create_index([('product_id', pymongo.ASCENDING)], unique=True)
            
            self.logger.info(f"Connected to MongoDB: {DB_CONFIG['DB_NAME']}.{DB_CONFIG['COLLECTION_NAME']}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to MongoDB: {e}")
            return False
    
    def close(self):
        """Close the database connection"""
        if self.client is not None:
            self.client.close()
            self.logger.info("MongoDB connection closed")
    
    def save_product(self, product):
        """Save one product, updating it if it already exists.
        Returns True if a document was inserted or modified."""
        if not self._connected():
            return False
        
        try:
            # product_id is the unique key: upsert inserts new products and updates existing ones
            result = self.collection.update_one(
                {'product_id': product['product_id']},
                {'$set': product},
                upsert=True
            )
            
            if result.upserted_id is not None:
                self.logger.debug(f"Inserted product: {product['title']}")
                return True
            elif result.modified_count > 0:
                self.logger.debug(f"Updated product: {product['title']}")
                return True
            else:
                self.logger.debug(f"Product already exists and is unchanged: {product['title']}")
                return False
        except Exception as e:
            self.logger.error(f"Failed to save product: {e}")
            return False
    
    def save_products(self, products):
        """Save a batch of products; returns how many were inserted or modified"""
        if not self._connected():
            return 0
        
        saved_count = 0
        for product in products:
            if self.save_product(product):
                saved_count += 1
        
        self.logger.info(f"Saved {saved_count}/{len(products)} products (inserted or updated)")
        return saved_count
    
    def get_product_count(self):
        """Get the total number of products"""
        if not self._connected():
            return 0
        
        return self.collection.count_documents({})
    
    def get_all_products(self, limit=None):
        """Get all products (at most `limit` of them if given)"""
        if not self._connected():
            return []
        
        cursor = self.collection.find({})
        if limit:
            cursor = cursor.limit(limit)
        
        return list(cursor)
    
    def get_products_by_category(self, category):
        """Get the products of one category (the crawler stores it as 'category_id')"""
        if not self._connected():
            return []
        
        return list(self.collection.find({'category_id': category}))
    
    def get_price_range(self):
        """Get the (min, max) of price_number"""
        if not self._connected():
            return (0, 0)
        
        # $min/$max ignore documents where price_number is missing or null
        pipeline = [
            {
                '$group': {
                    '_id': None,
                    'min_price': {'$min': '$price_number'},
                    'max_price': {'$max': '$price_number'}
                }
            }
        ]
        
        result = list(self.collection.aggregate(pipeline))
        if result:
            return (result[0]['min_price'], result[0]['max_price'])
        return (0, 0)
    
    def get_top_shops(self, limit=10):
        """Get the top N shops by number of products"""
        if not self._connected():
            return []
        
        pipeline = [
            {'$group': {'_id': '$shop', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
            {'$limit': limit}
        ]
        
        return list(self.collection.aggregate(pipeline))
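The sketch below shows in plain Python what the two aggregation pipelines compute over a list of product dicts. It may also help when unit-testing code that consumes their results without a running MongoDB. price_range and top_shops are illustrative names, not part of db.py. The sketch makes one thing visible: extract_number() returns 0 when no price could be parsed, so such items pull min_price down to 0.

```python
from collections import Counter

def price_range(products):
    """Like Database.get_price_range(): min/max of price_number,
    skipping missing or None values as MongoDB's $min/$max do."""
    prices = [p['price_number'] for p in products if p.get('price_number') is not None]
    if not prices:
        return (0, 0)
    return (min(prices), max(prices))

def top_shops(products, limit=10):
    """Like Database.get_top_shops(): group by shop, count, sort descending, limit."""
    counts = Counter(p.get('shop') for p in products)
    # most_common() breaks ties by first appearance; MongoDB's $sort does not promise that
    return [{'_id': shop, 'count': n} for shop, n in counts.most_common(limit)]


sample = [
    {'shop': '京东自营', 'price_number': 5999.99},
    {'shop': '小米官方旗舰店', 'price_number': 0},   # a price that failed to parse
    {'shop': '京东自营', 'price_number': 299.5},
]
print(price_range(sample))         # (0, 5999.99)
print(top_shops(sample, limit=1))  # [{'_id': '京东自营', 'count': 2}]
```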

9.4 Crawler Implementation

utils.py:

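import logging
import random
import re
import time

from fake_useragent import UserAgent
from spider.config import CRAWLER_CONFIG, LOG_CONFIG

# Configure logging
def setup_logger():
    logger = logging.getLogger('jd_spider')
    # Calling this twice would otherwise attach duplicate handlers
    # and print every message twice
    if logger.handlers:
        return logger
    
    level = getattr(logging, LOG_CONFIG['LEVEL'].upper(), logging.INFO)
    logger.setLevel(level)
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    
    # File handler
    file_handler = logging.FileHandler(LOG_CONFIG['FILE'], encoding='utf-8')
    file_handler.setLevel(level)
    
    # Formatter
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    
    # Attach the handlers
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

# Sleep for a random delay between requests and return its length
def random_delay():
    delay = random.uniform(CRAWLER_CONFIG['DELAY_MIN'], CRAWLER_CONFIG['DELAY_MAX'])
    time.sleep(delay)
    return delay

# Fallback User-Agent strings, used if fake-useragent fails
_FALLBACK_USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
]
_ua_provider = None

# Get a random User-Agent
def get_random_ua():
    global _ua_provider
    try:
        # Create the UserAgent object once and reuse it instead of rebuilding it on every call
        if _ua_provider is None:
            _ua_provider = UserAgent()
        return _ua_provider.random
    except Exception:
        # Fall back to the predefined list if fake-useragent fails
        return random.choice(_FALLBACK_USER_AGENTS)

# Extract the first number from a string
def extract_number(text):
    """Return the first number in text as a float (0 if there is none).
    Handles thousands separators ("1,299") and the 万 (ten thousand) suffix
    JD uses for large review counts, e.g. "2万+" -> 20000.0."""
    if not text:
        return 0
    
    text = text.replace(',', '')
    match = re.search(r'\d+(?:\.\d+)?', text)
    if not match:
        return 0
    number = float(match.group())
    if '万' in text[match.end():]:
        number *= 10000
    return number

# Clean up text
def clean_text(text):
    if not text:
        return ""
    
    # Collapse runs of whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    # Remove special characters, keeping word characters, CJK, whitespace and . , -
    text = re.sub(r'[^\w\s\u4e00-\u9fff.,-]', '', text)
    return text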
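setup_logger() attaches its console and file handlers to the logger named 'jd_spider'. Python loggers form a hierarchy by dotted name, and a record propagates to the handlers of its ancestors. Records logged on 'jd_spider' or on a child such as 'jd_spider.db' therefore go through those handlers. A logger named 'spider.db', which is what `logging.getLogger(__name__)` returns inside spider/db.py, sits on a different branch, and its records never reach them. The self-contained check below makes this visible. ListHandler and which_loggers_reach are illustrative names, not part of the project.

```python
import logging

class ListHandler(logging.Handler):
    """Records the logger name of every record it receives."""
    def __init__(self):
        super().__init__()
        self.names = []

    def emit(self, record):
        self.names.append(record.name)

def which_loggers_reach(parent_name, logger_names):
    """Attach a ListHandler to parent_name, log one INFO record on each name,
    and return the names whose records reached that handler."""
    parent = logging.getLogger(parent_name)
    old_level = parent.level
    handler = ListHandler()
    parent.setLevel(logging.INFO)
    parent.addHandler(handler)
    try:
        for name in logger_names:
            logging.getLogger(name).info("test record")
    finally:
        # Restore the logger so the check leaves no side effects behind
        parent.removeHandler(handler)
        parent.setLevel(old_level)
    return handler.names


print(which_loggers_reach('jd_spider', ['jd_spider', 'jd_spider.db', 'spider.db']))
# ['jd_spider', 'jd_spider.db']
```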

crawler.py:

import logging
import time
from urllib.parse import quote

import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from spider.config import CRAWLER_CONFIG, SELENIUM_CONFIG
from spider.utils import random_delay, get_random_ua, extract_number, clean_text

class JDCrawler:
    def __init__(self):
        """Initialize the JD crawler"""
        self.logger = logging.getLogger('jd_spider')
        self.session = requests.Session()
        self.driver = None
        self.setup_session()
        
        if SELENIUM_CONFIG['USE_SELENIUM']:
            self.setup_selenium()
    
    def setup_session(self):
        """Configure the requests session"""
        # Default request headers
        self.session.headers.update({
            'User-Agent': get_random_ua(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Referer': 'https://www.jd.com/',
        })
    
    def setup_selenium(self):
        """Configure the Selenium WebDriver"""
        chrome_options = Options()
        
        if SELENIUM_CONFIG['HEADLESS']:
            chrome_options.add_argument('--headless')
        
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument(f'user-agent={get_random_ua()}')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        
        # Disable image loading to speed up page loads
        chrome_prefs = {"profile.managed_default_content_settings.images": 2}
        chrome_options.add_experimental_option("prefs", chrome_prefs)
        
        service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=service, options=chrome_options)
        self.driver.set_page_load_timeout(CRAWLER_CONFIG['TIMEOUT'])
        
        self.logger.info("Selenium WebDriver initialized")
    
    def close(self):
        """Release resources"""
        if self.driver:
            self.driver.quit()
            self.driver = None
            self.logger.info("Selenium WebDriver closed")
        self.session.close()
    
    def search_products(self, keyword, max_pages=None):
        """
        Search for products
        :param keyword: search keyword
        :param max_pages: maximum number of result pages to crawl
        :return: list of products
        """
        if max_pages is None:
            max_pages = CRAWLER_CONFIG['MAX_PAGES']
        
        all_products = []
        
        for page in range(1, max_pages + 1):
            try:
                self.logger.info(f"Crawling page {page} for keyword '{keyword}'")
                
                # Build the search URL. Like the category listing, JD search appears
                # to number full result pages 1, 3, 5, ... (the even values hold the
                # lazily loaded second half of a page), so page n maps to 2n-1
                url = f'https://search.jd.com/Search?keyword={quote(keyword)}&page={2*page-1}'
                
                if SELENIUM_CONFIG['USE_SELENIUM']:
                    products_on_page = self._parse_with_selenium(url)
                else:
                    products_on_page = self._parse_with_requests(url)
                
                if not products_on_page:
                    self.logger.warning(f"No products found on page {page}; stopping")
                    break
                
                self.logger.info(f"Found {len(products_on_page)} products on page {page}")
                all_products.extend(products_on_page)
                
                # Random delay between pages
                delay = random_delay()
                self.logger.debug(f"Slept for a random {delay:.2f} s")
                
            except Exception as e:
                self.logger.error(f"Error crawling page {page}: {e}")
                break
        
        self.logger.info(f"Crawled {len(all_products)} products in total")
        return all_products
    
    def _parse_with_requests(self, url):
        """Fetch and parse a page with requests"""
        try:
            # Rotate the User-Agent before each request
            if CRAWLER_CONFIG['USER_AGENT_ROTATE']:
                self.session.headers.update({'User-Agent': get_random_ua()})
            
            response = self.session.get(url, timeout=CRAWLER_CONFIG['TIMEOUT'])
            response.raise_for_status()
            
            # Parse the HTML with BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Extract the product list
            return self._extract_products_from_soup(soup)
            
        except requests.RequestException as e:
            self.logger.error(f"Request error: {e}")
            return []
    
    def _parse_with_selenium(self, url):
        """Load and parse a page with Selenium"""
        try:
            self.driver.get(url)
            
            # Wait until the product list has loaded
            WebDriverWait(self.driver, CRAWLER_CONFIG['TIMEOUT']).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "div.gl-i-wrap"))
            )
            
            # Scroll down so lazily loaded products get rendered
            self._scroll_page()
            
            # Grab the rendered page source
            html = self.driver.page_source
            soup = BeautifulSoup(html, 'html.parser')
            
            # Extract the product list
            return self._extract_products_from_soup(soup)
            
        except Exception as e:
            self.logger.error(f"Selenium parsing error: {e}")
            return []
    
    def _scroll_page(self, max_scrolls=10, pause=2):
        """Scroll to the bottom repeatedly until the page stops growing"""
        try:
            # Current page height
            last_height = self.driver.execute_script("return document.body.scrollHeight")
            
            # Cap the number of scrolls so an endlessly growing page cannot loop forever
            for _ in range(max_scrolls):
                # Scroll to the bottom of the page
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                
                # Give new content time to load
                time.sleep(pause)
                
                # New page height
                new_height = self.driver.execute_script("return document.body.scrollHeight")
                
                # If the height did not change, we have reached the bottom
                if new_height == last_height:
                    break
                
                last_height = new_height
        except Exception as e:
            self.logger.error(f"Error while scrolling the page: {e}")
    
    def _extract_products_from_soup(self, soup):
        """Extract product information from a BeautifulSoup object"""
        products = []
        
        # Find every product item in the listing
        product_items = soup.select('li.gl-item')
        
        for item in product_items:
            try:
                # Product ID (SKU)
                product_id = item.get('data-sku')
                if not product_id:
                    continue
                
                # Price
                price_element = item.select_one('div.p-price strong i')
                price = price_element.text if price_element else 'N/A'
                price_number = extract_number(price)
                
                # Title
                title_element = item.select_one('div.p-name em')
                title = title_element.get_text(' ', strip=True) if title_element else 'N/A'
                title = clean_text(title)
                
                # Shop name
                shop_element = item.select_one('div.p-shop span a')
                shop = shop_element.get_text(strip=True) if shop_element else 'N/A'
                shop = clean_text(shop)
                
                # Review count
                commit_element = item.select_one('div.p-commit strong a')
                commit_text = commit_element.text if commit_element else '0'
                commits = extract_number(commit_text)
                
                # Product page link
                link = f"https://item.jd.com/{product_id}.html"
                
                # Crawl time
                timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
                
                # Assemble the product record
                product_info = {
                    'product_id': product_id,
                    'title': title,
                    'price': price,
                    'price_number': price_number,
                    'shop': shop,
                    'commits': int(commits),
                    'link': link,
                    'timestamp': timestamp
                }
                
                products.append(product_info)
                
            except Exception as e:
                self.logger.error(f"Error parsing product: {e}")
                continue
        
        return products
    
    def get_product_details(self, product_id):
        """Get detailed information for one product"""
        try:
            url = f"https://item.jd.com/{product_id}.html"
            
            if SELENIUM_CONFIG['USE_SELENIUM']:
                self.driver.get(url)
                WebDriverWait(self.driver, CRAWLER_CONFIG['TIMEOUT']).until(
                    EC.presence_of_element_located((By.ID, "detail"))
                )
                html = self.driver.page_source
            else:
                response = self.session.get(url, timeout=CRAWLER_CONFIG['TIMEOUT'])
                response.raise_for_status()
                html = response.text
            
            soup = BeautifulSoup(html, 'html.parser')
            
            # Extract the product details
            details = {}
            
            # Title
            title_element = soup.select_one('.sku-name')
            details['title'] = title_element.get_text(strip=True) if title_element else 'N/A'
            
            # Price (loaded by JavaScript, so it usually cannot be read from the static HTML)
            details['price'] = 'N/A'
            
            # Brand
            brand_element = soup.select_one('#parameter-brand li a')
            details['brand'] = brand_element.get_text(strip=True) if brand_element else 'N/A'
            
            # Specification parameters
            params = {}
            param_elements = soup.select('#parameter2 li')
            for param in param_elements:
                # Chinese pages typically use the full-width colon '：'; normalize it
                # so both '：' and ':' separate the key from the value
                text = param.get_text(strip=True).replace('：', ':')
                if ':' in text:
                    key, value = text.split(':', 1)
                    params[key.strip()] = value.strip()
            
            details['parameters'] = params
            
            # Product description
            desc_element = soup.select_one('.news')
            details['description'] = desc_element.get_text(strip=True) if desc_element else 'N/A'
            
            # Review data
            details['comments'] = self._get_product_comments(product_id)
            
            # Product ID and URL
            details['product_id'] = product_id
            details['url'] = url
            
            return details
            
        except Exception as e:
            self.logger.error(f"Error getting product details (ID: {product_id}): {e}")
            return None
    
    def _get_product_comments(self, product_id, page=1, page_size=10):
        """Get review data for a product from JD's comment API"""
        try:
            # Review API endpoint; the query string is built from params
            url = 'https://club.jd.com/comment/productPageComments.action'
            params = {
                'productId': product_id,
                'score': 0,
                'sortType': 5,
                'page': page,
                'pageSize': page_size,
            }
            
            # Set the Referer and other headers
            headers = {
                'Referer': f'https://item.jd.com/{product_id}.html',
                'User-Agent': get_random_ua()
            }
            
            # Reuse the session so cookies from earlier requests are sent as well
            response = self.session.get(url, params=params, headers=headers,
                                        timeout=CRAWLER_CONFIG['TIMEOUT'])
            response.raise_for_status()
            
            # Parse the JSON response
            comment_data = response.json()
            summary = comment_data.get('productCommentSummary') or {}
            
            return {
                'count': summary.get('commentCount', 0),
                'good_count': summary.get('goodCount', 0),
                'poor_count': summary.get('poorCount', 0),
                'average_score': summary.get('averageScore', 0),
                'comments': (comment_data.get('comments') or [])[:5]  # keep only the first 5 reviews
            }
            
        except Exception as e:
            self.logger.error(f"Error getting product reviews (ID: {product_id}): {e}")
            return {
                'count': 0,
                'good_count': 0,
                'poor_count': 0,
                'average_score': 0,
                'comments': []
            }
    
    def get_category_products(self, category_id, max_pages=None):
        """Crawl the products in a specific category"""
        if max_pages is None:
            max_pages = CRAWLER_CONFIG['MAX_PAGES']
        
        all_products = []
        
        for page in range(1, max_pages + 1):
            try:
                self.logger.info(f"Crawling page {page} of category ID '{category_id}'")
                
                # Build the category URL (JD's paging rule: page n is requested as page=2n-1)
                url = f'https://list.jd.com/list.html?cat={category_id}&page={2*page-1}'
                
                if SELENIUM_CONFIG['USE_SELENIUM']:
                    products_on_page = self._parse_with_selenium(url)
                else:
                    products_on_page = self._parse_with_requests(url)
                
                if not products_on_page:
                    self.logger.warning(f"No products found on page {page}; stopping")
                    break
                
                # Tag each product with its category
                for product in products_on_page:
                    product['category_id'] = category_id
                
                self.logger.info(f"Found {len(products_on_page)} products on page {page}")
                all_products.extend(products_on_page)
                
                # Random delay between pages
                delay = random_delay()
                self.logger.debug(f"Slept for a random {delay:.2f} s")
                
            except Exception as e:
                self.logger.error(f"Error crawling category page {page}: {e}")
                break
        
        self.logger.info(f"Crawled {len(all_products)} products in total")
        return all_products
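CRAWLER_CONFIG defines RETRY_TIMES, but none of the crawler methods shown here use it. A failed request simply ends the crawl. One way to use it is a small retry wrapper with exponential backoff and random jitter. This is a sketch: with_retries is a hypothetical helper, and the backoff formula is an assumption, not something the project specifies. The `sleep` parameter exists only so the function can be tested without actually waiting.

```python
import random
import time

def with_retries(func, times=3, base_delay=1.0, exceptions=(Exception,), sleep=time.sleep):
    """Call func() up to `times` times, backing off between failures.

    The wait before retry n (n = 0, 1, ...) is base_delay * 2**n plus up to
    base_delay seconds of random jitter. If every attempt fails, the last
    exception is re-raised.
    """
    if times < 1:
        raise ValueError("times must be at least 1")
    for attempt in range(times):
        try:
            return func()
        except exceptions:
            if attempt == times - 1:
                raise  # out of attempts: propagate the last error
            sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))


# A function that fails twice and then succeeds
calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "ok"

print(with_retries(flaky, times=3, base_delay=0.01))  # ok
```

Inside the crawler it might wrap the fetch as `with_retries(lambda: fetch(url), times=CRAWLER_CONFIG['RETRY_TIMES'], exceptions=(requests.RequestException,))`. Here `fetch` is a hypothetical function that calls `self.session.get(...)` and then `raise_for_status()`, so HTTP error statuses are retried as well.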

9.5 Data Analysis

analyzer.py:

import logging

import numpy as np
import pandas as pd
from spider.db import Database

class ProductAnalyzer:
    def __init__(self):
        """Initialize the analyzer and connect to the database"""
        self.logger = logging.getLogger('jd_spider')
        self.db = Database()
        self.db.connect()
        self.data = None
    
    def load_data(self, force_reload=False):
        """Load product data from the database (cached after the first call)"""
        if self.data is None or force_reload:
            products = self.db.get_all_products()
            if products:
                self.data = pd.DataFrame(products)
                # Convert prices to a numeric type
                if 'price_number' in self.data.columns:
                    self.data['price_number'] = pd.to_numeric(self.data['price_number'], errors='coerce')
                # Parse crawl timestamps
                if 'timestamp' in self.data.columns:
                    self.data['timestamp'] = pd.to_datetime(self.data['timestamp'], errors='coerce')
                
                self.logger.info(f"Loaded {len(self.data)} product records")
            else:
                self.logger.warning("No data to load")
                self.data = pd.DataFrame()
        
        return self.data
    
    def get_basic_stats(self):
        """Get basic summary statistics"""
        if self.data is None or self.data.empty:
            self.load_data()
        
        if self.data.empty:
            return {}
        
        stats = {
            'total_products': len(self.data),
            'unique_shops': self.data['shop'].nunique(),
            'avg_price': self.data['price_number'].mean(),
            'median_price': self.data['price_number'].median(),
            'min_price': self.data['price_number'].min(),
            'max_price': self.data['price_number'].max(),
            'price_std': self.data['price_number'].std(),
            'avg_commits': self.data['commits'].mean(),
        }
        
        # Add the latest and earliest crawl times
        if 'timestamp' in self.data.columns:
            stats['latest_date'] = self.data['timestamp'].max()
            stats['earliest_date'] = self.data['timestamp'].min()
        
        return stats
    
    def get_price_distribution(self, bins=10):
        """Get the price distribution"""
        if self.data is None or self.data.empty:
            self.load_data()
        
        if self.data.empty:
            return {}
        
        prices = self.data['price_number'].dropna()
        if prices.empty:
            return {}
        
        # Bin prices into intervals; retbins=True also returns the numeric bin edges
        price_bins, bin_edges = pd.cut(prices, bins=bins, retbins=True)
        price_dist = price_bins.value_counts().sort_index()
        
        # Convert to plain Python types
        price_ranges = [str(interval) for interval in price_dist.index]
        counts = price_dist.values.tolist()
        
        return {
            'price_ranges': price_ranges,
            'counts': counts,
            'bin_edges': bin_edges.tolist()
        }
    
    def get_top_shops(self, limit=10):
        """Rank shops by number of products"""
        if self.data is None or self.data.empty:
            self.load_data()
        
        if self.data.empty:
            return []
        
        shop_counts = self.data['shop'].value_counts().head(limit)
        
        return [
            {'shop': shop, 'count': int(count)}
            for shop, count in shop_counts.items()
        ]
    
    def get_commit_vs_price(self):
        """Analyze the relationship between review count and price"""
        if self.data is None or self.data.empty:
            self.load_data()
        
        if self.data.empty:
            return {}
        
        # Make sure the review counts are numeric
        self.data['commits'] = pd.to_numeric(self.data['commits'], errors='coerce')
        
        # Pearson correlation between review count and price
        correlation = self.data['commits'].corr(self.data['price_number'])
        
        # Data for plotting
        data_for_plot = self.data[['price_number', 'commits']].copy()
        
        # Review counts are heavily skewed, so add log(1 + commits) for clearer plots
        data_for_plot['log_commits'] = np.log1p(data_for_plot['commits'])
        
        return {
            'correlation': correlation,
            'data': data_for_plot.to_dict(orient='records'),
            'price_vs_commits': self.data[['price_number', 'commits']].values.tolist()
        }
    
    def analyze_titles(self):
        """Analyze product titles"""
        if self.data is None or self.data.empty:
            self.load_data()
        
        if self.data.empty:
            return {}
        
        # Title lengths
        self.data['title_length'] = self.data['title'].astype(str).apply(len)
        
        # Keyword analysis (simplified: splits on whitespace only; Chinese titles
        # usually need a word-segmentation/NLP library for meaningful keywords)
        title_words = []
        for title in self.data['title']:
            if isinstance(title, str):
                words = title.split()
                title_words.extend(words)
        
        # Word frequencies
        word_counts = pd.Series(title_words, dtype=object).value_counts().head(20)
        
        return {
            'avg_title_length': self.data['title_length'].mean(),
            'max_title_length': self.data['title_length'].max(),
            'min_title_length': self.data['title_length'].min(),
            'title_length_distribution': self.data['title_length'].value_counts().sort_index().to_dict(),
            'top_words': word_counts.to_dict()
        }
    
    def close(self):
        """Close the database connection"""
        if self.db:
            self.db.close()
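get_commit_vs_price() reports a Pearson coefficient on raw review counts. Pearson measures linear association, so a few products with enormous review counts can dominate it. That skew is also why the method adds a log1p column for plotting. Spearman's rank correlation, the Pearson coefficient computed on ranks, measures monotonic association instead and is insensitive to that skew. It is offered here as an alternative, not as what the project computes. Below is a dependency-free sketch with illustrative function names. In pandas the same coefficient is available as `self.data[['price_number', 'commits']].corr(method='spearman')`.

```python
import math

def ranks(values):
    """1-based ranks; tied values share the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        # Extend j over a run of equal values
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        average_rank = (i + j) / 2 + 1
        for k in range(i, j + 1):
            result[order[k]] = average_rank
        i = j + 1
    return result

def pearson(x, y):
    """Pearson correlation coefficient; NaN if either input is constant."""
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    sx = math.sqrt(sum((a - mean_x) ** 2 for a in x))
    sy = math.sqrt(sum((b - mean_y) ** 2 for b in y))
    if sx == 0 or sy == 0:
        return float('nan')
    return cov / (sx * sy)

def spearman(x, y):
    """Spearman rank correlation: the Pearson correlation of the ranks."""
    return pearson(ranks(x), ranks(y))


# Review counts rise with price, but far from linearly
prices = [100, 200, 300, 400]
commits = [10, 100, 1000, 1_000_000]
print(pearson(prices, commits))
print(spearman(prices, commits))
```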

visualize.py:

import logging
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib.font_manager import FontProperties
from analysis.analyzer import ProductAnalyzer

# Configure a font that can render Chinese (scraped titles and shop names are Chinese).
# FontProperties(fname=...) does not check that the file exists, so test the path first.
# The path below is SimHei on Windows; point it at a CJK font installed on your system.
CJK_FONT_PATH = r'C:\Windows\Fonts\simhei.ttf'
if os.path.exists(CJK_FONT_PATH):
    font = FontProperties(fname=CJK_FONT_PATH)
else:
    # Fall back to the default font (Chinese characters may then render as empty boxes)
    font = FontProperties()

class DataVisualizer:
    def __init__(self, analyzer=None, output_dir='./output'):
        """Initialize the data visualizer"""
        self.logger = logging.getLogger('jd_spider')
        
        # Create an analyzer if none was passed in
        if analyzer is None:
            self.analyzer = ProductAnalyzer()
        else:
            self.analyzer = analyzer
        
        self.output_dir = output_dir
        
        # Create the output directory
        os.makedirs(self.output_dir, exist_ok=True)
        
        # Plot style
        sns.set_theme(style="whitegrid")
        plt.rcParams['figure.figsize'] = (12, 8)
    
    def plot_price_distribution(self, bins=20, save=True):
        """Plot a histogram of product prices"""
        data = self.analyzer.load_data()
        
        if data.empty:
            self.logger.warning("No data available to visualize")
            return None
        
        # Create the figure
        plt.figure(figsize=(12, 8))
        
        # Price histogram with a KDE curve
        sns.histplot(data=data, x='price_number', bins=bins, kde=True)
        plt.title('Product Price Distribution', fontproperties=font, fontsize=16)
        plt.xlabel('Price (CNY)', fontproperties=font, fontsize=12)
        plt.ylabel('Number of products', fontproperties=font, fontsize=12)
        
        # Mark the mean and median prices
        avg_price = data['price_number'].mean()
        median_price = data['price_number'].median()
        
        plt.axvline(avg_price, color='r', linestyle='--', label=f'Mean: {avg_price:.2f} CNY')
        plt.axvline(median_price, color='g', linestyle='--', label=f'Median: {median_price:.2f} CNY')
        
        plt.legend(prop=font)
        
        # Save the figure
        if save:
            file_path = os.path.join(self.output_dir, 'price_distribution.png')
            plt.savefig(file_path, dpi=300, bbox_inches='tight')
            self.logger.info(f"Price distribution chart saved to: {file_path}")
        
        return plt.gcf()
    
    def plot_top_shops(self, limit=10, save=True):
        """Plot the shops with the most products"""
        top_shops = self.analyzer.get_top_shops(limit)
        
        if not top_shops:
            self.logger.warning("No data available to visualize")
            return None
        
        # Unpack into parallel lists
        shops = [item['shop'] for item in top_shops]
        counts = [item['count'] for item in top_shops]
        
        # Create the figure
        plt.figure(figsize=(12, 8))
        
        # Horizontal bar chart
        bars = plt.barh(range(len(shops)), counts, align='center')
        plt.yticks(range(len(shops)), shops, fontproperties=font)
        
        # Add a value label to each bar
        for bar in bars:
            width = bar.get_width()
            plt.text(width + 0.3, bar.get_y() + bar.get_height()/2, f'{width:.0f}',
                    ha='left', va='center')
        
        plt.title('Top Shops by Number of Products', fontproperties=font, fontsize=16)
        plt.xlabel('Number of products', fontproperties=font, fontsize=12)
        plt.ylabel('店铺名称', fontproperties=font, fontsize=12)
        
        # 保存图像
        if save:
            file_path = os.path.join(self.output_dir, 'top_shops.png')
            plt.savefig(file_path, dpi=300, bbox_inches='tight')
            self.logger.info(f"店铺排名图已保存至: {file_path}")
        
        return plt.gcf()
    
    def plot_price_vs_commits(self, save=True):
        """绘制价格与评论数关系散点图"""
        data = self.analyzer.load_data()
        
        if data.empty:
            self.logger.warning("没有数据可用于可视化")
            return None
        
        # 创建图形
        plt.figure(figsize=(12, 8))
        
        # 对评论数进行对数变换,使散点图更清晰
        data['log_commits'] = np.log1p(data['commits'])
        
        # 绘制散点图
        sns.scatterplot(data=data, x='price_number', y='log_commits', alpha=0.6)
        
        plt.title('商品价格与评论数关系', fontproperties=font, fontsize=16)
        plt.xlabel('价格 (元)', fontproperties=font, fontsize=12)
        plt.ylabel('评论数 (对数)', fontproperties=font, fontsize=12)
        
        # Add correlation info (Pearson on the raw review counts, not on the log values plotted)
        correlation = data['price_number'].corr(data['commits'])
        plt.annotate(f'相关系数: {correlation:.3f}', 
                    xy=(0.05, 0.95), xycoords='axes fraction',
                    fontproperties=font, fontsize=12,
                    bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="grey", alpha=0.8))
        
        # 保存图像
        if save:
            file_path = os.path.join(self.output_dir, 'price_vs_commits.png')
            plt.savefig(file_path, dpi=300, bbox_inches='tight')
            self.logger.info(f"价格评论关系图已保存至: {file_path}")
        
        return plt.gcf()
    
    def plot_title_word_cloud(self, save=True):
        """绘制标题词云"""
        try:
            from wordcloud import WordCloud
            import jieba  # 用于中文分词
            
            data = self.analyzer.load_data()
            
            if data.empty:
                self.logger.warning("没有数据可用于可视化")
                return None
            
            # 合并所有标题
            all_titles = ' '.join(data['title'].astype(str))
            
            # 中文分词
            words = ' '.join(jieba.cut(all_titles))
            
            # 创建词云
            wordcloud = WordCloud(
                font_path=font.get_file() if hasattr(font, 'get_file') else None,
                width=1000, height=600,
                background_color='white',
                max_words=100
            ).generate(words)
            
            # 绘制词云
            plt.figure(figsize=(12, 8))
            plt.imshow(wordcloud, interpolation='bilinear')
            plt.axis('off')
            plt.title('商品标题词云', fontproperties=font, fontsize=16)
            
            # 保存图像
            if save:
                file_path = os.path.join(self.output_dir, 'title_word_cloud.png')
                plt.savefig(file_path, dpi=300, bbox_inches='tight')
                self.logger.info(f"标题词云已保存至: {file_path}")
            
            return plt.gcf()
            
        except ImportError:
            self.logger.error("无法绘制词云,请安装 wordcloud 和 jieba 包")
            return None
    
    def generate_all_visualizations(self):
        """生成所有可视化图表"""
        self.plot_price_distribution()
        self.plot_top_shops()
        self.plot_price_vs_commits()
        self.plot_title_word_cloud()
        
        self.logger.info("所有可视化图表已生成")
    
    def close(self):
        """关闭分析器"""
        if self.analyzer:
            self.analyzer.close()
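The font setup in `visualize.py` only knows the Windows SimHei path. A slightly more portable approach is to try several candidate paths in order and use the first one that exists. The sketch below is stdlib-only. Apart from the SimHei path, the candidate paths are assumptions: install locations vary by OS version and distribution, so adjust the list for your machine. `pick_font_path` is a hypothetical helper.

```python
import os

# Example candidate paths (assumptions: install locations vary by OS and distro).
# Only the SimHei path comes from visualize.py.
CANDIDATE_FONTS = [
    r'C:\Windows\Fonts\simhei.ttf',                    # Windows (SimHei)
    '/usr/share/fonts/truetype/wqy/wqy-microhei.ttc',  # Debian/Ubuntu fonts-wqy-microhei
    '/System/Library/Fonts/PingFang.ttc',              # macOS (location may vary by version)
]

def pick_font_path(candidates, exists=os.path.exists):
    """Return the first path in candidates for which exists(path) is true, else None."""
    for path in candidates:
        if exists(path):
            return path
    return None

font_path = pick_font_path(CANDIDATE_FONTS)
print(font_path)  # None if no candidate is installed
```

The result could then feed `FontProperties(fname=font_path) if font_path else FontProperties()`. The same path can also be passed to `WordCloud(font_path=...)`, which needs a CJK font to render Chinese words.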

9.6 Main Program

main.py:

import argparse
import sys
import logging
import os
import json
from spider.utils import setup_logger
from spider.crawler import JDCrawler
from spider.db import Database
from analysis.analyzer import ProductAnalyzer
from analysis.visualize import DataVisualizer

def main():
    """主函数"""
    # 设置日志
    logger = setup_logger()
    
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='京东商品爬虫')
    parser.add_argument('--mode', '-m', choices=['crawl', 'analyze', 'both'], default='both',
                        help='运行模式: 爬取、分析或两者都执行')
    parser.add_argument('--keyword', '-k', type=str, help='搜索关键词')
    parser.add_argument('--category', '-c', type=str, help='商品分类ID')
    parser.add_argument('--pages', '-p', type=int, default=5, help='爬取页数')
    parser.add_argument('--output', '-o', type=str, default='./output', help='输出目录')
    parser.add_argument('--visualize', '-v', action='store_true', help='生成可视化图表')
    parser.add_argument('--details', '-d', action='store_true', help='爬取商品详情')
    
    args = parser.parse_args()
    
    # 创建输出目录
    if not os.path.exists(args.output):
        os.makedirs(args.output)
    
    # 初始化数据库
    db = Database()
    if not db.connect():
        logger.error("无法连接到数据库,程序退出")
        sys.exit(1)
    
    # 初始化爬虫
    if args.mode in ['crawl', 'both']:
        crawler = JDCrawler()
        
        try:
            products = []
            
            # 根据参数确定爬取方式
            if args.keyword:
                logger.info(f"开始爬取关键词: '{args.keyword}'")
                products = crawler.search_products(args.keyword, args.pages)
            elif args.category:
                logger.info(f"开始爬取分类ID: '{args.category}'")
                products = crawler.get_category_products(args.category, args.pages)
            else:
                logger.error("必须指定关键词(-k)或分类ID(-c)")
                sys.exit(1)
            
            logger.info(f"爬取完成,获取 {len(products)} 个商品")
            
            # 保存数据到数据库
            saved_count = db.save_products(products)
            logger.info(f"成功保存 {saved_count} 个商品到数据库")
            
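            # Crawl product details if requested (first 10 products only)
            if args.details and products:
                logger.info("开始爬取商品详情...")
                detail_targets = products[:10]
                for i, product in enumerate(detail_targets):
                    # Report progress against the real total, which may be fewer than 10
                    logger.info(f"爬取商品详情 {i+1}/{len(detail_targets)}: {product['title'][:30]}...")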
                    details = crawler.get_product_details(product['product_id'])
                    if details:
                        # 保存详情到JSON文件
                        details_file = os.path.join(args.output, f"product_{product['product_id']}.json")
                        with open(details_file, 'w', encoding='utf-8') as f:
                            json.dump(details, f, ensure_ascii=False, indent=4)
                        logger.info(f"商品详情已保存到: {details_file}")
                
                logger.info("商品详情爬取完成")
            
        except KeyboardInterrupt:
            logger.info("用户中断,停止爬取")
        except Exception as e:
            logger.error(f"爬取过程中出错: {e}")
        finally:
            # 关闭爬虫
            crawler.close()
    
    # 数据分析
    if args.mode in ['analyze', 'both']:
        analyzer = ProductAnalyzer()
        
        # 加载数据
        data = analyzer.load_data()
        
        if data.empty:
            logger.warning("没有数据可分析")
        else:
            logger.info("开始分析数据...")
            
            # 获取基本统计信息
            stats = analyzer.get_basic_stats()
            
            # 打印统计信息
            logger.info(f"商品总数: {stats.get('total_products', 0)}")
            logger.info(f"店铺数量: {stats.get('unique_shops', 0)}")
            logger.info(f"平均价格: {stats.get('avg_price', 0):.2f}元")
            logger.info(f"价格范围: {stats.get('min_price', 0):.2f} - {stats.get('max_price', 0):.2f}元")
            logger.info(f"平均评论数: {stats.get('avg_commits', 0):.1f}")
            
            # 获取店铺排名
            top_shops = analyzer.get_top_shops(10)
            logger.info("\nTop 10 店铺:")
            for i, shop in enumerate(top_shops):
                logger.info(f"{i+1}. {shop['shop']}: {shop['count']}个商品")
            
            # 保存分析结果到JSON
            analysis_file = os.path.join(args.output, "analysis_results.json")
            with open(analysis_file, 'w', encoding='utf-8') as f:
                json.dump({
                    'basic_stats': stats,
                    'top_shops': top_shops,
                    'price_distribution': analyzer.get_price_distribution(),
                    'title_analysis': analyzer.analyze_titles()
                }, f, ensure_ascii=False, indent=4, default=str)
            
            logger.info(f"分析结果已保存到: {analysis_file}")
            
            # 可视化
            if args.visualize:
                logger.info("开始生成可视化图表...")
                visualizer = DataVisualizer(analyzer, args.output)
                visualizer.generate_all_visualizations()
                visualizer.close()
                logger.info(f"可视化图表已保存到目录: {args.output}")
    
    # 关闭数据库连接
    db.close()
    logger.info("程序执行完毕")

if __name__ == "__main__":
    main()
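In `main()` above, a missing `-k`/`-c` is only detected after the database connection and crawler have been created. Passing both options silently ignores `--category`. The sketch below moves these checks into argument parsing. It uses argparse's `add_mutually_exclusive_group()` and `parser.error()`, which prints the usage and exits with status 2. Making the two options mutually exclusive is a design choice of this sketch, not something the original requires. The other options are omitted for brevity, and `build_parser`/`parse_args` are hypothetical names.

```python
import argparse

def build_parser():
    """Sketch of the main.py CLI with the keyword/category rule enforced by argparse."""
    parser = argparse.ArgumentParser(description='JD product crawler (sketch)')
    parser.add_argument('--mode', '-m', choices=['crawl', 'analyze', 'both'], default='both')
    # At most one of --keyword / --category may be given
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--keyword', '-k', type=str)
    group.add_argument('--category', '-c', type=str)
    parser.add_argument('--pages', '-p', type=int, default=5)
    return parser

def parse_args(argv=None):
    """Parse argv (defaults to sys.argv[1:]) and validate mode-dependent options."""
    parser = build_parser()
    args = parser.parse_args(argv)
    # Crawling needs a search target; analysis alone does not
    if args.mode in ('crawl', 'both') and not (args.keyword or args.category):
        parser.error('crawl/both mode requires --keyword (-k) or --category (-c)')
    return args

print(parse_args(['-k', '耳机', '-p', '2']))
```

Because validation happens before any resources are opened, there is nothing to clean up when the command line is wrong.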

9.7 Usage

  1. Create the project directory structure:

    mkdir -p jd_spider/spider jd_spider/analysis
    touch jd_spider/spider/__init__.py jd_spider/analysis/__init__.py
    
  2. Copy the code into the corresponding files:

    • jd_spider/spider/config.py
    • jd_spider/spider/utils.py
    • jd_spider/spider/db.py
    • jd_spider/spider/crawler.py
    • jd_spider/analysis/analyzer.py
    • jd_spider/analysis/visualize.py
    • jd_spider/main.py
  3. Install the dependencies:

    pip install -r requirements.txt
    
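  4. Usage examples (run from inside the jd_spider directory, since main.py imports the spider and analysis packages from there):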

    # Crawl and analyze laptop listings (5 pages)
    python main.py --keyword "笔记本电脑" --pages 5 --visualize
    
    # Crawl only
    python main.py --mode crawl --keyword "智能手表" --pages 3
    
    # Crawl products from the given category IDs
    python main.py --category "670,671,672" --pages 2
    
    # Analyze and visualize existing data only
    python main.py --mode analyze --visualize
    
    # Also crawl product details
    python main.py --keyword "耳机" --pages 2 --details
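`touch` is not a built-in command in the Windows `cmd` shell, and `mkdir -p` is Unix syntax. The sketch below is a cross-platform equivalent of step 1 using `pathlib`. The function name `scaffold` is hypothetical.

```python
from pathlib import Path

def scaffold(root='jd_spider'):
    """Create the jd_spider package layout (cross-platform mkdir -p + touch)."""
    root = Path(root)
    for pkg in ('spider', 'analysis'):
        pkg_dir = root / pkg
        pkg_dir.mkdir(parents=True, exist_ok=True)  # like mkdir -p
        (pkg_dir / '__init__.py').touch()           # like touch: create if missing
    return root
```

Calling `scaffold()` from the parent directory creates `jd_spider/spider/__init__.py` and `jd_spider/analysis/__init__.py`. Running it again is harmless: `exist_ok=True` ignores existing directories, and `touch()` on an existing file only updates its timestamp.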
    

10. Hands-on Project: News Aggregation Crawler

Next we will build a news aggregation crawler that collects the latest news from several news sites and offers categorization and search.

10.1 Project Design and Requirements

Project goals:

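  • Crawl the latest news from multiple news sites
  • Automatically categorize and group similar news
  • Provide search
  • Generate a simple web interface to display the results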

Directory structure:

news_spider/
├── spiders/
│   ├── __init__.py
│   ├── base_spider.py   # Spider base class
│   ├── sina_spider.py   # Sina News spider
│   ├── sohu_spider.py   # Sohu News spider
│   └── tencent_spider.py # Tencent News spider
├── utils/
│   ├── __init__.py
│   ├── db.py            # Database operations
│   ├── cleaner.py       # Text cleaning
│   └── classifier.py    # News classification
├── web/
│   ├── __init__.py
│   ├── app.py           # Web application
│   ├── static/          # Static assets
│   └── templates/       # HTML templates
├── config.py            # Configuration
├── crawler.py           # Crawler controller
├── main.py              # Main program
└── requirements.txt     # Dependencies
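The listing names `crawler.py` as the crawler controller, but its code is not shown in this section. The sketch below shows one possible shape for it, purely as an assumption: a registry that maps the names in `ENABLED_SPIDERS` to spider classes and runs each enabled spider. `Spider`, `register`, `SPIDER_REGISTRY`, `DemoSpider` and `run_enabled` are all hypothetical names. A real controller would register `SinaSpider` and the other spiders, then hand the collected items to the database layer.

```python
from abc import ABC, abstractmethod

class Spider(ABC):
    """Minimal stand-in for BaseSpider: subclasses set `name` and implement crawl()."""
    name = ''

    @abstractmethod
    def crawl(self):
        """Return a list of news-item dicts."""

SPIDER_REGISTRY = {}

def register(cls):
    """Class decorator: make cls available under cls.name."""
    SPIDER_REGISTRY[cls.name] = cls
    return cls

@register
class DemoSpider(Spider):
    name = 'demo'

    def crawl(self):
        return [{'url': 'https://example.com/news/1', 'title': 'demo item'}]

def run_enabled(enabled_names):
    """Instantiate and run every enabled spider; unknown names are reported and skipped."""
    results = {}
    for name in enabled_names:
        spider_cls = SPIDER_REGISTRY.get(name)
        if spider_cls is None:
            print(f'unknown spider: {name}')
            continue
        results[name] = spider_cls().crawl()
    return results

print(run_enabled(['demo', 'sohu']))
```

With this design, a new site only needs a new decorated class plus an entry in `ENABLED_SPIDERS`; the controller loop itself does not change.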

10.2 Spider Base Class and Shared Components

config.py:

# news_spider/config.py
import os
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 爬虫配置
SPIDER_CONFIG = {
    'INTERVAL': int(os.getenv('SPIDER_INTERVAL', 3600)),  # 爬取间隔(秒)
    'REQUEST_DELAY': float(os.getenv('REQUEST_DELAY', 1.0)),  # 请求间隔(秒)
    'MAX_PAGES': int(os.getenv('MAX_PAGES', 3)),          # 每个分类爬取的最大页数
    'USER_AGENT_ROTATE': os.getenv('USER_AGENT_ROTATE', 'True').lower() == 'true',
    'HEADERS': {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    }
}

# 启用的爬虫
ENABLED_SPIDERS = [
    'sina',
    'sohu',
    'tencent'
]

# 数据库配置
DB_CONFIG = {
    'MONGO_URI': os.getenv('MONGO_URI', 'mongodb://localhost:27017/'),
    'DB_NAME': os.getenv('DB_NAME', 'news_spider'),
    'COLLECTION': os.getenv('COLLECTION', 'news'),
}

# 分类配置
CATEGORIES = {
    'tech': ['科技', '互联网', 'IT', '数码', '技术', '软件', '编程', '人工智能', 'AI'],
    'finance': ['财经', '金融', '股市', '理财', '经济', '投资', '基金', '债券'],
    'sports': ['体育', '足球', '篮球', 'NBA', 'CBA', '棋牌', '乒乓球', '羽毛球'],
    'entertainment': ['娱乐', '电影', '电视剧', '明星', '综艺', '音乐', '艺人'],
    'society': ['社会', '法制', '法治', '案件', '警方', '事件', '热点'],
    'world': ['国际', '世界', '全球', '海外', '外交', '环球', '各国'],
    'military': ['军事', '国防', '武器', '军队', '战争', '导弹', '航母', '战机'],
    'health': ['健康', '医疗', '医院', '疾病', '养生', '保健', '医生', '药品'],
    'edu': ['教育', '学校', '大学', '高考', '考试', '学习', '培训', '留学'],
    'car': ['汽车', '车型', '自动驾驶', '电动车', '新能源车', '交通'],
    'house': ['房产', '房地产', '楼市', '房价', '装修', '买房', '租房'],
    'travel': ['旅游', '旅行', '景点', '度假', '酒店', '民宿', '出游'],
}

# 网站配置
SITES = {
    'sina': {
        'name': '新浪新闻',
        'url': 'https://news.sina.com.cn/',
    },
    'sohu': {
        'name': '搜狐新闻',
        'url': 'https://news.sohu.com/',
    },
    'tencent': {
        'name': '腾讯新闻',
        'url': 'https://news.qq.com/',
    },
}

# Web应用配置
WEB_CONFIG = {
    'PORT': int(os.getenv('WEB_PORT', 5000)),
    'HOST': os.getenv('WEB_HOST', '127.0.0.1'),
    'DEBUG': os.getenv('WEB_DEBUG', 'True').lower() == 'true',
    'ITEMS_PER_PAGE': int(os.getenv('ITEMS_PER_PAGE', 20)),
}

# 日志配置
LOG_CONFIG = {
    'LEVEL': os.getenv('LOG_LEVEL', 'INFO'),
    'FILE': os.getenv('LOG_FILE', 'news_spider.log'),
}
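The parsing in `config.py` has two rough edges. `WEB_DEBUG=1` or `WEB_DEBUG=yes` counts as False. A malformed `WEB_PORT` makes `int(...)` raise `ValueError` as soon as the module is imported. The sketch below shows two more forgiving helpers. `env_bool` and `env_int` are hypothetical names, and which spellings count as true is an assumption.

```python
import os

def env_bool(name, default=False):
    """Read a boolean flag from the environment.

    Unset -> default; '1', 'true', 'yes', 'on' (any case) -> True; anything else -> False.
    """
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in {'1', 'true', 'yes', 'on'}

def env_int(name, default):
    """Read an int from the environment, falling back to default if unset or invalid."""
    value = os.getenv(name)
    if value is None:
        return default
    try:
        return int(value)
    except ValueError:
        return default
```

In `config.py` these could be used as `'DEBUG': env_bool('WEB_DEBUG', True)` and `'PORT': env_int('WEB_PORT', 5000)`. Values loaded by `load_dotenv()` are read the same way, because they end up in `os.environ`.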

utils/db.py:

# news_spider/utils/db.py
import pymongo
from datetime import datetime
import logging
from config import DB_CONFIG

class NewsDatabase:
    def __init__(self):
        """初始化数据库连接"""
        self.client = None
        self.db = None
        self.collection = None
        self.logger = logging.getLogger('news_spider')
    
    def connect(self):
        """连接到MongoDB"""
        try:
            self.client = pymongo.MongoClient(DB_CONFIG['MONGO_URI'])
            self.db = self.client[DB_CONFIG['DB_NAME']]
            self.collection = self.db[DB_CONFIG['COLLECTION']]
            
            # Create indexes. create_index is idempotent; the name check only skips the call.
            # Auto-generated names encode the direction, e.g. 'publish_time_-1' for DESCENDING.
            existing = self.collection.index_information()
            if 'url_1' not in existing:
                self.collection.create_index([('url', pymongo.ASCENDING)], unique=True)
            
            if 'publish_time_-1' not in existing:
                self.collection.create_index([('publish_time', pymongo.DESCENDING)])
            
            if 'category_1' not in existing:
                self.collection.create_index([('category', pymongo.ASCENDING)])
            
            if 'source_1' not in existing:
                self.collection.create_index([('source', pymongo.ASCENDING)])
            
            # Text index for search. MongoDB's text index does not segment Chinese
            # words, so Chinese keyword searches may miss matches.
            if 'text_index' not in existing:
                self.collection.create_index([
                    ('title', 'text'), 
                    ('summary', 'text'),
                    ('content', 'text')
                ], name='text_index')
            
            self.logger.info(f"成功连接到MongoDB: {DB_CONFIG['DB_NAME']}.{DB_CONFIG['COLLECTION']}")
            return True
        except Exception as e:
            self.logger.error(f"连接MongoDB失败: {e}")
            return False
    
    def close(self):
        """关闭数据库连接"""
        if self.client:
            self.client.close()
            self.logger.info("MongoDB连接已关闭")
    
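    def save_news(self, news_item):
        """Save one news item; update it if the URL already exists."""
        # Collection objects do not support truth-value testing; compare with None
        if self.collection is None:
            self.logger.error("数据库未连接")
            return False
        
        # Keep crawl_time out of $set: otherwise every re-save changes the document
        # and the "unchanged" branch below can never be reached.
        # Here crawl_time records when the URL was first stored.
        fields = {k: v for k, v in news_item.items() if k != 'crawl_time'}
        
        try:
            # Use the URL as the unique key
            result = self.collection.update_one(
                {'url': news_item['url']},
                {'$set': fields, '$setOnInsert': {'crawl_time': datetime.now()}},
                upsert=True
            )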
            
            if result.upserted_id:
                self.logger.debug(f"新增新闻: {news_item['title']}")
                return True
            elif result.modified_count > 0:
                self.logger.debug(f"更新新闻: {news_item['title']}")
                return True
            else:
                self.logger.debug(f"新闻已存在且无变化: {news_item['title']}")
                return False
        except Exception as e:
            self.logger.error(f"保存新闻失败: {e}")
            return False
    
    def save_many_news(self, news_items):
        """批量保存新闻"""
        if self.collection is None:  # Collection objects do not support bool()
            self.logger.error("数据库未连接")
            return 0
        
        saved_count = 0
        for item in news_items:
            if self.save_news(item):
                saved_count += 1
        
        self.logger.info(f"已保存 {saved_count}/{len(news_items)} 条新闻")
        return saved_count
    
    def get_news_count(self):
        """获取新闻总数"""
        if self.collection is None:  # Collection objects do not support bool()
            self.logger.error("数据库未连接")
            return 0
        
        return self.collection.count_documents({})
    
    def get_latest_news(self, limit=20, skip=0, category=None, source=None):
        """获取最新新闻"""
        if self.collection is None:  # Collection objects do not support bool()
            self.logger.error("数据库未连接")
            return []
        
        # 构建查询条件
        query = {}
        if category:
            query['category'] = category
        if source:
            query['source'] = source
        
        # 查询并按发布时间排序
        cursor = self.collection.find(query).sort('publish_time', pymongo.DESCENDING).skip(skip).limit(limit)
        return list(cursor)
    
    def search_news(self, keyword, limit=20, skip=0, category=None, source=None):
        """搜索新闻"""
        if self.collection is None:  # Collection objects do not support bool()
            self.logger.error("数据库未连接")
            return []
        
        # 构建查询条件
        query = {'$text': {'$search': keyword}}
        if category:
            query['category'] = category
        if source:
            query['source'] = source
        
        # 文本搜索查询
        cursor = self.collection.find(
            query,
            {'score': {'$meta': 'textScore'}}  # 添加相关性分数
        ).sort([
            ('score', {'$meta': 'textScore'}),  # 按相关性排序
            ('publish_time', pymongo.DESCENDING)  # 其次按时间排序
        ]).skip(skip).limit(limit)
        
        return list(cursor)
    
    def get_category_stats(self):
        """获取各分类新闻数量"""
        if self.collection is None:  # Collection objects do not support bool()
            self.logger.error("数据库未连接")
            return {}
        
        pipeline = [
            {'$group': {'_id': '$category', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}}
        ]
        
        result = list(self.collection.aggregate(pipeline))
        return {item['_id']: item['count'] for item in result}
    
    def get_source_stats(self):
        """获取各来源新闻数量"""
        if self.collection is None:  # Collection objects do not support bool()
            self.logger.error("数据库未连接")
            return {}
        
        pipeline = [
            {'$group': {'_id': '$source', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}}
        ]
        
        result = list(self.collection.aggregate(pipeline))
        return {item['_id']: item['count'] for item in result}
    
    def get_timespan_stats(self):
        """获取每日新闻数量趋势"""
        if self.collection is None:  # Collection objects do not support bool()
            self.logger.error("数据库未连接")
            return {}
        
        pipeline = [
            {'$project': {
                'date': {'$dateToString': {'format': '%Y-%m-%d', 'date': '$publish_time'}}
            }},
            {'$group': {'_id': '$date', 'count': {'$sum': 1}}},
            {'$sort': {'_id': 1}}
        ]
        
        result = list(self.collection.aggregate(pipeline))
        return {item['_id']: item['count'] for item in result}
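MongoDB's `$text` search splits text into terms at whitespace and punctuation and has no Chinese word segmentation. As a result, a query like `search_news('人工智能')` will usually miss articles where the word appears inside a longer run of characters. The sketch below is a substring fallback that builds a `$regex` filter instead. `build_regex_query` is a hypothetical helper, and searching only `title` and `summary` is an assumption. An unanchored regex cannot use an index efficiently, so the query scans the collection; this suits modest data volumes.

```python
import re

def build_regex_query(keyword, category=None, source=None):
    """Return a MongoDB filter matching `keyword` as a literal, case-insensitive
    substring of title or summary, plus optional exact category/source filters."""
    # re.escape turns characters such as '+' or '.' into literals; MongoDB's
    # PCRE-based $regex treats a backslash before punctuation the same way.
    pattern = re.escape(keyword)
    query = {'$or': [
        {'title': {'$regex': pattern, '$options': 'i'}},
        {'summary': {'$regex': pattern, '$options': 'i'}},
    ]}
    if category:
        query['category'] = category
    if source:
        query['source'] = source
    return query

print(build_regex_query('人工智能', category='tech'))
```

A call such as `self.collection.find(build_regex_query(keyword)).sort('publish_time', pymongo.DESCENDING).skip(skip).limit(limit)` would mirror `get_latest_news()`.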

utils/cleaner.py:

# news_spider/utils/cleaner.py
import re
import html
from bs4 import BeautifulSoup
import logging

class TextCleaner:
    def __init__(self):
        """初始化文本清洗器"""
        self.logger = logging.getLogger('news_spider')
    
    def clean_html(self, html_content):
        """清洗HTML内容,提取纯文本"""
        if not html_content:
            return ""
        
        try:
            # 使用BeautifulSoup解析HTML
            soup = BeautifulSoup(html_content, 'html.parser')
            
            # 移除脚本和样式元素
            for script in soup(["script", "style", "iframe"]):
                script.extract()
            
            # 获取文本
            text = soup.get_text(separator=' ')
            
            # 清理文本
            return self.clean_text(text)
            
        except Exception as e:
            self.logger.error(f"清洗HTML内容时出错: {e}")
            return ""
    
    def clean_text(self, text):
        """清洗文本内容"""
        if not text:
            return ""
        
        try:
            # 解码HTML实体
            text = html.unescape(text)
            
            # 移除多余空白字符
            text = re.sub(r'\s+', ' ', text).strip()
            
            # 移除特殊字符和控制字符
            text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
            
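            # Remove boilerplate common on news pages. Colons may be ASCII or full-width,
            # and field values are capped at 20 characters (50 for 原标题) so a pattern
            # cannot swallow the rest of the sentence, as [\w\s]+ would, because \w also
            # matches Chinese characters. '图片来源' is listed before '来源'; otherwise
            # the '来源' pattern would strip its tail first.
            patterns_to_remove = [
                r'责任编辑[:：]\s*[^\s，。,]{1,20}',
                r'图片来源[:：]\s*[^\s，。,]{1,20}',
                r'来源[:：]\s*[^\s，。,]{1,20}',
                r'作者[:：]\s*[^\s，。,]{1,20}',
                r'原标题[:：]\s*[^\s，。,]{1,50}',
                r'编辑[:：]\s*[^\s，。,]{1,20}',
                r'记者[:：]\s*[^\s，。,]{1,20}',
                r'©[\w\s]+?版权所有',
                r'点击查看更多',
                r'相关阅读',
                r'更多精彩内容',
                r'收藏本文',
                r'进入[\w\s]+?专题',
                r'扫描二维码'
            ]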
            
            for pattern in patterns_to_remove:
                text = re.sub(pattern, '', text)
            
            return text.strip()
            
        except Exception as e:
            self.logger.error(f"清洗文本内容时出错: {e}")
            return text
    
    def extract_summary(self, content, max_length=200):
        """从内容中提取摘要"""
        if not content:
            return ""
        
        # 清理内容
        clean_content = self.clean_text(content)
        
        # Truncate to max_length
        if len(clean_content) > max_length:
            # Prefer to cut just after a sentence-ending mark within the last 50
            # characters; start at index max_length - 1 so the summary never exceeds
            # max_length, and never scan below index 0
            for i in range(max_length - 1, max(max_length - 51, -1), -1):
                if clean_content[i] in ['.', '。', '!', '！', '?', '？']:
                    return clean_content[:i+1]
            
            # No sentence boundary found: hard-cut and append an ellipsis
            return clean_content[:max_length].strip() + '...'
        else:
            return clean_content
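`extract_summary()` cuts at a fixed character budget. An alternative strategy, sketched here as an assumption rather than the project's method, is to keep whole leading sentences until the budget would be exceeded. The ASCII period is deliberately left out of the sentence-ending set because it also appears inside numbers such as "3.5". `lead_sentences` is a hypothetical name.

```python
import re

# A sentence is a run of non-terminators plus an optional terminator (kept with it)
_SENTENCE_RE = re.compile(r'[^。！？!?]+[。！？!?]?')

def lead_sentences(text, max_length=200):
    """Build a summary from whole leading sentences without exceeding max_length.

    Falls back to a hard cut plus '...' when even the first sentence is too long.
    """
    summary = ''
    for sentence in _SENTENCE_RE.findall(text):
        sentence = sentence.strip()
        if not sentence:
            continue
        if len(summary) + len(sentence) > max_length:
            break
        summary += sentence
    if not summary:
        summary = text[:max_length].rstrip() + ('...' if len(text) > max_length else '')
    return summary

print(lead_sentences('今天天气很好。我们去公园！然后回家？', 15))  # 今天天气很好。我们去公园！
```

Whole sentences read better in a news list. The trade-off is that the summary can be noticeably shorter than the budget when the next sentence is long.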

utils/classifier.py:

# news_spider/utils/classifier.py
import logging
import re
from config import CATEGORIES

class NewsClassifier:
    def __init__(self):
        """初始化新闻分类器"""
        self.logger = logging.getLogger('news_spider')
        self.categories = CATEGORIES
    
    def classify(self, title, content=None):
        """
        根据标题和内容分类新闻
        :param title: 新闻标题
        :param content: 新闻内容(可选)
        :return: 类别名称,如 'tech', 'finance' 等
        """
        if not title:
            return 'other'
        
        # 组合标题和内容(如果有)
        text = title
        if content:
            text = title + ' ' + content[:500]  # 只使用内容的前500个字符
        
        # 计算各类别的匹配分数
        scores = {}
        for category, keywords in self.categories.items():
            score = 0
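            for keyword in keywords:
                # Keywords are literal strings, so escape them before using re
                pattern = re.escape(keyword)
                
                # Occurrences in title + content
                count = len(re.findall(pattern, text, re.IGNORECASE))
                
                # Occurrences in the title alone (weighted higher)
                title_count = len(re.findall(pattern, title, re.IGNORECASE))
                
                # Score: weight 3 per title hit, weight 1 per content hit
                score += title_count * 3 + (count - title_count)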
            
            scores[category] = score
        
        # 找出得分最高的类别
        if scores:
            max_category = max(scores.items(), key=lambda x: x[1])
            
            # 如果最高分大于0,则返回该类别
            if max_category[1] > 0:
                return max_category[0]
        
        # 默认类别为other
        return 'other'
    
    def get_category_display_name(self, category):
        """获取分类的显示名称"""
        category_display_names = {
            'tech': '科技',
            'finance': '财经',
            'sports': '体育',
            'entertainment': '娱乐',
            'society': '社会',
            'world': '国际',
            'military': '军事',
            'health': '健康',
            'edu': '教育',
            'car': '汽车',
            'house': '房产',
            'travel': '旅游',
            'other': '其他'
        }
        
        return category_display_names.get(category, '其他')
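With `re.IGNORECASE`, the ASCII keywords in `CATEGORIES` also match inside unrelated English words: 'AI' occurs in "said" and 'IT' in "with". The sketch below is a per-keyword counter that requires word boundaries for ASCII keywords only. The `re.ASCII` flag makes `\b` treat Chinese characters as non-word characters, so "谷歌AI模型" still counts. `keyword_count` is a hypothetical helper.

```python
import re

def keyword_count(keyword, text):
    """Count occurrences of keyword in text.

    ASCII keywords (e.g. 'AI', 'NBA') must stand alone as words, case-insensitively;
    other keywords (Chinese) are counted as plain substrings.
    """
    if keyword.isascii():
        # With re.ASCII, \w is [a-zA-Z0-9_], so a CJK neighbour still forms a boundary
        pattern = r'\b' + re.escape(keyword) + r'\b'
        return len(re.findall(pattern, text, re.IGNORECASE | re.ASCII))
    return text.count(keyword)
```

Inside the loop in `classify()`, `count = keyword_count(keyword, text)` and `title_count = keyword_count(keyword, title)` would be drop-in replacements for the two `re.findall` calls.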

10.3 Spider Implementation

spiders/base_spider.py:

# news_spider/spiders/base_spider.py
import requests
import logging
import random
import time
from datetime import datetime, timedelta  # timedelta is used by parse_date for relative dates
from abc import ABC, abstractmethod
from utils.cleaner import TextCleaner
from utils.classifier import NewsClassifier
from config import SPIDER_CONFIG

class BaseSpider(ABC):
    def __init__(self, name, source_name=None, base_url=None):
        """
        初始化爬虫基类
        :param name: 爬虫名称
        :param source_name: 新闻来源名称
        :param base_url: 网站基础URL
        """
        self.name = name
        self.source_name = source_name or name
        self.base_url = base_url
        self.logger = logging.getLogger('news_spider')
        self.cleaner = TextCleaner()
        self.classifier = NewsClassifier()
        self.session = requests.Session()
        
        # 设置请求头
        self.session.headers.update(SPIDER_CONFIG['HEADERS'])
    
    def get_random_headers(self):
        """获取随机请求头"""
        if SPIDER_CONFIG['USER_AGENT_ROTATE']:
            user_agents = [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
                'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
                'Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1'
            ]
            return {'User-Agent': random.choice(user_agents)}
        return {}
    
    def request_page(self, url, method='get', params=None, data=None, timeout=10, retry=3):
        """
        请求页面
        :param url: 请求URL
        :param method: 请求方法,默认get
        :param params: 请求参数
        :param data: 请求数据
        :param timeout: 超时时间
        :param retry: 重试次数
        :return: 响应对象或None
        """
        # Random delay (0.5x to 1.5x REQUEST_DELAY) so requests are not evenly spaced
        time.sleep(SPIDER_CONFIG['REQUEST_DELAY'] * random.uniform(0.5, 1.5))
        
        for i in range(retry):
            try:
                headers = self.get_random_headers()
                
                if method.lower() == 'get':
                    response = self.session.get(url, params=params, headers=headers, timeout=timeout)
                elif method.lower() == 'post':
                    response = self.session.post(url, params=params, data=data, headers=headers, timeout=timeout)
                else:
                    raise ValueError(f"不支持的请求方法: {method}")
                
                response.raise_for_status()  # raises HTTPError for 4xx/5xx status codes
                
                # 设置编码
                if response.encoding == 'ISO-8859-1':
                    response.encoding = response.apparent_encoding
                
                return response
                
            except requests.exceptions.RequestException as e:
                self.logger.warning(f"请求失败 ({i+1}/{retry}): {url} - {e}")
                if i == retry - 1:
                    self.logger.error(f"请求失败次数达到上限: {url}")
                    return None
                time.sleep(1)  # 延迟后重试
        
        return None
    
    def parse_date(self, date_str):
        """
        解析日期字符串为datetime对象
        :param date_str: 日期字符串
        :return: datetime对象或当前时间
        """
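        try:
            date_str = date_str.strip()
            # Try common absolute date/time formats
            formats = [
                '%Y-%m-%d %H:%M:%S',
                '%Y-%m-%d %H:%M',
                '%Y年%m月%d日 %H:%M:%S',
                '%Y年%m月%d日 %H:%M',
                '%Y年%m月%d日',
                '%Y/%m/%d %H:%M:%S',
                '%Y/%m/%d %H:%M',
                '%Y/%m/%d',
                '%m-%d %H:%M',
                '%m月%d日 %H:%M'
            ]
            
            for fmt in formats:
                try:
                    parsed = datetime.strptime(date_str, fmt)
                except ValueError:
                    continue
                # Formats without %Y default to the year 1900; assume the current year
                if '%Y' not in fmt:
                    parsed = parsed.replace(year=datetime.now().year)
                return parsed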
            
            # 处理特殊格式,如"X小时前"、"X分钟前"等
            if '分钟前' in date_str:
                minutes = int(date_str.replace('分钟前', '').strip())
                return datetime.now().replace(microsecond=0) - timedelta(minutes=minutes)
            
            if '小时前' in date_str:
                hours = int(date_str.replace('小时前', '').strip())
                return datetime.now().replace(microsecond=0) - timedelta(hours=hours)
            
            if '昨天' in date_str:
                time_part = date_str.replace('昨天', '').strip()
                if ':' in time_part:
                    hour, minute = map(int, time_part.split(':'))
                    yesterday = datetime.now().replace(microsecond=0) - timedelta(days=1)
                    return yesterday.replace(hour=hour, minute=minute, second=0)
                else:
                    return datetime.now().replace(microsecond=0) - timedelta(days=1)
            
            # 如果都无法识别,记录警告并返回当前时间
            self.logger.warning(f"无法解析日期字符串: '{date_str}',使用当前时间")
            return datetime.now().replace(microsecond=0)
            
        except Exception as e:
            self.logger.error(f"解析日期出错: {e}")
            return datetime.now().replace(microsecond=0)
    
    def create_news_item(self, url, title, content=None, html_content=None, summary=None, 
                         publish_time=None, author=None, source=None, category=None, tags=None, image_url=None):
        """
        Build a normalized news item.
        :param url: article URL
        :param title: article title
        :param content: article body as plain text
        :param html_content: article body as HTML
        :param summary: article summary
        :param publish_time: publish time (datetime, or a string handled by parse_date)
        :param author: author
        :param source: source name
        :param category: category key
        :param tags: list of tags
        :param image_url: image URL
        :return: news item dict
        """
        # Clean the title
        title = self.cleaner.clean_text(title)
        
        # Derive plain text from the HTML when only HTML was provided
        if not content and html_content:
            content = self.cleaner.clean_html(html_content)
        elif content:
            content = self.cleaner.clean_text(content)
        
        # Generate a summary if none was provided
        if not summary and content:
            summary = self.cleaner.extract_summary(content)
        elif summary:
            summary = self.cleaner.clean_text(summary)
        
        # Classify the article if no category was provided
        if not category:
            category = self.classifier.classify(title, content)
        
        # Default the source to the spider's source name
        if not source:
            source = self.source_name
        
        # Default the publish time to now; parse string dates
        if not publish_time:
            publish_time = datetime.now().replace(microsecond=0)
        elif isinstance(publish_time, str):
            publish_time = self.parse_date(publish_time)
        
        # Build the news item
        news_item = {
            'url': url,
            'title': title,
            'content': content,
            'summary': summary,
            'html_content': html_content,
            'publish_time': publish_time,
            'author': author,
            'source': source,
            'category': category,
            'tags': tags or [],
            'image_url': image_url,
            'spider': self.name,
            'crawl_time': datetime.now().replace(microsecond=0)
        }
        
        return news_item
    
    @abstractmethod
    def crawl(self):
        """爬取新闻,需要子类实现"""
        pass
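
The relative-date branches of `parse_date` read the clock directly, which makes them hard to test. The sketch below assumes you factor those branches into a standalone function that receives `now` as a parameter. The name `parse_relative_date` is hypothetical; it is stricter than the method above (only `N小时前`, a bare `昨天`, or `昨天 HH:MM`) and returns `None` instead of falling back to the current time.

```python
import re
from datetime import datetime, timedelta


def parse_relative_date(date_str, now):
    """Parse "N小时前", "昨天" and "昨天 HH:MM" relative to an injected `now`.

    Returns None for any other format so the caller decides on a fallback.
    """
    date_str = date_str.strip()
    now = now.replace(microsecond=0)

    # "3小时前" -> three hours before now
    m = re.fullmatch(r'(\d+)\s*小时前', date_str)
    if m:
        return now - timedelta(hours=int(m.group(1)))

    if date_str.startswith('昨天'):
        yesterday = now - timedelta(days=1)
        time_part = date_str[len('昨天'):].strip()
        # "昨天 08:05" -> yesterday at 08:05:00
        m = re.fullmatch(r'(\d{1,2}):(\d{2})', time_part)
        if m:
            return yesterday.replace(hour=int(m.group(1)), minute=int(m.group(2)), second=0)
        # bare "昨天" -> exactly one day before now
        if not time_part:
            return yesterday

    return None


now = datetime(2024, 5, 10, 12, 30, 15)
print(parse_relative_date('3小时前', now))     # 2024-05-10 09:30:15
print(parse_relative_date('昨天 08:05', now))  # 2024-05-09 08:05:00
```

Injecting `now` keeps the results deterministic, so the branches can be checked with plain assertions instead of mocking the clock.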

spiders/sina_spider.py:

# news_spider/spiders/sina_spider.py
from bs4 import BeautifulSoup
import re
import json
import time
from datetime import datetime
from spiders.base_spider import BaseSpider
from config import CATEGORIES, SITES

class SinaSpider(BaseSpider):
    def __init__(self):
        """初始化新浪新闻爬虫"""
        super().__init__(
            name='sina',
            source_name=SITES['sina']['name'],
            base_url=SITES['sina']['url']
        )
        
        # 新浪新闻各分类URL
        self.category_urls = {
            'society': 'https://news.sina.com.cn/society/',
            'tech': 'https://tech.sina.com.cn/',
            'finance': 'https://finance.sina.com.cn/',
            'sports': 'https://sports.sina.com.cn/',
            'entertainment': 'https://ent.sina.com.cn/',
            'military': 'https://mil.sina.com.cn/',
            'world': 'https://news.sina.com.cn/world/',
        }
    
    def crawl(self):
        """爬取新浪新闻"""
        self.logger.info("开始爬取新浪新闻")
        all_news = []
        
        # 爬取各分类的新闻
        for category, url in self.category_urls.items():
            try:
                self.logger.info(f"爬取新浪新闻分类: {category}")
                news_list = self.crawl_category(category, url)
                all_news.extend(news_list)
                
                # 避免请求过快
                time.sleep(1)
                
            except Exception as e:
                self.logger.error(f"爬取新浪新闻分类 {category} 出错: {e}")
        
        self.logger.info(f"新浪新闻爬取完成,共获取 {len(all_news)} 条新闻")
        return all_news
    
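    def crawl_category(self, category, url, max_pages=1):
        """
        Crawl the news in one category.
        :param category: category key
        :param url: category page URL
        :param max_pages: maximum number of pages (currently only the first page is fetched)
        :return: list of news items
        """
        news_list = []
        
        # Fetch the category landing page
        response = self.request_page(url)
        if not response:
            return news_list
        
        # Parse the page and collect candidate article links
        soup = BeautifulSoup(response.text, 'html.parser')
        news_links = []
        seen = set()  # O(1) duplicate check; news_links keeps the page order
        
        for a in soup.find_all('a', href=True):
            link = a['href'].strip()
            
            # Links may be protocol-relative (//host/path); give them an https: scheme
            if link.startswith('//'):
                link = 'https:' + link
            
            # Skip non-article links and duplicates
            if not self.is_news_link(link) or link in seen:
                continue
            
            seen.add(link)
            news_links.append(link)
        
        # Crawl article details, at most 20 per category
        for link in news_links[:20]:
            try:
                news_item = self.crawl_news_detail(link, category)
                if news_item:
                    news_list.append(news_item)
                time.sleep(0.5)  # throttle requests
            except Exception as e:
                self.logger.error(f"Error crawling article detail: {link} - {e}")
        
        return news_list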
    
    def is_news_link(self, url):
        """Return True if url looks like a Sina article URL"""
        # Sina article URLs embed the publish date, e.g. https://news.sina.com.cn/c/YYYY-MM-DD/doc-xxxxxxxx.shtml
        patterns = [
            r'https?://[a-z]+\.sina\.com\.cn/.*\d{4}-\d{2}-\d{2}.*\.s?html',
            r'https?://[a-z]+\.sina\.com\.cn/.*/\d{4}-\d{2}-\d{2}/doc-[a-z0-9]+\.s?html'
        ]
        
        for pattern in patterns:
            if re.match(pattern, url):
                return True
        
        return False
    
    def crawl_news_detail(self, url, category=None):
        """
        爬取新闻详情
        :param url: 新闻URL
        :param category: 分类(可选)
        :return: 新闻项或None
        """
        # 请求新闻详情页
        response = self.request_page(url)
        if not response:
            return None
        
        # 解析HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        try:
            # 提取标题
            title_element = soup.select_one('.main-title, h1.title, .article-header h1')
            if not title_element:
                return None
            title = title_element.text.strip()
            
            # 提取发布时间
            time_element = soup.select_one('.date, .pub-time, .article-header .date, .time-source .time')
            publish_time = datetime.now()
            if time_element:
                time_text = time_element.text.strip()
                publish_time = self.parse_date(time_text)
            
            # 提取作者
            author_element = soup.select_one('.author, .show_author, .article-header .source')
            author = author_element.text.strip() if author_element else None
            
            # 提取内容
            content_element = soup.select_one('#artibody, .article-content, .article-body')
            if not content_element:
                return None
            
            html_content = str(content_element)
            content = self.cleaner.clean_html(html_content)
            
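            # Extract the first image; protocol-relative src values (//host/path) get an https: scheme
            image_element = soup.select_one('#artibody img, .article-content img')
            image_url = image_element.get('src') if image_element else None
            if image_url and image_url.startswith('//'):
                image_url = 'https:' + image_url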
            
            # 生成摘要
            summary = self.cleaner.extract_summary(content)
            
            # 创建新闻项
            news_item = self.create_news_item(
                url=url,
                title=title,
                content=content,
                html_content=html_content,
                summary=summary,
                publish_time=publish_time,
                author=author,
                category=category,
                image_url=image_url
            )
            
            return news_item
            
        except Exception as e:
            self.logger.error(f"解析新闻详情出错: {url} - {e}")
            return None
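
Link collection in `crawl_category` boils down to three steps: resolve each `href` to an absolute URL, keep only article URLs, and de-duplicate while preserving page order. A minimal standalone sketch, assuming the first Sina pattern from `is_news_link`; the function name `collect_news_links` and the sample URLs are made up for illustration.

```python
import re
from urllib.parse import urldefrag, urljoin

# First pattern from SinaSpider.is_news_link: a sina.com.cn subdomain followed by
# a path that contains a YYYY-MM-DD date and ends in .html or .shtml
SINA_NEWS_RE = re.compile(r'https?://[a-z]+\.sina\.com\.cn/.*\d{4}-\d{2}-\d{2}.*\.s?html')


def collect_news_links(hrefs, base_url, limit=20):
    """Resolve hrefs against base_url, keep article links, de-duplicate in order."""
    links, seen = [], set()
    for href in hrefs:
        # urljoin handles "//host/path", "/path" and absolute URLs alike;
        # urldefrag drops "#fragment" parts that would otherwise create duplicates
        link, _fragment = urldefrag(urljoin(base_url, href.strip()))
        if link in seen or not SINA_NEWS_RE.match(link):
            continue
        seen.add(link)
        links.append(link)
        if len(links) >= limit:
            break
    return links


hrefs = [
    '//news.sina.com.cn/s/2024-05-10/doc-abc123.shtml',
    'https://news.sina.com.cn/s/2024-05-10/doc-abc123.shtml#comment',
    '/roll/',
    'https://sports.sina.com.cn/',
    'javascript:void(0)',
]
print(collect_news_links(hrefs, 'https://news.sina.com.cn/society/'))
# ['https://news.sina.com.cn/s/2024-05-10/doc-abc123.shtml']
```

Unlike checking membership in a list, the `seen` set keeps de-duplication O(1) per link, while the `links` list preserves the order in which articles appear on the page.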

spiders/sohu_spider.py:

# news_spider/spiders/sohu_spider.py
from bs4 import BeautifulSoup
import re
import json
import time
from datetime import datetime
from spiders.base_spider import BaseSpider
from config import CATEGORIES, SITES

class SohuSpider(BaseSpider):
    def __init__(self):
        """初始化搜狐新闻爬虫"""
        super().__init__(
            name='sohu',
            source_name=SITES['sohu']['name'],
            base_url=SITES['sohu']['url']
        )
        
        # 搜狐新闻各分类URL
        self.category_urls = {
            'society': 'https://news.sohu.com/c/8/',
            'tech': 'https://it.sohu.com/',
            'finance': 'https://business.sohu.com/',
            'sports': 'https://sports.sohu.com/',
            'entertainment': 'https://yule.sohu.com/',
            'military': 'https://mil.sohu.com/',
            'world': 'https://news.sohu.com/c/9/',
            'health': 'https://health.sohu.com/',
            'travel': 'https://travel.sohu.com/',
            'car': 'https://auto.sohu.com/',
        }
    
    def crawl(self):
        """爬取搜狐新闻"""
        self.logger.info("开始爬取搜狐新闻")
        all_news = []
        
        # 爬取各分类的新闻
        for category, url in self.category_urls.items():
            try:
                self.logger.info(f"爬取搜狐新闻分类: {category}")
                news_list = self.crawl_category(category, url)
                all_news.extend(news_list)
                
                # 避免请求过快
                time.sleep(1)
                
            except Exception as e:
                self.logger.error(f"爬取搜狐新闻分类 {category} 出错: {e}")
        
        self.logger.info(f"搜狐新闻爬取完成,共获取 {len(all_news)} 条新闻")
        return all_news
    
    def crawl_category(self, category, url, max_pages=1):
        """
        爬取指定分类的新闻
        :param category: 分类名称
        :param url: 分类URL
        :param max_pages: 最大爬取页数
        :return: 新闻列表
        """
        news_list = []
        
        # 爬取分类首页
        response = self.request_page(url)
        if not response:
            return news_list
        
        # 解析新闻列表
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 尝试提取新闻链接
        news_links = []
        
        # 查找新闻链接
        for a in soup.find_all('a', href=True):
            link = a['href']
            
            # 确保是完整URL
            if link.startswith('//'):
                link = 'https:' + link
            elif not link.startswith('http'):
                continue
            
            # 排除非搜狐新闻链接
            if not self.is_news_link(link):
                continue
            
            # 添加到新闻链接列表
            if link not in news_links:
                news_links.append(link)
        
        # 爬取新闻详情
        for link in news_links[:20]:  # 限制每个分类最多爬取20条
            try:
                news_item = self.crawl_news_detail(link, category)
                if news_item:
                    news_list.append(news_item)
                time.sleep(0.5)  # 避免请求过快
            except Exception as e:
                self.logger.error(f"爬取新闻详情出错: {link} - {e}")
        
        return news_list
    
    def is_news_link(self, url):
        """判断是否为搜狐新闻链接"""
        # 搜狐新闻链接通常为 http(s)://www.sohu.com/a/xxxxxxxxx_xxxxx 格式
        patterns = [
            r'https?://(www\.)?sohu\.com/a/\d+_\d+',
            r'https?://[a-z]+\.sohu\.com/a/\d+_\d+'
        ]
        
        for pattern in patterns:
            if re.match(pattern, url):
                return True
        
        return False
    
    def crawl_news_detail(self, url, category=None):
        """
        爬取新闻详情
        :param url: 新闻URL
        :param category: 分类(可选)
        :return: 新闻项或None
        """
        # 请求新闻详情页
        response = self.request_page(url)
        if not response:
            return None
        
        # 解析HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        try:
            # 尝试从页面中获取新闻数据
            news_data = None
            for script in soup.select('script'):
                if script.string and '_ARTICLE_INFO' in script.string:
                    match = re.search(r'var\s+_ARTICLE_INFO\s*=\s*({.+?});', script.string, re.DOTALL)
                    if match:
                        try:
                            news_data = json.loads(match.group(1))
                            break
                        except json.JSONDecodeError:
                            continue
            
            # If the embedded JSON was found, take the metadata from it
            if news_data:
                title = news_data.get('title', '')
                if not title:
                    return None
                
                # Publish time (falls back to now when missing)
                publish_time_str = news_data.get('publicTime', '')
                publish_time = self.parse_date(publish_time_str) if publish_time_str else datetime.now()
                
                # Author; None when missing, as in the HTML branch below
                author = news_data.get('authorName') or None
                
                # Cover image
                image_url = news_data.get('cover')
                
                # The article body lives in the HTML, not in the JSON
                content_element = soup.select_one('article') or soup.select_one('.article')
                if not content_element:
                    return None
                
                html_content = str(content_element)
                content = self.cleaner.clean_html(html_content)
            else:
                # 若无法从JSON中获取,则尝试从HTML中提取
                title_element = soup.select_one('h1')
                if not title_element:
                    return None
                
                title = title_element.text.strip()
                
                # 提取发布时间
                time_element = soup.select_one('.article-info span, .time')
                publish_time = datetime.now()
                if time_element:
                    time_text = time_element.text.strip()
                    publish_time = self.parse_date(time_text)
                
                # 提取作者
                author_element = soup.select_one('.author span, .user-info h4')
                author = author_element.text.strip() if author_element else None
                
                # 提取内容
                content_element = soup.select_one('article, .article')
                if not content_element:
                    return None
                
                html_content = str(content_element)
                content = self.cleaner.clean_html(html_content)
                
                # 提取图片
                image_element = soup.select_one('article img, .article-info img')
                image_url = image_element['src'] if image_element and image_element.has_attr('src') else None
            
            # 生成摘要
            summary = self.cleaner.extract_summary(content)
            
            # 创建新闻项
            news_item = self.create_news_item(
                url=url,
                title=title,
                content=content,
                html_content=html_content,
                summary=summary,
                publish_time=publish_time,
                author=author,
                category=category,
                image_url=image_url
            )
            
            return news_item
            
        except Exception as e:
            self.logger.error(f"解析新闻详情出错: {url} - {e}")
            return None
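
Both the Sohu and Tencent spiders pull JSON out of inline `<script>` text. A lazy regex such as `({.+?});` stops at the first `};` it meets, even inside a string value. The standard library's `json.JSONDecoder.raw_decode` parses exactly one JSON value starting at a given index and reports where it ended, so there is no closing boundary to guess. A sketch with a hypothetical helper `extract_js_json` and a made-up script string:

```python
import json
import re


def extract_js_json(text, prefix_pattern):
    """Decode the JSON value that follows prefix_pattern (e.g. r'var\\s+X\\s*=\\s*').

    Returns None if the prefix is missing or the value is not valid JSON.
    """
    marker = re.search(prefix_pattern, text)
    if not marker:
        return None
    try:
        # marker.end() points at the first character after "= ", i.e. the "{"
        value, _end = json.JSONDecoder().raw_decode(text, marker.end())
    except json.JSONDecodeError:
        return None
    return value


script = 'var _ARTICLE_INFO = {"title": "a};b", "publicTime": "2024-05-10 08:00"};'

# The lazy regex from SohuSpider stops at the first "};", which is inside the title
lazy = re.search(r'var\s+_ARTICLE_INFO\s*=\s*({.+?});', script, re.DOTALL)
print(lazy.group(1))  # {"title": "a}

print(extract_js_json(script, r'var\s+_ARTICLE_INFO\s*=\s*'))
# {'title': 'a};b', 'publicTime': '2024-05-10 08:00'}
```

Many pages assign JavaScript object literals (unquoted keys, single quotes) rather than strict JSON. Neither approach parses those; the helper simply returns `None`, and the spider falls back to HTML extraction.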

spiders/tencent_spider.py:

# news_spider/spiders/tencent_spider.py
from bs4 import BeautifulSoup
import re
import json
import time
from datetime import datetime
from spiders.base_spider import BaseSpider
from config import CATEGORIES, SITES

class TencentSpider(BaseSpider):
    def __init__(self):
        """初始化腾讯新闻爬虫"""
        super().__init__(
            name='tencent',
            source_name=SITES['tencent']['name'],
            base_url=SITES['tencent']['url']
        )
        
        # 腾讯新闻各分类URL
        self.category_urls = {
            'tech': 'https://new.qq.com/ch/tech/',
            'finance': 'https://new.qq.com/ch/finance/',
            'sports': 'https://new.qq.com/ch/sports/',
            'entertainment': 'https://new.qq.com/ch/ent/',
            'world': 'https://new.qq.com/ch/world/',
            'military': 'https://new.qq.com/ch/milite/',
            'health': 'https://new.qq.com/ch/health/',
            'car': 'https://new.qq.com/ch/auto/',
            'house': 'https://new.qq.com/ch/house/',
        }
    
    def crawl(self):
        """爬取腾讯新闻"""
        self.logger.info("开始爬取腾讯新闻")
        all_news = []
        
        # 爬取各分类的新闻
        for category, url in self.category_urls.items():
            try:
                self.logger.info(f"爬取腾讯新闻分类: {category}")
                news_list = self.crawl_category(category, url)
                all_news.extend(news_list)
                
                # 避免请求过快
                time.sleep(1)
                
            except Exception as e:
                self.logger.error(f"爬取腾讯新闻分类 {category} 出错: {e}")
        
        self.logger.info(f"腾讯新闻爬取完成,共获取 {len(all_news)} 条新闻")
        return all_news
    
    def crawl_category(self, category, url, max_pages=1):
        """
        Crawl the news in one category.
        :param category: category key
        :param url: category page URL
        :param max_pages: maximum number of pages (currently only the first page is fetched)
        :return: list of news items
        """
        from urllib.parse import urljoin  # standard library; resolves relative links
        
        news_list = []
        seen = set()  # URLs already attempted, including ones whose detail page failed
        
        # Fetch the category landing page
        response = self.request_page(url)
        if not response:
            return news_list
        
        try:
            # Tencent pages rely heavily on JavaScript; the article list is usually embedded
            # as "window.DATA = {...};". raw_decode parses one complete JSON value starting
            # at "{", so nested objects cannot cut the match short.
            candidate_links = []
            marker = re.search(r'window\.DATA\s*=\s*', response.text)
            if marker:
                try:
                    data, _ = json.JSONDecoder().raw_decode(response.text, marker.end())
                    items = data.get('list', []) if isinstance(data, dict) else []
                    for item in items:
                        news_url = item.get('url', '') if isinstance(item, dict) else ''
                        if not news_url:
                            continue
                        if news_url.startswith(('http://', 'https://', '/')):
                            # Absolute, "//host/path" and "/path" URLs
                            news_url = urljoin(url, news_url)
                        else:
                            # Scheme-less "host/path": prepend https://
                            news_url = 'https://' + news_url
                        candidate_links.append(news_url)
                except json.JSONDecodeError:
                    self.logger.error("Failed to parse the window.DATA JSON")
            
            # Fallback: no links from the JSON, so collect article links from the HTML
            if not candidate_links:
                soup = BeautifulSoup(response.text, 'html.parser')
                for a in soup.find_all('a', href=True):
                    link = urljoin(url, a['href'])
                    if self.is_news_link(link):
                        candidate_links.append(link)
            
            # Crawl article details, at most 20 per category
            for link in candidate_links:
                if len(news_list) >= 20:
                    break
                if link in seen:
                    continue
                seen.add(link)
                
                news_item = self.crawl_news_detail(link, category)
                if news_item:
                    news_list.append(news_item)
                
                # Throttle requests
                time.sleep(0.5)
        
        except Exception as e:
            self.logger.error(f"Error crawling category page: {url} - {e}")
        
        return news_list
    
    def is_news_link(self, url):
        """判断是否为腾讯新闻链接"""
        # 腾讯新闻链接格式多样,这里做简单判断
        patterns = [
            r'https?://new\.qq\.com/omn/\w+/\w+\.html',
            r'https?://new\.qq\.com/rain/a/\w+',
            r'https?://view\.inews\.qq\.com/\w+/\w+'
        ]
        
        for pattern in patterns:
            if re.match(pattern, url):
                return True
        
        return False
    
    def crawl_news_detail(self, url, category=None):
        """
        Crawl a single article page.
        :param url: article URL
        :param category: category key (optional)
        :return: news item dict, or None
        """
        # Fetch the article page
        response = self.request_page(url)
        if not response:
            return None
        
        # Parse the HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        try:
            # Look for "window.DATA = {...}" in the inline scripts. raw_decode parses one
            # complete JSON value from "{" onward, so nested braces cannot truncate it.
            news_data = None
            for script in soup.find_all('script'):
                if not script.string:
                    continue
                marker = re.search(r'window\.DATA\s*=\s*', script.string)
                if not marker:
                    continue
                try:
                    news_data, _ = json.JSONDecoder().raw_decode(script.string, marker.end())
                    break
                except json.JSONDecodeError:
                    continue
            
            html_content = None
            
            if isinstance(news_data, dict) and 'article_info' in news_data:
                # Take the fields from the embedded JSON
                article_info = news_data['article_info']
                title = article_info.get('title', '')
                
                publish_time_str = article_info.get('publish_time', '')
                publish_time = self.parse_date(publish_time_str) if publish_time_str else datetime.now()
                
                author = article_info.get('author') or None
                
                # The body may be HTML or plain text; either way it goes through clean_html
                html_content = article_info.get('content') or None
                content = self.cleaner.clean_html(html_content) if html_content else ''
                
                image_url = article_info.get('cover')
            else:
                # No usable JSON: extract the fields from the HTML instead
                title_element = soup.select_one('h1')
                if not title_element:
                    return None
                title = title_element.text.strip()
                
                time_element = soup.select_one('.time, .article-time, .pubtime, .date')
                publish_time = self.parse_date(time_element.text.strip()) if time_element else datetime.now()
                
                author_element = soup.select_one('.author, .src, .article-source')
                author = author_element.text.strip() if author_element else None
                
                content_element = soup.select_one('.content-article, .article-content')
                if not content_element:
                    return None
                html_content = str(content_element)
                content = self.cleaner.clean_html(html_content)
                
                image_element = soup.select_one('.content-article img')
                image_url = image_element.get('src') if image_element else None
            
            # Skip pages without a title or body, as the Sina and Sohu spiders do
            if not title or not content:
                return None
            
            # Build the summary and the news item, including the raw HTML
            summary = self.cleaner.extract_summary(content)
            news_item = self.create_news_item(
                url=url,
                title=title,
                content=content,
                html_content=html_content,
                summary=summary,
                publish_time=publish_time,
                author=author,
                category=category,
                image_url=image_url
            )
            
            return news_item
            
        except Exception as e:
            self.logger.error(f"Error parsing article detail: {url} - {e}")
            return None
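
The spiders throttle with a fixed `time.sleep(0.5)` after each request. A sketch of an alternative, assuming you would call `wait()` immediately before each `request_page` call; the class `MinIntervalLimiter` is hypothetical and not part of `BaseSpider`. It enforces a minimum gap between requests but only sleeps for the part of the interval that has not already passed, for example while the previous page was being parsed.

```python
import time


class MinIntervalLimiter:
    """Enforce a minimum delay between consecutive requests."""

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self.clock = clock   # injectable so tests need no real waiting
        self.sleep = sleep
        self._last = None    # time of the previous request; None before the first

    def wait(self):
        now = self.clock()
        if self._last is not None:
            remaining = self.min_interval - (now - self._last)
            if remaining > 0:
                self.sleep(remaining)
                now = self.clock()
        self._last = now


class FakeClock:
    """Test double: time only advances via sleep() or by bumping t directly."""

    def __init__(self):
        self.t = 0.0
        self.slept = []

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.slept.append(seconds)
        self.t += seconds


clock = FakeClock()
limiter = MinIntervalLimiter(0.5, clock=clock.now, sleep=clock.sleep)
limiter.wait()   # first request: no wait
clock.t += 0.2   # 0.2 s spent parsing the previous page
limiter.wait()   # sleeps only the remaining ~0.3 s
clock.t += 1.0   # a slow page: the interval has already passed
limiter.wait()   # no wait
print(clock.slept)
```

In production the defaults `time.monotonic` and `time.sleep` are used; `time.monotonic` is the safer clock here because it never jumps backwards when the system time is adjusted.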

10.4 Crawler Controller

crawler.py:

# news_spider/crawler.py
import importlib
import threading
import time
import logging
import schedule
from datetime import datetime
from utils.db import NewsDatabase
from config import ENABLED_SPIDERS, SPIDER_CONFIG

class NewsCrawler:
    def __init__(self):
        """初始化新闻爬虫控制器"""
        self.logger = logging.getLogger('news_spider')
        self.db = NewsDatabase()
        self.db.connect()
        self.spiders = {}
        self.load_spiders()
    
    def load_spiders(self):
        """加载所有启用的爬虫"""
        for spider_name in ENABLED_SPIDERS:
            try:
                # 动态导入爬虫模块
                module_name = f'spiders.{spider_name}_spider'
                module = importlib.import_module(module_name)
                
                # 获取类名(假设类名为XxxSpider,如SinaSpider)
                class_name = ''.join(word.capitalize() for word in spider_name.split('_')) + 'Spider'
                
                # 获取爬虫类并实例化
                spider_class = getattr(module, class_name)
                self.spiders[spider_name] = spider_class()
                
                self.logger.info(f"已加载爬虫: {spider_name}")
                
            except Exception as e:
                self.logger.error(f"加载爬虫 {spider_name} 失败: {e}")
    
    def run_spider(self, spider_name):
        """
        运行指定的爬虫
        :param spider_name: 爬虫名称
        :return: 爬取的新闻数量
        """
        if spider_name not in self.spiders:
            self.logger.error(f"爬虫 {spider_name} 不存在")
            return 0
        
        try:
            self.logger.info(f"开始运行爬虫: {spider_name}")
            start_time = time.time()
            
            # Crawl; treat a None return as "no items" so saving and logging still work
            news_items = self.spiders[spider_name].crawl() or []
            
            # 保存到数据库
            saved_count = self.db.save_many_news(news_items)
            
            end_time = time.time()
            duration = end_time - start_time
            
            self.logger.info(f"爬虫 {spider_name} 完成: 爬取 {len(news_items)} 条新闻,成功保存 {saved_count} 条,用时 {duration:.2f} 秒")
            
            return saved_count
            
        except Exception as e:
            self.logger.error(f"运行爬虫 {spider_name} 出错: {e}")
            return 0
    
    def run_all_spiders(self):
        """运行所有爬虫"""
        total_saved = 0
        start_time = time.time()
        
        self.logger.info(f"开始运行所有爬虫,时间: {datetime.now()}")
        
        for spider_name in self.spiders:
            saved = self.run_spider(spider_name)
            total_saved += saved
        
        end_time = time.time()
        duration = end_time - start_time
        
        self.logger.info(f"所有爬虫运行完成: 总共保存 {total_saved} 条新闻,总用时 {duration:.2f} 秒")
        
        return total_saved
    
    def schedule_crawl(self):
        """启动定时爬取任务"""
        interval = SPIDER_CONFIG['INTERVAL']  # 爬取间隔(秒)
        
        self.logger.info(f"设置定时爬取任务,间隔: {interval}秒")
        
        # 先运行一次
        self.run_all_spiders()
        
        # 设置定时任务
        schedule.every(interval).seconds.do(self.run_all_spiders)
        
        # 持续运行定时任务
        while True:
            schedule.run_pending()
            time.sleep(1)
    
    def close(self):
        """关闭资源"""
        if self.db:
            self.db.close()
        self.logger.info("爬虫控制器已关闭")
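
`load_spiders` derives a class name from each spider name and loads it with `importlib.import_module` plus `getattr`. The two steps in isolation, as a sketch (the function names are hypothetical; `collections.OrderedDict` stands in for a spider class so the example runs anywhere):

```python
import importlib


def spider_class_name(spider_name):
    """Same naming rule as NewsCrawler.load_spiders: 'sina' -> 'SinaSpider'."""
    return ''.join(word.capitalize() for word in spider_name.split('_')) + 'Spider'


def load_class(module_name, class_name):
    """Import module_name and return its attribute class_name.

    import_module raises ModuleNotFoundError for a missing module and getattr
    raises AttributeError for a missing class; load_spiders logs either one.
    """
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


print(spider_class_name('sina'))          # SinaSpider
print(spider_class_name('people_daily'))  # PeopleDailySpider
print(spider_class_name('qq'))            # QqSpider
print(load_class('collections', 'OrderedDict'))  # <class 'collections.OrderedDict'>
```

The naming rule couples file and class names: a module `spiders/qq_spider.py` must define `QqSpider`, not `QQSpider`. An explicit mapping from spider name to class avoids that coupling at the cost of one extra line per spider.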

10.5 Web Interface

web/app.py:

# news_spider/web/app.py
from flask import Flask, render_template, request, jsonify, redirect, url_for, abort
import os
import logging
from datetime import datetime, timedelta
from utils.db import NewsDatabase
from utils.classifier import NewsClassifier
from config import WEB_CONFIG, CATEGORIES, SITES

app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)

# 初始化日志
logger = logging.getLogger('news_spider')

# 初始化数据库和分类器
db = NewsDatabase()
db.connect()
classifier = NewsClassifier()

@app.template_filter('format_date')
def format_date(value):
    """
    格式化日期过滤器
    :param value: datetime对象
    :return: 格式化的日期字符串
    """
    if not value:
        return ''
    
    now = datetime.now()
    diff = now - value
    
    if diff < timedelta(minutes=1):
        return '刚刚'
    elif diff < timedelta(hours=1):
        return f'{int(diff.total_seconds() // 60)}分钟前'
    elif diff < timedelta(days=1):
        return f'{int(diff.total_seconds() // 3600)}小时前'
    elif diff < timedelta(days=30):
        return f'{diff.days}天前'
    else:
        return value.strftime('%Y-%m-%d')

@app.route('/')
def index():
    """首页"""
    # Clamp to 1: page 0 or a negative page would produce a negative skip
    page = max(request.args.get('page', 1, type=int), 1)
    category = request.args.get('category', None)
    source = request.args.get('source', None)
    items_per_page = WEB_CONFIG['ITEMS_PER_PAGE']
    
    # 获取最新新闻
    skip = (page - 1) * items_per_page
    news = db.get_latest_news(limit=items_per_page, skip=skip, category=category, source=source)
    
    # 获取统计信息
    stats = {
        'total': db.get_news_count(),
        'categories': db.get_category_stats(),
        'sources': db.get_source_stats()
    }
    
    # 准备分类和来源信息
    categories = {k: classifier.get_category_display_name(k) for k in CATEGORIES.keys()}
    categories['other'] = '其他'
    
    sources = SITES
    
    return render_template(
        'index.html',
        news=news,
        stats=stats,
        page=page,
        items_per_page=items_per_page,
        total_items=stats['total'],
        category=category,
        source=source,
        categories=categories,
        sources=sources
    )

@app.route('/search')
def search():
    """搜索页面"""
    keyword = request.args.get('q', '')
    # Clamp to 1: page 0 or a negative page would produce a negative skip
    page = max(request.args.get('page', 1, type=int), 1)
    category = request.args.get('category', None)
    source = request.args.get('source', None)
    items_per_page = WEB_CONFIG['ITEMS_PER_PAGE']
    
    if not keyword:
        return redirect(url_for('index'))
    
    # 搜索新闻
    skip = (page - 1) * items_per_page
    news = db.search_news(keyword, limit=items_per_page, skip=skip, category=category, source=source)
    
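    # Statistics. NOTE: len(news) is the size of the current page only, not the number
    # of matching documents, so total_items never exceeds one page of results
    stats = {
        'total': len(news),
        'categories': db.get_category_stats(),
        'sources': db.get_source_stats()
    }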
    
    # 准备分类和来源信息
    categories = {k: classifier.get_category_display_name(k) for k in CATEGORIES.keys()}
    categories['other'] = '其他'
    
    sources = SITES
    
    return render_template(
        'search.html',
        news=news,
        keyword=keyword,
        stats=stats,
        page=page,
        items_per_page=items_per_page,
        total_items=stats['total'],
        category=category,
        source=source,
        categories=categories,
        sources=sources
    )

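@app.route('/news/<news_id>')
def news_detail(news_id):
    """News detail page"""
    from bson import ObjectId  # bson is installed together with pymongo
    
    # Assuming the default ObjectId _id (which the str(n['_id']) comparison below implies),
    # the string from the URL must be converted first; a raw string never matches an ObjectId
    if not ObjectId.is_valid(news_id):
        abort(404)
    news = db.collection.find_one({'_id': ObjectId(news_id)})
    
    if not news:
        abort(404)
    
    # Recommend recent news from the same category
    related_news = db.get_latest_news(limit=5, category=news.get('category'))
    
    # Exclude the current article
    related_news = [n for n in related_news if str(n['_id']) != news_id]
    
    return render_template(
        'detail.html',
        news=news,
        related_news=related_news[:4]  # show at most 4 related articles
    )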

@app.route('/api/stats')
def api_stats():
    """统计数据API"""
    # 获取统计信息
    stats = {
        'total': db.get_news_count(),
        'categories': db.get_category_stats(),
        'sources': db.get_source_stats(),
        'timespan': db.get_timespan_stats()
    }
    
    return jsonify(stats)

@app.errorhandler(404)
def page_not_found(e):
    """404页面"""
    return render_template('404.html'), 404

@app.errorhandler(500)
def server_error(e):
    """500页面"""
    logger.error(f"服务器错误: {e}")
    return render_template('500.html'), 500

def start_web_app():
    """启动Web应用"""
    app.run(
        host=WEB_CONFIG['HOST'],
        port=WEB_CONFIG['PORT'],
        debug=WEB_CONFIG['DEBUG']
    )
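
Both `index` and `search` compute `skip = (page - 1) * items_per_page` from the request without an upper bound. A sketch of a pagination helper (the name `paginate` is hypothetical) that clamps the page into a valid range and returns the values a template needs for previous/next links:

```python
import math


def paginate(page, items_per_page, total_items):
    """Return (page, skip, total_pages) with page clamped to [1, total_pages]."""
    # At least one page, even when there are no items
    total_pages = max(1, math.ceil(total_items / items_per_page))
    page = min(max(page, 1), total_pages)
    skip = (page - 1) * items_per_page
    return page, skip, total_pages


print(paginate(0, 20, 45))  # (1, 0, 3)
print(paginate(9, 20, 45))  # (3, 40, 3)
```

The helper is only as accurate as `total_items`. With a `category` or `source` filter, `index` passes the unfiltered `db.get_news_count()`, and `search` passes the size of the current page; whether `NewsDatabase` offers a filtered count is not shown in this section.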

web/templates/index.html:

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>新闻聚合 - 最新资讯</title>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css">
    <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
    <nav class="navbar navbar-expand-lg navbar-dark bg-dark">
        <div class="container">
            <a class="navbar-brand" href="/">新闻聚合</a>
            <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent">
                <span class="navbar-toggler-icon"></span>
            </button>
            <div class="collapse navbar-collapse" id="navbarSupportedContent">
                <ul class="navbar-nav me-auto mb-2 mb-lg-0">
                    <li class="nav-item">
                        <a class="nav-link active" href="/">Home</a>
                    </li>
                    <li class="nav-item dropdown">
                        <a class="nav-link dropdown-toggle" href="#" id="categoryDropdown" role="button" data-bs-toggle="dropdown">
                            Categories
                        </a>
                        <ul class="dropdown-menu">
                            <li><a class="dropdown-item" href="/">All</a></li>
                            {% for cat_key, cat_name in categories.items() %}
                            <li><a class="dropdown-item" href="/?category={{ cat_key }}">{{ cat_name }}</a></li>
                            {% endfor %}
                        </ul>
                    </li>
                    <li class="nav-item dropdown">
                        <a class="nav-link dropdown-toggle" href="#" id="sourceDropdown" role="button" data-bs-toggle="dropdown">
                            Sources
                        </a>
                        <ul class="dropdown-menu">
                            <li><a class="dropdown-item" href="/">All</a></li>
                            {% for source_key, source_data in sources.items() %}
                            <li><a class="dropdown-item" href="/?source={{ source_data.name|urlencode }}">{{ source_data.name }}</a></li>
                            {% endfor %}
                        </ul>
                    </li>
                </ul>
                <form class="d-flex" action="/search">
                    <input class="form-control me-2" type="search" name="q" placeholder="Search news" required>
                    <button class="btn btn-outline-light" type="submit">Search</button>
                </form>
            </div>
        </div>
    </nav>

    <div class="container my-4">
        {% if category %}
        <div class="alert alert-info">
            Current category: {{ categories.get(category, 'All') }}
            <a href="/" class="float-end">View all</a>
        </div>
        {% endif %}

        {% if source %}
        <div class="alert alert-info">
            Source: {{ source }}
            <a href="/" class="float-end">View all</a>
        </div>
        {% endif %}

        <div class="row">
            <div class="col-md-8">
                <h2>Latest News</h2>
                
                {% if news %}
                <div class="list-group news-list">
                    {% for item in news %}
                    <a href="/news/{{ item._id }}" class="list-group-item list-group-item-action">
                        <div class="d-flex w-100 align-items-center mb-1">
                            {% if item.image_url %}
                            <div class="news-image me-3">
                                <img src="{{ item.image_url }}" alt="{{ item.title }}" class="img-thumbnail">
                            </div>
                            {% endif %}
                            <div class="news-content">
                                <h5 class="mb-1">{{ item.title }}</h5>
                                <p class="mb-1">{{ item.summary }}</p>
                                <small>
                                    <span class="badge bg-secondary">{{ item.source }}</span>
                                    <span class="badge bg-info">{{ categories.get(item.category, 'Other') }}</span>
                                    <span class="text-muted">{{ item.publish_time|format_date }}</span>
                                </small>
                            </div>
                        </div>
                    </a>
                    {% endfor %}
                </div>

                <!-- Pagination -->
                <nav class="mt-4">
                    <ul class="pagination justify-content-center">
                        {% set total_pages = (total_items / items_per_page)|round(0, 'ceil')|int %}
                        
                        {% if page > 1 %}
                        <li class="page-item">
                            <a class="page-link" href="?page={{ page - 1 }}{% if category %}&category={{ category|urlencode }}{% endif %}{% if source %}&source={{ source|urlencode }}{% endif %}">Previous</a>
                        </li>
                        {% else %}
                        <li class="page-item disabled">
                            <span class="page-link">Previous</span>
                        </li>
                        {% endif %}
                        
                        {% for i in range(1, total_pages + 1) %}
                            {% if i == page %}
                            <li class="page-item active">
                                <span class="page-link">{{ i }}</span>
                            </li>
                            {% elif i <= 3 or i >= total_pages - 2 or (i >= page - 1 and i <= page + 1) %}
                            <li class="page-item">
                                <a class="page-link" href="?page={{ i }}{% if category %}&category={{ category|urlencode }}{% endif %}{% if source %}&source={{ source|urlencode }}{% endif %}">{{ i }}</a>
                            </li>
                            {% elif i == 4 or i == total_pages - 3 %}
                            <li class="page-item disabled">
                                <span class="page-link">...</span>
                            </li>
                            {% endif %}
                        {% endfor %}
                        
                        {% if page < total_pages %}
                        <li class="page-item">
                            <a class="page-link" href="?page={{ page + 1 }}{% if category %}&category={{ category|urlencode }}{% endif %}{% if source %}&source={{ source|urlencode }}{% endif %}">Next</a>
                        </li>
                        {% else %}
                        <li class="page-item disabled">
                            <span class="page-link">Next</span>
                        </li>
                        {% endif %}
                    </ul>
                </nav>
                {% else %}
                <div class="alert alert-warning">
                    No matching news found
                </div>
                {% endif %}
            </div>
            
            <div class="col-md-4">
                <div class="card mb-4">
                    <div class="card-header">
                        Statistics
                    </div>
                    <div class="card-body">
                        <p>Total articles: {{ stats.total }}</p>
                        <h6>By category</h6>
                        <ul class="list-group">
                            {% for cat, count in stats.categories.items() %}
                            <li class="list-group-item d-flex justify-content-between align-items-center">
                                <a href="/?category={{ cat|urlencode }}">{{ categories.get(cat, 'Other') }}</a>
                                <span class="badge bg-primary rounded-pill">{{ count }}</span>
                            </li>
                            {% endfor %}
                        </ul>
                        
                        <h6 class="mt-3">By source</h6>
                        <ul class="list-group">
                            {% for src, count in stats.sources.items() %}
                            <li class="list-group-item d-flex justify-content-between align-items-center">
                                <a href="/?source={{ src|urlencode }}">{{ src }}</a>
                                <span class="badge bg-primary rounded-pill">{{ count }}</span>
                            </li>
                            {% endfor %}
                        </ul>
                    </div>
                </div>
            </div>
        </div>
    </div>

    <footer class="bg-dark text-white py-4 mt-4">
        <div class="container">
            <div class="row">
                <div class="col-md-6">
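                    <h5>News Aggregator</h5>
                    <p>A news aggregation platform built with Python crawling techniques, collecting the latest stories from several major news websites.</p>
                </div>
                <div class="col-md-6 text-md-end">
                    <p>&copy; 2023 News Aggregator</p>
                    <p>For learning and research only; not for commercial use.</p>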
                </div>
            </div>
        </div>
    </footer>

    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
    <script src="{{ url_for('static', filename='js/script.js') }}"></script>
</body>
</html>
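The page-number rule in the pagination block above is easy to misread inside Jinja. The sketch below mirrors the same condition in plain Python so it can be checked on its own; `page_items` is a hypothetical helper name, and computing this list in the view and passing it to the template is only one possible way (an assumption, not part of the project as written) to keep the template simpler.

```python
import math


def page_items(page, total_items, items_per_page):
    """Mirror of the index.html pagination rule (hypothetical helper).

    Returns the page numbers to render, with "..." wherever the template
    renders a disabled ellipsis item.
    """
    # Same as (total_items / items_per_page)|round(0, 'ceil')|int in the template
    total_pages = math.ceil(total_items / items_per_page)
    items = []
    for i in range(1, total_pages + 1):
        if i == page:
            items.append(i)  # current page (rendered as the active item)
        elif i <= 3 or i >= total_pages - 2 or page - 1 <= i <= page + 1:
            items.append(i)  # first 3, last 3, and the neighbours of the current page
        elif i == 4 or i == total_pages - 3:
            items.append("...")  # at most one ellipsis on each side
    return items


print(page_items(10, 200, 10))  # [1, 2, 3, '...', 9, 10, 11, '...', 18, 19, 20]
```

Running it also exposes a quirk of the rule: on page 6 of 20, page 4 is shown as "..." even though it is the only page skipped on that side.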

web/templates/detail.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{{ news.title }} - News Aggregator</title>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css">
    <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
    <nav class="navbar navbar-expand-lg navbar-dark bg-dark">
        <div class="container">
            <a class="navbar-brand" href="/">News Aggregator</a>
            <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent">
                <span class="navbar-toggler-icon"></span>
            </button>
            <div class="collapse navbar-collapse" id="navbarSupportedContent">
                <ul class="navbar-nav me-auto mb-2 mb-lg-0">
                    <li class="nav-item">
                        <a class="nav-link" href="/">Home</a>
                    </li>
                    <li class="nav-item">
                        <a class="nav-link" href="/?category={{ news.category }}">{{ news.category }}</a>
                    </li>
                </ul>
                <form class="d-flex" action="/search">
                    <input class="form-control me-2" type="search" name="q" placeholder="Search news" required>
                    <button class="btn btn-outline-light" type="submit">Search</button>
                </form>
            </div>
        </div>
    </nav>

    <div class="container my-4">
        <div class="row">
            <div class="col-md-8">
                <article>
                    <header class="mb-4">
                        <h1 class="fw-bolder mb-1">{{ news.title }}</h1>
                        <div class="text-muted fst-italic mb-2">
                            Published {{ news.publish_time|format_date }}
                            {% if news.author %}
                            by {{ news.author }}
                            {% endif %}
                        </div>
                        <span class="badge bg-secondary me-2">{{ news.source }}</span>
                        <span class="badge bg-primary">{{ news.category }}</span>
                    </header>
                    
                    {% if news.image_url %}
                    <figure class="mb-4">
                        <img class="img-fluid rounded" src="{{ news.image_url }}" alt="{{ news.title }}">
                    </figure>
                    {% endif %}
                    
                    <section class="mb-5 news-content">
                        {% if news.html_content %}
                            {{ news.html_content|safe }}
                        {% else %}
                            <p>{{ news.content }}</p>
                        {% endif %}
                    </section>
                </article>
                
                <div class="card mb-4">
                    <div class="card-header">Source</div>
                    <div class="card-body">
                        <p class="mb-0">
                            Original article: <a href="{{ news.url }}" target="_blank" rel="noopener noreferrer">{{ news.url }}</a>
                        </p>
                    </div>
                </div>
            </div>
            
            <div class="col-md-4">
                <div class="card mb-4">
                    <div class="card-header">Related News</div>
                    <div class="card-body">
                        {% if related_news %}
                        <ul class="list-unstyled related-news">
                            {% for item in related_news %}
                            <li class="mb-3">
                                <a href="/news/{{ item._id }}">{{ item.title }}</a>
                                <div class="text-muted small">{{ item.publish_time|format_date }}</div>
                            </li>
                            {% endfor %}
                        </ul>
                        {% else %}
                        <p class="card-text">No related news yet</p>
                        {% endif %}
                    </div>
                </div>
                
                <div class="card mb-4">
                    <div class="card-header">Tags</div>
                    <div class="card-body">
                        {% if news.tags %}
                        <div class="tags-cloud">
                            {% for tag in news.tags %}
                            <a href="/search?q={{ tag|urlencode }}" class="btn btn-sm btn-outline-secondary me-1 mb-1">{{ tag }}</a>
                            {% endfor %}
                        </div>
                        {% else %}
                        <p class="card-text">No tags yet</p>
                        {% endif %}
                    </div>
                </div>
            </div>
        </div>
    </div>

    <footer class="bg-dark text-white py-4 mt-4">
        <div class="container">
            <div class="row">
                <div class="col-md-6">
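                    <h5>News Aggregator</h5>
                    <p>A news aggregation platform built with Python crawling techniques, collecting the latest stories from several major news websites.</p>
                </div>
                <div class="col-md-6 text-md-end">
                    <p>&copy; 2023 News Aggregator</p>
                    <p>For learning and research only; not for commercial use.</p>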
                </div>
            </div>
        </div>
    </footer>

    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
    <script src="{{ url_for('static', filename='js/script.js') }}"></script>
</body>
</html>
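detail.html outputs `news.html_content|safe`, which tells Jinja not to escape the scraped markup, so any `<script>` element or `onclick`/`onerror` attribute copied from a source site would run on your own pages. Below is a minimal allowlist sanitizer built on the standard library's `html.parser`, sketched under assumptions: `sanitize_html` and `AllowlistSanitizer` are hypothetical names, the tag list is illustrative, it would be called before an article is saved, and it drops relative URLs instead of resolving them. A maintained HTML-sanitizer library is the safer choice in production.

```python
from html import escape
from html.parser import HTMLParser

# Assumed allowlist: adjust to the markup your news sources actually use
ALLOWED_TAGS = {"p", "br", "a", "img", "strong", "em", "b", "i",
                "ul", "ol", "li", "blockquote", "h2", "h3", "h4"}
ALLOWED_ATTRS = {"a": {"href"}, "img": {"src", "alt"}}
VOID_TAGS = {"br", "img"}           # tags that never get a closing tag
DROP_CONTENT = {"script", "style"}  # tags whose text content is discarded too


class AllowlistSanitizer(HTMLParser):
    def __init__(self):
        super().__init__(convert_charrefs=True)
        self.out = []
        self.skip_depth = 0  # > 0 while inside <script> or <style>

    def handle_starttag(self, tag, attrs):
        if tag in DROP_CONTENT:
            self.skip_depth += 1
            return
        if self.skip_depth or tag not in ALLOWED_TAGS:
            return  # drop the tag itself but keep its text
        kept = []
        for name, value in attrs:
            if value is None or name not in ALLOWED_ATTRS.get(tag, set()):
                continue  # event handlers such as onclick are never allowlisted
            if name in ("href", "src") and not value.strip().lower().startswith(("http://", "https://")):
                continue  # blocks javascript: URLs (and, as a trade-off, relative URLs)
            kept.append(f' {name}="{escape(value, quote=True)}"')
        self.out.append(f"<{tag}{''.join(kept)}>")

    def handle_endtag(self, tag):
        if tag in DROP_CONTENT:
            self.skip_depth = max(0, self.skip_depth - 1)
            return
        if self.skip_depth or tag not in ALLOWED_TAGS or tag in VOID_TAGS:
            return
        self.out.append(f"</{tag}>")

    def handle_data(self, data):
        if not self.skip_depth:
            self.out.append(escape(data, quote=False))  # re-escape text such as "<"


def sanitize_html(markup):
    """Return markup reduced to allowlisted tags and attributes (hypothetical helper)."""
    parser = AllowlistSanitizer()
    parser.feed(markup)
    parser.close()
    return "".join(parser.out)


print(sanitize_html('<p onclick="steal()">Hi<script>alert(1)</script></p>'))  # <p>Hi</p>
```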

web/static/css/style.css:

/* Basic styling */
body {
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    background-color: #f8f9fa;
}

/* News list styling */
.news-list .list-group-item {
    border-left: none;
    border-right: none;
    padding: 1rem;
    transition: background-color 0.2s;
}

.news-list .list-group-item:hover {
    background-color: #f5f5f5;
}

.news-image {
    width: 120px;
    min-width: 120px;
    height: 80px;
    overflow: hidden;
}

.news-image img {
    width: 100%;
    height: 100%;
    object-fit: cover;
}

/* News content styling */
.news-content {
    line-height: 1.6;
    font-size: 1.1rem;
}

.news-content img {
    max-width: 100%;
    height: auto;
    margin: 1rem 0;
}

.news-content a {
    color: #007bff;
}

/* Related news styling */
.related-news li {
    border-bottom: 1px solid #eee;
    padding-bottom: 0.5rem;
}

.related-news li:last-child {
    border-bottom: none;
}

/* Tags cloud */
.tags-cloud {
    display: flex;
    flex-wrap: wrap;
}

/* Media queries for responsive design */
@media (max-width: 768px) {
    .news-image {
        width: 100px;
        height: 70px;
    }
    
    .news-list .list-group-item h5 {
        font-size: 1rem;
    }
    
    .news-list .list-group-item p {
        font-size: 0.85rem;
    }
}

@media (max-width: 576px) {
    .news-content {
        font-size: 1rem;
    }
    
    .d-flex.w-100.align-items-center {
        flex-direction: column;
        align-items: flex-start !important;
    }
    
    .news-image {
        width: 100%;
        height: 150px;
        margin-bottom: 0.75rem;
    }
    
    .news-content {
        width: 100%;
    }
}

10.6 Main Program

main.py:

# news_spider/main.py
import os
import sys
import logging
import argparse
import threading
from crawler import NewsCrawler
from web.app import start_web_app
from utils.db import NewsDatabase
from config import LOG_CONFIG

# Logging configuration
def setup_logger():
    """Set up the logger"""
    logger = logging.getLogger('news_spider')
    level = getattr(logging, LOG_CONFIG['LEVEL'])
    logger.setLevel(level)
    
    # Create the log file's directory if it does not exist yet
    log_dir = os.path.dirname(LOG_CONFIG['FILE'])
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    
    # File handler
    file_handler = logging.FileHandler(LOG_CONFIG['FILE'], encoding='utf-8')
    file_handler.setLevel(level)
    
    # Formatter
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    
    # Remove existing handlers so repeated calls do not duplicate output
    logger.handlers.clear()
    
    # Attach the handlers
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

def main():
    """Entry point"""
    # Set up logging
    logger = setup_logger()
    
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='News aggregation crawler')
    parser.add_argument('--crawl', action='store_true', help='run all spiders once')
    parser.add_argument('--web', action='store_true', help='start the web application')
    parser.add_argument('--schedule', action='store_true', help='start scheduled crawling')
    parser.add_argument('--spider', type=str, help='comma-separated spiders to run (e.g. sina,sohu,tencent)')
    
    args = parser.parse_args()
    
    # With no options, start the web application by default
    if not (args.crawl or args.web or args.schedule or args.spider):
        args.web = True
    
    # Check the database connection
    db = NewsDatabase()
    if not db.connect():
        logger.error("Cannot connect to the database; check the configuration")
        sys.exit(1)
    
    # Report how many articles are already stored
    news_count = db.get_news_count()
    logger.info(f"The database already contains {news_count} articles")
    db.close()
    
    # Create the crawler controller only when crawling is requested
    crawler = None
    if args.crawl or args.schedule or args.spider:
        crawler = NewsCrawler()
    
    # Run the specified spiders
    if args.spider:
        for spider_name in args.spider.split(','):
            crawler.run_spider(spider_name.strip())
    
    # Run all spiders (--spider takes precedence over --crawl)
    if args.crawl and not args.spider:
        crawler.run_all_spiders()
    
    # Start scheduled crawling in a background daemon thread
    schedule_thread = None
    if args.schedule:
        logger.info("Starting scheduled crawling")
        schedule_thread = threading.Thread(target=crawler.schedule_crawl, daemon=True)
        schedule_thread.start()
    
    # Start the web application (blocks until the server stops)
    if args.web:
        logger.info("Starting the web application")
        if args.schedule:
            logger.info("The web application and scheduled crawling are running together")
        start_web_app()
    
    # Scheduled crawling without the web app: keep the main thread alive
    if args.schedule and not args.web:
        try:
            # join() with a timeout waits without burning CPU (unlike `while True: pass`)
            # and still lets Ctrl+C reach the main thread between calls
            while schedule_thread.is_alive():
                schedule_thread.join(timeout=1)
        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            crawler.close()
    
    # One-off crawl only: close the crawler controller
    if (args.crawl or args.spider) and not (args.web or args.schedule):
        crawler.close()

if __name__ == "__main__":
    main()

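10.7 Usage

  1. Create the project directory structure:

    mkdir -p news_spider/spiders news_spider/utils news_spider/web/static/css news_spider/web/static/js news_spider/web/templates
    touch news_spider/spiders/__init__.py news_spider/utils/__init__.py news_spider/web/__init__.py
    
  2. Copy the code into the corresponding files:
    Copy every code file shown above into its matching directory.

  3. Install the dependencies:
    Create a requirements.txt file with the following content:

    requests==2.31.0
    beautifulsoup4==4.12.2
    pymongo==4.6.0
    Flask==2.3.3
    schedule==1.2.0
    python-dotenv==1.0.0
    

    Then install them:

    pip install -r requirements.txt
    
  4. Run the program from inside the news_spider directory. main.py connects to MongoDB at startup and exits if it cannot, so make sure the database server is running first. Running python main.py with no options starts the web application.

    • Crawl news only:

      python main.py --crawl
      
    • Crawl specific news sources:

      python main.py --spider sina,sohu
      
    • Start scheduled crawling:

      python main.py --schedule
      
    • Start only the web application:

      python main.py --web
      
    • Run scheduled crawling and the web application together:

      python main.py --schedule --web
      
  5. Open the web interface:
    Visit http://127.0.0.1:5000 in a browser (or whichever HOST and PORT you set in WEB_CONFIG) to view the news aggregator.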

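11. Legal and Ethical Issues in Web Crawling

11.1 Legal Risks of Crawling

Legal risks:

  1. Harm to commercial interests: scraping a commercial website's data without permission may be treated as infringing its commercial interests
  2. Copyright infringement: copying or republishing other people's content in bulk may violate copyright law
  3. Privacy violations: collecting, storing, and processing personal data may breach privacy regulations
  4. Breach of terms of service: many websites' terms of service explicitly prohibit crawlers and automated scraping
  5. Computer crime: in some circumstances, certain kinds of crawling may be treated as unauthorized access to a computer system

Cases:

  • hiQ Labs v. LinkedIn: the U.S. Ninth Circuit held that scraping publicly accessible data likely does not violate the Computer Fraud and Abuse Act (CFAA); the district court later (2022) found that hiQ had breached LinkedIn's User Agreement, so public data can still carry contractual risk
  • Facebook v. Power Ventures: the court held that continuing to access the site after receiving an explicit cease-and-desist letter was unauthorized access under the CFAA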

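11.2 Ethical Guidelines

Ethical guidelines for crawlers:

  1. Respect robots.txt: always check and obey the site's robots.txt file
  2. Crawl at a reasonable speed: limit the request rate so you do not burden the server
  3. Identify yourself clearly: state your crawler's identity and contact details in the request headers
  4. Be transparent about data use: make clear how the scraped data will be used
  5. Respect copyright: obtain appropriate permission before using or redistributing content
  6. Collect only what you need: gather the data you actually need and no more
  7. Protect privacy: do not crawl or store personally identifiable information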

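11.3 Respecting robots.txt

What is robots.txt:
robots.txt is a plain-text file in a website's root directory that tells crawlers which paths they may crawl and which they may not. It is a convention (standardized as the Robots Exclusion Protocol in RFC 9309), not access control: the server does not enforce it, so compliance is up to the crawler.

Example robots.txt:

User-agent: *       # applies to all crawlers
Disallow: /private/ # do not crawl the /private/ directory
Disallow: /tmp/     # do not crawl the /tmp/ directory
Allow: /public/     # the /public/ directory may be crawled

User-agent: Googlebot  # applies only to Google's crawler
Disallow: /no-google/  # Google may not crawl /no-google/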
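The example can be checked offline by passing its lines to `urllib.robotparser.RobotFileParser.parse()`. It also shows group precedence: a crawler follows only the most specific group that matches its User-Agent, so Googlebot obeys only the `Googlebot` group here and may fetch `/private/`, while every other crawler falls back to the `*` group. The User-Agent strings and URLs are illustrative.

```python
from urllib.robotparser import RobotFileParser

# The robots.txt example above (the parser would strip "#" comments anyway)
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Disallow: /tmp/
Allow: /public/

User-agent: Googlebot
Disallow: /no-google/
"""

parser = RobotFileParser()
parser.parse(ROBOTS_TXT.splitlines())  # parse text directly, no network request

checks = [
    ("MyBot/1.0", "https://www.example.com/private/data.html"),    # * group: disallowed
    ("MyBot/1.0", "https://www.example.com/public/index.html"),    # * group: allowed
    ("Googlebot", "https://www.example.com/no-google/page.html"),  # Googlebot group: disallowed
    ("Googlebot", "https://www.example.com/private/data.html"),    # no /private/ rule in its group: allowed
]
for agent, url in checks:
    print(agent, parser.can_fetch(agent, url), url)
```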

Checking robots.txt in Python:

from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse

def can_fetch(url, user_agent="MyBot/1.0"):
    """
    Check whether a given URL may be crawled
    :param url: the URL to check
    :param user_agent: the crawler's User-Agent string
    :return: True if crawling is allowed
    """
    parsed_url = urlparse(url)
    robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
    
    try:
        # Download and parse robots.txt. read() itself treats 401/403 as
        # "disallow everything" and other 4xx (e.g. 404) as "allow everything".
        robot_parser = RobotFileParser()
        robot_parser.set_url(robots_url)
        robot_parser.read()
        
        # Check whether this User-Agent may fetch the URL
        return robot_parser.can_fetch(user_agent, url)
    except Exception as e:
        print(f"Error while checking robots.txt: {e}")
        # If robots.txt cannot be retrieved at all, err on the side of not crawling
        return False

# Example
url = "https://www.example.com/page.html"
if can_fetch(url, "MySpider/1.0"):
    print(f"Allowed to crawl: {url}")
else:
    print(f"Not allowed to crawl: {url}")
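Because `can_fetch()` builds a new `RobotFileParser` on every call, it downloads robots.txt again for each URL. When checking many URLs on the same site, you would normally parse robots.txt once per host and reuse it. `RobotsCache` below is a hypothetical helper sketching that idea. It keeps entries forever, so a long-running crawler would also want to refresh them (for example by comparing `mtime()` with the current time), and since `read()` takes no timeout argument, `socket.setdefaulttimeout()` is one way to bound it.

```python
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


class RobotsCache:
    """One parsed robots.txt per scheme + host (hypothetical helper)."""

    def __init__(self, user_agent="MyBot/1.0"):
        self.user_agent = user_agent
        self._parsers = {}  # "https://host" -> RobotFileParser

    def _parser_for(self, url):
        parts = urlparse(url)
        origin = f"{parts.scheme}://{parts.netloc}"
        if origin not in self._parsers:
            parser = RobotFileParser(origin + "/robots.txt")
            try:
                # 401/403 -> everything disallowed; other 4xx -> everything allowed;
                # a 5xx leaves the parser unread, and can_fetch() then returns False
                parser.read()
            except (OSError, UnicodeDecodeError):
                # Network error or undecodable file: fail closed, like can_fetch() above
                parser.disallow_all = True
            self._parsers[origin] = parser
        return self._parsers[origin]

    def allowed(self, url):
        """True if our User-Agent may fetch url according to the cached robots.txt."""
        return self._parser_for(url).can_fetch(self.user_agent, url)
```

Usage: create one `robots = RobotsCache("MySpider/1.0")` for the whole crawl and call `robots.allowed(url)` before each request; only the first URL on each host triggers a download.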

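11.4 Reasonable Crawling Strategies

Best practices for responsible crawling:

  1. Identify yourself: put accurate information and contact details in the User-Agent
  2. Limit the request rate: use delays and concurrency limits
  3. Use caching: avoid requesting the same resource repeatedly
  4. Handle errors gracefully: back off before retrying instead of retrying immediately
  5. Follow the terms: obey the site's explicit terms of use
  6. Respect robots.txt: always check robots.txt first
  7. Study the site structure: understand how the site is organized to avoid unnecessary requests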
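Item 4 recommends backing off before retrying. One common scheme is exponential backoff with random jitter: wait a random time up to `base_delay * 2**attempt` seconds, capped at `max_delay`, and honour a numeric `Retry-After` header when the server sends one. `get_with_backoff` is a hypothetical helper, not part of `requests`; it takes the request function as a parameter (for example `requests.get` or a `Session`'s `get`), and the set of retryable status codes and the defaults are assumptions to tune per site.

```python
import random
import time

# Status codes that usually mean "try again later" (an assumed set)
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def get_with_backoff(send, url, max_retries=4, base_delay=1.0, max_delay=60.0,
                     sleep=time.sleep, **kwargs):
    """Call send(url, **kwargs), retrying with exponential backoff (hypothetical helper).

    send: any callable such as requests.get whose result has .status_code and .headers.
    sleep: injectable so the retry logic can be tested without waiting.
    """
    response, last_error = None, None
    for attempt in range(max_retries + 1):
        try:
            response = send(url, **kwargs)
        except OSError as exc:  # requests.RequestException subclasses OSError (IOError)
            response, last_error = None, exc
        else:
            if response.status_code not in RETRYABLE_STATUS:
                return response
            last_error = f"HTTP {response.status_code}"
        if attempt == max_retries:
            break
        # Exponential backoff with full jitter: random wait in [0, base_delay * 2**attempt]
        delay = random.uniform(0, min(max_delay, base_delay * 2 ** attempt))
        # A numeric Retry-After header from the server takes precedence
        retry_after = response.headers.get("Retry-After", "") if response is not None else ""
        if retry_after.isdigit():
            delay = min(max_delay, float(retry_after))
        print(f"{last_error}; retrying in {delay:.1f} s ({attempt + 1}/{max_retries})")
        sleep(delay)
    if response is not None:
        return response  # last retryable response, so the caller can inspect it
    raise ConnectionError(f"Giving up on {url} after {max_retries + 1} attempts: {last_error}")
```

Usage, with the `requests` library from earlier chapters: `response = get_with_backoff(requests.get, "https://www.example.com/page1", headers={"User-Agent": "MyBot/1.0"})`.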

Rate-limiting example:

import time
import random
import requests
from urllib.parse import urlparse

class RateLimitedRequester:
    def __init__(self, requests_per_minute=20):
        """
        Initialize the requester with a per-domain rate limit
        :param requests_per_minute: maximum number of requests per minute to each domain
        """
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute
        self.domain_last_request = {}  # time of the last request to each domain
    
    def get(self, url, **kwargs):
        """
        Send a GET request while respecting the rate limit
        :param url: the URL to request
        :param kwargs: extra arguments passed to requests.get
        :return: a requests.Response object
        """
        # Never wait forever on an unresponsive server
        kwargs.setdefault("timeout", 10)
        
        # Extract the domain
        domain = urlparse(url).netloc
        
        # Check when this domain was last requested
        if domain in self.domain_last_request:
            # Time elapsed since the previous request
            elapsed = time.time() - self.domain_last_request[domain]
            
            # If not enough time has passed, sleep for the remainder
            if elapsed < self.min_interval:
                sleep_time = self.min_interval - elapsed
                # Add a little random jitter so the requests are less regular
                sleep_time += random.uniform(0, self.min_interval * 0.1)
                print(f"Sleeping {sleep_time:.2f} s to respect the rate limit...")
                time.sleep(sleep_time)
        
        # Send the request
        response = requests.get(url, **kwargs)
        
        # Record when this request finished
        self.domain_last_request[domain] = time.time()
        
        return response

# Usage example
requester = RateLimitedRequester(requests_per_minute=10)  # at most 10 requests per minute per domain

urls = [
    "https://www.example.com/page1",
    "https://www.example.com/page2",
    "https://www.example.com/page3",
    # ...more URLs...
]

for url in urls:
    try:
        print(f"Requesting: {url}")
        response = requester.get(url)
        print(f"Status code: {response.status_code}")
        # Process the response...
    except Exception as e:
        print(f"Request failed: {e}")
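`RateLimitedRequester` uses a fixed limit that you choose. If the site's robots.txt declares a `Crawl-delay`, a polite crawler would wait at least that long between requests. `polite_interval` below is a hypothetical helper that takes the robots.txt lines and returns the stricter of the two intervals; note that `RobotFileParser.crawl_delay()` (Python 3.6+) only recognizes whole-number values, so a value such as `0.5` is ignored.

```python
from urllib.robotparser import RobotFileParser


def polite_interval(robots_lines, user_agent, requests_per_minute=20):
    """Seconds to wait between requests to one host (hypothetical helper).

    Returns the stricter of our own limit and the robots.txt Crawl-delay.
    """
    parser = RobotFileParser()
    parser.parse(robots_lines)
    own_interval = 60.0 / requests_per_minute
    crawl_delay = parser.crawl_delay(user_agent)  # None when no Crawl-delay applies
    if crawl_delay is None:
        return own_interval
    return max(own_interval, float(crawl_delay))


print(polite_interval(["User-agent: *", "Crawl-delay: 10"], "MyBot/1.0", requests_per_minute=10))  # 10.0
```

Because `get()` reads `self.min_interval` on every call, the result can be assigned directly, e.g. `requester.min_interval = polite_interval(lines, "MyBot/1.0", 10)`.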

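11.5 A Crawling-Policy Analysis Tool

The following simple tool analyzes a website's crawler-related policies (robots.txt, Crawl-delay, terms of service, and meta robots tags):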

import requests
from bs4 import BeautifulSoup
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse, urljoin

class WebsiteCrawlPolicyAnalyzer:
    def __init__(self, url, user_agent="PythonCrawlAnalyzer/1.0"):
        """
        Initialize the crawling-policy analyzer
        :param url: URL of the website to analyze
        :param user_agent: User-Agent string
        """
        self.url = url
        self.user_agent = user_agent
        self.domain = urlparse(url).netloc
        self.base_url = f"{urlparse(url).scheme}://{self.domain}"
        self.robots_url = f"{self.base_url}/robots.txt"
        self.tos_url = None  # terms-of-service URL, discovered during analysis
        
        self.robot_parser = RobotFileParser()
        self.headers = {"User-Agent": user_agent}
    
    def analyze(self):
        """Run the full analysis and return the results"""
        results = {
            "robots_txt": self.analyze_robots_txt(),
            "crawl_delay": self.get_crawl_delay(),
            "terms_of_service": self.find_terms_of_service(),
            "meta_robots": self.check_meta_robots()
        }
        
        # Add an overall assessment
        results["assessment"] = self.overall_assessment(results)
        
        return results
    
    def analyze_robots_txt(self):
        """Analyze robots.txt"""
        content = self._get_robots_content()
        if content is None:
            return {"exists": False}
        try:
            # RobotFileParser does not keep the raw lines (it has no .lines attribute),
            # so parse the text we already downloaded instead of calling read() again
            self.robot_parser.parse(content.splitlines())
            
            # Disallow rules from the groups that apply to our User-Agent
            disallowed_paths = self._get_disallowed_paths(content)
            
            return {
                "exists": True,
                # "Disallow: /" blocks the whole site for this User-Agent
                "fully_disallowed": "/" in disallowed_paths,
                # Is the home page allowed?
                "index_allowed": self.robot_parser.can_fetch(self.user_agent, self.base_url),
                # Up to 10 example paths, excluding the root path
                "disallowed_paths": [p for p in disallowed_paths if p != "/"][:10],
                "raw_content": content
            }
        except Exception as e:
            print(f"Error while analyzing robots.txt: {e}")
            return {
                "exists": False,
                "error": str(e)
            }
    
    def _get_robots_content(self):
        """Fetch the raw text of robots.txt; return None if it is unavailable"""
        try:
            response = requests.get(self.robots_url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                return response.text
            return None
        except requests.RequestException:
            return None
    
    def _get_disallowed_paths(self, content):
        """Collect the Disallow paths of every group that applies to our User-Agent (simplified matching)"""
        disallowed = []
        token = self.user_agent.split("/")[0].lower()  # e.g. "pythoncrawlanalyzer"
        group_matches = False
        reading_agents = False  # True while consecutive User-agent lines form one group header
        
        for raw_line in content.splitlines():
            line = raw_line.split("#", 1)[0].strip()  # drop comments
            if ":" not in line:
                continue
            field, value = (part.strip() for part in line.split(":", 1))
            field = field.lower()
            if field == "user-agent":
                if not reading_agents:
                    group_matches = False  # a new group starts here
                reading_agents = True
                # Same matching idea as RobotFileParser: "*" or a substring of our product token
                if value == "*" or (value and value.lower() in token):
                    group_matches = True
            else:
                reading_agents = False
                if group_matches and field == "disallow" and value:
                    disallowed.append(value)
        
        return disallowed
    
    def get_crawl_delay(self):
        """Return the Crawl-delay that applies to our User-Agent, or None"""
        # crawl_delay() (Python 3.6+) only recognizes whole-number values
        return self.robot_parser.crawl_delay(self.user_agent)
    
    def find_terms_of_service(self):
        """Try to find the terms-of-service page and check it for crawler-related clauses"""
        try:
            # Fetch the home page
            response = requests.get(self.base_url, headers=self.headers, timeout=10)
            if response.status_code != 200:
                return {"found": False, "reason": "Could not access the home page"}
            
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Look for links that may point to the terms of service
            # (the Chinese terms match Chinese-language sites)
            tos_links = []
            for a in soup.find_all('a', href=True):
                text = a.text.lower()
                if any(term in text for term in ['terms', 'conditions', 'terms of service', 'terms of use', 
                                                 '服务条款', '使用条款', '用户协议']):
                    tos_links.append(urljoin(self.base_url, a['href']))
            
            if not tos_links:
                return {"found": False, "reason": "No terms-of-service link found"}
            
            # Check the first terms-of-service link
            self.tos_url = tos_links[0]
            tos_response = requests.get(self.tos_url, headers=self.headers, timeout=10)
            if tos_response.status_code != 200:
                return {"found": False, "reason": "Could not access the terms-of-service page"}
            
            tos_soup = BeautifulSoup(tos_response.text, 'html.parser')
            tos_text = tos_soup.get_text().lower()
            
            # Look for crawler-related keywords. This is plain substring matching, so
            # 'bot' also matches words such as 'robot' or 'bottom': treat hits as hints to read the text.
            scraping_related = []
            keywords = ['scrape', 'crawl', 'spider', 'bot', 'automatic', 'data mining', 
                       '爬虫', '抓取', '自动化', '数据挖掘']
            
            for keyword in keywords:
                pos = tos_text.find(keyword)
                if pos != -1:
                    # Extract about 100 characters of context on each side of the keyword
                    start = max(0, pos - 100)
                    end = min(len(tos_text), pos + 100)
                    context = tos_text[start:end].replace('\n', ' ').strip()
                    scraping_related.append(f"...{context}...")
            
            return {
                "found": True,
                "url": self.tos_url,
                "mentions_scraping": len(scraping_related) > 0,
                "scraping_contexts": scraping_related[:3]  # at most 3 examples
            }
        except Exception as e:
            return {"found": False, "reason": f"Error while analyzing the terms of service: {e}"}
    
    def check_meta_robots(self):
        """Check the meta robots tags on the analyzed page (self.url)"""
        try:
            response = requests.get(self.url, headers=self.headers, timeout=10)
            if response.status_code != 200:
                return {"error": f"Could not access the URL: {response.status_code}"}
            
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Look for meta robots tags
            meta_robots = soup.find('meta', attrs={'name': 'robots'})
            meta_googlebot = soup.find('meta', attrs={'name': 'googlebot'})
            
            results = {"found": False}
            
            if meta_robots:
                content = meta_robots.get('content', '').lower()
                results = {
                    "found": True,
                    "content": content,
                    "noindex": 'noindex' in content,
                    "nofollow": 'nofollow' in content
                }
            
            if meta_googlebot:
                results["googlebot"] = {
                    "content": meta_googlebot.get('content', '').lower()
                }
            
            return results
        except Exception as e:
            return {"error": str(e)}
    
    def overall_assessment(self, results):
        """Combine the individual checks into an overall assessment of the crawl policy"""
        # Start from a neutral score of 5 on a 0-10 scale
        # 0 means crawling is completely forbidden, 10 means it is completely allowed
        score = 5
        
        assessment = {
            "score": score,
            "factors": [],
            "recommendation": ""
        }
        
        # Check robots.txt
        if results["robots_txt"]["exists"]:
            if results["robots_txt"]["fully_disallowed"]:
                score -= 5
                assessment["factors"].append("robots.txt disallows crawling the entire site")
            elif not results["robots_txt"]["index_allowed"]:
                score -= 3
                assessment["factors"].append("robots.txt disallows crawling the home page")
            else:
                score += 1
                assessment["factors"].append("robots.txt allows crawling the home page")
            
            if results["crawl_delay"]:
                score -= 1
                assessment["factors"].append(f"Crawl rate limit in place: {results['crawl_delay']} seconds per request")
        else:
            score += 2
            assessment["factors"].append("The site has no robots.txt file")
        
        # Check meta robots (use .get: a failed check may return only an "error" key)
        meta_robots = results["meta_robots"]
        if meta_robots.get("found", False):
            if meta_robots.get("noindex", False):
                score -= 2
                assessment["factors"].append("The page uses a noindex meta tag")
            if meta_robots.get("nofollow", False):
                score -= 2
                assessment["factors"].append("The page uses a nofollow meta tag")
        
        # Check the terms of service
        if results["terms_of_service"]["found"]:
            if results["terms_of_service"]["mentions_scraping"]:
                score -= 2
                assessment["factors"].append("The terms of service explicitly mention restrictions on crawling")
        
        # Clamp the score to the 0-10 range
        score = max(0, min(10, score))
        assessment["score"] = score
        
        # Turn the score into a recommendation
        if score <= 2:
            assessment["recommendation"] = "Strongly advise against crawling: the site explicitly forbids crawlers"
        elif score <= 5:
            assessment["recommendation"] = "Crawl with caution: strictly follow robots.txt and limit the crawl rate"
        elif score <= 8:
            assessment["recommendation"] = "Crawling is acceptable, but follow good practice and avoid excessive requests"
        else:
            assessment["recommendation"] = "No obvious restrictions, but still keep the crawl rate moderate and respect the site's resources"
        
        return assessment

# Usage helper: print a readable report of the analysis
def analyze_website_crawl_policy(url):
    """Analyze a website's crawler policy and print a report"""
    analyzer = WebsiteCrawlPolicyAnalyzer(url)
    results = analyzer.analyze()
    
    print(f"\nCrawl policy analysis: {url}\n{'-'*50}")
    
    # robots.txt analysis
    print("\n[robots.txt]")
    if results["robots_txt"]["exists"]:
        print(f"- Crawling fully disallowed: {'Yes' if results['robots_txt']['fully_disallowed'] else 'No'}")
        print(f"- Home page allowed: {'Yes' if results['robots_txt']['index_allowed'] else 'No'}")
        
        if results["robots_txt"]["disallowed_paths"]:
            print("- Example disallowed paths:")
            for path in results["robots_txt"]["disallowed_paths"]:
                print(f"  * {path}")
    else:
        print("- The site has no robots.txt file")
    
    # Crawl delay
    print("\n[Crawl delay]")
    if results["crawl_delay"]:
        print(f"- Crawl-delay: {results['crawl_delay']} seconds")
    else:
        print("- No Crawl-delay set")
    
    # Terms of service analysis
    print("\n[Terms of service]")
    if results["terms_of_service"]["found"]:
        print(f"- Terms of service URL: {results['terms_of_service']['url']}")
        print(f"- Mentions crawling: {'Yes' if results['terms_of_service']['mentions_scraping'] else 'No'}")
        
        if results["terms_of_service"]["mentions_scraping"]:
            print("- Relevant excerpts:")
            for context in results["terms_of_service"]["scraping_contexts"]:
                print(f"  * {context}")
    else:
        print(f"- Terms of service page not found: {results['terms_of_service'].get('reason', 'unknown reason')}")
    
    # Meta robots analysis
    print("\n[Meta robots]")
    if results["meta_robots"].get("found", False):
        print(f"- Meta robots: {results['meta_robots']['content']}")
        print(f"- noindex: {'Yes' if results['meta_robots'].get('noindex', False) else 'No'}")
        print(f"- nofollow: {'Yes' if results['meta_robots'].get('nofollow', False) else 'No'}")
    else:
        print("- The page does not use a meta robots tag")
    
    # Overall assessment
    print("\n[Overall assessment]")
    print(f"- Crawl-friendliness score: {results['assessment']['score']}/10")
    print("- Factors:")
    for factor in results["assessment"]["factors"]:
        print(f"  * {factor}")
    print(f"- Recommendation: {results['assessment']['recommendation']}")

# Usage example
if __name__ == "__main__":
    analyze_website_crawl_policy("https://www.example.com")
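
The report above depends on robots.txt data (`fully_disallowed`, `index_allowed`, `crawl_delay`). Python's standard-library `urllib.robotparser` can answer those questions without a network request. The sketch below parses a made-up robots.txt from a string. The rules and the bot names `MyCrawler` and `BadBot` are illustrative assumptions, not taken from any real site.

```python
from urllib.robotparser import RobotFileParser

# A made-up robots.txt, used only for illustration
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 5

User-agent: BadBot
Disallow: /
"""

def parse_robots(text):
    """Parse robots.txt content from a string instead of fetching it over HTTP"""
    parser = RobotFileParser()
    parser.parse(text.splitlines())
    return parser

parser = parse_robots(ROBOTS_TXT)

# The same questions the policy report asks, for a hypothetical bot named "MyCrawler"
index_allowed = parser.can_fetch("MyCrawler", "https://www.example.com/")
private_allowed = parser.can_fetch("MyCrawler", "https://www.example.com/private/data.html")
fully_disallowed_for_badbot = not parser.can_fetch("BadBot", "https://www.example.com/")
delay = parser.crawl_delay("MyCrawler")  # Falls back to the "*" group

print(index_allowed, private_allowed, fully_disallowed_for_badbot, delay)  # True False True 5
```

For a live site, call `parser.set_url(".../robots.txt")` followed by `parser.read()` instead of `parse()`.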

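12. Advanced Techniques and Optimization

12.1 Efficient Crawler Design Patterns

12.1.1 Producer-Consumer Pattern

This pattern splits the crawler into producers, which discover URLs, and consumers, which process page content. The two sides communicate through queues, so they run concurrently instead of one after the other: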

import threading
import queue
import time
import random
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urldefrag

HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}

class ProducerConsumerCrawler:
    def __init__(self, seed_urls, max_depth=2, num_producers=1, num_consumers=4):
        """
        Initialize the producer-consumer crawler
        :param seed_urls: list of seed URLs (depth 0)
        :param max_depth: maximum crawl depth
        :param num_producers: producer threads (download pages, discover links)
        :param num_consumers: consumer threads (parse pages, extract data)
        """
        # Producers and consumers need separate queues; with one shared queue,
        # each URL would reach only one of the two roles
        self.url_queue = queue.Queue()   # Frontier: (url, depth) waiting to be downloaded
        self.page_queue = queue.Queue()  # Downloaded pages: (url, html) waiting to be processed
        self.seen_urls = set()           # URLs already queued
        self.seen_lock = threading.Lock()    # Protects seen_urls
        self.result_data = []            # Extracted results
        self.result_lock = threading.Lock()  # Protects result_data
        
        self.max_depth = max_depth
        self.num_producers = num_producers
        self.num_consumers = num_consumers
        
        # Queue the seed URLs
        for url in seed_urls:
            self.url_queue.put((url, 0))
            self.seen_urls.add(url)
        
        # Stop signal shared by all threads
        self.should_stop = threading.Event()
    
    def producer(self):
        """Producer thread: download pages, hand them to consumers, queue newly found links"""
        while not self.should_stop.is_set():
            try:
                url, depth = self.url_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                response = requests.get(url, headers=HEADERS, timeout=5)
                if response.status_code != 200:
                    continue
                
                # Hand the downloaded HTML to the consumers
                self.page_queue.put((url, response.text))
                
                # Follow links only while below the maximum depth
                if depth < self.max_depth:
                    soup = BeautifulSoup(response.text, 'html.parser')
                    for a_tag in soup.find_all('a', href=True):
                        # Build an absolute URL and drop the #fragment
                        link, _ = urldefrag(urljoin(url, a_tag['href']))
                        if not link.startswith(('http://', 'https://')):
                            continue
                        # Check-and-add under one shared lock so no link is queued twice
                        with self.seen_lock:
                            if link in self.seen_urls:
                                continue
                            self.seen_urls.add(link)
                        self.url_queue.put((link, depth + 1))
            except Exception as e:
                print(f"Error processing URL: {url} - {e}")
            finally:
                # Mark the task done exactly once, whatever happened above
                self.url_queue.task_done()
                # Random delay to avoid sending requests too quickly
                time.sleep(random.uniform(0.5, 1.5))
    
    def consumer(self):
        """Consumer thread: parse downloaded pages and extract data"""
        while not self.should_stop.is_set():
            try:
                url, html = self.page_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                soup = BeautifulSoup(html, 'html.parser')
                title = soup.title.text.strip() if soup.title else "No Title"
                
                # Example content extraction (adjust the selectors to the target site)
                content = ""
                article = soup.find('article') or soup.find('div', class_='content')
                if article:
                    paragraphs = article.find_all('p')
                    content = '\n'.join(p.text.strip() for p in paragraphs)
                
                data_item = {
                    'url': url,
                    'title': title,
                    'content': content[:200] + "..." if len(content) > 200 else content,
                    'time': time.strftime('%Y-%m-%d %H:%M:%S')
                }
                
                # Protect the shared result list with a lock
                with self.result_lock:
                    self.result_data.append(data_item)
                print(f"Processed: {url} - {title}")
            except Exception as e:
                print(f"Error parsing page: {url} - {e}")
            finally:
                self.page_queue.task_done()
    
    def _wait_until_drained(self, done_event):
        """Set done_event once every queued URL and every downloaded page is processed"""
        # Producers queue new links before calling task_done() on the current URL,
        # so url_queue.join() returns only when the frontier is truly exhausted
        self.url_queue.join()
        self.page_queue.join()
        done_event.set()
    
    def crawl(self, max_items=100, timeout=60):
        """
        Start crawling
        :param max_items: maximum number of items to collect
        :param timeout: maximum crawl time in seconds
        :return: list of extracted items
        """
        threads = [threading.Thread(target=self.producer, daemon=True)
                   for _ in range(self.num_producers)]
        threads += [threading.Thread(target=self.consumer, daemon=True)
                    for _ in range(self.num_consumers)]
        for t in threads:
            t.start()
        
        # Watcher thread that signals when all work has finished
        done_event = threading.Event()
        threading.Thread(target=self._wait_until_drained, args=(done_event,), daemon=True).start()
        
        start_time = time.time()
        try:
            while True:
                with self.result_lock:
                    item_count = len(self.result_data)
                if item_count >= max_items:
                    print(f"Reached the item limit: {max_items}")
                    break
                if time.time() - start_time > timeout:
                    print(f"Crawl timed out after {timeout} seconds")
                    break
                if done_event.is_set():
                    print("No URLs left, crawl complete")
                    break
                time.sleep(0.5)
        except KeyboardInterrupt:
            print("Crawl interrupted by user")
        finally:
            # Signal all threads to stop and give them a moment to exit
            self.should_stop.set()
            for t in threads:
                t.join(1.0)
        
        with self.result_lock:
            results = list(self.result_data)
        print(f"Crawl finished: {len(results)} items extracted, {len(self.seen_urls)} URLs discovered")
        return results

# Usage example
if __name__ == "__main__":
    seed_urls = [
        "https://news.ycombinator.com/",
        "https://www.theverge.com/"
    ]
    
    crawler = ProducerConsumerCrawler(
        seed_urls=seed_urls,
        max_depth=2,
        num_producers=2,
        num_consumers=4
    )
    
    results = crawler.crawl(max_items=20, timeout=60)
    
    print("\nSample results:")
    for i, item in enumerate(results[:5]):
        print(f"\n[{i+1}] {item['title']}")
        print(f"URL: {item['url']}")
        print(f"Content preview: {item['content'][:100]}...")
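
Without HTTP and HTML parsing, the pattern comes down to one queue per stage and a clean shutdown signal. The simplified sketch below runs offline. `fake_download` is a hypothetical stand-in for a real request, and each consumer stops when it reads a `None` sentinel. The sentinels are queued only after the producer finishes, and the queue is FIFO, so every real page is consumed before any consumer stops.

```python
import queue
import threading

def fake_download(url):
    """Hypothetical stand-in for an HTTP request"""
    return f"<title>{url}</title>"

def run_pipeline(urls, num_consumers=3):
    url_queue = queue.Queue()
    page_queue = queue.Queue()
    results = []
    results_lock = threading.Lock()

    for url in urls:
        url_queue.put(url)

    def producer():
        # Download until the URL queue is empty
        while True:
            try:
                url = url_queue.get_nowait()
            except queue.Empty:
                return
            page_queue.put((url, fake_download(url)))

    def consumer():
        # Process pages until a None sentinel arrives
        while True:
            item = page_queue.get()
            if item is None:
                return
            url, html = item
            with results_lock:
                results.append((url, len(html)))

    consumers = [threading.Thread(target=consumer) for _ in range(num_consumers)]
    for t in consumers:
        t.start()
    producer_thread = threading.Thread(target=producer)
    producer_thread.start()
    producer_thread.join()

    # One sentinel per consumer tells each of them to stop
    for _ in consumers:
        page_queue.put(None)
    for t in consumers:
        t.join()
    return results

urls = [f"https://example.com/page{i}" for i in range(10)]
results = run_pipeline(urls)
print(len(results))  # 10
```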
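
12.1.2 Asynchronous Crawler

Python's asyncio lets a single thread keep many requests in flight at once, which suits I/O-bound crawling well: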

import asyncio
import logging
import random
from urllib.parse import urljoin, urldefrag, urlparse

import aiohttp
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class AsyncCrawler:
    def __init__(self, seed_urls, max_depth=2, max_urls_per_domain=20):
        """
        Initialize the asynchronous crawler
        Create it inside a running event loop (as main() below does): on Python < 3.10,
        asyncio.Queue and asyncio.Semaphore bind to the loop that exists when they are created.
        :param seed_urls: list of seed URLs
        :param max_depth: maximum crawl depth
        :param max_urls_per_domain: maximum number of URLs queued per domain
        """
        self.seed_urls = seed_urls
        self.max_depth = max_depth
        self.max_urls_per_domain = max_urls_per_domain
        
        self.visited_urls = set()     # URLs already queued
        self.url_queue = asyncio.Queue()
        self.results = []
        
        self.domain_counts = {}  # URLs queued per domain
        self.semaphore = asyncio.Semaphore(10)  # Limit concurrent requests
    
    def _claim(self, url):
        """Record the URL and return True if it is new and its domain is under the limit"""
        # No await here, so the check-and-update cannot interleave with other tasks
        domain = urlparse(url).netloc
        if url in self.visited_urls:
            return False
        if self.domain_counts.get(domain, 0) >= self.max_urls_per_domain:
            return False
        self.visited_urls.add(url)
        self.domain_counts[domain] = self.domain_counts.get(domain, 0) + 1
        return True
    
    async def fetch(self, url, session, depth):
        """Fetch one page asynchronously, record its data and queue its links"""
        try:
            async with self.semaphore:
                # Random delay to avoid sending requests too quickly
                await asyncio.sleep(random.uniform(0.5, 1.5))
                
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    if response.status != 200:
                        logger.warning(f"Fetch failed: {url} - status code: {response.status}")
                        return None
                    
                    # Check the content type to make sure the response is HTML
                    content_type = response.headers.get('Content-Type', '').lower()
                    if 'text/html' not in content_type:
                        logger.warning(f"Skipping non-HTML content: {url} - {content_type}")
                        return None
                    
                    html = await response.text()
            
            # Parse after releasing the semaphore so another request can start
            soup = BeautifulSoup(html, 'html.parser')
            title = soup.title.text.strip() if soup.title else "No Title"
            
            # Extract the main content
            content = ""
            article = soup.find('article') or soup.find('div', class_='content')
            if article:
                paragraphs = article.find_all('p')
                content = '\n'.join(p.text.strip() for p in paragraphs)
            
            # Queue new links while below the maximum depth
            if depth < self.max_depth:
                await self.extract_links(soup, url, depth + 1)
            
            # Record the result
            self.results.append({
                'url': url,
                'title': title,
                'content_preview': content[:200] + '...' if len(content) > 200 else content,
                'depth': depth
            })
            
            logger.info(f"Crawled: {url} - {title}")
            return html
        
        except aiohttp.ClientError as e:
            logger.error(f"Request error: {url} - {e}")
        except asyncio.TimeoutError:
            logger.error(f"Request timed out: {url}")
        except Exception as e:
            logger.error(f"Crawl error: {url} - {e}")
        
        return None
    
    async def extract_links(self, soup, base_url, new_depth):
        """Extract links from a page and add new ones to the queue"""
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            
            # Skip in-page anchors and JavaScript links
            if href.startswith(('#', 'javascript:')):
                continue
            
            # Build an absolute URL and drop the #fragment
            full_url, _ = urldefrag(urljoin(base_url, href))
            
            # Follow only HTTP and HTTPS links
            if urlparse(full_url).scheme not in ('http', 'https'):
                continue
            
            # Skip seen URLs and domains that reached their limit (checked per link's own domain)
            if self._claim(full_url):
                await self.url_queue.put((full_url, new_depth))
    
    async def worker(self, session):
        """Worker task: take URLs from the queue and process them until cancelled"""
        while True:
            url, depth = await self.url_queue.get()
            try:
                await self.fetch(url, session, depth)
            except Exception as e:
                logger.error(f"Worker error: {e}")
            finally:
                # Mark the item done exactly once, even if fetch failed
                self.url_queue.task_done()
    
    async def crawl(self, num_workers=10, timeout=60):
        """
        Start crawling
        :param num_workers: number of worker tasks
        :param timeout: crawl timeout in seconds
        """
        # Queue the seed URLs
        for url in self.seed_urls:
            if self._claim(url):
                await self.url_queue.put((url, 0))
        
        try:
            # Create the client session
            async with aiohttp.ClientSession(
                headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'},
                timeout=aiohttp.ClientTimeout(total=15)  # Session-wide 15-second timeout
            ) as session:
                # Create the worker tasks
                tasks = [asyncio.create_task(self.worker(session)) for _ in range(num_workers)]
                
                try:
                    # join() returns once every queued URL has been processed; workers queue
                    # new links before marking the current URL done, so it cannot return early
                    await asyncio.wait_for(self.url_queue.join(), timeout=timeout)
                except asyncio.TimeoutError:
                    logger.info(f"Crawl timed out after {timeout} seconds")
                finally:
                    # Work finished or timed out: cancel the workers and wait for them to exit
                    for task in tasks:
                        task.cancel()
                    await asyncio.gather(*tasks, return_exceptions=True)
        
        except Exception as e:
            logger.error(f"Error during crawl: {e}")
        
        logger.info(f"Crawl finished: {len(self.results)} pages fetched, {len(self.visited_urls)} URLs queued")
        return self.results

async def main():
    """Entry point"""
    seed_urls = [
        "https://news.ycombinator.com/",
        "https://www.theverge.com/"
    ]
    
    crawler = AsyncCrawler(
        seed_urls=seed_urls,
        max_depth=2,
        max_urls_per_domain=10
    )
    
    results = await crawler.crawl(num_workers=5, timeout=30)
    
    # Print a few of the results
    print("\nCrawl results:")
    for i, result in enumerate(results[:5], 1):
        print(f"\n[{i}] {result['title']}")
        print(f"URL: {result['url']}")
        print(f"Depth: {result['depth']}")
        print(f"Content preview: {result['content_preview']}")
    
    print(f"\nCrawled {len(results)} pages in total")

if __name__ == "__main__":
    # Run the async entry point
    asyncio.run(main())
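
An async crawler stops at the right moment when it awaits `asyncio.Queue.join()`, which waits until every queued item has been marked with `task_done()`. The minimal offline sketch below illustrates this. It assumes a hypothetical link graph (`FAKE_LINKS`) and uses `asyncio.sleep` in place of real requests. It also shows two rules that keep `join()` reliable: new links are queued before the current item is marked done, and the workers, which loop forever, are cancelled afterwards.

```python
import asyncio

# Hypothetical link graph standing in for real pages: page -> links found on it
FAKE_LINKS = {
    "/": ["/a", "/b"],
    "/a": ["/c", "/"],
    "/b": ["/c"],
    "/c": [],
}

async def crawl(start, num_workers=3, max_concurrency=2):
    url_queue = asyncio.Queue()
    seen = {start}
    visited = []
    semaphore = asyncio.Semaphore(max_concurrency)  # Cap simultaneous "requests"

    async def worker():
        while True:
            url = await url_queue.get()
            try:
                async with semaphore:
                    await asyncio.sleep(0.01)  # Simulated network latency
                visited.append(url)
                # Queue new links BEFORE task_done(), so join() cannot return early
                for link in FAKE_LINKS.get(url, []):
                    if link not in seen:
                        seen.add(link)
                        await url_queue.put(link)
            finally:
                url_queue.task_done()

    await url_queue.put(start)
    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    await url_queue.join()  # Every discovered page has been processed
    for w in workers:
        w.cancel()  # Workers loop forever, so cancel them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return visited

visited = asyncio.run(crawl("/"))
print(sorted(visited))  # ['/', '/a', '/b', '/c']
```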

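12.2 Performance Optimization Tips

12.2.1 Connection Pooling and Session Reuse

A single requests.Session keeps connections to each host open between requests (HTTP keep-alive). Later requests to the same host therefore skip the TCP and TLS handshakes. Combined with a connection pool sized for the number of threads, this can noticeably speed up a crawler: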

import requests
from concurrent.futures import ThreadPoolExecutor
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
from bs4 import BeautifulSoup

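class OptimizedCrawler:
    def __init__(self, max_workers=10, timeout=10, max_retries=3):
        """
        Initialize the optimized crawler
        :param max_workers: maximum number of worker threads
        :param timeout: request timeout in seconds
        :param max_retries: maximum number of retries
        """
        self.max_workers = max_workers
        self.timeout = timeout
        
        # One shared Session reuses TCP/TLS connections across requests
        self.session = requests.Session()
        
        # Retry strategy with exponential backoff
        # (allowed_methods needs urllib3 >= 1.26; older versions call it method_whitelist)
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
            # Retry only idempotent methods: retrying a POST could repeat its side effects
            allowed_methods=["HEAD", "GET"]
        )
        
        # Adapter carrying the retry policy and the connection-pool settings
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=20,  # Number of per-host connection pools to cache
            pool_maxsize=50       # Maximum number of connections kept in each pool
        )
        
        # Mount the adapter for both schemes
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        
        # Default request headers
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml",
            "Accept-Language": "en-US,en;q=0.9",
        })
        
        # Thread pool
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    def close(self):
        """Release the session and the thread pool"""
        self.session.close()
        self.executor.shutdown()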
    
    def fetch_url(self, url):
        """
        Fetch the content of a URL
        :param url: URL to fetch
        :return: (URL, status code, content) tuple, or (URL, error message, None) on failure
        """
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=self.timeout)
            elapsed = time.time() - start_time
            
            print(f"Fetched {url} - status: {response.status_code} - took {elapsed:.2f}s")
            
            return (url, response.status_code, response.text)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return (url, str(e), None)
    
    def batch_fetch(self, urls):
        """
        Fetch a batch of URLs concurrently
        :param urls: list of URLs
        :return: list of results, in the same order as urls
        """
        # Submit all tasks to the thread pool
        futures = [self.executor.submit(self.fetch_url, url) for url in urls]
        
        # Collect the results
        results = []
        for future in futures:
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Error getting task result: {e}")
        
        return results
    
    def extract_data(self, html, url):
        """
        Extract data from HTML
        :param html: HTML content
        :param url: page URL
        :return: dictionary of extracted data
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')
            
            # Extract the title
            title = soup.title.text.strip() if soup.title else "No Title"
            
            # Extract the meta description
            description = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and 'content' in meta_desc.attrs:
                description = meta_desc.attrs['content']
            
            # Extract the links
            links = []
            for a in soup.find_all('a', href=True):
                links.append(a['href'])
            
            return {
                'url': url,
                'title': title,
                'description': description,
                'links': links[:10]  # Return only the first 10 links
            }
        except Exception as e:
            print(f"Error extracting data: {url} - {e}")
            return {
                'url': url,
                'error': str(e)
            }
    
    def crawl_and_extract(self, urls):
        """
        Fetch the URLs and extract data from them
        :param urls: list of URLs
        :return: list of extracted data
        """
        # Fetch all URLs concurrently
        fetch_results = self.batch_fetch(urls)
        
        # Extract data only from successful (200) responses, not from error pages
        data = []
        for url, status, html in fetch_results:
            if status == 200 and html:
                result = self.extract_data(html, url)
                data.append(result)
        
        return data

# Usage example
if __name__ == "__main__":
    urls = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.wikipedia.org",
        "https://www.reddit.com",
        "https://news.ycombinator.com"
    ]
    
    crawler = OptimizedCrawler(max_workers=3, timeout=5)
    
    try:
        print("Starting crawl...")
        start_time = time.time()
        
        results = crawler.crawl_and_extract(urls)
        
        elapsed = time.time() - start_time
        print(f"\nCrawl finished in {elapsed:.2f}s, {len(results)} pages retrieved")
        
        # Print the results (entries from a failed extraction only carry 'url' and 'error')
        for i, result in enumerate(results, 1):
            print(f"\n[{i}] {result.get('title', 'No Title')}")
            print(f"URL: {result['url']}")
            if 'error' in result:
                print(f"Error: {result['error']}")
            if result.get('description'):
                print(f"Description: {result['description'][:100]}...")
            if result.get('links'):
                print(f"Some links: {', '.join(result['links'][:3])}")
    
    finally:
        # Release the session and thread pool
        crawler.close()
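
`batch_fetch` above reads results in submission order, so one slow URL holds up the handling of the faster ones queued behind it. The variant below uses `concurrent.futures.as_completed` to handle each result as soon as it finishes. `slow_fetch` and its delays are hypothetical stand-ins for `fetch_url`, so the sketch runs offline.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical per-URL latencies (seconds) standing in for real network delays
DELAYS = {"slow": 0.3, "medium": 0.15, "fast": 0.0}

def slow_fetch(url):
    """Stand-in for fetch_url: sleep, then return (url, status, body)"""
    time.sleep(DELAYS[url])
    return (url, 200, f"<html>{url}</html>")

def batch_fetch_as_completed(urls, max_workers=3):
    """Return results in completion order instead of submission order"""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its URL so failures can still be reported
        future_to_url = {executor.submit(slow_fetch, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results.append(future.result())
            except Exception as e:
                results.append((url, str(e), None))
    return results

order = [url for url, _, _ in batch_fetch_as_completed(["slow", "medium", "fast"])]
print(order)
```

The URL list goes in as slow, medium, fast, but the faster URLs come back first. If later code needs the original order, sort the results afterwards.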
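
12.2.2 Data Compression and Memory Optimization

With large amounts of data, compress what you store and keep only one small batch in memory at a time. Together, these two measures keep both disk and memory use under control: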

import gzip
import json
import os
import shutil
import tempfile
import time

import psutil
import requests
from bs4 import BeautifulSoup

# memory_profiler is optional: without it, @profile becomes a no-op decorator
try:
    from memory_profiler import profile
except ImportError:
    def profile(func):
        return func

class MemoryEfficientCrawler:
    def __init__(self, use_compression=True, batch_size=100):
        """
        Initialize the memory-efficient crawler
        :param use_compression: whether to gzip-compress the batch files
        :param batch_size: number of records held in memory before a batch is written to disk
        """
        self.use_compression = use_compression
        self.batch_size = batch_size
        self.temp_dir = tempfile.mkdtemp()
        self.batch_count = 0  # Number of batch files written so far
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
    
    def log_memory_usage(self, label):
        """Print the process's resident memory (RSS)"""
        process = psutil.Process(os.getpid())
        mem_info = process.memory_info()
        print(f"Memory usage [{label}]: {mem_info.rss / 1024 / 1024:.2f} MB")
    
    def fetch_url(self, url):
        """Fetch a URL; return its HTML, or None on failure"""
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            print(f"Error fetching URL: {url} - {e}")
            return None
    
    def extract_data(self, html, url):
        """Extract data from HTML"""
        if not html:
            return None
        
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title.text.strip() if soup.title else "No Title"
        
        # Join the paragraph texts once instead of concatenating strings in a loop
        main_text = "\n".join(p.get_text() for p in soup.find_all('p'))
        
        # Free the parse tree right away; it is usually far larger than the extracted text
        soup.decompose()
        
        return {
            'url': url,
            'title': title,
            'text': main_text,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }
    
    def _batch_path(self, batch_num):
        """Return the temporary file path for a batch"""
        suffix = ".json.gz" if self.use_compression else ".json"
        return os.path.join(self.temp_dir, f"batch_{batch_num}{suffix}")
    
    def save_batch(self, batch, batch_num):
        """Save one batch of records to a temporary file"""
        if not batch:
            return
        opener = gzip.open if self.use_compression else open
        with opener(self._batch_path(batch_num), 'wt', encoding='utf-8') as f:
            json.dump(batch, f, ensure_ascii=False)
    
    def load_batch(self, batch_num):
        """Load one batch of records from its temporary file"""
        opener = gzip.open if self.use_compression else open
        try:
            with opener(self._batch_path(batch_num), 'rt', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return []
    
    @profile
    def crawl(self, urls):
        """Crawl the URLs, writing extracted data to disk in batches; return the record count"""
        self.log_memory_usage("start")
        
        result_count = 0
        current_batch = []
        
        for i, url in enumerate(urls):
            print(f"Processing {i+1}/{len(urls)}: {url}")
            
            html = self.fetch_url(url)
            data = self.extract_data(html, url)
            
            if data:
                current_batch.append(data)
                result_count += 1
            
            # When the batch is full, write it to disk and start a new one
            if len(current_batch) >= self.batch_size:
                self.save_batch(current_batch, self.batch_count)
                self.batch_count += 1
                current_batch = []
                self.log_memory_usage(f"batch {self.batch_count}")
        
        # Save the last, partially filled batch
        if current_batch:
            self.save_batch(current_batch, self.batch_count)
            self.batch_count += 1
        
        self.log_memory_usage("crawl finished")
        print(f"Crawl finished: {len(urls)} URLs processed, {result_count} records extracted")
        return result_count
    
    def iter_results(self):
        """
        Yield records one batch at a time, so at most one batch is in memory
        (loading every batch into a single list would undo the batching).
        Batch files are deleted as they are read, so iterate only once.
        """
        for i in range(self.batch_count):
            yield from self.load_batch(i)
            try:
                os.remove(self._batch_path(i))
            except OSError:
                pass
    
    def cleanup(self):
        """Close the session and delete the temporary directory"""
        self.session.close()
        shutil.rmtree(self.temp_dir, ignore_errors=True)
        print(f"Removed temporary directory: {self.temp_dir}")

# Usage example
if __name__ == "__main__":
    # Test URLs
    test_urls = [
        "https://en.wikipedia.org/wiki/Python_(programming_language)",
        "https://en.wikipedia.org/wiki/Web_scraping",
        "https://en.wikipedia.org/wiki/Data_mining",
        "https://en.wikipedia.org/wiki/Natural_language_processing",
        "https://en.wikipedia.org/wiki/Machine_learning"
    ]
    
    crawler = MemoryEfficientCrawler(use_compression=True, batch_size=2)
    
    try:
        count = crawler.crawl(test_urls)
        print(f"\nGot {count} results:")
        
        # Stream the results back from disk; print only the first 3
        for i, result in enumerate(crawler.iter_results(), 1):
            if i > 3:
                break
            print(f"\n[{i}] {result['title']}")
            print(f"URL: {result['url']}")
            text_preview = result['text'][:100].replace('\n', ' ')
            print(f"Text preview: {text_preview}...")
    
    finally:
        # Clean up resources
        crawler.cleanup()
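
Each batch above is a single JSON document, so reading any record means loading the whole batch. A common alternative is gzip-compressed JSON Lines: one record per line, appended as records are produced and read back one line at a time. This sketch illustrates that format and is not the crawler's own. The helper names `append_records` and `iter_records` are illustrative.

```python
import gzip
import json
import os
import tempfile

def append_records(path, records):
    """Append records to a gzip-compressed JSON Lines file (one JSON object per line)"""
    # Mode "at" appends a new gzip member; gzip.open reads multi-member files transparently
    with gzip.open(path, "at", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

def iter_records(path):
    """Yield records one at a time, so only the current line is held in memory"""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            yield json.loads(line)

path = os.path.join(tempfile.mkdtemp(), "results.jsonl.gz")
append_records(path, [{"url": f"https://example.com/{i}", "text": "page text " * 50} for i in range(3)])
append_records(path, [{"url": "https://example.com/3", "text": "more"}])

records = list(iter_records(path))
# Size the same records would take as uncompressed JSON Lines
raw_size = sum(len(json.dumps(r, ensure_ascii=False).encode("utf-8")) + 1 for r in records)
print(len(records), os.path.getsize(path) < raw_size)  # 4 True
```

Because the format is append-only, a crawler can write each record as soon as it is extracted instead of holding a batch in memory.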
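
12.2.3 Caching Strategy

A cache stores each response on disk. Requesting the same URL again before the entry expires then costs no network request: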

import hashlib
import os
import pickle
import time
import requests
from bs4 import BeautifulSoup
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class CachedCrawler:
    def __init__(self, cache_dir='.cache', expiry_hours=24):
        """
        Initialize a crawler with an on-disk cache
        :param cache_dir: cache directory
        :param expiry_hours: cache lifetime in hours
        """
        self.cache_dir = cache_dir
        self.expiry_seconds = expiry_hours * 3600
        
        # Create the cache directory if it does not exist
        os.makedirs(cache_dir, exist_ok=True)
        
        # Create a reusable session
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        
        # Cache statistics
        self.cache_hits = 0
        self.cache_misses = 0
    
    def _get_cache_key(self, url):
        """Build the cache key for a URL"""
        # The MD5 hex digest of the URL gives a fixed-length, filename-safe key
        return hashlib.md5(url.encode('utf-8')).hexdigest()
    
    def _get_cache_path(self, cache_key):
        """Return the path of the cache file"""
        return os.path.join(self.cache_dir, f"{cache_key}.pkl")
    
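    def _save_to_cache(self, cache_key, data):
        """Save data to the cache"""
        cache_path = self._get_cache_path(cache_key)
        try:
            # Store the payload together with the time it was cached
            cache_data = {
                'timestamp': time.time(),
                'data': data
            }
            
            # Write to a temporary file first, then rename it into place, so an
            # interrupted write never leaves a half-written cache file behind
            tmp_path = cache_path + '.tmp'
            with open(tmp_path, 'wb') as f:
                pickle.dump(cache_data, f)
            os.replace(tmp_path, cache_path)
            
            return True
        except Exception as e:
            logger.error(f"Failed to write cache: {e}")
            return False
    
    def _load_from_cache(self, cache_key):
        """Load data from the cache; returns None on a miss or an expired entry"""
        cache_path = self._get_cache_path(cache_key)
        
        # No cache file means a cache miss
        if not os.path.exists(cache_path):
            return None
        
        try:
            # Note: unpickling can execute arbitrary code, so only load
            # cache files that this crawler wrote itself
            with open(cache_path, 'rb') as f:
                cache_data = pickle.load(f)
            
            # Entries older than the expiry window count as misses
            if time.time() - cache_data['timestamp'] > self.expiry_seconds:
                logger.debug(f"Cache entry expired: {cache_key}")
                return None
            
            return cache_data['data']
        except Exception as e:
            logger.error(f"Failed to read cache: {e}")
            return None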
    
    def fetch_url(self, url, bypass_cache=False):
        """
        Fetch a URL, preferring the cached copy
        :param url: URL to fetch
        :param bypass_cache: if True, ignore the cache and always re-download
        :return: (status_code, content) tuple
        """
        # Build the cache key
        cache_key = self._get_cache_key(url)
        
        # Try the cache first unless bypassing it
        if not bypass_cache:
            cached_data = self._load_from_cache(cache_key)
            if cached_data is not None:
                self.cache_hits += 1
                logger.info(f"Cache hit: {url}")
                return cached_data
        
        # Cache miss (or bypass requested): send a real request
        self.cache_misses += 1
        logger.info(f"Fetching URL: {url}")
        
        try:
            response = self.session.get(url, timeout=10)
            data = (response.status_code, response.text)
            
            # Cache only successful responses
            if response.status_code == 200:
                self._save_to_cache(cache_key, data)
            
            return data
        except Exception as e:
            logger.error(f"Request failed: {url} - {e}")
            return (0, None)
    
    def extract_data(self, html):
        """Extract page data from HTML"""
        if not html:
            return None
        
        soup = BeautifulSoup(html, 'html.parser')
        
        # Page title
        title = soup.title.text.strip() if soup.title else "No Title"
        
        # Meta description
        description = ""
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc and 'content' in meta_desc.attrs:
            description = meta_desc['content']
        
        # Main body text (a simple heuristic)
        content = ""
        main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
        if main_content:
            paragraphs = main_content.find_all('p')
            content = '\n'.join([p.text for p in paragraphs])
        
        # Absolute links only
        links = []
        for a in soup.find_all('a', href=True):
            href = a['href']
            if href.startswith('http'):
                links.append(href)
        
        return {
            'title': title,
            'description': description,
            'content_preview': content[:200] + '...' if len(content) > 200 else content,
            'links': links[:10]  # at most 10 links
        }
    
    def crawl_with_cache(self, urls, force_refresh=False):
        """
        Crawl several URLs through the cache
        :param urls: list of URLs
        :param force_refresh: if True, bypass the cache and re-download everything
        :return: list of result dicts
        """
        results = []
        
        for url in urls:
            # Fetch the page (from the cache or the network)
            status_code, html = self.fetch_url(url, bypass_cache=force_refresh)
            
            # Extract data only from successful responses
            if status_code == 200 and html:
                data = self.extract_data(html)
                if data:
                    # Attach the URL and status code
                    data['url'] = url
                    data['status_code'] = status_code
                    results.append(data)
            else:
                logger.warning(f"Could not fetch content: {url} - status: {status_code}")
        
        # Log cache statistics (cumulative over the lifetime of this instance)
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total_requests * 100 if total_requests > 0 else 0
        
        logger.info(f"Cache stats: {self.cache_hits} hits, {self.cache_misses} misses, hit rate {hit_rate:.2f}%")
        
        return results
    
    def clear_cache(self, older_than_hours=None):
        """
        Delete cache files
        :param older_than_hours: if set, delete only entries older than this many hours
        :return: number of cache files deleted
        """
        count = 0
        now = time.time()
        
        for filename in os.listdir(self.cache_dir):
            if not filename.endswith('.pkl'):
                continue
            
            file_path = os.path.join(self.cache_dir, filename)
            
            # With an age threshold, keep entries that are still young enough
            if older_than_hours is not None:
                try:
                    # Load the entry to read its timestamp
                    with open(file_path, 'rb') as f:
                        cache_data = pickle.load(f)
                    
                    # Skip entries newer than the threshold
                    if now - cache_data['timestamp'] < older_than_hours * 3600:
                        continue
                except Exception:
                    # Unreadable or corrupt files are deleted as well
                    pass
            
            # Delete the file
            try:
                os.remove(file_path)
                count += 1
            except Exception as e:
                logger.error(f"Failed to delete cache file: {file_path} - {e}")
        
        logger.info(f"Deleted {count} cache files")
        return count

# Usage example
if __name__ == "__main__":
    # Create the caching crawler
    crawler = CachedCrawler(cache_dir='./.url_cache', expiry_hours=48)
    
    # Test URLs
    urls = [
        "https://www.python.org",
        "https://docs.python.org/3/",
        "https://pypi.org"
    ]
    
    # First run: with an empty cache every URL is fetched from the network
    print("\nFirst run (expected: all fetched from the network):")
    results1 = crawler.crawl_with_cache(urls)
    
    # Print part of the results
    for i, result in enumerate(results1, 1):
        print(f"\n[{i}] {result['title']}")
        print(f"URL: {result['url']}")
        print(f"Description: {result['description'][:100]}...")
    
    # Second run: every URL that succeeded in the first run should be a cache hit
    print("\nSecond run (expected: all served from the cache):")
    results2 = crawler.crawl_with_cache(urls)
    
    # Force a refresh
    print("\nThird run (forced cache refresh):")
    results3 = crawler.crawl_with_cache(urls, force_refresh=True)
    
    # Delete cache entries older than 24 hours
    print("\nDeleting cache entries older than 24 hours:")
    deleted_count = crawler.clear_cache(older_than_hours=24)
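Pickle is acceptable for a cache that only this crawler writes, but unpickling can run arbitrary code, and a crash during a write can leave a truncated file. The sketch below shows a JSON-based alternative under two assumptions: cached values are JSON-serializable (the `(status_code, text)` tuples above come back as lists), and the cache directory sits on a single filesystem, where `os.replace` swaps the finished file into place atomically. `JsonFileCache`, `set` and `get` are hypothetical names for illustration.

```python
import hashlib
import json
import os
import tempfile
import time


class JsonFileCache:
    """Hypothetical TTL cache that stores one JSON file per key."""

    def __init__(self, cache_dir, ttl_seconds):
        self.cache_dir = cache_dir
        self.ttl_seconds = ttl_seconds
        os.makedirs(cache_dir, exist_ok=True)

    def _path(self, key):
        digest = hashlib.sha256(key.encode('utf-8')).hexdigest()
        return os.path.join(self.cache_dir, digest + '.json')

    def set(self, key, value):
        entry = {'timestamp': time.time(), 'data': value}
        # Write into a temp file in the same directory, then atomically swap it in
        fd, tmp_path = tempfile.mkstemp(dir=self.cache_dir, suffix='.tmp')
        try:
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(entry, f)
            os.replace(tmp_path, self._path(key))
        except BaseException:
            os.remove(tmp_path)
            raise

    def get(self, key):
        """Return the cached value, or None if missing, unreadable or expired."""
        try:
            with open(self._path(key), encoding='utf-8') as f:
                entry = json.load(f)
        except (OSError, ValueError):
            return None
        if time.time() - entry['timestamp'] > self.ttl_seconds:
            return None
        return entry['data']


if __name__ == "__main__":
    cache = JsonFileCache(tempfile.mkdtemp(), ttl_seconds=3600)
    cache.set("https://www.python.org", [200, "<html>...</html>"])
    print(cache.get("https://www.python.org"))  # [200, '<html>...</html>']
    print(cache.get("https://pypi.org"))        # None
```

Because readers only ever see either the old file or the complete new one, no extra locking is needed for a single crawler process.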

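12.3 Distributed Crawling

12.3.1 A Simple Distributed Crawler with Redis

All worker processes share their state through Redis: a list serves as the task queue, a set records URLs that have already been claimed, a second list collects results, and a hash stores worker heartbeats. Any machine that can reach the Redis server can run additional workers.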

import redis
import json
import time
import random
import requests
from bs4 import BeautifulSoup
import threading
import logging
from urllib.parse import urlparse, urljoin

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class DistributedCrawler:
    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        """
        Initialize a distributed crawler worker
        :param redis_host: Redis host
        :param redis_port: Redis port
        :param redis_db: Redis database number
        """
        # Connect to the Redis server shared by all workers
        self.redis = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
        
        # Redis key names
        self.url_queue_key = 'crawler:urls'                   # list: pending tasks
        self.result_key = 'crawler:results'                   # list: JSON results
        self.visited_key = 'crawler:visited'                  # set: claimed URLs
        self.worker_activity_key = 'crawler:worker_activity'  # hash: worker heartbeats
        
        # HTTP session
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        
        # Worker ID (random; collisions are unlikely with a handful of workers)
        self.worker_id = f"worker-{random.randint(1000, 9999)}"
        logger.info(f"Worker ID: {self.worker_id}")
    
    def add_seed_urls(self, urls):
        """
        Push seed URLs onto the queue at depth 0
        :param urls: list of URLs
        """
        pipe = self.redis.pipeline()
        
        for url in urls:
            # Skip URLs that a worker has already claimed
            if not self.redis.sismember(self.visited_key, url):
                # Each task is a JSON object, so its crawl depth travels with the URL
                pipe.lpush(self.url_queue_key, json.dumps({'url': url, 'depth': 0}))
                logger.info(f"Added seed URL: {url}")
        
        pipe.execute()
    
    def mark_url_processed(self, url):
        """
        Atomically claim a URL
        :param url: URL about to be processed
        :return: True if this call claimed the URL, False if it was already claimed
        """
        # SADD returns the number of newly added members, so only one worker gets 1
        return self.redis.sadd(self.visited_key, url) == 1
    
    def get_url_to_process(self, timeout=0):
        """
        Pop one task from the queue (LPUSH + BRPOP gives FIFO order)
        :param timeout: blocking timeout in seconds; 0 blocks indefinitely
        :return: (url, depth) tuple, or None on timeout
        """
        result = self.redis.brpop(self.url_queue_key, timeout=timeout)
        if result:
            task = json.loads(result[1])
            return task['url'], task['depth']
        return None
    
    def save_result(self, result):
        """
        Store a crawl result
        :param result: result dict
        """
        if not result:
            return
        
        # Serialize the result
        result_json = json.dumps(result)
        
        # Push it onto the Redis results list
        self.redis.lpush(self.result_key, result_json)
        
        # Cap the list length so Redis memory does not grow without bound
        self.redis.ltrim(self.result_key, 0, 9999)  # keep the newest 10,000 results
    
    def update_worker_status(self, status):
        """
        Write a heartbeat for this worker
        :param status: status text
        """
        status_data = {
            'id': self.worker_id,
            'timestamp': time.time(),
            'status': status
        }
        
        self.redis.hset(self.worker_activity_key, self.worker_id, json.dumps(status_data))
        
        # EXPIRE applies to the whole hash, not to one worker's field: the hash is
        # deleted only when no worker has written a heartbeat for 60 seconds.
        # Individual stale workers are filtered out by timestamp in stats().
        self.redis.expire(self.worker_activity_key, 60)
    
    def fetch_url(self, url):
        """
        Download a URL
        :param url: URL to fetch
        :return: Response object, or None on error
        """
        try:
            # Random delay to keep the request rate polite
            time.sleep(random.uniform(1.0, 3.0))
            
            # Send the request
            response = self.session.get(url, timeout=10)
            return response
        except Exception as e:
            logger.error(f"Error fetching URL: {url} - {e}")
            return None
    
    def extract_links(self, html, base_url):
        """
        Extract same-domain links from HTML
        :param html: HTML content
        :param base_url: URL of the page, used to resolve relative links
        :return: de-duplicated list of absolute URLs
        """
        if not html:
            return []
        
        try:
            soup = BeautifulSoup(html, 'html.parser')
            links = []
            
            # Domain of the current page
            base_domain = urlparse(base_url).netloc
            
            for a in soup.find_all('a', href=True):
                href = a['href']
                
                # Skip empty, anchor-only and JavaScript links
                if not href or href.startswith('#') or href.startswith('javascript:'):
                    continue
                
                # Resolve relative links to absolute URLs
                full_url = urljoin(base_url, href)
                parsed = urlparse(full_url)
                
                # Keep only HTTP and HTTPS links
                if parsed.scheme not in ('http', 'https'):
                    continue
                
                # Keep only links on the same domain
                if parsed.netloc == base_domain:
                    # Normalize by dropping the fragment
                    normalized_url = full_url.split('#')[0]
                    links.append(normalized_url)
            
            # Remove duplicates while preserving order
            return list(dict.fromkeys(links))
        except Exception as e:
            logger.error(f"Error extracting links: {e}")
            return []
    
    def extract_data(self, html, url):
        """
        Extract page data from HTML
        :param html: HTML content
        :param url: page URL
        :return: dict of extracted fields, or None
        """
        if not html:
            return None
        
        try:
            soup = BeautifulSoup(html, 'html.parser')
            
            # Title
            title = soup.title.text.strip() if soup.title else "No Title"
            
            # Meta description and keywords
            description = ""
            keywords = ""
            
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and 'content' in meta_desc.attrs:
                description = meta_desc['content']
            
            meta_keywords = soup.find('meta', attrs={'name': 'keywords'})
            if meta_keywords and 'content' in meta_keywords.attrs:
                keywords = meta_keywords['content']
            
            # Main content: paragraphs inside the main container
            content = ""
            main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
            if main_content:
                paragraphs = main_content.find_all('p')
                content = '\n'.join([p.text.strip() for p in paragraphs])
            
            # Fall back to every paragraph on the page
            if not content:
                paragraphs = soup.find_all('p')
                content = '\n'.join([p.text.strip() for p in paragraphs])
            
            return {
                'url': url,
                'title': title,
                'description': description,
                'keywords': keywords,
                'content': content[:1000] + '...' if len(content) > 1000 else content,
                'crawl_time': time.strftime('%Y-%m-%d %H:%M:%S')
            }
        except Exception as e:
            logger.error(f"Error extracting data: {url} - {e}")
            return None
    
    def process_url(self, url, max_depth=2, current_depth=0):
        """
        Process a single URL
        :param url: URL to process
        :param max_depth: maximum crawl depth
        :param current_depth: depth of this URL (seed URLs are 0)
        """
        if not url:
            return
        
        # Heartbeat
        self.update_worker_status(f"processing: {url}")
        
        # Claim the URL atomically; if another worker claimed it first, skip it.
        # A URL whose download fails stays claimed and is not retried.
        if not self.mark_url_processed(url):
            logger.debug(f"URL already processed: {url}")
            return
        
        # Download the page
        logger.info(f"Fetching URL (depth {current_depth}): {url}")
        response = self.fetch_url(url)
        
        # Compare with None explicitly: a Response is falsy for 4xx/5xx codes
        if response is None or response.status_code != 200:
            status = response.status_code if response is not None else 'N/A'
            logger.warning(f"Failed to fetch URL: {url} - status: {status}")
            return
        
        # Extract and store the page data
        data = self.extract_data(response.text, url)
        if data:
            self.save_result(data)
        
        # Below the depth limit, queue this page's links one level deeper
        if current_depth < max_depth:
            links = self.extract_links(response.text, url)
            
            pipe = self.redis.pipeline()
            added_count = 0
            
            for link in links:
                # Skip links that have already been claimed
                if not self.redis.sismember(self.visited_key, link):
                    pipe.lpush(self.url_queue_key, json.dumps({'url': link, 'depth': current_depth + 1}))
                    added_count += 1
            
            pipe.execute()
            logger.info(f"Queued {added_count} new links")
    
    def start_worker(self, max_depth=2, timeout=60):
        """
        Run the worker loop
        :param max_depth: maximum crawl depth
        :param timeout: exit after this many seconds without a new task
        """
        logger.info(f"Starting worker: {self.worker_id}")
        self.update_worker_status("starting")
        
        last_url_time = time.time()
        processed_count = 0
        
        try:
            while True:
                # Heartbeat
                self.update_worker_status(f"active ({processed_count} URLs)")
                
                # Wait up to 1 second for a task
                task = self.get_url_to_process(timeout=1)
                
                if task:
                    url, depth = task
                    self.process_url(url, max_depth=max_depth, current_depth=depth)
                    processed_count += 1
                    last_url_time = time.time()
                elif time.time() - last_url_time > timeout:
                    logger.info(f"No new URL for {timeout} seconds, exiting")
                    break
                
                # Short pause between iterations
                time.sleep(0.1)
        except KeyboardInterrupt:
            logger.info("Interrupted, stopping worker")
        except Exception as e:
            logger.error(f"Worker error: {e}")
        finally:
            self.update_worker_status(f"stopped ({processed_count} URLs processed)")
            logger.info(f"Worker stopped: {self.worker_id}, processed {processed_count} URLs")
    
    def stats(self, heartbeat_ttl=60):
        """
        Collect crawler statistics
        :param heartbeat_ttl: workers silent for longer than this many seconds are not counted
        """
        # Queue length
        url_queue_len = self.redis.llen(self.url_queue_key)
        
        # Number of claimed URLs
        visited_count = self.redis.scard(self.visited_key)
        
        # Number of stored results
        result_count = self.redis.llen(self.result_key)
        
        # Workers with a recent heartbeat
        now = time.time()
        workers = []
        for worker_id, worker_data in self.redis.hgetall(self.worker_activity_key).items():
            try:
                worker_info = json.loads(worker_data)
            except ValueError:
                continue
            if now - worker_info.get('timestamp', 0) <= heartbeat_ttl:
                worker_info['id'] = worker_id.decode('utf-8')
                workers.append(worker_info)
        
        return {
            'pending_urls': url_queue_len,
            'visited_urls': visited_count,
            'results': result_count,
            'active_workers': len(workers),
            'workers': workers
        }
    
    def get_results(self, limit=10):
        """
        Return the most recent results
        :param limit: maximum number of results
        :return: list of result dicts
        """
        results = []
        
        # LPUSH puts new results at the head, so indexes 0..limit-1 are the newest
        result_jsons = self.redis.lrange(self.result_key, 0, limit - 1)
        
        for result_json in result_jsons:
            try:
                results.append(json.loads(result_json))
            except ValueError:
                continue
        
        return results

# Usage example
if __name__ == "__main__":
    # Create a worker
    crawler = DistributedCrawler(redis_host='localhost', redis_port=6379)
    
    # The master node seeds the queue; other nodes only run workers
    is_master = input("Is this the master node? (y/n): ").lower() == 'y'
    
    if is_master:
        # Seed URLs
        seed_urls = [
            "https://www.python.org",
            "https://docs.python.org",
            "https://pypi.org"
        ]
        
        crawler.add_seed_urls(seed_urls)
        
        # Initial statistics
        stats = crawler.stats()
        print(f"Initial stats: {stats}")
        
        # Background monitor: print statistics every 5 seconds
        def monitor_stats():
            while True:
                try:
                    stats = crawler.stats()
                    print(f"\nCrawler stats - {time.strftime('%H:%M:%S')}:")
                    print(f"Pending URLs: {stats['pending_urls']}")
                    print(f"Visited URLs: {stats['visited_urls']}")
                    print(f"Results: {stats['results']}")
                    print(f"Active workers: {stats['active_workers']}")
                    
                    # Most recent results
                    results = crawler.get_results(5)
                    if results:
                        print("\nLatest results:")
                        for i, result in enumerate(results, 1):
                            print(f"{i}. {result['title']} - {result['url']}")
                except Exception as e:
                    print(f"Monitor error: {e}")
                time.sleep(5)
        
        # Daemon thread: it ends automatically when the main thread exits
        # (KeyboardInterrupt is only delivered to the main thread)
        monitor_thread = threading.Thread(target=monitor_stats, daemon=True)
        monitor_thread.start()
    
    # Run this process as a worker too
    max_depth = 2
    timeout = 60
    
    print(f"Starting worker, max depth: {max_depth}, timeout: {timeout}s")
    crawler.start_worker(max_depth=max_depth, timeout=timeout)
    
    # Final statistics
    if is_master:
        stats = crawler.stats()
        print("\nFinal stats:")
        print(f"Pending URLs: {stats['pending_urls']}")
        print(f"Visited URLs: {stats['visited_urls']}")
        print(f"Results: {stats['results']}")
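The visited set compares URLs as exact strings, so `https://Example.com:443/a?b=2&a=1` and `https://example.com/a?a=1&b=2` would be crawled twice. A common remedy is to normalize every URL before it is queued or added to the set. The sketch below uses only `urllib.parse`. `normalize_url` is a hypothetical helper, and its rules are an assumption: lowercase the scheme and host, drop default ports, fragments and any user:password part, default an empty path to `/`, and sort query parameters. Sorting is safe only when the site ignores parameter order.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

DEFAULT_PORTS = {'http': 80, 'https': 443}


def normalize_url(url):
    """Return a canonical form of `url` for de-duplication (hypothetical helper)."""
    parts = urlsplit(url)
    scheme = parts.scheme.lower()
    host = (parts.hostname or '').lower()
    # Keep the port only when it differs from the scheme's default
    port = parts.port
    if port is None or port == DEFAULT_PORTS.get(scheme):
        netloc = host
    else:
        netloc = f"{host}:{port}"
    path = parts.path or '/'
    # Sort query parameters so that their order does not matter
    query = urlencode(sorted(parse_qsl(parts.query, keep_blank_values=True)))
    # Drop the fragment: it is never sent to the server
    return urlunsplit((scheme, netloc, path, query, ''))


if __name__ == "__main__":
    print(normalize_url("https://Example.com:443/a?b=2&a=1#top"))  # https://example.com/a?a=1&b=2
    print(normalize_url("http://example.com"))                     # http://example.com/
```

In `DistributedCrawler`, calling such a function on each link in `extract_links` and on each seed in `add_seed_urls` would let the Redis set catch these duplicates across all workers.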
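12.3.2 API-Driven Distributed Crawling with Scrapy and ScrapyRT

ScrapyRT puts an HTTP server in front of a Scrapy project: a client sends a spider name and a URL to the /crawl.json endpoint, and ScrapyRT runs that spider on the URL and returns the scraped items as JSON. Any number of clients, possibly on different machines, can submit work to the same service.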

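# This example requires Scrapy and ScrapyRT
# pip install scrapy scrapyrt

# 1. Create the Scrapy project
# scrapy startproject distributed_crawler
# cd distributed_crawler
# scrapy genspider website example.com
# (generates a skeleton spiders/website.py, replaced by the code in step 3)

# 2. Edit items.py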
"""
# distributed_crawler/distributed_crawler/items.py
import scrapy

class WebsiteItem(scrapy.Item):
    url = scrapy.Field()
    title = scrapy.Field()
    description = scrapy.Field()
    keywords = scrapy.Field()
    content = scrapy.Field()
    links = scrapy.Field()
    timestamp = scrapy.Field()
"""

# 3. Write the spider
"""
# distributed_crawler/distributed_crawler/spiders/website.py
import scrapy
import time
from urllib.parse import urlparse, urljoin
from distributed_crawler.items import WebsiteItem

class WebsiteSpider(scrapy.Spider):
    name = 'website'
    
    # Spider-level settings; they override settings.py for this spider only
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 4,
    }
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Supports command-line runs: scrapy crawl website -a url=https://...
        # ScrapyRT does not use this: it schedules its own request for the URL
        # it receives and passes the response to parse()
        url = kwargs.get('url')
        if url:
            self.start_urls = [url]
    
    def parse(self, response):
        # Page information
        item = WebsiteItem()
        item['url'] = response.url
        item['title'] = response.css('title::text').get()
        
        # Meta tags
        item['description'] = response.css('meta[name="description"]::attr(content)').get()
        item['keywords'] = response.css('meta[name="keywords"]::attr(content)').get()
        
        # Main content; fall back to every paragraph if no container is found
        content_selector = response.css('main') or response.css('article') or response.css('div.content')
        if content_selector:
            paragraphs = content_selector.css('p::text').getall()
        else:
            paragraphs = response.css('p::text').getall()
        item['content'] = '\n'.join([p.strip() for p in paragraphs if p.strip()])
        
        # Links: keep only HTTP/HTTPS URLs on the same domain as this page
        # (derived from the response, so it also works when start_requests is not called)
        base_domain = urlparse(response.url).netloc
        links = []
        for href in response.css('a::attr(href)').getall():
            full_url = urljoin(response.url, href)
            parsed = urlparse(full_url)
            if parsed.scheme in ('http', 'https') and parsed.netloc == base_domain:
                links.append(full_url)
        
        item['links'] = links
        item['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
        
        yield item
        
        # Optionally follow the links (controlled by the CRAWL_LINKS project setting)
        if self.settings.getbool('CRAWL_LINKS', False):
            for url in links:
                yield scrapy.Request(url=url, callback=self.parse)
"""

# 4. Edit settings.py
"""
# distributed_crawler/distributed_crawler/settings.py
BOT_NAME = 'distributed_crawler'

SPIDER_MODULES = ['distributed_crawler.spiders']
NEWSPIDER_MODULE = 'distributed_crawler.spiders'

# Crawl settings
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
ROBOTSTXT_OBEY = True
DOWNLOAD_DELAY = 1
CONCURRENT_REQUESTS_PER_DOMAIN = 4

# Enable or disable spider middlewares
# SPIDER_MIDDLEWARES = {
#    'distributed_crawler.middlewares.DistributedCrawlerSpiderMiddleware': 543,
# }

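# Downloader middlewares. Do not disable UserAgentMiddleware: it is the
# middleware that applies the USER_AGENT setting above. RetryMiddleware is
# enabled by default (order 550); listing it here only changes its order.
DOWNLOADER_MIDDLEWARES = {
   'scrapy.downloadermiddlewares.retry.RetryMiddleware': 500,
}

# Custom project setting read by WebsiteSpider.parse (not a ScrapyRT option):
# whether the spider follows same-domain links
CRAWL_LINKS = False  # do not follow links by default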
"""

# 5. Client code
import random
import time
from concurrent.futures import ThreadPoolExecutor

import requests

class ScrapyRTClient:
    def __init__(self, scrapyrt_url="http://localhost:9080"):
        """
        Initialize the ScrapyRT client
        :param scrapyrt_url: base URL of the ScrapyRT service
        """
        self.scrapyrt_url = scrapyrt_url
    
    def crawl_url(self, url, spider_name="website", max_requests=None):
        """
        Crawl a single URL through ScrapyRT
        :param url: URL to crawl
        :param spider_name: name of the spider to run
        :param max_requests: optional limit on how many requests the spider may make
        :return: parsed JSON response (its 'items' key holds the scraped items), or None
        """
        # spider_name, url and max_requests are ScrapyRT GET parameters. Whether
        # links are followed is decided by the CRAWL_LINKS setting, not per request.
        params = {"spider_name": spider_name, "url": url}
        if max_requests is not None:
            params["max_requests"] = max_requests
        
        try:
            response = requests.get(f"{self.scrapyrt_url}/crawl.json", params=params, timeout=30)
            if response.status_code == 200:
                return response.json()
            print(f"Crawl failed: {url} - status: {response.status_code}")
            return None
        except Exception as e:
            print(f"Crawl error: {url} - {e}")
            return None
    
    def batch_crawl(self, urls, spider_name="website", max_concurrency=5):
        """
        Crawl several URLs concurrently
        :param urls: list of URLs
        :param spider_name: name of the spider to run
        :param max_concurrency: maximum number of concurrent requests
        :return: list of successful responses, in input order
        """
        def worker(url):
            # Small random delay so requests do not all start at the same instant
            time.sleep(random.uniform(0.1, 0.5))
            return self.crawl_url(url, spider_name)
        
        # The pool runs at most max_concurrency requests at once;
        # map() yields results in the same order as urls
        with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
            results = list(executor.map(worker, urls))
        
        return [r for r in results if r]

# Usage: start the ScrapyRT service first
"""
Before using this client, start ScrapyRT from the project directory:
cd distributed_crawler
scrapyrt -p 9080
"""

# Client usage example
if __name__ == "__main__":
    client = ScrapyRTClient("http://localhost:9080")
    
    # Crawl a single URL
    result = client.crawl_url("https://www.python.org")
    
    if result:
        # Print the scraped item
        items = result.get('items', [])
        if items:
            item = items[0]
            print(f"Title: {item.get('title')}")
            print(f"Description: {item.get('description')}")
            print(f"Content preview: {item.get('content', '')[:200]}...")
            print(f"Found {len(item.get('links', []))} links")
    
    # Batch crawl
    urls = [
        "https://www.python.org",
        "https://docs.python.org",
        "https://pypi.org"
    ]
    
    print("\nBatch crawl...")
    batch_results = client.batch_crawl(urls, max_concurrency=2)
    
    print(f"Crawled {len(batch_results)} URLs")
    for i, result in enumerate(batch_results, 1):
        items = result.get('items', [])
        if items:
            item = items[0]
            print(f"\n[{i}] {item.get('title')} - {item.get('url')}")
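ScrapyRT also documents a POST form of /crawl.json that takes a JSON body, with the request to schedule described in a nested `"request"` object. This is handy when the request needs extra Scrapy `Request` fields such as `meta`. The sketch below builds such a body and pulls items out of a response. `build_crawl_payload` and `extract_items` are hypothetical helpers, and the field names (`spider_name`, `request`, `max_requests`, `status`, `items`) follow the ScrapyRT documentation as we understand it, so check them against the ScrapyRT version you run.

```python
import json


def build_crawl_payload(url, spider_name="website", max_requests=None, meta=None):
    """Build a JSON body for POST /crawl.json (hypothetical helper)."""
    request = {"url": url}
    if meta:
        # Extra Scrapy Request fields go inside the nested request object
        request["meta"] = meta
    payload = {"spider_name": spider_name, "request": request}
    if max_requests is not None:
        payload["max_requests"] = max_requests
    return json.dumps(payload)


def extract_items(response_json):
    """Return the scraped items from a ScrapyRT response dict, or [] on error."""
    if response_json.get("status") != "ok":
        return []
    return response_json.get("items", [])
```

With a running service, the body would be sent with something like `requests.post("http://localhost:9080/crawl.json", data=build_crawl_payload("https://www.python.org"), timeout=30)`, and `extract_items(response.json())` would then return the item list.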

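12.4 Monitoring and Logging

Large crawler projects need monitoring: counting requests, successes, errors and downloaded bytes, tracking how often each domain is hit, and persisting these figures so that problems such as rising error rates or blocked domains can be spotted and analyzed later. The CrawlerMonitor class below logs to a file and the console and stores request records, error records and periodic statistics in SQLite.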
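The most useful live figures in a monitor are usually rates over a recent window rather than lifetime totals, for example requests per second and the error ratio over the last minute. The sketch below computes both from a deque of timestamped events, evicting events that fall out of the window. `SlidingWindowRate` is a hypothetical name and is not part of `CrawlerMonitor`, although that class keeps its own time series in a similar `deque(maxlen=...)`. Timestamps are assumed to be non-decreasing.

```python
import time
from collections import deque


class SlidingWindowRate:
    """Rates over the last `window` seconds (hypothetical helper)."""

    def __init__(self, window=60.0):
        self.window = window
        self.events = deque()  # (timestamp, is_error) pairs, oldest first

    def _evict(self, now):
        # Drop events older than the window; the deque stays ordered by time
        while self.events and now - self.events[0][0] > self.window:
            self.events.popleft()

    def record(self, is_error=False, now=None):
        """Record one request; `now` can be injected for testing."""
        now = time.time() if now is None else now
        self.events.append((now, is_error))
        self._evict(now)

    def snapshot(self, now=None):
        """Return (requests_per_second, error_ratio) for the current window."""
        now = time.time() if now is None else now
        self._evict(now)
        total = len(self.events)
        errors = sum(1 for _, is_error in self.events if is_error)
        return total / self.window, (errors / total if total else 0.0)


if __name__ == "__main__":
    meter = SlidingWindowRate(window=10)
    # Simulate one request per second for 20 seconds; every 5th one fails
    for t in range(20):
        meter.record(is_error=(t % 5 == 0), now=1000 + t)
    rps, err = meter.snapshot(now=1019)
    print(f"{rps:.2f} req/s, {err:.1%} errors")  # 1.10 req/s, 18.2% errors
```

Each event is appended and removed once, so the cost per request stays constant no matter how long the crawler runs.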

import logging
import time
import threading
import json
import os
import sqlite3
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from collections import defaultdict, deque

class CrawlerMonitor:
    def __init__(self, db_file='crawler_monitor.db', log_file='crawler.log'):
        """
        Initialize the crawler monitor
        :param db_file: SQLite database file
        :param log_file: log file
        """
        # Logging
        self.logger = self._setup_logger(log_file)
        
        # Database
        self.db_file = db_file
        self._init_db()
        
        # Cumulative counters
        self.stats = {
            'requests': 0,
            'success': 0,
            'errors': 0,
            'bytes_downloaded': 0
        }
        
        # Live time series: one point every 10 seconds, 360 points = the last hour
        self.time_series = {
            'timestamps': deque(maxlen=360),
            'requests': deque(maxlen=360),
            'success': deque(maxlen=360),
            'errors': deque(maxlen=360)
        }
        
        # Error counts by error type
        self.error_counts = defaultdict(int)
        
        # Request counts by domain
        self.domain_requests = defaultdict(int)
        
        # Lock for thread safety
        self.lock = threading.Lock()
        
        # Start the background monitor thread (is_running must be set before start())
        self.is_running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        
        self.logger.info("Crawler monitor initialized")
    
    def _setup_logger(self, log_file):
        """Configure logging"""
        logger = logging.getLogger('crawler_monitor')
        logger.setLevel(logging.INFO)
        
        # getLogger returns the same object every time, so add handlers only once;
        # otherwise every new CrawlerMonitor would duplicate each log line
        if logger.handlers:
            return logger
        
        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
        # Formatter
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        # Attach the handlers
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        
        return logger
    
    def _init_db(self):
        """Create the database tables if they do not exist"""
        try:
            conn = sqlite3.connect(self.db_file)
            cursor = conn.cursor()
            
            # Request log table
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS request_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                method TEXT,
                status_code INTEGER,
                response_time REAL,
                content_length INTEGER,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
            ''')
            
            # Error log table
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS error_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                error_type TEXT,
                error_message TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
            ''')
            
            # Periodic statistics table
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS stats (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                requests INTEGER,
                success INTEGER,
                errors INTEGER,
                bytes_downloaded INTEGER,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
            ''')
            
            conn.commit()
            conn.close()
            
        except Exception as e:
            self.logger.error(f"Database initialization failed: {e}")
    
    def log_request(self, url, method="GET", status_code=None, response_time=None, content_length=0):
        """
        Record a request
        :param url: requested URL
        :param method: HTTP method
        :param status_code: HTTP status code (None if no response was received)
        :param response_time: response time in seconds
        :param content_length: response size in bytes
        """
        with self.lock:
            # Update the counters: 2xx/3xx count as success, other codes as errors;
            # requests without a status code are left to log_error
            self.stats['requests'] += 1
            
            if status_code and 200 <= status_code < 400:
                self.stats['success'] += 1
            elif status_code:
                self.stats['errors'] += 1
            
            self.stats['bytes_downloaded'] += content_length or 0
            
            # Per-domain request count
            from urllib.parse import urlparse
            domain = urlparse(url).netloc
            self.domain_requests[domain] += 1
        
        # Write the record to the database
        try:
            conn = sqlite3.connect(self.db_file)
            cursor = conn.cursor()
            
            cursor.execute('''
            INSERT INTO request_log (url, method, status_code, response_time, content_length)
            VALUES (?, ?, ?, ?, ?)
            ''', (url, method, status_code, response_time, content_length))
            
            conn.commit()
            conn.close()
            
        except Exception as e:
            self.logger.error(f"Failed to record request: {e}")
    
    def log_error(self, url, error_type, error_message):
        """
        Record an error.
        :param url: the requested URL
        :param error_type: error type (e.g. an exception class name)
        :param error_message: error message
        """
        with self.lock:
            # Update the per-type error counter
            self.error_counts[error_type] += 1

        # Persist the error to the database
        try:
            conn = sqlite3.connect(self.db_file)
            cursor = conn.cursor()

            cursor.execute('''
            INSERT INTO error_log (url, error_type, error_message)
            VALUES (?, ?, ?)
            ''', (url, error_type, error_message))

            conn.commit()
            conn.close()

        except Exception as e:
            self.logger.error(f"Failed to log error: {e}")
    
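    def _monitor_loop(self):
        """Main loop of the monitoring thread."""
        last_save = time.time()
        # Counter values at the last save; the stats table stores per-interval deltas
        last_saved_stats = {}

        while self.is_running:
            try:
                # Sample the running totals into the time series (about every 10 seconds)
                with self.lock:
                    current_time = time.time()
                    self.time_series['timestamps'].append(current_time)
                    self.time_series['requests'].append(self.stats['requests'])
                    self.time_series['success'].append(self.stats['success'])
                    self.time_series['errors'].append(self.stats['errors'])

                # About once a minute, save the counter increments since the last save.
                # Saving deltas rather than running totals makes the per-hour SUM()
                # in _get_hourly_stats equal the actual activity in that hour.
                if time.time() - last_save >= 60:
                    with self.lock:
                        stats_copy = self.stats.copy()
                    delta = {key: stats_copy[key] - last_saved_stats.get(key, 0)
                             for key in ('requests', 'success', 'errors', 'bytes_downloaded')}

                    try:
                        conn = sqlite3.connect(self.db_file)
                        cursor = conn.cursor()

                        cursor.execute('''
                        INSERT INTO stats (requests, success, errors, bytes_downloaded)
                        VALUES (?, ?, ?, ?)
                        ''', (delta['requests'], delta['success'],
                              delta['errors'], delta['bytes_downloaded']))

                        conn.commit()
                        conn.close()

                        last_save = time.time()
                        last_saved_stats = stats_copy

                    except Exception as e:
                        self.logger.error(f"Failed to save statistics: {e}")

            except Exception as e:
                self.logger.error(f"Monitoring thread error: {e}")

            # Wait about 10 seconds in 1-second steps so that stop() takes effect promptly
            for _ in range(10):
                if not self.is_running:
                    break
                time.sleep(1)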
    
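    def get_stats(self):
        """Return a snapshot of the current statistics."""
        with self.lock:
            return self.stats.copy()

    def get_error_summary(self):
        """Return the error counts by error type."""
        with self.lock:
            return dict(self.error_counts)

    def get_domain_stats(self, top_n=10):
        """
        Return per-domain request counts.
        :param top_n: number of most-requested domains to return
        :return: dict mapping domain to request count, most requested first
        """
        with self.lock:
            # Sort domains by request count, highest first
            sorted_domains = sorted(self.domain_requests.items(),
                                    key=lambda x: x[1], reverse=True)

            # Keep only the top N
            return dict(sorted_domains[:top_n])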
    
    def generate_report(self, output_dir='reports'):
        """
        Generate a text report plus charts.
        :param output_dir: output directory
        :return: path of the report file
        """
        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)

        # Current time, also used in the file names
        now = datetime.now()
        report_date = now.strftime('%Y-%m-%d_%H-%M-%S')

        # Collect the report data
        stats = self.get_stats()
        error_summary = self.get_error_summary()
        domain_stats = self.get_domain_stats()

        # Per-hour statistics for the last 24 hours, read from the database
        hourly_stats = self._get_hourly_stats()

        # Write the text report
        report_file = os.path.join(output_dir, f'crawler_report_{report_date}.txt')

        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(f"Crawler Monitoring Report - {now.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 50 + "\n\n")

            f.write("Overall Statistics\n")
            f.write("-" * 50 + "\n")
            f.write(f"Total requests: {stats['requests']}\n")
            f.write(f"Successful requests: {stats['success']}\n")
            f.write(f"Failed requests: {stats['errors']}\n")
            success_rate = (stats['success'] / stats['requests'] * 100) if stats['requests'] > 0 else 0
            f.write(f"Success rate: {success_rate:.2f}%\n")
            f.write(f"Data downloaded: {stats['bytes_downloaded'] / 1024 / 1024:.2f} MB\n\n")

            f.write("Errors by Type\n")
            f.write("-" * 50 + "\n")
            for error_type, count in error_summary.items():
                f.write(f"{error_type}: {count}\n")
            f.write("\n")

            f.write("Top 10 Domains by Requests\n")
            f.write("-" * 50 + "\n")
            for domain, count in domain_stats.items():
                f.write(f"{domain}: {count}\n")
            f.write("\n")

            f.write("Last 24 Hours (per hour)\n")
            f.write("-" * 50 + "\n")
            for hour, hour_stats in hourly_stats.items():
                f.write(f"{hour}: requests={hour_stats['requests']}, success={hour_stats['success']}, "
                        f"errors={hour_stats['errors']}\n")

        # Generate the charts
        self._generate_charts(output_dir, report_date, hourly_stats)

        return report_file
    
    def _get_hourly_stats(self):
        """Fetch per-hour statistics for the last 24 hours from the database."""
        hourly_stats = {}

        try:
            conn = sqlite3.connect(self.db_file)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # CURRENT_TIMESTAMP stores UTC, so filter against SQLite's own UTC clock
            # and convert to local time only for the hour labels
            cursor.execute('''
            SELECT
                strftime('%Y-%m-%d %H:00:00', timestamp, 'localtime') AS hour,
                SUM(requests) AS requests,
                SUM(success) AS success,
                SUM(errors) AS errors,
                SUM(bytes_downloaded) AS bytes_downloaded
            FROM stats
            WHERE timestamp >= datetime('now', '-24 hours')
            GROUP BY hour
            ORDER BY hour
            ''')

            for row in cursor.fetchall():
                hourly_stats[row['hour']] = {
                    'requests': row['requests'],
                    'success': row['success'],
                    'errors': row['errors'],
                    'bytes_downloaded': row['bytes_downloaded']
                }

            conn.close()

        except Exception as e:
            self.logger.error(f"Failed to fetch hourly statistics: {e}")

        return hourly_stats
    
    def _generate_charts(self, output_dir, report_date, hourly_stats):
        """Generate the report charts as PNG files."""
        try:
            # Set the chart style
            plt.style.use('ggplot')

            # Chart 1: requests per hour
            plt.figure(figsize=(10, 6))

            hours = list(hourly_stats.keys())
            requests = [hourly_stats[h]['requests'] for h in hours]
            success = [hourly_stats[h]['success'] for h in hours]
            errors = [hourly_stats[h]['errors'] for h in hours]

            plt.plot(hours, requests, 'b-', label='Total')
            plt.plot(hours, success, 'g-', label='Success')
            plt.plot(hours, errors, 'r-', label='Errors')

            plt.title('Requests over the Last 24 Hours')
            plt.xlabel('Hour')
            plt.ylabel('Requests')
            plt.xticks(rotation=45)
            plt.legend()
            plt.tight_layout()

            plt.savefig(os.path.join(output_dir, f'requests_{report_date}.png'))
            plt.close()

            # Chart 2: requests by domain
            plt.figure(figsize=(10, 6))

            domain_stats = self.get_domain_stats()
            domains = list(domain_stats.keys())
            counts = list(domain_stats.values())

            plt.bar(domains, counts)
            plt.title('Requests by Domain')
            plt.xlabel('Domain')
            plt.ylabel('Requests')
            plt.xticks(rotation=45)
            plt.tight_layout()

            plt.savefig(os.path.join(output_dir, f'domains_{report_date}.png'))
            plt.close()

            # Chart 3: errors by type (only if any errors were recorded)
            error_summary = self.get_error_summary()
            if error_summary:
                plt.figure(figsize=(10, 6))

                error_types = list(error_summary.keys())
                error_counts = list(error_summary.values())

                plt.bar(error_types, error_counts)
                plt.title('Errors by Type')
                plt.xlabel('Error type')
                plt.ylabel('Count')
                plt.xticks(rotation=45)
                plt.tight_layout()

                plt.savefig(os.path.join(output_dir, f'errors_{report_date}.png'))
                plt.close()

        except Exception as e:
            self.logger.error(f"Failed to generate charts: {e}")
    
    def stop(self):
        """Stop the monitoring system."""
        self.is_running = False
        if self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=2)
        self.logger.info("Crawler monitoring system stopped")

# Usage example
if __name__ == "__main__":
    # Initialize the monitoring system
    monitor = CrawlerMonitor()

    error_types = ["ConnectionError", "TimeoutError", "HTTPError"]
    domains = ["example.com", "test.com", "sample.org", "demo.net"]

    # Simulate crawler requests
    for i in range(100):
        if i % 10 != 0:
            # Simulate a successful request
            monitor.log_request(
                url=f"https://example.com/page{i}",
                status_code=200,
                response_time=0.3 + (i % 5) * 0.1,
                content_length=5000 + (i % 10) * 500
            )
        else:
            # Simulate a failed request
            monitor.log_request(
                url=f"https://example.com/error{i}",
                status_code=404 if i % 20 == 0 else 500,
                response_time=1.5 + (i % 3) * 0.5,
                content_length=0
            )
            monitor.log_error(
                url=f"https://example.com/error{i}",
                error_type=error_types[i % len(error_types)],
                error_message=f"Simulated error {i}"
            )

        # Simulate requests to several different domains
        monitor.log_request(
            url=f"https://{domains[i % len(domains)]}/page{i}",
            status_code=200,
            response_time=0.2,
            content_length=2000
        )

        time.sleep(0.05)  # short delay to make the simulation more realistic

    # Generate the report. The monitor thread writes to the stats table only once
    # a minute, so after a first run this short the hourly section may be empty.
    report_file = monitor.generate_report()
    print(f"Report generated: {report_file}")

    # Print the current statistics
    stats = monitor.get_stats()
    print("\nCurrent statistics:")
    for key, value in stats.items():
        print(f"{key}: {value}")

    # Print the per-domain statistics
    domain_stats = monitor.get_domain_stats()
    print("\nDomain statistics:")
    for domain, count in domain_stats.items():
        print(f"{domain}: {count}")

    # Stop the monitoring system
    monitor.stop()
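
In a real crawler, the calls to log_request and log_error would wrap each actual fetch rather than being simulated. The following is a minimal sketch of such a wrapper. monitored_fetch is a hypothetical helper, and fetch is a hypothetical callable assumed to return a status code and the response body as bytes. monitor is any object with the log_request/log_error methods shown above, such as a CrawlerMonitor instance.

```python
import time


def monitored_fetch(fetch, url, monitor):
    """Call fetch(url), time it, and report the outcome to monitor.

    Hypothetical helper: fetch is assumed to return (status_code, body_bytes).
    Returns the body whenever a response arrives (even 4xx/5xx),
    or None if fetch raised an exception.
    """
    start = time.perf_counter()
    try:
        status_code, body = fetch(url)
    except Exception as e:
        # No response: record the attempt without a status code, plus the error
        elapsed = time.perf_counter() - start
        monitor.log_request(url, status_code=None, response_time=elapsed)
        monitor.log_error(url, type(e).__name__, str(e))
        return None

    elapsed = time.perf_counter() - start
    monitor.log_request(url, status_code=status_code,
                        response_time=elapsed, content_length=len(body))
    if status_code >= 400:
        # Also count HTTP error responses by type
        monitor.log_error(url, "HTTPError", f"HTTP {status_code}")
    return body
```

With requests, fetch could call session.get(url, timeout=10) and return (response.status_code, response.content). The monitor then sees real timings and sizes instead of simulated ones.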

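Summary and Best Practices

Best Practices for Crawler Efficiency

  1. Optimize the request strategy

    • Use connection pooling and reuse sessions
    • Implement smart delays and backoff
    • Use asynchronous requests to increase throughput
  2. Optimize data processing

    • Separate fetching from processing with a producer-consumer pattern
    • Crawl incrementally to avoid re-fetching pages you already have
    • Process large volumes of data as streams instead of loading them all into memory
  3. Manage resources

    • Keep memory usage under control, especially when handling large datasets
    • Throttle requests and use priority queues
    • Cache responses to avoid repeated requests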
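
Two of the efficiency practices above can be sketched with only the standard library: separating fetching from processing with a producer-consumer pattern, and skipping URLs that have already been crawled. This is a minimal illustration under stated assumptions. crawl and url_fingerprint are hypothetical names. fetch and process are caller-supplied callables. The set of seen fingerprints lives in memory; a real incremental crawler would persist it, for example in SQLite or Redis.

```python
import hashlib
import queue
import threading


def url_fingerprint(url):
    """Return a stable fingerprint for a URL (used to detect already-crawled pages)."""
    return hashlib.sha1(url.strip().encode("utf-8")).hexdigest()


def crawl(urls, fetch, process, seen=None, num_fetchers=4):
    """Fetch URLs in worker threads (producers) and process the pages in the
    calling thread (consumer).

    URLs whose fingerprint is already in `seen` are skipped. A URL is added to
    `seen` only after a successful fetch, so failures are retried next run.
    Returns the values returned by process(url, page).
    """
    seen = set() if seen is None else seen
    todo = queue.Queue()
    pages = queue.Queue(maxsize=100)  # bounded, so fetchers cannot run far ahead

    queued = set()
    for url in urls:
        fp = url_fingerprint(url)
        if fp not in seen and fp not in queued:  # incremental: skip known URLs
            queued.add(fp)
            todo.put(url)

    def fetcher():
        while True:
            try:
                url = todo.get_nowait()
            except queue.Empty:
                break
            try:
                pages.put((url, fetch(url)))
            except Exception as e:
                pages.put((url, e))  # hand the error to the consumer
        pages.put(None)  # sentinel: this fetcher has finished

    workers = [threading.Thread(target=fetcher, daemon=True)
               for _ in range(num_fetchers)]
    for w in workers:
        w.start()

    results, finished = [], 0
    while finished < num_fetchers:
        item = pages.get()
        if item is None:
            finished += 1
            continue
        url, page = item
        if isinstance(page, Exception):
            continue  # not marked as seen, so it will be retried next time
        seen.add(url_fingerprint(url))
        results.append(process(url, page))

    for w in workers:
        w.join()
    return results
```

Because the page queue is bounded, slow processing applies back-pressure to the fetchers instead of letting downloaded pages pile up in memory. This also serves the resource-management point above.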

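Best Practices for Crawler Stability

  1. Error handling

    • Build thorough exception handling
    • Log errors and monitor them for recurring patterns
    • Add automatic retries
  2. Robust design

    • Support resuming an interrupted crawl from a checkpoint
    • Use a distributed architecture to improve reliability
    • Handle changes in site structure gracefully
  3. Long-running operation

    • Implement health checks and automatic recovery
    • Set resource limits to prevent excessive consumption
    • Back up data and crawl state regularly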
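
Two of the stability practices above, automatic retries and resumable crawling, can be sketched as follows. retry applies exponential backoff with a little random jitter between attempts. save_checkpoint and load_checkpoint keep crawl state in a JSON file. The file is written to a temporary path first and then swapped in with os.replace, so an interruption mid-write does not destroy the previous checkpoint. All names, the state layout, and the default delays are illustrative assumptions.

```python
import functools
import json
import os
import random
import time


def retry(max_attempts=3, base_delay=1.0, max_delay=30.0, exceptions=(Exception,)):
    """Decorator: retry the wrapped call with exponential backoff and jitter."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # give up and propagate the last error
                    # base_delay * 1, 2, 4, ... capped at max_delay, plus up to 10% jitter
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    time.sleep(delay + random.uniform(0, delay * 0.1))
        return wrapper
    return decorator


def save_checkpoint(path, state):
    """Write crawl state (e.g. pending and finished URLs) as JSON."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False)
    os.replace(tmp_path, path)  # replaces the old file in one step (atomic on POSIX)


def load_checkpoint(path, default=None):
    """Return the saved state, or `default` if no checkpoint exists yet."""
    if not os.path.exists(path):
        return default
    with open(path, encoding="utf-8") as f:
        return json.load(f)
```

A resumable crawler would call load_checkpoint at startup and continue from the pending URLs. It would then call save_checkpoint every N pages, so a restart repeats at most N pages of work.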

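Best Practices for Ethics and Compliance

  1. Follow the rules

    • Respect robots.txt
    • Keep the request rate reasonable
    • Identify your crawler clearly (for example, in the User-Agent header)
  2. Minimize your impact

    • Use conditional requests and caching to reduce bandwidth consumption
    • Avoid heavy crawling during the site's peak hours
    • Adapt the crawl rate to how the server responds
  3. Handle data responsibly

    • Comply with privacy and data protection regulations
    • Collect only the data you need
    • Define data retention and deletion policies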
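
Three of the "follow the rules" practices can be combined in a small gatekeeper built on the standard library's urllib.robotparser: respecting robots.txt, limiting the request rate, and identifying the crawler. In this sketch the robots.txt text is passed in directly so it runs offline. In practice RobotFileParser.set_url() and read() would download it from the site. The class name PoliteGate, the user-agent string, and the one-second default delay are hypothetical.

```python
import time
import urllib.robotparser


class PoliteGate:
    """Checks robots.txt rules and enforces a minimum delay between requests."""

    def __init__(self, robots_txt, user_agent, default_delay=1.0):
        self.user_agent = user_agent
        self.parser = urllib.robotparser.RobotFileParser()
        self.parser.parse(robots_txt.splitlines())
        # Honour the site's Crawl-delay if it declares one, else use our default
        declared = self.parser.crawl_delay(user_agent)
        self.delay = float(declared) if declared is not None else default_delay
        self._last_request = 0.0

    def allowed(self, url):
        """True if robots.txt permits this user agent to fetch url."""
        return self.parser.can_fetch(self.user_agent, url)

    def wait(self):
        """Sleep just long enough to keep at least self.delay between requests."""
        elapsed = time.monotonic() - self._last_request
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self._last_request = time.monotonic()

    def headers(self):
        """Request headers that identify the crawler explicitly."""
        return {"User-Agent": self.user_agent}
```

A crawler would call allowed(url) before queuing a URL, and wait() right before each request. It would send headers() with every request, ideally with a User-Agent that includes a contact URL.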

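This tutorial has taken you from the basics of Python web crawling to advanced techniques. You have learned to use a range of tools and libraries and to handle common crawling challenges. You have gone from scraping simple static pages to handling complex dynamic ones, and from single-threaded crawlers to distributed systems. You now have the knowledge and skills to build efficient, stable, and ethical web crawlers.

As you gain experience, you can explore more advanced topics such as natural language processing, machine learning integration, crawler automation, and anti-detection techniques. Remember that great power comes with great responsibility: always use your crawling skills respectfully and reasonably.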
