当前位置: 首页 > news >正文

flink学习之广播流与合流操作demo

广播流是什么?

将一条数据广播到所有的节点。使用 dataStream.broadCast()

广播流使用场景?

一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复

vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。

广播流怎么用?

一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流

不多说直接上demo,开箱即用

package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());DataStream<User> userDataStream = env.addSource(new CustomerSource());userDataStream.print("user");patternDataStream.print("pattern");//test1  直接合流 不广播。只会在一个节点更新。 用于特殊需求?
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream)
//                .process(new CustomerSimpleProcess())
//                .print();//test2// 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream.broadcast())
//                .process(new CustomerSimpleProcess())
//                        .print();//test3MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user -> user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {@Overridepublic void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {Integer userVip = user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!=null){Integer patternVip = pattern.vip;String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>= patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}else {System.out.println("pattern is null ");}}@Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,User, Pattern, String>.Context context, Collector<String> collector) throws Exception {BroadcastState<Void, Pattern> bcState = context.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的stateHashMap<String,Integer> vipMap;@Overridepublic void open(Configuration parameters) throws Exception {vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));vipMap=new HashMap<String,Integer>();super.open(parameters);}@Overridepublic void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {Integer userVip = user.getVip();Integer patternVip = vipMap.getOrDefault("vip", 0);String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>=patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}@Overridepublic void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {vipMap.put("vip",pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId = userId;this.vip = vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Action{" +"userId=" + userId +", vip='" + vip + '\'' +'}';}}// 定义行为模式POJO类,包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Pattern{" +"vip='" + vip + '\'' +'}';}}private static class CustomerSource implements SourceFunction<User> {boolean run = true;@Overridepublic void run(SourceContext<User> sourceContext) throws Exception {while (true) {Integer userId = new Random().nextInt(1000);Integer vip = new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}@Overridepublic void cancel() {run = false;}}private static class ChangeSource implements SourceFunction<Pattern> {boolean run = true;@Overridepublic void run(SourceContext<Pattern> sourceContext) throws Exception {int i = 1;while (true) {sourceContext.collect(new Pattern(i++));Thread.sleep(5000);}}@Overridepublic void cancel() {run = false;}}}

demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。

test1 不广播

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2

但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1

test2 map保存变化数据

test3通过描述器获取数据

和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。

相关文章:

flink学习之广播流与合流操作demo

广播流是什么&#xff1f; 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景&#xff1f; 一般用于动态加载配置项。比如lol&#xff0c;每天不断有人再投诉举报&#xff0c;客服根本忙不过来&#xff0c;腾讯内部做了一个判断&#xff0c;只有vip3…...

PPT架构师架构技能图

PPT架构师架构技能图 目录概述需求&#xff1a; 设计思路实现思路分析1.软素质2.核心输出&#xff08;office输出&#xff09; 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,ma…...

STM32微控制器的低功耗模式

STM32微控制器的低功耗模式(Low-power modes):Sleep mode、Stop mode 和 Standby mode。 1.1 Sleep Mode(睡眠模式): 把STM32微控制器当作一位劳累的工人,他在工作过程中需要短暂的休息。在Sleep模式下,微控制器会关闭一部分电路,减小功耗,但仍然保持对中央处理单…...

tensorflow QAT

tensorflow qat https://www.wpgdadatong.com/tw/blog/detail/70672 在边缘运算的重点技术之中&#xff0c;除了简化复杂的模块构架&#xff0c;来简化参数量以提高运算速度的这项模块轻量化网络构架技术之外。另一项技术就是各家神经网络框架&#xff08;TensorFlow、Pytorc…...

[杂谈]-快速了解LoRaWAN网络以及工作原理

快速了解LoRaWAN网络以及工作原理 文章目录 快速了解LoRaWAN网络以及工作原理1、LoRaWAN网络元素1.1 终端设备&#xff08;End Devices&#xff09;1.2 网关&#xff08;Gateways&#xff09;1.3 网络服务器&#xff08;Net Server&#xff09;1.4 应用服务器&#xff08;Appli…...

