如何让ThreadPoolExecutor在排队之前将线程增加到max?
我一直在使用ThreadPoolExecutor
的默认行为感到沮丧, ThreadPoolExecutor
支持许多人使用的ExecutorService
线程池。 引用Javadocs:
如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则只有在队列已满时才会创build一个新线程。
这意味着如果使用下面的代码定义一个线程池,它将永远不会启动第二个线程,因为LinkedBlockingQueue
是无界的。
ExecutorService threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
只有当你有一个有限的队列和队列已满时 ,核心号码以上的任何线程才会启动。 我怀疑有大量的初级Javamultithreading程序员不了解ThreadPoolExecutor
的这种行为。
现在我有特定的用例,这是不是最佳的。 我正在寻找方法,而无需编写自己的TPE课程来解决这个问题。
我的要求是一个Web服务,正在回电给一个可能不可靠的第三方。
- 我不想与web请求同步callback,所以我想使用线程池。
- 我通常得到这些几分钟,所以我不想有一个
newFixedThreadPool(...)
与大量的线程,大多是hibernate的。 - 每隔一段时间,我都会碰到这种stream量,我想把线程的数量扩大到最大值(比方说50)。
- 我需要做一个最好的尝试来做所有的callback,所以我想排队50以上的任何额外的。我不想通过使用
newCachedThreadPool()
淹没我的networking服务器的其余部分。
如何解决ThreadPoolExecutor
中的限制, 在更multithreading启动之前队列需要被限制和已满? 如何在排队任务之前启动更multithreading?
编辑:
@Flavio关于使用ThreadPoolExecutor.allowCoreThreadTimeOut(true)
使核心线程超时并退出很有用。 我认为,但我仍然需要核心线程function。 如果可能的话,我不希望池中的线程数量低于核心大小。
我怎样才能解决这个限制在
ThreadPoolExecutor
的队列需要被限制和充分之前,更多的线程将被启动。
我相信我终于find了一个用ThreadPoolExecutor
来解决这个限制的有点优雅(也许有点冒险)的解决scheme。 它涉及到扩展queue.offer(...)
使其在queue.offer(...)
返回false
时已经有一些任务排队。 如果当前线程没有跟上排队的任务,TPE将添加额外的线程。 如果池已经处于最大线程数,那么将调用RejectedExecutionHandler
。 然后是处理程序,然后put(...)
放入队列中。
写一个队列的地方肯定是很奇怪的, offer(...)
可以返回false
, put()
永远不会被阻塞,所以这就是黑客部分。 但是这对于TPE的队列使用效果很好,所以我没有看到这样做的问题。
代码如下:
// extend LinkedBlockingQueue to force offer() to return false conditionally BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { private static final long serialVersionUID = -6903933921423432194L; @Override public boolean offer(Runnable e) { /* * Offer it to the queue if there is 0 items already queued, else * return false so the TPE will add another thread. If we return false * and max threads have been reached then the RejectedExecutionHandler * will be called which will do the put into the queue. */ if (size() == 0) { return super.offer(e); } else { return false; } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*secs*/, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { /* * This does the actual put into the queue. Once the max threads * have been reached, the tasks will then queue up. */ executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } });
有了这个机制,当我将任务提交给队列时, ThreadPoolExecutor
将:
- 最初将线程数扩展到核心大小(这里是1)。
- 提供给队列。 如果队列是空的,它将排队等待现有的线程处理。
- 如果队列已经有一个或多个元素,
offer(...)
将返回false。 - 如果返回false,则扩大池中的线程数,直到达到最大值(这里是50)。
- 如果在最大那么它调用
RejectedExecutionHandler
-
RejectedExecutionHandler
然后将任务放入队列,以FIFO顺序由第一个可用线程处理。
虽然在我上面的示例代码中,队列是无界的,但也可以将其定义为有界队列。 例如,如果您向LinkedBlockingQueue
添加容量1000,则会:
- 将线程扩展到最大
- 然后排队等待,直到完成1000个任务
- 然后阻止呼叫者,直到空间变得可用于队列。
另外,如果您确实需要在RejectedExecutionHandler
使用offer(...)
,那么您可以使用offer(E, long, TimeUnit)
方法而不是Long.MAX_VALUE
作为超时。
编辑:
我已经调整了我的offer(...)
方法覆盖每Ralf的反馈。 这只会扩大池中的线程数量,如果他们跟不上负载。
编辑:
对这个答案的另一个调整可能是实际询问TPE是否存在空闲线程,如果存在空闲线程则只对队列进行排队。 你将不得不为此做一个真正的类,并添加一个ourQueue.setThreadPoolExecutor(tpe);
方法就可以了。
那么你的offer(...)
方法可能看起来像这样:
- 检查
tpe.getPoolSize() == tpe.getMaximumPoolSize()
在这种情况下调用super.offer(...)
。 - 否则,如果
tpe.getPoolSize() > tpe.getActiveCount()
然后调用super.offer(...)
因为似乎有空闲的线程。 - 否则返回
false
来分叉另一个线程。
也许这个:
int poolSize = tpe.getPoolSize(); int maximumPoolSize = tpe.getMaximumPoolSize(); if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) { return super.offer(e); } else { return false; }
请注意,TPE上的get方法是昂贵的,因为它们访问volatile
字段或(在getActiveCount()
的情况下)lockingTPE并遍历线程列表。 此外,这里有竞争条件可能会导致任务不正确排队或另一个线程在有空闲线程时分叉。
将核心大小和最大大小设置为相同的值,并允许使用allowCoreThreadTimeOut(true)
从池中删除核心线程。
在这个问题上我已经有了另外两个答案,但是我怀疑这个答案是最好的。
它是基于目前被接受的答案的技术,即:
- 重写队列的
offer()
方法(有时)返回false, - 这导致
ThreadPoolExecutor
产生一个新的线程或拒绝任务,和 - 设置
RejectedExecutionHandler
实际上RejectedExecutionHandler
任务排队。
问题是什么时候offer()
应该返回false。 目前接受的答案在队列上有几个任务时返回false,但正如我在我的评论中指出的那样,这会导致不希望的结果。 或者,如果您始终返回false,则即使您有线程在队列中等待,也会继续产生新线程。
解决scheme是使用Java 7 LinkedTransferQueue
并offer()
调用tryTransfer()
。 当有一个等待消费者线程的任务将被传递给该线程。 否则, offer()
将返回false,并且ThreadPoolExecutor
将产生一个新的线程。
BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() { @Override public boolean offer(Runnable e) { return tryTransfer(e); } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } });
下面是一个让我感觉更简单的版本:无论何时执行新任务,增加corePoolSize(maximumPoolSize的限制),然后减小corePoolSize(下降到用户指定的“core pool size”的限制)任务完成。
换句话说,跟踪运行或排队任务的数量,并且只要在用户指定的“核心池大小”和maximumPoolSize之间,确保corePoolSize等于任务数量。
public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor { private int userSpecifiedCorePoolSize; private int taskCount; public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); userSpecifiedCorePoolSize = corePoolSize; } @Override public void execute(Runnable runnable) { synchronized (this) { taskCount++; setCorePoolSizeToTaskCountWithinBounds(); } super.execute(runnable); } @Override protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); synchronized (this) { taskCount--; setCorePoolSizeToTaskCountWithinBounds(); } } private void setCorePoolSizeToTaskCountWithinBounds() { int threads = taskCount; if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize; if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize(); setCorePoolSize(threads); } }
正如所写,类不支持在构造之后更改用户指定的corePoolSize或maximumPoolSize,也不支持直接或通过remove()
或purge()
操作工作队列。
我们有一个ThreadPoolExecutor
的子类,它接受一个额外的creationThreshold
ThreadPoolExecutor
并覆盖execute
。
public void execute(Runnable command) { super.execute(command); final int poolSize = getPoolSize(); if (poolSize < getMaximumPoolSize()) { if (getQueue().size() > creationThreshold) { synchronized (this) { setCorePoolSize(poolSize + 1); setCorePoolSize(poolSize); } } } }
也许这也有帮助,但是你的看起来更加艺术化。
build议的答案只解决了JDK线程池中的一个(1)问题:
-
JDK线程池偏向于排队。 所以不是产生一个新的线程,而是排列任务。 只有当队列达到极限,线程池才会产生一个新的线程。
-
线程退休不会在负载变轻时发生。 例如,如果我们有一连串的作业碰到池,导致池达到最大,然后轻负载最多2个任务,池将使用所有线程服务轻负载,以防止线程退役。 (只需要2个线程…)
对上面的行为不满意,我继续实施了一个池来克服上面的缺陷。
解决方法2)使用Lifo调度解决问题。 Ben Maurer在ACM 2015年应用会议上提出了这个想法: Systems @ Facebook规模
所以一个新的实现诞生了:
LifoThreadPoolExecutorSQP
到目前为止,这个实现提高了ZEL的asynchronous执行性能。
该实现是自旋能够减less上下文切换的开销,对于某些用例产生优越的性能。
希望它有帮助…
PS:JDK Fork Join Pool实现ExecutorService,作为一个“普通”线程池,执行是高性能的,它使用LIFO线程调度,但是不能控制内部队列大小,退休超时…
我有另一个build议,按照最初的想法改变队列返回false。 在这个例子中,所有的任务都可以进入队列,但是当execute()
之后任何一个任务被排队时,我们都会跟着它的队列所拒绝的一个哨兵no-op任务,导致一个新的线程产生,这将执行no-op紧接着是队列中的东西。
由于工作线程可能正在轮询LinkedBlockingQueue
以执行新任务,因此即使存在可用线程,任务也可能会排入队列。 为了避免在有线程可用的情况下产生新的线程,我们需要跟踪有多less线程正在等待队列上的新任务,并且只有当队列上的任务多于等待线程时才产生一个新的线程。
final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } }; final AtomicInteger waitingThreads = new AtomicInteger(0); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { @Override public boolean offer(Runnable e) { // offer returning false will cause the executor to spawn a new thread if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get(); else return super.offer(e); } @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.poll(timeout, unit); } finally { waitingThreads.decrementAndGet(); } } @Override public Runnable take() throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.take(); } finally { waitingThreads.decrementAndGet(); } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) { @Override public void execute(Runnable command) { super.execute(command); if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP); } }; threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r == SENTINEL_NO_OP) return; else throw new RejectedExecutionException(); } });
我能想到的最好的解决办法就是扩展。
ThreadPoolExecutor
提供了几个钩子方法: beforeExecute
和afterExecute
。 在你的扩展中,你可以保持使用一个有界的队列来提供任务,第二个无限的队列来处理溢出。 当有人调用submit
,可以尝试将请求放入有界队列中。 如果遇到exception情况,只需将任务粘贴到溢出队列中即可。 然后,您可以使用afterExecute
挂钩完成任务后查看溢出队列中是否有任何内容。 这样,执行者会首先处理有界队列中的东西,并在时间允许的情况下自动从这个无界队列中抽取。
这似乎比你的解决scheme更多的工作,但至less它不涉及给队列意外的行为。 我也想象有一个更好的方法来检查队列和线程的状态,而不是依靠exception,这是相当缓慢的抛出。