切換語言為:簡體

Scrapy與MongoDB的非同步資料儲存

  • 爱糖宝
  • 2024-06-14
  • 2079
  • 0
  • 0

在資料採集過程中,處理大量的資料請求和儲存任務是常見的需求。使用Scrapy來爬取資料並將其儲存到MongoDB中是一個高效的解決方案。本文將介紹如何實現一個非同步插入MongoDB的Scrapy管道。

專案背景

在本專案中,我們需要從某些公開網站上爬取資料,並將這些資訊非同步儲存到MongoDB資料庫中。爲了提高效能,我們可以採用非同步操作。這不僅能夠提升處理速度,還能更好地利用系統資源。

Scrapy與非同步MongoDB客戶端

我們將使用motor庫,它是一個非同步MongoDB驅動,能夠與asyncio很好地結合,實現非同步的MongoDB操作。透過Scrapy的管道,我們可以在處理爬取到的資料時,直接將其儲存到MongoDB中。

實現步驟

1. 安裝依賴

首先,我們需要安裝motor庫:

pip install motor

2. Scrapy管道實現

以下是我們的ScrapyPipeline類的實現,它實現了從Scrapy爬蟲到MongoDB的非同步資料插入。

import motor.motor_asyncio
from scrapy.utils.project import get_project_settings

class ScrapyPipeline:
    def __init__(self, host, port, db_name, collection_name):
        self.host = host
        self.port = port
        self.db_name = db_name
        self.collection_name = collection_name
        self.client = None

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            host=settings.get("MONGODB_HOST"),
            port=settings.getint("MONGODB_PORT"),
            db_name=settings.get("MONGODB_DB"),
            collection_name=settings.get("MONGODB_LIST_PRODUCT_COL")
        )

    def open_spider(self, spider):
        print('爬蟲開始')
        self.client = motor.motor_asyncio.AsyncIOMotorClient(host=self.host, port=self.port)

    async def process_item(self, item, spider):
        item = dict(item)
        await self.client[self.db_name][self.collection_name].insert_one(item)
        return item

    def close_spider(self, spider):
        print('爬蟲結束')
        self.client.close()

3. 配置Scrapy專案

在Scrapy專案的settings.py檔案中,新增MongoDB的配置資訊:

MONGODB_HOST = 'localhost'
MONGODB_PORT = 27017
MONGODB_DB = 'SpiderProject'
MONGODB_LIST_PRODUCT_COL = 'test_data'

同時,啟用我們自定義的管道:

ITEM_PIPELINES = {
    'myproject.pipelines.ScrapyPipeline': 300,
}

4. 解釋關鍵部分

@classmethod from_crawler(cls, crawler)

這個方法是Scrapy的約定方法,用於從Scrapy的設定中建立管道例項。透過這個方法,我們可以將Scrapy的設定傳遞給管道類。

@classmethod
def from_crawler(cls, crawler):
    settings = crawler.settings
    return cls(
        host=settings.get("MONGODB_HOST"),
        port=settings.getint("MONGODB_PORT"),
        db_name=settings.get("MONGODB_DB"),
        collection_name=settings.get("MONGODB_LIST_PRODUCT_COL")
    )

open_spider(self, spider)

在爬蟲開始時,連線到MongoDB:

def open_spider(self, spider):
    print('爬蟲開始')
    self.client = motor.motor_asyncio.AsyncIOMotorClient(host=self.host, port=self.port)
    self.db = self.client[self.db_name]

process_item(self, item, spider)

這是非同步處理每個item的方法,將item插入到MongoDB中:

async def process_item(self, item, spider):
    item = dict(item)
    await self.db[self.collection_name].insert_one(item)
    return item

close_spider(self, spider)

在爬蟲結束時,關閉MongoDB連線:

def close_spider(self, spider):
    print('爬蟲結束')
    self.client.close()

總結

透過以上步驟,我們實現了一個非同步的Scrapy管道,用於將爬取的資料儲存到MongoDB中。這種方式不僅提高了數據處理的效率,還能充分利用系統資源。希望這篇文章能幫助你更好地理解和實現Scrapy與MongoDB的非同步資料儲存。

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.