限制通过并行任务库运行的活动任务数量的最佳方法

考虑一个拥有大量需要处理的作业的队列。 队列的限制是一次只能得到1份工作,无法知道有多less工作。 这些工作需要10秒才能完成,并且涉及很多等待Web服务的响应,因此不受CPU限制。

如果我使用这样的东西

while (true) { var job = Queue.PopJob(); if (job == null) break; Task.Factory.StartNew(job.Execute); } 

然后,它会激烈地从队列中挤出工作,比完成工作快得多,耗尽内存并落在它的屁股上。 > <

我不能使用(我不认为) ParallelOptions.MaxDegreeOfParallelism,因为我不能使用Parallel.Invoke或Parallel.ForEach

我find了3个替代品

  1. 用TaskreplaceTask.Factory.StartNew

     Task task = new Task(job.Execute,TaskCreationOptions.LongRunning) task.Start(); 

    这似乎有点解决问题,但我不清楚这是怎么回事 ,如果这是最好的方法。

  2. 创build一个限制并发度的自定义任务计划程序

  3. 使用像BlockingCollection这样的东西,在开始时将作业添加到集合,并在完成时移除以限制可以运行的数字。

#1我必须相信自动做出正确的决定,#2 /#3我必须计算出可以运行的最大数量的任务。

我是否正确地理解了这一点 – 哪一个更好,或者有另一种方法?

编辑 – 这是我从下面的答案,生产者 – 消费者模式。

除了整体吞吐量的目的不是让工作更快地出队比可以处理,并没有多个线程轮询队列(这里没有显示,但这是一个非阻塞操作,并将导致巨大的交易成本,如果从多个地方高频调查) 。

 // BlockingCollection<>(1) will block if try to add more than 1 job to queue (no // point in being greedy!), or is empty on take. var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1); // Setup a number of consumer threads. // Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time // in job is blocked waiting IO then likely be 8. for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++) { Thread consumer = new Thread(() => { while (!jobs.IsCompleted) { var job = jobs.Take(); job.Execute(); } } consumer.Start(); } // Producer to take items of queue and put in blocking collection ready for processing while (true) { var job = Queue.PopJob(); if (job != null) jobs.Add(job); else { jobs.CompletedAdding() // May need to wait for running jobs to finish break; } } 

我刚刚给了一个非常适用于这个问题的答案 。

基本上,TPL Task类是用来调度CPU绑定的工作。 这不是为了阻止工作。

您正在使用不是CPU的资源:等待服务回复。 这意味着第三方物stream将错误的资源,因为它在一定程度上承担CPU的有限性。

自己pipe理资源:启动固定数量的线程或长时间运行任务(基本相同)。 根据经验决定线程的数量。

你不能把不可靠的系统投入生产。 出于这个原因,我build议#1,但扼杀 。 不要创build与工作项目一样多的线程。 创build饱和远程服务所需的多个线程。 写一个帮助函数,产生N个线程并使用它们来处理M个工作项目。 您可以通过这种方式获得完全可预测的可靠结果。

长时间运行的任务(或线程)不能很好地与代码或第三方库中的awaitawait导致的潜在的stream程分割和延续,所以不要打扰使用长时间运行的任务。 在async/await世界中,它们是无用的。 更多细节在这里 。

您可以调用ThreadPool.SetMaxThreads但在进行此调用之前,请确保使用ThreadPool.SetMinThreads设置最小线程数,使用低于或等于最大值的值。 顺便说一下,MSDN文档是错误的。 至less在.NET 4.5和4.6中,我可以使用这些方法调用来降低计算机上的内核数量,在这种情况下,我使用这种技术来降低内存限制的32位服务的处理能力。

但是,如果您不想限制整个应用程序,而仅限于其中的处理部分,则自定义任务调度程序将执行该任务。 很久以前,MS发布了包含一个LimitedConcurrencyLevelTaskScheduler自定义任务调度器的示例 。 用Task.Factory.StartNew手动产生主要的处理任务,提供自定义的任务调度器,而由它产生的每个其他任务将使用它,包括async/await甚至Task.Yield ,用于在async方法中实现asynchronous。

