简单粗暴的分布式定时任务解决方案
分布式定时任务
- 1.为什么需要定时任务?
- 2.数据库实现分布式定时任务
- 3.基于redis实现
1.为什么需要定时任务?
因为有时候我们需要定时的执行一些操作,比如业务中产生的一些临时文件,临时文件不能立即删除,因为不清楚用户是否操作完毕,不能立即删除,需要隔一段时间然后定时清楚,还有像是一些电商项目,每月进行数据清算。比如某些业务的排行榜,实时性不是高的也可以使用定时任务去统计,然后在做更新。但是我们现在大多数的应用都是分布式的?相当于你写的一个定时任务会在多个子系统中运行,而且是同时的,我们只需要其中一个任务运行就可以了,如果多次运行不仅会无故消耗系统资源,还会导致任务执行出现意外,那么怎么保证这个任务只执行一次呢?其实解决方案有很多。
分布式任务执行出现的问题,如下图所示:

- 使用数据库唯一约束加锁
- 使用redis的setNX命令
- 使用分布式框架Quartz,TBSchedule,elastic-job,Saturn,xxl-job等
当然技术是为业务服务的,我们怎么选择合适的技术,还得是看业务场景,比如一些任务的执行频率不高,也不是特别要求效率,也不复杂,我们完全用不上为了一个定时任务去引入一些第三方的框架作为定时任务实现,我们来介绍两种方式来实现分布式定时任务。
2.数据库实现分布式定时任务
数据库实现定时任务的核心思路:
- 需要两张表,一张定时任务配置表,还有一张定时任务运行记录表
- 任务配置表有一个唯一约束字段,运行记录表由运行日期+任务名称作为唯一约束,这是实现的核心思路
- 使用注解+aop对定时任务进行代理,统一进行加锁操作,避免多次运行
- 这种适合任务不频繁,一天在某个时间点执行,对性能要求不高的业务场景,实现起来也比较简单
表SQL语句:
-- 任务运行记录表
CREATE TABLE `task_record` (`ID` varchar(20) NOT NULL COMMENT 'ID',`start_time` datetime DEFAULT NULL COMMENT '定时任务开始时间',`ent_time` datetime DEFAULT NULL COMMENT '定时任务结束时间',`is_success` varchar(1) DEFAULT NULL COMMENT '是否执行成功',`error_cause` longtext COMMENT '失败原因',`task_name` varchar(100) NOT NULL COMMENT '任务名称',`run_date` varchar(6) DEFAULT NULL COMMENT '运行日期',PRIMARY KEY (`ID`),UNIQUE KEY `run_date_task_name_idx` (`run_date`,`task_name`) USING BTREE COMMENT '运行日期+任务名称作为唯一约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务运行记录表';-- 任务配置表
CREATE TABLE `task_config` (`id` varchar(32) NOT NULL COMMENT '序号',`task_describe` varchar(225) DEFAULT NULL COMMENT '任务描述',`task_name` varchar(100) DEFAULT NULL COMMENT '任务名称',`task_valid` varchar(1) DEFAULT NULL COMMENT '任务有效标志',`create_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`),UNIQUE KEY `task_index` (`task_name`) COMMENT '唯一性约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务配置表';
1.定时任务标识注解:
/*** 标注在定时任务上,避免多个微服务的情况下定时任务执行重复* @author compass* @date 2023-03-09* @since 1.0**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DatabaseLock {//定时任务使用的键,千万不要重复String lockName() default "";//定时任务描述String lockDesc() default "";
}
2.使用aop代理定时任务方法进行拦截
/*** 代理具体的定时任务,仅有一个任务会被成功执行** @author compass* @date 2023-03-09* @since 1.0**/
@Aspect
@Slf4j
@Component
public class DatabaseLockAspect {@Resourceprivate TaskService taskService;private static final String TASK_IS_VALID = "1";@Around("@annotation( com.springboot.example.annotation.DatabaseLock)")public Object cacheLockPoint(ProceedingJoinPoint pjp) {Date startTime = new Date();TaskRecord taskRecord = new TaskRecord();String isRunSuccess = "1";String taskConfigId;String errorCause = "";Boolean currentDayRunRecord ;Method cacheMethod = null;for (Method method : pjp.getTarget().getClass().getMethods()) {if (null != method.getAnnotation(DatabaseLock.class)) {cacheMethod = method;break;}}if (cacheMethod != null) {String lockName = cacheMethod.getAnnotation(DatabaseLock.class).lockName();String lockDesc = cacheMethod.getAnnotation(DatabaseLock.class).lockDesc();// 运行主键,避免多次运行核心关键String runDate = DateUtil.format(new Date(), "yyyyMMdd");String taskRecordId = IdUtil.getSnowflakeNextIdStr();taskRecord.setTaskName(lockName);taskRecord.setId(taskRecordId);taskRecord.setRunDate(runDate);if (StringUtils.isBlank(lockName)) {throw new RuntimeException("定时任务锁名称不能为空");}if (StringUtils.isBlank(lockDesc)) {throw new RuntimeException("定时任务锁描述不能为空");}TaskConfig taskConfig = taskService.hasRun(lockName);// 还未运行过,进行初始化处理if (taskConfig == null) {TaskConfig config = new TaskConfig();taskConfigId = IdUtil.getSnowflakeNextIdStr();config.setId(taskConfigId);config.setTaskDescribe(lockDesc);config.setTaskName(lockName);config.setTaskValid("1");config.setCreateTime(new Date());try {// 添加时出现异常,已经运行过该定时任务taskService.addTaskConfig(config);taskConfig = config;} catch (Exception e) {e.printStackTrace();}// 有效标志位0表示无需执行} else if (!TASK_IS_VALID.equals(taskConfig.getTaskValid())) {String message = "该定时任务已经禁用";log.warn("method:{}未获取锁:{}[运行失败原因:{}]", cacheMethod, lockName, message);throw new RuntimeException(String.format("method:%s未获取锁:%s[运行失败原因:%s]", cacheMethod, lockName, message));}// 添加运行记录,以runKey为唯一标识,插入异常,说明执行过try {currentDayRunRecord = taskService.addCurrentDayRunRecord(taskRecord);} catch (Exception e) {log.warn("method:{}未获取锁:{}[运行失败原因:已经有别的服务进行执行]", cacheMethod, lockName);return null;}// 没有执行过,开始执行if (currentDayRunRecord) {try {log.warn("method:{}获取锁:{},运行成功!", cacheMethod, lockName);return pjp.proceed();} catch (Throwable e) {e.printStackTrace();isRunSuccess = "0";errorCause = ExceptionUtils.getExceptionDetail(e);} finally {Date endTime = new Date();taskRecord.setStartTime(startTime);taskRecord.setId(IdUtil.getSnowflakeNextIdStr());taskRecord.setEntTime(endTime);taskRecord.setIsSuccess(isRunSuccess);taskRecord.setErrorCause(errorCause);// 修改运行记录taskService.updateTaskRunRecord(taskRecord);}}}return null;}
}
3.TaskService实现操作数据库接口与实现
public interface TaskService {/*** 判断定时任务是否运行过** @param taskName* @return com.springboot.example.bean.task.TaskConfig* @author compass* @date 2023/3/10 21:22* @since 1.0.0**/TaskConfig hasRun(String taskName);/*** 将首次运行的任务添加到任务配置表** @param taskConfig* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean addTaskConfig(TaskConfig taskConfig);/*** 更新定时任务运行记录** @param taskRecord* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean updateTaskRunRecord(TaskRecord taskRecord);/*** 新增一条运行记录,只有新增成功的服务才可以得到运行劝* @param taskRecord* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean addCurrentDayRunRecord(TaskRecord taskRecord);
}@Slf4j
@Service
public class TaskServiceImpl implements TaskService {@Resourceprivate TaskConfigMapper taskConfigMapper;@Resourceprivate TaskRecordMapper taskRecordMapper;@Overridepublic TaskConfig hasRun(String taskName) {QueryWrapper<TaskConfig> wrapper = new QueryWrapper<>();wrapper.eq("task_name",taskName);return taskConfigMapper.selectOne(wrapper);}@Overridepublic Boolean addTaskConfig(TaskConfig taskConfig) {return taskConfigMapper.insert(taskConfig)>0;}@Overridepublic Boolean updateTaskRunRecord(TaskRecord taskRecord ) {QueryWrapper<TaskRecord> wrapper = new QueryWrapper<>();wrapper.eq("task_name",taskRecord.getTaskName());wrapper.eq("run_date",taskRecord.getRunDate());return taskRecordMapper.update(taskRecord,wrapper)>0;}@Overridepublic Boolean addCurrentDayRunRecord(TaskRecord taskRecord) {return taskRecordMapper.insert(taskRecord)>0;}
}
4.数据库对应的实体类
// 配置类
@Data
@TableName("task_config")
public class TaskConfig {/*** 序号*/@TableId(value = "id", type = IdType.ASSIGN_ID)private String id;/*** 任务描述*/private String taskDescribe;/*** 任务名称*/private String taskName;/*** 任务有效标志*/private String taskValid;/*** 创建时间*/private Date createTime;}
// 运行记录类
@Data
@TableName("task_record")
public class TaskRecord {/*** ID*/@TableId(value = "id", type = IdType.ASSIGN_ID)private String id;/*** 定时任务开始时间*/private Date startTime;/*** 定时任务结束时间*/private Date entTime;/*** 是否执行成功*/private String isSuccess;/*** 失败原因*/private String errorCause;/*** 运行日期[yyyyMMdd]*/private String runDate;/*** 任务名称(任务名称+运行日期为唯一索引)*/private String taskName;}
3.基于redis实现
- 主要是基于setNX来实现的,setNX表示这个key存在,则设置value失败,只有这个key不存在的时候,才会set成功
- 我们可以给这个key指定过期时间,让他一定会释放锁,不然容易出现死锁的情况
1.操作redis锁的工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** redis锁工具类,如果redis是集群的话需要考虑数据延时性,这里默认为redis单个节点** @author compass* @date 2023-03-10* @since 1.0**/
@SuppressWarnings(value = {"all"})
@Component
public class RedisLockUtils {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 加锁** @param key* @param value* @param time* @param timeUnit* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean lock(String key, String value, long time, TimeUnit timeUnit) {return (Boolean)redisTemplate.execute((RedisCallback) connection -> {Boolean setNX = connection.setNX(key.getBytes(), value.getBytes());if (setNX){return connection.expire(key.getBytes(),time);}return false;});}/*** 立即释放锁,如果任务执行的非常快,可能会导致其他应用获得到锁,二次执行** @param key* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean fastReleaseLock(String key) {return redisTemplate.delete(key);}/*** 缓慢释放锁(隔离小段时间再释放锁,可以完全避免掉别的应用获取到锁)** @param key* @param time* @param timeUnit* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean turtleReleaseLock(String key, long time, TimeUnit timeUnit) {return redisTemplate.expire(key, time, timeUnit);}}
2.aop切入,统一管理定时任务
import com.springboot.example.annotation.CacheLock;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;/*** 代理具体的定时任务,仅有一个任务会被成功执行** @author compass* @date 2023-03-09* @since 1.0**/
@Aspect
@Slf4j
@Component
public class CacheLockAspect {@Resourceprivate RedisLockUtils redisLockUtils;/*** 加锁值,可以是任意值**/private static final String LOCK_VALUE = "1";@Around("@annotation(com.springboot.example.annotation.CacheLock)")public Object cacheLockPoint(ProceedingJoinPoint pjp) {Method cacheMethod = null;for (Method method : pjp.getTarget().getClass().getMethods()) {if (null != method.getAnnotation(CacheLock.class)) {cacheMethod = method;break;}}if (cacheMethod!=null){CacheLock cacheLock = cacheMethod.getAnnotation(CacheLock.class);String lockName = cacheLock.lockName();long time = cacheLock.timeOut();boolean successLock = redisLockUtils.lock(lockName,LOCK_VALUE, time, TimeUnit.SECONDS);if (successLock){log.info("method:{}获取锁成功:{}", cacheMethod, lockName);try {// 获得锁调用被代理的定时任务return pjp.proceed();} catch (Throwable throwable) {throwable.printStackTrace();}finally {// 延时5秒再去释放锁redisLockUtils.turtleReleaseLock(lockName,5,TimeUnit.SECONDS);}}else {log.warn("method:{}获取锁失败:{}", cacheMethod, lockName);}}return null;}
}
3.自定义注解
/*** 标注在定时任务上,避免多个微服务的情况下定时任务执行重复* @author compass* @date 2023-03-09* @since 1.0**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {//定时任务使用的键,千万不要重复String lockName() ;// 占用锁的时间,单位是秒,默认10分钟long timeOut() default 60*10;
}
今天就先介绍这两种方式,后续我再为大家续上使用别的框架进行实现,不过在实现的过程中使用 redisTemplate.opsForValue().setIfAbsent() 出现了一点小肯,他返回的是null值,然后出现空指针,然后我不得不采用execute的方式去执行。
相关文章:
简单粗暴的分布式定时任务解决方案
分布式定时任务1.为什么需要定时任务?2.数据库实现分布式定时任务3.基于redis实现1.为什么需要定时任务? 因为有时候我们需要定时的执行一些操作,比如业务中产生的一些临时文件,临时文件不能立即删除,因为不清楚用户是…...
蓝桥杯第五天刷题
第一题:数的分解题目描述本题为填空题,只需要算出结果后,在代码中使用输出语句将所填结果输出即可。把 2019 分解成 3 个各不相同的正整数之和,并且要求每个正整数都不包含数字 2和 4,一共有多少种不同的分解方法&…...
Java数组的定义和使用(万字详解)
目录 编辑 一. 数组的基本概念 1、什么是数组 2、数组的创建及初始化 1、数组的创建 2、数组的初始化 3、数组的使用 (1)数组中元素访问 (3)遍历数组 二、数组是引用类型 1、初始JVM的内存分布 2、基本类型变量与引用类…...
【SpringBoot】自定义Starter
🚩本文已收录至专栏:Spring家族学习之旅 👍希望您能有所收获 一.概述 在使用SpringBoot进行开发的时候,我们发现使用很多技术都是直接导入对应的starter,然后就实现了springboot整合对应技术,再加上一些简…...
【C陷阱与缺陷】----语法陷阱
💯💯💯 要理解一个C程序,必须理解这些程序是如何组成声明,表达式,语句的。虽然现在对C的语法定义很完善,几乎无懈可击,大门有时这些定义与人们的直觉相悖,或容易引起混淆…...
虹科分享| 关于TrueNAS十问十答
上一篇文章我们向您介绍了虹科新品HK-TrueNAS企业存储,很多小伙伴会疑问到底什么是NAS存储,之前常用的磁盘、磁带属于什么存储架构,NAS存储好在哪里,什么时候使用NAS?今天我们整理了关于TrueNAS的十问十答,…...
Https 笔记
HTTP TLS TLS 的前身是 SSL 非对称加密的核心: 两个密钥(公私) https 需要第三方CA(证书授权中心)申请SSL证书以确定其真实性 证书种包含了特定的公钥和私钥 密钥交换 自己将私钥上锁后发给对方对方也上锁 在还回来…...
【Python+requests+unittest+excel】实现接口自动化测试框架
一、框架结构: 工程目录 二、Case文件设计 三、基础包 base 3.1 封装get/post请求(runmethon.py) 1 import requests2 import json3 class RunMethod:4 def post_main(self,url,data,headerNone):5 res None6 if heade…...
MySQL终端的使用及其数据类型的使用
什么是数据库?数据库(Database)是按照数据结构来组织、存储和管理数据的仓库。每个数据库都有一个或多个不同的 API 用于创建,访问,管理,搜索和复制所保存的数据。我们也可以将数据存储在文件中,…...
长视频终局:一场考验资金储备的消耗战
赢者通吃,似乎已成为各行各业的常识,但事实真的是这样吗?20世纪70年代,石油价格高涨,在墨西哥湾油田拍卖中高价拍得油田的企业,要么亏损,要么收入低于预期,但仍然有无数企业在高价竞…...
javaEE初阶 — CSS 常用的属性
文章目录CSS 常用的属性1 字体属性1.1 设置字体家族 font-family1.2 设置字体大小 font-size1.3 设置字体粗细 font-weight1.4 文字倾斜 font-style2 文本属性2.1 文本颜色2.2 文本对齐2.3 文本装饰2.4 文本缩进2.5 行高3 背景属性3.1 背景颜色3.2 背景图片3.3 背景位置3.4 背景…...
【面试题】如何取消 script 标签发出的请求
大厂面试题分享 面试题库前后端面试题库 (面试必备) 推荐:★★★★★地址:前端面试题库问题之前在业务上有这样一个场景,通过 script 标签动态引入了一个外部资源,具体方式是这样的const script document.…...
蓝桥杯嵌入式(G4系列):RTC时钟
前言: 关于RTC时钟的HAL库配置我也是第一次,之前都是用库函数的写法,这里写下这篇博客来记录一下自己的学习过程。 STM32Cubemx配置: 首先点击左侧的Timers的RTC,勾选以下选项 进入时钟树配置 进入时间设置࿰…...
Linux——进程间通信1
目录 进程间通信目的 进程间通信标准 管道 匿名管道 管道实现进程间通信 管道的特点 进程池 ProcessPool.cc Task.hpp 习题 进程间通信目的 数据传输:一个进程需要将它的数据发送给另一个进程 资源共享:多个进程之间共享同样的资源。 通知事件…...
循环语句——“Python”
各位CSDN的uu们你们好呀,今天小雅兰的内容是Python中的循环语句呀,分为while循环和for循环,下面,让我们进入循环语句的世界吧 循环语句 while循环 for循环 continue和break 循环语句小结 人生重开模拟器 设置初始属性 设置性别…...
Python synonyms查找中文任意词汇的同义词近义词
Python synonyms查找中文任意词汇的同义词近义词 作者:虚坏叔叔 博客:https://xuhss.com 早餐店不会开到晚上,想吃的人早就来了!😄 一、安装 对于非专业的开发人员来说可以简单的使用Python一行代码来找到同义词。这…...
三分钟了解http和https
对应测试人员都会听过http请求和响应.在这里给大家介绍http相关的知识 一.http和https基本概念 HTTP:是互联网上应用最为广泛的一种网络协议,是一个客户端和服务器端请求和应答的标准(TCP),用于从WWW服务器传输超文本…...
docker应用:搭建私有云盘
简介:NextCloud是一个开源的云存储解决方案,可以在自己的服务器上搭建个人云存储系统。它提供了与市面上主流云存储服务(如Dropbox、Google Drive)相似的功能,包括文件存储、共享、同步、协作等。NextCloud的主要优势在…...
【C++进阶】面向对象
程序 编写程序是为了让计算机解决现实生活中的实际问题。pascal之父、结构化程序设计先驱Niklaus Wirth提出程序 算法 数据结构。程序是完成一定功能的一些列有序指令的集合。指令 操作码 指令。将指令按一定的顺序进行整合,就形成了程序。 机器语言与汇编语言…...
从ChatGPT与New Bing看程序员为什么要学习算法?
文章目录为什么要学习数据结构和算法?ChatGPT与NEW Bing 的回答想要通关大厂面试,就不能让数据结构和算法拖了后腿业务开发工程师,你真的愿意做一辈子CRUD boy吗?对编程还有追求?不想被行业淘汰?那就不要只…...
【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...
永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...
【Post-process】【VBA】ETABS VBA FrameObj.GetNameList and write to EXCEL
ETABS API实战:导出框架元素数据到Excel 在结构工程师的日常工作中,经常需要从ETABS模型中提取框架元素信息进行后续分析。手动复制粘贴不仅耗时,还容易出错。今天我们来用简单的VBA代码实现自动化导出。 🎯 我们要实现什么? 一键点击,就能将ETABS中所有框架元素的基…...
性能优化中,多面体模型基本原理
1)多面体编译技术是一种基于多面体模型的程序分析和优化技术,它将程序 中的语句实例、访问关系、依赖关系和调度等信息映射到多维空间中的几何对 象,通过对这些几何对象进行几何操作和线性代数计算来进行程序的分析和优 化。 其中࿰…...
解决MybatisPlus使用Druid1.2.11连接池查询PG数据库报Merge sql error的一种办法
目录 前言 一、问题重现 1、环境说明 2、重现步骤 3、错误信息 二、关于LATERAL 1、Lateral作用场景 2、在四至场景中使用 三、问题解决之道 1、源码追踪 2、关闭sql合并 3、改写处理SQL 四、总结 前言 在博客:【写在创作纪念日】基于SpringBoot和PostG…...
开源项目实战学习之YOLO11:12.6 ultralytics-models-tiny_encoder.py
👉 欢迎关注,了解更多精彩内容 👉 欢迎关注,了解更多精彩内容 👉 欢迎关注,了解更多精彩内容 ultralytics-models-sam 1.sam-modules-tiny_encoder.py2.数据处理流程3.代码架构图(类层次与依赖)blocks.py: 定义模型中的各种模块结构 ,如卷积块、残差块等基础构建…...
