Flink nc -l -p 监听端口测试
1、9999端口未占用
netstat -apn|grep 9999
2、消息发送端
nc -l -k -p 9999
{"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1}
{"user":"xiaohu","url":"www.baidu5.com","timestamp":1267L, "score":10}
{"user":"ming","url":"www.baidu7.com","timestamp":4200L, "score":9}
{"user":"xiaohu","url":"www.baidu8.com","timestamp":5500L, "score":90}
{"user":"Biu","url":"www.baidu8.com","timestamp":5500L, "score":1000}{"user":"ming","url":"www.baidu1.com", "timestamp":1717171200000, "score":1}
{"user":"xiaohu","url":"www.baidu5.com","timestamp":1717171202000, "score":10}
{"user":"ming","url":"www.baidu7.com","timestamp":1717171260000, "score":9}
{"user":"xiaohu","url":"www.baidu8.com","timestamp":1717264860000, "score":90}
{"user":"Biu","url":"www.baidu8.com","timestamp":1718780790000, "score":1000}
3、运行
周期性水位线
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Timestamp;
import java.util.ArrayList;/*** Description: * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\* forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线*/
public class FlinkPeriodicWatermarkGeneratorTestJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// ArrayList<Event> list = new ArrayList<>();
// list.add(new Event("ming","www.baidu1.com",1200L));
// list.add(new Event("xiaohu","www.baidu5.com",1267L));
// list.add(new Event("ming","www.baidu7.com",4200L));
// list.add(new Event("xiaohu","www.baidu8.com",5500L));
//
// DataStreamSource<Event> ds = env.fromCollection(list, BasicTypeInfo.of(Event.class));DataStreamSource<String> dss = env.socketTextStream("test002", 9999);SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {Event event = new Event();event.toEvent(value);return event;}});
// ds.print();SingleOutputStreamOperator<Event> watermarks = ds// AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
// .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()// BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Event>() {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L;@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs);// 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 调整周期时间 flink时间窗口(左开,右闭]output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}};}});ds.print();env.setParallelism(1);env.execute();}public static class Event{String user;String url;Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}public String getUser() {return user;}public String getUrl() {return url;}public Long getTimestamp() {return timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}public void toEvent(String val){JSONObject js = JSONObject.parseObject(val);this.user = js.getString("user");this.url = js.getString("url");this.timestamp = js.getLong("timestamp");}}
}
断点式水位线
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Timestamp;
import java.util.ArrayList;/*** Description: * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\* forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线*/
public class FlinkPunctuatedWatermarkGeneratorTestJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dss = env.socketTextStream("test002", 9999);SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {Event event = new Event();event.toEvent(value);return event;}});
// ds.print();SingleOutputStreamOperator<Event> watermarks = ds// AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
// .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()// BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Event>() {@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的 itemId 时,才发出水位线if (event.getUser().equals("Biu")) {output.emitWatermark(new Watermark(event.getTimestamp() - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线}};}});ds.print();env.setParallelism(1);env.execute();}public static class Event{String user;String url;Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}public String getUser() {return user;}public String getUrl() {return url;}public Long getTimestamp() {return timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}public void toEvent(String val){JSONObject js = JSONObject.parseObject(val);this.user = js.getString("user");this.url = js.getString("url");this.timestamp = js.getLong("timestamp");}}
}
4、打印
3> Event{user='ming', url='www.baidu1.com', timestamp=1970-01-01 08:00:01.2}
4> Event{user='xiaohu', url='www.baidu5.com', timestamp=1970-01-01 08:00:01.267}
5> Event{user='ming', url='www.baidu7.com', timestamp=1970-01-01 08:00:04.2}
6> Event{user='xiaohu', url='www.baidu8.com', timestamp=1970-01-01 08:00:05.5}
参考:
【Flink】Flink 中的时间和窗口之水位线(Watermark)-CSDN博客
Flink watermark_nc -lp 9999-CSDN博客
NoteWarehouse/05_BigData/09_Flink(1).md at main · FGL12321/NoteWarehouse · GitHub
相关文章:
Flink nc -l -p 监听端口测试
1、9999端口未占用 netstat -apn|grep 99992、消息发送端 nc -l -k -p 9999 {"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1} {"user":"xiaohu","url":…...
在IntelliJ IDEA中使用Spring Boot:快速配置
使用IntelliJ IDEA开发Spring Boot应用程序可以极大地提高开发效率,因为IDEA提供了许多便捷的功能,比如自动补全、代码分析、热部署等。以下是一篇可能的CSDN博客文章草稿,介绍如何在IntelliJ IDEA中使用Spring Boot: 在IntelliJ …...
django filter 批量修改
django filter 批量修改 在Django中,如果你想要批量修改记录,可以使用update()方法。这个方法允许你在一个查询集上执行批量更新,而不需要为每条记录生成单独的数据库事务。 以下是一个使用update()方法批量修改记录的例子: fro…...
maven:中央仓库验证方式改变:401 Content access is protected by token
前几天向maven中央仓库发布版本,执行上传命令mvn release:perform时报错了: [ERROR] Failed to execute goal org.sonatype.plugins:nexus-staging-maven-plugin:1.6.13:deploy (injected-nexus-deploy) on project xxxxx: Failed to deploy artifacts: …...
【面试】http
一、定义 HTTP(超文本传输协议),是一种用于分布式、协作式、超媒体信息系统的应用层协议,它是万维网数据通信的基础。主要特点是无状态(服务器不会保存之前请求的状态)、无连接(服务器处理完请…...
获取泛型,泛型擦除,TypeReference 原理分析
说明 author blog.jellyfishmix.com / JellyfishMIX - githubLICENSE GPL-2.0 获取泛型,泛型擦除 下图中示例代码是一个工具类用于生成 csv 文件,需要拿到数据的类型,使用反射感知数据类型的字段,来填充表字段名。可以看到泛型…...
springboot 3.x 之 集成rabbitmq实现动态发送消息给不同的队列
背景 实际项目中遇到针对不同类型的消息,发送消息到不同的队列,而且队列可能还不存在,需要动态创建,于是写了如下代码,实践发现没啥问题,这里分享下。 环境 springboot 3.2 JDK 17 rabbitMQ模型介绍 图片…...
C++ 代码实现鼠标右键注册菜单,一级目录和二级目录方法
最近做的一个项目, 在使用windows的时候,我希望在右键菜单中添加一个自定义的选项, 该选项下有我经常使用的多个程序快捷方式, 直接上代码 头文件 #pragma once #include <Windows.h> #include <iostream> #include <string> using namespace std; …...
SQLite 3 优化批量数据存储操作---事务transaction机制
0、事务操作 事务的目的是为了保证数据的一致性和完整性。 事务(Transaction)具有以下四个标准属性,通常根据首字母缩写为 ACID: 原子性(Atomicity):确保工作单位内的所有操作都成功完成&…...
[程序员] 表达的能力
之前看CSDN的问答区,很多时候,感觉问题的描述所要表达的意思非常模糊,或者说描述不清。如果是想回答问题的人想回答问题,首先要搞清楚是什么问题,就需要再问问题主很多细节的东西。三来四去,才能搞清楚具体…...
rknn转换后精度差异很大,失真算子自纠
下面是添加了详细注释的优化代码: import cv2 import numpy as np import onnx import onnxruntime as rt from onnx import helper, shape_inferencedef get_all_node_names(model):"""获取模型中所有节点的名称。参数:model (onnx.ModelProto): O…...
【C语言】解决C语言报错:Stack Overflow
文章目录 简介什么是Stack OverflowStack Overflow的常见原因如何检测和调试Stack Overflow解决Stack Overflow的最佳实践详细实例解析示例1:递归调用过深示例2:分配过大的局部变量示例3:嵌套函数调用过多 进一步阅读和参考资料总结 简介 St…...
【滚动哈希 二分查找】1044. 最长重复子串
本文涉及知识点 滚动哈希 二分查找算法合集 LeetCode 1044. 最长重复子串 给你一个字符串 s ,考虑其所有 重复子串 :即 s 的(连续)子串,在 s 中出现 2 次或更多次。这些出现之间可能存在重叠。 返回 任意一个 可能具…...
webid、sec_poison_id、a1、web_session参数分析与算法实现
文章目录 1. 写在前面2. 参数分析3. 核心算法【🏠作者主页】:吴秋霖 【💼作者介绍】:擅长爬虫与JS加密逆向分析!Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走来长期坚守并致力于Python与爬虫领域研究与开发工作! 【🌟作者推荐】:对爬…...
Qt|QWebSocket与Web进行通讯,实时接收语音流
实现功能主要思路:在网页端进行语音输入,PC机可以实时接收并播放语音流。 此时,Qt程序做客户端,Web端做服务器,使用QWebSocket进行通讯,实时播放接收的语音流。 功能实现 想要实现该功能,需要…...
「51媒体」电视台媒体邀约采访报道怎么做?
传媒如春雨,润物细无声,大家好,我是51媒体网胡老师。 电视台作为地方主流媒体,对于新闻报道有着严格的选题标准和报道流程。如果您希望电视台对某个会议或活动进行报道,可以按这样的方法来做: 1.明确活动信…...
Python提取PDF文本和图片,以及提前PDF页面中指定矩形区域的文本
前言 从PDF中提取内容能帮助我们获取文件中的信息,以便进行进一步的分析和处理。此外,在遇到类似项目时,提取出来的文本或图片也能再次利用。要在Python中通过代码提取PDF文件中的文本和图片,可以使用 Spire.PDF for Python 这个…...
C#实现边缘锐化(图像处理)
在 C# 中进行图像的边缘锐化,可以通过卷积滤波器实现。边缘锐化的基本思想是通过卷积核(也称为滤波器或掩模)来增强图像中的边缘。我们可以使用一个简单的锐化核,例如: [ 0, -1, 0][-1, 5, -1][ 0, -1, 0]这个卷积核…...
ffmpeg windows系统详细教程
视频做预览时黑屏,但有声音问题解决方案。 需要将 .mp4编成H.264格式的.mp4 一般上传视频的站点,如YouTube、Vimeo 等,通常会在用户上传视频时自动对视频进行转码,以确保视频能够在各种设备和网络条件下流畅播放。这些网站通常…...
【单片机】MSP430G2553单片机 Could not find MSP-FET430UIF on specified COM port 解决方案
文章目录 MSP430G2553开发板基础知识解决办法如何实施解决办法4步骤一步骤二步骤三 MSP430G2553开发板基础知识 MSP430G2553开发板如下图,上半部分就是UIF程序下载调试区域的硬件。个人觉得MSP430G2553开发板的这个部分没有做好硬件设计,导致很多系统兼…...
每日一题——力扣104. 二叉树的最大深度(举一反三+思想解读+逐步优化)四千字好文
一个认为一切根源都是“自己不够强”的INTJ 个人主页:用哲学编程-CSDN博客专栏:每日一题——举一反三Python编程学习Python内置函数 目录 我的写法 代码功能 代码结构 时间复杂度分析 空间复杂度分析 总结 我要更强 优化方法:迭代&…...
wpf textbox 有焦点 导致后台更新 前台不跟着改变
这个问题可能是由于 WPF 的数据绑定机制导致的。当 TextBox 有焦点时,它会独立于数据绑定进行更新,这可能会导致前台界面不能及时反映后台数据的变化。 1.使用 UpdateSourceTrigger 属性: 在数据绑定时,将 UpdateSourceTrigger 属性设置为 PropertyChanged。这样当 TextBox 的…...
数字化物资管理系统的未来:RFID技术的创新应用
在信息化和智能化不断发展的背景下,物资管理系统的数字化转型已成为各行各业关注的焦点。RFID技术作为一种先进的物联网技术,通过全面数字化实现物资信息的实时追踪和高效管理,为企业的物资管理提供了强有力的支持。 首先,RFID技…...
【docker】常用指令-表格整理
以下列出的指令是Docker中常用的命令,但并不是全部。Docker的指令非常丰富,可以根据具体的需求和场景选择合适的指令。同时,每个指令都有很多选项和参数可以使用,可以通过 docker COMMAND --help 来获取更详细的信息。 一、容器命…...
洛谷——P2824 排序
题目来源:[HEOI2016/TJOI2016] 排序 - 洛谷https://www.luogu.com.cn/problem/P2824 问题思路 本文介绍一种二分答案的做法,时间复杂度为:(nm)*log(n)*log(n).本题存在nlog(n)的做法,然而其做法没有二分答案的做法通俗易懂. 默认读…...
echart在线图表demo下载直接运行
echart 全面的数据可视化图表解决方案 | 折线图、柱状图、饼图、散点图、水球图等各类图表展示 持续更新中 三色带下表题速度仪表盘 地图自定义图标 动态环形图饼状图 动态水波动圆形 多标题指针仪表盘 温度仪表盘带下标题 横向柱状图排名 环形饼状图 双折线趋势变化...
MLX5_SET_TO_ONES宏解析
看代码时,遇到一个非常复杂的宏MLX5_SET_TO_ONES,这个宏的主要作用是对特定的数据结构置位,宏的上下文如下: #define __mlx5_nullp(typ) ((struct mlx5_ifc_##typ##_bits *)0) #define __mlx5_bit_off(typ, fld) (offsetof(struc…...
SQL Server入门-SSMS简单使用(2008R2版)-1
环境: win10,SQL Server 2008 R2 参考: SQL Server 新建数据库 - 菜鸟教程 https://www.cainiaoya.com/sqlserver/sql-server-create-db.html 第 2 课:编写 Transact-SQL | Microsoft Learn https://learn.microsoft.com/zh-cn/…...
高考专业抉择探索计算机专业的未来展望及适合人群
身份:一位正在面临人生重要抉择的高考生,一位计算机行业从业者 正文: 随着2024年高考落幕,我与数百万高三学生一样,又将面临人生中的重要抉择:选择大学专业。对于许多学生来说,计算机科学…...
windows安装spark
在 Windows 上安装 Spark 并进行配置需要一些步骤,包括安装必要的软件和配置环境变量。以下是详细的步骤指南: 步骤一:安装 Java 下载和安装 Java Development Kit (JDK) 到 Oracle JDK 下载页面 或 OpenJDK 下载页面 下载适合你系统的 JDK。…...
网站设计软件培训怎么样/怎么样引流加微信
一、数学规划模型简介 什么是优化问题? 解决有限资源的最佳分配问题。即如何用“最好”的方法,使有限的资源能获取最佳的经济效益。 数学规划模型分类: 线性规划模型(LP)、非线性规划模型(NLP)、整数规划…...
怀化网络推广收费标准/阿亮seo技术
第一步 使用influxd config > influxd.config生成默认配置文件 influxd.config。 第二步 然后修改influxd.config文件中的相关配置,保存。再用influxd -config influxd.config指定默认的配置文件启动服务。...
遵义住房和城乡建设厅网站/seo工具大全
需求在网页中启动本地应用程序并传递参数实现利用URL Protocol实现1、URL Protocol格式,如下:使用记事本输入上述内容,修改其中的协议名称、应用程序路径,启动参数,并保存成reg文件。点击执行保存的reg文件,…...
建行官网网站/北京专门做seo
部分来源于网络整理,部分自己手写,如有错误,请及时指正。以下所有几乎必问,每个公司侧重点不同,面试前先大概了解公司业务或者岗位需求。PHP 是一种创建动态交互性站点的强有力的服务器端脚本语言。1、常见状态码1xx &…...
什么网站可以做市场分析呢/谷歌优化推广
1)实验平台:正点原子水星 STM32F4/F7 开发板2)摘自《STM32F7 开发指南(HAL 库版)》关注官方微信号公众号,获取更多资料:正点原子http://weixin.qq.com/r/hEhUTLbEdesKrfIv9x2W (二维码自动识别)第十六章 电容触摸按键实…...
wdcp v3搭建WordPress/seo诊断优化方案
【摘要】最近两个月都在学习 Linux 驱动,中间碰到了很多问题,进度比较缓慢。尽管不是班科出生的,但是还是觉得算法很有必要学一学。因此将数组元素查找作为自己算法开篇的第一篇博客,好好跟着平凡程序员的博客学习,内容…...