flink学习之广播流与合流操作demo
广播流是什么?
将一条数据广播到所有的节点。使用 dataStream.broadCast()
广播流使用场景?
一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复
vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。
广播流怎么用?
一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流
不多说直接上demo,开箱即用
package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());DataStream<User> userDataStream = env.addSource(new CustomerSource());userDataStream.print("user");patternDataStream.print("pattern");//test1 直接合流 不广播。只会在一个节点更新。 用于特殊需求?
// userDataStream
// .keyBy(user -> user.userId)
// .connect(patternDataStream)
// .process(new CustomerSimpleProcess())
// .print();//test2// 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
// userDataStream
// .keyBy(user -> user.userId)
// .connect(patternDataStream.broadcast())
// .process(new CustomerSimpleProcess())
// .print();//test3MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user -> user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {@Overridepublic void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {Integer userVip = user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!=null){Integer patternVip = pattern.vip;String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>= patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}else {System.out.println("pattern is null ");}}@Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,User, Pattern, String>.Context context, Collector<String> collector) throws Exception {BroadcastState<Void, Pattern> bcState = context.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的stateHashMap<String,Integer> vipMap;@Overridepublic void open(Configuration parameters) throws Exception {vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));vipMap=new HashMap<String,Integer>();super.open(parameters);}@Overridepublic void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {Integer userVip = user.getVip();Integer patternVip = vipMap.getOrDefault("vip", 0);String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>=patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}@Overridepublic void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {vipMap.put("vip",pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId = userId;this.vip = vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Action{" +"userId=" + userId +", vip='" + vip + '\'' +'}';}}// 定义行为模式POJO类,包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Pattern{" +"vip='" + vip + '\'' +'}';}}private static class CustomerSource implements SourceFunction<User> {boolean run = true;@Overridepublic void run(SourceContext<User> sourceContext) throws Exception {while (true) {Integer userId = new Random().nextInt(1000);Integer vip = new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}@Overridepublic void cancel() {run = false;}}private static class ChangeSource implements SourceFunction<Pattern> {boolean run = true;@Overridepublic void run(SourceContext<Pattern> sourceContext) throws Exception {int i = 1;while (true) {sourceContext.collect(new Pattern(i++));Thread.sleep(5000);}}@Overridepublic void cancel() {run = false;}}}
demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。
test1 不广播

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2
但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1
test2 map保存变化数据