MySQL--MySQL表的增删改查(基础)

排序&#xff1a;ORDER BY 语法&#xff1a; – ASC 为升序&#xff08;从小到大&#xff09; – DESC 为降序&#xff08;从大到小&#xff09; – 默认为 ASC SELECT … FROM table_name [WHERE …] ORDER BY column [ASC|DESC], […]; *** update...

Vue中启动提示polyfill缺少-webpack v5版本导致

安装 npm i node-polyfill-webpack-plugin 因为我们的项目使用webpack v5&#xff0c;其中polyfill Node核心模块被删除。所以&#xff0c;我们安装它是为了在项目中访问这些模块 vue.config.js文件 const { defineConfig } require("vue/cli-service"); const No…...

Hugging Face实战-系列教程3:AutoModelForSequenceClassification文本2分类

&#x1f6a9;&#x1f6a9;&#x1f6a9;Hugging Face 实战系列 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在notebook中进行 本篇文章配套的代码资源已经上传 下篇内容&#xff1a; Hugging Face实战-系列教程4&#xff1a;padding与attention_mask ​输出我…...

《TCP/IP网络编程》阅读笔记--Socket类型及协议设置

目录 1--协议的定义 2--Socket的创建 2-1--协议族&#xff08;Protocol Family&#xff09; 2-2--Socket类型&#xff08;Type&#xff09; 3--Linux下实现TCP Socket 3-1--服务器端 3-2--客户端 3-3--编译运行 4--Windows下实现 TCP Socket 4-1--TCP服务端 4-2--TC…...

GitHub使用教程

GitHub使用教程 视频教程一&#xff1a;Github 新手够用指南 | 全程演示&个人找项目技巧放送_哔哩哔哩_bilibili 笔记&#xff1a; README.md编写教程&#xff1a;Typora官方免费版与入门教程__阿伟_的博客-CSDN博客 找开源项目的一些途径 • https://github.com/trendin…...

sql server 分区表

分区表 分区表是在SQL Server 2005之后的版本引入的特性&#xff0c;这个特性允许把逻辑上的一个表在物理上分为很多部分。换句话说&#xff0c;分区表从物理上看是将一个大表分成几个小表&#xff0c;但是从逻辑上看&#xff0c;还是一个大表。 步骤 创建分表区的步骤分为…...

开源许可证概述:GNU, BSD, Apache, MPL, 和 MIT

前言 开源许可证是开源软件分发的基础。它们定义了使用者如何使用&#xff0c;修改&#xff0c;分发开源软件。在这篇文章中&#xff0c;我们将探讨五种常见的开源许可证&#xff1a;GNU通用公共许可证 (GNU GPL)&#xff0c;BSD许可证&#xff0c;Apache许可证&#xff0c;Mo…...

java中log使用总结

目录 一、概述1.1. 核心日志框架1.2 门面日志框架 二、最佳实践2.1 核心日志框架API包2.2 门面日志框架依赖2.3 集成使用2.3.1 集成jcl2.3.2 集成slf4j2.3.2.1 slf4j集成单一框架2.3.2.2 slf4j整合混合框架 三、总结3.1 所有相关包3.1.1 核心日志框架包3.1.2 门面日志框架3.1.3…...

【Java】传输层协议TCP

传输层协议TCP TCP报文格式首部长度保留位32位序列号和32位确认应答号标记ACKSYNFINRSTURGPSH 16位窗口大小16位校验和16位紧急指针选项 TCP特点可靠传输实现机制-确认应答超时重传连接管理机制三次握手四次挥手特殊情况 滑动窗口流量控制拥塞控制延迟应答捎带应答面向字节流粘…...

计算机网络基础知识(非常详细)

1. 网络模型 1.1 OSI 七层参考模型 七层模型&#xff0c;亦称 OSI&#xff08;Open System Interconnection&#xff09;参考模型&#xff0c;即开放式系统互联&#xff0c;是网络通信的标准模型。一般称为 OSI 参考模型或七层模型。 它是一个七层的、抽象的模型体&#xff…...

