Java执行者:如何在不阻塞的情况下通知任务完成?

假设我有一个队列中充满了需要提交给执行者服务的任务。 我希望他们一次处理一个。 我能想到的最简单的方法是:

  1. 从队列中取一个任务
  2. 提交给执行者
  3. 在返回的Future上调用.get,并阻塞,直到结果可用
  4. 从队列中取另一个任务…

但是,我试图避免完全阻止。 如果我有一万个这样的队列,需要一次处理一个任务,那么我将耗尽堆栈空间,因为他们中的大多数将持续被阻塞的线程。

我想提交一个任务并提供一个任务完成时调用的callback。 我将使用该callback通知作为发送下一个任务的标志。 (functionaljava和jetlang显然使用这样的非阻塞algorithm,但我不明白他们的代码)

我怎样才能使用JDK的java.util.concurrent来完成,而不是写自己的执行器服务?

(给我这些任务的队列本身可能会阻塞,但是这是后面要解决的问题)

定义一个callback接口来接收你想在完成通知中传递的参数。 然后在任务结束时调用它。

你甚至可以为Runnable任务编写一个通用包装,并将这些包提交给ExecutorService 。 或者,请参阅下面的Java 8中内置的机制。

 class CallbackTask implements Runnable { private final Runnable task; private final Callback callback; CallbackTask(Runnable task, Callback callback) { this.task = task; this.callback = callback; } public void run() { task.run(); callback.complete(); } } 

通过CompletableFuture ,Java 8包含了一个更加复杂的方法来组合可以asynchronous和有条件地完成进程的pipe道。 这是一个人为的但完整的通知的例子。

 import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; public class GetTaskNotificationWithoutBlocking { public static void main(String... argv) throws Exception { ExampleService svc = new ExampleService(); GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking(); CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work); f.thenAccept(listener::notify); System.out.println("Exiting main()"); } void notify(String msg) { System.out.println("Received message: " + msg); } } class ExampleService { String work() { sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */ char[] str = new char[5]; ThreadLocalRandom current = ThreadLocalRandom.current(); for (int idx = 0; idx < str.length; ++idx) str[idx] = (char) ('A' + current.nextInt(26)); String msg = new String(str); System.out.println("Generated message: " + msg); return msg; } public static void sleep(long average, TimeUnit unit) { String name = Thread.currentThread().getName(); long timeout = Math.min(exponential(average), Math.multiplyExact(10, average)); System.out.printf("%s sleeping %d %s...%n", name, timeout, unit); try { unit.sleep(timeout); System.out.println(name + " awoke."); } catch (InterruptedException abort) { Thread.currentThread().interrupt(); System.out.println(name + " interrupted."); } } public static long exponential(long avg) { return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble())); } } 

使用番石榴的可听的未来的API,并添加一个callback。 参看 来自网站:

 ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() { public Explosion call() { return pushBigRedButton(); } }); Futures.addCallback(explosion, new FutureCallback<Explosion>() { // we want this handler to run immediately after we push the big red button! public void onSuccess(Explosion explosion) { walkAwayFrom(explosion); } public void onFailure(Throwable thrown) { battleArchNemesis(); // escaped the explosion! } }); 

在Java 8中,您可以使用CompletableFuture 。 这里有一个例子,我在我的代码中使用它从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个GUI应用程序):

  CompletableFuture.supplyAsync( userService::listUsers ).thenApply( this::mapUsersToUserViews ).thenAccept( this::updateView ).exceptionally( throwable -> { showErrorDialogFor(throwable); return null; } ); 

它asynchronous执行。 我使用两个私有方法: mapUsersToUserViewsupdateView

您可以扩展FutureTask类,并覆盖done()方法,然后将FutureTask对象添加到ExecutorService ,以便在FutureTask立即完成时调用done()方法。

ThreadPoolExecutor也有beforeExecuteafterExecute钩子方法,你可以重载和利用。 这里是ThreadPoolExecutor的Javadocs的描述。

钩子方法

这个类提供了在执行每个任务之前和之后被调用的受保护的beforeExecute(java.lang.Thread, java.lang.Runnable)afterExecute(java.lang.Runnable, java.lang.Throwable)方法。 这些可以用来操纵执行环境; 例如,重新初始化ThreadLocals ,收集统计信息或添加日志条目。 此外,可以重写方法terminated()来执行Executor完全终止后需要执行的任何特殊处理。 如果挂钩或callback方法抛出exception,内部工作者线程可能会失败并突然终止。

使用CountDownLatch

它来自java.util.concurrent ,它正是在继续之前等待几个线程完成执行的方式。

为了达到您所要的callback效果,这确实需要额外的额外工作。 也就是说,在一个使用CountDownLatch的单独线程中自己处理,并等待它,然后继续通知您需要通知的任何内容。 没有原生的callback支持,或类似的效果。


编辑:现在我进一步了解你的问题,我认为你是太过分了,不必要的。 如果你拿一个普通的SingleThreadExecutor ,给它所有的任务,并且它会原生地进行排队。

如果你想确保没有任何任务将同时运行,那么使用SingleThreadedExecutor 。 任务将按照提交的顺序进行处理。 你甚至不需要执行这些任务,只需将它们提交给exec即可。

只要添加到马特的答案,这是帮助,这是一个更充实的例子来显示使用callback。

 private static Primes primes = new Primes(); public static void main(String[] args) throws InterruptedException { getPrimeAsync((p) -> System.out.println("onPrimeListener; p=" + p)); System.out.println("Adios mi amigito"); } public interface OnPrimeListener { void onPrime(int prime); } public static void getPrimeAsync(OnPrimeListener listener) { CompletableFuture.supplyAsync(primes::getNextPrime) .thenApply((prime) -> { System.out.println("getPrimeAsync(); prime=" + prime); if (listener != null) { listener.onPrime(prime); } return prime; }); } 

输出是:

  getPrimeAsync(); prime=241 onPrimeListener; p=241 Adios mi amigito 

你可以使用一个Callable的实现

 public class MyAsyncCallable<V> implements Callable<V> { CallbackInterface ci; public MyAsyncCallable(CallbackInterface ci) { this.ci = ci; } public V call() throws Exception { System.out.println("Call of MyCallable invoked"); System.out.println("Result = " + this.ci.doSomething(10, 20)); return (V) "Good job"; } } 

CallbackInterface是非常基本的东西

 public interface CallbackInterface { public int doSomething(int a, int b); } 

现在主class级将会是这样的

 ExecutorService ex = Executors.newFixedThreadPool(2); MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b); ex.submit(mac); 

