当前位置: 首页 > news >正文

聊聊PowerJob的Alarmable

本文主要研究一下PowerJob的Alarmable

Alarmable

tech/powerjob/server/extension/Alarmable.java

public interface Alarmable {void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}

Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList

Alarm

public interface Alarm extends PowerSerializable {String fetchTitle();default String fetchContent() {StringBuilder sb = new StringBuilder();JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));content.forEach((key, originWord) -> {sb.append(key).append(": ");String word = String.valueOf(originWord);if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {try {if (originWord instanceof Long) {word = CommonUtils.formatTime((Long) originWord);}}catch (Exception ignore) {}}sb.append(word).append(OmsConstant.LINE_SEPARATOR);});return sb.toString();}
}

Alarm定义了fetchTitle方法,提供了fetchContent默认方法,它有两个实现类分别是JobInstanceAlarm、WorkflowInstanceAlarm

DingTalkAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {private final Environment environment;private Long agentId;private DingTalkUtils dingTalkUtils;private Cache<String, String> mobile2UserIdCache;private static final int CACHE_SIZE = 8192;/*** 防止缓存击穿*/private static final String EMPTY_TAG = "EMPTY";@Overridepublic void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {if (dingTalkUtils == null) {return;}Set<String> userIds = Sets.newHashSet();targetUserList.forEach(user -> {String phone = user.getPhone();if (StringUtils.isEmpty(phone)) {return;}try {String userId = mobile2UserIdCache.get(phone, () -> {try {return dingTalkUtils.fetchUserIdByMobile(phone);} catch (PowerJobException ignore) {return EMPTY_TAG;} catch (Exception ignore) {return null;}});if (!EMPTY_TAG.equals(userId)) {userIds .add(userId);}}catch (Exception ignore) {}});userIds.remove(null);if (!userIds.isEmpty()) {String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));try {dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);}catch (Exception e) {log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());}}}@PostConstructpublic void init() {String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");return;}if (!StringUtils.isNumeric(agentId)) {log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);return;}this.agentId = Long.valueOf(agentId);dingTalkUtils = new DingTalkUtils(appKey, appSecret);mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");}}

DingTalkAlarmService实现了Alarmable接口,其onFailed遍历targetUserList获取userId,最后通过dingTalkUtils.sendMarkdownAsync发送

MailAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java

@Slf4j
@Service
public class MailAlarmService implements Alarmable {@Resourceprivate Environment environment;private JavaMailSender javaMailSender;@Value("${spring.mail.username:''}")private String from;@Overridepublic void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {return;}SimpleMailMessage sm = new SimpleMailMessage();try {sm.setFrom(from);sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));sm.setSubject(alarm.fetchTitle());sm.setText(alarm.fetchContent());javaMailSender.send(sm);}catch (Exception e) {log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());}}@Autowired(required = false)public void setJavaMailSender(JavaMailSender javaMailSender) {this.javaMailSender = javaMailSender;}}

MailAlarmService实现了Alarmable接口,其onFailed方法构建SimpleMailMessage,然后通过spring的javaMailSender.send发送

WebHookAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java

