处理来自Java ExecutorService任务的exception

我正在尝试使用Java的ThreadPoolExecutor类来运行具有固定数量的线程的大量重量级任务。 每个任务都有很多地方可能由于例外而失败。

我已经子类ThreadPoolExecutor ,我已经重写了afterExecute方法应该提供任何运行任务时遇到的未捕获的exception。 但是,我似乎无法使其工作。

例如:

 public class ThreadPoolErrors extends ThreadPoolExecutor { public ThreadPoolErrors() { super( 1, // core threads 1, // max threads 1, // timeout TimeUnit.MINUTES, // timeout units new LinkedBlockingQueue<Runnable>() // work queue ); } protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if(t != null) { System.out.println("Got an error: " + t); } else { System.out.println("Everything's fine--situation normal!"); } } public static void main( String [] args) { ThreadPoolErrors threadPool = new ThreadPoolErrors(); threadPool.submit( new Runnable() { public void run() { throw new RuntimeException("Ouch! Got an error."); } } ); threadPool.shutdown(); } } 

这个程序的输出是“一切正常 – 情况正常!” 即使提交给线程池的唯一Runnable引发exception。 任何线索到这里发生了什么?

谢谢!

从文档 :

注意:当任务(如FutureTask)中明确地或通过诸如submit之类的方法包含任务时,这些任务对象捕获和维护计算exception,所以它们不会导致突然终止,并且内部exception不会传递给此方法。

当你提交一个Runnable时,它会被包装在一个Future中。

你的afterExecute应该是这样的:

  protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Future<?> future = (Future<?>) r; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) { System.out.println(t); } } 

警告 :应该指出的是,这个解决scheme将阻止调用线程。


如果你想处理任务抛出的exception,那么通常使用Callable而不是Runnable

Callable.call()被允许抛出检查的exception,并将它们传播callback用线程:

 Callable task = ... Future future = executor.submit(task); try { future.get(); } catch (ExecutionException ex) { ex.getCause().printStackTrace(); } 

如果Callable.call()抛出一个exception,它将被包装在ExecutionException ,并被Future.get()抛出。

这很可能比ThreadPoolExecutor子类更可取。 如果exception是可恢复的,那么它也给了你重新提交任务的机会。

这种行为的解释是正确的在afterExecute javadoc :

注意:当任务(如FutureTask)中明确地或通过诸如submit之类的方法包含任务时,这些任务对象捕获和维护计算exception,所以它们不会导致突然终止,并且内部exception不会传递给此方法。

我使用VerboseRunnable -log中的 VerboseRunnable类,它吞下所有exception并logging它们。 非常方便,例如:

 import com.jcabi.log.VerboseRunnable; scheduler.scheduleWithFixedDelay( new VerboseRunnable( Runnable() { public void run() { // the code, which may throw } }, true // it means that all exceptions will be swallowed and logged ), 1, 1, TimeUnit.MILLISECONDS ); 

我通过将提供的runnable提交给执行程序来解决这个问题。

 CompletableFuture.runAsync( () -> { try { runnable.run(); } catch (Throwable e) { Log.info(Concurrency.class, "runAsync", e); } }, executorService ); 

另一个解决scheme是使用ManagedTaskManagedTaskListener

您需要一个CallableRunnable来实现接口ManagedTask

方法getManagedTaskListener返回你想要的实例。

 public ManagedTaskListener getManagedTaskListener() { 

而你在ManagedTaskListener中实现了taskDone方法:

 @Override public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) { if (exception != null) { LOGGER.log(Level.SEVERE, exception.getMessage()); } } 

有关托pipe任务生命周期和侦听器的更多详细信息。

如果您的ExecutorService来自外部源(例如,无法对ThreadPoolExecutor进行子类化并覆盖afterExecute() ),则可以使用dynamic代理来实现所需的行为:

 public static ExecutorService errorAware(final ExecutorService executor) { return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {ExecutorService.class}, (proxy, method, args) -> { if (method.getName().equals("submit")) { final Object arg0 = args[0]; if (arg0 instanceof Runnable) { args[0] = new Runnable() { @Override public void run() { final Runnable task = (Runnable) arg0; try { task.run(); if (task instanceof Future<?>) { final Future<?> future = (Future<?>) task; if (future.isDone()) { try { future.get(); } catch (final CancellationException ce) { // Your error-handling code here ce.printStackTrace(); } catch (final ExecutionException ee) { // Your error-handling code here ee.getCause().printStackTrace(); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); } } } } catch (final RuntimeException re) { // Your error-handling code here re.printStackTrace(); throw re; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } else if (arg0 instanceof Callable<?>) { args[0] = new Callable<Object>() { @Override public Object call() throws Exception { final Callable<?> task = (Callable<?>) arg0; try { return task.call(); } catch (final Exception e) { // Your error-handling code here e.printStackTrace(); throw e; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } } return method.invoke(executor, args); }); } 

这是因为AbstractExecutorService :: submit正在将你的runnable封装到RunnableFuture (除了FutureTask )像下面

 AbstractExecutorService.java public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE//////// execute(ftask); return ftask; } 

然后execute将它传递给WorkerWorker.run()将调用下面。

 ThreadPoolExecutor.java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); /////////HERE//////// } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } 

最后task.run(); 在上面的代码调用将调用FutureTask.run() 。 这里是exception处理程序代码,因为你没有得到预期的exception。

 class FutureTask<V> implements RunnableFuture<V> public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { /////////HERE//////// result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 

如果要监视任务的执行,则可以旋转1或2个线程(可能更多取决于负载),并使用它们从ExecutionCompletionService包装器中执行任务。

这工作

  • 它是从SingleThreadExecutor派生的,但你可以很容易地适应它
  • Java 8 lamdas代码,但容易修复

它会创build一个单线程执行器,可以得到很多任务; 并等待当前的一个结束执行从下一个开始

在uncaugth错误或exception的情况下, uncaughtExceptionHandler将捕获它

公共最终课SingleThreadExecutorWithExceptions {

    公共静态ExecutorService newSingleThreadExecutorWithExceptions(最终Thread.UncaughtExceptionHandler uncaughtExceptionHandler){

         ThreadFactory factory =(Runnable runnable) - > {
             final Thread newThread = new Thread(runnable,“SingleThreadExecutorWithExceptions”);
             newThread.setUncaughtExceptionHandler((最后一个线程caugthThread,最后一个Throwable throwable) - > {
                 uncaughtExceptionHandler.uncaughtException(caugthThread,throwable);
             });
            返回newThread;
         };
        返回新的FinalizableDelegatedExecutorService
                 (新的ThreadPoolExecutor(1,1,
                         0L,TimeUnit.MILLISECONDS,
                        新的LinkedBlockingQueue(),
                        厂){


                     afterExecute(Runnable runnable,Throwable throwable){
                         super.afterExecute(runnable,throwable);
                         if(throwable == null && runnable instanceof Future){
                            尝试{
                                未来的未来=(未来)可运行;
                                 if(future.isDone()){
                                    的Future.get();
                                 }
                             catch(CancellationException ce){
                                 throwable = ce;
                             catch(ExecutionException ee){
                                 throwable = ee.getCause();
                             catch(InterruptedException即){
                                 Thread.currentThread()中断();  //忽略/重置
                             }
                         }
                         if(throwable!= null){
                             uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),抛出);
                         }
                     }
                 });
     }



    私有静态类FinalizableDelegatedExecutorService
            inheritanceDelegatedExecutorService {
         FinalizableDelegatedExecutorService(ExecutorService执行器){
            超级(执行);
         }
        保护无效finalize(){
             super.shutdown();
         }
     }

     / **
      *仅包含ExecutorService方法的包装类
      ExecutorService实现*。
      * /
    私人静态类DelegatedExecutorService扩展AbstractExecutorService {
         private final ExecutorService e;
         DelegatedExecutorService(ExecutorService executor){e = executor;  }
         public void execute(Runnable command){e.execute(command);  }
         public void shutdown(){e.shutdown();  }
         public List shutdownNow(){return e.shutdownNow();  }
         public boolean isShutdown(){return e.isShutdown();  }
         public boolean isTerminated(){return e.isTerminated();  }
         public boolean awaitTermination(long timeout,TimeUnit unit)
                抛出InterruptedException {
            返回e.awaitTermination(超时,单位);
         }
        公共未来提交(Runnable任务){
            返回e.submit(任务);
         }
        公开Future submit(可调用任务){
            返回e.submit(任务);
         }
        公共未来提交(Runnable任务,T结果){
            返回e.submit(任务,结果);
         }
         public List> invokeAll(Collection> tasks)
                抛出InterruptedException {
            返回e.invokeAll(任务);
         }
         public List> invokeAll(Collection> tasks,
                                             超时,TimeUnit单位)
                抛出InterruptedException {
            返回e.invokeAll(任务,超时,单位);
         }
         public T invokeAny(Collection> tasks)
                抛出InterruptedException,ExecutionException {
            返回e.invokeAny(任务);
         }
        公共T invokeAny(集合>任务,
                               超时,TimeUnit单位)
                抛出InterruptedException,ExecutionException,TimeoutException {
            返回e.invokeAny(任务,超时,单位);
         }
     }



    私人SingleThreadExecutorWithExceptions(){}
 }

我不提供ThreadPoolExecutor的子类,而是提供一个ThreadFactory实例来创build新的线程,并为它们提供一个UncaughtExceptionHandler