不可能做一个大小限制的caching线程池?

看起来不可能创build一个caching的线程池,它可以创build一个线程数量的限制。

下面是在标准Java库中实现的静态Executors.newCachedThreadPool:

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 

所以,使用该模板继续创build一个固定大小的caching线程池:

 new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>()); 

现在,如果你使用这个,并提交3个任务,一切都会好的。 提交任何进一步的任务将导致被拒绝的执行exception。

尝试这个:

 new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>()); 

将导致所有线程顺序执行。 也就是说,线程池将永远不会使多个线程来处理您的任务。

这是ThreadPoolExecutor的执行方法中的一个错误? 或者也许这是故意的? 或者有其他的方法?

编辑:我想要的东西就像caching线程池(它创build线程按需,然后在一定的时间后杀死它们),但它可以创build的线程数量的限制,并能够继续排队附加任务打到它的线程限制。 根据sjlee的回答,这是不可能的。 看看ThreadPoolExecutor的execute()方法确实是不可能的。 我需要inheritanceThreadPoolExecutor并像SwingWorker那样重写exe​​cute(),但是SwingWorker在其execute()方法中做了什么是一个彻底的破解。

ThreadPoolExecutor有以下几个关键的行为,你的问题可以通过这些行为来解释。

当提交任务时,

  1. 如果线程池尚未达到核心大小,则会创build新线程。
  2. 如果核心大小已经达到,并且没有空闲线程,则将任务排队。
  3. 如果核心大小已经达到,没有空闲线程,并且队列变满,它将创build新线程(直到达到最大大小)。
  4. 如果已达到最大大小,则不存在空闲线程,并且队列变满,拒绝策略将启动。

在第一个例子中,请注意SynchronousQueue的大小基本上为0.因此,达到最大大小(3)的那一刻,拒绝策略就会启动(#4)。

在第二个示例中,select的队列是具有无限大小的LinkedBlockingQueue。 因此,你会陷入行为#2。

对于cachingtypes或固定types,你不能真正的修饰,因为它们的行为几乎完全确定。

如果你想拥有一个有界和dynamic的线程池,你需要使用一个正的核心大小和最大的大小,再加上一个有限大小的队列。 例如,

 new ThreadPoolExecutor(10, // core size 50, // max size 10*60, // idle timeout TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)); // queue with a size 

除非我错过了一些东西,否则原始问题的解决scheme很简单。 以下代码实现了原始海报所描述的所需行为。 它将产生多达5个线程来处理无限的队列,空闲的线程将在60秒之后终止。

 tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); tp.allowCoreThreadTimeOut(true); 

在第一个示例中,由于AbortPolicy是默认的RejectedExecutionHandler ,因此后续任务将被RejectedExecutionHandler 。 ThreadPoolExecutor包含以下策略,您可以通过setRejectedExecutionHandler方法更改这些setRejectedExecutionHandler

 CallerRunsPolicy AbortPolicy DiscardPolicy DiscardOldestPolicy 

这听起来像你想用CallerRunsPolicycaching线程池。

有同样的问题。 由于没有其他的答案把所有的问题放在一起,我加我的:

现在清楚地写在文档中 :如果使用不阻塞的队列( LinkedBlockingQueue ),则最大线程设置无效,只使用核心线程。

所以:

 public class MyExecutor extends ThreadPoolExecutor { public MyExecutor() { super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); allowCoreThreadTimeOut(true); } public void setThreads(int n){ setMaximumPoolSize(Math.max(1, n)); setCorePoolSize(Math.max(1, n)); } } 

这个执行者有:

  1. 没有最大线程的概念,因为我们正在使用无限的队列。 这是一件好事,因为这样的队列如果遵循其通常的策略,可能会导致执行者创build大量的非核心的额外线程。

  2. 最大大小为Integer.MAX_VALUE队列。 如果挂起任务的数量超过Integer.MAX_VALUESubmit()将抛出RejectedExecutionException 。 不知道我们会先用尽内存,否则会发生。

  3. 有4个核心线程可能。 空闲的核心线程自动退出,如果空闲5秒。所以,是的,严格按需线程。数字可以变化使用setThreads()方法。

  4. 确保核心线程的最小数量不小于1,否则submit()将拒绝每个任务。 由于核心线程需要> =最大线程, setThreads()方法setThreads()设置最大线程,尽pipe最大线程设置对于无界队列是无用的。

这里的答案都没有解决我的问题,这与使用Apache的HTTP客户端(3.x版本)创build有限的HTTP连接有关。 由于花了我几个小时才弄清楚一个好的设置,我会分享一下:

 private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); 

这将创build一个ThreadPoolExecutor ,它以五个开头,最多保存十个同时运行的线程,使用CallerRunsPolicy执行。

根据ThreadPoolExecutor的Javadoc:

如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则只有在队列已满时才会创build一个新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创build一个固定大小的线程池。

(强调我的)

抖动的答案是你想要的,尽pipe我的答案是你的另一个问题。 🙂

还有一个select。 您也可以使用任何其他队列,而不是使用新的SynchronousQueue,但是您必须确保其大小为1,以便强制executorservice创build新线程。

看起来好像任何答案实际上回答这个问题 – 事实上,我不能看到这样做 – 即使你从PooledExecutorService子类化,因为许多方法/属性是私人的,例如使addIfUnderMaximumPoolSize被保护,你可以请执行以下操作:

 class MyThreadPoolService extends ThreadPoolService { public void execute(Runnable run) { if (poolSize() == 0) { if (addIfUnderMaximumPoolSize(run) != null) return; } super.execute(run); } } 

我得到的最接近的是 – 但即使这不是一个很好的解决scheme

 new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) { public void execute(Runnable command) { if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) { super.setCorePoolSize(super.getCorePoolSize() + 1); } super.execute(command); } protected void afterExecute(Runnable r, Throwable t) { // nothing in the queue if (getQueue().isEmpty() && getPoolSize() > min) { setCorePoolSize(getCorePoolSize() - 1); } }; }; 

ps没有testing过以上

这是你想要的(至less我猜是这样)。 为了解释一下Jonathan Feinberg的回答

Executors.newFixedThreadPool(int n)

创build一个线程池,重用使用共享的无界队列的固定数量的线程。 在任何时候,至多nThreads线程将被主动处理任务。 如果在所有线程处于活动状态时提交其他任务,则会在队列中等待,直到线程可用。 如果任何线程在closures之前的执行期间由于失败而终止,那么如果需要执行后续任务,则将取代它。 池中的线程将一直存在,直到明确closures。

这是另一个解决scheme。 我认为这个解决scheme的行为就像你想要的那样(虽然对这个解决scheme并不感到骄傲):

 final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { public boolean offer(Runnable o) { if (size() > 1) return false; return super.offer(o); }; public boolean add(Runnable o) { if (super.offer(o)) return true; else throw new IllegalStateException("Queue full"); } }; RejectedExecutionHandler handler = new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { queue.add(r); } }; dbThreadExecutor = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler); 
  1. 您可以按照@sjlee的build议使用ThreadPoolExecutor

    您可以dynamic控制池的大小。 看看这个问题的更多细节:

    dynamic线程池

    要么

  2. 你可以使用java 8引入的newWorkStealingPool API。

     public static ExecutorService newWorkStealingPool() 

    使用所有可用的处理器创build工作线程池作为其目标并行级别。

默认情况下,并行级别设置为服务器中CPU内核的数量。 如果您有4个核心CPU服务器,则线程池大小将为4.此API返回ForkJoinPooltypes的ExecutorService并允许通过从ForkJoinPool中的繁忙线程中窃取任务来窃取空闲线程。