如何进行SEO优化数据分析?(掌握正确的数据分析方法,让您的网站更上一层楼!)

在互联网时代&#xff0c;SEO优化已经成为了每一个网站运营者必备的技能。而在SEO优化中&#xff0c;数据分析更是至关重要的一环。在本文中&#xff0c;我们将会详细介绍如何正确的进行SEO优化数据分析&#xff0c;让您的网站更上一层楼&#xff01; 数据分析的重要性 数据分…...

Golang不同平台编译的思考

GOOS和GOARCH $GOOS可选值如下&#xff1a; darwin dragonfly freebsd linux netbsd openbsd plan9 solaris windows $GOARCH可选值如下 386 amd64 arm 在编译的时候我们可以根据实际需要对这两个参数进行组合。更详细的说明可以进官网看看 ## http://golang.org/cmd/go http…...

SpringSecurity学习

1.认证 密码校验用户 密码加密存储 Configuration public class SecurityConfig extends WebSecurityConfigurerAdapter {Beanpublic PasswordEncoder passwordEncoder(){return new BCryptPasswordEncoder();}} 我们没有这个配置&#xff0c;默认明文存储, {id}password;实现…...

时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测

时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测 目录 时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 ICEEMDAN-iMPA-BiLSTM功率/风速预测 基于改进的自适应经验模态分解改进海洋捕食者算法双向长短期记忆…...

二叉树(上)

“路虽远&#xff0c;行则将至” ❤️主页&#xff1a;小赛毛 目录 1.树概念及结构 1.1树的概念 1.2 树的相关概念 1.3 树的表示&#xff08;树的存储&#xff09; 2.二叉树概念及结构 2.1概念 2.2现实中的二叉树 2.3 特殊的二叉树&#xff1a; 2.4 二叉树的性质 3.二叉树的顺…...

Excel怎么批量生成文件夹

Excel怎么批量生成文件夹的链接: https://jingyan.baidu.com/article/ea24bc398d9dcb9b63b3312f.html...

c++ 学习之 静态成员变量和静态成员函数

文章目录 前言正文静态成员变量初始化操作如何理解共享一份数据访问权限 静态成员函数访问方式静态成员函数只能访问静态成员变量访问权限 前言 静态成员分为 1&#xff09;静态成员变量 所有对象共享一份数据在编译阶段分配空间类内声明&#xff0c;类外初始化 2&#xff09…...

C程序需要按下回车键才能读取字符

当编写涉及从终端输入字符的C程序时&#xff0c;有时会遇到需要按下回车键才能读取字符的问题。这是因为默认情况下&#xff0c;终端通常处于行缓冲模式&#xff0c;需要等待用户按下回车键才会将输入的字符发送给正在运行的程序。这可能会导致一些不便&#xff0c;尤其是当程序…...

x86体系结构(WinDbg学习笔记)

寄存器 eaxAccumulator累加器ebxBase register基寄存器ecxCounter register计数器寄存器edxData register - can be used for I/O port access and arithmetic functions数据寄存器-可用于I/O端口访问和算术函数esiSource index register源索引寄存器ediDestination index reg…...

Hadoop的第二个核心组件:MapReduce框架第四节

Hadoop的第二个核心组件&#xff1a;MapReduce框架 十、MapReduce的特殊应用场景1、使用MapReduce进行join操作2、使用MapReduce的计数器3、MapReduce做数据清洗 十一、MapReduce的工作流程&#xff1a;详细的工作流程第一步&#xff1a;提交MR作业资源第二步&#xff1a;运行M…...

算法通关村第十九关——最少硬币数

LeetCode322.给你一个整数数组 coins,表示不同面额的硬币&#xff0c;以及一个整数 amount&#xff0c;表示总金额。计算并返回可以凑成总金额所需的最少的硬币个数。如果没有任何一种硬币组合能组成总金额&#xff0c;返回-1。你可以认为每种硬币的数量是无限的。 示例1&…...

Linux ifconfig只显示 lo 网卡,没有ens网卡解决方案

