flink cep数据源keyby union后 keybe失效
问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警
策略条件:
选择了两个kafka的的topic的数据作为数据源,
对A 数据源 test-topic1, 进行条件过滤, 过滤条件为:login_type = 1
对B 数据源 test-topic2,进行条件过滤,过滤条件为:login_type = 2
分组条件 为 src_ip,hostname两个字段进行分组
进行followby 关联。时间关联的最大时间间隔为 60秒
运行并行度设置为3
通过SourceStream打印的原始数据:
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
1> {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type ":"2"}
经过cep处理后,产了告警
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}]}
经过src_ip,和hostname分组后, 理论上应该只分组后的相同的 scr_ip,hostname进行事件关联告警
结果其他的分组数据也参和进来关联告警了。
期望的是 login_type = 1 出现至少两次, 接着login_type=2的至少出现1次,且相同的src_ip和hostname
然后结果是下面数据也产生了告警。
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":2}
怀疑是分组没生效。
然后debug数据源那块的方法kafkaStreamSource() 里面有进行分组,debug后发现确实也进行了keyby
后来找不到其他问题,纠结了下, 怀疑是不是 KeyedSteam.union(KeyedStream)后得到的就不是一个KeyedSteam了。 所以
出现问题的原始代码数据源代码:
//程序具体执行流程DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);SinkProcess.sink(resultStream, rule);public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {DataStream<JSONObject> inputStream = null;List<Event> events = rule.getEvents();if (events.size() > SharingConstant.NUMBER_ZERO) {for (Event event : events) {FlinkKafkaConsumer<JSONObject> kafkaConsumer =new KafkaSourceFunction(rule, event).init();if (inputStream != null) {// 多条 stream 合成一条 streaminputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));} else {// 只有一条 streaminputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);}}}return inputStream;}private static DataStream<JSONObject> kafkaStreamSource(StreamExecutionEnvironment env,Event event,Rule rule,FlinkKafkaConsumer<JSONObject> kafkaConsumer) {DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);// 对多个黑白名单查询进行循环String conditions = event.getConditions();while (conditions.contains(SharingConstant.ARGS_NAME)) {// 使用新的redis 数据结构,进行 s.include 过滤inputStream = AsyncDataStream.orderedWait(inputStream,new RedisNameListFilterSourceFunction(s,rule.getSettings().getRedis()),30,TimeUnit.SECONDS,2000);conditions = conditions.replace(s, "");}// 一般过滤处理inputStream = AsyncDataStream.orderedWait(inputStream,new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);// kafka source 进行 keyBy 处理return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());}public static DataStream<JSONObject> keyedBy(DataStream<JSONObject> input, Map<String, String> groupBy) {if (null == groupBy || groupBy.isEmpty() ||"".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])){return input;}return input.keyBy(new TwoEventKeySelector(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));}public class TwoEventKeySelector implements KeySelector<JSONObject, String> {private static final long serialVersionUID = 8534968406068735616L;private final String groupBy;public TwoEventKeySelector(String groupBy) {this.groupBy = groupBy;}@Overridepublic String getKey(JSONObject event) {StringBuilder keys = new StringBuilder();for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {keys.append(event.getString(key));}return keys.toString();}
}
问题出现在这里:
// 多条 stream 合成一条 stream
inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
kafkaStreamSource()这个方法返回的是 KeyedStream ,
两个KeyedStream unio合并后, 本来以为返回时KeyedStream,结果确是DataStream类型,
结果导致cep分组不生效,一个告警中出现了其他分组的数据。
解决方法, 就是在cep pattern前 根据是否有分组条件再KeyedBy一次
private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();if (!rule.getGroupBy().isEmpty()){inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());}PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));
输入数据:
{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860300012,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860299272,"create_time_desc":"2022-10-27 16:44:59","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666860300196,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"2"}
不产生告警,符合预期
再次输入同分组的数据:
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}]}
告警输出:{"org_log_id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","event_category_id":1,"event_technique_type":"无","event_description":"1","alarm_first_time":1666860368471,"src_ip":"172.11.11.1","hostname":"hostname1","intelligence_id":"","strategy_category_id":"422596451785379862","intelligence_type":"","id":"cc1cd8cd-a626-4916-bdd3-539ea57e898f","event_nums":3,"event_category_label":"资源开发","severity":"info","create_time":1666860369647,"strategy_category_name":"网络威胁分析","rule_name":"ceptest","risk_score":1,"data_center":"guo-sen","baseline":[],"sop_id":"","event_device_type":"无","rule_id":214,"policy_type":"pattern","strategy_category":"/NetThreatAnalysis","internal_event":"1","event_name":"ceptest","event_model_source":"/RuleEngine/OnLine","alarm_last_time":1666860369478}
产生告警符合预期
相关文章:
flink cep数据源keyby union后 keybe失效
问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警 策略条件: 选择了两个kafka的的topic的数据作为数据源, 对A 数据…...
python中的继承与多态,dir()函数
Python继承 在继承关系中,已有的、设计好的类称为父类或基类,新设计的类称为子类或派生类。派生类可以继承父类的公有成员,但是不能继承其私有成员。如果需要在派生类中调用基类的方法,可以使用内置函数super()或者通过“基类名.…...
C++练级之初级:第五篇
C练级之初级:第五篇 第五篇 C练级之初级:第五篇1.auto关键字2.for循环改进3.指针空值nullptr4.内联函数4.1内联函数的概念4.2内联函数的注意点 总结 1.auto关键字 🤔什么是auto(automatic的缩写,自动的意思)关键字? au…...
JMeter的使用(二)
九、直连数据库 通过直连数据库让程序代替接口访问数据库,如果二者预期结果不一致,就找到了程序缺陷。 获取某条学院的名字,放在百度搜索: JMeter 不具备直连数据库功能,必须整合第三方(jar包)实现配置数据库的连接通过JDBC Re…...
C/C++文件操作/IO流
学习任务: ⭐认识文件。⭐学习C语言中文件如何打开和关闭。⭐学习C语言中文件的读写方法(包括顺序读写和随机读写)。⭐学习C语言文件操作中如何判断文件读取结束。⭐简单了解FILE缓冲区。⭐认识流。⭐学习C的IO流,包括标准IO流和文…...
推荐 7 个超牛的 Spring Cloud 实战项目
个 把一个大型的单个应用程序和服务拆分为数个甚至数十个的支持微服务,这就是微服务架构的架构概念,通过将功能分解到各个离散的服务中以实现对解决方案的解耦。 关于微服务相关的学习资料不多,而 GitHub 上的开源项目可以作为你微服务之旅…...
Linux信号:信号 信号集 信号集函数
1. 信号的概念 Linux进程间通信的方式之一。信号也称为“软件中断”。 信号特点: 简单;携带信息有限;满足特定条件才发送信号;可进行用户空间和内核空间进程的交互; 信号4要素: (1…...
详解八大排序算法-附动图和源码(插入,希尔,选择,堆排序,冒泡,快速,归并,计数)
目录 🍏一.排序的概念及应用🍏 1.排序的概念 2.排序的应用 3.常用的排序算法 🍎二.排序算法的实现🍎 1.插入排序 1.1直接插入排序 1.2希尔排序(缩小增量排序) 2.选择排序 2.1直接选择排序 2.2堆排序…...
网络编程--协议、协议族、地址族
写在前面 这里先介绍下socket函数(Windows版本)的函数声明,后续内容均围绕该声明展开: #include <winsock2.h> //af: 指定该套接字的协议族 //type: 指定该套接字的数据传输方式 //protocol: 指定该套接字的最终协议 //返…...
Linux入门操作
pwd 查看当前目录 与 自动补全 文件详情 drwxrwxr-x d代表文件夹 -代表文件 其中rwx rwx r-x r是可读 w是可写 x 执行 第一组(前三个)指文件拥有者的权限 第二组(中三个)代表文件拥有的组的权限 第三组(后三个&am…...
1。C语言基础知识回顾
学习嵌入式的C基础知识,主要包括几个核心知识点:三大语法结构、常用的数据类型、函数、结构体、指针、文件操作。 一、顺序结构 程序自上而下依次执行、没有分支、代码简单。 常见顺序结构有:四则运算:,-࿰…...
学习如何通过构建一个简单的JavaScript颜色游戏来操作DOM
学习如何通过构建一个简单的JavaScript颜色游戏来操作DOM 题目要求 我们将构建一个简单的颜色猜谜游戏。每次游戏启动时,都会选择一个随机的RGB颜色代码。根据游戏模式,我们将在屏幕上提供三个(简单)或六个(困难&…...
【算法学习】—n皇后问题(回溯法)
【算法学习】—n皇后问题(回溯法) 1. 什么是回溯法? 相信"迷宫"是许多人儿时的回忆,大家小时候一定都玩过迷宫游戏。我们从不用别人教,都知道走迷宫的策略是: 当遇到一个岔路口,会有以下两种情况…...
万亿OTA市场进入新爆发期,2025或迎中国汽车软件付费元年
伴随智能汽车市场规模发展,越来越多的汽车产品具备OTA能力,功能的优化、以及服务的差异化,成为了车企竞争的新战场。 例如,今年初,问界M5 EV迎来了首次OTA升级,升级内容覆盖用户在实际用车中的多个场景&am…...
Android硬件通信之 蓝牙Mesh通信
一,简介 蓝牙4.0以下称为传统蓝牙,4.0以上是低功耗蓝牙,5.0开始主打物联网 5.0协议蓝牙最重要的技术就是Mesh组网,实现1对多,多对多的无线通信。即从点对点传输发展为网络拓扑结构,主要领域如灯光控制等&…...
PG数据库实现bool自动转smallint的方式
删除函数: 语法: DROP FUNCTION IF EXISTS your_schema_name.function_name(arg_type1, arg_type2) CASCADE RESTRICT; 实例: DROP FUNCTION IF EXISTS platformyw.boolean_to_smallint(bool) CASCADE RESTRICT; 查询是否存在函数 语法: SELE…...
易观千帆 | 2023年3月证券APP月活跃用户规模盘点
易观:2023年3月证券服务应用活跃人数14131.58万人,相较上月,环比增长0.61%,同比增长0.60%;2023年3月自营类证券服务应用Top10 活跃人数6221.44万人,环比增长0.08%;2023年3月第三方证券服务应用T…...
2023年江苏专转本成绩查询步骤
2023年江苏专转本成绩查询时间 2023年江苏专转本成绩查询时间预计在5月初,参加考试的考生,可以关注考试院发布的消息。江苏专转本考生可在规定时间内在省教育考试院网,在查询中心页面中输入准考证号和身份证号进行查询,或者拨…...
JavaScript中sort()函数
sort()函数是javascript中自带函数,这个函数的功能是排序。 使用sort()函数时,函数参数如果不设置的话,以默认方式进行排序,就是以字母顺序进行排序,准确的讲就是按照字符编码的顺序进行排序。 var arr [3,2,3,34,1…...
泰克Tektronix DPO5204B混合信号示波器
特征 带宽:2 GHz输入通道:4采样率:1 或 2 个通道上为 5 GS/s、10 GS/s记录长度:所有 4 个通道 25M,50M:1 或 2 个通道上升时间:175 皮秒MultiView zoom™ 记录长度高达 250 兆点>250,000 wf…...
UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...
业务系统对接大模型的基础方案:架构设计与关键步骤
业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路
进入2025年以来,尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断,但全球市场热度依然高涨,入局者持续增加。 以国内市场为例,天眼查专业版数据显示,截至5月底,我国现存在业、存续状态的机器人相关企…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
