等到Future <T>中的任何一个完成

我有很less的asynchronous任务正在运行,我需要等到至less有一个任务完成(将来可能需要等待N个任务完成)。 目前他们被提呈为未来,所以我需要类似的东西

/** * Blocks current thread until one of specified futures is done and returns it. */ public static <T> Future<T> waitForAny(Collection<Future<T>> futures) throws AllFuturesFailedException 

有没有这样的事情? 或者任何类似的,对于未来不是必需的。 目前我通过收集期货循环,检查是否完成,然后睡一会儿再检查。 这看起来不是最好的解决scheme,因为如果我长时间睡眠,那么不必要的延迟就会增加,如果我睡了很短的时间就会影响性能。

我可以尝试使用

 new CountDownLatch(1) 

并在任务完成时减less倒计时

 countdown.await() 

,但是我发现只有我控制Future的创造才有可能。 这是可能的,但是需要系统重新devise,因为目前创build任务的逻辑(发送Callable到ExecutorService)与决定等待哪个Future是分开的。 我也可以覆盖

 <T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable) 

并创buildRunnableFuture的自定义实现,具有附加侦听器以在任务完成时得到通知的能力,然后将这个侦听器附加到需要的任务并使用CountDownLatch,但这意味着我必须重写newTaskFor对于我使用的每个ExecutorService – 并且可能会有实现不扩展AbstractExecutorService。 我也可以尝试包装给定ExecutorService出于同样的目的,但是我必须装饰所有产生期货的方法。

所有这些解决scheme可能工作,但似乎很不自然。 它看起来像我缺less一些简单的东西,比如

 WaitHandle.WaitAny(WaitHandle[] waitHandles) 

在C#中。 有这样的问题有什么众所周知的解决scheme吗?

更新:

本来我根本没有进入未来的创造,所以没有优雅的解决scheme。 重新devise系统后,我有权访问未来创build,并能够添加countDownLatch.countdown()执行过程,然后我可以countDownLatch.await(),一切工作正常。 感谢其他答案,我不知道ExecutorCompletionService,它确实可以在类似的任务有帮助,但在这种特殊情况下,它不能使用,因为一些期货创build没有任何执行者 – 实际任务通过networking发送到另一台服务器,远程完成并收到完成通知。

据我所知,Java没有与WaitHandle.WaitAny方法类似的结构。

在我看来,这可以通过一个“WaitableFuture”装饰器来实现:

 public WaitableFuture<T> extends Future<T> { private CountDownLatch countDownLatch; WaitableFuture(CountDownLatch countDownLatch) { super(); this.countDownLatch = countDownLatch; } void doTask() { super.doTask(); this.countDownLatch.countDown(); } } 

虽然这只能在执行代码之前插入,否则执行代码将不会有新的doTask()方法。 但是如果在执行之前无法以某种方式获得对Future对象的控制权,我真的看不到没有轮询的方法。

或者如果将来总是在自己的线程中运行,并且可以通过某种方式获得该线程。 然后你可以产生一个新的线程来join其他线程,然后在联接返回后处理等待机制。这将是非常难看的,并且会引起很多开销。 如果某些Future对象没有完成,则根据死线程,可能会有很多被阻塞的线程。 如果你不小心,这可能会泄漏内存和系统资源。

 /** * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join(). */ public static joinAny(Collection<Thread> threads, int numberToWaitFor) { CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor); foreach(Thread thread in threads) { (new Thread(new JoinThreadHelper(thread, countDownLatch))).start(); } countDownLatch.await(); } class JoinThreadHelper implements Runnable { Thread thread; CountDownLatch countDownLatch; JoinThreadHelper(Thread thread, CountDownLatch countDownLatch) { this.thread = thread; this.countDownLatch = countDownLatch; } void run() { this.thread.join(); this.countDownLatch.countDown(); } } 

简单的,检查出ExecutorCompletionService 。

ExecutorService.invokeAny

为什么不只是创build一个结果队列,并在队列中等待? 或者更简单地说,使用一个CompletionService,因为它是一个ExecutorService +结果队列。

wait()和notifyAll()实际上很简单。

首先,定义一个locking对象。 (你可以使用任何类,但我喜欢明确):

 package com.javadude.sample; public class Lock {} 

接下来,定义你的工作线程。 当他完成处理时,他必须通知该锁对象。 请注意,通知必须位于locking对象的同步块上。

 package com.javadude.sample; public class Worker extends Thread { private Lock lock_; private long timeToSleep_; private String name_; public Worker(Lock lock, String name, long timeToSleep) { lock_ = lock; timeToSleep_ = timeToSleep; name_ = name; } @Override public void run() { // do real work -- using a sleep here to simulate work try { sleep(timeToSleep_); } catch (InterruptedException e) { interrupt(); } System.out.println(name_ + " is done... notifying"); // notify whoever is waiting, in this case, the client synchronized (lock_) { lock_.notify(); } } } 

最后,你可以写你的客户端:

 package com.javadude.sample; public class Client { public static void main(String[] args) { Lock lock = new Lock(); Worker worker1 = new Worker(lock, "worker1", 15000); Worker worker2 = new Worker(lock, "worker2", 10000); Worker worker3 = new Worker(lock, "worker3", 5000); Worker worker4 = new Worker(lock, "worker4", 20000); boolean started = false; int numNotifies = 0; while (true) { synchronized (lock) { try { if (!started) { // need to do the start here so we grab the lock, just // in case one of the threads is fast -- if we had done the // starts outside the synchronized block, a fast thread could // get to its notification *before* the client is waiting for it worker1.start(); worker2.start(); worker3.start(); worker4.start(); started = true; } lock.wait(); } catch (InterruptedException e) { break; } numNotifies++; if (numNotifies == 4) { break; } System.out.println("Notified!"); } } System.out.println("Everyone has notified me... I'm done"); } } 

既然你不关心哪一个完成,为什么不只是一个WaitHandle的所有线程和等待呢? 无论谁先完成可以设置句柄。

看到这个选项:

 public class WaitForAnyRedux { private static final int POOL_SIZE = 10; public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException { List<Callable<T>> callables = new ArrayList<Callable<T>>(); for (final T t : collection) { Callable<T> callable = Executors.callable(new Thread() { @Override public void run() { synchronized (t) { try { t.wait(); } catch (InterruptedException e) { } } } }, t); callables.add(callable); } BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE); ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue); return executorService.invokeAny(callables); } static public void main(String[] args) throws InterruptedException, ExecutionException { final List<Integer> integers = new ArrayList<Integer>(); for (int i = 0; i < POOL_SIZE; i++) { integers.add(i); } (new Thread() { public void run() { Integer notified = null; try { notified = waitForAny(integers); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("notified=" + notified); } }).start(); synchronized (integers) { integers.wait(3000); } Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE)); System.out.println("Waking up " + randomInt); synchronized (randomInt) { randomInt.notify(); } } }