当前位置: 首页 > news >正文

【实战-08】 flink自定义Map中的变量的行为

场景

自定义Map或者别的算子的时候,有时候需要定义一些类变量,在flink内部高并发的情况下需要正确理解这些变量的行为

代码

package com.pg.function;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class FlinkFunction {//对于自定义函数中的变量,只有内置的状态是完全按照flink内置的 keyBy行为来的//如果是自定义的缓存比如ArrayList 则可能不会按照预期的行为public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStream<String> dataStream = env.fromElements( "b","b","b","c","c","c","d","d","d");dataStream.keyBy(x->{return x;}).map(new MyMap()).print();env.execute();}}class MyMap extends RichMapFunction<String, String> {public ArrayList<String> list= new ArrayList<>();
//     public ValueState<Integer> counter;//存储数据条数
//     public ValueState<String> element;//存储临时数据
//     @Override
//     public void open(Configuration parameters) throws Exception {
//         counter = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("counter", Types.INT));
//         element = getRuntimeContext().getState(new ValueStateDescriptor<>("element", Types.STRING));
//     }@Overridepublic String map(String s) throws Exception {list.add(s);if(list.size()==2){String re = list.toString();list.clear();return re;}else {return "null";}
//        if (counter.value() == null) {
//            counter.update(1);//遇见第一条数据的时候,计数器为1
//        } else {
//            counter.update(counter.value() + 1);
//        }
//        if (element.value() == null) {
//            element.update(s);//element只存储上一次到来的数据
//        }else {
//            element.update(element.value()+s);
//        }
//        if (counter.value() == 2) {
//            String re = element.value();
//            //发出结果之后清楚状态
//            counter.clear();
//            element.clear();
//            return re;
//        }else {
//            return "null";
//        }}
}

分析

keyBy之后,理论上相同key的会在map中用同样的处理逻辑,我们的预期行为是输出:bb,cc,dd
但是用ArrayList实现的逻辑最终输出却是:bb,bc,cc,dd
用ValueState的输出是:bb,cc,dd
这说明了,keBy后的逻辑,ArrayList不会按照预期的行为执行。这是因为在flink中,当多个并发的时候,多个key如果落入同一个线程
则当前线程的valueState是和某一个key绑定的,符合flink预期行为,但是ArrayList以及其它你定义的变量则不做保证, 它是线程级别的局部变量, 这点要注意。

相关文章:

【实战-08】 flink自定义Map中的变量的行为

场景 自定义Map或者别的算子的时候&#xff0c;有时候需要定义一些类变量&#xff0c;在flink内部高并发的情况下需要正确理解这些变量的行为 代码 package com.pg.function;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common…...

Docker Volume

"Ice in my vein" Docker Volume(存储卷) 什么是存储卷? 存储卷就是: “将宿主机的本地文件系统中存在的某个目录&#xff0c;与容器内部的文件系统上的某一目录建立绑定关系”。 存储卷与容器本身的联合文件系统&#xff1f; 在宿主机上的这个与容器形成绑定关系…...

开源计算机视觉库OpenCV常用的API介绍

阅读本文之前请参阅-----开源计算机视觉库OpenCV详细介绍 OpenCV&#xff08;开源计算机视觉库&#xff09;是一个跨平台的计算机视觉和机器学习软件库&#xff0c;它提供了大量的API&#xff08;应用程序编程接口&#xff09;&#xff0c;用于处理图像和视频分析、对象检测、面…...

pytorch -- torch.nn下的常用损失函数

1.基础 loss function损失函数&#xff1a;预测输出与实际输出 差距 越小越好 - 计算实际输出和目标之间的差距 - 为我们更新输出提供依据&#xff08;反向传播&#xff09; 1. L1 torch.nn.L1Loss(size_averageNone, reduceNone, reduction‘mean’) 2. 平方差&#xff08;…...

daydayEXP: 支持自定义Poc文件的图形化漏洞利用工具

daydayEXP: 支持自定义Poc文件的图形化漏洞利用工具 基于java fx写的一款支持加载自定义poc文件的、可扩展的的图形化渗透测试框架。支持批量漏洞扫描、漏洞利用、结果导出等功能。 使用 经过测试,项目可在jdk8环境下正常使用。jdk11因为缺少一些必要的组件,所以jdk11版本工…...