这是Pache使用Guava的ListenableFuture解答的ListenableFuture

尤其是, Futures.transform()返回ListenableFuture所以可以用来链接asynchronous调用。 Futures.addCallback()返回void ,所以不能用于链接,但对于处理asynchronous完成的成功/失败很有用。

 // ListenableFuture1: Open Database ListenableFuture<Database> database = service.submit(new Callable<Database>() { public Database call() { // Let's assume this call is async, ie returns a ListenableFuture<Database> return openDatabase(); } }); // ListenableFuture2: Query Database for Cursor rows cursor = Futures.transform(database, new AsyncFunction<Database, Cursor>() { @Override public ListenableFuture<Cursor> apply(Database database) { // Let's assume this call is async, ie returns a ListenableFuture<Cursor> return database.query(table, columns, selection, args); } }); // ListenableFuture3: Convert Cursor rows to List<FooObject> fooList = Futures.transform(cursor, new Function<Cursor, List<FooObject>>() { @Override public List<FooObject> apply(Cursor cursor) { // Let's assume this call is synchronous, ie directly returns List<FooObject> return cursorToFooList(cursor); } }); // Final Callback: Handle the success/errors when final future completes Futures.addCallback(fooList, new FutureCallback<List<FooObject>>() { public void onSuccess(List<FooObject> fooObjects) { doSomethingWith(fooObjects); } public void onFailure(Throwable thrown) { log.error(thrown); } }); 

注意:除了链接asynchronous任务之外, Futures.transform()还允许您在单独的执行程序上安排每个任务(本例中未显示)。

简单的代码来实现使用ExecutorService Callback机制

 import java.util.concurrent.*; import java.util.*; public class CallBackDemo{ public CallBackDemo(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(5); try{ for ( int i=0; i<5; i++){ Callback callback = new Callback(i+1); MyCallable myCallable = new MyCallable((long)i+1,callback); Future<Long> future = service.submit(myCallable); //System.out.println("future status:"+future.get()+":"+future.isDone()); } }catch(Exception err){ err.printStackTrace(); } service.shutdown(); } public static void main(String args[]){ CallBackDemo demo = new CallBackDemo(); } } class MyCallable implements Callable<Long>{ Long id = 0L; Callback callback; public MyCallable(Long val,Callback obj){ this.id = val; this.callback = obj; } public Long call(){ //Add your business logic System.out.println("Callable:"+id+":"+Thread.currentThread().getName()); callback.callbackMethod(); return id; } } class Callback { private int i; public Callback(int i){ this.i = i; } public void callbackMethod(){ System.out.println("Call back:"+i); // Add your business logic } } 

输出:

 creating service Callable:1:pool-1-thread-1 Call back:1 Callable:3:pool-1-thread-3 Callable:2:pool-1-thread-2 Call back:2 Callable:5:pool-1-thread-5 Call back:5 Call back:3 Callable:4:pool-1-thread-4 Call back:4 

重要提示:

  1. 如果newFixedThreadPool(5) FIFO顺序依次执行处理任务,请将newFixedThreadPool(5)replace为newFixedThreadPool(1)
  2. 如果在分析前一个任务callback的结果之后,想要处理下一个任务,只需在下面的行注释

     //System.out.println("future status:"+future.get()+":"+future.isDone()); 
  3. 你可以用一个replacenewFixedThreadPool()

     Executors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor 

    取决于你的用例。

  4. 如果你想asynchronous处理callback方法

    一个。 将共享的ExecutorService or ThreadPoolExecutor传递给Callable任务

    湾 将您的Callable方法转换为Callable/Runnable任务

    C。 将callback任务推送到ExecutorService or ThreadPoolExecutor

Interesting Posts