python apscheduler cron定时任务触发接口自动化巡检如何实现

发布时间:2023-05-04 10:04:27 作者:iii
来源:亿速云 阅读:143

这篇文章主要介绍“python apscheduler cron定时任务触发接口自动化巡检如何实现”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“python apscheduler cron定时任务触发接口自动化巡检如何实现”文章能帮助大家解决问题。

python cron定时任务触发接口自动化巡检

定时任务触发方式有几种类型,日常的工作中,研发同学运用比较多的就是cron方式

查了一下APScheduler框架内支持多种定时任务方式

首先先安装apscheduler模块

$ pip install apscheduler

代码如下:(在方法内注释了各种时间参数的定义与范围)

from apscheduler.schedulers.blocking import BlockingScheduler


class Timing:
    def __init__(self, start_date, end_date, hour=None):
        self.start_date = start_date
        self.end_date = end_date
        self.hour = hour

    def cron(self, job, *value_list):
        """cron格式 在特定时间周期性地触发"""
        # year (int 或 str) – 年,4位数字
        # month (int 或 str) – 月 (范围1-12)
        # day (int 或 str) – 日 (范围1-31)
        # week (int 或 str) – 周 (范围1-53)
        # day_of_week (int 或 str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
        # hour (int 或 str) – 时 (范围0-23)
        # minute (int 或 str) – 分 (范围0-59)
        # second (int 或 str) – 秒 (范围0-59)
        # start_date (datetime 或 str) – 最早开始日期(包含)
        # end_date (datetime 或 str) – 分 最晚结束时间(包含)
        # timezone (datetime.tzinfo 或str) – 指定时区
        scheduler = BlockingScheduler()
        scheduler.add_job(job, 'cron', start_date=self.start_date, end_date=self.end_date, hour=self.hour,
                          args=[*value_list])
        scheduler.start()

    def interval(self, job, *value_list):
        """interval格式 周期触发任务"""
        # weeks (int) - 间隔几周
        # days (int)  - 间隔几天
        # hours (int) - 间隔几小时
        # minutes (int) - 间隔几分钟
        # seconds (int) - 间隔多少秒
        # start_date (datetime 或 str) - 开始日期
        # end_date (datetime 或 str) - 结束日期
        # timezone (datetime.tzinfo 或str) - 时区
        scheduler = BlockingScheduler()
        # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
        scheduler.add_job(job, 'interval', minutes=1, seconds=30, start_date=self.start_date,
                          end_date=self.end_date, args=[*value_list])
        scheduler.start()

    @staticmethod
    def date(job, *value_list):
        """date格式 特定时间点触发"""
        # run_date (datetime 或 str) - 作业的运行日期或时间
        # timezone (datetime.tzinfo 或 str)  - 指定时区
        scheduler = BlockingScheduler()
        # 在 2019-8-30 01:00:01 运行一次 job 方法
        scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=[*value_list])
        scheduler.start()

封装的方法不是很通用,后面会优化一下代码,但最起码现在是能用的,哈哈哈哈哈哈

思考了一下思路,巡检触发任务,然后触发钉钉,所以定时任务应该是在最上层

之前分享的钉钉封装的代码内底部继续完善一下

if __name__ == '__main__':
    file_list = ["test_shiyan.py", "MeetSpringFestival.py"]
    # run_py(file_list)
    case_list = ["test_case_01", "test_case_02"]
    # run_case(test_sample, case_list)
    dingDing_list = [2, case_list, test_sample]
    # run_dingDing(*dingDing_list)
    Timing('2022-02-15 00:00:00', '2022-02-16 00:00:00', '0-23').cron(run_dingDing, *dingDing_list)

把run_dingDing()的函数我们放在已经封装好的Timing().cron(run_dingDing,*dingDing_list)内,那么run_dingDing()内的参数我们通过元组的方式传入

就是我们上面写的这里能看到

def cron(self, job, *value_list):
        """cron格式 在特定时间周期性地触发"""
        scheduler.add_job(job, 'cron', start_date=self.start_date, end_date=self.end_date, hour=self.hour,
                                  args=[*value_list])

时间范围的填写我放在了Timing()初始化内,看着舒服一点

在运行Timing().cron()后就可以触发定时了,但是必须要开着电脑才可以,等后面开始研究平台,存储在服务器内就美吱吱了~

