如何使ThreadPoolExecutor的submit()方法块如果饱和?
我想创build一个ThreadPoolExecutor
,以便当它达到其最大值并且队列已满时, sumbit()
方法会在尝试添加新任务时阻塞。 我是否需要实现一个自定义的RejectedExecutionHandler
,或者是否有一个现有的方法来使用标准的Java库做到这一点?
我刚刚find的可能解决scheme之一:
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore.release(); throw e; } } }
还有其他解决scheme吗? 我更喜欢基于RejectedExecutionHandler
东西,因为它似乎是处理这种情况的标准方法。
你可以使用ThreadPoolExecutor和一个blockingQueue:
public class ImageManager { BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize); RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread, 0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler); private int downloadThumbnail(String fileListPath){ executorService.submit(new yourRunnable()); } }
检查出这样做的四个select : 创build一个NotifyingBlockingThreadPoolExecutor
您应该使用CallerRunsPolicy
,它在调用线程中执行被拒绝的任务。 这样,在任务完成之前,它不能向执行器提交任何新的任务,此时将会有一些空闲的线程池或重复该过程。
从文档:
拒绝的任务
执行方法execute(java.lang.Runnable)中提交的新任务将在执行程序closures时被拒绝,并且执行程序对最大线程和工作队列的容量使用有限边界且饱和。 无论哪种情况,execute方法都会调用RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,java.util.concurrent.ThreadPoolExecutor)方法。 提供四个预定义的处理程序策略
- 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时抛出运行时RejectedExecutionException。
- 在ThreadPoolExecutor.CallerRunsPolicy中,调用执行本身的线程运行该任务。 这提供了一个简单的反馈控制机制,将减慢提交新任务的速度。
- 在ThreadPoolExecutor.DiscardPolicy中,无法执行的任务将被简单地删除。
- 在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序没有closures,工作队列头部的任务将被删除,然后重试执行(可能会再次失败,导致重复执行)。
此外,确保在调用ThreadPoolExecutor
构造函数时使用有界的队列,如ArrayBlockingQueue。 否则,什么都不会被拒绝。
编辑:为了响应您的评论,请将ArrayBlockingQueue的大小设置为等于线程池的最大大小,并使用AbortPolicy。
编辑2:好吧,我看到你在什么。 那么怎么样:重写beforeExecute()
方法来检查getActiveCount()
是否不超过getMaximumPoolSize()
,如果是的话,睡觉,然后再试一次?
Hibernate有一个BlockPolicy
,很简单,可以做你想做的事情:
请参阅: Executors.java
/** * A handler for rejected tasks that will have the caller block until * space is available. */ public static class BlockPolicy implements RejectedExecutionHandler { /** * Creates a <tt>BlockPolicy</tt>. */ public BlockPolicy() { } /** * Puts the Runnable to the blocking queue, effectively blocking * the delegating thread until space is available. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { try { e.getQueue().put( r ); } catch (InterruptedException e1) { log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r ); } } }
上面引用的来自“ Java并发实践 ”中的BoundedExecutor
答案只有在对Executor使用无限队列或信号量限制不大于队列大小时才能正常工作。 信号量在提交线程和池中的线程之间共享状态,即使队列大小<bound <=(队列大小+池大小),也可以使执行程序饱和。
使用CallerRunsPolicy
只有在你的任务不能永久运行的情况下才有效,在这种情况下,你的提交线程将永远保持在rejectedExecution
,如果你的任务需要很长时间才能运行,这是一个坏主意,因为提交线程不能提交任何新的任务或任何其他任何事情,如果它正在运行一个任务本身。
如果这是不可接受的,那么我build议在提交任务之前检查执行程序的有界队列的大小。 如果队列已满,请稍等片刻再尝试提交。 吞吐量将会受到影响,但是我build议这个解决scheme比许多其他解决scheme更简单,并保证没有任何任务会被拒绝。
我知道,这是一个黑客,但在我看来,这里提供的最干净的黑客;-)
由于ThreadPoolExecutor使用阻塞队列“offer”而不是“put”,因此可以覆盖阻塞队列的“offer”行为:
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> { BlockingQueueHack(int size) { super(size); } public boolean offer(T task) { try { this.put(task); } catch (InterruptedException e) { throw new RuntimeException(e); } return true; } } ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
我testing了它,它似乎工作。 实施一些超时政策是留给读者的练习。
下面的类包装一个ThreadPoolExecutor并使用信号量来阻塞工作队列已满:
public final class BlockingExecutor { private final Executor executor; private final Semaphore semaphore; public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) { BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory); this.semaphore = new Semaphore(queueSize + maxPoolSize); } private void execImpl (final Runnable command) throws InterruptedException { semaphore.acquire(); try { executor.execute(new Runnable() { @Override public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { // will never be thrown with an unbounded buffer (LinkedBlockingQueue) semaphore.release(); throw e; } } public void execute (Runnable command) throws InterruptedException { execImpl(command); } }
这个包装类是基于Brian Goetz的“Java并发实践”(Java Concurrency in Practice)一书中给出的解决scheme。 本书中的解决scheme只有两个构造函数参数:一个Executor
和一个用于信号量的边界。 这在Fixpoint给出的答案中显示。 这种方法存在一个问题:它可能处于线程繁忙,队列已满,但信号量刚刚释放许可的状态。 (在finally块中的semaphore.release()
)。 在这种状态下,一个新的任务可以抓取刚刚释放的许可证,但是由于任务队列已满而被拒绝。 当然这不是你想要的; 你想在这种情况下阻止。
为了解决这个问题,我们必须使用一个无限的队列,正如JCiP明确提到的那样。 信号量充当警卫,给出虚拟队列大小的效果。 这有副作用,有可能单位可以包含maxPoolSize + virtualQueueSize + maxPoolSize
任务。 这是为什么? 由于finally块中的semaphore.release()
。 如果所有的池线程同时调用这个语句,那么maxPoolSize
允许被释放,允许相同数目的任务进入单元。 如果我们使用有界的队列,它仍然是满的,导致被拒绝的任务。 现在,因为我们知道这只在池线程几乎完成时才会发生,所以这不是问题。 我们知道池线程不会阻塞,所以很快就会从队列中取出一个任务。
你可以使用有界的队列。 只要确保其大小等于virtualQueueSize + maxPoolSize
。 更大的尺寸是没用的,信号量会阻止让更多的物品进来。尺寸更小会导致被拒绝的任务。 任务被拒绝的机会随着规模的减小而增加。 例如,假设你想要一个maxPoolSize = 2和virtualQueueSize = 5的有界执行器。 然后用5 + 2 = 7个许可证和一个实际队列大小为5 + 2 = 7的信号量。 那么可以在单位中的实际任务数是2 + 5 + 2 = 9。 当执行程序已满时(队列中有5个任务,线程池中有2个任务,因此允许有0个),并且所有池线程都释放它们的许可证,则进入的任务只能执行2个许可证。
现在,来自JCiP的解决scheme使用起来有些麻烦,因为它没有强制执行所有这些约束(无界队列,或者受这些math约束限制等)。 我认为这只是一个很好的例子来演示如何基于已经可用的部分构build新的线程安全类,而不是一个完整的,可重用的类。 我不认为后者是作者的意图。
你可以像这样使用一个自定义的RejectedExecutionHandler
ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size max_handlers, // max size timeout_in_seconds, // idle timeout TimeUnit.SECONDS, queue, new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // This will block if the queue is full try { executor.getQueue().put(r); } catch (InterruptedException e) { System.err.println(e.getMessage()); } } });
创build你自己的阻塞队列,由Executor使用,阻止你正在寻找的行为,同时总是返回可用的剩余容量(确保执行者不会尝试创build比核心池更多的线程,或者触发拒绝处理程序)。
我相信这会让你find你正在寻找的阻塞行为。 拒绝处理程序将永远不符合法案,因为这表明执行者不能执行任务。 我可以想象的是,你在处理程序中得到某种“忙等待”的forms。 这不是你想要的,你想要一个阻止调用者的执行者队列…
你应该看看这个总结了几个解决scheme的链接(通知阻塞线程池) ,最后给出一个优雅的通知。
避免@FixPoint解决scheme的问题。 可以使用ListeningExecutorService并在FutureCallback中释放onSuccess和onFailure信号量。
最近我发现这个问题有同样的问题。 OP没有这么明确地表示,但是我们不想使用在提交者线程上执行一个任务的RejectedExecutionHandler
,因为如果这个任务是长时间运行的,那么这将不利用工作者线程。
阅读所有的答案和评论,特别是信号量的错误解决scheme或使用afterExecute
我仔细看看ThreadPoolExecutor的代码,看看是否有某种出路。 我惊奇地发现有2000多行(注释)代码,其中有些代码让我感到头晕目眩 。 鉴于相当简单的要求,我实际上有一个生产者,几个消费者,当没有消费者可以工作时让生产者阻止 – 我决定推出我自己的解决scheme。 这不是一个ExecutorService
而只是一个Executor
。 而且它不会将线程的数量调整到工作负载,而是只保留固定数量的线程,这也符合我的要求。 这是代码。 随意咆哮:-)
package x; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; /** * distributes {@code Runnable}s to a fixed number of threads. To keep the * code lean, this is not an {@code ExecutorService}. In particular there is * only very simple support to shut this executor down. */ public class ParallelExecutor implements Executor { // other bounded queues work as well and are useful to buffer peak loads private final BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>(); private final Thread[] threads; /*+**********************************************************************/ /** * creates the requested number of threads and starts them to wait for * incoming work */ public ParallelExecutor(int numThreads) { this.threads = new Thread[numThreads]; for(int i=0; i<numThreads; i++) { // could reuse the same Runner all over, but keep it simple Thread t = new Thread(new Runner()); this.threads[i] = t; t.start(); } } /*+**********************************************************************/ /** * returns immediately without waiting for the task to be finished, but may * block if all worker threads are busy. * * @throws RejectedExecutionException if we got interrupted while waiting * for a free worker */ @Override public void execute(Runnable task) { try { workQueue.put(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("interrupt while waiting for a free " + "worker.", e); } } /*+**********************************************************************/ /** * Interrupts all workers and joins them. Tasks susceptible to an interrupt * will preempt their work. Blocks until the last thread surrendered. */ public void interruptAndJoinAll() throws InterruptedException { for(Thread t : threads) { t.interrupt(); } for(Thread t : threads) { t.join(); } } /*+**********************************************************************/ private final class Runner implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { Runnable task; try { task = workQueue.take(); } catch (InterruptedException e) { // canonical handling despite exiting right away Thread.currentThread().interrupt(); return; } try { task.run(); } catch (RuntimeException e) { // production code to use a logging framework e.printStackTrace(); } } } } }
我相信通过使用java.util.concurrent.Semaphore
和委托Executor.newFixedThreadPool
行为来解决这个问题是非常好的方法。 新的执行者服务只有在有线程的情况下才会执行新的任务。 阻塞由Semaphorepipe理,许可数量等于线程数量。 当任务完成时,它返回一个许可证。
public class FixedThreadBlockingExecutorService extends AbstractExecutorService { private final ExecutorService executor; private final Semaphore blockExecution; public FixedThreadBlockingExecutorService(int nTreads) { this.executor = Executors.newFixedThreadPool(nTreads); blockExecution = new Semaphore(nTreads); } @Override public void shutdown() { executor.shutdown(); } @Override public List<Runnable> shutdownNow() { return executor.shutdownNow(); } @Override public boolean isShutdown() { return executor.isShutdown(); } @Override public boolean isTerminated() { return executor.isTerminated(); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return executor.awaitTermination(timeout, unit); } @Override public void execute(Runnable command) { blockExecution.acquireUninterruptibly(); executor.execute(() -> { try { command.run(); } finally { blockExecution.release(); } }); }
过去我也有同样的需求:每个客户端都有一个固定大小的阻塞队列,由共享线程池支持。 我结束了写我自己的一种ThreadPoolExecutor:
UserThreadPoolExecutor(阻塞队列(每个客户端)+线程池(在所有客户端之间共享))
请参阅: https : //github.com/d4rxh4wx/UserThreadPoolExecutor
每个UserThreadPoolExecutor都有一个来自共享ThreadPoolExecutor的最大数量的线程
每个UserThreadPoolExecutor可以:
- 如果未达到配额,则将任务提交给共享线程池执行程序。 如果达到配额,则作业排队(非消耗性阻塞等待CPU)。 一旦其提交的任务之一完成,配额递减,允许另一个任务等待提交给ThreadPoolExecutor
- 等待剩下的任务完成
我在弹性search客户端中发现了这个拒绝策略。 它阻止阻塞队列上的调用者线程。 下面的代码 –
static class ForceQueuePolicy implements XRejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { //should never happen since we never wait throw new EsRejectedExecutionException(e); } } @Override public long rejected() { return 0; } }
我最近有一个需要实现类似的东西,但在ScheduledExecutorService
。
我还必须确保处理在方法上传递的延迟,并确保当调用者期望的时候提交任务或执行失败,从而抛出RejectedExecutionException
exception。
ScheduledThreadPoolExecutor
在内部执行或提交一个任务的其他方法调用#schedule
,它仍然会调用覆盖的方法。
import java.util.concurrent.*; public class BlockingScheduler extends ScheduledThreadPoolExecutor { private final Semaphore maxQueueSize; public BlockingScheduler(int corePoolSize, ThreadFactory threadFactory, int maxQueueSize) { super(corePoolSize, threadFactory, new AbortPolicy()); this.maxQueueSize = new Semaphore(maxQueueSize); } @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay)); return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS); } @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay)); return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay)); return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit) { final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay)); return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS); } @Override protected void afterExecute(Runnable runnable, Throwable t) { super.afterExecute(runnable, t); try { if (t == null && runnable instanceof Future<?>) { try { ((Future<?>) runnable).get(); } catch (CancellationException | ExecutionException e) { t = e; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) { System.err.println(t); } } finally { releaseQueueUsage(); } } private long beforeSchedule(Runnable runnable, long delay) { try { return getQueuePermitAndModifiedDelay(delay); } catch (InterruptedException e) { getRejectedExecutionHandler().rejectedExecution(runnable, this); return 0; } } private long beforeSchedule(Callable callable, long delay) { try { return getQueuePermitAndModifiedDelay(delay); } catch (InterruptedException e) { getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this); return 0; } } private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException { final long beforeAcquireTimeStamp = System.currentTimeMillis(); maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS); final long afterAcquireTimeStamp = System.currentTimeMillis(); return afterAcquireTimeStamp - beforeAcquireTimeStamp; } private void releaseQueueUsage() { maxQueueSize.release(); } }
我有这里的代码,将不胜感激任何反馈。 https://github.com/AmitabhAwasthi/BlockingScheduler
这里的解决scheme似乎工作得很好。 它被称为NotifyingBlockingThreadPoolExecutor 。
演示程序。
编辑:有这个代码的问题 ,await()方法是越野车。 调用shutdown()+ awaitTermination()似乎工作正常。
我并不总是喜欢CallerRunsPolicy,尤其是因为它允许被拒绝的任务“跳过队列”并在之前提交的任务之前执行。 而且,在调用线程上执行任务可能需要比等待第一个时隙更长的时间。
我使用自定义的RejectedExecutionHandler解决了这个问题,它只是简单地阻塞调用线程一段时间,然后尝试再次提交任务:
public class BlockWhenQueueFull implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // The pool is full. Wait, then try again. try { long waitMs = 250; Thread.sleep(waitMs); } catch (InterruptedException interruptedException) {} executor.execute(r); } }
这个类只能在线程池执行器中像任何其他一样被用作RejectedExecutinHandler,例如:
executorPool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new BlockWhenQueueFull());
我看到唯一的缺点是调用线程可能会被locking的时间稍长于绝对必要的时间(高达250ms)。 而且,由于这个执行器被有效地recursion调用,很长时间等待一个线程变为可用(小时)可能导致堆栈溢出。
不过,我个人喜欢这个方法。 它结构紧凑,易于理解,并且运行良好。