项目场景&#xff1a; 虚拟机中linux无网络问题 问题描述 之前在调试linux的时候&#xff0c;由于一些不太清楚的误操作&#xff0c;导致ubuntu linux出现无网络问题&#xff0c;现象如下 ifconfig 只显示了 lo 网卡 lo 网卡&#xff1a;它是本地环回接口。 这意味着您的虚…...

Java复习-26-枚举

枚举&#xff08;替换多例设计&#xff09; 目的&#xff08;使用场景&#xff09; 不用也没啥 定义一个描述性别的类&#xff0c;那么该对象只有两个:男、 女。或者描述颜色基色的类&#xff0c;可以使用: 红色、绿色、蓝色。 功能 用于定义有限个数对象的一种结构&#x…...

NLP(六十八)使用Optimum进行模型量化

本文将会介绍如何使用HuggingFace的Optimum&#xff0c;来对微调后的BERT模型进行量化&#xff08;Quantization&#xff09;。   在文章NLP&#xff08;六十七&#xff09;BERT模型训练后动态量化&#xff08;PTDQ&#xff09;中&#xff0c;我们使用PyTorch自带的PTDQ&…...

Tomcat多实例和负载均衡动静分离

目录 一、Tomcat多实例部署 二、负载均衡动静分离 2.1.动静分离 2.11 nginx负载均衡 192.168.30.203 2.22 Tomcat服务器&#xff1a;192.168.30.200&#xff1a;80 2.23 Tomcat服务器&#xff1a;192.168.30.100&#xff1a;80 2.24 配置nginx 192.168.30.203静态页面 2…...

网站运营专员月薪多少/比较靠谱的推广平台

由于芯片产能过剩、芯片行业进入下行阶段&#xff0c;业界忽然发现成熟工艺产能再度得到重视&#xff0c;而中国则有望在成熟工艺产能方面居于全球第一&#xff0c;而且低成本和芯片堆叠技术有助于增强中国成熟工艺产能的竞争力。据统计数据显示&#xff0c;自2019年以来全球规…...

做场景秀的网站/网络营销八大目标是什么

题目 给你一个整数 n &#xff0c;对于 0 < i < n 中的每个 i &#xff0c;计算其二进制表示中 1 的个数 &#xff0c;返回一个长度为 n 1 的数组 ans 作为答案。 示例 输入&#xff1a;n 2 输出&#xff1a;[0,1,1] 解释&#xff1a; 0 --> 0 1 --> 1 2 --&g…...

可以直接进入网站的正能量网站/企业营销策划合同

2019独角兽企业重金招聘Python工程师标准>>> 两种方式判断访问终端是否是微信浏览器 JS判断 function is_weixin() { var ua window.navigator.userAgent.toLowerCase(); if (ua.match(/MicroMessenger/i) micromessenger) { $("#rs").text("微信…...

wordpress插件安装本地安装教程/企业营销推广方案

引言&#xff1a;简单的表单验证注册案例&#xff0c;其中包括了表单校验&#xff0c;使用layui的ui组件&#xff0c;以及采用ajax跟后台进行数据交换并提交表单。 代码实现&#xff1a; <% page language"java" contentType"text/html; charsetutf-8"…...

web网站开发方法/网站建设及网络推广

简介项目使用MSSql作为数据库&#xff0c;但是因为SQL服务器贵那么一点&#xff0c;并发连接差那么一点&#xff0c;要把数据迁移到MySQL&#xff0c;顺带迁移过程以及问题。环境 Visual Studio 2013 MySQL 5.7 Entity Framework 6.1.3正文迁移过程1. 安装MySQL&#xff0c;顺带…...

网站建设外包还是自己做/域名注册

大数据快速发展&#xff0c;对数据实时性要求不断提高。Flink 作为集流式批量于一体的开源大数据计算引擎&#xff0c;凭着在实时 ETL、实时报表、监控预警、在线系统等方面优势备受企业青睐。阿里、腾讯、字节等较早就已经进行了布局&#xff0c;尤其是字节将它作为流处理唯一…...