apscheduler报错:Run time of job …… next run at: ……)” was missed by

apscheduler 运行过程中出现类似如下报错:

Run time of job "9668_hack (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.387821Run time of job "9668_index (trigger: interval[0:30:00], next run at: 2018-10-29 21:30:00 CST)" was missed by 0:01:47.392574Run time of job "9669_deep (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.397622Run time of job "9669_hack (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.402938Run time of job "9669_index (trigger: interval[0:30:00], next run at: 2018-10-29 21:30:00 CST)" was missed by 0:01:47.407996

针对该问题百度是基本上指不上了,google到了关键配置,但仍然出现该报错,于是继续找资料,刨根问底这到是什么鬼问题导致的。

python apscheduler cron定时任务触发接口自动化巡检如何实现

misfire_grace_time参数

里面说到了一个参数:misfire_grace_time,但是这个参数到底是干嘛用的,在其他地方找到了解释,其中涉及到几个其他参数,但是结合自己的理解综合总结一下

示例:

15分钟一次的的任务,misfire_grace_time 设置100秒,在0:06分的时候提示:

Run time of job "9392_index (trigger: interval[0:15:00], next run at: 2018-10-27 00:15:00 CST)" was missed by 0:06:03.931026

解释:

于是我修改了配置如下:

 class Config(object):
 
    SCHEDULER_JOBSTORES = {
        'default': RedisJobStore(db=3,host='0.0.0.0', port=6378,password='******'),
    }
 
    SCHEDULER_EXECUTORS = {
        'default': {'type': 'processpool', 'max_workers': 50}  #用进程池提升任务处理效率
    }
 
    SCHEDULER_JOB_DEFAULTS = {
        'coalesce': True,   #积攒的任务只跑一次
        'max_instances': 1000, #支持1000个实例并发
       'misfire_grace_time':600 #600秒的任务超时容错
    }
 
    SCHEDULER_API_ENABLED = True

我本以为这样应该就没什么问题了,配置看似完美,但是现实是残忍的,盯着apscheduler日志看了一会,熟悉的“was missed by”又出现了,这时候就需要怀疑这个配置到底有没有生效了,然后发现果然没有生效,从/scheduler/jobs中可以看到任务:

 {
"id": "9586_site_status",
"name": "9586_site_status",
"func": "monitor_scheduler:monitor_site_status",
"args": [
9586,
"http://sl.jxcn.cn/",
1000,
100,
200,
"",
0,
2
],
"kwargs": {},
"trigger": "interval",
"start_date": "2018-09-14T00:00:00+08:00",
"end_date": "2018-12-31T00:00:00+08:00",
"minutes": 15,
"misfire_grace_time": 10,
"max_instances": 3000,
"next_run_time": "2018-10-24T18:00:00+08:00"
}

可以看到任务中默认就有misfire_grace_time配置,没有改为600,折腾一会发现修改配置,重启与修改任务都不会生效,只能修改配置后删除任务重新添加(才能把这个默认配置用上),或者修改任务的时候把这个值改掉

 scheduler.modify_job(func=func, id=id, args=args, trigger=trigger, minutes=minutes,start_date=start_date,end_date=end_date,misfire_grace_time=600)

然后就可以了?图样图森破,missed 依然存在。

其实从后来的报错可以发现这个容错时间是用上的,因为从执行时间加上600秒后才出现的报错。

找到任务超时的根本原因

那么还是回到这个超时根本问题上,即使容错时间足够长,没有这个报错了,但是一个任务执行时间过长仍然是个根本问题,所以终极思路还在于如何优化executor的执行时间上。

当然这里根据不同的任务处理方式是不一样的,在于各自的代码了,比如更改链接方式、代码是否有冗余请求,是否可以改为异步执行,等等。

而我自己的任务解决方式为:由接口请求改为python模块直接传参,redis链接改为内网,极大提升执行效率,所以也就控制了执行超时问题。

关于“python apscheduler cron定时任务触发接口自动化巡检如何实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注亿速云行业资讯频道,小编每天都会为大家更新不同的知识点。

推荐阅读:
  1. 怎么用Python实现QQ消息自动回复
  2. Python+BI怎么爬取车厘子数据

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python apscheduler cron

上一篇:如何用Python展示全国高校的分布情况

下一篇:Python中的Array模块如何使用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》