处理来自Java ExecutorService任务的exception
我正在尝试使用Java的ThreadPoolExecutor
类来运行具有固定数量的线程的大量重量级任务。 每个任务都有很多地方可能由于例外而失败。
我已经子类ThreadPoolExecutor
,我已经重写了afterExecute
方法应该提供任何运行任务时遇到的未捕获的exception。 但是,我似乎无法使其工作。
例如:
public class ThreadPoolErrors extends ThreadPoolExecutor { public ThreadPoolErrors() { super( 1, // core threads 1, // max threads 1, // timeout TimeUnit.MINUTES, // timeout units new LinkedBlockingQueue<Runnable>() // work queue ); } protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if(t != null) { System.out.println("Got an error: " + t); } else { System.out.println("Everything's fine--situation normal!"); } } public static void main( String [] args) { ThreadPoolErrors threadPool = new ThreadPoolErrors(); threadPool.submit( new Runnable() { public void run() { throw new RuntimeException("Ouch! Got an error."); } } ); threadPool.shutdown(); } }
这个程序的输出是“一切正常 – 情况正常!” 即使提交给线程池的唯一Runnable引发exception。 任何线索到这里发生了什么?
谢谢!
从文档 :
注意:当任务(如FutureTask)中明确地或通过诸如submit之类的方法包含任务时,这些任务对象捕获和维护计算exception,所以它们不会导致突然终止,并且内部exception不会传递给此方法。
当你提交一个Runnable时,它会被包装在一个Future中。
你的afterExecute应该是这样的:
protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Future<?> future = (Future<?>) r; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) { System.out.println(t); } }
警告 :应该指出的是,这个解决scheme将阻止调用线程。
如果你想处理任务抛出的exception,那么通常使用Callable
而不是Runnable
。
Callable.call()
被允许抛出检查的exception,并将它们传播callback用线程:
Callable task = ... Future future = executor.submit(task); try { future.get(); } catch (ExecutionException ex) { ex.getCause().printStackTrace(); }
如果Callable.call()
抛出一个exception,它将被包装在ExecutionException
,并被Future.get()
抛出。
这很可能比ThreadPoolExecutor
子类更可取。 如果exception是可恢复的,那么它也给了你重新提交任务的机会。
这种行为的解释是正确的在afterExecute javadoc :
注意:当任务(如FutureTask)中明确地或通过诸如submit之类的方法包含任务时,这些任务对象捕获和维护计算exception,所以它们不会导致突然终止,并且内部exception不会传递给此方法。
我使用VerboseRunnable
-log中的 VerboseRunnable
类,它吞下所有exception并logging它们。 非常方便,例如:
import com.jcabi.log.VerboseRunnable; scheduler.scheduleWithFixedDelay( new VerboseRunnable( Runnable() { public void run() { // the code, which may throw } }, true // it means that all exceptions will be swallowed and logged ), 1, 1, TimeUnit.MILLISECONDS );
我通过将提供的runnable提交给执行程序来解决这个问题。
CompletableFuture.runAsync( () -> { try { runnable.run(); } catch (Throwable e) { Log.info(Concurrency.class, "runAsync", e); } }, executorService );
另一个解决scheme是使用ManagedTask和ManagedTaskListener 。
您需要一个Callable或Runnable来实现接口ManagedTask 。
方法getManagedTaskListener
返回你想要的实例。
public ManagedTaskListener getManagedTaskListener() {
而你在ManagedTaskListener中实现了taskDone
方法:
@Override public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) { if (exception != null) { LOGGER.log(Level.SEVERE, exception.getMessage()); } }
有关托pipe任务生命周期和侦听器的更多详细信息。
如果您的ExecutorService
来自外部源(例如,无法对ThreadPoolExecutor
进行子类化并覆盖afterExecute()
),则可以使用dynamic代理来实现所需的行为:
public static ExecutorService errorAware(final ExecutorService executor) { return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {ExecutorService.class}, (proxy, method, args) -> { if (method.getName().equals("submit")) { final Object arg0 = args[0]; if (arg0 instanceof Runnable) { args[0] = new Runnable() { @Override public void run() { final Runnable task = (Runnable) arg0; try { task.run(); if (task instanceof Future<?>) { final Future<?> future = (Future<?>) task; if (future.isDone()) { try { future.get(); } catch (final CancellationException ce) { // Your error-handling code here ce.printStackTrace(); } catch (final ExecutionException ee) { // Your error-handling code here ee.getCause().printStackTrace(); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); } } } } catch (final RuntimeException re) { // Your error-handling code here re.printStackTrace(); throw re; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } else if (arg0 instanceof Callable<?>) { args[0] = new Callable<Object>() { @Override public Object call() throws Exception { final Callable<?> task = (Callable<?>) arg0; try { return task.call(); } catch (final Exception e) { // Your error-handling code here e.printStackTrace(); throw e; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } } return method.invoke(executor, args); }); }
这是因为AbstractExecutorService :: submit
正在将你的runnable
封装到RunnableFuture
(除了FutureTask
)像下面
AbstractExecutorService.java public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE//////// execute(ftask); return ftask; }
然后execute
将它传递给Worker
和Worker.run()
将调用下面。
ThreadPoolExecutor.java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); /////////HERE//////// } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
最后
task.run();
在上面的代码调用将调用FutureTask.run()
。 这里是exception处理程序代码,因为你没有得到预期的exception。
class FutureTask<V> implements RunnableFuture<V> public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { /////////HERE//////// result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
如果要监视任务的执行,则可以旋转1或2个线程(可能更多取决于负载),并使用它们从ExecutionCompletionService包装器中执行任务。
这工作
- 它是从SingleThreadExecutor派生的,但你可以很容易地适应它
- Java 8 lamdas代码,但容易修复
它会创build一个单线程执行器,可以得到很多任务; 并等待当前的一个结束执行从下一个开始
在uncaugth错误或exception的情况下, uncaughtExceptionHandler将捕获它
公共最终课SingleThreadExecutorWithExceptions { 公共静态ExecutorService newSingleThreadExecutorWithExceptions(最终Thread.UncaughtExceptionHandler uncaughtExceptionHandler){ ThreadFactory factory =(Runnable runnable) - > { final Thread newThread = new Thread(runnable,“SingleThreadExecutorWithExceptions”); newThread.setUncaughtExceptionHandler((最后一个线程caugthThread,最后一个Throwable throwable) - > { uncaughtExceptionHandler.uncaughtException(caugthThread,throwable); }); 返回newThread; }; 返回新的FinalizableDelegatedExecutorService (新的ThreadPoolExecutor(1,1, 0L,TimeUnit.MILLISECONDS, 新的LinkedBlockingQueue(), 厂){ afterExecute(Runnable runnable,Throwable throwable){ super.afterExecute(runnable,throwable); if(throwable == null && runnable instanceof Future){ 尝试{ 未来的未来=(未来)可运行; if(future.isDone()){ 的Future.get(); } catch(CancellationException ce){ throwable = ce; catch(ExecutionException ee){ throwable = ee.getCause(); catch(InterruptedException即){ Thread.currentThread()中断(); //忽略/重置 } } if(throwable!= null){ uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),抛出); } } }); } 私有静态类FinalizableDelegatedExecutorService inheritanceDelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService执行器){ 超级(执行); } 保护无效finalize(){ super.shutdown(); } } / ** *仅包含ExecutorService方法的包装类 ExecutorService实现*。 * / 私人静态类DelegatedExecutorService扩展AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor){e = executor; } public void execute(Runnable command){e.execute(command); } public void shutdown(){e.shutdown(); } public List shutdownNow(){return e.shutdownNow(); } public boolean isShutdown(){return e.isShutdown(); } public boolean isTerminated(){return e.isTerminated(); } public boolean awaitTermination(long timeout,TimeUnit unit) 抛出InterruptedException { 返回e.awaitTermination(超时,单位); } 公共未来提交(Runnable任务){ 返回e.submit(任务); } 公开Future submit(可调用任务){ 返回e.submit(任务); } 公共未来提交(Runnable任务,T结果){ 返回e.submit(任务,结果); } public List> invokeAll(Collection> tasks) 抛出InterruptedException { 返回e.invokeAll(任务); } public List> invokeAll(Collection> tasks, 超时,TimeUnit单位) 抛出InterruptedException { 返回e.invokeAll(任务,超时,单位); } public T invokeAny(Collection> tasks) 抛出InterruptedException,ExecutionException { 返回e.invokeAny(任务); } 公共T invokeAny(集合>任务, 超时,TimeUnit单位) 抛出InterruptedException,ExecutionException,TimeoutException { 返回e.invokeAny(任务,超时,单位); } } 私人SingleThreadExecutorWithExceptions(){} }
我不提供ThreadPoolExecutor的子类,而是提供一个ThreadFactory实例来创build新的线程,并为它们提供一个UncaughtExceptionHandler