您好,登录后才能下订单哦!
本篇内容介绍了“Java ScheduledThreadPoolExecutor的坑如何解决”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
这个坑就是如果ScheduledThreadPoolExecutor
中执行的任务出错抛出异常后,不仅不会打印异常堆栈信息,同时还会取消后面的调度, 直接看例子。
@Test public void testException() throws InterruptedException { // 创建1个线程的调度任务线程池 ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); // 创建一个任务 Runnable runnable = new Runnable() { volatile int num = 0; @Override public void run() { num ++; // 模拟执行报错 if(num > 5) { throw new RuntimeException("执行错误"); } log.info("exec num: [{}].....", num); } }; // 每隔1秒钟执行一次任务 scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS); Thread.sleep(10000); }
运行结果:
只执行了5次后,就不打印,不执行了,因为报错了
任务报错,也没有打印一次堆栈,更导致调度任务取消,后果十分严重。
解决方法也非常简单,只要通过try catch捕获异常即可。
运行结果:
看到不仅打印了异常堆栈,而且也会进行周期性的调度。
更好的建议可以在自己的项目中封装一个包装类,要求所有的调度都提交通过我们统一的包装类, 如下代码:
@Slf4j public class RunnableWrapper implements Runnable { // 实际要执行的线程任务 private Runnable task; // 线程任务被创建出来的时间 private long createTime; // 线程任务被线程池运行的开始时间 private long startTime; // 线程任务被线程池运行的结束时间 private long endTime; // 线程信息 private String taskInfo; private boolean showWaitLog; /** * 执行间隔时间多久,打印日志 */ private long durMs = 1000L; // 当这个任务被创建出来的时候,就会设置他的创建时间 // 但是接下来有可能这个任务提交到线程池后,会进入线程池的队列排队 public RunnableWrapper(Runnable task, String taskInfo) { this.task = task; this.taskInfo = taskInfo; this.createTime = System.currentTimeMillis(); } public void setShowWaitLog(boolean showWaitLog) { this.showWaitLog = showWaitLog; } public void setDurMs(long durMs) { this.durMs = durMs; } // 当任务在线程池排队的时候,这个run方法是不会被运行的 // 但是当任务结束了排队,得到线程池运行机会的时候,这个方法会被调用 // 此时就可以设置线程任务的开始运行时间 @Override public void run() { this.startTime = System.currentTimeMillis(); // 此处可以通过调用监控系统的API,实现监控指标上报 // 用线程任务的startTime-createTime,其实就是任务排队时间 // 这边打印日志输出,也可以输出到监控系统中 if(showWaitLog) { log.info("任务信息: [{}], 任务排队时间: [{}]ms", taskInfo, startTime - createTime); } // 接着可以调用包装的实际任务的run方法 try { task.run(); } catch (Exception e) { log.error("run task error", e); throw e; } // 任务运行完毕以后,会设置任务运行结束的时间 this.endTime = System.currentTimeMillis(); // 此处可以通过调用监控系统的API,实现监控指标上报 // 用线程任务的endTime - startTime,其实就是任务运行时间 // 这边打印任务执行时间,也可以输出到监控系统中 if(endTime - startTime > durMs) { log.info("任务信息: [{}], 任务执行时间: [{}]ms", taskInfo, endTime - startTime); } } }
使用:
我们还可以在包装类里面封装各种监控行为,如本例打印日志执行时间等。
那大家有没有想过为什么任务出错会导致异常无法打印,甚至调度都取消了呢?让我们从源码出发,一探究竟。
1.下面是调度任务的入口方法。
// ScheduledThreadPoolExecutor#scheduleAtFixedRate public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 将执行任务和参数包装成ScheduledFutureTask对象 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 延迟执行 delayedExecute(t); return t; }
这个方法主要做了两个事情:
将执行任务和参数包装成ScheduledFutureTask对象
调用delayedExecute
方法延迟执行任务
2.延迟或周期性任务的主要执行方法, 主要是将任务丢到队列中,后续由工作线程获取执行。
// ScheduledThreadPoolExecutor#delayedExecute private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { // 将任务丢到阻塞队列中 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 开启工作线程,去执行任务,或者从队列中获取任务执行 ensurePrestart(); } }
3.现在任务已经在队列中了,我们看下任务执行的内容是什么,还记得前面的包装对象ScheduledFutureTask
类,它的实现类是ScheduledFutureTask
,继承了Runnable类。
// ScheduledFutureTask#run方法 public void run() { // 是不是周期性任务 boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); // 不是周期性任务的话, 直接调用一次下面的run else if (!periodic) ScheduledFutureTask.super.run(); // 如果是周期性任务,则调用runAndReset方法,如果返回true,继续执行 else if (ScheduledFutureTask.super.runAndReset()) { // 设置下次调度时间 setNextRunTime(); // 重新执行调度任务 reExecutePeriodic(outerTask); } }
这里的关键就是看ScheduledFutureTask.super.runAndReset()
方法是否返回true,如果是true的话继续调度。
4.runAndReset方法也很简单,关键就是看报异常如何处理。
// FutureTask#runAndReset protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; // 是否继续下次调度,默认false boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { // 执行任务 c.call(); // 执行成功的话,设置为true ran = true; // 异常处理,关键点 } catch (Throwable ex) { // 不会修改ran的值,最终是false,同时也不打印异常堆栈 setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 返回结果 return ran && s == NEW; }
关键点ran变量,最终返回是不是下次继续调度执行
如果抛出异常的话,可以看到不会修改ran为true。
“Java ScheduledThreadPoolExecutor的坑如何解决”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。