@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {private static final String HTTP_PROTOCOL_PREFIX = "http://";private static final String HTTPS_PROTOCOL_PREFIX = "https://";@Overridepublic void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {if (CollectionUtils.isEmpty(targetUserList)) {return;}targetUserList.forEach(user -> {String webHook = user.getWebHook();if (StringUtils.isEmpty(webHook)) {return;}// 自动添加协议头if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {webHook = HTTP_PROTOCOL_PREFIX + webHook;}MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));try {String response = HttpUtils.post(webHook, requestBody);log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);}catch (Exception e) {log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);}});}
}

WebHookAlarmService实现了Alarmable接口,其onFailed方法遍历targetUserList,挨个执行HttpUtils.post(webHook, requestBody),用的是okhttp3来实现http请求回调

小结

PowerJob的Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList;它有三个实现类,分别是DingTalkAlarmService(用的是DingTalkClient)、MailAlarmService(用的是spring的JavaMailSender)、WebHookAlarmService(用的是okhttp3的OkHttpClient)。

相关文章:

聊聊PowerJob的Alarmable

序 本文主要研究一下PowerJob的Alarmable Alarmable tech/powerjob/server/extension/Alarmable.java public interface Alarmable {void onFailed(Alarm alarm, List<UserInfoDO> targetUserList); }Alarmable接口定义了onFailed方法&#xff0c;其入参为alarm及tar…...

系列三十五、获取Excel中的总记录数

一、获取Excel中的总记录数 1.1、概述 使用EasyExcel开发进行文件上传时&#xff0c;通常会碰到一个问题&#xff0c;那就是Excel中的记录数太多&#xff0c;使用传统的方案进行文件上传&#xff0c;很容易就超时了&#xff0c;这时可以通过对用户上传的Excel中的数量进行限制…...

VMware workstation安装debian-12.1.0虚拟机并配置网络

VMware workstation安装debian-12.1.0虚拟机并配置网络 Debian 是一个完全自由的操作系统&#xff01;Debian 有一个由普罗大众组成的社区&#xff01;该文档适用于在VMware workstation平台安装debian-12.1.0虚拟机。 1.安装准备 1.1安装平台 Windows 11 1.2软件信息 软…...

centos下系统全局检测工具dstat使用

目录 一&#xff1a;没有需要安装 二&#xff1a;dstat命令参数 三、监测界面各参数含义&#xff08;部分&#xff09; 四、dstat的高级用法 一&#xff1a;没有需要安装 yum install dstat 二&#xff1a;dstat命令参数 有默认选项&#xff0c;执行dstat命令不加任何参数…...

无人机群ros通信

单架无人机与地面站通信 在一个局域网内获取无人机的机载电脑ip 通过地面站ssh到机载电脑&#xff0c;实现通信 多架无人机与地面站通信 在ROS基础上&#xff0c;配置主机和从机&#xff0c;实现主机和从机的话题联通 配置hosts 在主机和从机的/etc/hosts文件中&#xff0c…...

LeetCode刷题:142. 环形链表 II

题目&#xff1a; 是否独立解决&#xff1a;否&#xff0c;参考了解题思路解决问题&#xff0c;思考了用快慢指针&#xff0c;栈&#xff0c;统计链表数量定位尾巴节点&#xff08;因为是环形链表所以是死循环&#xff0c;链表数量用while循环统计不出来&#xff09;都没解决 解…...

Laravel 使用rdkafka_laravel详细教程(实操避坑)

一、选择rdkafka 首先要看版本兼容问题&#xff0c;我的是Laravel5.6&#xff0c;PHP是7.3.13&#xff0c;所以需要下载兼容此的rdkafka&#xff0c;去 Packagist 搜索 kafka &#xff0c;我用的是 enqueue/rdkafka选择里面0.10.5版本&#xff0c; 二、安装rdkafka 在 Larav…...

439 - Knight Moves (UVA)

题目链接如下&#xff1a; Online Judge UVA439 骑士的移动 - 锦依卫议事堂 - 洛谷博客 这里有好几个特别厉害的解法...先存着慢慢看。 我的代码如下&#xff1a; #include <iostream> #include <deque> #include <string> // #define debugstruct node{…...

数据结构(c)冒泡排序

本文除了最下面的代码是我写的&#xff0c;其余是网上抄写的。 冒泡排序 什么是冒泡排序&#xff1f; 冒泡排序&#xff08;Bubble Sort&#xff09;是一种简单的排序算法。它重复地走访过要排序的数列&#xff0c;一次比较两个元素&#xff0c;如果他们的顺序错误就把他们交…...

并发编程之并发容器

目录 并发容器 CopyOnWriteArrayList 应用场景 常用方法 读多写少场景使用CopyOnWriteArrayList举例 CopyOnWriteArrayList原理 CopyOnWriteArrayList 的缺陷 扩展迭代器fail-fast与fail-safe机制 ConcurrentHashMap 应用场景 常用方法 并发场景下线程安全举例 Con…...

K8s---存储卷(动态pv和pvc)

当我要发布pvc可以生成pv&#xff0c;还可以共享服务器上直接生成挂载目录。pvc直接绑定pv。 动态pv需要两个组件 1、卷插件&#xff1a;k8s本生支持的动态pv创建不包括nfs&#xff0c;需要声明和安装一个外部插件 Provisioner: 存储分配器。动态创建pv,然后根据pvc的请求自动…...

JS判断对象是否为空对象的几种方法

通过JSON自带的stringify()方法判断 function isEmptyObj(obj) {return JSON.stringify(obj) {} } console.log(对象是否为空&#xff1a;, isEmptyObj({}))for in 循环判断 function isEmptyObj(obj) {for(let item in obj) {return true}return false } console.log(对…...

算法通关村第十五关—用4KB内存寻找重复元素(青铜)

用4KB内存寻找重复元素 用4KB内存寻找重复元素 题目要求&#xff1a;给定一个数组&#xff0c;包含从1到N的整数&#xff0c;N最大为32000&#xff0c;数组可能还有重复值&#xff0c;且N的取值不定&#xff0c;若只有4KB的内存可用&#xff0c;该如何打印数组中所有重复元素。…...

【PHP】判断字符串是否是有效的base64编码

目录 1.检测长度&#xff1a; 2.检查字符集&#xff1a; 3.判断是否能正常解码 在PHP中&#xff0c;判断一个字符串是否是有效的Base64编码&#xff0c;可以通过以下几种方法&#xff1a; 1.检测长度&#xff1a; Base64编码的字符串长度必须是4的倍数&#xff08;对于标准…...

鼎盛合|测量精度SOC芯片开发中的技术问题整理

SOC芯片近几年的发展势头迅猛&#xff0c;许多行业中俱可见其身影。SOC芯片并不是传统意义上的芯片&#xff0c;它是一个由多种功能集成的一个芯片。SOC芯片自身在出厂时便带有部分程序&#xff0c;是为了方便设计开发而针对某些行业设计的特定功能。如芯海的SOC芯片大多数则是…...

sql | 学生参加各科考试次数

学生表: Students------------------------ | Column Name | Type | ------------------------ | student_id | int | | student_name | varchar | ------------------------ 在 SQL 中&#xff0c;主键为 student_id&#xff08;学生ID&#xff09;。 该表内的每…...

uniapp(vue2)+VoerkaI18n多语言

今天我学习了VoerkaI18n国际化插件&#xff0c;它是一个适用于Javascript/Vue/React/Solid/ReactNative的国际化全流程解决方案。VoerkaI18n可以帮助我们轻松地实现应用程序的多语言支持&#xff0c;使得应用程序可以适应不同的语言环境。 比较吸引我的是集成自动翻译,t(“中华…...

C51--测速小车

测速小车&#xff1a; 测速模块&#xff1a; 用途&#xff1a; 广泛用于电机转速检测&#xff0c;脉冲计数&#xff0c;位置限位等。 高低电平&#xff1a; 有遮挡&#xff0c;输出高电平&#xff1b; 无遮挡&#xff0c;输出低电平。 接线&#xff1a; VCC——正极 GND——接…...

ORACLE报错:ORA-04091 表XXX发生了变化,触发器/函数不能读它

ORACLE报错:ORA-04091 表发XXX生了变化&#xff0c;触发器/函数不能读它 问题描述问题分析解决办法拓展&#xff1a;自治事务的特点 问题描述 在开发校验函数FUNCTION的时候&#xff0c;用数据跑批测试的时候报错。经排查这个校验函数FUNCTION的被一个存储过程中的update语句调…...

Arm LDM和STM的寻址方式

A32指令集中包含多数据传输指令LDM和STM&#xff0c;也就是单条指令可以传输多个寄存器的值与内存交互&#xff0c;这对于数据块传输以及寄存器的压入栈很有帮助。LDM和STM指令可分别用于实现堆栈的pop和push操作。对于堆栈操作&#xff0c;基寄存器通常是堆栈指针(SP)。 LDM和…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台

🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

Go 语言并发编程基础:无缓冲与有缓冲通道

在上一章节中&#xff0c;我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道&#xff0c;它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好&#xff0…...

招商蛇口 | 执笔CID,启幕低密生活新境

作为中国城市生长的力量&#xff0c;招商蛇口以“美好生活承载者”为使命&#xff0c;深耕全球111座城市&#xff0c;以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子&#xff0c;招商蛇口始终与城市发展同频共振&#xff0c;以建筑诠释对土地与生活的…...

【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论

路径问题的革命性重构&#xff1a;基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中&#xff08;图1&#xff09;&#xff1a; mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error

在前端开发中&#xff0c;JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作&#xff08;如 Promise、async/await 等&#xff09;&#xff0c;开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝&#xff08;r…...