scrapy pipelines过滤重复数据
- 方法 1:基于内存的简单去重(适合小规模数据)
- 方法 2:基于持久化存储去重(适合大规模数据/重启恢复)
- 方法 3:使用 Scrapy 内置的 dupefilter(针对请求去重)
- 方法 4:布隆过滤器(超大数据集优化)
- 方法 5:分布式去重(Redis)
- 关键点总结
方法 1:基于内存的简单去重(适合小规模数据)
使用 Python 的 set 或 dict 存储已抓取数据的唯一标识(如 URL、ID),在 Pipeline 中检查是否重复。
# pipelines.py
from scrapy.exceptions import DropItemclass DuplicatesPipeline:def __init__(self):self.seen_ids = set() # 存储已处理的唯一标识def process_item(self, item, spider):# 假设 item 中有唯一标识字段 'id'unique_id = item['id']if unique_id in self.seen_ids:raise DropItem(f"Duplicate item found: {item}")self.seen_ids.add(unique_id)return item
配置启用 Pipeline:
# settings.py
ITEM_PIPELINES = {'your_project.pipelines.DuplicatesPipeline': 300,
}
方法 2:基于持久化存储去重(适合大规模数据/重启恢复)
当数据量较大或需要持久化时,可以使用数据库(如 SQLite、Redis)或文件存储唯一标识。
示例:使用 SQLite
# pipelines.py
import sqlite3
from scrapy.exceptions import DropItemclass SQLiteDuplicatesPipeline:def __init__(self):self.conn = sqlite3.connect('scrapy_data.db')self.cursor = self.conn.cursor()self.cursor.execute('CREATE TABLE IF NOT EXISTS seen_ids (id TEXT PRIMARY KEY)')def process_item(self, item, spider):unique_id = item['id']self.cursor.execute('SELECT id FROM seen_ids WHERE id=?', (unique_id,))if self.cursor.fetchone():raise DropItem(f"Duplicate item found: {item}")else:self.cursor.execute('INSERT INTO seen_ids VALUES (?)', (unique_id,))self.conn.commit()return itemdef close_spider(self, spider):self.conn.close()
方法 3:使用 Scrapy 内置的 dupefilter(针对请求去重)
Scrapy 默认通过 DUPEFILTER_CLASS 过滤重复请求(基于 URL),但如果你需要更细粒度的 Item 去重,仍需自定义 Pipeline。
方法 4:布隆过滤器(超大数据集优化)
使用布隆过滤器(Bloom Filter)降低内存占用,适合海量数据去重,但有一定误判率。
# 安装:pip install pybloom-live
from pybloom_live import ScalableBloomFilter
from scrapy.exceptions import DropItemclass BloomDuplicatesPipeline:def __init__(self):self.bf = ScalableBloomFilter(initial_capacity=1000, mode=ScalableBloomFilter.SMALL_SET_GROWTH)def process_item(self, item, spider):unique_id = item['id']if unique_id in self.bf:raise DropItem(f"Duplicate item found: {item}")self.bf.add(unique_id)return item
配置启用 Pipeline:
# settings.py
ITEM_PIPELINES = {'your_project.pipelines.BloomDuplicatesPipeline': 200,
}
方法 5:分布式去重(Redis)
分布式爬虫中,使用 Redis 存储全局唯一标识,支持多爬虫实例共享去重数据。
# pipelines.py
import redis
from scrapy.exceptions import DropItemclass RedisDuplicatesPipeline:def __init__(self, redis_host, redis_port):self.redis = redis.StrictRedis(host=redis_host, port=redis_port, db=0)@classmethoddef from_crawler(cls, crawler):return cls(redis_host=crawler.settings.get('REDIS_HOST'),redis_port=crawler.settings.get('REDIS_PORT'))def process_item(self, item, spider):unique_id = item['id']if self.redis.sismember('seen_ids', unique_id):raise DropItem(f"Duplicate item found: {item}")self.redis.sadd('seen_ids', unique_id)return item
关键点总结
- 唯一标识选择:根据业务选择唯一字段(如 URL、商品 ID、哈希值)。
- 内存 vs 持久化:小数据用内存结构(set),大数据用数据库或布隆过滤器。
- 分布式需求:使用 Redis 或类似工具实现全局去重。
- 异常处理:发现重复时抛出 DropItem 终止后续 Pipeline 处理。
根据实际场景选择最适合的方案!