Python运维之定时任务模块APScheduler
前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除

目录
定时任务模块APScheduler
一、安装及基本概念
1.1、APScheduler的安装
1.2、涉及概念
1.3、APScheduler的工作流程编辑
二、配置调度器
三、启动调度器
四、调度事件监听
定时任务模块APScheduler
APScheduler提供了基于日期、固定时间间隔以及crontab类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业。如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。
一、安装及基本概念
1.1、APScheduler的安装
pip install apscheduler
1.2、涉及概念
- 触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,有按日期、按时间间隔、按cronjob描述式三种触发方式。每个作业都有自己的触发器,除了初始配置之外,触发器是完全无状态的。
- 作业存储器(job stores):指定了作业被存放的位置,默认的作业存储器是内存,也可以将作业保存在各种数据库中。当作业被存放在数据库中时,它会被序列化;当重新被加载时,会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储
- 执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
- 调度器(schedulers):任务调度器,控制器角色,通过它配置作业存储器、执行器和触发器、添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员不需要直接处理作业存储器、执行器或触发器。配置作业存储器和执行器是通过调度器来完成的
1.3、APScheduler的工作流程
一个简单的间隔任务实例:
import osfrom datetime import datetimefrom apscheduler.schedulers.blocking import BlockingScheduler# 打印当前的时间def tick():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()# 添加一个作业rick,触发器为interval,每隔3秒执行一次scheduler.add_job(tick, 'interval', seconds=3)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass
另外的触发器为date,cron。date按特定时间点触发,cron则按固定的时间间隔触发。
上述代码稍作修改可变为cron类的定时任务:
import osfrom datetime import datetimefrom apscheduler.schedulers.blocking import BlockingSchedulerdef tick():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(tick, 'cron', hour=19,minute=23)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass
定时cron任务也非常简单,直接给触发器trigger传入'cron'即可。hour=19,minute23,表示每天的19时23分执行任务
hour=19,minute=23hour='19',minute='23'minute='*/3' # 表示每3分钟执行一次hour='19-21',minute='23' # 表示19:23、20:23、21:23各执行一次任务
二、配置调度器
调度器的主循环其实就是反复检查是否有到期需要执行的任务,具体分两步进行
- 询问自己的每一个作业存储器,有没有到期需要执行的任务。如果有则计算这些作业中每个作业需要 运行的时间点;如果时间点有多个,就做coalesce检查。
-
提交给执行器按时间点运行
各调度器的适用场景:
- BlockingSchduler:适用于调度程序,是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回
- BackgroundScheduler:适用于调度程序,在应用程序的后台运行,调用start后主线程不会阻塞。
- AsyncIOScheduler:适用于使用了asyncio模块的应用程序
- GeventScheduler:适用于使用了gevent模块的应用程序
- TwistedScheduler:适用于构建Twisted的应用程序
- QtSchuduler:适用于构建Qt的应用程序。
作业存储器的选择:一是内存( 默认),而是数据库。
执行器的选择:默认的ThreadPoolExecutor足够OK,如果作业负载涉及CPU密集型操作,那么考虑使用ProcessPoolExecutor,甚至同时使用,将其作为二级执行器。
APScheduler可以使用字典,关键字参数传递配置调度器。首先实例化调度程序添加作业,然后配置调度器,获得最大的灵活性。
如果调度程序在应用程序的后台运行,则选择BackgroundScheduler,并使用默认的jobstore和executor
from apscheduler.schedulers.blocking import BlockingSchedulerscheduler = BlockingScheduler()
如果想配置更多的信息,就可设置两个执行器、两个作业存储器、调整新作业的默认值,并设置不同的时区。配置详情:
- 配置名为mongo的MongoDBjobStore作业存储器
- 配置名为default的SQLAlchemyJobStore(使用SQLite)
- 配置名为default的ThreadPoolExecutor,最大进程数为5
- UTC作为调度器的时区
- coalesce默认情况下关闭
- 作业的默认最大运行实例限制为3
方法一:
from pytz import utcfrom apscheduler.schedulers.background import BlockingSchedulerfrom apscheduler.jobstores.mongodb import MongoDBJobStorefrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutorjobstores = {'mongo':MongoDBJobStore(),'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default':ThreadPoolExecutor(20),'processpool':ProcessPoolExecutor(5)}job_defaults = {'coalesce':False,'max_instances':3}scheduler = BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults,timezone=utc)
方法二:
from apscheduler.schedulers.background import BlockingSchedulerscheduler = BlockingScheduler({'apscheduler.jobstores.mongo':{'type':'mongodb'},'apscheduler.jobstores.default':{'type':'sqlalchemy','url':'sqlite:///jobs.sqlite'},'apscheduler.executors.default':{'class':'apscheduler.executors.pool:ThreadPoolExecutor','max_workers':'5'},'apscheduler.job_defaults.coalesce':'fasle','apscheduler.job_defaults.max_instances':'3','apscheduler.timezone':'UTC',})
方法三:
from pytz import utcfrom apscheduler.schedulers.background import BlockingSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutorjobstores = {'mongo':{'type':'mongodb'},'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default':{'type':'threadpool','max_workers':20},'processpool':ProcessPoolExecutor(max_workers=5)}job_defaults = {'coalesce':False,'max_instances':3}scheduler = BlockingScheduler()scheduler.configure(jobstores=jobstores,executors=executors,job_defaults=job_defaults,timezone=utc)
三、启动调度器
启动调度器前需要先添加作业,有两种方法可以向调度器添加作业:一是通过接口add_job();二是通过使用函数装饰器,其中add_job()返回一个apscheduler.job.Job类的实例,用于后续修改或删除作业。
可以随时在调度器上调度作业。如果在添加作业时,调度器还没有启动,那么任务不会运行,并且它的第一次运行时间在调度器启动时计算。
调用调度器的start()方法启动调度器,下面用不同的作业存储器来举例:
from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimefrom apscheduler.jobstores.memory import MemoryJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutorfdef my_job(id='my_job'):print(id,'-->',datetime.datetime.now())jobstores = {'default':MemoryJobStore()}executors = {'default':ThreadPoolExecutor(20),'processpool':ProcessPoolExecutor(10)}job_defaults = {'coalesce':False,'max_instance':3}scheduler =BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults)scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',end_date='2024-06-06')scheduler.add_job(my_job,args=['job_once_now',],id='job_once_now')scheduler.add_job(my_job,args=['job_date_once',],id='job_date_once',trigger='date',run_date='2024-01-01 00:00:00')try:scheduler.start()except SystemExit:print('exit')exit()
方法二:使用数据库作为作业存储器(修改第5行和11行)
from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimefrom apscheduler.jobstores.memory import MemoryJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoredef my_job(id='my_job'):print(id,'-->',datetime.datetime.now())jobstores = {'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default':ThreadPoolExecutor(20),'processpool':ProcessPoolExecutor(10)}job_defaults = {'coalesce':False,'max_instance':3}scheduler =BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults)scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',end_date='2024-06-06')scheduler.add_job(my_job,args=['job_once_now',],id='job_once_now')scheduler.add_job(my_job,args=['job_date_once',],id='job_date_once',trigger='date',run_date='2024-01-01 00:00:00')try:scheduler.start()except SystemExit:print('exit')exit()
运行过之后,如果不注释添加作业的代码,则作业会重新添加到数据库中,这样就有了两个作业,为了避免这样的情况:设置(replace_existing=True)
scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)
如果想运行错过运行的作业,则使用misfire_grace_time:
scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2024-06-06')
其他操作如下:
scheduler.remove_job(job_id,jobstore=None) # 删除作业scheduler.remove_all_jobs(jobstore=None) # 删除所有作业scheduler.pause_job(job_id,jobstore=None) # 暂停作业scheduler.resume_job(job_id,jobstore=None) # 恢复作业scheduler.modify_job(job_id,jobstore=None,**changes) # 修改单个作业属性配置scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args) # 修改单个作业的触发器并更新下次运行时间scheduler.print_jobs(jobstore=None,out=sys.stdout) # 输出作业信息
四、调度事件监听
日志记录和事件监听:
from apscheduler.schedulers.blocking import BlockingSchedulerfrom apscheduler.events import EVENT_JOB_EXECUTED,EVENT_JOB_ERRORimport datetimeimport logging# 配置日志记录信息 logging.basicConfig(level=logging.INFO,format='%(asctime)s %(filename)s[line:%(lineno)d %(levelname)s %(message)s',datefmt='%Y-%m-%d %H:%M:%S',filename='log1.txt',filemode='a')def aps_test(x):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x)def date_test(x):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x)print(1/0)def my_listener(event):if event.exception:print('任务出错了!!!!')else:print('任务照常运行...')scheduler = BlockingScheduler()scheduler.add_job(func=date_test,args=('一次性任务,会出错',),next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15),id='date_task')scheduler.add_job(func=aps_test,args=('循环任务',),trigger='interval',seconds=3,id='interval_task')scheduler.add_listener(my_listener,EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)scheduler._logger = loggingscheduler.start()
致谢
在此,我要对所有为知识共享做出贡献的个人和机构表示最深切的感谢。同时也感谢每一位花时间阅读这篇文章的读者,如果文章中有任何错误,欢迎留言批评指正。
学习永无止境,让我们共同进步
相关文章:
Python运维之定时任务模块APScheduler
前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除 目录 定时任务模块APScheduler 一、安装及基本概念 1.1、APScheduler的安装 1.2、涉及概念 1.3、APScheduler的工作流程编辑 二、配置调度器 …...
Linux技能
文章目录 Linux2024心得优秀博客 Linux2024 心得 会一些基本的命令,解决生产的问题有时候会用的到 优秀博客 02、Linux相关工具及操作03、Linux实用指令 cat xxx | grep “xx xx” 这个应用在从大量的日志文件中找到报错的信息 04、Linux高级部分05、JavaEE定制…...
算法有哪些分类
算法的分类可以根据不同的标准来进行,以下是一些常见的算法分类: 基本算法分类: 搜索算法:包括线性搜索、二分搜索、哈希搜索、深度优先搜索(DFS)、广度优先搜索(BFS)等。 排序算法…...
面试经典150题——找出字符串中第一个匹配项的下标
面试经典150题 day23 题目来源我的题解方法一 库函数方法二 自定义indexOf函数方法三 KMP算法 题目来源 力扣每日一题;题序:28 我的题解 方法一 库函数 直接使用indexOf函数。 时间复杂度:O(n) 空间复杂度:O(1) public int str…...
.Net MAUI 搭建Android 开发环境
一、 安装最新版本 VS 2022 安装时候选择上 .Net MAUI 跨平台开发 二、安装成功后,创建 .Net MAUI 应用 三、使用 VS 自带的 Android SDK 下载 ,Android镜像、编译工具、加速工具 四、使用Vs 自带的 Android Avd 创建虚拟机 五、使用 Android 手机真机调试...
编译适配纯鸿蒙系统的ijkplayer中的ffmpeg库
目前bilibili官方的ijkplayer播放器,是只适配Android和IOS系统的。而华为接下来即将发布纯harmony系统,是否有基于harmony系统的ijkplayer可以使用呢? 鸿蒙版ijkplayer播放器是哪个,如何使用,这个问题,大家…...
离线维护麒麟操作系统
1 本地源设置 a 首先传输一个镜像ISO文件到离线系统。 b 加载镜像文件作为源文件。 #mkdir /mnt/cdrom #mount -o path/镜像.iso /mnt/cdromc 修改源文件 # cd /etc/yum.repo.d/ # vi base.repo 修改baseurl file:///mnt/cdrom d update &install 然后就可以愉快的…...
leetcode尊享面试——二叉树(python)
250.统计同值子树 使用dfs深度搜索,同值子树,要满足三个条件: 对于当前节点node,他的左子树血脉纯净(为同值子树),右子树血脉纯净(为同值子树),node的值等于…...
macbookpro 安装linux mint 无线wifi无法连接 解决方案
见欢迎页面—驱动管理...
抖音小店如此内卷,现在还值得投入吗?还能赚到钱吗?
大家好,我是电商笨笨熊 抖音小店已经经历4-5年的风霜,所以现在很多还未入驻的玩家都会有一个顾虑; 担心现在进入抖店是都还具备发展空间,还能不能赚到钱; 尤其是当一片市场逐渐加入越来越多商家的时候平台一定会内卷…...
Java基础知识(11)
Java基础知识(11) (包括:IO流高级流,网络爬虫基础,Commons-i0工具包和Hutool工具包) 目录 Java基础知识(11) 一.IO流高级流 1.缓冲流 【1】字节缓冲流 ࿰…...
iOS——SDWebImage源码学习
什么是SDWebImage SDWebImage是一个流行的iOS和macOS平台上的开源库,用于异步加载和缓存网络图片。它提供了一套简单易用的API,使得在应用中加载网络图片变得更加方便和高效。 主要特点和功能: 异步加载:SDWebImage通过异步方式…...
信创基础软件之中间件
信创基础软件之中间件 中间件概述 中间件是一种应用于分布式系统的基础软件,位于应用与操作系统、数据库之间,主要用于解决分布式环境下数据传输、数据访问、应用调度、系统构建和系统集成、流程管理等问题,是分布式环境下支撑应用开发、运…...
在Ubuntu linux操作系统上操作MySQL数据库常用的命令
检查是否安装了MySQL,或检查MySQL的状态: sudo systemctl status mysql或 sudo systemctl status mysql.service如果mysql有安装,上面这条命令会返回mysql的状态active或inactive。 卸载mysql数据库 第一步是停了数据库: sud…...
前端科举八股文-JAVASCRIPT篇
前端科举八股文-JAVASCRIPT篇 Js的变量类型,区别是什么平时有用过symbol吗函数闭包的理解?js的原型链? Function Function.constructor 返回值?promise的出现是为了解决什么问题?前端中的事件流事件委托?js的new操作符做了哪些…...
Docker私有仓库与Harbor部署使用
目录 一、本地私有仓库 1. 下载registry镜像 2. 在daemon.json文件中添加私有镜像仓库地址 编辑 3. 运行registry容器 4. Docker容器的重启策略如下 5. 为镜像打标签 6. 上传到私有仓库 7. 列出私有仓库的所有镜像 8. 列出私有仓库的centos镜像有哪些tag 9. 先删…...
Linux的iptables防火墙基础介绍
iptables 防火墙 应用层软件----管理 内核级netfilter 硬件-------内核----网络—netfilter/kvm----- app iptables iptables—控制netfilter 过滤:<smac/sip/dip/sport/dport/状态> TCP/IP 应用层 传输层 sport dport 状态 <三次握手/四次断开> 网…...
deepspeed+transformers模型微调
一、目录 代码讲解 二、实现。 1、代码讲解,trainer 实现。 transformers通过trainer 集成deepspeed功能,所以中需要进行文件配置,即可实现deepspeed的训练。 微调代码: 参数定义—>数据处理---->模型创建/评估方式----&…...
无人机摄影测量数据处理、三维建模及在土方量计算中的应用
专题一、无人机摄影测量技术应用现状及其发展 1、无人机摄影测量技术概述 2、摄影测量系统的发展 3、无人机摄影测量技术应用分析 专题二、基本原理和关键技术讲解 1、摄影测量基础知识 1)航空摄影 2)航摄像片的方位元素 3)共…...
《ESP8266通信指南》15-MQTT连接、订阅MQTT主题并打印消息(基于Lua|适合新手|非常简单)
往期 《ESP8266通信指南》14-连接WIFI(基于Lua)-CSDN博客 《ESP8266通信指南》13-Lua 简单入门(打印数据)-CSDN博客 《ESP8266通信指南》12-Lua 固件烧录-CSDN博客 《ESP8266通信指南》11-Lua开发环境配置-CSDN博客 《ESP826…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...
【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
多模态图像修复系统:基于深度学习的图片修复实现
多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...
【HarmonyOS 5】鸿蒙中Stage模型与FA模型详解
一、前言 在HarmonyOS 5的应用开发模型中,featureAbility是旧版FA模型(Feature Ability)的用法,Stage模型已采用全新的应用架构,推荐使用组件化的上下文获取方式,而非依赖featureAbility。 FA大概是API7之…...
