不可能做一个大小限制的caching线程池?
看起来不可能创build一个caching的线程池,它可以创build一个线程数量的限制。
下面是在标准Java库中实现的静态Executors.newCachedThreadPool:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
所以,使用该模板继续创build一个固定大小的caching线程池:
new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());
现在,如果你使用这个,并提交3个任务,一切都会好的。 提交任何进一步的任务将导致被拒绝的执行exception。
尝试这个:
new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());
将导致所有线程顺序执行。 也就是说,线程池将永远不会使多个线程来处理您的任务。
这是ThreadPoolExecutor的执行方法中的一个错误? 或者也许这是故意的? 或者有其他的方法?
编辑:我想要的东西就像caching线程池(它创build线程按需,然后在一定的时间后杀死它们),但它可以创build的线程数量的限制,并能够继续排队附加任务打到它的线程限制。 根据sjlee的回答,这是不可能的。 看看ThreadPoolExecutor的execute()方法确实是不可能的。 我需要inheritanceThreadPoolExecutor并像SwingWorker那样重写execute(),但是SwingWorker在其execute()方法中做了什么是一个彻底的破解。
ThreadPoolExecutor有以下几个关键的行为,你的问题可以通过这些行为来解释。
当提交任务时,
- 如果线程池尚未达到核心大小,则会创build新线程。
- 如果核心大小已经达到,并且没有空闲线程,则将任务排队。
- 如果核心大小已经达到,没有空闲线程,并且队列变满,它将创build新线程(直到达到最大大小)。
- 如果已达到最大大小,则不存在空闲线程,并且队列变满,拒绝策略将启动。
在第一个例子中,请注意SynchronousQueue的大小基本上为0.因此,达到最大大小(3)的那一刻,拒绝策略就会启动(#4)。
在第二个示例中,select的队列是具有无限大小的LinkedBlockingQueue。 因此,你会陷入行为#2。
对于cachingtypes或固定types,你不能真正的修饰,因为它们的行为几乎完全确定。
如果你想拥有一个有界和dynamic的线程池,你需要使用一个正的核心大小和最大的大小,再加上一个有限大小的队列。 例如,
new ThreadPoolExecutor(10, // core size 50, // max size 10*60, // idle timeout TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)); // queue with a size
除非我错过了一些东西,否则原始问题的解决scheme很简单。 以下代码实现了原始海报所描述的所需行为。 它将产生多达5个线程来处理无限的队列,空闲的线程将在60秒之后终止。
tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); tp.allowCoreThreadTimeOut(true);
在第一个示例中,由于AbortPolicy
是默认的RejectedExecutionHandler
,因此后续任务将被RejectedExecutionHandler
。 ThreadPoolExecutor包含以下策略,您可以通过setRejectedExecutionHandler
方法更改这些setRejectedExecutionHandler
:
CallerRunsPolicy AbortPolicy DiscardPolicy DiscardOldestPolicy
这听起来像你想用CallerRunsPolicycaching线程池。
有同样的问题。 由于没有其他的答案把所有的问题放在一起,我加我的:
现在清楚地写在文档中 :如果使用不阻塞的队列( LinkedBlockingQueue
),则最大线程设置无效,只使用核心线程。
所以:
public class MyExecutor extends ThreadPoolExecutor { public MyExecutor() { super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); allowCoreThreadTimeOut(true); } public void setThreads(int n){ setMaximumPoolSize(Math.max(1, n)); setCorePoolSize(Math.max(1, n)); } }
这个执行者有:
-
没有最大线程的概念,因为我们正在使用无限的队列。 这是一件好事,因为这样的队列如果遵循其通常的策略,可能会导致执行者创build大量的非核心的额外线程。
-
最大大小为
Integer.MAX_VALUE
队列。 如果挂起任务的数量超过Integer.MAX_VALUE
则Submit()
将抛出RejectedExecutionException
。 不知道我们会先用尽内存,否则会发生。 -
有4个核心线程可能。 空闲的核心线程自动退出,如果空闲5秒。所以,是的,严格按需线程。数字可以变化使用
setThreads()
方法。 -
确保核心线程的最小数量不小于1,否则
submit()
将拒绝每个任务。 由于核心线程需要> =最大线程,setThreads()
方法setThreads()
设置最大线程,尽pipe最大线程设置对于无界队列是无用的。
这里的答案都没有解决我的问题,这与使用Apache的HTTP客户端(3.x版本)创build有限的HTTP连接有关。 由于花了我几个小时才弄清楚一个好的设置,我会分享一下:
private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
这将创build一个ThreadPoolExecutor
,它以五个开头,最多保存十个同时运行的线程,使用CallerRunsPolicy
执行。
根据ThreadPoolExecutor的Javadoc:
如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则只有在队列已满时才会创build一个新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创build一个固定大小的线程池。
(强调我的)
抖动的答案是你想要的,尽pipe我的答案是你的另一个问题。 🙂
还有一个select。 您也可以使用任何其他队列,而不是使用新的SynchronousQueue,但是您必须确保其大小为1,以便强制executorservice创build新线程。
看起来好像任何答案实际上回答这个问题 – 事实上,我不能看到这样做 – 即使你从PooledExecutorService子类化,因为许多方法/属性是私人的,例如使addIfUnderMaximumPoolSize被保护,你可以请执行以下操作:
class MyThreadPoolService extends ThreadPoolService { public void execute(Runnable run) { if (poolSize() == 0) { if (addIfUnderMaximumPoolSize(run) != null) return; } super.execute(run); } }
我得到的最接近的是 – 但即使这不是一个很好的解决scheme
new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) { public void execute(Runnable command) { if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) { super.setCorePoolSize(super.getCorePoolSize() + 1); } super.execute(command); } protected void afterExecute(Runnable r, Throwable t) { // nothing in the queue if (getQueue().isEmpty() && getPoolSize() > min) { setCorePoolSize(getCorePoolSize() - 1); } }; };
ps没有testing过以上
这是你想要的(至less我猜是这样)。 为了解释一下Jonathan Feinberg的回答
Executors.newFixedThreadPool(int n)
创build一个线程池,重用使用共享的无界队列的固定数量的线程。 在任何时候,至多nThreads线程将被主动处理任务。 如果在所有线程处于活动状态时提交其他任务,则会在队列中等待,直到线程可用。 如果任何线程在closures之前的执行期间由于失败而终止,那么如果需要执行后续任务,则将取代它。 池中的线程将一直存在,直到明确closures。
这是另一个解决scheme。 我认为这个解决scheme的行为就像你想要的那样(虽然对这个解决scheme并不感到骄傲):
final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { public boolean offer(Runnable o) { if (size() > 1) return false; return super.offer(o); }; public boolean add(Runnable o) { if (super.offer(o)) return true; else throw new IllegalStateException("Queue full"); } }; RejectedExecutionHandler handler = new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { queue.add(r); } }; dbThreadExecutor = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
-
您可以按照@sjlee的build议使用
ThreadPoolExecutor
您可以dynamic控制池的大小。 看看这个问题的更多细节:
dynamic线程池
要么
-
你可以使用java 8引入的newWorkStealingPool API。
public static ExecutorService newWorkStealingPool()
使用所有可用的处理器创build工作线程池作为其目标并行级别。
默认情况下,并行级别设置为服务器中CPU内核的数量。 如果您有4个核心CPU服务器,则线程池大小将为4.此API返回ForkJoinPool
types的ExecutorService
并允许通过从ForkJoinPool中的繁忙线程中窃取任务来窃取空闲线程。