您好,登录后才能下订单哦!
这篇文章主要介绍“java Spring定时任务Quartz执行过程是什么”,在日常操作中,相信很多人在java Spring定时任务Quartz执行过程是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java Spring定时任务Quartz执行过程是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
在日常开发中经常会用到定时任务,用来;库表扫描发送MQ、T+n账单结算、缓存数据更新、秒杀活动状态变更,等等。因为有了Spring的Schedule极大的方便了我们对这类场景的使用。那么,除了应用你还了解它多少呢;
默认初始化多少个任务线程
JobStore有几种实现,你平时用的都是哪个
一个定时任务的执行流程简述下
为了更好的做源码分析,我们将平时用的定时任务服务单独抽离出来。
1itstack-demo-code-schedule
2└── src
3 ├── main
4 │ ├── java
5 │ │ └── org.itstack.demo
6 │ │ ├── DemoTask.java
7 │ │ └── JobImpl.java
8 │ └── resources
9 │ ├── props
10 │ │ └── config.properties
11 │ ├── spring
12 │ │ └── spring-config-schedule-task.xml
13 │ ├── logback.xml
14 │ └── spring-config.xml
15 └── test
16 └── java
17 └── org.itstack.demo.test
18 ├── ApiTest.java
19 ├── MyQuartz.java
20 └── MyTask.java
JDK 1.8
IDEA 2019.3.1
Spring 4.3.24.RELEASE
quartz 2.3.2 {不同版本略有代码差异}
1<dependency>
2 <groupId>org.quartz-scheduler</groupId>
3 <artifactId>quartz</artifactId>
4 <version>2.3.2</version>
5</dependency>
依赖于Spring版本升级quartz选择2.3.2,同时如果你如本文案例中所示使用xml配置任务。那么会有如下更改;
Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean
1 <bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
2 <property name="jobDetail" ref="taskHandler"/>
3 <property name="cronExpression" value="0/5 * * * * ?"/>
4 </bean>
Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean
1 <bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
2 <property name="jobDetail" ref="taskHandler"/>
3 <property name="cronExpression" value="0/5 * * * * ?"/>
4 </bean>
在正式分析前,可以看下quartz的默认配置,很多初始化动作都要从这里取得参数,同样你可以配置自己的配置文件。例如,当你的任务很多时,默认初始化的10个线程组不满足你的业务需求,就可以按需调整。
quart.properties
1# Default Properties file for use by StdSchedulerFactory
2# to create a Quartz Scheduler Instance, if a different
3# properties file is not explicitly specified.
4#
5
6org.quartz.scheduler.instanceName: DefaultQuartzScheduler
7org.quartz.scheduler.rmi.export: false
8org.quartz.scheduler.rmi.proxy: false
9org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
10
11org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
12org.quartz.threadPool.threadCount: 10
13org.quartz.threadPool.threadPriority: 5
14org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
15
16org.quartz.jobStore.misfireThreshold: 60000
17
18org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
平时我们使用Schedule基本都是注解或者xml配置文件,但是为了可以更简单的分析代码,我们从一个简单的Demo入手,放到main函数中。
DemoTask.java & 定义一个等待被执行的任务
1public class DemoTask {
2
3 private Logger logger = LoggerFactory.getLogger(DemoTask.class);
4
5 public void execute() throws Exception{
6 logger.info("定时处理用户信息任务:0/5 * * * * ?");
7 }
8
9}
MyTask.java & 测试类,将配置在xml中的代码抽离出来
1public class MyTask {
2
3 public static void main(String[] args) throws Exception {
4
5 DemoTask demoTask = new DemoTask();
6
7 // 定义了;执行的内容
8 MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
9 methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
10 methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
11 methodInvokingJobDetailFactoryBean.setConcurrent(true);
12 methodInvokingJobDetailFactoryBean.setName("demoTask");
13 methodInvokingJobDetailFactoryBean.afterPropertiesSet();
14
15 // 定义了;执行的计划
16 CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
17 cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
18 cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
19 cronTriggerFactoryBean.setName("demoTask");
20 cronTriggerFactoryBean.afterPropertiesSet();
21
22 // 实现了;执行的功能
23 SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
24 schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
25 schedulerFactoryBean.setAutoStartup(true);
26 schedulerFactoryBean.afterPropertiesSet();
27
28 schedulerFactoryBean.start();
29
30 // 暂停住
31 System.in.read();
32
33 }
34
35}
如果一切顺利,那么会有如下结果:
12020-01-04 10:47:16.369 [main] INFO org.quartz.impl.StdSchedulerFactory[1220] - Using default implementation for ThreadExecutor
22020-01-04 10:47:16.421 [main] INFO org.quartz.core.SchedulerSignalerImpl[61] - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
32020-01-04 10:47:16.422 [main] INFO org.quartz.core.QuartzScheduler[229] - Quartz Scheduler v.2.3.2 created.
42020-01-04 10:47:16.423 [main] INFO org.quartz.simpl.RAMJobStore[155] - RAMJobStore initialized.
52020-01-04 10:47:16.424 [main] INFO org.quartz.core.QuartzScheduler[294] - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'
6 Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
7 NOT STARTED.
8 Currently in standby mode.
9 Number of jobs executed: 0
10 Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
11 Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
12
132020-01-04 10:47:16.424 [main] INFO org.quartz.impl.StdSchedulerFactory[1374] - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.
142020-01-04 10:47:16.424 [main] INFO org.quartz.impl.StdSchedulerFactory[1378] - Quartz scheduler version: 2.3.2
152020-01-04 10:47:16.426 [main] INFO org.quartz.core.QuartzScheduler[2293] - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b1010
162020-01-04 10:47:16.651 [main] INFO org.quartz.core.QuartzScheduler[547] - Scheduler QuartzScheduler_$_NON_CLUSTERED started.
17一月 04, 2020 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler
18信息: Starting Quartz Scheduler now
192020-01-04 10:47:20.321 [QuartzScheduler_Worker-1] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
202020-01-04 10:47:25.001 [QuartzScheduler_Worker-2] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
212020-01-04 10:47:30.000 [QuartzScheduler_Worker-3] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
222020-01-04 10:47:35.001 [QuartzScheduler_Worker-4] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
232020-01-04 10:47:40.000 [QuartzScheduler_Worker-5] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
24
25Process finished with exit code -1
26
1// 定义了;执行的内容
2MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
3methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
4methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
5methodInvokingJobDetailFactoryBean.setConcurrent(true);
6methodInvokingJobDetailFactoryBean.setName("demoTask");
7methodInvokingJobDetailFactoryBean.afterPropertiesSet();
这块内容主要将我们的任务体(即待执行任务DemoTask)交给MethodInvokingJobDetailFactoryBean管理,首先设置必要信息;
targetObject:目标对象bean,也就是demoTask
targetMethod:目标方法name,也就是execute
concurrent:是否并行执行,非并行执行任务,如果上一个任务没有执行完,下一刻不会执行
name:xml配置非必传,源码中可以获取beanName
最后我们通过手动调用 afterPropertiesSet() 来模拟初始化。如果我们的类是交给 Spring 管理的,那么在实现了 InitializingBean 接口的类,在类配置信息加载后会自动执行 afterPropertiesSet() 。一般实现了 InitializingBean 接口的类,同时也会去实现 FactoryBean
MethodInvokingJobDetailFactoryBean.afterPropertiesSet()
1public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
2 prepare();
3 // Use specific name if given, else fall back to bean name.
4 String name = (this.name != null ? this.name : this.beanName);
5 // Consider the concurrent flag to choose between stateful and stateless job.
6 Class<?> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
7 // Build JobDetail instance.
8 JobDetailImpl jdi = new JobDetailImpl();
9 jdi.setName(name);
10 jdi.setGroup(this.group);
11 jdi.setJobClass((Class) jobClass);
12 jdi.setDurability(true);
13 jdi.getJobDataMap().put("methodInvoker", this);
14 this.jobDetail = jdi;
15
16 postProcessJobDetail(this.jobDetail);
17}
源码168行: 根据是否并行执行选择任务类,这两个类都是MethodInvokingJobDetailFactoryBean的内部类,非并行执行的StatefulMethodInvokingJob只是继承MethodInvokingJob添加了标记注解。
源码171行: 创建JobDetailImpl,添加任务明细信息,注意这类的jdi.setJobClass((Class) jobClass)实际就是MethodInvokingJob。MethodInvokingJob也是我们最终要反射调用执行的内容。
源码177行: 初始化任务后赋值给this.jobDetail = jdi,也就是最终的类对象
MethodInvokingJobDetailFactoryBean.getObject()
1@Override
2public JobDetail getObject() {
3 return this.jobDetail;
4}
源码:220行: 获取对象时返回 this.jobDetail,这也就解释了为什么 MethodInvokingJobDetailFactoryBean 初始化后直接赋值给了一个 JobDetail ;
1// 定义了;执行的计划
2CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
3cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
4cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
5cronTriggerFactoryBean.setName("demoTask");
6cronTriggerFactoryBean.afterPropertiesSet();
这一块主要定义任务的执行计划,并将任务执行内容交给 CronTriggerFactoryBean 管理,同时设置必要信息;
jobDetail:设置任务体,xml 中可以直接将对象赋值,硬编码中设置执行的 JobDetail 对象信息。也就是我们上面设置的 JobDetailImpl ,通过 getObject() 获取出来。
cronExpression:计划表达式;秒、分、时、日、月、周、年
CronTriggerFactoryBean.afterPropertiesSet()
1@Override
2public void afterPropertiesSet() throws ParseException {
3
4 // ... 校验属性信息
5
6 CronTriggerImpl cti = new CronTriggerImpl();
7 cti.setName(this.name);
8 cti.setGroup(this.group);
9 if (this.jobDetail != null) {
10 cti.setJobKey(this.jobDetail.getKey());
11 }
12 cti.setJobDataMap(this.jobDataMap);
13 cti.setStartTime(this.startTime);
14 cti.setCronExpression(this.cronExpression);
15 cti.setTimeZone(this.timeZone);
16 cti.setCalendarName(this.calendarName);
17 cti.setPriority(this.priority);
18 cti.setMisfireInstruction(this.misfireInstruction);
19 cti.setDescription(this.description);
20 this.cronTrigger = cti;
21}
源码237行: 创建触发器 CronTriggerImpl 并设置相关属性信息
源码245行: 生成执行计划类 cti.setCronExpression(this.cronExpression);
1public void setCronExpression(String cronExpression) throws ParseException {
2 TimeZone origTz = getTimeZone();
3 this.cronEx = new CronExpression(cronExpression);
4 this.cronEx.setTimeZone(origTz);
5}
CronExpression.java & 解析Cron表达式
1protected void buildExpression(String expression) throws ParseException {
2 expressionParsed = true;
3 try { // ... 初始化 TreeSet xxx = new TreeSet<Integer>();
4
5 int exprOn = SECOND;
6 StringTokenizer exprsTok = new StringTokenizer(expression, " \t",
7 false);
8
9 while (exprsTok.hasMoreTokens() && exprOn <= YEAR) {
10 String expr = exprsTok.nextToken().trim();
11
12 // ... 校验DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符
13
14 StringTokenizer vTok = new StringTokenizer(expr, ",");
15 while (vTok.hasMoreTokens()) {
16 String v = vTok.nextToken();
17 storeExpressionVals(0, v, exprOn);
18 }
19 exprOn++;
20 }
21
22 // ... 校验DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符
23
24} catch (ParseException pe) {
25 throw pe;
26} catch (Exception e) {
27 throw new ParseException("Illegal cron expression format ("
28 + e.toString() + ")", 0);
29}
30
}
Cron表达式有7个字段,CronExpression 把7个字段解析为7个 TreeSet 对象。
填充TreeSet对象值的时候,表达式都会转换为起始值、结束值和增量的计算模式,然后计算出匹配的值放进TreeSet对象
CronTriggerFactoryBean.getObject()
1@Override
2public CronTrigger getObject() {
3 return this.cronTrigger;
4}
源码257行: 获取对象时返回 this.cronTrigger ,也就是 CronTriggerImpl 对象
1// 调度了;执行的计划(scheduler)
2SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
3schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
4schedulerFactoryBean.setAutoStartup(true);
5schedulerFactoryBean.afterPropertiesSet();
6
7schedulerFactoryBean.start();
这一部分如名字一样调度工厂,相当于一个指挥官,可以从全局做调度,比如监听哪些trigger已经ready、分配线程等等,同样也需要设置必要的属性信息;
triggers:按需可以设置多个触发器,本文设置了一个 cronTriggerFactoryBean.getObject() 也就是 CronTriggerImpl 对象
autoStartup:默认是否自动启动任务,默认值为true
这个过程较长包括:调度工厂、线程池、注册任务等等,整体核心加载流程如下;
整个加载过程较长,抽取部分核心代码块进行分析,其中包括的类;
StdScheduler
StdSchedulerFactory
SimpleThreadPool
QuartzScheduler
QuartzSchedulerThread
RAMJobStore
CronTriggerImpl
CronExpression
SchedulerFactoryBean.afterPropertiesSet()
1public void afterPropertiesSet() throws Exception {
2 if (this.dataSource == null && this.nonTransactionalDataSource != null) {
3 this.dataSource = this.nonTransactionalDataSource;
4 }
5 if (this.applicationContext != null && this.resourceLoader == null) {
6 this.resourceLoader = this.applicationContext;
7 }
8 // Initialize the Scheduler instance...
9 this.scheduler = prepareScheduler(prepareSchedulerFactory());
10 try {
11 registerListeners();
12 registerJobsAndTriggers();
13 }
14 catch (Exception ex) {
15 try {
16 this.scheduler.shutdown(true);
17 }
18 catch (Exception ex2) {
19 logger.debug("Scheduler shutdown exception after registration failure", ex2);
20 }
21 throw ex;
22 }
23}
源码474行: 为调度器做准备工作 prepareScheduler(prepareSchedulerFactory()) ,依次执行如下;
11)初始化threadPool(线程池):开发者可以通过org.quartz.threadPool.class配置指定使用哪个线程池类,比如SimpleThreadPool。
22)初始化jobStore(任务存储方式):开发者可以通过org.quartz.jobStore.class配置指定使用哪个任务存储类,比如RAMJobStore。
33)初始化dataSource(数据源):开发者可以通过org.quartz.dataSource配置指定数据源详情,比如哪个数据库、账号、密码等。
44)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
55)初始化threadExecutor(线程执行器):默认为DefaultThreadExecutor;
66)创建工作线程:根据配置创建N个工作thread,执行start()启动thread,并将N个thread顺序add进threadPool实例的空闲线程列表availWorkers中;
77)创建调度器线程:创建QuartzSchedulerThread实例,并通过threadExecutor.execute(实例)启动调度器线程;
88)创建调度器:创建StdScheduler实例,将上面所有配置和引用组合进实例中,并将实例存入调度器池中
SchedulerFactoryBean.prepareScheduler(SchedulerFactory schedulerFactory)
SchedulerFactoryBean.createScheduler(schedulerFactory, this.schedulerName);
SchedulerFactoryBean.createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
Scheduler newScheduler = schedulerFactory.getScheduler();
StdSchedulerFactory.getScheduler();
sched = instantiate(); 包括一系列核心操作;
源码477行: 调用父类 SchedulerAccessor.registerJobsAndTriggers() 注册任务和触发器
1for (Trigger trigger : this.triggers) {
2 addTriggerToScheduler(trigger);
3}
SchedulerAccessor.addTriggerToScheduler() & SchedulerAccessor 是SchedulerFactoryBean的父类
1private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException {
2 boolean triggerExists = (getScheduler().getTrigger(trigger.getKey()) != null);
3 if (triggerExists && !this.overwriteExistingJobs) {
4 return false;
5 }
6 // Check if the Trigger is aware of an associated JobDetail.
7 JobDetail jobDetail = (JobDetail) trigger.getJobDataMap().remove("jobDetail");
8 if (triggerExists) {
9 if (jobDetail != null && !this.jobDetails.contains(jobDetail) && addJobToScheduler(jobDetail)) {
10 this.jobDetails.add(jobDetail);
11 }
12 try {
13 getScheduler().rescheduleJob(trigger.getKey(), trigger);
14 }
15 catch (ObjectAlreadyExistsException ex) {
16 if (logger.isDebugEnabled()) {
17 logger.debug("Unexpectedly encountered existing trigger on rescheduling, assumably due to " +
18 "cluster race condition: " + ex.getMessage() + " - can safely be ignored");
19 }
20 }
21 }
22 else {
23 try {
24 if (jobDetail != null && !this.jobDetails.contains(jobDetail) &&
25 (this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null)) {
26 getScheduler().scheduleJob(jobDetail, trigger);
27 this.jobDetails.add(jobDetail);
28 }
29 else {
30 getScheduler().scheduleJob(trigger);
31 }
32 }
33 catch (ObjectAlreadyExistsException ex) {
34 if (logger.isDebugEnabled()) {
35 logger.debug("Unexpectedly encountered existing trigger on job scheduling, assumably due to " +
36 "cluster race condition: " + ex.getMessage() + " - can safely be ignored");
37 }
38 if (this.overwriteExistingJobs) {
39 getScheduler().rescheduleJob(trigger.getKey(), trigger);
40 }
41 }
42 }
43 return true;
44}
源码299行: addJobToScheduler(jobDetail) 一直会调用到 RAMJobStore 进行存放任务信息到 HashMap
1public void storeJob(JobDetail newJob,
2 boolean replaceExisting) throws ObjectAlreadyExistsException {
3 JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
4 boolean repl = false;
5 synchronized (lock) {
6 if (jobsByKey.get(jw.key) != null) {
7 if (!replaceExisting) {
8 throw new ObjectAlreadyExistsException(newJob);
9 }
10 repl = true;
11 }
12 if (!repl) {
13 // get job group
14 HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
15 if (grpMap == null) {
16 grpMap = new HashMap<JobKey, JobWrapper>(100);
17 jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
18 }
19 // add to jobs by group
20 grpMap.put(newJob.getKey(), jw);
21 // add to jobs by FQN map
22 jobsByKey.put(jw.key, jw);
23 } else {
24 // update job detail
25 JobWrapper orig = jobsByKey.get(jw.key);
26 orig.jobDetail = jw.jobDetail; // already cloned
27 }
28 }
29}
初始化线程组;
SimpleThreadPool.initialize() & 这里的count是默认配置中的数量,可以更改
1 // create the worker threads and start them
2 Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
3 while(workerThreads.hasNext()) {
4 WorkerThread wt = workerThreads.next();
5 wt.start();
6 availWorkers.add(wt);
7 }
prepareScheduler
createScheduler
schedulerFactory
StdSchedulerFactory.getScheduler()
getScheduler()->instantiate()
源码1323行: tp.initialize();
案例中使用硬编码方式调用 schedulerFactoryBean.start() 启动线程服务。线程的协作通过Object sigLock来实现,关于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock唤醒的是只有线程QuartzSchedulerThread。核心流程如下;
这个启动过程中,核心的代码类,如下;
StdScheduler
QuartzScheduler
QuartzSchedulerThread
ThreadPool
RAMJobStore
CronTriggerImpl
JobRunShellFactory
QuartzScheduler.start() & 启动
1public void start() throws SchedulerException {
2
3 if (shuttingDown|| closed) {
4 throw new SchedulerException(
5 "The Scheduler cannot be restarted after shutdown() has been called.");
6 }
7
8 // QTZ-212 : calling new schedulerStarting() method on the listeners
9 // right after entering start()
10 notifySchedulerListenersStarting();
11
12 if (initialStart == null) {
13 initialStart = new Date();
14 this.resources.getJobStore().schedulerStarted();
15 startPlugins();
16 } else {
17 resources.getJobStore().schedulerResumed();
18 }
19
20 // 唤醒线程
21 schedThread.togglePause(false);
22
23 getLog().info(
24 "Scheduler " + resources.getUniqueIdentifier() + " started.");
25
26 notifySchedulerListenersStarted();
27}
QuartzSchedulerThread.run() & 执行过程
1@Override
2public void run() {
3 int acquiresFailed = 0;
4
5 // 只有调用了halt()方法,才会退出这个死循环
6 while (!halted.get()) {
7 try {
8
9 // 一、如果是暂停状态,则循环超时等待1000毫秒
10
11 // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..
12
13 // 阻塞直到有空闲的线程可用并返回可用的数量
14 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
15 if(availThreadCount > 0) {
16
17 List<OperableTrigger> triggers;
18 long now = System.currentTimeMillis();
19 clearSignaledSchedulingChange();
20
21 try {
22 // 二、获取acquire状态的Trigger列表,也就是即将执行的任务
23 triggers = qsRsrcs.getJobStore().acquireNextTriggers(
24 now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat
25 acquiresFailed = 0;
26 if (log.isDebugEnabled())
27 log.debug("batch acquisition of " + (triggers == null ? 0 : triggers
28 } catch(){//...}
29
30 if (triggers != null && !triggers.isEmpty()) {
31
32 // 三:获取List第一个Trigger的下次触发时刻
33 long triggerTime = triggers.get(0).getNextFireTime().getTime();
34
35 // 四:获取任务触发集合
36 List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
37
38 // 五:设置Triggers为'executing'状态
39 qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
40
41 // 六:创建JobRunShell
42 qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
43
44 // 七:执行Job
45 qsRsrcs.getThreadPool().runInThread(shell)
46
47 continue; // while (!halted)
48 }
49 } else { // if(availThreadCount > 0)
50 // should never happen, if threadPool.blockForAvailableThreads() follows con
51 continue; // while (!halted)
52 }
53
54
55 } catch(RuntimeException re) {
56 getLog().error("Runtime error occurred in main trigger firing loop.", re);
57 }
58 }
59
60 qs = null;
61 qsRsrcs = null;
62}
源码391行: 创建JobRunShell,JobRunShell实例在initialize()方法就会把包含业务逻辑类的JobDetailImpl设置为它的成员属性,为后面执行业务逻辑代码做准备。执行业务逻辑代码在runInThread(shell)方法里面。
QuartzSchedulerThread.run() & 部分代码
1JobRunShell shell = null;
2try {
3 shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
4 shell.initialize(qs);
5} catch (SchedulerException se) {
6 qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
7 continue;
8}
源码398行: qsRsrcs.getThreadPool().runInThread(shell)
SimpleThreadPool.runInThread
1// 保存所有WorkerThread的集合
2private List<WorkerThread> workers;
3// 空闲的WorkerThread集合
4private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
5// 任务的WorkerThread集合
6private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
7
8/**
9 * 维护workers、availWorkers和busyWorkers三个列表数据
10 * 有任务需要一个线程出来执行:availWorkers.removeFirst();busyWorkers.add()
11 * 然后调用WorkThread.run(runnable)方法
12 */
13public boolean runInThread(Runnable runnable) {
14 if (runnable == null) {
15 return false;
16 }synchronized (nextRunnableLock) {
17
18 handoffPending = true;
19
20 // Wait until a worker thread is available
21 while ((availWorkers.size() < 1) && !isShutdown) {
22 try {
23 nextRunnableLock.wait(500);
24 } catch (InterruptedException ignore) {
25 }
26 }
27
28 if (!isShutdown) {
29 WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
30 busyWorkers.add(wt);
31 wt.run(runnable);
32 } else {
33 // If the thread pool is going down, execute the Runnable
34 // within a new additional worker thread (no thread from the pool).
35
36 WorkerThread wt = new WorkerThread(this, threadGroup,
37 "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
38 busyWorkers.add(wt);
39 workers.add(wt);
40 wt.start();
41 }
42 nextRunnableLock.notifyAll();
43 handoffPending = false;
44}
45
46return true;
47
}
源码428行: WorkerThread ,是一个内部类,主要是赋值并唤醒lock对象的等待线程队列
WorkerThread.run(Runnable newRunnable)
1public void run(Runnable newRunnable) {
2 synchronized(lock) {
3 if(runnable != null) {
4 throw new IllegalStateException("Already running a Runnable!");
5 }
6 runnable = newRunnable;
7 lock.notifyAll();
8 }
9}
源码561行: WorkerThread 的run方法,方法执行lock.notifyAll()后,对应的WorkerThread就会来到run()方法。到这!接近曙光了!终于来到了执行业务的execute()方法的倒数第二步,runnable对象是一个JobRunShell对象,下面在看JobRunShell.run()方法。
WorkerThread.run()
1@Override
2public void run() {
3 boolean ran = false;while (run.get()) {
4 try {
5 synchronized(lock) {
6 while (runnable == null && run.get()) {
7 lock.wait(500);
8 }
9 if (runnable != null) {
10 ran = true;
11 // 启动真正执行的内容,runnable就是JobRunShell
12 runnable.run();
13 }
14 }
15 } cache(){//...}
16}
17//if (log.isDebugEnabled())
18try {
19 getLog().debug("WorkerThread is shut down.");
20} catch(Exception e) {
21 // ignore to help with a tomcat glitch
22}
23
}
JobRunShell.run() & 从上面WorkerThread.run(),调用到这里执行
1public void run() {
2 qs.addInternalSchedulerListener(this);
3
4 try {
5 OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
6 JobDetail jobDetail = jec.getJobDetail();
7
8 do {
9 // ...
10
11 long startTime = System.currentTimeMillis();
12 long endTime = startTime;
13
14 // execute the job
15 try {
16 log.debug("Calling execute on job " + jobDetail.getKey());
17
18 // 执行业务代码,也就是我们的task
19 job.execute(jec);
20
21 endTime = System.currentTimeMillis();
22 } catch (JobExecutionException jee) {
23 endTime = System.currentTimeMillis();
24 jobExEx = jee;
25 getLog().info("Job " + jobDetail.getKey() +
26 " threw a JobExecutionException: ", jobExEx);
27 } catch (Throwable e) {
28 endTime = System.currentTimeMillis();
29 getLog().error("Job " + jobDetail.getKey() +
30 " threw an unhandled Exception: ", e);
31 SchedulerException se = new SchedulerException(
32 "Job threw an unhandled exception.", e);
33 qs.notifySchedulerListenersError("Job ("
34 + jec.getJobDetail().getKey()
35 + " threw an exception.", se);
36 jobExEx = new JobExecutionException(se, false);
37 }
38
39 jec.setJobRunTime(endTime - startTime);
40
41 // 其他代码
42 } while (true);
43
44 } finally {
45 qs.removeInternalSchedulerListener(this);
46 }
47}
QuartzJobBean.execte() & 继续往下走
1public final void execute(JobExecutionContext context) throws JobExecutionException {
2 try {
3 BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
4 MutablePropertyValues pvs = new MutablePropertyValues();
5 pvs.addPropertyValues(context.getScheduler().getContext());
6 pvs.addPropertyValues(context.getMergedJobDataMap());
7 bw.setPropertyValues(pvs, true);
8 }
9 catch (SchedulerException ex) {
10 throw new JobExecutionException(ex);
11 }
12 executeInternal(context);
13}
MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)
1protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
2 try {
3 // 反射执行业务代码
4 context.setResult(this.methodInvoker.invoke());
5 }
6 catch (InvocationTargetException ex) {
7 if (ex.getTargetException() instanceof JobExecutionException) {
8 // -> JobExecutionException, to be logged at info level by Quartz
9 throw (JobExecutionException) ex.getTargetException();
10 }
11 else {
12 // -> "unhandled exception", to be logged at error level by Quartz
13 throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
14 }
15 }
16 catch (Exception ex) {
17 // -> "unhandled exception", to be logged at error level by Quartz
18 throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
19 }
20}
到此,关于“java Spring定时任务Quartz执行过程是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。