芹菜并行分布式任务与多处理

我有一个CPU密集型的芹菜任务。 我想使用大量EC2实例的所有处理能力(核心)来更快地完成这项工作我认为是一个多处理的芹菜并行分布式任务)

线程多处理分布式计算分布式并行处理等术语都是我想要更好地理解的术语。

示例任务:

@app.task for item in list_of_millions_of_ids: id = item # do some long complicated equation here very CPU heavy!!!!!!! database.objects(newid=id).save() 

使用上面的代码(如果可能,举个例子)如何使用Celery来分配这个任务,通过允许在云中的所有可用机器上利用所有计算CPU能力来分割这个任务?

你的目标是:

  1. 将你的工作分配给许多机器(分布式计算/分布式并行处理)
  2. 将工作分配给所有CPU(多处理/线程)

芹菜可以很容易地为你做这两个。 首先要了解的是每个芹菜工作者默认configuration为运行与系统上可用的CPU核心一样多的任务:

并发性是用于同时处理任务的prefork工作进程的数量,当所有这些工作都忙于工作时,新任务将不得不等待其中一个任务完成,然后才能进行处理。

默认并发数是该机器(包括核心)上CPU的数量 ,您可以使用-c选项指定自定义数字。 没有推荐的值,因为最佳的数字取决于许多因素,但是如果你的任务主要是I / O限制的,那么你可以尝试增加它,实验表明,增加两倍以上的CPU数量是很less的有效的,而且可能会降低性能。

这意味着每个单独的任务不需要担心使用多处理/线程来利用多个CPU /内核。 相反,芹菜会同时运行足够的任务来使用每个可用的CPU。

除此之外,下一步是创build一个处理list_of_millions_of_ids子集的任务。 这里有几个选项 – 一个是让每个任务处理一个ID,所以你运行N个任务,其中N == len(list_of_millions_of_ids) 。 这将保证工作在所有任务中平均分配,因为永远不会有一个工人提早结束而只是在等待; 如果需要工作,它可以从队列中取出一个id。 你可以用芹菜group (如John Doe提到的)做这个。

tasks.py:

 @app.task def process_id(item): id = item #long complicated equation here database.objects(newid=id).save() 

并执行任务:

 from celery import group from tasks import process_id jobs = group(process_id.s(item) for item in list_of_millions_of_ids) result = jobs.apply_async() 

另一种select是将列表分成较小的部分,并将这些部分分发给您的工作人员。 这种方法存在浪费一些周期的风险,因为最终可能会有一些工作人员在等待,而其他人仍在工作。 然而, 芹菜文件指出 ,这种担心往往是没有根据的:

有些人可能会担心,分块处理会导致并行性下降,但对于繁忙的群集来说这种情况很less,实际上,由于避免了消息传递的开销,因此可能会大大提高性能。

所以,你可能会发现分块清单和分块到每个任务performance更好,因为减less了消息的开销。 你也可以通过计算每个id,将其存储在一个列表中,然后在完成后将整个列表添加到数据库中,而不是一次只做一个id 。 分块方法看起来像这样

tasks.py:

 @app.task def process_ids(items): for item in items: id = item #long complicated equation here database.objects(newid=id).save() # Still adding one id at a time, but you don't have to. 

并开始任务:

 from tasks import process_ids jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here. jobs.apply_async() 

你可以尝试一下,什么样的大小给你最好的结果。 你希望find一个可以减less消息传递开销的最佳位置,同时保持足够小的尺寸,以至于不会让工作人员比其他工作人员更快地完成工作,然后就等着无所事事。

为什么不使用group芹菜任务呢?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

基本上,你应该把ids分成块(或范围),并把它们分给一组任务。

对于更加复杂的情况,比如汇总特定芹菜任务的结果,我已经成功地将chord任务用于类似的目的:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

增加settings.CELERYD_CONCURRENCY到一个合理的数字,你可以负担,那么芹菜工人将继续执行你的任务在一个组或一个和弦,直到完成。

注意:由于kombu存在漏洞,过去工作量大的工人重复使用有困难,我不知道现在是否修好了。 也许是这样,但是如果没有,请减lessCELERYD_MAX_TASKS_PER_CHILD。

基于简化和修改代码的示例我运行:

 @app.task def do_matches(): match_data = ... result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s()) 

summarize得到所有single_batch_processor任务的结果。 每个任务运行在任何芹菜工人, kombu坐标。

现在我明白了: single_batch_processorsummarize也必须是芹菜的任务,而不是常规的function – 否则当然不会并行化(我甚至不知道和弦构造函数将接受它,如果它不是一个芹菜任务)。

在分配世界中,首先应该记住的只有一件事:

不成熟的优化是万恶之源。 D. Knuth

我知道这听起来很明显,但在分发之前,你使用了最好的algorithm(如果它存在…)。 话虽如此,优化分配是三件事之间的平衡:

  1. 从持久性介质写入/读取数据,
  2. 将数据从媒体A移动到媒体B,
  3. 处理数据,

计算机制造得越接近你的处理单元(3)越快,效率越高(1)和(2)。 经典集群中的顺序是:networking硬盘,本地硬盘,RAM,内部处理单元领域…如今处理器已经变得足够复杂,被认为是通常称为核心的独立硬件处理单元的集合,这些核心处理数据(3)通过线程(2)。 想象一下你的核心速度如此之快,以至于当你用一个线程发送数据时,你使用的是计算机功耗的50%,如果核心有2个线程,那么你将使用100%。 每个核心有两个线程称为超线程,而您的操作系统每个超线程核心将会看到两个CPU。

pipe理处理器中的线程通常称为multithreading。 pipe理来自操作系统的CPU通常被称为多处理。 pipe理群集中的并发任务通常称为并行编程。 pipe理群集中的相关任务通常称为分布式编程。

那么你的瓶颈在哪里?

  • 在(1)中:尝试从上一级(比较接近处理单元的那一级,例如,如果networking硬盘驱动器先缓慢存储在本地硬盘驱动器中)
  • 在(2)中:这是最常见的一种,尽量避免分发或压缩“即时”数据包所不需要的通信包(例如,如果HD较慢,只保存“批量计算”消息并保留中间结果在RAM中)。
  • 在(3)中:你完成了! 您正在使用所有的处理能力。

芹菜呢?

Celery是一个分布式编程的消息传递框架,它将使用一个代理模块进行通信(2)和一个后端模块进行持久化(1),这意味着你可以通过改变configuration来避免大部分瓶颈(如果可能的话)您的networking,只在您的networking上。 首先分析您的代码,以在单台计算机上实现最佳性能。 然后使用默认configuration的集群中的celery并设置CELERY_RESULT_PERSISTENT=True

 from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//', backend='redis://localhost') @app.task def process_id(all_the_data_parameters_needed_to_process_in_this_computer): #code that does stuff return result 

在执行期间打开你最喜欢的监视工具,我使用默认的rabbitMQ和花芹菜和顶部的cpu,你的结果将被保存在你的后端。 networking瓶颈的一个例子是任务队列增长太多以至于延迟执行,你可以继续改变模块或芹菜configuration,如果不是你的瓶颈在别的地方。

增加更多的芹菜工人肯定会加快执行任务。 你可能还有另一个瓶颈:数据库。 确保它可以处理同时插入/更新。

关于您的问题:您正在通过在您的EC2实例上分配另一个进程作为芹菜来添加芹菜工作者。 根据您需要的工作人员数量,您可能需要添加更多的实例。