在超时后中断任务的ExecutorService

我正在寻找一个可以提供超时的ExecutorService实现。 如果提交给ExecutorService的任务的运行时间超过了超时时间,则会中断该任务。 实现这样一个野兽并不是一件困难的事情,但是我想知道是否有人知道现有的实现。

以下是我在下面的一些讨论中提出的。 任何意见?

import java.util.List; import java.util.concurrent.*; public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor { private final long timeout; private final TimeUnit timeoutUnit; private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>(); public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } @Override public void shutdown() { timeoutExecutor.shutdown(); super.shutdown(); } @Override public List<Runnable> shutdownNow() { timeoutExecutor.shutdownNow(); return super.shutdownNow(); } @Override protected void beforeExecute(Thread t, Runnable r) { if(timeout > 0) { final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit); runningTasks.put(r, scheduled); } } @Override protected void afterExecute(Runnable r, Throwable t) { ScheduledFuture timeoutTask = runningTasks.remove(r); if(timeoutTask != null) { timeoutTask.cancel(false); } } class TimeoutTask implements Runnable { private final Thread thread; public TimeoutTask(Thread thread) { this.thread = thread; } @Override public void run() { thread.interrupt(); } } } 

你可以使用ScheduledExecutorService来做到这一点。 首先,您只需提交一次立即开始并保留创build的未来。 之后,您可以提交一段新的任务,在一段时间后取消保留的未来。

  ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); final Future handler = executor.submit(new Callable(){ ... }); executor.schedule(new Runnable(){ public void run(){ handler.cancel(); } }, 10000, TimeUnit.MILLISECONDS); 

这将执行你的处理程序(主要function被中断)10秒,然后将取消(即中断)该特定的任务。

不幸的是,解决scheme是有缺陷的。 ScheduledThreadPoolExecutor存在一种错误,在这个问题中也有报道:取消提交的任务不会完全释放与任务相关的内存资源; 仅当任务到期时才会释放资源。

如果因此创build了一个TimeoutThreadPoolExecutor (相当长的到期时间(典型用法)),并且足够快地提交任务,那么即使这些任务实际上已经成功完成,您最终也将填充内存。

你可以看到以下(非常粗糙的)testing程序的问题:

 public static void main(String[] args) throws InterruptedException { ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES); //ExecutorService service = Executors.newFixedThreadPool(1); try { final AtomicInteger counter = new AtomicInteger(); for (long i = 0; i < 10000000; i++) { service.submit(new Runnable() { @Override public void run() { counter.incrementAndGet(); } }); if (i % 10000 == 0) { System.out.println(i + "/" + counter.get()); while (i > counter.get()) { Thread.sleep(10); } } } } finally { service.shutdown(); } } 

该程序耗尽可用内存,尽pipe它等待产生的Runnable s完成。

我虽然这一段时间,但不幸的是,我不能拿出一个很好的解决scheme。

编辑:我发现这个问题被报告为JDK错误6602600 ,并且似乎已经被最近修复。

将任务包装在FutureTask中,您可以为FutureTask指定超时。 看看我对这个问题的回答中的例子,

java本地进程超时

如何使用如http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html中所述的;ExecutorService.shutDownNow()方法? 这似乎是最简单的解决scheme。

看来问题不在于JDK错误6602600(2010-05-22解决),而是错误地调用了睡眠(10)。 另外需要注意的是,主线程必须直接给其他线程通过在外层的每个分支中调用SLEEP(0)来实现这个任务。 我认为,使用Thread.yield()而不是Thread.sleep(0)更好,

以前的问题代码的结果纠正部分是这样的:

 ....................... ........................ Thread.yield(); if (i % 1000== 0) { System.out.println(i + "/" + counter.get()+ "/"+service.toString()); } // // while (i > counter.get()) { // Thread.sleep(10); // } 

它可以正确工作,外部计数器的数量可达150万个testing圆圈。

经过大量时间的考察,
最后,我使用ExecutorService invokeAll方法来解决这个问题。
这将严格中断任务,而任务运行。
这里是例子

 ExecutorService executorService = Executors.newCachedThreadPool(); try { List<Callable<Object>> callables = new ArrayList<>(); // Add your long time task (callable) callables.add(new VaryLongTimeTask()); // Assign tasks for specific execution timeout (eg 2 sec) List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); for (Future<Object> future : futures) { // Getting result } } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); 

亲是你也可以在同一个ExecutorService提交ListenableFuture
只需稍微更改第一行代码。

 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); 

ListeningExecutorService是google guava项目( com.google.guava )上ExecutorService的Listeningfunction

那么这个另外的想法呢?

  • 两个有两个执行者:
    • 一个用于:
      • 提交任务,而不关心任务的超时
      • 增加未来的结果以及结束时间到内部结构
    • 一个用于执行内部作业,如果某些任务超时并且必须取消,则检查内部结构。

小样本在这里:

 public class AlternativeExecutorService { private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue = new CopyOnWriteArrayList(); private final ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job private final ListeningExecutorService threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for private ScheduledFuture scheduledFuture; private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; public AlternativeExecutorService() { scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); } public void pushTask(OwnTask task) { ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end } public void shutdownInternalScheduledExecutor() { scheduledFuture.cancel(true); scheduledExecutor.shutdownNow(); } long getCurrentMillisecondsTime() { return Calendar.getInstance().get(Calendar.MILLISECOND); } class ListenableFutureTask { private final ListenableFuture<Void> future; private final OwnTask task; private final long milliSecEndTime; private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime) { this.future = future; this.task = task; this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); } ListenableFuture<Void> getFuture() { return future; } OwnTask getTask() { return task; } long getMilliSecEndTime() { return milliSecEndTime; } } class TimeoutManagerJob implements Runnable { CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList() { return futureQueue; } @Override public void run() { long currentMileSecValue = getCurrentMillisecondsTime(); for (ListenableFutureTask futureTask : futureQueue) { consumeFuture(futureTask, currentMileSecValue); } } private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) { ListenableFuture<Void> future = futureTask.getFuture(); boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; if (isTimeout) { if (!future.isDone()) { future.cancel(true); } futureQueue.remove(futureTask); } } } class OwnTask implements Callable<Void> { private long timeoutDuration; private TimeUnit timeUnit; OwnTask(long timeoutDuration, TimeUnit timeUnit) { this.timeoutDuration = timeoutDuration; this.timeUnit = timeUnit; } @Override public Void call() throws Exception { // do logic return null; } public long getTimeoutDuration() { return timeoutDuration; } public TimeUnit getTimeUnit() { return timeUnit; } } } 

使用John W答案我创build了一个实现,当任务开始执行时正确地开始超时。 我甚至写了一个unit testing:)

但是,它不适合我的需要,因为当Future.cancel()时(即调用Thread.interrupted()时,某些IO操作不会中断)。

无论如何,如果有人有兴趣,我创build了一个要点: https : //gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf