当队列满时ThreadPoolExecutor块?
我正在尝试使用ThreadPoolExecutor执行大量的任务。 以下是一个假设的例子:
def workQueue = new ArrayBlockingQueue<Runnable>(3, false) def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue) for(int i = 0; i < 100000; i++) threadPoolExecutor.execute(runnable)
问题是我很快得到java.util.concurrent.RejectedExecutionException,因为任务的数量超过了工作队列的大小。 但是,我期望的行为是让主线程阻塞,直到队列中有空间。 什么是完成这个最好的方法?
在一些非常狭隘的情况下,你可以实现一个java.util.concurrent.RejectedExecutionHandler来完成你所需要的。
RejectedExecutionHandler block = new RejectedExecutionHandler() { rejectedExecution(Runnable r, ThreadPoolExecutor executor) { executor.getQueue().put( r ); } }; ThreadPoolExecutor pool = new ... pool.setRejectedExecutionHandler(block);
现在。 由于以下原因,这是一个非常糟糕的主意
- 这很容易发生死锁,因为池中的所有线程都可能在放入队列之前死亡。 通过设置一个合理的存活时间来缓解这个问题。
- 这个任务没有被执行者预期的方式所包裹。 许多执行者实现在执行之前将其任务包装在某种跟踪对象中。 看看你的来源。
- 通过getQueue()进行添加是非常不鼓励的API,并可能在某些时候被禁止。
一个几乎总是更好的策略是安装ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用execute()的线程上运行任务来限制您的应用程序。
然而,有时一个具有所有固有风险的封锁策略,实际上就是你想要的。 我会在这些条件下说
- 你只有一个线程调用execute()
- 您必须(或想要)具有非常小的队列长度
- 你绝对需要限制运行这个工作的线程数量(通常是为了外部原因),而调用者运行策略会打破这个。
- 你的任务的大小是不可预知的,所以如果这个池一时忙于4个短任务,并且你的一个调用execute的线程陷入了一个大问题,那么调用者可能会导致饥饿。
所以,正如我所说。 这很less需要,可能是危险的,但是你去了。
祝你好运。
检查固定点的答案: https ://stackoverflow.com/a/2001205“Java实践中的并发”一书。
这里是我的代码片断在这种情况下:
public void executeBlocking( Runnable command ) { if ( threadPool == null ) { logger.error( "Thread pool '{}' not initialized.", threadPoolName ); return; } ThreadPool threadPoolMonitor = this; boolean accepted = false; do { try { threadPool.execute( new Runnable() { @Override public void run() { try { command.run(); } // to make sure that the monitor is freed on exit finally { // Notify all the threads waiting for the resource, if any. synchronized ( threadPoolMonitor ) { threadPoolMonitor.notifyAll(); } } } } ); accepted = true; } catch ( RejectedExecutionException e ) { // Thread pool is full try { // Block until one of the threads finishes its job and exits. synchronized ( threadPoolMonitor ) { threadPoolMonitor.wait(); } } catch ( InterruptedException ignored ) { // return immediately break; } } } while ( !accepted ); }
threadPool是已经初始化的java.util.concurrent.ExecutorService的本地实例。
我使用一个自定义的RejectedExecutionHandler解决了这个问题,它只是简单地阻塞了一段时间的调用线程,然后尝试再次提交任务:
public class BlockWhenQueueFull implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // The pool is full. Wait, then try again. try { long waitMs = 250; Thread.sleep(waitMs); } catch (InterruptedException interruptedException) {} executor.execute(r); } }
这个类只能在线程池执行程序中像任何其他程序一样用作RejectedExecutionHandler。 在这个例子中:
executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())
我看到唯一的缺点是调用线程可能会被locking的时间稍长于绝对必要的时间(高达250ms)。 对于许多短时间运行的任务,可能会将等待时间减less到10ms左右。 而且,由于这个执行器被有效地recursion调用,很长时间等待一个线程变为可用(小时)可能导致堆栈溢出。
不过,我个人喜欢这个方法。 它结构紧凑,易于理解,并且运行良好。 我错过了什么重要的?