您好,登录后才能下订单哦!
本篇内容介绍了“Spring Boot集成quartz如何实现定时任务并支持切换任务数据源”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
在工作中经常会需要使用到定时任务处理各种周期性的任务,org.quartz是处理此类定时任务的一个优秀框架。随着项目一点点推进,此时我们并不满足于任务仅仅是定时执行,我们还想要对任务进行更多的控制,随时能对任务进行人为干预,就需要对quartz有更深入的了解。而随着微服务的流行,项目中多数据源的情况也越来越常见,在定时任务中集成多数据源切换的功能也需要集成进来。
集成quartz实现定时任务
通过实现Job
类,在实现方法中写我们具体想要定时任务完成的工作,然后交给quartz
管理。
Job
只负责实现具体任务,所以还需要借助JobDetail
来存储一些描述Job
的基本信息。
为构造JobDetail
实体提供的builder-style API
。你可以这样使用它来构建一个JobDetail
:
@Bean public JobDetail jobDetail() { return JobBuilder.newJob().ofType(SampleJob.class) .storeDurably() .withIdentity("Qrtz_Job_Detail") .withDescription("Invoke Sample Job service...") .build(); }
在Spring
中配置JobDetail
的方式:
@Bean public JobDetailFactoryBean jobDetail() { JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean(); jobDetailFactory.setJobClass(SampleJob.class); jobDetailFactory.setDescription("Invoke Sample Job service..."); jobDetailFactory.setDurability(true); return jobDetailFactory; }
触发器,代表一个调度参数的配置,什么时候去调度:
@Bean public Trigger trigger(JobDetail job) { return TriggerBuilder.newTrigger().forJob(job) .withIdentity("Qrtz_Trigger") .withDescription("Sample trigger") .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1)) .build(); }
调度器,通过Job
和Trigger
来注册一个调度器:
@Bean public Scheduler scheduler(Trigger trigger, JobDetail job) { StdSchedulerFactory factory = new StdSchedulerFactory(); factory.initialize(new ClassPathResource("quartz.properties").getInputStream()); Scheduler scheduler = factory.getScheduler(); scheduler.setJobFactory(springBeanJobFactory()); scheduler.scheduleJob(job, trigger); scheduler.start(); return scheduler; }
在quartz
中Job
就是我们需要去执行的任务,由Scheduler
调度器负责调度任务们依靠制定好的Trigger
来定时执行任务。
因此首先我们需要结合以上基础给系统添加一个Job。
public void addJob(BaseJob job) throws SchedulerException { /** 创建JobDetail实例,绑定Job实现类 * JobDetail 表示一个具体的可执行的调度程序,job是这个可执行调度程序所要执行的内容 * 另外JobDetail还包含了这个任务调度的方案和策略**/ // 指明job的名称,所在组的名称,以及绑定job类 JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass()) .withIdentity(job.getJobKey()) .withDescription(job.getDescription()) .usingJobData(job.getDataMap()) .build(); /** * Trigger代表一个调度参数的配置,什么时候去调度 */ //定义调度触发规则, 使用cronTrigger规则 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(job.getJobName(),job.getJobGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression())) .startNow() .build(); //将任务和触发器注册到任务调度中去 scheduler.scheduleJob(jobDetail,trigger); //判断调度器是否启动 if(!scheduler.isStarted()){ scheduler.start(); } log.info(String.format("定时任务:%s.%s-已添加到调度器!", job.getJobGroup(),job.getJobName())); }
首先需要定义好我们的Job,之后通过Job初始化JobDetail
和Trigger
,最后将JobDetail
和Trigger
注册到调度器中。
Job
的结构如下:
public abstract class BaseJob implements Job,Serializable { private static final long serialVersionUID = 1L; private static final String JOB_MAP_KEY = "self"; /** * 任务名称 */ private String jobName; /** * 任务分组 */ private String jobGroup; /** * 任务状态 是否启动任务 */ private String jobStatus; /** * cron表达式 */ private String cronExpression; /** * 描述 */ private String description; /** * 任务执行时调用哪个类的方法 包名+类名 */ private Class beanClass = this.getClass(); /** * 任务是否有状态 */ private String isConcurrent; /** * Spring bean */ private String springBean; /** * 任务调用的方法名 */ private String methodName; /** * 该任务所使用的数据源 */ private String dataSource = DataSourceEnum.DB1.getName(); /** * 为了将执行后的任务持久化到数据库中 */ @JsonIgnore private JobDataMap dataMap = new JobDataMap(); public JobKey getJobKey(){ return JobKey.jobKey(jobName, jobGroup);// 任务名称和组构成任务key } ... }
可以看到Job
中定义了任务的一些基本信息,重点关注其中的dataSource
和dataMap
属性。其中dataSource
是任务所使用的数据源,并给了一个默认值;由于任务在添加后会持久化到数据库中,之后解析任务就会用到dataMap
。
在添加Job
的时候,JobDetail
和Trigger
都是通过关键字new
生成的,而调度器Scheduler
则需要放在容器中维护。
@Configuration @Order public class SchedulerConfig { @Autowired private MyJobFactory myJobFactory; @Value("${spring.profiles.active}") private String profile; /* * 通过SchedulerFactoryBean获取Scheduler的实例 */ @Bean(name = "scheduler") public Scheduler scheduler() throws Exception { return schedulerFactoryBean().getScheduler(); } @Bean public SchedulerFactoryBean schedulerFactoryBean() throws IOException { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setOverwriteExistingJobs(true); // 延时启动 factory.setStartupDelay(20); // 加载quartz数据源配置 factory.setQuartzProperties(quartzProperties()); // 自定义Job Factory,用于Spring注入 factory.setJobFactory(myJobFactory); /*********全局监听器配置************/ JobListener myJobListener = new SchedulerListener(); factory.setGlobalJobListeners(myJobListener);//直接添加为全局监听器 return factory; } @Bean public Properties quartzProperties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); if (Util.PRODUCT.equals(profile)) {//正式环境 System.out.println("正式环境quartz配置"); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties")); } else { System.out.println("测试环境quartz配置"); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); } //在quartz.properties中的属性被读取并注入后再初始化对象 propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } /* * quartz初始化监听器 */ @Bean public QuartzInitializerListener executorListener() { return new QuartzInitializerListener(); } }
上述代码中,将scheduler
加入到Spring
容器中。scheduler
是由SchedulerFactoryBean
进行维护的,在SchedulerFactoryBean
中对调度器工厂做了一些基本设置并从配置文件中加载了quartz数据源配置(配置文件的读取会根据运行环境profile
来进行自动切换),配置了一个全局监听器用以监听任务的执行过程。
使用Spring提供的JobFactory
。
@Component public class MyJobFactory extends AdaptableJobFactory { @Autowired private AutowireCapableBeanFactory capableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { // 调用父类的方法 Object jobInstance = super.createJobInstance(bundle); // 进行注入 capableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
quartz.properties
中是quartz连接数据库的一些配置信息。
# \u56FA\u5B9A\u524D\u7F00org.quartz # \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206 # # org.quartz.scheduler.instanceName = DefaultQuartzScheduler org.quartz.scheduler.rmi.export = false org.quartz.scheduler.rmi.proxy = false org.quartz.scheduler.wrapJobExecutionInUserTransaction = false # \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPool org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool # threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B # \u5E76\u53D1\u4E2A\u6570 org.quartz.threadPool.threadCount = 5 # \u4F18\u5148\u7EA7 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true org.quartz.jobStore.misfireThreshold = 5000 # \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D #org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore #\u6301\u4E45\u5316 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX #org.quartz.jobStore.useProperties=false org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.dataSource = qzDS org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=true org.quartz.dataSource.qzDS.user=quartz org.quartz.dataSource.qzDS.password=123456 org.quartz.dataSource.qzDS.maxConnections = 30 org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUAL org.quartz.dataSource.qzDS.validateOnCheckout = true org.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40 #org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60
quartz
会根据这个配置文件将Job
持久化到数据库中,也因此quartz
会需要初始化一些数据库表,表结构文件在文末。
调度器监听器用以监听任务的执行状态。
public class SchedulerListener implements JobListener { private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class); public static final String LISTENER_NAME = "QuartSchedulerListener"; @Override public String getName() { return LISTENER_NAME; //must return a name } //任务被调度前 @Override public void jobToBeExecuted(JobExecutionContext context) { String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource"); // 切换任务的数据源 DataSourceContextHolder.setDB(dataSource); String jobName = context.getJobDetail().getKey().toString(); LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName()); } //任务调度被拒了 @Override public void jobExecutionVetoed(JobExecutionContext context) { String jobName = context.getJobDetail().getKey().toString(); LOG.error("job {} is jobExecutionVetoed", jobName); //可以做一些日志记录原因 } //任务被调度后 @Override public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { // 清空存储的数据源 String jobName = context.getJobDetail().getKey().toString(); DataSourceContextHolder.clearDB(); LOG.info("Job : {} is finished", jobName); if (jobException != null && !jobException.getMessage().equals("")) { LOG.error("Exception thrown by: " + jobName + " Exception: " + jobException.getMessage()); } } }
SchedulerListener
监听任务被调度前、调度后和调度被拒绝时的状态,在任务被调度之前和之后对任务所使用的数据源进行了处理。如果项目中不需要数据源切换的话,这个监听器是不需要的,到此已经完成了quartz
的集成。
多数据源切换
通过自定义DynamicDataSource
来覆盖Spring Boot中原有的数据源。
通过读取配置文件中不同的数据源,初始化项目中可能用到的数据源用以切换。
/** * 多数据源配置类 */ @Configuration public class DataSourceConfig { //数据源1 @Bean(name = "datasource1") @ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis中对应属性的前缀 public DataSource dataSource1() { return DataSourceBuilder.create().build(); } //数据源2 @Bean(name = "datasource2") @ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis中对应属性的前缀 public DataSource dataSource2() { return DataSourceBuilder.create().build(); } /** * 动态数据源: 通过AOP在不同数据源之间动态切换 * * @return */ @Primary @Bean(name = "dynamicDataSource") public DataSource dynamicDataSource() { DynamicDataSource dynamicDataSource = new DynamicDataSource(); // 默认数据源 dynamicDataSource.setDefaultTargetDataSource(dataSource1()); // 配置多数据源 Map<Object, Object> dsMap = new HashMap(); dsMap.put(DataSourceEnum.DB1.getName(), dataSource1()); dsMap.put(DataSourceEnum.DB2.getName(), dataSource2()); dynamicDataSource.setTargetDataSources(dsMap); return dynamicDataSource; } @Bean public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); //设置数据源 sqlSessionFactoryBean.setDataSource(dataSource); return sqlSessionFactoryBean.getObject(); } /** * 配置@Transactional注解事物 * * @return */ @Bean public PlatformTransactionManager transactionManager() { return new DataSourceTransactionManager(dynamicDataSource()); } }
spring: datasource: db1: driver-class-name: com.mysql.cj.jdbc.Driver username: doctor password: 123456 type: com.zaxxer.hikari.HikariDataSource jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true db2: driver-class-name: com.mysql.cj.jdbc.Driver username: quartz password: 123456 type: com.zaxxer.hikari.HikariDataSource jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true
由于quartz
在执行过程中是通过不同的线程来执行Job
的,因此此处通过ThreadLocal
来保存线程所使用的数据源情况。
/** * 保存本地数据源 */ public class DataSourceContextHolder { private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class); /** * 默认数据源 */ public static final String DEFAULT_DS = DataSourceEnum.DB1.getName(); /** * ThreadLocal之后会进行讲解 */ private static final ThreadLocal<String> contextHolder = new ThreadLocal<>(); // 设置数据源名 public static void setDB(String dbType) { LOG.info("切换到{}数据源", dbType); contextHolder.set(dbType); } // 获取数据源名 public static String getDB() { return (contextHolder.get()); } // 清除数据源名 public static void clearDB() { contextHolder.remove(); } }
获取执行中所使用的数据源。由于数据源被保存在了DataSourceContextHolder
中的ThreadLocal
中,所以直接获取就行了。
/** * 获取本地数据源 */ public class DynamicDataSource extends AbstractRoutingDataSource { private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class); @Override protected Object determineCurrentLookupKey() { LOG.info("数据源为{}", DataSourceContextHolder.getDB()); return DataSourceContextHolder.getDB(); } }
至此就完成了集成quartz
及数据源切换的功能。然后就是具体的任务了。
具体的任务需要继承BaseJob
并在execute
方法中重写具体需要执行的任务。
@Slf4j @Service public class ReadNumJob extends BaseJob { @Autowired private RedisService redisService; @Autowired private JdbcTemplate jdbcTemplate; private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class); @Override public void execute(JobExecutionContext context) { doSomething(); } }
然后在添加任务时指定任务所使用的数据源
ReadNumJob job = new ReadNumJob(); job.setJobName("test"); job.setJobGroup("hys"); job.setDescription("test"); // 指定数据源 job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName()); job.setCronExpression( "0 */1 * * * ?" ); try { jobAndTriggerService.addJob(job); } catch (SchedulerException e) { e.printStackTrace(); }
“Spring Boot集成quartz如何实现定时任务并支持切换任务数据源”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。