海豚调度清理:使用 API 轻松清理历史工作流实例以及日志文件
💡 本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。
祝开卷有益。
大数据学习指南
大家好,我是小陶,DolphinScheduler 运行一段时间之后,会积累大量的历史运行记录,这些记录主要包括:工作流实例记录(MySQL)、任务实例记录(MySQL)、任务日志(本地磁盘),其中 MySQL 的记录越来越多,会影响页面分页查询的速度,进而影响用户使用体验和 MySQL 服务。
所以,需要清理以上历史记录,保证页面影响速度和 MySQL 服务。
本文的内容也比较简单,先是说明 API 的逻辑、存在的bug和修复方法,最后再介绍如何使用一个 Python 脚本来调用 API 删除历史实例。
1.API 逻辑介绍
DolphinScheduler 本身提供了批量删除工作流实例的接口,**process-instances/batch-delete,**接口逻辑这里简单描述一下就是,找到工作流下面的任务实例,依次删除任务日志和 Mysql 记录。
2.API bug说明和修复
但是这里需要注意的是,海豚调度 3.2.0(不包含)以前的版本,这里有一个 bug,在查询工作流实例下面的任务实例的时候,只查询了 flag =1 的任务实例,所以就导致了在清理日志和记录的时候,漏掉了一部分。
ProcessServiceImpl.java 中的 removeTaskLogFile 方法,在查询任务实例集合的时候,引用了 findValidTaskListByProcessId(processInstanceId); 而 findValidTaskListByProcessId 中仅查询了 Flag.YES 也就是 flag = 1 的记录。如下图所示:
这里解释一下 flag = 1 是标识该任务的最新的运行记录,表示任务多次重试之后,最新的运行记录。如果任务第一次失败了,第二次重试之后成功了,那么这个任务就会有两条运行记录,flag = 0 和 falg = 1,flag =1 的则标识最新的运行记录。
所以,如果你在使用海豚调度 3.2.0(不包含)以前的版本的时候,需要自行修复一下,或者升级到 3.2.0 。
修复的方式,也比较简单,新增 findAllTaskListByProcessId 方法,把工作流实例所有的运行实例都拿出来,不要加 flag 这个过滤条件。
3.使用 Python 脚本调用API
Python脚本的逻辑比较简单,使用了三个API,按照顺序是:
1.获取项目列表
2.获取工作流列表
3.批量删除工作流实例
入参是:日期
具体的代码如下:
#!/usr/bin/python
# -*- coding: utf8 -*-
## 定时清理调度工作流记录,入参是日期import io
import subprocess
import requests
import json
import time
import datetime
from optparse import OptionParser
from optparse import OptionGrouplogging.basicConfig(format='%(asctime)s : %(levelname)s : %(module)s : %(message)s', level=logging.INFO,stream=sys.stdout)
logger = logging.getLogger(__name__)# 配置信息: ip 端口 token自行修改
base_url = 'http://IP:端口'
token = 'xxxxxxxxxxxxx'# get args
def get_option_parser(params):usage = "usage: %prog [options] json-url"parser = OptionParser(usage=usage)prodEnvOptionGroup = OptionGroup(parser, "Product Env Options","Normal user use these options to set jvm parameters, job runtime mode etc. ""Make sure these options can be used in Product Env.")for k in params:prodEnvOptionGroup.add_option("--" + k, metavar="<" + k + ">", dest=k, action="store", default="",help="" + params[k])parser.add_option_group(prodEnvOptionGroup)return parser# 获取项目列表
def get_project_list():url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=&_t=0.3741042528841678".format(base_url=base_url)payload={}headers = {'Connection': 'keep-alive','Accept': 'application/json, text/plain, */*','language': 'zh_CN','sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c','User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36','Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7','token':token}response = requests.request("GET", url, headers=headers, data=payload)response_data = json.loads(response.text)totalList = response_data['data']['totalList']return totalListdef get_page_detail(code,dt):url = "{base_url}/dolphinscheduler/projects/{code}/process-instances?searchVal=&pageSize=50&pageNo=1&host=&stateType=&startDate=2000-01-01 00:00:00&endDate={dt} 23:59:59&executorName=".format(code=code,dt=dt,base_url=base_url)payload={}headers = {'Connection': 'keep-alive','Accept': 'application/json, text/plain, */*','language': 'zh_CN','sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c','User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36','Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7','token':token}response = requests.request("GET", url, headers=headers, data=payload)response_data = json.loads(response.text)page = response_data['data']['totalList']page_del = 'processInstanceIds='if len(page) == 0:print('列表为空,退出程序')return '0'for p in page:page_del = page_del + str(p['id']) + ','# print(page_del)return page_deldef delete(project,ids):print('即将删除如下工作流实例:')print(project)print(ids)url = "{base_url}/dolphinscheduler/projects/{project}/process-instances/batch-delete".format(base_url=base_url,project = project)# 'processInstanceIds=89767'payload= idsheaders = {'Connection': 'keep-alive','Accept': 'application/json, text/plain, */*','language': 'zh_CN','sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c','User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36','Content-Type': 'application/x-www-form-urlencoded','Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7','token':token}response = requests.request("POST", url, headers=headers, data=payload)print('执行结果如下:')print(response.text)if __name__ == '__main__':#获取请求参数()params = {"dt": "dt"};parser = get_option_parser(params)options, args = parser.parse_args(sys.argv[1:])logger.info('开始执行删除任务实例...' + " ".join(sys.argv))# 清理的日期dt = options.dtif dt == '' or len(dt) == 0:logger.error('调度系统-运维任务:日期为空,请输入日期')sys.exit(1)today_91 = (datetime.datetime.now()+datetime.timedelta(days=-61)).strftime("%Y-%m-%d")short_dt = dt.replace('-','')short_today_91 = today_91.replace('-','')if int(short_dt) > int(short_today_91):logger.error('调度系统-运维任务:不能删除最近90天之内的任务实例')sys.exit(1)# # 需要处理的项目projects = get_project_list()# 依次处理项目for project in projects:code = project['code']print('正在处理:'+ str(code))while True:page_del = get_page_detail(code,dt)if page_del == '0':breakdelete(code,page_del)time.sleep(1)
使用示例:dolphin_clean_process.py 是上面的脚本。
python dolphin_clean_process.py 2024-01-01
**脚本在 GitHub 也维护了一份,欢迎 star **
https://github.com/aikuyun/dolphin_practices/blob/main/dolphin_clean_process.py
4.注意事项
1.token 获取的方式
2.可以删除的工作流的状态是一定要是完成状态的。否则,接口就会报错,非完成状态的工作流是不可以删除的。可以通过下面的SQL查看某个日期之前是否存在非完成状态的工作流实例。
SELECT *
FROM t_ds_process_instance
where state not in (7 ,13 ,6 ,8 ,5 ,9 ,3)
and start_time < '2024-01-01'
以上就使用 API 轻松清理历史工作流实例以及日志文件的全部内容,如果有任何疑问,都可以与我交流,希望可以帮到你,下次见。
大数据学习指南 专注于大数据技术分享与交流。
相关文章:

