Flink 侧输出流(SideOutput)
🌸在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。
🌸在处理不同的流中,除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。ProcessFunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。
当使用旁路输出时,首先需要定义一个OutputTag来标识一个旁路输出流
val OutPut=OutputTag[String]("side-output")
注意:OutputTag是如何根据旁路输出流包含的元素类型typed的
✨可以通过以下几种函数发射数据到旁路输出
ProcessFunction
CoProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
//将含有特殊字符串的流区分开,数据由两个定义好的工具类向Kafka灌入不同内容的数据,
//然后通过侧输出流(SideOutput)将不同的流进行分离,得到不同的输出import com.alibaba.fastjson.JSON
import com.tech.bean.Person_t
import com.tech.util.KafkaSourceUtil
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject sideOutputPerson_t {def main(args: Array[String]): Unit = {// UI地址访问:http://localhost:8081/#/job/runningval env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())val ksu = new KafkaSourceUtil("person_t", "test-consumer-group")val dstream = env.addSource(ksu.getSouceInfo())// 首先需要定义一个OutputTag来标识一个旁路输出流val outputTag = new OutputTag[String]("person_t_side-output")val mainDataStream = dstream.map(line => {JSON.parseObject(line, classOf[Person_t])})val sideOutput = mainDataStream.process(new ProcessFunction[Person_t, String] {override def processElement(value: Person_t,ctx: ProcessFunction[Person_t, String]#Context,out: Collector[String]): Unit = {if (!value.getName.contains("_side")) {out.collect(value.toString)} else {// 测输出流输出的部分ctx.output(outputTag, "sideOutput-> 带有_side标识的数据名称" + value.getName)}}})val sideOutputStream: DataStream[String] = sideOutput.getSideOutput(outputTag)// 测输出流处理sideOutputStream.print("测输出流")// 常规数据处理sideOutput.print("常规数据")env.execute("outSideput")}
}
相关文章:
Flink 侧输出流(SideOutput)
🌸在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。 🌸在处理不同的流中,除了 split 算子,可以将一条流分成多条流,这些流的数据类型也…...
C语言中关于#include的一些小知识
写代码的过程中,因为手误,重复包含了头文件 可以看到没有报错 如果是你自己编写的头文件,那么如果没加唯一包含标识的话,那么编译器会编译报错的。如果是系统自带的头文件,由于其每个头文件都加了特殊标识,…...
DSP芯片 机器码下载方法 【主要 “扯” 用Uniflash下载的方法】
还是以德州仪器的TMS320F28335芯片为 “栗子”,说说这事儿。 编制好的程序经过开发环境可以编译成扩展名为 .out 文件,这个文件就是DSP可以运行机器码,我们把这个文件下载到 DSP芯片中的程序区, 下载好了,这个芯片原…...
速盾网络:CDN用几天关了可以吗?安全吗?
在使用CDN(内容分发网络)时,有时候会考虑暂时关闭或暂停使用CDN服务的情况。但是,对于关闭CDN服务的时间长短以及安全性问题,很多人可能存在疑问。在本文中,我们将讨论CDN使用中关闭几天是否安全以及相关注…...
MR混合现实情景实训教学系统在高空作业课堂中的应用
高空作业是一项高风险的工作,对于从业者来说,不仅需要具备专业的技能,还需要有丰富的实践经验。然而,传统的课堂教学往往只能通过理论讲解和模拟训练来传授知识,无法提供真实的实践环境。而MR混合现实情景实训教学系统…...
Windows系统中定时执行python脚本
背景:本地Windows系统指定目录下会有文件的修改新增,这些变化的文件需要定时的被上传到git仓库中,这样不需要每次变更手动上传了。 首先编写一个检测文件夹下文件变化并且上传git仓库的python脚本(确保你已经在E:\edc_workspace\data_edc_et…...
HashMap 源码学习-jdk1.8
1、一些常量的定义 这里针对MIN_TREEIFY_CAPACITY 这个值进行解释一下。 java8里面,HashMap 的数据结构是数组 (链表或者红黑树),每个数组节点下可能会存在链表和红黑树之间的转换,当同一个索引下面的节点超过8个时…...
WebStorm 2023:让您更接近理想的开发环境 mac/win版
JetBrains WebStorm 2023激活版下载是一款强大而智能的Web开发工具,专为提高开发人员的生产力而设计。这款编辑器提供了许多先进的代码编辑功能,以及一系列实用的工具和插件,可帮助您更快地编写、调试和测试代码。 WebStorm 2023软件获取 We…...
java面试题:数字与字母的映射表
前言 好记性不如烂笔头。 问题: 现在有一个数字与字母的映射表,且有以下规则: 映射表: 数字 字母 3 A 7 B 9 C 规则: 1.碰到当前数字时,使用字母替换,例如,3-> A 2.碰到当前数…...
Jmeter教程-JMeter 环境安装及配置
Jmeter教程 JMeter 环境安装及配置 在使用 JMeter 之前,需要配置相应的环境,包括安装 JDK 和获取 JMeter ZIP 包。 安装JDK 1.JDK下载 示例环境为Windows11环境,读者应根据实际环境下载JDK的安装包。 JDK下载地址: Java21 下载 …...
十大基础排序算法
排序算法分类 排序:将一组对象按照某种逻辑顺序重新排列的过程。 按照待排序数据的规模分为: 内部排序:数据量不大,全部存在内存中;外部排序:数据量很大,无法一次性全部存在内存中,…...
IP协议及相关技术协议
一、IP基本认识 1. IP的作用 IP在TCP/IP模型中处于网络层,网络层的主要作用是实现主机与主机之间的通信,而IP的作用是在复杂的网络环境中将数据包发送给最终目的主机。 2. IP与MAC的关系 简单而言,MAC的作用是实现“直连”的两个设备之通信…...
小红书x-s算法及补环境 单旋转验证码
前言 大家好呀!新的一年,先祝大家新年快乐咯.祝大家逆向,风控都一把过咯. 新年第一篇文章,后续会持续更新哦! 春晚见证了中国经济的新风口,今年春晚互联网企业赞助商就两家,小红书和京东.小红书类似国外的ins,有预感未来小红书会大火,所以写了这篇文章,有需要的加我,联系方式…...
代码检测规范和git提交规范
摘要:之前开发的项目,代码检测和提交规范都是已经配置好的,最近自己新建的项目就记录下相关配置过程。 1. ESlint配置 2013年6月创建开源项目,提供一个插件化的JavaScript代码检测工具,创建项目是生成的eslintrc.js文…...
Elasticsearch:什么是搜索引擎?
搜索引擎定义 搜索引擎是一种软件程序或系统,旨在帮助用户查找存储在互联网或特定数据库中的信息。 搜索引擎的工作原理是对各种来源的内容进行索引和编目,然后根据用户的搜索查询向用户提供相关结果列表。 搜索引擎对于希望快速有效地查找特定信息的用…...
人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora
近30年,人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora 深蓝: 1997年,深蓝击败卡斯帕罗夫的比赛是通过一系列复杂的算法和策略实现的。深蓝的开发团队使用了一种名为“暴力搜索”的技术&…...
WordPres Bricks Builder 前台RCE漏洞复现(CVE-2024-25600)
0x01 产品简介 Bricks Builder是一款用于WordPress的开发主题,提供直观的拖放界面,用于设计和构建WordPress网站。它使用户能够轻松创建自定义的网页布局和设计,无需编写或了解复杂的代码。Bricks Builder具有用户友好的界面和强大的功能,使用户可以通过简单的拖放操作添加…...
代码随想录算法训练营总结 | 慢慢总结,想起啥就先写上
二叉树总结 二叉树的结构 stauct TreeNode {int val;TreeNode* left;TreeNode* right; }二叉树的递归函数分析 二叉树的递归函数当做只有一个根节点,一个左子树,一个右节点的数去看,这看着是个废话, 其实很重要 回溯…...
基于开源模型对文本和音频进行情感分析
应用场景 从商品详情页爬取商品评论,对其做舆情分析;电话客服,对音频进行分析,做舆情分析; 通过开发相应的服务接口,进一步工程化; 模型选用 文本,选用了通义实验室fine-tune的st…...
SQL中为什么不要使用1=1
最近看几个老项目的SQL条件中使用了11,想想自己也曾经这样写过,略有感触,特别拿出来说道说道。 编写SQL语句就像炒菜,每一种调料的使用都可能会影响菜品的最终味道,每一个SQL条件的加入也可能会影响查询的执行效率。那…...
UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...
Java求职者面试指南:计算机基础与源码原理深度解析
Java求职者面试指南:计算机基础与源码原理深度解析 第一轮提问:基础概念问题 1. 请解释什么是进程和线程的区别? 面试官:进程是程序的一次执行过程,是系统进行资源分配和调度的基本单位;而线程是进程中的…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
