确保线程池中的任务执行顺序

我一直在阅读有关线程池模式,我似乎无法find以下问题的通常解决scheme。

我有时想让任务连续执行。 例如,我从文件中读取文本块,出于某种原因,我需要按照该顺序处理块。 所以基本上我想要消除一些任务的并发性。

考虑这种情况,带*的任务需要按照它们的顺序进行处理。其他任务可以按任意顺序处理。

 push task1 push task2 push task3 * push task4 * push task5 push task6 * .... and so on 

在一个线程池的上下文中,没有这个约束,一个待处理任务的队列工作正常,但是在这里显然没有。

我想过让一些线程在一个线程特定的队列上运行,其他的在“全局”队列上运行。 然后,为了连续执行一些任务,我只需将它们推送到单个线程所在的队列中。 这听起来有点笨拙。

所以,在这个漫长的故事中真正的问题是:你将如何解决这个问题? 你将如何确保这些任务是有序的

编辑

作为一个更普遍的问题,假设上面的情况变成了

 push task1 push task2 ** push task3 * push task4 * push task5 push task6 * push task7 ** push task8 * push task9 .... and so on 

我的意思是一个小组内的任务应该按顺序执行,但这些小组本身可以混合在一起。 所以你可以有3-2-5-4-7例如。

还有一点需要注意的是,我并不能直接访问一个组中的所有任务(我不能等待所有的任务在开始组之前到达)。

感谢您的时间。

类似下面的内容将允许串行和并行任务排队,其中串行任务将被一个接一个地执行,并行任务将以任意顺序执行,但并行执行。 这使您能够在必要时序列化任务,也可以执行并行任务,但是在接收任务时这样做,即不需要事先知道整个序列,执行顺序是dynamic维护的。

 internal class TaskQueue { private readonly object _syncObj = new object(); private readonly Queue<QTask> _tasks = new Queue<QTask>(); private int _runningTaskCount; public void Queue(bool isParallel, Action task) { lock (_syncObj) { _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task }); } ProcessTaskQueue(); } public int Count { get{lock (_syncObj){return _tasks.Count;}} } private void ProcessTaskQueue() { lock (_syncObj) { if (_runningTaskCount != 0) return; while (_tasks.Count > 0 && _tasks.Peek().IsParallel) { QTask parallelTask = _tasks.Dequeue(); QueueUserWorkItem(parallelTask); } if (_tasks.Count > 0 && _runningTaskCount == 0) { QTask serialTask = _tasks.Dequeue(); QueueUserWorkItem(serialTask); } } } private void QueueUserWorkItem(QTask qTask) { Action completionTask = () => { qTask.Task(); OnTaskCompleted(); }; _runningTaskCount++; ThreadPool.QueueUserWorkItem(_ => completionTask()); } private void OnTaskCompleted() { lock (_syncObj) { if (--_runningTaskCount == 0) { ProcessTaskQueue(); } } } private class QTask { public Action Task { get; set; } public bool IsParallel { get; set; } } } 

更新

要处理具有串行和并行任务组合的任务组, GroupedTaskQueue可以pipe理每个组的TaskQueue 。 同样,您不需要事先知道群组,它是在接收任务时dynamicpipe理的。

 internal class GroupedTaskQueue { private readonly object _syncObj = new object(); private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>(); private readonly string _defaultGroup = Guid.NewGuid().ToString(); public void Queue(bool isParallel, Action task) { Queue(_defaultGroup, isParallel, task); } public void Queue(string group, bool isParallel, Action task) { TaskQueue queue; lock (_syncObj) { if (!_queues.TryGetValue(group, out queue)) { queue = new TaskQueue(); _queues.Add(group, queue); } } Action completionTask = () => { task(); OnTaskCompleted(group, queue); }; queue.Queue(isParallel, completionTask); } private void OnTaskCompleted(string group, TaskQueue queue) { lock (_syncObj) { if (queue.Count == 0) { _queues.Remove(group); } } } } 

线程池适用于任务的相对顺序无关紧要的情况,只要它们全部完成。 尤其是,它们都可以并行完成。

如果你的任务必须按照特定的顺序完成,那么它们不适合并行,所以一个线程池是不合适的。

如果要将这些串行任务从主线程移出,那么具有任务队列的单个后台线程将适合这些任务。 您可以继续使用线程池来处理适合并行的其余任务。

是的,这意味着你必须决定在哪里提交任务,这取决于它是一个有序的任务还是一个“可能是并行的”任务,但这不是什么大不了的事情。