海豚调度清理:使用 API 轻松清理历史工作流实例以及日志文件
💡 本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。 祝开卷有益。 大数据学习指南 大家好,我是小陶,DolphinS…...

python怎么显示行号
我们如果想让Python IDLE显示行号,我们可以通过扩展IDLE功能来做到。 1.我们需要下载一个LineNumber.py扩展。 2.我们打开Python安装目录,找到安装目录下的Lib\idlelib目录,复制LineNumber到这个目录。 3.然后启动扩展。 4.配置扩展的方式…...

pytorch中,load_state_dict和torch.load的区别?
在 PyTorch 中,load_state_dict 和 torch.load 是两个不同的函数,用于不同的目的。 torch.load: 用途: 从磁盘加载一个保存的对象。这个对象可以是一个模型的整个状态字典(包含模型参数)、优化器状态字典、甚至是任意其他 Python …...

ObjectARX打印当前图纸为PDF,无延迟(亲测有效)
CAD二次开发定制ObjectARX安装配置AutoCAD插件ZWCAD插件C++ //----------------------------------------------------------------------------- //----- acrxEntryPoint.cpp //----------------------------------------------------------------------------- #include &quo…...

torch.squeeze() dim=1 dim=-1 dim=2
对数据的维度进行压缩 使用方式:torch.squeeze(input, dimNone, outNone) 将输入张量形状中的1 去除并返回。 如果输入是形如(A1B1C1D),那么输出形状就为: (ABCD) import torch x torch.rand(2, 1, 1, 3, 1, 4) print(x) print(x.shape) …...