但是,对于您的具体情况,这两种解决scheme不会停止在完成工作之前耗尽您的工作队列。 这可能不是可取的,取决于你的队列的实施和目的。 他们更像是“解开一堆任务,让调度员find执行它们的时间”types的解决scheme。 所以也许在这里更合适一些可能是通过semaphores来控制作业的更严格的方法。 代码看起来像这样:

 semaphore = new SemaphoreSlim(max_concurrent_jobs); while(...){ job = Queue.PopJob(); semaphore.Wait(); ProcessJobAsync(job); } async Task ProcessJobAsync(Job job){ await Task.Yield(); ... Process the job here... semaphore.Release(); } 

有一个以上的方式来剥皮猫。 使用你认为合适的东西。

微软有一个非常酷的库叫做DataFlow,它正是你想要的(以及更多)。 详情在这里 。

您应该使用ActionBlock类并设置ExecutionDataflowBlockOptions对象的MaxDegreeOfParallelism。 ActionBlock与async / await很好地配合,所以即使你的外部呼叫被等待,也不会有新的任务开始处理。

 ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }; this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items), actionBlockOptions); ... this.sendToAzureActionBlock.Post(itemsToProcess) 

这里的问题似乎没有太多的运行 Task ,这是太多的计划 Task 。 无论执行多快,您的代码都会尝试安排尽可能多的Task 。 如果你有太多的工作,这意味着你会得到OOM。

正因为如此,你提出的解决scheme都不能解决你的问题。 如果简单地指定LongRunning可以解决您的问题,那么很可能是因为创build一个新的Thread (这是LongRunningLongRunning )需要一些时间,这有效地节省了新的工作。 所以,这个解决scheme只能是偶然的,而且很可能会在以后导致其他问题。

关于解决scheme,我大多同意usr:最合适的最简单的解决scheme是创build固定数量的LongRunning任务,并有一个调用Queue.PopJob()循环Queue.PopJob()如果该方法不是线程安全的,则由lock保护Queue.PopJob() )和Execute()作业。

更新:经过一些更多的思考,我意识到下面的尝试将最有可能performance非常。 只有当你确定它会适合你时才使用它。


但是TPL试图找出最好的并行度,即使是IO界限的Task 。 所以,你可能会试图利用这个优势。 长期Task在这里不起作用,因为从第三方物stream的angular度来看,似乎没有任何工作要做,它会一遍又一遍地开始新的Task 。 你可以做的是在每个Task结束时开始一个新的Task 。 这样,TPL就会知道发生了什么,它的algorithm可能会运行良好。 另外,为了让第三方物stream公司决定平行度,在第一个Task开始时,开始另一行Task

这个algorithm可能运作良好。 但是,第三方物stream也可能会对并行度做出不好的决定,实际上我没有尝试过这样的事情。

在代码中,它看起来像这样:

 void ProcessJobs(bool isFirst) { var job = Queue.PopJob(); // assumes PopJob() is thread-safe if (job == null) return; if (isFirst) Task.Factory.StartNew(() => ProcessJobs(true)); job.Execute(); Task.Factory.StartNew(() => ProcessJob(false)); } 

并开始它

 Task.Factory.StartNew(() => ProcessJobs(true)); 

TaskCreationOptions.LongRunning对阻塞任务是有用的,在这里使用它是合法的。 它所做的是向调度程序build议将一个线程专用于任务。 调度器本身尝试保持与CPU核心数量相同的线程数量,以避免过度的上下文切换。

它在C#中的Threading in Joseph Albahari中有很好的描述

我使用消息队列/邮箱机制来实现这一点。 这与演员模型类似。 我有一个有一个邮箱的类。 我称这个class是我的“工作人员”。 它可以接收消息。 这些消息排队,实质上,它们定义了我希望工作人员运行的任务。 工作人员将Task.Wait()用于其任务完成,然后出队下一条消息并开始下一个任务。

通过限制我拥有的工作人员数量,我可以限制正在运行的并发线程/任务的数量。

这是源代码在分布式计算引擎的博客文章中概述的。 如果你看IActor和WorkerNode的代码,我希望这是有道理的。

https://long2know.com/2016/08/creating-a-distributed-computing-engine-with-the-actor-model-and-net-core/