如果您有必须序列化的组,但是可以与其他任务并行运行,那么您有多个select:

  1. 为每个组创build一个单独的任务,按顺序执行相关的组任务,并将此任务发布到线程池。
  2. 让组中的每个任务显式等待组中的前一个任务,并将其发布到线程池中。 这要求您的线程池可以处理线程正在等待尚未计划的任务而没有死锁的情况。
  3. 为每个组分配一个专用线程,并在相应的消息队列上发布组任务。

为了做一个线程池,你可能需要创build一些调度器。

类似的东西:

TaskQueue – > Scheduler – > Queue – > ThreadPool

调度程序在自己的线程中运行,保持作业之间的依赖关系。 当一个工作准备完成时,调度器只是把它推到线程池的队列中。

ThreadPool可能必须向Scheduler发送信号,以指示作业何时完成,以便调度程序可以根据作业将作业放入Queue中。

在你的情况下,依赖可能被存储在一个链表中。

假设您有以下依赖关系:3 – > 4 – > 6 – > 8

作业3正在线程池上运行,您仍然不知道作业8是否存在。

工作3结束。 你从链表中删除3,你把队列中的作业4放到线程池中。

工作8到达。 你把它放在链表的末尾。

必须完全同步的唯一结构是调度程序之前和之后的队列。

基本上,有一些未决的任务。 某些任务只能在一个或多个其他未决任务完成执行时执行。

未决任务可以在依赖关系图中build模:

  • “任务1 – >任务2”是指“任务2只有在任务1完成后才能执行”。 箭头指向执行顺序的方向。
  • 任务的不完整(指向它的任务的数量)确定任务是否准备好执行。 如果indegree是0,它可以被执行。
  • 有时一个任务必须等待多个任务完成,那么这个任务就是> 1。
  • 如果一个任务不需要等待其他任务完成(它的indegree为零),就可以将它提交给带有工作线程的线程池,或者带有等待被工作线程拾取的任务的队列。 你知道提交的任务不会造成死锁,因为任务不等待任何东西。 作为优化,您可以使用优先级队列,例如,依赖关系图中更多任务所依赖的任务将首先执行。 这也不能引起死锁,因为线程池中的所有任务都可以执行。 但是,它可以激起饥饿。
  • 如果一个任务完成执行,它可以从依赖关系图中移除,可能减less其他任务的整数,然后可以提交给工作线程池。

所以有(至less)一个线程用于添加/删除挂起的任务,并且有一个工作线程的线程池。

将任务添加到依赖关系图时,您必须检查:

  • 在依赖关系图中如何连接任务:它必须等待什么任务完成,哪些任务必须等待它完成? 相应地从新任务中获取连接。
  • 一旦连接被绘制:新的连接是否会导致依赖关系图中的任何循环? 如果是这样,就有一个僵局。

性能

  • 如果并行执行实际上很less可能,这种模式比顺序执行要慢,因为您需要额外的pipe理来按顺序执行所有操作。
  • 如果在实践中可以同时执行许多任务,则这种模式是快速的。

假设

正如您可能已经阅读过的内容一样,您必须devise任务,以免干扰其他任务。 而且,必须有一种方法来确定任务的优先级。 任务优先级应该包括每个任务处理的数据。 两个任务不能同时改变同一个对象; 其中一个任务应该优先于另一个,或者对该对象执行的操作必须是线程安全的。

