7 votes

Scrapy exportateur personnalisé

Je suis en train de définir un exportateur d'éléments qui envoie des éléments à une file d'attente de messages. Voici le code.

from scrapy.contrib.exporter import JsonLinesItemExporter
from scrapy.utils.serialize import ScrapyJSONEncoder
from scrapy import log

from scrapy.conf import settings

from carrot.connection import BrokerConnection, Exchange
from carrot.messaging import Publisher

log.start()

class QueueItemExporter(JsonLinesItemExporter):

    def __init__(self, **kwargs):

        log.msg("Initialisation de l'exportateur de file d'attente", level=log.DEBUG)

        self._configure(kwargs)

        host_name = settings.get('BROKER_HOST', 'localhost')
        port = settings.get('BROKER_PORT', 5672)
        userid = settings.get('BROKER_USERID', "guest")
        password = settings.get('BROKER_PASSWORD', "guest")
        virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/")

        self.encoder = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)(**kwargs)

        log.msg("Connexion au courtier", level=log.DEBUG)
        self.q_connection = BrokerConnection(hostname=host_name, port=port,
                        userid=userid, password=password,
                        virtual_host=virtual_host)
        self.exchange = Exchange("scrapers", type="topic")
        log.msg("Connecté", level=log.DEBUG)

    def start_exporting(self):
        spider_name = "test"
        log.msg("Initialisation de l'éditeur", level=log.DEBUG)
        self.publisher = Publisher(connection=self.q_connection,
                        exchange=self.exchange, routing_key="scrapy.spider.%s" % spider_name)
        log.msg("terminé", level=log.DEBUG)

    def finish_exporting(self):
        self.publisher.close()

    def export_item(self, item):
        log.msg("Envoi de l'élément exporté", level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        self.publisher.send({"scraped_data": self.encoder.encode(itemdict)})
        log.msg("envoyé à la file d'attente - scrapy.spider.naukri", level=log.DEBUG)

Je rencontre quelques problèmes. Les éléments ne sont pas envoyés à la file d'attente. J'ai ajouté ce qui suit à mes paramètres :

FEED_EXPORTERS = {
    "queue": 'scrapers.exporters.QueueItemExporter'
}

FEED_FORMAT = "queue"

LOG_STDOUT = True

Le code ne génère aucune erreur, et je ne vois pas non plus les messages de journalisation. Je suis à bout pour essayer de déboguer cela.

Toute aide serait grandement appréciée.

5voto

Udi Points 6298

Les "Exportateurs de flux" sont des raccourcis rapides (et quelque peu sales) pour appeler certains exportateurs d'articles "standard". Au lieu de configurer un exportateur de flux à partir des paramètres, connectez en dur votre exportateur d'articles personnalisé à votre pipeline personnalisé, comme expliqué ici http://doc.scrapy.org/fr/0.14/topics/exporters.html#using-item-exporters :

from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
from scrapy.contrib.exporter import XmlItemExporter

class MyPipeline(object):

    def __init__(self):
        ...
        dispatcher.connect(self.spider_opened, signals.spider_opened)
        dispatcher.connect(self.spider_closed, signals.spider_closed)
        ...

    def spider_opened(self, spider):
        self.exporter = QueueItemExporter()
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        # VOTRE CODE ICI
        ...
        self.exporter.export_item(item)
        return item

2voto

drozzy Points 7887

Astuce : Un bon exemple pour commencer est Écriture des éléments dans MongoDb des documents officiels.

J'ai fait quelque chose de similaire. J'ai créé un pipeline qui place chaque élément dans un service similaire à S3 (j'utilise Minio ici, mais vous avez compris l'idée). Il crée un nouveau compartiment pour chaque spider et place chaque élément dans un objet avec un nom aléatoire. Le code source complet peut être trouvé dans mon dépôt.

En commençant par l'araignée des citations simple du tutoriel :

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ['http://quotes.toscrape.com/page/1/',
                    'http://quotes.toscrape.com/page/1/']

    def parse(self, response):
        for quote in response.css('div.quote'):
            yield {
                    'text':quote.css('span.text::text').extract_first(),
                    'author':quote.css('span small::text').extract_first(),
                    'tags':quote.css('div.tags a.tag::text').extract()
                    }
        next_page = response.css('li.next a::attr(href)').extract_first()
        if next_page is not None:
            next_page = response.urljoin(next_page)
            yield scrapy.Request(next_page, callback=self.parse)

Dans settings.py :

ITEM_PIPELINES = {
    'scrapy_quotes.pipelines.ScrapyQuotesPipeline': 300,
}

Dans scrapy_quotes/pipelines.py créez un pipeline et un exportateur d'éléments :

import uuid
from StringIO import StringIO
from scrapy.contrib.exporter import BaseItemExporter
from scrapy.conf import settings
from scrapy import signals
from scrapy.xlib.pydispatch import dispatcher
from scrapy import log
from scrapy.utils.python import to_bytes
from scrapy.utils.serialize import ScrapyJSONEncoder

class S3ItemExporter(BaseItemExporter):
    def __init__(self, bucket, **kwargs):
        self._configure(kwargs)
        self.bucket = bucket
        kwargs.setdefault('ensure_ascii', not self.encoding)
        self.encoder = ScrapyJSONEncoder(**kwargs)

    def start_exporting(self):
        self.client = connect()
        create_bucket(self.client, self.bucket)

    def finish_exporting(self):
        log.msg("Terminé de l'exportateur d'éléments S3", level=log.DEBUG)

    def export_item(self, item):
        log.msg("L'exportateur d'éléments S3 a obtenu l'élément : %s" % item, level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        data = self.encoder.encode(itemdict)
        size = len(data) 
        object_data = StringIO(data)
        name = str(uuid.uuid4())
        put_object(self.client, self.bucket, name, object_data, size)  

class ScrapyQuotesPipeline(object):
    """Exporte les éléments extraits, vers différents compartiments,
    un par araignée"""
    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls()
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def spider_opened(self, spider):
        self.exporteur = S3ItemExporter(spider.name)
        self.exporteur.start_exporting()

    def spider_closed(self, spider):
        self.exporteur.finish_exporting()

    def process_item(self, item, spider):
        self.exporteur.export_item(item)
        return item

# Lié à S3
from minio import Minio
import os
from minio.error import ResponseError
def connect():
    return Minio('192.168.1.111:9000',
            access_key='0M6PYKBBAVQVQGVWVZKQ',
            secret_key='H6vPxz0aHSMZPgagZ3G0lJ6CbhN8RlTtD78SPsL8',
            secure=False)

def create_bucket(client, name):
    client.make_bucket(name)

def put_object(client, bucket_name, object_name, object_data, size):
    client.put_object(bucket_name, object_name, object_data, size)

0voto

webwurst Points 1125

J'ai rencontré le même problème, bien que probablement quelques mises à jour de version en avance. Le petit détail qui l'a résolu pour moi a été de définir FEED_URI = "quelque chose" dans settings.py. Sans cela, l'entrée dans FEED_EXPORTERS n'était pas du tout respectée.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X