简单粗暴的分布式定时任务解决方案
分布式定时任务
- 1.为什么需要定时任务?
- 2.数据库实现分布式定时任务
- 3.基于redis实现
1.为什么需要定时任务?
因为有时候我们需要定时的执行一些操作,比如业务中产生的一些临时文件,临时文件不能立即删除,因为不清楚用户是否操作完毕,不能立即删除,需要隔一段时间然后定时清楚,还有像是一些电商项目,每月进行数据清算。比如某些业务的排行榜,实时性不是高的也可以使用定时任务去统计,然后在做更新。但是我们现在大多数的应用都是分布式的?相当于你写的一个定时任务会在多个子系统中运行,而且是同时的,我们只需要其中一个任务运行就可以了,如果多次运行不仅会无故消耗系统资源,还会导致任务执行出现意外,那么怎么保证这个任务只执行一次呢?其实解决方案有很多。
分布式任务执行出现的问题,如下图所示:
- 使用数据库唯一约束加锁
- 使用redis的setNX命令
- 使用分布式框架Quartz,TBSchedule,elastic-job,Saturn,xxl-job等
当然技术是为业务服务的,我们怎么选择合适的技术,还得是看业务场景,比如一些任务的执行频率不高,也不是特别要求效率,也不复杂,我们完全用不上为了一个定时任务去引入一些第三方的框架作为定时任务实现,我们来介绍两种方式来实现分布式定时任务。
2.数据库实现分布式定时任务
数据库实现定时任务的核心思路:
- 需要两张表,一张定时任务配置表,还有一张定时任务运行记录表
- 任务配置表有一个唯一约束字段,运行记录表由运行日期+任务名称作为唯一约束,这是实现的核心思路
- 使用注解+aop对定时任务进行代理,统一进行加锁操作,避免多次运行
- 这种适合任务不频繁,一天在某个时间点执行,对性能要求不高的业务场景,实现起来也比较简单
表SQL语句:
-- 任务运行记录表
CREATE TABLE `task_record` (`ID` varchar(20) NOT NULL COMMENT 'ID',`start_time` datetime DEFAULT NULL COMMENT '定时任务开始时间',`ent_time` datetime DEFAULT NULL COMMENT '定时任务结束时间',`is_success` varchar(1) DEFAULT NULL COMMENT '是否执行成功',`error_cause` longtext COMMENT '失败原因',`task_name` varchar(100) NOT NULL COMMENT '任务名称',`run_date` varchar(6) DEFAULT NULL COMMENT '运行日期',PRIMARY KEY (`ID`),UNIQUE KEY `run_date_task_name_idx` (`run_date`,`task_name`) USING BTREE COMMENT '运行日期+任务名称作为唯一约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务运行记录表';-- 任务配置表
CREATE TABLE `task_config` (`id` varchar(32) NOT NULL COMMENT '序号',`task_describe` varchar(225) DEFAULT NULL COMMENT '任务描述',`task_name` varchar(100) DEFAULT NULL COMMENT '任务名称',`task_valid` varchar(1) DEFAULT NULL COMMENT '任务有效标志',`create_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`),UNIQUE KEY `task_index` (`task_name`) COMMENT '唯一性约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务配置表';
1.定时任务标识注解:
/*** 标注在定时任务上,避免多个微服务的情况下定时任务执行重复* @author compass* @date 2023-03-09* @since 1.0**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DatabaseLock {//定时任务使用的键,千万不要重复String lockName() default "";//定时任务描述String lockDesc() default "";
}
2.使用aop代理定时任务方法进行拦截
/*** 代理具体的定时任务,仅有一个任务会被成功执行** @author compass* @date 2023-03-09* @since 1.0**/
@Aspect
@Slf4j
@Component
public class DatabaseLockAspect {@Resourceprivate TaskService taskService;private static final String TASK_IS_VALID = "1";@Around("@annotation( com.springboot.example.annotation.DatabaseLock)")public Object cacheLockPoint(ProceedingJoinPoint pjp) {Date startTime = new Date();TaskRecord taskRecord = new TaskRecord();String isRunSuccess = "1";String taskConfigId;String errorCause = "";Boolean currentDayRunRecord ;Method cacheMethod = null;for (Method method : pjp.getTarget().getClass().getMethods()) {if (null != method.getAnnotation(DatabaseLock.class)) {cacheMethod = method;break;}}if (cacheMethod != null) {String lockName = cacheMethod.getAnnotation(DatabaseLock.class).lockName();String lockDesc = cacheMethod.getAnnotation(DatabaseLock.class).lockDesc();// 运行主键,避免多次运行核心关键String runDate = DateUtil.format(new Date(), "yyyyMMdd");String taskRecordId = IdUtil.getSnowflakeNextIdStr();taskRecord.setTaskName(lockName);taskRecord.setId(taskRecordId);taskRecord.setRunDate(runDate);if (StringUtils.isBlank(lockName)) {throw new RuntimeException("定时任务锁名称不能为空");}if (StringUtils.isBlank(lockDesc)) {throw new RuntimeException("定时任务锁描述不能为空");}TaskConfig taskConfig = taskService.hasRun(lockName);// 还未运行过,进行初始化处理if (taskConfig == null) {TaskConfig config = new TaskConfig();taskConfigId = IdUtil.getSnowflakeNextIdStr();config.setId(taskConfigId);config.setTaskDescribe(lockDesc);config.setTaskName(lockName);config.setTaskValid("1");config.setCreateTime(new Date());try {// 添加时出现异常,已经运行过该定时任务taskService.addTaskConfig(config);taskConfig = config;} catch (Exception e) {e.printStackTrace();}// 有效标志位0表示无需执行} else if (!TASK_IS_VALID.equals(taskConfig.getTaskValid())) {String message = "该定时任务已经禁用";log.warn("method:{}未获取锁:{}[运行失败原因:{}]", cacheMethod, lockName, message);throw new RuntimeException(String.format("method:%s未获取锁:%s[运行失败原因:%s]", cacheMethod, lockName, message));}// 添加运行记录,以runKey为唯一标识,插入异常,说明执行过try {currentDayRunRecord = taskService.addCurrentDayRunRecord(taskRecord);} catch (Exception e) {log.warn("method:{}未获取锁:{}[运行失败原因:已经有别的服务进行执行]", cacheMethod, lockName);return null;}// 没有执行过,开始执行if (currentDayRunRecord) {try {log.warn("method:{}获取锁:{},运行成功!", cacheMethod, lockName);return pjp.proceed();} catch (Throwable e) {e.printStackTrace();isRunSuccess = "0";errorCause = ExceptionUtils.getExceptionDetail(e);} finally {Date endTime = new Date();taskRecord.setStartTime(startTime);taskRecord.setId(IdUtil.getSnowflakeNextIdStr());taskRecord.setEntTime(endTime);taskRecord.setIsSuccess(isRunSuccess);taskRecord.setErrorCause(errorCause);// 修改运行记录taskService.updateTaskRunRecord(taskRecord);}}}return null;}
}
3.TaskService实现操作数据库接口与实现
public interface TaskService {/*** 判断定时任务是否运行过** @param taskName* @return com.springboot.example.bean.task.TaskConfig* @author compass* @date 2023/3/10 21:22* @since 1.0.0**/TaskConfig hasRun(String taskName);/*** 将首次运行的任务添加到任务配置表** @param taskConfig* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean addTaskConfig(TaskConfig taskConfig);/*** 更新定时任务运行记录** @param taskRecord* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean updateTaskRunRecord(TaskRecord taskRecord);/*** 新增一条运行记录,只有新增成功的服务才可以得到运行劝* @param taskRecord* @return java.lang.Boolean* @author compass* @date 2023/3/10 21:23* @since 1.0.0**/Boolean addCurrentDayRunRecord(TaskRecord taskRecord);
}@Slf4j
@Service
public class TaskServiceImpl implements TaskService {@Resourceprivate TaskConfigMapper taskConfigMapper;@Resourceprivate TaskRecordMapper taskRecordMapper;@Overridepublic TaskConfig hasRun(String taskName) {QueryWrapper<TaskConfig> wrapper = new QueryWrapper<>();wrapper.eq("task_name",taskName);return taskConfigMapper.selectOne(wrapper);}@Overridepublic Boolean addTaskConfig(TaskConfig taskConfig) {return taskConfigMapper.insert(taskConfig)>0;}@Overridepublic Boolean updateTaskRunRecord(TaskRecord taskRecord ) {QueryWrapper<TaskRecord> wrapper = new QueryWrapper<>();wrapper.eq("task_name",taskRecord.getTaskName());wrapper.eq("run_date",taskRecord.getRunDate());return taskRecordMapper.update(taskRecord,wrapper)>0;}@Overridepublic Boolean addCurrentDayRunRecord(TaskRecord taskRecord) {return taskRecordMapper.insert(taskRecord)>0;}
}
4.数据库对应的实体类
// 配置类
@Data
@TableName("task_config")
public class TaskConfig {/*** 序号*/@TableId(value = "id", type = IdType.ASSIGN_ID)private String id;/*** 任务描述*/private String taskDescribe;/*** 任务名称*/private String taskName;/*** 任务有效标志*/private String taskValid;/*** 创建时间*/private Date createTime;}
// 运行记录类
@Data
@TableName("task_record")
public class TaskRecord {/*** ID*/@TableId(value = "id", type = IdType.ASSIGN_ID)private String id;/*** 定时任务开始时间*/private Date startTime;/*** 定时任务结束时间*/private Date entTime;/*** 是否执行成功*/private String isSuccess;/*** 失败原因*/private String errorCause;/*** 运行日期[yyyyMMdd]*/private String runDate;/*** 任务名称(任务名称+运行日期为唯一索引)*/private String taskName;}
3.基于redis实现
- 主要是基于setNX来实现的,setNX表示这个key存在,则设置value失败,只有这个key不存在的时候,才会set成功
- 我们可以给这个key指定过期时间,让他一定会释放锁,不然容易出现死锁的情况
1.操作redis锁的工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** redis锁工具类,如果redis是集群的话需要考虑数据延时性,这里默认为redis单个节点** @author compass* @date 2023-03-10* @since 1.0**/
@SuppressWarnings(value = {"all"})
@Component
public class RedisLockUtils {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 加锁** @param key* @param value* @param time* @param timeUnit* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean lock(String key, String value, long time, TimeUnit timeUnit) {return (Boolean)redisTemplate.execute((RedisCallback) connection -> {Boolean setNX = connection.setNX(key.getBytes(), value.getBytes());if (setNX){return connection.expire(key.getBytes(),time);}return false;});}/*** 立即释放锁,如果任务执行的非常快,可能会导致其他应用获得到锁,二次执行** @param key* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean fastReleaseLock(String key) {return redisTemplate.delete(key);}/*** 缓慢释放锁(隔离小段时间再释放锁,可以完全避免掉别的应用获取到锁)** @param key* @param time* @param timeUnit* @return boolean* @author compass* @date 2023/3/10 22:13* @since 1.0.0**/public boolean turtleReleaseLock(String key, long time, TimeUnit timeUnit) {return redisTemplate.expire(key, time, timeUnit);}}
2.aop切入,统一管理定时任务
import com.springboot.example.annotation.CacheLock;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;/*** 代理具体的定时任务,仅有一个任务会被成功执行** @author compass* @date 2023-03-09* @since 1.0**/
@Aspect
@Slf4j
@Component
public class CacheLockAspect {@Resourceprivate RedisLockUtils redisLockUtils;/*** 加锁值,可以是任意值**/private static final String LOCK_VALUE = "1";@Around("@annotation(com.springboot.example.annotation.CacheLock)")public Object cacheLockPoint(ProceedingJoinPoint pjp) {Method cacheMethod = null;for (Method method : pjp.getTarget().getClass().getMethods()) {if (null != method.getAnnotation(CacheLock.class)) {cacheMethod = method;break;}}if (cacheMethod!=null){CacheLock cacheLock = cacheMethod.getAnnotation(CacheLock.class);String lockName = cacheLock.lockName();long time = cacheLock.timeOut();boolean successLock = redisLockUtils.lock(lockName,LOCK_VALUE, time, TimeUnit.SECONDS);if (successLock){log.info("method:{}获取锁成功:{}", cacheMethod, lockName);try {// 获得锁调用被代理的定时任务return pjp.proceed();} catch (Throwable throwable) {throwable.printStackTrace();}finally {// 延时5秒再去释放锁redisLockUtils.turtleReleaseLock(lockName,5,TimeUnit.SECONDS);}}else {log.warn("method:{}获取锁失败:{}", cacheMethod, lockName);}}return null;}
}
3.自定义注解
/*** 标注在定时任务上,避免多个微服务的情况下定时任务执行重复* @author compass* @date 2023-03-09* @since 1.0**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {//定时任务使用的键,千万不要重复String lockName() ;// 占用锁的时间,单位是秒,默认10分钟long timeOut() default 60*10;
}
今天就先介绍这两种方式,后续我再为大家续上使用别的框架进行实现,不过在实现的过程中使用 redisTemplate.opsForValue().setIfAbsent() 出现了一点小肯,他返回的是null值,然后出现空指针,然后我不得不采用execute的方式去执行。
相关文章:

简单粗暴的分布式定时任务解决方案
分布式定时任务1.为什么需要定时任务?2.数据库实现分布式定时任务3.基于redis实现1.为什么需要定时任务? 因为有时候我们需要定时的执行一些操作,比如业务中产生的一些临时文件,临时文件不能立即删除,因为不清楚用户是…...
蓝桥杯第五天刷题
第一题:数的分解题目描述本题为填空题,只需要算出结果后,在代码中使用输出语句将所填结果输出即可。把 2019 分解成 3 个各不相同的正整数之和,并且要求每个正整数都不包含数字 2和 4,一共有多少种不同的分解方法&…...

Java数组的定义和使用(万字详解)
目录 编辑 一. 数组的基本概念 1、什么是数组 2、数组的创建及初始化 1、数组的创建 2、数组的初始化 3、数组的使用 (1)数组中元素访问 (3)遍历数组 二、数组是引用类型 1、初始JVM的内存分布 2、基本类型变量与引用类…...

【SpringBoot】自定义Starter
🚩本文已收录至专栏:Spring家族学习之旅 👍希望您能有所收获 一.概述 在使用SpringBoot进行开发的时候,我们发现使用很多技术都是直接导入对应的starter,然后就实现了springboot整合对应技术,再加上一些简…...

【C陷阱与缺陷】----语法陷阱
💯💯💯 要理解一个C程序,必须理解这些程序是如何组成声明,表达式,语句的。虽然现在对C的语法定义很完善,几乎无懈可击,大门有时这些定义与人们的直觉相悖,或容易引起混淆…...

虹科分享| 关于TrueNAS十问十答
上一篇文章我们向您介绍了虹科新品HK-TrueNAS企业存储,很多小伙伴会疑问到底什么是NAS存储,之前常用的磁盘、磁带属于什么存储架构,NAS存储好在哪里,什么时候使用NAS?今天我们整理了关于TrueNAS的十问十答,…...

Https 笔记
HTTP TLS TLS 的前身是 SSL 非对称加密的核心: 两个密钥(公私) https 需要第三方CA(证书授权中心)申请SSL证书以确定其真实性 证书种包含了特定的公钥和私钥 密钥交换 自己将私钥上锁后发给对方对方也上锁 在还回来…...

【Python+requests+unittest+excel】实现接口自动化测试框架
一、框架结构: 工程目录 二、Case文件设计 三、基础包 base 3.1 封装get/post请求(runmethon.py) 1 import requests2 import json3 class RunMethod:4 def post_main(self,url,data,headerNone):5 res None6 if heade…...

MySQL终端的使用及其数据类型的使用
什么是数据库?数据库(Database)是按照数据结构来组织、存储和管理数据的仓库。每个数据库都有一个或多个不同的 API 用于创建,访问,管理,搜索和复制所保存的数据。我们也可以将数据存储在文件中,…...

长视频终局:一场考验资金储备的消耗战
赢者通吃,似乎已成为各行各业的常识,但事实真的是这样吗?20世纪70年代,石油价格高涨,在墨西哥湾油田拍卖中高价拍得油田的企业,要么亏损,要么收入低于预期,但仍然有无数企业在高价竞…...

javaEE初阶 — CSS 常用的属性
文章目录CSS 常用的属性1 字体属性1.1 设置字体家族 font-family1.2 设置字体大小 font-size1.3 设置字体粗细 font-weight1.4 文字倾斜 font-style2 文本属性2.1 文本颜色2.2 文本对齐2.3 文本装饰2.4 文本缩进2.5 行高3 背景属性3.1 背景颜色3.2 背景图片3.3 背景位置3.4 背景…...

【面试题】如何取消 script 标签发出的请求
大厂面试题分享 面试题库前后端面试题库 (面试必备) 推荐:★★★★★地址:前端面试题库问题之前在业务上有这样一个场景,通过 script 标签动态引入了一个外部资源,具体方式是这样的const script document.…...

蓝桥杯嵌入式(G4系列):RTC时钟
前言: 关于RTC时钟的HAL库配置我也是第一次,之前都是用库函数的写法,这里写下这篇博客来记录一下自己的学习过程。 STM32Cubemx配置: 首先点击左侧的Timers的RTC,勾选以下选项 进入时钟树配置 进入时间设置࿰…...

Linux——进程间通信1
目录 进程间通信目的 进程间通信标准 管道 匿名管道 管道实现进程间通信 管道的特点 进程池 ProcessPool.cc Task.hpp 习题 进程间通信目的 数据传输:一个进程需要将它的数据发送给另一个进程 资源共享:多个进程之间共享同样的资源。 通知事件…...

循环语句——“Python”
各位CSDN的uu们你们好呀,今天小雅兰的内容是Python中的循环语句呀,分为while循环和for循环,下面,让我们进入循环语句的世界吧 循环语句 while循环 for循环 continue和break 循环语句小结 人生重开模拟器 设置初始属性 设置性别…...

Python synonyms查找中文任意词汇的同义词近义词
Python synonyms查找中文任意词汇的同义词近义词 作者:虚坏叔叔 博客:https://xuhss.com 早餐店不会开到晚上,想吃的人早就来了!😄 一、安装 对于非专业的开发人员来说可以简单的使用Python一行代码来找到同义词。这…...
三分钟了解http和https
对应测试人员都会听过http请求和响应.在这里给大家介绍http相关的知识 一.http和https基本概念 HTTP:是互联网上应用最为广泛的一种网络协议,是一个客户端和服务器端请求和应答的标准(TCP),用于从WWW服务器传输超文本…...

docker应用:搭建私有云盘
简介:NextCloud是一个开源的云存储解决方案,可以在自己的服务器上搭建个人云存储系统。它提供了与市面上主流云存储服务(如Dropbox、Google Drive)相似的功能,包括文件存储、共享、同步、协作等。NextCloud的主要优势在…...

【C++进阶】面向对象
程序 编写程序是为了让计算机解决现实生活中的实际问题。pascal之父、结构化程序设计先驱Niklaus Wirth提出程序 算法 数据结构。程序是完成一定功能的一些列有序指令的集合。指令 操作码 指令。将指令按一定的顺序进行整合,就形成了程序。 机器语言与汇编语言…...

从ChatGPT与New Bing看程序员为什么要学习算法?
文章目录为什么要学习数据结构和算法?ChatGPT与NEW Bing 的回答想要通关大厂面试,就不能让数据结构和算法拖了后腿业务开发工程师,你真的愿意做一辈子CRUD boy吗?对编程还有追求?不想被行业淘汰?那就不要只…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...

Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...