当前位置: 首页 > news >正文

flink cep数据源keyby union后 keybe失效

问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警

策略条件:

选择了两个kafka的的topic的数据作为数据源,

对A 数据源 test-topic1, 进行条件过滤, 过滤条件为:login_type  = 1

对B 数据源 test-topic2,进行条件过滤,过滤条件为:login_type =  2

分组条件 为   src_ip,hostname两个字段进行分组

进行followby 关联。时间关联的最大时间间隔为  60秒

运行并行度设置为3

通过SourceStream打印的原始数据:

2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
1> {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type ":"2"}

经过cep处理后,产了告警

产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}]}

经过src_ip,和hostname分组后, 理论上应该只分组后的相同的 scr_ip,hostname进行事件关联告警

结果其他的分组数据也参和进来关联告警了。 

期望的是  login_type = 1 出现至少两次, 接着login_type=2的至少出现1次,且相同的src_ip和hostname

然后结果是下面数据也产生了告警。

{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":2}

怀疑是分组没生效。

然后debug数据源那块的方法kafkaStreamSource() 里面有进行分组,debug后发现确实也进行了keyby

后来找不到其他问题,纠结了下, 怀疑是不是 KeyedSteam.union(KeyedStream)后得到的就不是一个KeyedSteam了。 所以

出现问题的原始代码数据源代码:

//程序具体执行流程DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);SinkProcess.sink(resultStream, rule);public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {DataStream<JSONObject> inputStream = null;List<Event> events = rule.getEvents();if (events.size() > SharingConstant.NUMBER_ZERO) {for (Event event : events) {FlinkKafkaConsumer<JSONObject> kafkaConsumer =new KafkaSourceFunction(rule, event).init();if (inputStream != null) {// 多条 stream 合成一条 streaminputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));} else {// 只有一条 streaminputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);}}}return inputStream;}private static DataStream<JSONObject> kafkaStreamSource(StreamExecutionEnvironment env,Event event,Rule rule,FlinkKafkaConsumer<JSONObject> kafkaConsumer) {DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);// 对多个黑白名单查询进行循环String conditions = event.getConditions();while (conditions.contains(SharingConstant.ARGS_NAME)) {// 使用新的redis 数据结构,进行 s.include 过滤inputStream = AsyncDataStream.orderedWait(inputStream,new RedisNameListFilterSourceFunction(s,rule.getSettings().getRedis()),30,TimeUnit.SECONDS,2000);conditions = conditions.replace(s, "");}// 一般过滤处理inputStream = AsyncDataStream.orderedWait(inputStream,new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);// kafka source 进行 keyBy 处理return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());}public static DataStream<JSONObject> keyedBy(DataStream<JSONObject> input, Map<String, String> groupBy) {if (null == groupBy || groupBy.isEmpty() ||"".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])){return input;}return input.keyBy(new TwoEventKeySelector(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));}public class TwoEventKeySelector implements KeySelector<JSONObject, String> {private static final long serialVersionUID = 8534968406068735616L;private final String groupBy;public TwoEventKeySelector(String groupBy) {this.groupBy = groupBy;}@Overridepublic String getKey(JSONObject event) {StringBuilder keys = new StringBuilder();for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {keys.append(event.getString(key));}return keys.toString();}
}

问题出现在这里:

// 多条 stream 合成一条 stream
                    inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));

kafkaStreamSource()这个方法返回的是 KeyedStream ,

两个KeyedStream unio合并后,  本来以为返回时KeyedStream,结果确是DataStream类型,

结果导致cep分组不生效,一个告警中出现了其他分组的数据。

解决方法, 就是在cep pattern前 根据是否有分组条件再KeyedBy一次

  private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();if (!rule.getGroupBy().isEmpty()){inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());}PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));

输入数据:

 {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860300012,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860299272,"create_time_desc":"2022-10-27 16:44:59","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666860300196,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"2"}

不产生告警,符合预期

再次输入同分组的数据:

2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}]}
告警输出:{"org_log_id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","event_category_id":1,"event_technique_type":"无","event_description":"1","alarm_first_time":1666860368471,"src_ip":"172.11.11.1","hostname":"hostname1","intelligence_id":"","strategy_category_id":"422596451785379862","intelligence_type":"","id":"cc1cd8cd-a626-4916-bdd3-539ea57e898f","event_nums":3,"event_category_label":"资源开发","severity":"info","create_time":1666860369647,"strategy_category_name":"网络威胁分析","rule_name":"ceptest","risk_score":1,"data_center":"guo-sen","baseline":[],"sop_id":"","event_device_type":"无","rule_id":214,"policy_type":"pattern","strategy_category":"/NetThreatAnalysis","internal_event":"1","event_name":"ceptest","event_model_source":"/RuleEngine/OnLine","alarm_last_time":1666860369478}

