当前位置: 首页 > news >正文

flink学习之广播流与合流操作demo

广播流是什么?

将一条数据广播到所有的节点。使用 dataStream.broadCast()

广播流使用场景?

一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复

vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。

广播流怎么用?

一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流

不多说直接上demo,开箱即用

package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());DataStream<User> userDataStream = env.addSource(new CustomerSource());userDataStream.print("user");patternDataStream.print("pattern");//test1  直接合流 不广播。只会在一个节点更新。 用于特殊需求?
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream)
//                .process(new CustomerSimpleProcess())
//                .print();//test2// 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream.broadcast())
//                .process(new CustomerSimpleProcess())
//                        .print();//test3MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user -> user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {@Overridepublic void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {Integer userVip = user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!=null){Integer patternVip = pattern.vip;String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>= patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}else {System.out.println("pattern is null ");}}@Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,User, Pattern, String>.Context context, Collector<String> collector) throws Exception {BroadcastState<Void, Pattern> bcState = context.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的stateHashMap<String,Integer> vipMap;@Overridepublic void open(Configuration parameters) throws Exception {vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));vipMap=new HashMap<String,Integer>();super.open(parameters);}@Overridepublic void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {Integer userVip = user.getVip();Integer patternVip = vipMap.getOrDefault("vip", 0);String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>=patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}@Overridepublic void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {vipMap.put("vip",pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId = userId;this.vip = vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Action{" +"userId=" + userId +", vip='" + vip + '\'' +'}';}}// 定义行为模式POJO类,包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Pattern{" +"vip='" + vip + '\'' +'}';}}private static class CustomerSource implements SourceFunction<User> {boolean run = true;@Overridepublic void run(SourceContext<User> sourceContext) throws Exception {while (true) {Integer userId = new Random().nextInt(1000);Integer vip = new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}@Overridepublic void cancel() {run = false;}}private static class ChangeSource implements SourceFunction<Pattern> {boolean run = true;@Overridepublic void run(SourceContext<Pattern> sourceContext) throws Exception {int i = 1;while (true) {sourceContext.collect(new Pattern(i++));Thread.sleep(5000);}}@Overridepublic void cancel() {run = false;}}}

demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。

test1 不广播

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2

但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1

test2 map保存变化数据

test3通过描述器获取数据

和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。

相关文章:

flink学习之广播流与合流操作demo

广播流是什么&#xff1f; 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景&#xff1f; 一般用于动态加载配置项。比如lol&#xff0c;每天不断有人再投诉举报&#xff0c;客服根本忙不过来&#xff0c;腾讯内部做了一个判断&#xff0c;只有vip3…...

PPT架构师架构技能图

PPT架构师架构技能图 目录概述需求&#xff1a; 设计思路实现思路分析1.软素质2.核心输出&#xff08;office输出&#xff09; 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,ma…...

STM32微控制器的低功耗模式

STM32微控制器的低功耗模式(Low-power modes):Sleep mode、Stop mode 和 Standby mode。 1.1 Sleep Mode(睡眠模式): 把STM32微控制器当作一位劳累的工人,他在工作过程中需要短暂的休息。在Sleep模式下,微控制器会关闭一部分电路,减小功耗,但仍然保持对中央处理单…...

tensorflow QAT

tensorflow qat https://www.wpgdadatong.com/tw/blog/detail/70672 在边缘运算的重点技术之中&#xff0c;除了简化复杂的模块构架&#xff0c;来简化参数量以提高运算速度的这项模块轻量化网络构架技术之外。另一项技术就是各家神经网络框架&#xff08;TensorFlow、Pytorc…...

[杂谈]-快速了解LoRaWAN网络以及工作原理

快速了解LoRaWAN网络以及工作原理 文章目录 快速了解LoRaWAN网络以及工作原理1、LoRaWAN网络元素1.1 终端设备&#xff08;End Devices&#xff09;1.2 网关&#xff08;Gateways&#xff09;1.3 网络服务器&#xff08;Net Server&#xff09;1.4 应用服务器&#xff08;Appli…...

MySQL--MySQL表的增删改查(基础)

排序&#xff1a;ORDER BY 语法&#xff1a; – ASC 为升序&#xff08;从小到大&#xff09; – DESC 为降序&#xff08;从大到小&#xff09; – 默认为 ASC SELECT … FROM table_name [WHERE …] ORDER BY column [ASC|DESC], […]; *** update...

Vue中启动提示polyfill缺少-webpack v5版本导致

安装 npm i node-polyfill-webpack-plugin 因为我们的项目使用webpack v5&#xff0c;其中polyfill Node核心模块被删除。所以&#xff0c;我们安装它是为了在项目中访问这些模块 vue.config.js文件 const { defineConfig } require("vue/cli-service"); const No…...

Hugging Face实战-系列教程3:AutoModelForSequenceClassification文本2分类

&#x1f6a9;&#x1f6a9;&#x1f6a9;Hugging Face 实战系列 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在notebook中进行 本篇文章配套的代码资源已经上传 下篇内容&#xff1a; Hugging Face实战-系列教程4&#xff1a;padding与attention_mask ​输出我…...

《TCP/IP网络编程》阅读笔记--Socket类型及协议设置