如果我正确地理解了这个问题,那么jdk的执行者就没有这个能力,但是很容易推出自己的。 你基本上需要

  • 一个工作线程池,每个工作线程都有一个专用队列
  • 对您提供的那些队列进行一些抽象工作(参见ExecutorService
  • 一些确定性地为每件作品select特定队列的algorithm
  • 然后每件作品都会获得优惠信息,并以正确的顺序进行处理

与jdk执行者的区别在于他们有1个队列,有n个线程,但你需要n个队列和m个线程(其中n可以不等于m)

*阅读后编辑每个任务有一个键*

再详细一点

  • 编写一些将键转换为给定范围(0-n,其中n是所需线程数)的索引(int),这可以像key.hashCode() % n一样简单,也可以是已知键值的一些静态映射到线程或任何你想要的
  • 在启动时
    • 创buildn个队列,把它们放在一个索引结构(数组,列表)
    • 开始n个线程,每个线程只是从队列中取一个阻塞
    • 当它接收到一些工作时,它知道如何执行特定于该任务/事件的工作(如果有异构事件,显然可以将任务映射到动作)
  • 将其存储在接受工作项目的某个外观后面
  • 当任务到达时,把它交给门面
    • 门面基于密钥find任务的正确队列,并将其提供给该队列

添加自动重新启动的工作线程到这个scheme是非常简单的,然后你需要工作线程向某个pipe理器注册以声明“我拥有这个队列”,然后对这个线程进行一些pipe理+检测线程中的错误(这意味着它取消注册该队列的所有权,将该队列返回到空闲的队列池,这是触发新线程的触发器)

选项1 – 复杂的一个

既然你有顺序的工作,你可以把这些工作集中在一起,让工作完成后自己重新提交给线程池。 假设我们有一份工作清单:

  [Task1, ..., Task6] 

就像你的例子。 我们有一个顺序的依赖关系,这样[Task3, Task4, Task6]是一个依赖链。 我们现在做一个工作(Erlang伪代码):

  Task4Job = fun() -> Task4(), % Exec the Task4 job push_job(Task6Job) end. Task3Job = fun() -> Task3(), % Execute the Task3 Job push_job(Task4Job) end. push_job(Task3Job). 

也就是说,我们通过将Task3作业包装到一个作业中, 作为延续,将队列中的下一个作业推送到线程池中。 这与类似于Node.js或Pythons Twisted框架的系统中的一般延续传递风格有很大的相似之处。

一般来说,您build立了一个系统,您可以在其中定义可defer进一步工作并重新提交进一步工作的工作链。

选项2 – 简单的一个

为什么我们甚至打扰到分工呢? 我的意思是,因为它们是依赖依赖的,所以在同一个线程上执行它们不会比获取这个链并在多个线程上传播更快或者更慢。 假设“足够”的工作负载,任何线程总是有工作,所以只是把工作捆绑在一起可能是最简单的:

  Task = fun() -> Task3(), Task4(), Task6() % Just build a new job, executing them in the order desired end, push_job(Task). 

如果你有一等公民的function,那么你可以用你的语言来构build它们,就像你可以在任何函数式编程语言,Python,Ruby块等等中一样。 。

我不是特别喜欢像“选项1”那样build立一个队列或延续堆栈的想法,但我肯定会select第二个选项。 在Erlang,我们甚至有一个由Erlang Solutions编写的jobs ,称为开源。 jobs是build立执行和负载调节这样的工作执行。 如果我要解决这个问题,我可能会把选项2和工作结合起来。

build议不使用线程池的答案就像对任务依赖关系/执行顺序的知识进行硬编码一样。 相反,我会创build一个CompositeTask来pipe理两个任务之间的开始/结束依赖关系。 通过封装任务接口后面的依赖关系,可以统一处理所有任务,并将其添加到池中。 这隐藏了执行细节,并允许任务依赖性更改,而不会影响您是否使用线程池。

这个问题没有指定一种语言 – 我将使用Java,我希望它是最可读的。

 class CompositeTask implements Task { Task firstTask; Task secondTask; public void run() { firstTask.run(); secondTask.run(); } } 

这将顺序执行任务并在同一个线程上执行。 您可以将许多CompositeTask起来,根据需要创build一系列连续的任务。

这里的缺点是,这将在所有的任务顺序执行期间绑定线程。 您可能还有其他您希望在第一个和第二个任务之间执行的任务。 因此,不是直接执行第二个任务,而是执行第二个任务的复合任务计划:

 class CompositeTask implements Runnable { Task firstTask; Task secondTask; ExecutorService executor; public void run() { firstTask.run(); executor.submit(secondTask); } } 

这确保了第二个任务在第一个任务完成之后才运行,并且允许池执行其他(可能更紧急的)任务。 请注意,第一个和第二个任务可以在单独的线程上执行,因此尽pipe它们不会并发执行,但是任务使用的任何共享数据都必须使其他线程可见(例如,通过使variablesvolatile

这是一个简单而强大且灵活的方法,它允许任务本身定义执行约束,而不是通过使用不同的线程池来完成。

使用两个活动对象 。 用两个词来表示:活动对象模式由优先级队列和一个或多个工作线程组成,它们可以从队列中获取任务并处理它。

因此,使用一个工作线程的一个活动对象:这些将是排队位置的所有任务将按顺序处理。 使用工作线程数超过1的第二个活动对象。在这种情况下,工作线程将以任意顺序从队列中获取和处理任务。

运气。

我认为线程池可以在这种情况下被有效地使用。 这个想法是为每个从属任务组使用独立的strand对象。 您可以使用或不带strand对象将任务添加到您的队列中。 您可以使用具有相关任务的同一个strand对象。 你的调度器检查下一个任务是否有一个strand ,如果这个strand被locking。 如果没有 – locking这strand并运行这个任务。 如果strand已经被locking – 保持这个任务在队列中,直到下一个调度事件。 当任务完成时,解锁它的strand

结果你需要单一的队列,你不需要任何额外的线程,没有复杂的组等。 strand对象可以很简单,用两种方法lockunlock

我经常遇到同样的devise问题,例如,处理多个同时会话的asynchronousnetworking服务器。 当会话中的任务是依赖的(这将会话内部任务映射到组内的依赖任务)时,会话是独立的(这将它们映射到独立任务和从属任务组)。 使用描述的方法,我完全避免了会话内的显式同步 每个会话都有自己的strand对象。

更重要的是,我使用这个想法的现有(伟大)实现: Boost Asio库 (C ++)。 我只是用他们的术语strand 。 实现是优雅的:在调度它们之前,我我的asynchronous任务包装到相应的strand对象中。

我想你是混合的概念。 当你想在线程之间分配一些工作时,线程池是可以的,但是如果你开始混合线程之间的依赖关系,那么这不是一个好主意。

我的build议,只是不要使用线程池来完成这些任务。 只要创build一个专用的线程,并保持一个简单的顺序项目队列,必须由该线程单独处理。 然后,如果没有顺序要求,可以继续向线程池中推送任务,并在有时使用专用线程。

澄清:使用常识,串行任务的队列应由一个接一个处理每个任务的单个线程执行:)

你将如何确保这些任务是有序的?

 push task1 push task2 push task346 push task5 

为了响应编辑:

 push task1 push task27 ** push task3468 * push task5 push task9 

就我所了解的情况而言,这是可以实现的。 基本上你需要的是做一些聪明的事情来协调你的任务在主线程中。 您需要的Java API是ExecutorCompletionService和Callable

首先,实现你的可调用任务:

 public interface MyAsyncTask extends Callable<MyAsyncTask> { // tells if I am a normal or dependent task private boolean isDependent; public MyAsyncTask call() { // do your job here. return this; } } 

然后在你的主线程中,使用CompletionService协调依赖任务的执行(即等待机制):

 ExecutorCompletionService<MyAsyncTask> completionExecutor = new ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5)); Future<MyAsyncTask> dependentFutureTask = null; for (MyAsyncTask task : tasks) { if (task.isNormal()) { // if it is a normal task, submit it immediately. completionExecutor.submit(task); } else { if (dependentFutureTask == null) { // submit the first dependent task, get a reference // of this dependent task for later use. dependentFutureTask = completionExecutor.submit(task); } else { // wait for last one completed, before submit a new one. dependentFutureTask.get(); dependentFutureTask = completionExecutor.submit(task); } } } 

通过这样做,你使用一个执行器(threadpool size 5)来执行正常任务和依赖任务,一旦提交就立即执行正常任务,依赖任务逐一执行(等待在主线程中通过调用get ()在提交新的依赖任务之前),所以在任何时候,你总是有一些正常的任务和一个依赖的任务(如果存在的话)在单个线程池中运行。

这只是一个开始,通过使用ExecutorCompletionService,FutureTask和Semaphore,可以实现更复杂的线程协调场景。

你有两种不同的任务。 把它们混合在一个队列中感觉很奇怪。 而不是有一个队列有两个。 为了简单起见,您甚至可以使用ThreadPoolExecutor。 对于串行任务只是给它一个固定的大小为1,对于可以同时执行的任务给予更多。 我不明白为什么这将是笨拙的。 保持简单和愚蠢。 你有两个不同的任务,所以相应地对待他们。

由于在启动依赖任务之前,只需要等待一个任务完成,如果可以在第一个任务中安排依赖任务,则可以轻松完成。 因此,在第二个示例中:在任务2结束时,调度任务7,在任务3结束时,调度任务4,依次为4-> 6和6-> 8。

一开始只需要安排任务1,2,5,9 …其余的应该跟随。

一个更普遍的问题是当一个依赖任务可以开始之前,你必须等待多个任务。 处理高效率是一个不平凡的工作。

有一个专门为此目的而被称为dexecutor的java框架(免责声明:我是所有者)

 DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor(); executor.addDependency("task1", "task2"); executor.addDependency("task4", "task6"); executor.addDependency("task6", "task8"); executor.addIndependent("task3"); executor.addIndependent("task5"); executor.addIndependent("task7"); executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING); 

任务1,任务3,任务5,任务7并行运行(取决于线程池大小),一旦任务1完成,任务2运行,任务2完成任务4运行,任务4完成任务6运行,最后任务6完成任务8运行。

有很多答案,显然有一个被接受。 但为什么不使用延续?

如果你有一个已知的“串行”条件,那么当你用这个条件排队第一个任务时,按住任务; 并为进一步的任务调用Task.ContinueWith()。

 public class PoolsTasks { private readonly object syncLock = new object(); private Task serialTask = Task.CompletedTask; private bool isSerialTask(Action task) { // However you determine what is serial ... return true; } public void RunMyTask(Action myTask) { if (isSerialTask(myTask)) { lock (syncLock) serialTask = serialTask.ContinueWith(_ => myTask()); } else Task.Run(myTask); } }