产生告警符合预期

相关文章:

flink cep数据源keyby union后 keybe失效

问题背景&#xff1a;cep模板 对数据源设置分组条件后&#xff0c;告警的数据&#xff0c;和分组条件对不上&#xff0c; 掺杂了&#xff0c;其他的不同组的数据&#xff0c;产生了告警 策略条件&#xff1a; 选择了两个kafka的的topic的数据作为数据源&#xff0c; 对A 数据…...

python中的继承与多态,dir()函数

Python继承 在继承关系中&#xff0c;已有的、设计好的类称为父类或基类&#xff0c;新设计的类称为子类或派生类。派生类可以继承父类的公有成员&#xff0c;但是不能继承其私有成员。如果需要在派生类中调用基类的方法&#xff0c;可以使用内置函数super()或者通过“基类名.…...

C++练级之初级:第五篇

C练级之初级&#xff1a;第五篇 第五篇 C练级之初级&#xff1a;第五篇1.auto关键字2.for循环改进3.指针空值nullptr4.内联函数4.1内联函数的概念4.2内联函数的注意点 总结 1.auto关键字 &#x1f914;什么是auto(automatic的缩写&#xff0c;自动的意思)关键字&#xff1f; au…...

JMeter的使用(二)

九、直连数据库 通过直连数据库让程序代替接口访问数据库&#xff0c;如果二者预期结果不一致&#xff0c;就找到了程序缺陷。 获取某条学院的名字&#xff0c;放在百度搜索: JMeter 不具备直连数据库功能&#xff0c;必须整合第三方(jar包)实现配置数据库的连接通过JDBC Re…...

C/C++文件操作/IO流

学习任务&#xff1a; ⭐认识文件。⭐学习C语言中文件如何打开和关闭。⭐学习C语言中文件的读写方法&#xff08;包括顺序读写和随机读写&#xff09;。⭐学习C语言文件操作中如何判断文件读取结束。⭐简单了解FILE缓冲区。⭐认识流。⭐学习C的IO流&#xff0c;包括标准IO流和文…...

推荐 7 个超牛的 Spring Cloud 实战项目

个 把一个大型的单个应用程序和服务拆分为数个甚至数十个的支持微服务&#xff0c;这就是微服务架构的架构概念&#xff0c;通过将功能分解到各个离散的服务中以实现对解决方案的解耦。 关于微服务相关的学习资料不多&#xff0c;而 GitHub 上的开源项目可以作为你微服务之旅…...

Linux信号:信号 信号集 信号集函数

1. 信号的概念 Linux进程间通信的方式之一。信号也称为“软件中断”。 信号特点&#xff1a; 简单&#xff1b;携带信息有限&#xff1b;满足特定条件才发送信号&#xff1b;可进行用户空间和内核空间进程的交互&#xff1b; 信号4要素&#xff1a; &#xff08;1&#xf…...

详解八大排序算法-附动图和源码(插入,希尔,选择,堆排序,冒泡,快速,归并,计数)

目录 &#x1f34f;一.排序的概念及应用&#x1f34f; 1.排序的概念 2.排序的应用 3.常用的排序算法 &#x1f34e;二.排序算法的实现&#x1f34e; 1.插入排序 1.1直接插入排序 1.2希尔排序&#xff08;缩小增量排序&#xff09; 2.选择排序 2.1直接选择排序 2.2堆排序…...

网络编程--协议、协议族、地址族

写在前面 这里先介绍下socket函数&#xff08;Windows版本&#xff09;的函数声明&#xff0c;后续内容均围绕该声明展开&#xff1a; #include <winsock2.h> //af: 指定该套接字的协议族 //type: 指定该套接字的数据传输方式 //protocol: 指定该套接字的最终协议 //返…...

Linux入门操作