无法访问云服务器上部署的Docker容器(二)

说明&#xff1a;记录一次使用公网IP 接口地址无法访问阿里云服务接口的问题&#xff1b; 描述 最近&#xff0c;我使用Docker部署了jeecg-boot项目&#xff0c;部署过程都没有问题&#xff0c;也没有错误信息。部署完成后&#xff0c;通过下面的地址访问后端Swagger接口文档…...

在Pycharm中运行Django项目如何指定运行的端口

方法步骤&#xff1a; 打开 PyCharm&#xff0c;选择你的 Django 项目。在菜单栏中&#xff0c;选择 “Run” -> “Edit Configurations...”。在打开的 “Run/Debug Configurations” 对话框中&#xff0c;选择你的 Django server 配置&#xff08;如果没有&#xff0c;你…...

Android将 ViewBinding封装到BaseActivity基类中(Java版)

在Android中使用Java语言将ViewBinding封装到基类中&#xff0c;操作步骤如下&#xff1a; 1、在项目的build.gradle文件中启用了ViewBinding&#xff0c;添加以下代码&#xff1a; android {...buildFeatures {viewBinding true} } 2、创建一个名为“BaseActivity”的基类&…...

JSP实现数据传递与保存(一)

一、Web开发步骤 1.1两类模式 后端——————前端 先有前端&#xff0c;前端用的时候直接调用 后端已实现注册接口&#xff0c;接口名为doRegister.jsp 前端此时&#xff1a; 前端的form表单中的action提交地址就只能填doRegister.jsp&#xff0c;即&#xff1a; <f…...

【论文笔记之 YIN】YIN, a fundamental frequency estimator for speech and music

本文对 Alain de Cheveigne 等人于 2002 年在 The Journal of the Acoustical Society of America 上发表的论文进行简单地翻译。如有表述不当之处欢迎批评指正。欢迎任何形式的转载&#xff0c;但请务必注明出处。 论文链接&#xff1a;http://audition.ens.fr/adc/pdf/2002_…...

水印相机小程序源码

水印相机前端源码&#xff0c;本程序无需后端&#xff0c;前端直接导入即可&#xff0c;没有添加流量主功能&#xff0c;大家开通后自行添加 源码搜索&#xff1a;源码软件库 注意小程序后台的隐私权限设置&#xff0c;前端需要授权才可使用 真实时间地址拍照记录&#xff0c…...

NXP实战笔记(八):S32K3xx基于RTD-SDK在S32DS上配置LCU实现ABZ解码

目录 1、概述 2、SDK配置 2.1、IO配置 2.2、TRGMUX配置 2.3、LCU配置 2.4、Trgmux配置 2.5、Emios配置 2.6、代码实现 1、概述 碰到光电编码器、磁编码器等,有时候传出来的位置信息为ABZ的方式,在S32K3里面通过TRGMUX、LCU、Emios结合的方式可以实现ABZ解码。 官方…...

【深度好文】simhash文本去重流程

对于类似于头条客户端而言,推荐的每一刷的新闻都必须是不同的新闻,这就需要对新闻文本进行排重。传统的去重一般是对文章的url链接进行排重,但是对于抓取的网页来说,各大平台的新闻可能存在重复,对于只通过文章url进行排重是不靠谱的,为了解决这个痛点于是就提出了用simh…...

主流的开发语言和开发环境介绍

个人浅见&#xff0c;不喜勿喷&#xff0c;谢谢 软件开发是一个涉及多个方面的复杂过程&#xff0c;其中包括选择合适的编程语言和开发环境。编程语言是软件开发的核心&#xff0c;它定义了程序员用来编写指令的语法和规则。而开发环境则提供了编写、测试和调试代码的工具和平台…...

List去重有几种方式