test3通过描述器获取数据
和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。
相关文章:
flink学习之广播流与合流操作demo
广播流是什么? 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景? 一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3…...
PPT架构师架构技能图
PPT架构师架构技能图 目录概述需求: 设计思路实现思路分析1.软素质2.核心输出(office输出) 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,ma…...
STM32微控制器的低功耗模式
STM32微控制器的低功耗模式(Low-power modes):Sleep mode、Stop mode 和 Standby mode。 1.1 Sleep Mode(睡眠模式): 把STM32微控制器当作一位劳累的工人,他在工作过程中需要短暂的休息。在Sleep模式下,微控制器会关闭一部分电路,减小功耗,但仍然保持对中央处理单…...
tensorflow QAT
tensorflow qat https://www.wpgdadatong.com/tw/blog/detail/70672 在边缘运算的重点技术之中,除了简化复杂的模块构架,来简化参数量以提高运算速度的这项模块轻量化网络构架技术之外。另一项技术就是各家神经网络框架(TensorFlow、Pytorc…...
[杂谈]-快速了解LoRaWAN网络以及工作原理
快速了解LoRaWAN网络以及工作原理 文章目录 快速了解LoRaWAN网络以及工作原理1、LoRaWAN网络元素1.1 终端设备(End Devices)1.2 网关(Gateways)1.3 网络服务器(Net Server)1.4 应用服务器(Appli…...
MySQL--MySQL表的增删改查(基础)
排序:ORDER BY 语法: – ASC 为升序(从小到大) – DESC 为降序(从大到小) – 默认为 ASC SELECT … FROM table_name [WHERE …] ORDER BY column [ASC|DESC], […]; *** update...
Vue中启动提示polyfill缺少-webpack v5版本导致
安装 npm i node-polyfill-webpack-plugin 因为我们的项目使用webpack v5,其中polyfill Node核心模块被删除。所以,我们安装它是为了在项目中访问这些模块 vue.config.js文件 const { defineConfig } require("vue/cli-service"); const No…...
Hugging Face实战-系列教程3:AutoModelForSequenceClassification文本2分类
🚩🚩🚩Hugging Face 实战系列 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在notebook中进行 本篇文章配套的代码资源已经上传 下篇内容: Hugging Face实战-系列教程4:padding与attention_mask 输出我…...
《TCP/IP网络编程》阅读笔记--Socket类型及协议设置
目录 1--协议的定义 2--Socket的创建 2-1--协议族(Protocol Family) 2-2--Socket类型(Type) 3--Linux下实现TCP Socket 3-1--服务器端 3-2--客户端 3-3--编译运行 4--Windows下实现 TCP Socket 4-1--TCP服务端 4-2--TC…...
GitHub使用教程
GitHub使用教程 视频教程一:Github 新手够用指南 | 全程演示&个人找项目技巧放送_哔哩哔哩_bilibili 笔记: README.md编写教程:Typora官方免费版与入门教程__阿伟_的博客-CSDN博客 找开源项目的一些途径 • https://github.com/trendin…...
sql server 分区表
分区表 分区表是在SQL Server 2005之后的版本引入的特性,这个特性允许把逻辑上的一个表在物理上分为很多部分。换句话说,分区表从物理上看是将一个大表分成几个小表,但是从逻辑上看,还是一个大表。 步骤 创建分表区的步骤分为…...
开源许可证概述:GNU, BSD, Apache, MPL, 和 MIT
前言 开源许可证是开源软件分发的基础。它们定义了使用者如何使用,修改,分发开源软件。在这篇文章中,我们将探讨五种常见的开源许可证:GNU通用公共许可证 (GNU GPL),BSD许可证,Apache许可证,Mo…...
java中log使用总结
目录 一、概述1.1. 核心日志框架1.2 门面日志框架 二、最佳实践2.1 核心日志框架API包2.2 门面日志框架依赖2.3 集成使用2.3.1 集成jcl2.3.2 集成slf4j2.3.2.1 slf4j集成单一框架2.3.2.2 slf4j整合混合框架 三、总结3.1 所有相关包3.1.1 核心日志框架包3.1.2 门面日志框架3.1.3…...
【Java】传输层协议TCP
传输层协议TCP TCP报文格式首部长度保留位32位序列号和32位确认应答号标记ACKSYNFINRSTURGPSH 16位窗口大小16位校验和16位紧急指针选项 TCP特点可靠传输实现机制-确认应答超时重传连接管理机制三次握手四次挥手特殊情况 滑动窗口流量控制拥塞控制延迟应答捎带应答面向字节流粘…...
计算机网络基础知识(非常详细)
1. 网络模型 1.1 OSI 七层参考模型 七层模型,亦称 OSI(Open System Interconnection)参考模型,即开放式系统互联,是网络通信的标准模型。一般称为 OSI 参考模型或七层模型。 它是一个七层的、抽象的模型体ÿ…...
如何进行SEO优化数据分析?(掌握正确的数据分析方法,让您的网站更上一层楼!)
在互联网时代,SEO优化已经成为了每一个网站运营者必备的技能。而在SEO优化中,数据分析更是至关重要的一环。在本文中,我们将会详细介绍如何正确的进行SEO优化数据分析,让您的网站更上一层楼! 数据分析的重要性 数据分…...
Golang不同平台编译的思考
GOOS和GOARCH $GOOS可选值如下: darwin dragonfly freebsd linux netbsd openbsd plan9 solaris windows $GOARCH可选值如下 386 amd64 arm 在编译的时候我们可以根据实际需要对这两个参数进行组合。更详细的说明可以进官网看看 ## http://golang.org/cmd/go http…...
SpringSecurity学习
1.认证 密码校验用户 密码加密存储 Configuration public class SecurityConfig extends WebSecurityConfigurerAdapter {Beanpublic PasswordEncoder passwordEncoder(){return new BCryptPasswordEncoder();}} 我们没有这个配置,默认明文存储, {id}password;实现…...
时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测
时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测 目录 时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 ICEEMDAN-iMPA-BiLSTM功率/风速预测 基于改进的自适应经验模态分解改进海洋捕食者算法双向长短期记忆…...
二叉树(上)
“路虽远,行则将至” ❤️主页:小赛毛 目录 1.树概念及结构 1.1树的概念 1.2 树的相关概念 1.3 树的表示(树的存储) 2.二叉树概念及结构 2.1概念 2.2现实中的二叉树 2.3 特殊的二叉树: 2.4 二叉树的性质 3.二叉树的顺…...
独立开发者如何利用 Taotoken 按需调用不同模型优化个人项目
独立开发者如何利用 Taotoken 按需调用不同模型优化个人项目 对于独立开发者或自由职业者而言,技术项目的成本控制和灵活性至关重要。在预算有限且需求多变的日常开发中,大模型 API 的调用费用常常是一笔不小的开销,而不同任务对模型能力的需…...
计算机视觉怎么选:2026年技术选型生存指南——在学术界与工业界的撕裂地带,找到你的生态位
一、开篇:一个被低估的结构性事实 如果你站在2026年的时间节点上问"计算机视觉怎么选",你真正在问的是:在一场每年膨胀近200亿美元、但人才供给严重错配的技术革命中,我应该把有限的时间押注在哪里? 这不是…...
基于AI与自由标签的智能错题管理系统设计与实践
1. 项目概述:一个为备考者量身定制的智能错题管家 如果你正在准备GRE、雅思、考研,或者任何需要大量刷题、反复总结的考试,那你一定对“错题本”这个概念不陌生。从学生时代起,老师就告诉我们整理错题的重要性,但真正…...
ROS2 Foxy下EAI_X3激光雷达驱动避坑全记录:从串口映射到gmapping建图乱飞
ROS2 Foxy下EAI_X3激光雷达驱动避坑全记录:从串口映射到gmapping建图乱飞 当你在ROS2 Foxy环境中部署EAI_X3或YDLIDAR激光雷达时,可能会遇到各种令人头疼的问题。本文将以实战经验为基础,深入分析从驱动编译到gmapping建图过程中常见的"…...
基于NapCat的QQ机器人框架openclaw-NapCatQQ部署与开发指南
1. 项目概述:一个为QQ协议打造的现代化机器人框架最近在折腾机器人项目,发现一个挺有意思的开源项目叫openclaw-NapCatQQ。乍一看这个名字,可能有点摸不着头脑,但如果你对QQ机器人生态有所了解,就会知道这背后代表着一…...
专业级B站视频下载工具:BBDown 5大核心优势深度解析
专业级B站视频下载工具:BBDown 5大核心优势深度解析 【免费下载链接】BBDown Bilibili Downloader. 一个命令行式哔哩哔哩下载器. 项目地址: https://gitcode.com/gh_mirrors/bb/BBDown BBDown是一款开源命令行式Bilibili视频下载器,专为技术爱好…...
从VGG到MobileNet:深度可分离卷积如何让你的模型在手机上‘飞’起来?参数对比与实战调优指南
从VGG到MobileNet:深度可分离卷积如何让你的模型在手机上‘飞’起来?参数对比与实战调优指南 当你在服务器上训练了一个表现优异的VGG模型,准备将其部署到移动设备时,突然发现这个"庞然大物"根本无法流畅运行——这就是…...
overlay-web:现代化Web覆盖层状态管理与交互解决方案
1. 项目概述:一个为开发者打造的现代化Web覆盖层工具最近在折腾一个前端项目,需要实现一个全局的、可高度定制的通知或模态框系统,找了一圈现有的UI库,要么太重,要么定制性不够灵活。直到我发现了DevelopedByDev/overl…...
智能前端IDCB-24A:工业智能管控核心终端
在工业自动化与智能化升级的浪潮中,智能前端作为设备管控、数据传输的关键载体,直接决定了工业系统的稳定性与智能化水平。IDCB-24A智能前端凭借集成化设计、高精度管控、灵活适配等核心优势,成为工业场景中不可或缺的智能终端,广…...
Arm Neoverse CMN S3(AE)错误处理架构与寄存器解析
1. Arm Neoverse CMN S3(AE)错误处理架构概述在现代多核SoC设计中,错误处理机制是确保系统可靠性的基石。Arm Neoverse CMN S3(AE)作为新一代互连架构,其错误处理子系统通过硬件级寄存器实现了从错误检测到恢复的全流程管理。这套机制的核心价值在于&…...
