在芹菜任务中运行Scrapy蜘蛛
这不工作了 ,scrapy的API已经改变了。
现在,文档中提供了一种“ 从脚本运行Scrapy ”的方法,但是我得到了ReactorNotRestartable
错误。
我的任务:
from celery import Task from twisted.internet import reactor from scrapy.crawler import Crawler from scrapy import log, signals from scrapy.utils.project import get_project_settings from .spiders import MySpider class MyTask(Task): def run(self, *args, **kwargs): spider = MySpider settings = get_project_settings() crawler = Crawler(settings) crawler.signals.connect(reactor.stop, signal=signals.spider_closed) crawler.configure() crawler.crawl(spider) crawler.start() log.start() reactor.run()
扭曲的反应堆不能重新启动。 解决此问题的方法是让celery任务为每个要执行的爬网分配一个新的subprocess,如以下文章中的build议:
在Celery任务中运行Scrapy蜘蛛
这通过使用多处理包得到了“反应堆不能重新启动的问题”。 但是问题是这个解决方法现在已经被最新的celery版本所淘汰了,因为你将会遇到另一个守护进程无法产生subprocess的问题。 所以为了解决这个问题,你需要在芹菜版本中下载。
是的,scrapy API已经改变。 但是稍作修改(导入Crawler而不是CrawlerProcess)。 您可以通过在芹菜版本中进行解决工作。
芹菜问题可以在这里find: 芹菜问题#1709
这里是我更新的爬行脚本,通过使用台球而不是多处理,使用更新的芹菜版本:
from scrapy.crawler import Crawler from scrapy.conf import settings from myspider import MySpider from scrapy import log, project from twisted.internet import reactor from billiard import Process from scrapy.utils.project import get_project_settings class UrlCrawlerScript(Process): def __init__(self, spider): Process.__init__(self) settings = get_project_settings() self.crawler = Crawler(settings) self.crawler.configure() self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed) self.spider = spider def run(self): self.crawler.crawl(self.spider) self.crawler.start() reactor.run() def run_spider(url): spider = MySpider(url) crawler = UrlCrawlerScript(spider) crawler.start() crawler.join()
编辑:通过阅读芹菜问题#1709,他们build议使用台球而不是多处理,以便取消subprocess限制。 换句话说,我们应该尝试台球 ,看看它是否有效!
编辑2:是的,通过使用台球 ,我的脚本与最新的芹菜构build工程! 看到我更新的脚本。
Twisted反应堆不能重新启动,所以一旦一个蜘蛛结束运行, crawler
器隐crawler
停止反应堆,那个工人就没用了。
正如在其他问题的答案中所发表的那样,你所要做的就是杀死运行蜘蛛的工作人员,并用一个新的代替,这样可以防止反应堆被多次启动和停止。 要做到这一点,只需设置:
CELERYD_MAX_TASKS_PER_CHILD = 1
缺点是你并没有真正使用 Twisted反应堆来充分发挥潜力,并且浪费了运行多个反应堆的资源,因为一个反应堆可以一次运行多个蜘蛛。 一个更好的方法是每个工作人员运行一个反应堆(甚至是全球的一个反应堆),不要让crawler
触摸它。
我正在研究一个非常相似的项目,所以如果我取得进展,我会更新这个post。
在Celery Tasks Queue中运行Scrapy时避免ReactorNotRestartable错误我已经使用了线程。 用于在一个应用程序中运行Twisted reactor几次的方法相同。 Scrapy也使用Twisted,所以我们可以用同样的方法。
这里是代码:
from threading import Thread from scrapy.crawler import CrawlerProcess import scrapy class MySpider(scrapy.Spider): name = 'my_spider' class MyCrawler: spider_settings = {} def run_crawler(self): process = CrawlerProcess(self.spider_settings) process.crawl(MySpider) Thread(target=process.start).start()
不要忘记增加芹菜的CELERYD_CONCURRENCY。
CELERYD_CONCURRENCY = 10
对我来说工作得很好。
这不会阻止进程运行,但无论如何,scrapy最佳做法是在callback中处理数据。 只要这样做:
for crawler in process.crawlers: crawler.spider.save_result_callback = some_callback crawler.spider.save_result_callback_params = some_callback_params Thread(target=process.start).start()
如果你有很多任务需要处理,我会说这种方法效率很低。 因为芹菜线程 – 在自己的线程中运行每个任务。 比如说RabbitMQ作为经纪人,你可以通过> 10K q / s。 用芹菜这可能会导致10K线程开销! 我build议不要在这里使用芹菜。 而是直接访问经纪人!