flink cep数据源keyby union后 keybe失效
问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警
策略条件:
选择了两个kafka的的topic的数据作为数据源,
对A 数据源 test-topic1, 进行条件过滤, 过滤条件为:login_type = 1
对B 数据源 test-topic2,进行条件过滤,过滤条件为:login_type = 2
分组条件 为 src_ip,hostname两个字段进行分组
进行followby 关联。时间关联的最大时间间隔为 60秒
运行并行度设置为3
通过SourceStream打印的原始数据:
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
1> {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type ":"2"}
经过cep处理后,产了告警
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}]}
经过src_ip,和hostname分组后, 理论上应该只分组后的相同的 scr_ip,hostname进行事件关联告警
结果其他的分组数据也参和进来关联告警了。
期望的是 login_type = 1 出现至少两次, 接着login_type=2的至少出现1次,且相同的src_ip和hostname
然后结果是下面数据也产生了告警。
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":2}
怀疑是分组没生效。
然后debug数据源那块的方法kafkaStreamSource() 里面有进行分组,debug后发现确实也进行了keyby
后来找不到其他问题,纠结了下, 怀疑是不是 KeyedSteam.union(KeyedStream)后得到的就不是一个KeyedSteam了。 所以
出现问题的原始代码数据源代码:
//程序具体执行流程DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);SinkProcess.sink(resultStream, rule);public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {DataStream<JSONObject> inputStream = null;List<Event> events = rule.getEvents();if (events.size() > SharingConstant.NUMBER_ZERO) {for (Event event : events) {FlinkKafkaConsumer<JSONObject> kafkaConsumer =new KafkaSourceFunction(rule, event).init();if (inputStream != null) {// 多条 stream 合成一条 streaminputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));} else {// 只有一条 streaminputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);}}}return inputStream;}private static DataStream<JSONObject> kafkaStreamSource(StreamExecutionEnvironment env,Event event,Rule rule,FlinkKafkaConsumer<JSONObject> kafkaConsumer) {DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);// 对多个黑白名单查询进行循环String conditions = event.getConditions();while (conditions.contains(SharingConstant.ARGS_NAME)) {// 使用新的redis 数据结构,进行 s.include 过滤inputStream = AsyncDataStream.orderedWait(inputStream,new RedisNameListFilterSourceFunction(s,rule.getSettings().getRedis()),30,TimeUnit.SECONDS,2000);conditions = conditions.replace(s, "");}// 一般过滤处理inputStream = AsyncDataStream.orderedWait(inputStream,new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);// kafka source 进行 keyBy 处理return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());}public static DataStream<JSONObject> keyedBy(DataStream<JSONObject> input, Map<String, String> groupBy) {if (null == groupBy || groupBy.isEmpty() ||"".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])){return input;}return input.keyBy(new TwoEventKeySelector(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));}public class TwoEventKeySelector implements KeySelector<JSONObject, String> {private static final long serialVersionUID = 8534968406068735616L;private final String groupBy;public TwoEventKeySelector(String groupBy) {this.groupBy = groupBy;}@Overridepublic String getKey(JSONObject event) {StringBuilder keys = new StringBuilder();for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {keys.append(event.getString(key));}return keys.toString();}
}
问题出现在这里:
// 多条 stream 合成一条 stream
inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
kafkaStreamSource()这个方法返回的是 KeyedStream ,
两个KeyedStream unio合并后, 本来以为返回时KeyedStream,结果确是DataStream类型,
结果导致cep分组不生效,一个告警中出现了其他分组的数据。
解决方法, 就是在cep pattern前 根据是否有分组条件再KeyedBy一次
private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();if (!rule.getGroupBy().isEmpty()){inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());}PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));
输入数据:
{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860300012,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860299272,"create_time_desc":"2022-10-27 16:44:59","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666860300196,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"2"}
不产生告警,符合预期
再次输入同分组的数据:
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}]}
告警输出:{"org_log_id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","event_category_id":1,"event_technique_type":"无","event_description":"1","alarm_first_time":1666860368471,"src_ip":"172.11.11.1","hostname":"hostname1","intelligence_id":"","strategy_category_id":"422596451785379862","intelligence_type":"","id":"cc1cd8cd-a626-4916-bdd3-539ea57e898f","event_nums":3,"event_category_label":"资源开发","severity":"info","create_time":1666860369647,"strategy_category_name":"网络威胁分析","rule_name":"ceptest","risk_score":1,"data_center":"guo-sen","baseline":[],"sop_id":"","event_device_type":"无","rule_id":214,"policy_type":"pattern","strategy_category":"/NetThreatAnalysis","internal_event":"1","event_name":"ceptest","event_model_source":"/RuleEngine/OnLine","alarm_last_time":1666860369478}
产生告警符合预期
相关文章:
flink cep数据源keyby union后 keybe失效
问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警 策略条件: 选择了两个kafka的的topic的数据作为数据源, 对A 数据…...
python中的继承与多态,dir()函数
Python继承 在继承关系中,已有的、设计好的类称为父类或基类,新设计的类称为子类或派生类。派生类可以继承父类的公有成员,但是不能继承其私有成员。如果需要在派生类中调用基类的方法,可以使用内置函数super()或者通过“基类名.…...
C++练级之初级:第五篇
C练级之初级:第五篇 第五篇 C练级之初级:第五篇1.auto关键字2.for循环改进3.指针空值nullptr4.内联函数4.1内联函数的概念4.2内联函数的注意点 总结 1.auto关键字 🤔什么是auto(automatic的缩写,自动的意思)关键字? au…...
JMeter的使用(二)
九、直连数据库 通过直连数据库让程序代替接口访问数据库,如果二者预期结果不一致,就找到了程序缺陷。 获取某条学院的名字,放在百度搜索: JMeter 不具备直连数据库功能,必须整合第三方(jar包)实现配置数据库的连接通过JDBC Re…...
C/C++文件操作/IO流
学习任务: ⭐认识文件。⭐学习C语言中文件如何打开和关闭。⭐学习C语言中文件的读写方法(包括顺序读写和随机读写)。⭐学习C语言文件操作中如何判断文件读取结束。⭐简单了解FILE缓冲区。⭐认识流。⭐学习C的IO流,包括标准IO流和文…...
推荐 7 个超牛的 Spring Cloud 实战项目
个 把一个大型的单个应用程序和服务拆分为数个甚至数十个的支持微服务,这就是微服务架构的架构概念,通过将功能分解到各个离散的服务中以实现对解决方案的解耦。 关于微服务相关的学习资料不多,而 GitHub 上的开源项目可以作为你微服务之旅…...
Linux信号:信号 信号集 信号集函数
1. 信号的概念 Linux进程间通信的方式之一。信号也称为“软件中断”。 信号特点: 简单;携带信息有限;满足特定条件才发送信号;可进行用户空间和内核空间进程的交互; 信号4要素: (1…...
详解八大排序算法-附动图和源码(插入,希尔,选择,堆排序,冒泡,快速,归并,计数)
目录 🍏一.排序的概念及应用🍏 1.排序的概念 2.排序的应用 3.常用的排序算法 🍎二.排序算法的实现🍎 1.插入排序 1.1直接插入排序 1.2希尔排序(缩小增量排序) 2.选择排序 2.1直接选择排序 2.2堆排序…...
网络编程--协议、协议族、地址族
写在前面 这里先介绍下socket函数(Windows版本)的函数声明,后续内容均围绕该声明展开: #include <winsock2.h> //af: 指定该套接字的协议族 //type: 指定该套接字的数据传输方式 //protocol: 指定该套接字的最终协议 //返…...
Linux入门操作
pwd 查看当前目录 与 自动补全 文件详情 drwxrwxr-x d代表文件夹 -代表文件 其中rwx rwx r-x r是可读 w是可写 x 执行 第一组(前三个)指文件拥有者的权限 第二组(中三个)代表文件拥有的组的权限 第三组(后三个&am…...
1。C语言基础知识回顾
学习嵌入式的C基础知识,主要包括几个核心知识点:三大语法结构、常用的数据类型、函数、结构体、指针、文件操作。 一、顺序结构 程序自上而下依次执行、没有分支、代码简单。 常见顺序结构有:四则运算:,-࿰…...
学习如何通过构建一个简单的JavaScript颜色游戏来操作DOM
学习如何通过构建一个简单的JavaScript颜色游戏来操作DOM 题目要求 我们将构建一个简单的颜色猜谜游戏。每次游戏启动时,都会选择一个随机的RGB颜色代码。根据游戏模式,我们将在屏幕上提供三个(简单)或六个(困难&…...
【算法学习】—n皇后问题(回溯法)
【算法学习】—n皇后问题(回溯法) 1. 什么是回溯法? 相信"迷宫"是许多人儿时的回忆,大家小时候一定都玩过迷宫游戏。我们从不用别人教,都知道走迷宫的策略是: 当遇到一个岔路口,会有以下两种情况…...
万亿OTA市场进入新爆发期,2025或迎中国汽车软件付费元年
伴随智能汽车市场规模发展,越来越多的汽车产品具备OTA能力,功能的优化、以及服务的差异化,成为了车企竞争的新战场。 例如,今年初,问界M5 EV迎来了首次OTA升级,升级内容覆盖用户在实际用车中的多个场景&am…...
Android硬件通信之 蓝牙Mesh通信
一,简介 蓝牙4.0以下称为传统蓝牙,4.0以上是低功耗蓝牙,5.0开始主打物联网 5.0协议蓝牙最重要的技术就是Mesh组网,实现1对多,多对多的无线通信。即从点对点传输发展为网络拓扑结构,主要领域如灯光控制等&…...
PG数据库实现bool自动转smallint的方式
删除函数: 语法: DROP FUNCTION IF EXISTS your_schema_name.function_name(arg_type1, arg_type2) CASCADE RESTRICT; 实例: DROP FUNCTION IF EXISTS platformyw.boolean_to_smallint(bool) CASCADE RESTRICT; 查询是否存在函数 语法: SELE…...
易观千帆 | 2023年3月证券APP月活跃用户规模盘点
易观:2023年3月证券服务应用活跃人数14131.58万人,相较上月,环比增长0.61%,同比增长0.60%;2023年3月自营类证券服务应用Top10 活跃人数6221.44万人,环比增长0.08%;2023年3月第三方证券服务应用T…...
2023年江苏专转本成绩查询步骤
2023年江苏专转本成绩查询时间 2023年江苏专转本成绩查询时间预计在5月初,参加考试的考生,可以关注考试院发布的消息。江苏专转本考生可在规定时间内在省教育考试院网,在查询中心页面中输入准考证号和身份证号进行查询,或者拨…...
JavaScript中sort()函数
sort()函数是javascript中自带函数,这个函数的功能是排序。 使用sort()函数时,函数参数如果不设置的话,以默认方式进行排序,就是以字母顺序进行排序,准确的讲就是按照字符编码的顺序进行排序。 var arr [3,2,3,34,1…...
泰克Tektronix DPO5204B混合信号示波器
特征 带宽:2 GHz输入通道:4采样率:1 或 2 个通道上为 5 GS/s、10 GS/s记录长度:所有 4 个通道 25M,50M:1 或 2 个通道上升时间:175 皮秒MultiView zoom™ 记录长度高达 250 兆点>250,000 wf…...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
淘宝扭蛋机小程序系统开发:打造互动性强的购物平台
淘宝扭蛋机小程序系统的开发,旨在打造一个互动性强的购物平台,让用户在购物的同时,能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机,实现旋转、抽拉等动作,增…...
JS红宝书笔记 - 3.3 变量
要定义变量,可以使用var操作符,后跟变量名 ES实现变量初始化,因此可以同时定义变量并设置它的值 使用var操作符定义的变量会成为包含它的函数的局部变量。 在函数内定义变量时省略var操作符,可以创建一个全局变量 如果需要定义…...