pwd 查看当前目录 与 自动补全 文件详情 drwxrwxr-x d代表文件夹 -代表文件 其中rwx rwx r-x r是可读 w是可写 x 执行 第一组&#xff08;前三个&#xff09;指文件拥有者的权限 第二组&#xff08;中三个&#xff09;代表文件拥有的组的权限 第三组&#xff08;后三个&am…...

1。C语言基础知识回顾

学习嵌入式的C基础知识&#xff0c;主要包括几个核心知识点&#xff1a;三大语法结构、常用的数据类型、函数、结构体、指针、文件操作。 一、顺序结构 程序自上而下依次执行、没有分支、代码简单。 常见顺序结构有&#xff1a;四则运算&#xff1a;&#xff0c;-&#xff0…...

学习如何通过构建一个简单的JavaScript颜色游戏来操作DOM

学习如何通过构建一个简单的JavaScript颜色游戏来操作DOM 题目要求 我们将构建一个简单的颜色猜谜游戏。每次游戏启动时&#xff0c;都会选择一个随机的RGB颜色代码。根据游戏模式&#xff0c;我们将在屏幕上提供三个&#xff08;简单&#xff09;或六个&#xff08;困难&…...

【算法学习】—n皇后问题(回溯法)

【算法学习】—n皇后问题(回溯法) 1. 什么是回溯法&#xff1f; 相信"迷宫"是许多人儿时的回忆&#xff0c;大家小时候一定都玩过迷宫游戏。我们从不用别人教&#xff0c;都知道走迷宫的策略是&#xff1a; 当遇到一个岔路口&#xff0c;会有以下两种情况&#xf…...

万亿OTA市场进入新爆发期,2025或迎中国汽车软件付费元年

伴随智能汽车市场规模发展&#xff0c;越来越多的汽车产品具备OTA能力&#xff0c;功能的优化、以及服务的差异化&#xff0c;成为了车企竞争的新战场。 例如&#xff0c;今年初&#xff0c;问界M5 EV迎来了首次OTA升级&#xff0c;升级内容覆盖用户在实际用车中的多个场景&am…...

Android硬件通信之 蓝牙Mesh通信

一&#xff0c;简介 蓝牙4.0以下称为传统蓝牙&#xff0c;4.0以上是低功耗蓝牙&#xff0c;5.0开始主打物联网 5.0协议蓝牙最重要的技术就是Mesh组网&#xff0c;实现1对多&#xff0c;多对多的无线通信。即从点对点传输发展为网络拓扑结构&#xff0c;主要领域如灯光控制等&…...

PG数据库实现bool自动转smallint的方式

删除函数&#xff1a; 语法&#xff1a; DROP FUNCTION IF EXISTS your_schema_name.function_name(arg_type1, arg_type2) CASCADE RESTRICT; 实例&#xff1a; DROP FUNCTION IF EXISTS platformyw.boolean_to_smallint(bool) CASCADE RESTRICT; 查询是否存在函数 语法: SELE…...

易观千帆 | 2023年3月证券APP月活跃用户规模盘点

易观&#xff1a;2023年3月证券服务应用活跃人数14131.58万人&#xff0c;相较上月&#xff0c;环比增长0.61%&#xff0c;同比增长0.60%&#xff1b;2023年3月自营类证券服务应用Top10 活跃人数6221.44万人&#xff0c;环比增长0.08%&#xff1b;2023年3月第三方证券服务应用T…...

2023年江苏专转本成绩查询步骤

2023年江苏专转本成绩查询时间 2023年江苏专转本成绩查询时间预计在5月初&#xff0c;参加考试的考生&#xff0c;可以关注考试院发布的消息。江苏专转本考生可在规定时间内在省教育考试院网&#xff0c;在查询中心页面中输入准考证号和身份证号进行查询&#xff0c;或者拨…...

JavaScript中sort()函数

sort()函数是javascript中自带函数&#xff0c;这个函数的功能是排序。 使用sort()函数时&#xff0c;函数参数如果不设置的话&#xff0c;以默认方式进行排序&#xff0c;就是以字母顺序进行排序&#xff0c;准确的讲就是按照字符编码的顺序进行排序。 var arr [3,2,3,34,1…...

泰克Tektronix DPO5204B混合信号示波器

特征 带宽&#xff1a;2 GHz输入通道&#xff1a;4采样率&#xff1a;1 或 2 个通道上为 5 GS/s、10 GS/s记录长度&#xff1a;所有 4 个通道 25M&#xff0c;50M&#xff1a;1 或 2 个通道上升时间&#xff1a;175 皮秒MultiView zoom™ 记录长度高达 250 兆点>250,000 wf…...

