Java:在特定队列大小后阻止提交的ExecutorService
我试图编写一个解决scheme,在这个解决scheme中,一个线程产生可以并行执行的I / O密集型任务。 每个任务都有重要的内存数据。 所以我希望能够限制暂时的任务数量。
如果我这样创buildThreadPoolExecutor:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(maxQueue));
然后executor.submit(callable)
抛出RejectedExecutionException
当队列填满和所有线程已经忙。
我可以做些什么来使executor.submit(callable)
块当队列已满,所有线程都忙?
编辑 :我试过这个 :
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
它有点达到我想达到的效果,但以一种不雅的方式(基本上拒绝的线程在调用线程中运行,因此阻止调用线程提交更多)。
编辑:( 5年后提问)
对于阅读这个问题及其答案的任何人,请不要把接受的答案作为一个正确的解决scheme。 请阅读所有的答案和评论。
我也做了同样的事情。 诀窍是创build一个BlockingQueue,其中offer()方法实际上是一个put()。 (你可以使用任何基础BlockingQueue impl你想要的)。
public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }
请注意,这只适用于corePoolSize==maxPoolSize
线程池,因此请注意(请参阅注释)。
下面是我如何解决这个问题:
(注意:这个解决scheme确实阻塞了提交Callable的线程,所以它可以防止抛出RejectedExecutionException)
public class BoundedExecutor extends ThreadPoolExecutor{ private final Semaphore semaphore; public BoundedExecutor(int bound) { super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); semaphore = new Semaphore(bound); } /**Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{ semaphore.acquire(); return submit(task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); semaphore.release(); } }
目前接受的答案有一个潜在的重大问题 – 它改变了ThreadPoolExecutor.execute的行为,如果你有一个corePoolSize < maxPoolSize
,ThreadPoolExecutor逻辑决不会在核心之外增加额外的工作。
从ThreadPoolExecutor .execute(Runnable):
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);
具体来说,最后的“其他”块将永远不会被击中。
更好的select是做类似于OP已经做的事情 – 使用RejectedExecutionHandler来执行相同的put
逻辑:
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e); } }
正如评论中指出的那样(指这个答案 ),有一些需要注意的方法:
- 如果
corePoolSize==0
,则在任务可见之前存在池中的所有线程都可能死亡的争用条件 - 使用包装队列任务(不适用于
ThreadPoolExecutor
)的实现将导致问题,除非处理程序也以相同的方式包装它。
牢记这些问题,该解决scheme将适用于大多数典型的ThreadPoolExecutors,并将正确处理corePoolSize < maxPoolSize
的情况。
我认为这与使用ArrayBlockingQueue
而不是LinkedBlockingQueue
一样简单。
忽略我…这是完全错误的。 ThreadPoolExecutor
调用Queue#offer
put
,这将有你需要的效果。
您可以扩展ThreadPoolExecutor
并提供execute(Runnable)
调用来代替offer
。
这恐怕不是一个完全令人满意的答案。
我有类似的问题,我通过从ThreadPoolExecutor
使用beforeExecute/afterExecute
挂钩实现:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Blocks current task execution if there is not enough resources for it. * Maximum task count usage controlled by maxTaskCount property. */ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final ReentrantLock taskLock = new ReentrantLock(); private final Condition unpaused = taskLock.newCondition(); private final int maxTaskCount; private volatile int currentTaskCount; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int maxTaskCount) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.maxTaskCount = maxTaskCount; } /** * Executes task if there is enough system resources for it. Otherwise * waits. */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskLock.lock(); try { // Spin while we will not have enough capacity for this job while (maxTaskCount < currentTaskCount) { try { unpaused.await(); } catch (InterruptedException e) { t.interrupt(); } } currentTaskCount++; } finally { taskLock.unlock(); } } /** * Signalling that one more task is welcome */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskLock.lock(); try { currentTaskCount--; unpaused.signalAll(); } finally { taskLock.unlock(); } } }
这对你来说应该是够好的了。 顺便说一句,原来的执行是基于任务大小,因为一个任务可能比另一个任务大100倍,提交两个巨大的任务是杀死箱子,但运行一个大而充足的小是好的。 如果你的I / O密集型任务的大小大致相同,你可以使用这个类,否则只是让我知道,我会发布基于大小的实现。
PS你会想检查ThreadPoolExecutor
javadoc。 这是Doug Lea关于如何轻松定制的非常好的用户指南。
我知道这是一个古老的问题,但有一个类似的问题,创build新的任务是非常快的,如果有太多的OutOfMemoryError发生,因为现有的任务没有完成得够快。
在我的情况下, Callables
提交,我需要的结果,因此我需要存储由executor.submit()
返回的所有Futures
。 我的解决办法是把Futures
放到最大规模的BlockingQueue
。 一旦该队列已满,在完成一些任务之前不会生成更多任务(从队列中删除元素)。 在伪代码中:
final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads); final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize); try { Thread taskGenerator = new Thread() { @Override public void run() { while (reader.hasNext) { Callable task = generateTask(reader.next()); Future future = executor.submit(task); try { // if queue is full blocks until a task // is completed and hence no future tasks are submitted. futures.put(compoundFuture); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } executor.shutdown(); } } taskGenerator.start(); // read from queue as long as task are being generated // or while Queue has elements in it while (taskGenerator.isAlive() || !futures.isEmpty()) { Future compoundFuture = futures.take(); // do something } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (ExecutionException ex) { throw new MyException(ex); } finally { executor.shutdownNow(); }