目录 1--协议的定义 2--Socket的创建 2-1--协议族&#xff08;Protocol Family&#xff09; 2-2--Socket类型&#xff08;Type&#xff09; 3--Linux下实现TCP Socket 3-1--服务器端 3-2--客户端 3-3--编译运行 4--Windows下实现 TCP Socket 4-1--TCP服务端 4-2--TC…...

GitHub使用教程

GitHub使用教程 视频教程一&#xff1a;Github 新手够用指南 | 全程演示&个人找项目技巧放送_哔哩哔哩_bilibili 笔记&#xff1a; README.md编写教程&#xff1a;Typora官方免费版与入门教程__阿伟_的博客-CSDN博客 找开源项目的一些途径 • https://github.com/trendin…...

sql server 分区表

分区表 分区表是在SQL Server 2005之后的版本引入的特性&#xff0c;这个特性允许把逻辑上的一个表在物理上分为很多部分。换句话说&#xff0c;分区表从物理上看是将一个大表分成几个小表&#xff0c;但是从逻辑上看&#xff0c;还是一个大表。 步骤 创建分表区的步骤分为…...

开源许可证概述:GNU, BSD, Apache, MPL, 和 MIT

前言 开源许可证是开源软件分发的基础。它们定义了使用者如何使用&#xff0c;修改&#xff0c;分发开源软件。在这篇文章中&#xff0c;我们将探讨五种常见的开源许可证&#xff1a;GNU通用公共许可证 (GNU GPL)&#xff0c;BSD许可证&#xff0c;Apache许可证&#xff0c;Mo…...

java中log使用总结

目录 一、概述1.1. 核心日志框架1.2 门面日志框架 二、最佳实践2.1 核心日志框架API包2.2 门面日志框架依赖2.3 集成使用2.3.1 集成jcl2.3.2 集成slf4j2.3.2.1 slf4j集成单一框架2.3.2.2 slf4j整合混合框架 三、总结3.1 所有相关包3.1.1 核心日志框架包3.1.2 门面日志框架3.1.3…...

【Java】传输层协议TCP

传输层协议TCP TCP报文格式首部长度保留位32位序列号和32位确认应答号标记ACKSYNFINRSTURGPSH 16位窗口大小16位校验和16位紧急指针选项 TCP特点可靠传输实现机制-确认应答超时重传连接管理机制三次握手四次挥手特殊情况 滑动窗口流量控制拥塞控制延迟应答捎带应答面向字节流粘…...

计算机网络基础知识(非常详细)

1. 网络模型 1.1 OSI 七层参考模型 七层模型&#xff0c;亦称 OSI&#xff08;Open System Interconnection&#xff09;参考模型&#xff0c;即开放式系统互联&#xff0c;是网络通信的标准模型。一般称为 OSI 参考模型或七层模型。 它是一个七层的、抽象的模型体&#xff…...

如何进行SEO优化数据分析?(掌握正确的数据分析方法,让您的网站更上一层楼!)

在互联网时代&#xff0c;SEO优化已经成为了每一个网站运营者必备的技能。而在SEO优化中&#xff0c;数据分析更是至关重要的一环。在本文中&#xff0c;我们将会详细介绍如何正确的进行SEO优化数据分析&#xff0c;让您的网站更上一层楼&#xff01; 数据分析的重要性 数据分…...

Golang不同平台编译的思考

GOOS和GOARCH $GOOS可选值如下&#xff1a; darwin dragonfly freebsd linux netbsd openbsd plan9 solaris windows $GOARCH可选值如下 386 amd64 arm 在编译的时候我们可以根据实际需要对这两个参数进行组合。更详细的说明可以进官网看看 ## http://golang.org/cmd/go http…...

SpringSecurity学习

1.认证 密码校验用户 密码加密存储 Configuration public class SecurityConfig extends WebSecurityConfigurerAdapter {Beanpublic PasswordEncoder passwordEncoder(){return new BCryptPasswordEncoder();}} 我们没有这个配置&#xff0c;默认明文存储, {id}password;实现…...

时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测

时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测 目录 时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 ICEEMDAN-iMPA-BiLSTM功率/风速预测 基于改进的自适应经验模态分解改进海洋捕食者算法双向长短期记忆…...

二叉树(上)

“路虽远&#xff0c;行则将至” ❤️主页&#xff1a;小赛毛 目录 1.树概念及结构 1.1树的概念 1.2 树的相关概念 1.3 树的表示&#xff08;树的存储&#xff09; 2.二叉树概念及结构 2.1概念 2.2现实中的二叉树 2.3 特殊的二叉树&#xff1a; 2.4 二叉树的性质 3.二叉树的顺…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台

🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

HashMap中的put方法执行流程(流程图)

1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中&#xff0c;其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下&#xff1a; 初始判断与哈希计算&#xff1a; 首先&#xff0c;putVal 方法会检查当前的 table&#xff08;也就…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

协议转换利器,profinet转ethercat网关的两大派系,各有千秋

随着工业以太网的发展&#xff0c;其高效、便捷、协议开放、易于冗余等诸多优点&#xff0c;被越来越多的工业现场所采用。西门子SIMATIC S7-1200/1500系列PLC集成有Profinet接口&#xff0c;具有实时性、开放性&#xff0c;使用TCP/IP和IT标准&#xff0c;符合基于工业以太网的…...

Sklearn 机器学习 缺失值处理 获取填充失值的统计值

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...