flink cep数据源keyby union后 keybe失效
问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警
策略条件:
选择了两个kafka的的topic的数据作为数据源,
对A 数据源 test-topic1, 进行条件过滤, 过滤条件为:login_type = 1
对B 数据源 test-topic2,进行条件过滤,过滤条件为:login_type = 2
分组条件 为 src_ip,hostname两个字段进行分组
进行followby 关联。时间关联的最大时间间隔为 60秒
运行并行度设置为3
通过SourceStream打印的原始数据:
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
1> {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type ":"2"}
经过cep处理后,产了告警
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}]}
经过src_ip,和hostname分组后, 理论上应该只分组后的相同的 scr_ip,hostname进行事件关联告警
结果其他的分组数据也参和进来关联告警了。
期望的是 login_type = 1 出现至少两次, 接着login_type=2的至少出现1次,且相同的src_ip和hostname
然后结果是下面数据也产生了告警。
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":2}
怀疑是分组没生效。
然后debug数据源那块的方法kafkaStreamSource() 里面有进行分组,debug后发现确实也进行了keyby
后来找不到其他问题,纠结了下, 怀疑是不是 KeyedSteam.union(KeyedStream)后得到的就不是一个KeyedSteam了。 所以
出现问题的原始代码数据源代码:
//程序具体执行流程DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);SinkProcess.sink(resultStream, rule);public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {DataStream<JSONObject> inputStream = null;List<Event> events = rule.getEvents();if (events.size() > SharingConstant.NUMBER_ZERO) {for (Event event : events) {FlinkKafkaConsumer<JSONObject> kafkaConsumer =new KafkaSourceFunction(rule, event).init();if (inputStream != null) {// 多条 stream 合成一条 streaminputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));} else {// 只有一条 streaminputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);}}}return inputStream;}private static DataStream<JSONObject> kafkaStreamSource(StreamExecutionEnvironment env,Event event,Rule rule,FlinkKafkaConsumer<JSONObject> kafkaConsumer) {DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);// 对多个黑白名单查询进行循环String conditions = event.getConditions();while (conditions.contains(SharingConstant.ARGS_NAME)) {// 使用新的redis 数据结构,进行 s.include 过滤inputStream = AsyncDataStream.orderedWait(inputStream,new RedisNameListFilterSourceFunction(s,rule.getSettings().getRedis()),30,TimeUnit.SECONDS,2000);conditions = conditions.replace(s, "");}// 一般过滤处理inputStream = AsyncDataStream.orderedWait(inputStream,new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);// kafka source 进行 keyBy 处理return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());}public static DataStream<JSONObject> keyedBy(DataStream<JSONObject> input, Map<String, String> groupBy) {if (null == groupBy || groupBy.isEmpty() ||"".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])){return input;}return input.keyBy(new TwoEventKeySelector(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));}public class TwoEventKeySelector implements KeySelector<JSONObject, String> {private static final long serialVersionUID = 8534968406068735616L;private final String groupBy;public TwoEventKeySelector(String groupBy) {this.groupBy = groupBy;}@Overridepublic String getKey(JSONObject event) {StringBuilder keys = new StringBuilder();for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {keys.append(event.getString(key));}return keys.toString();}
}
问题出现在这里:
// 多条 stream 合成一条 stream
inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
kafkaStreamSource()这个方法返回的是 KeyedStream ,
两个KeyedStream unio合并后, 本来以为返回时KeyedStream,结果确是DataStream类型,
结果导致cep分组不生效,一个告警中出现了其他分组的数据。
解决方法, 就是在cep pattern前 根据是否有分组条件再KeyedBy一次
private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();if (!rule.getGroupBy().isEmpty()){inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());}PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));
输入数据:
{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860300012,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860299272,"create_time_desc":"2022-10-27 16:44:59","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666860300196,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"2"}
不产生告警,符合预期
再次输入同分组的数据:
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}]}
告警输出:{"org_log_id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","event_category_id":1,"event_technique_type":"无","event_description":"1","alarm_first_time":1666860368471,"src_ip":"172.11.11.1","hostname":"hostname1","intelligence_id":"","strategy_category_id":"422596451785379862","intelligence_type":"","id":"cc1cd8cd-a626-4916-bdd3-539ea57e898f","event_nums":3,"event_category_label":"资源开发","severity":"info","create_time":1666860369647,"strategy_category_name":"网络威胁分析","rule_name":"ceptest","risk_score":1,"data_center":"guo-sen","baseline":[],"sop_id":"","event_device_type":"无","rule_id":214,"policy_type":"pattern","strategy_category":"/NetThreatAnalysis","internal_event":"1","event_name":"ceptest","event_model_source":"/RuleEngine/OnLine","alarm_last_time":1666860369478}
产生告警符合预期
相关文章:
flink cep数据源keyby union后 keybe失效
问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警 策略条件: 选择了两个kafka的的topic的数据作为数据源, 对A 数据…...
python中的继承与多态,dir()函数
Python继承 在继承关系中,已有的、设计好的类称为父类或基类,新设计的类称为子类或派生类。派生类可以继承父类的公有成员,但是不能继承其私有成员。如果需要在派生类中调用基类的方法,可以使用内置函数super()或者通过“基类名.…...
C++练级之初级:第五篇
C练级之初级:第五篇 第五篇 C练级之初级:第五篇1.auto关键字2.for循环改进3.指针空值nullptr4.内联函数4.1内联函数的概念4.2内联函数的注意点 总结 1.auto关键字 🤔什么是auto(automatic的缩写,自动的意思)关键字? au…...
JMeter的使用(二)
九、直连数据库 通过直连数据库让程序代替接口访问数据库,如果二者预期结果不一致,就找到了程序缺陷。 获取某条学院的名字,放在百度搜索: JMeter 不具备直连数据库功能,必须整合第三方(jar包)实现配置数据库的连接通过JDBC Re…...
C/C++文件操作/IO流
学习任务: ⭐认识文件。⭐学习C语言中文件如何打开和关闭。⭐学习C语言中文件的读写方法(包括顺序读写和随机读写)。⭐学习C语言文件操作中如何判断文件读取结束。⭐简单了解FILE缓冲区。⭐认识流。⭐学习C的IO流,包括标准IO流和文…...
推荐 7 个超牛的 Spring Cloud 实战项目
个 把一个大型的单个应用程序和服务拆分为数个甚至数十个的支持微服务,这就是微服务架构的架构概念,通过将功能分解到各个离散的服务中以实现对解决方案的解耦。 关于微服务相关的学习资料不多,而 GitHub 上的开源项目可以作为你微服务之旅…...
Linux信号:信号 信号集 信号集函数
1. 信号的概念 Linux进程间通信的方式之一。信号也称为“软件中断”。 信号特点: 简单;携带信息有限;满足特定条件才发送信号;可进行用户空间和内核空间进程的交互; 信号4要素: (1…...
详解八大排序算法-附动图和源码(插入,希尔,选择,堆排序,冒泡,快速,归并,计数)
目录 🍏一.排序的概念及应用🍏 1.排序的概念 2.排序的应用 3.常用的排序算法 🍎二.排序算法的实现🍎 1.插入排序 1.1直接插入排序 1.2希尔排序(缩小增量排序) 2.选择排序 2.1直接选择排序 2.2堆排序…...
网络编程--协议、协议族、地址族
写在前面 这里先介绍下socket函数(Windows版本)的函数声明,后续内容均围绕该声明展开: #include <winsock2.h> //af: 指定该套接字的协议族 //type: 指定该套接字的数据传输方式 //protocol: 指定该套接字的最终协议 //返…...
Linux入门操作
pwd 查看当前目录 与 自动补全 文件详情 drwxrwxr-x d代表文件夹 -代表文件 其中rwx rwx r-x r是可读 w是可写 x 执行 第一组(前三个)指文件拥有者的权限 第二组(中三个)代表文件拥有的组的权限 第三组(后三个&am…...
1。C语言基础知识回顾
学习嵌入式的C基础知识,主要包括几个核心知识点:三大语法结构、常用的数据类型、函数、结构体、指针、文件操作。 一、顺序结构 程序自上而下依次执行、没有分支、代码简单。 常见顺序结构有:四则运算:,-࿰…...
学习如何通过构建一个简单的JavaScript颜色游戏来操作DOM
学习如何通过构建一个简单的JavaScript颜色游戏来操作DOM 题目要求 我们将构建一个简单的颜色猜谜游戏。每次游戏启动时,都会选择一个随机的RGB颜色代码。根据游戏模式,我们将在屏幕上提供三个(简单)或六个(困难&…...
【算法学习】—n皇后问题(回溯法)
【算法学习】—n皇后问题(回溯法) 1. 什么是回溯法? 相信"迷宫"是许多人儿时的回忆,大家小时候一定都玩过迷宫游戏。我们从不用别人教,都知道走迷宫的策略是: 当遇到一个岔路口,会有以下两种情况…...
万亿OTA市场进入新爆发期,2025或迎中国汽车软件付费元年
伴随智能汽车市场规模发展,越来越多的汽车产品具备OTA能力,功能的优化、以及服务的差异化,成为了车企竞争的新战场。 例如,今年初,问界M5 EV迎来了首次OTA升级,升级内容覆盖用户在实际用车中的多个场景&am…...
Android硬件通信之 蓝牙Mesh通信
一,简介 蓝牙4.0以下称为传统蓝牙,4.0以上是低功耗蓝牙,5.0开始主打物联网 5.0协议蓝牙最重要的技术就是Mesh组网,实现1对多,多对多的无线通信。即从点对点传输发展为网络拓扑结构,主要领域如灯光控制等&…...
PG数据库实现bool自动转smallint的方式
删除函数: 语法: DROP FUNCTION IF EXISTS your_schema_name.function_name(arg_type1, arg_type2) CASCADE RESTRICT; 实例: DROP FUNCTION IF EXISTS platformyw.boolean_to_smallint(bool) CASCADE RESTRICT; 查询是否存在函数 语法: SELE…...
易观千帆 | 2023年3月证券APP月活跃用户规模盘点
易观:2023年3月证券服务应用活跃人数14131.58万人,相较上月,环比增长0.61%,同比增长0.60%;2023年3月自营类证券服务应用Top10 活跃人数6221.44万人,环比增长0.08%;2023年3月第三方证券服务应用T…...
2023年江苏专转本成绩查询步骤
2023年江苏专转本成绩查询时间 2023年江苏专转本成绩查询时间预计在5月初,参加考试的考生,可以关注考试院发布的消息。江苏专转本考生可在规定时间内在省教育考试院网,在查询中心页面中输入准考证号和身份证号进行查询,或者拨…...
JavaScript中sort()函数
sort()函数是javascript中自带函数,这个函数的功能是排序。 使用sort()函数时,函数参数如果不设置的话,以默认方式进行排序,就是以字母顺序进行排序,准确的讲就是按照字符编码的顺序进行排序。 var arr [3,2,3,34,1…...
泰克Tektronix DPO5204B混合信号示波器
特征 带宽:2 GHz输入通道:4采样率:1 或 2 个通道上为 5 GS/s、10 GS/s记录长度:所有 4 个通道 25M,50M:1 或 2 个通道上升时间:175 皮秒MultiView zoom™ 记录长度高达 250 兆点>250,000 wf…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
FFmpeg:Windows系统小白安装及其使用
一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】,注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录(即exe所在文件夹)加入系统变量…...
PostgreSQL——环境搭建
一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在࿰…...
Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成
一个面向 Java 开发者的 Sring-Ai 示例工程项目,该项目是一个 Spring AI 快速入门的样例工程项目,旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计,每个模块都专注于特定的功能领域,便于学习和…...
