将asynchronous计算包装成同步(阻塞)计算

类似的问题:

  • 包装一个asynchronousJavaScript函数使其同步的模式
  • 在C#中同步包装asynchronous方法

我有一个方法,我想暴露给图书馆客户端(尤其是脚本客户端)的对象如下所示:

interface MyNiceInterface { public Baz doSomethingAndBlock(Foo fooArg, Bar barArg); public Future<Baz> doSomething(Foo fooArg, Bar barArg); // doSomethingAndBlock is the straightforward way; // doSomething has more control but deals with // a Future and that might be too much hassle for // scripting clients } 

但是我可用的原始“东西”是一组事件驱动的类:

 interface BazComputationSink { public void onBazResult(Baz result); } class ImplementingThing { public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink); } 

其中ImplementingThing接受input,执行一些神秘的东西,比如将任务排队在任务队列上,然后当结果发生时, sink.onBazResult()被调用到一个线程上,该线程可能与ImplementingThing.doSomethingAsync()被称为。

有没有一种方法可以使用事件驱动函数以及并发基元来实现MyNiceInterface,以便脚本客户端可以愉快地等待一个阻塞线程?

编辑:我可以使用FutureTask吗?

使用你自己的未来实现:

 public class BazComputationFuture implements Future<Baz>, BazComputationSink { private volatile Baz result = null; private volatile boolean cancelled = false; private final CountDownLatch countDownLatch; public BazComputationFuture() { countDownLatch = new CountDownLatch(1); } @Override public boolean cancel(final boolean mayInterruptIfRunning) { if (isDone()) { return false; } else { countDownLatch.countDown(); cancelled = true; return !isDone(); } } @Override public Baz get() throws InterruptedException, ExecutionException { countDownLatch.await(); return result; } @Override public Baz get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { countDownLatch.await(timeout, unit); return result; } @Override public boolean isCancelled() { return cancelled; } @Override public boolean isDone() { return countDownLatch.getCount() == 0; } public void onBazResult(final Baz result) { this.result = result; countDownLatch.countDown(); } } public Future<Baz> doSomething(Foo fooArg, Bar barArg) { BazComputationFuture future = new BazComputationFuture(); doSomethingAsync(fooArg, barArg, future); return future; } public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { return doSomething(fooArg, barArg).get(); } 

该解决scheme在内部创build一个CountDownLatch,一旦收到callback就清除。 如果用户调用get,则CountDownLatch用于阻止调用线程,直到计算完成并调用onBazResultcallback函数。 CountDownLatch将确保如果在调用get()之前发生callback,则get()方法将立即返回结果。

那么,有一个简单的解决scheme,像这样做:

 public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { final AtomicReference<Baz> notifier = new AtomicReference(); doSomethingAsync(fooArg, barArg, new BazComputationSink() { public void onBazResult(Baz result) { synchronized (notifier) { notifier.set(result); notifier.notify(); } } }); synchronized (notifier) { while (notifier.get() == null) notifier.wait(); } return notifier.get(); } 

当然,这个假设你的Baz结果永远不会为空。

谷歌番石榴库有一个易于使用SettableFuture,使这个问题很简单(约10行代码)。

 public class ImplementingThing { public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { try { return doSomething(fooArg, barArg).get(); } catch (Exception e) { throw new RuntimeException("Oh dear"); } }; public Future<Baz> doSomething(Foo fooArg, Bar barArg) { final SettableFuture<Baz> future = new SettableFuture<Baz>(); doSomethingAsync(fooArg, barArg, new BazComputationSink() { @Override public void onBazResult(Baz result) { future.set(result); } }); return future; }; // Everything below here is just mock stuff to make the example work, // so you can copy it into your IDE and see it run. public static class Baz {} public static class Foo {} public static class Bar {} public static interface BazComputationSink { public void onBazResult(Baz result); } public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) { new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } Baz baz = new Baz(); sink.onBazResult(baz); } }).start(); }; public static void main(String[] args) { System.err.println("Starting Main"); System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null)); System.err.println("Ending Main"); } 

一个非常简单的例子,只是为了了解CountDownLatch,没有任何额外的代码。

java.util.concurrent.CountDownLatch是一个并发结构,允许一个或多个线程等待一组给定的操作完成。

CountDownLatch用给定的计数初始化。 这个计数通过调用countDown()方法递减。 等待这个计数达到零的线程可以调用其中一个await()方法。 调用await()阻塞线程,直到计数达到零。

下面是一个简单的例子。 在Decrementer在CountDownLatch上调用countDown() 3次之后,等待的Waiter从await()调用中释放。

你也可以提一些TimeOut等待。

 CountDownLatch latch = new CountDownLatch(3); Waiter waiter = new Waiter(latch); Decrementer decrementer = new Decrementer(latch); new Thread(waiter) .start(); new Thread(decrementer).start(); Thread.sleep(4000); public class Waiter implements Runnable{ CountDownLatch latch = null; public Waiter(CountDownLatch latch) { this.latch = latch; } public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Waiter Released"); } } 

// ————–

 public class Decrementer implements Runnable { CountDownLatch latch = null; public Decrementer(CountDownLatch latch) { this.latch = latch; } public void run() { try { Thread.sleep(1000); this.latch.countDown(); Thread.sleep(1000); this.latch.countDown(); Thread.sleep(1000); this.latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } 

参考

如果您不想使用CountDownLatch或者您的需求与Facebook相同,但function不同。 意思是如果一个方法被调用,则不要调用其他方法。

在这种情况下,你可以声明一个

 private volatile Boolean isInprocessOfLikeOrUnLike = false; 

然后你可以检查你的方法调用的开始,如果它是false那么调用方法,否则返回..取决于你的实现。

RxJava 2.x很简单:

 try { Baz baz = Single.create((SingleEmitter<Baz> emitter) -> doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) .toFuture().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } 

或者没有Lambda符号:

 Baz baz = Single.create(new SingleOnSubscribe<Baz>() { @Override public void subscribe(SingleEmitter<Baz> emitter) { doSomethingAsync(fooArg, barArg, new BazComputationSink() { @Override public void onBazResult(Baz result) { emitter.onSuccess(result); } }); } }).toFuture().get(); 

这是一个基于Paul Wagland的答案的更通用的解决scheme:

 public abstract class AsyncRunnable<T> { protected abstract void run(AtomicReference<T> notifier); protected final void finish(AtomicReference<T> notifier, T result) { synchronized (notifier) { notifier.set(result); notifier.notify(); } } public static <T> T wait(AsyncRunnable<T> runnable) { final AtomicReference<T> notifier = new AtomicReference<>(); // run the asynchronous code runnable.run(notifier); // wait for the asynchronous code to finish synchronized (notifier) { while (notifier.get() == null) { try { notifier.wait(); } catch (InterruptedException ignore) {} } } // return the result of the asynchronous code return notifier.get(); } } 

下面是一个如何使用它的例子::

  String result = AsyncRunnable.wait(new AsyncRunnable<String>() { @Override public void run(final AtomicReference<String> notifier) { // here goes your async code, eg: new Thread(new Runnable() { @Override public void run() { finish(notifier, "This was a asynchronous call!"); } }).start(); } }); 

代码的更详细的版本可以在这里find: http : //pastebin.com/hKHJUBqE

编辑:与问题有关的例子是:

 public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) { return AsyncRunnable.wait(new AsyncRunnable<Baz>() { @Override protected void run(final AtomicReference<Baz> notifier) { doSomethingAsync(fooArg, barArg, new BazComputationSink() { public void onBazResult(Baz result) { synchronized (notifier) { notifier.set(result); notifier.notify(); } } }); } }); }