flink处理函数--副输出功能
背景
在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出
副输出
本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:
package wikiedits.processfunc.job;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;public class SideOutPutJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> readings = see.addSource(new SensorSource());SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());// 打印附输出monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();// 打印主输出monitoredReadings.print();see.execute();}
}package wikiedits.processfunc.process;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if (value.temperature < 32.0) {ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);}out.collect(value);}}
package wikiedits.processfunc.source;/** Copyright 2015 Fabian Hueske / Vasia Kalavri** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;import java.util.Calendar;
import java.util.Random;/*** Flink SourceFunction to generate SensorReadings with random temperature values.** Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.** Note: This is a simple data-generating source function that does not checkpoint its state.* In case of a failure, the source does not replay any data.*/
public class SensorSource extends RichParallelSourceFunction<SensorReading> {// flag indicating whether source is still runningprivate boolean running = true;/** run() continuously emits SensorReadings by emitting them through the SourceContext. */@Overridepublic void run(SourceContext<SensorReading> srcCtx) throws Exception {// initialize random number generatorRandom rand = new Random();// look up index of this parallel taskint taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();// initialize sensor ids and temperaturesString[] sensorIds = new String[10];double[] curFTemp = new double[10];for (int i = 0; i < 10; i++) {sensorIds[i] = "sensor_" + (taskIdx * 10 + i);curFTemp[i] = 65 + (rand.nextGaussian() * 20);}while (running) {// get current timelong curTime = Calendar.getInstance().getTimeInMillis();// emit SensorReadingsfor (int i = 0; i < 10; i++) {// update current temperaturecurFTemp[i] += rand.nextGaussian() * 0.5;// emit readingsrcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));}// wait for 100 msThread.sleep(3000);}}/** Cancels this SourceFunction. */@Overridepublic void cancel() {this.running = false;}
}
程序运行结果:

相关文章:
flink处理函数--副输出功能
背景 在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出 副输出 本文还是基于streaming-with-flink这本…...
Java数据结构————队列
一 、队列 在Java中,Queue是个接口,底层是通过链表实现的。 只允许在一端进行插入数据操作, 在另一端进行删除数据操作的特殊线性表, 队列具有先进先出FIFO(First In First Out) 。 入队列: 进行插入操作的一端称为…...
办公网络构建
办公网络项目背景 XX州市益智软件科技有限公司是XX市第九职业技术学校校办企业,依托学校人力技术、场地资源,面向市场独立经营、服务社会,主要从事网络设备销售、网络综合布线与网络管理。该公司现租用实训基地二层作为公司的办公经营场地…...
单层神经网络
神经网络 人工神经网络(Artificial Neural Network,ANN),简称神经网络(Neural Network,NN),是一种模仿生物神经网络的结构和功能的数学模型或计算模型。1943年,McCulloc…...
htb-cozyhosting
HTB-CozyHosting https://app.hackthebox.com/machines/CozyHosting ──(kwkl㉿kwkl)-[~] └─$ tail -l /etc/hosts …...
网络安全渗透测试工具之skipfish
网络安全渗透测试工具skipfish介绍 在数字化的时代,Web 应用程序安全成为了首要任务。想象一下,您是一位勇敢的安全冒险家,迎接着那些隐藏在 Web 应用程序中的未知风险。而在这个冒险之旅中,您需要一款强大的工具来帮助您发现漏洞,揭示弱点。而这个工具就是 Skipfish。 …...
【Rust】文件系统
目录 一、读取文件的字符串行 二、避免读取写入同一文件 三、使用内存映射随机访问文件 四、过去 24 小时内修改过的文件名 五、查找给定路径的循环 六、递归查找重名文件 七、使用给定断言递归查找所有文件 八、跳过隐藏文件遍历目录 九、在给定深度的目录࿰…...
mysql双主双从读写分离
架构图: 详细内容参考: 结果展示: 178.119.30.16(从)- master 178.119.30.17(从)- slave 由上述结果可以看出,产生了主备节点同时抢占VIP的问题(即脑裂问题)…...
postgresql-物化视图
postgresql-物化视图 物化视图创建物化视图刷新物化视图修改物化视图删除物化视图 物化视图 创建物化视图 postgresql使用create materialized view 语句创建视图 create materialized view if not exists name as query [with [NO] data];-- 创建一个包含员工统计信息的物化…...
多层神经网络和激活函数
多层神经网络的结构 多层神经网络就是由单层神经网络进行叠加之后得到的,所以就形成了层的概念,常见的多层神经网络有如下结构: 1)输入层(Input layer),众多神经元(Neuronÿ…...
Visual Studio Code键盘快捷键大全
Visual Studio Code键盘快捷键大全 前言导航快捷键编辑快捷键多光标快捷键终端快捷键调试快捷键文件管理快捷键Git快捷键代码格式化快捷键代码折叠快捷键工作区快捷键Markdown快捷键Zen模式快捷键窗口管理快捷键重构快捷键IntelliSense快捷键测试快捷键扩展快捷键 前言 欢迎来…...
新手学习笔记-----⽂件操作
目录 1. 为什么使⽤⽂件? 2. 什么是⽂件? 2.1 程序⽂件 2.2 数据⽂件 2.3 ⽂件名 3. ⼆进制⽂件和⽂本⽂件? 4. ⽂件的打开和关闭 4.1 流和标准流 4.1.1 流 4.1.2 标准流 4.2 ⽂件指针 4.3 ⽂件的打开和关闭 5. ⽂件的顺序读写 …...
LeetCode 251:展开二维向量
题目 Implement an iterator to flatten a 2d vector. Example: [1,2,3,4,5,6] [1,2,3,4,5,6] Follow up: As an added challenge, try to code it using only iterators in C++ or iterators in Java. 题解: 用两个index 分别记录list 的 index 和当前 list的element index. …...
练[BSidesCF 2020]Had a bad day
[BSidesCF 2020]Had a bad day 文章目录 [BSidesCF 2020]Had a bad day掌握知识解题过程关键paylaod 掌握知识 php伪协议进行文件包含,代码审计,strpos()函数会返回字符串在另一字符串中第一次出现的位置,如果没有找到则返回 FALSE&#…...
第十五章 类和对象——友元
生活中你的家有客厅(Public),有你的卧室(Private) 客厅所有来的客人都可以进去,但是你的卧室是私有的,也就是说只有你能进去 但是呢,你也可以允许你的好闺蜜好基友进去。 在程序里,有些私有属性 也想让类外特殊的一些…...
【数仓精品理论分析】能不能学大数据?
【数仓精品理论分析】能不能学大数据? 还能不能学大数据datapulse官网: 自身情况数据行业发展情况 还能不能学大数据 首先看到这个话题的时候,我是这样想的,能不能学大数据需要参考本人的自身情况【学历、年龄、决心、有没有矿或者…...
java复习-多态性
多态性 在Java中对于多态性由两种实现的模式: 方法的多态性 方法的重载:同一个方法名称可以根据传入的参数类型和个数的不同,进行不同的处理。 方法的覆写:同一个方法可能根据使用子类的不同,由不同的实现。 对象的…...
美团外卖优惠券小程序 美团优惠券微信小程序 自带流量主模式 带教程
小程序带举牌小人带菜谱流量主模式,挺多外卖小程序的,但是都没有搭建教程 搭建: 1、下载源码,去微信公众平台注册自己的账号 2、解压到桌面 3、打开微信开发者工具添加小程序-把解压的源码添加进去-appid改成自己小程序的 4、…...
编写IDEA插件,实现根据现有代码生成流程图
实现根据现有代码生成流程图的功能需要考虑以下几个步骤: 分析代码结构,获取代码中的变量声明、分支语句、循环语句等语句结构。 根据代码结构生成流程图的节点和边。 将生成的流程图展示在IDEA界面中。 下面逐一说明以上步骤的实现方法:…...
王杰国庆作业day6
服务器 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <my_head.h> #define PORT 2324 //端口号 #define IP "192.168.10.107" //本机IP int main(int argc, const char *argv[]) {sqlite3* d…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
短视频矩阵系统文案创作功能开发实践,定制化开发
在短视频行业迅猛发展的当下,企业和个人创作者为了扩大影响力、提升传播效果,纷纷采用短视频矩阵运营策略,同时管理多个平台、多个账号的内容发布。然而,频繁的文案创作需求让运营者疲于应对,如何高效产出高质量文案成…...
HubSpot推出与ChatGPT的深度集成引发兴奋与担忧
上周三,HubSpot宣布已构建与ChatGPT的深度集成,这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋,但同时也存在一些关于数据安全的担忧。 许多网络声音声称,这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...
给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...
pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)
目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 (1)输入单引号 (2)万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...
针对药品仓库的效期管理问题,如何利用WMS系统“破局”
案例: 某医药分销企业,主要经营各类药品的批发与零售。由于药品的特殊性,效期管理至关重要,但该企业一直面临效期问题的困扰。在未使用WMS系统之前,其药品入库、存储、出库等环节的效期管理主要依赖人工记录与检查。库…...
react菜单,动态绑定点击事件,菜单分离出去单独的js文件,Ant框架
1、菜单文件treeTop.js // 顶部菜单 import { AppstoreOutlined, SettingOutlined } from ant-design/icons; // 定义菜单项数据 const treeTop [{label: Docker管理,key: 1,icon: <AppstoreOutlined />,url:"/docker/index"},{label: 权限管理,key: 2,icon:…...