智慧环保一体化平台简介
据悉,环保问题日益受到人们的关注,智慧环保一体化平台作为解决环保问题的有力工具,正逐渐走进人们的视野。朗观视觉智慧环保一体化平台通过整合各类环保资源,实现环境数据的实时监测、分析与管理,为环境保护提供智能化…...

idea在空工程中添加新模块并测试的步骤
ServicesTest是空的工程,没有pom文件。现在需要在ServicesTest目录下添加新模块作为新的工程,目的是写一下别的技术功能。 原先目录结构,ServicesTest是空的工程,没有pom文件。下面的几个模块是新的工程,相互独立。 1.…...

HCIE-QOS基本原理
QOS基本原理 QOS概述什么是QOSQoS服务模型区分服务模型QoS常用技术 (DiffServ模型)QoS数据处理流程 (DiffServ模型) QoS流分类和流标记QoS数据处理流程为什么需要流分类和流标记 简单流分类外部优先级 - VLAN报文外部优先级 - MPLS报文外部优先级 - IP报文各外部优先级间的对应…...

pycharm基本使用(常用快捷键)
0.下载 pycharm官网下载 选择合适的版本,本文以2024.1为例 1.简单应用 常用快捷键 ctrlD 复制当前行 ctrlY 删除当前行 ctrlX 剪切当前行(可用作删除,更顺手) shift↑ 选中多行ctrlshiftF10 运行 shiftF9 调试ctrl/ 注释当前…...

机器学习--回归模型和分类模型常用损失函数总结(详细)
文章目录 引言 回归模型常用损失函数均方误差(Mean Squared Error, MSE)均方根误差(Root Mean Squared Error, RMSE)平均绝对误差(Mean Absolute Error, MAE)Huber损失(Huber Loss) …...

企业选择数字工厂管理系统供应商的标准是什么
随着信息技术的飞速发展,数字化转型已成为企业提升竞争力、实现可持续发展的必由之路。在制造业领域,数字工厂管理系统实现生产流程自动化、智能化的关键工具,其选择的重要性不言而喻。本文将从多个维度探讨企业选择数字工厂管理系统供应商的…...

京准电钟|基于纳秒级的GPS北斗卫星授时服务器
京准电钟|基于纳秒级的GPS北斗卫星授时服务器 京准电钟|基于纳秒级的GPS北斗卫星授时服务器 你有没有思考过这样一个问题:火车站内,熙熙攘攘,旅客排队进站、列车停靠发车,一切井然有序。一旦有个别时间出现…...

Flutter知识点
Dart语言基础知识 Dart特性: Dart 是少数同时支持 JIT(Just In Time,即时编译)和 AOT(Ahead of Time,运行前编译)的语言之一。语言在运行之前通常都需要编译,JIT 和 AOT 则是最常见…...

2024-06-12 问AI: 在大语言模型中,什么是Jailbreak漏洞?
文心一言 在大语言模型中,Jailbreak漏洞(也称为越狱攻击或指令注入攻击)是一种特定的安全漏洞,它允许用户或攻击者通过精心设计的输入或指令,绕过模型的安全限制,诱导模型产生原本被设定为禁止或限制的输出…...

Vue22-v-model收集表单数据
一、效果图 二、代码 2-1、HTML代码 2-2、vue代码 1、v-model单选框的收集信息 v-model:默认收集的就是元素中的value值。 单选框添加默认值: 2、v-model多选框的收集信息 ①、多个选择的多选 注意: 此处的hobby要是数组!&…...

【深度学习】深入解码:提升NLP生成文本的策略与参数详解
文章目录 解码策略解码参数公式解释代码例子区别 更详细的束搜索的解释更详细的例子解释第一步第二步第三步 解码策略和解码参数在自然语言处理(NLP)模型的生成过程中起着不同的作用,但它们共同决定了生成文本的质量和特性。 解码策略 解码…...

Petalinux由于网络原因产生的编译错误(2)--Fetcher failure:Unable to find file
1 Fetcher failure:Unable to find file 错误 如果编译工程遇到如下图所示的“Fetcher failure for URL”或相似错误 出现这种错误的原因是 Petalinux 在配置和编译的时候,需要联网下载一些文件,由于网 络原因这些文件不能正常下载,导致编译…...