突破传统监测模式:业务状态监控HM的新思路

作者&#xff1a;京东保险 管顺利 一、传统监控系统的盲区&#xff0c;如何打造业务状态监控。 在系统架构设计中非常重要的一环是要做数据监控和数据最终一致性&#xff0c;关于一致性的补偿&#xff0c;已经由算法部的大佬总结过就不在赘述。这里主要讲如何去补偿&#xff…...

0Ω电阻在PCB板中的5大常见作用

在PCB板中&#xff0c;时常见到一些阻值为0Ω的电阻。我们都知道&#xff0c;在电路中&#xff0c;电阻的作用是阻碍电流&#xff0c;而0Ω电阻显然失去了这个作用。那它存在于PCB板中的原因是什么呢&#xff1f;今天我们一探究竟。 1、充当跳线 在电路中&#xff0c;0Ω电阻…...

分布式消息队列Kafka(三)- 服务节点Broker

1.Kafka Broker 工作流程 &#xff08;1&#xff09;zookeeper中存储的kafka信息 ​ 1&#xff09;启动 Zookeeper 客户端。 [zrclasshadoop102 zookeeper-3.5.7]$ bin/zkCli.sh ​ 2&#xff09;通过 ls 命令可以查看 kafka 相关信息。 [zk: localhost:2181(CONNECTED) 2]…...

蠕动泵说明书_RDB

RDB_2T-S蠕 动 泵 概述 蠕动灌装泵是一种高性能、高质量的泵。采用先进的微处理技术及通讯方式做成的控制器和步进电机驱动器&#xff0c;配以诚合最新研制出的泵头&#xff0c;使产品在稳定性、先进性和性价比上达到一个新的高度。适用饮料、保健品、制药、精细化工等诸流量…...

浅谈react如何自定义hooks

react 自定义 hooks 简介 一句话&#xff1a;使用自定义hooks可以将某些组件逻辑提取到可重用的函数中。 自定义hooks是一个从use开始的调用其他hooks的Javascript函数。 下面以一个案例: 新闻发布操作&#xff0c;来简单说一下react 自定义 hooks。 不使用自定义hooks时 …...

如何优雅的写个try catch的方式!

软件开发过程中&#xff0c;不可避免的是需要处理各种异常&#xff0c;就我自己来说&#xff0c;至少有一半以上的时间都是在处理各种异常情况&#xff0c;所以代码中就会出现大量的try {...} catch {...} finally {...} 代码块&#xff0c;不仅有大量的冗余代码&#xff0c;而…...

海尔智家:智慧场景掌握「主动」权,用户体验才有话语权

2023年1月&#xff0c;《福布斯》AI专栏作家Rob Toews发布了年度AI发展预测&#xff0c;指出人工智能的发展将带来涉及各行业、跨学科领域的深远影响。变革将至&#xff0c;全球已掀起生成式AI热&#xff0c;以自然语言处理为代表的人工智能技术在快速进化&#xff0c;积极拥抱…...

基于铜锁,在前端对登录密码进行加密,实现隐私数据保密性

本文将基于 铜锁&#xff08;tongsuo&#xff09;开源基础密码库实现前端对用户登录密码的加密&#xff0c;从而实现前端隐私数据的保密性。 首先&#xff0c;铜锁密码库是一个提供现代密码学算法和安全通信协议的开源基础密码库&#xff0c;在中国商用密码算法&#xff0c;例…...

LVS的小总结

LVS的工作模式及其工作过程&#xff1a; LVS 有三种负载均衡的模式&#xff0c;分别是VS/NAT&#xff08;nat 模式&#xff09;、VS/DR&#xff08;路由模式&#xff09;、VS/TUN&#xff08;隧道模式&#xff09;。 1、NAT模式&#xff08;NAT模式&#xff09; 原理&#x…...

Spring依赖注入(DI配置)

Spring依赖注入 1. 依赖注入方式【重点】1.1 依赖注入的两种方式1.2 setter方式注入问题导入引用类型简单类型 1.3 构造方式注入问题导入引用类型简单类型参数适配【了解】 1.4 依赖注入方式选择 2. 依赖自动装配【理解】问题导入2.1 自动装配概念2.2 自动装配类型依赖自动装配…...