目录 1、for循环添加去重 2、for 双循环去重 3、for 双循环重复坐标去重 4、Set去重 5、stream流去重 1、for循环添加去重 List<String> oldList new ArrayList<>();oldList.add("张三");oldList.add("张三");oldList.add("李四&q…...

使用C#+NPOI进行Excel处理,实现多个Excel文件的求和统计

一个简易的控制台程序&#xff0c;使用C#NPOI进行Excel处理&#xff0c;实现多个Excel文件的求和统计。 前提&#xff1a; 待统计的Excel格式相同统计结果表与待统计的表格格式一致 引入如下四个动态库&#xff1a; 1. NPOI.dll 2. NPOI.OOXML.dll 3. NPOI.OpenXml4Net.dll …...

华清远见嵌入式学习——驱动开发——day9

目录 作业要求&#xff1a; 作业答案&#xff1a; 代码效果&#xff1a; ​编辑 Platform总线驱动代码&#xff1a; 应用程序代码&#xff1a; 设备树配置&#xff1a; 作业要求&#xff1a; 通过platform总线驱动框架编写LED灯的驱动&#xff0c;编写应用程序测试&…...

formality:set_constant应用

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 往期文章链接: formality:形式验证流程 scan mode func的功能检查需要把scan mode设置成0。...

sqllabs的order by注入

当我们在打开sqli-labs的46关发现其实是个表格&#xff0c;当测试sort等于123时&#xff0c;会根据列数的不同来进行排序 我们需要利用这个点来判断是否存在注入漏洞&#xff0c;通过加入asc 和desc判断页面有注入点 1、基于使用if语句盲注 如果我们配合if函数&#xff0c;表达…...

《The Art of InnoDB》第二部分|第4章:深入结构-磁盘结构-redo log

4.3 redo log 目录 4.3 redo log 4.3.1 redo log 介绍 4.3.2 redo log 的作用 4.3.3 redo log file 结构 4.3.4 redo log 提交逻辑 4.3.5 redo log 持久化逻辑 4.3.6 redo log 检查点 4.3.7 小结...

大模型安全相关论文

LLM对于安全的优势 “Generating secure hardware using chatgpt resistant to cwes,” Cryptology ePrint Archive, Paper 2023/212, 2023评估了ChatGPT平台上代码生成过程的安全性&#xff0c;特别是在硬件领域。探索了设计者可以采用的策略&#xff0c;使ChatGPT能够提供安…...

回归预测 | Matlab实现PSO-BiLSTM-Attention粒子群算法优化双向长短期记忆神经网络融合注意力机制多变量回归预测

回归预测 | Matlab实现PSO-BiLSTM-Attention粒子群算法优化双向长短期记忆神经网络融合注意力机制多变量回归预测 目录 回归预测 | Matlab实现PSO-BiLSTM-Attention粒子群算法优化双向长短期记忆神经网络融合注意力机制多变量回归预测预测效果基本描述程序设计参考资料 预测效果…...

[算法沉淀记录] 排序算法 —— 堆排序

排序算法 —— 堆排序 算法基础介绍 堆排序&#xff08;Heap Sort&#xff09;是一种基于比较的排序算法&#xff0c;它利用堆这种数据结构来实现排序。堆是一种特殊的完全二叉树&#xff0c;其中每个节点的值都必须大于或等于&#xff08;最大堆&#xff09;或小于或等于&am…...

C++ //练习 9.33 在本节最后一个例子中,如果不将insert的结果赋予begin,将会发生什么?编写程序,去掉此赋值语句,验证你的答案。

C Primer&#xff08;第5版&#xff09; 练习 9.33 练习 9.33 在本节最后一个例子中&#xff0c;如果不将insert的结果赋予begin&#xff0c;将会发生什么&#xff1f;编写程序&#xff0c;去掉此赋值语句&#xff0c;验证你的答案。 环境&#xff1a;Linux Ubuntu&#xff0…...

[corCTF 2022] CoRJail: From Null Byte Overflow To Docker Escape

前言 题目来源&#xff1a;竞赛官网 – 建议这里下载&#xff0c;文件系统/带符号的 vmlinux 给了 参考 [corCTF 2022] CoRJail: From Null Byte Overflow To Docker Escape Exploiting poll_list Objects In The Linux Kernel – 原作者文章&#xff0c;poll_list 利用方式…...

thinkphp6定时任务

这里主要是教没有用过定时任务没有头绪的朋友, 定时任务可以处理一些定时备份数据库等一系列操作, 具体根据自己的业务逻辑进行更改 直接上代码 首先, 是先在 tp 中的 command 方法中声明, 如果没有就自己新建一个, 代码如下 然后就是写你的业务逻辑 执行定时任务 方法写好了…...

支持国密ssl的curl编译和测试验证(上)

目录 1. 编译铜锁ssl库2. 编译nghttp2库3. 编译curl4. 验证4.1 查看版本信息4.2 验证国密ssl握手功能4.3 验证http2协议功能 以下以ubuntu 22.04环境为例进行编译 本次编译采用铜锁sslnghttp2curl&#xff0c;使得编译出来的curl可以支持国密ssl&#xff0c;并且可以支持http2…...

包装类详解

概述 Java提供了两个类型系统&#xff0c;基本类型与引用类型&#xff0c;使用基本类型在于效率&#xff0c;然而很多情况&#xff0c;会创建对象使用&#xff0c;因为对象可以做更多的功能&#xff0c;如果想要我们的基本类型像对象一样操作&#xff0c;就可以使用基本类型对…...

vue3与vue2的区别

Vue 3和Vue 2在以下几个方面有一些区别&#xff1a; 性能提升&#xff1a;Vue 3对渲染性能和内存占用进行了优化&#xff0c;使用了Proxy代理对象&#xff0c;比Vue 2的Object.defineProperty更高效。此外&#xff0c;Vue 3还引入了静态树提升&#xff08;Static Tree Hoisting…...

SSL OV证书和DV、EV证书的区别

在网站搭建的过程中和小程序开发过程中&#xff0c;很难免会有需要用到SSL证书的地方&#xff0c;但是目前数字证书种类繁多&#xff0c;该选择什么类型的证书成为了一个令人纠结的问题。 目前在市场上较为常见的证书分为三种&#xff1a;DV域名验证型证书&#xff1b;OV组织验…...

商务网站的功能和建设/个人外包接单平台

目录 一、前言 二、Git Bash 三、统计信息 1.统计某项目中成员数量 2.统计所有用户的提交总次数 3.统计所有用户指定时间段的提交次数 4.按用户名统计提交次数 完整脚本如下 一、前言 项目中有很多成员&#xff0c;如何查看各个时间段每个组员的代码提交量&#xff0c;下…...

电子商务网站建设开发文档/搜索引擎优化哪些方面

这道题是让求派出机器人的最少数量&#xff0c;乍一看以为是简单的求最小路径覆盖&#xff0c;后来发现错了&#xff0c;因为有的点可以走多次&#xff0c;而二分中每个点只能走一次&#xff0c;所以要先用floyd进行传递闭包&#xff0c;然后用二分 #include<stdio.h> #i…...

做网站都需要哪些知识/整站优化提升排名

根据传入不同的参数&#xff0c;智能的显示蜂窝式布局&#xff0c;效果如下图&#xff1a; 制作步骤&#xff1a; 1、在库中建一个mc 绑定名称为“myMC” 2、建一个文档类&#xff0c;代码如下&#xff1a; package {import flash.display.Sprite;/*** author chb* data 201…...

松北建设局网站/51link友链

最近项目上有一个需求&#xff0c;在api上收到的请求&#xff0c;需要在springmvc转化成实体参数之前把request body读取出来记录日志。 在通常的响应流程上&#xff0c;使用了request.getInputStream()之后&#xff0c;流就会失效&#xff0c;即这个request body的流只能读取一…...

wordpress 图片网站/seo策略主要包括

版权符号&#xff0c;和商标的专利符号 打开word文档 按 ctrlaltr 和ctrlaltC直接输出带圈字符也可在PHOTOSHOP种用图形描绘。 如果是用在网页里面&#xff0c;那么很简单&#xff0c;版权所有符号是 © 注册商标符号是 商标符号是 ™ 打开一个word&#xff0c;然后先打…...

航天桥网站建设/24小时自助下单平台网站便宜

2019独角兽企业重金招聘Python工程师标准>>> 表格存储TableStore是阿里云自研的面向海量结构化和半结构化数据存储的Serverless NoSQL多模型数据库&#xff0c;被广泛用于社交、物联网、人工智能、元数据和大数据等业务场景。表格存储TableStore采用与Google Bigtab…...