随手记:商品信息过多,展开收起功能
UI原型图: 页面思路: 在商品信息最小item外面有一个包裹所有item的标签,控制这个标签的高度来实现展开收起功能 <!-- 药品信息 --><view class"drugs" v-if"inquiryInfoSubmitBtn"><view class"…...

uniapp上传头像并裁剪图片
第一步写上uniapp自带的选择图片button按钮 点击之后会弹出选择图片的方式 拍照或从相册选择图片后将会跳到图片裁剪 然后我们裁剪完之后点击确定在上传图片 这里是上传图片的接口 拿到本地图片 上传的话自己想以那种方式上传都可以...

9.1.3 简单介绍单阶段模型YOLO、YOLOv2、YOLO9000、YOLOv3的发展过程
9.1.3 简单介绍单阶段模型YOLO、YOLOv2、YOLO9000、YOLOv3的发展过程 前情回顾:9.1.2 简单介绍两阶段模型R-CNN、SPPNet、Fast R-CNN、Faster R-CNN的发展过程 摘要 YOLOYOLOv2YOLO9000YOLOv3基本思想使用一个端到端的卷积神经网络直接预测目标的类别和位置针对YOL…...

英智教育智能体,AI Agent赋能教育培训行业数字化升级
教育是当前需求巨大且没有足够人力来满足的领域,每个学生个体差异较大,有限的教师资源无法针对性实行差异教学,学生学不会,教师教学压力大等问题普遍存在。 面对这些难题,英智在通用大模型能力的基础上,整合…...

什么是电脑监控软件?六款知名又实用的电脑监控软件
电脑监控软件是一种专为监控和记录计算机活动而设计的应用程序,它能够帮助用户(如家长、雇主或系统管理员)了解并管理目标计算机的使用情况。这些软件通常具有多样化的功能,包括但不限于屏幕捕捉、网络行为监控、应用程序使用记录…...

小程序名片怎么生成?AI名片生成器源码系统 为企业店铺创建自己的数字名片
在数字化时代,小程序名片已经成为企业店铺展示自身形象、推广产品和服务的重要工具。分享一个AI名片生成器源码系统春哥AI雷达智能名片小程序系统企业商业运营版,含完整代码包和详细的图文安装部署搭建教程,新手也能轻松使用,源码…...

浅谈PMP:项目管理的专业化认证
引言: 项目管理作为现代企业运营的核心环节,其重要性不言而喻。随着全球化的加速和市场竞争的加剧,企业对项目管理的需求日益增长,项目管理专业人员的需求也水涨船高。在这样的背景下,PMP(Project Managem…...

获取闲鱼商品详情api
要使用闲鱼商品详情API,你需要先申请一个开发者账号,并且在开发者中心创建一个应用,目前很难申请到,还有一个方式是获取第三方应用的AppKey和AppSecret直接使用。 API的请求地址为: https://api.m.taobao.com/h5/mto…...

java1.8运行arthas-boot.jar运行报错解决
报错内容 输入java -jar arthas-boot.jar,后报错。 [INFO] JAVA_HOME: D:\developing\jdk\jre1.8 [INFO] arthas-boot version: 3.7.2 [INFO] Can not find java process. Try to run jps command lists the instrumented Java HotSpot VMs on the target system.…...

每日一练 - IGMP协议与查询器选举机制
01 真题题目 在共享网络中存在多台路由器的情况下,是否是IGMP协议本身负责选举出查询器的角色? A. 正确 B. 错误 02 真题答案 B 03 答案解析 IGMP(Internet Group Management Protocol)互联网组管理协议,主要用于IP多…...

深入浅出:面向对象软件设计原则(OOD)
目录 前言 1.单一责任原则(SRP) 2.开发封闭原则(OCP) 3.里氏替换原则(LSP) 4.依赖倒置原则(DIP) 5.接口分离原则(ISP) 6.共同封闭原则(CCP)…...

缓存与数据一致性问题
1、更新了数据库,再更新缓存 假设数据库更新成功,缓存更新失败,在缓存失效和过期的时候,读取到的都是老数据缓存。 2、更新缓存,更新数据库 缓存更新成功了,数据库更新失败,是不是读取的缓存的都…...

2024年上海高考作文题目(ChatGPT版)
一、2024年6月7日上海高考作文题目 生活中,人们常用认可度判别事物,区分高下。请写一篇文章,谈谈你对“认可度”的认识和思考。 要求:(1)自拟题目;(2)不少于800字。 二、…...