Java版Flink使用指南——定制RabbitMQ数据源的序列化器
大纲
- 新建工程
- 新增依赖
- 数据对象
- 序列化器
- 接入数据源
- 测试
- 修改Slot个数
- 打包、提交、运行
- 工程代码
在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。
新建工程
我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1

新增依赖
在pom.xml中新增RabbitMQ连接器
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>
新增Json库依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>
新增lombok库,主要是为了使用它的一些注解
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>
数据对象
我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java
package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}
这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。
序列化器
我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。
src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java
package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}
接入数据源
我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。
String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);
测试
修改Slot个数
由于我们要运行两个流式计算任务,于是需要两个Slot。
vim conf/config.yaml
将numberOfTaskSlots的值改成2。
打包、提交、运行
我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行

然后在日志中可以看到“已婚”的数据都在输出
tail -f log/*

工程代码
https://github.com/f304646673/FlinkDemo
相关文章:
Java版Flink使用指南——定制RabbitMQ数据源的序列化器
大纲 新建工程新增依赖数据对象序列化器接入数据源 测试修改Slot个数打包、提交、运行 工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象&#x…...
CV每日论文--2024.7.8
1、DisCo-Diff: Enhancing Continuous Diffusion Models with Discrete Latents 中文标题:DisCo-Diff:利用离散潜伏增强连续扩散模型 简介:这篇文章提出了一种新型的离散-连续潜变量扩散模型(DisCo-Diff),旨在改善传统扩散模型(DMs)存在的问…...
【AI大模型】赋能儿童安全:楼层与室内定位实践与未来发展
文章目录 引言第一章:AI与室内定位技术1.1 AI技术概述1.2 室内定位技术概述1.3 楼层定位的挑战与解决方案 第二章:儿童定位与安全监控的需求2.1 儿童安全问题的现状2.2 智能穿戴设备的兴起 第三章:技术实现细节3.1 硬件设计与选择传感器选择与…...
云服务器linux系统安装配置docker
在我们拿到一个纯净的linux系统时,我需要进行一些基础环境的配置 (如果是云服务器可以用XShell远程连接,如果连接不上可能是服务器没开放22端口) 下面是配置环境的步骤 sudo -s进入root权限:退出使用exit sudo -i进入…...
泰勒雷达图2
matplotlib绘制泰勒雷达图 import matplotlib.pyplot as plt import numpy as np from numpy.core.fromnumeric import shape import pandas as pd import dask.dataframe as dd from matplotlib.projections import PolarAxes import mpl_toolkits.axisartist.floating_axes a…...
数据库容灾 | MySQL MGR与阿里云PolarDB-X Paxos的深度对比
开源生态 众所周知,MySQL主备库(两节点)一般通过异步复制、半同步复制(Semi-Sync)来实现数据高可用,但主备架构在机房网络故障、主机hang住等异常场景下,HA切换后大概率就会出现数据不一致的问…...
react根据后端返回数据动态添加路由
以下代码都为部分核心代码 一.根据不同的登录用户,返回不同的权限列表 ,以下是三种不同用户限权列表 const pression { //超级管理员BigAdmin: [{key: "screen",icon: "FileOutlined",label: "数据图表",},{key: "…...
机器学习中的可解释性
「AI秘籍」系列课程: 人工智能应用数学基础 人工智能Python基础 人工智能基础核心知识 人工智能BI核心知识 人工智能CV核心知识 为什么我们需要了解模型如何进行预测 我们是否应该始终信任表现良好的模型?模型可能会拒绝你的抵押贷款申请或诊断你患…...
上海慕尼黑电子展开展,启明智显携物联网前沿方案亮相
随着科技创新的浪潮不断涌来,上海慕尼黑电子展在万众瞩目中盛大开幕。本次展会汇聚了全球顶尖的电子产品与技术解决方案,成为业界瞩目的焦点。启明智显作为物联网彩屏显示领域的佼佼者携产品亮相展会,为参展者带来了RTOS、LINUX全系列方案及A…...
Centos7离线安装ElasticSearch7.4.2
一、官网下载相关的安装包 ElasticSearch7.4.2: elasticsearch-7.4.2-linux-x86_64.tar.gz 下载中文分词器: elasticsearch-analysis-ik-7.4.2.zip 二、上传解压文件到服务器 上传到目录:/home/data/elasticsearch 解压文件࿱…...
深入理解sklearn中的模型参数优化技术
参数优化是机器学习中的关键步骤,它直接影响模型的性能和泛化能力。在sklearn中,参数优化可以通过多种方式实现,包括网格搜索(GridSearchCV)、随机搜索(RandomizedSearchCV)和贝叶斯优化等。本文…...
【Elasticsearch】开源搜索技术的演进与选择:Elasticsearch 与 OpenSearch
开源搜索技术的演进与选择:Elasticsearch 与 OpenSearch 1.历史发展2.OpenSearch 与 Elasticsearch 相同点3.OpenSearch 与 Elasticsearch 不同点3.1 版本大不同3.2 许可证不同3.3 社区不同3.4 功能不同3.5 安全性不同3.6 性能不同3.7 价格不同3.8 两者可相互导入 4…...
欧拉openEuler 22.03 LTS-部署k8sv1.03.1
1.设置ip # vi /etc/sysconfig/network-scripts/ifcfg-ens32 TYPEEthernet PROXY_METHODnone BROWSER_ONLYno BOOTPROTOstatic DEFROUTEyes IPV4_FAILURE_FATALno #IPV6INITyes #IPV6_AUTOCONFyes #IPV6_DEFROUTEyes #IPV6_FAILURE_FATALno #IPV6_ADDR_GEN_MODEeui64 NAMEens1…...
老年生活照护实训室:为养老服务业输送专业人才
本文探讨了老年生活照护实训室在养老服务业专业人才培养中的关键作用。通过详细阐述实训室的功能、教学实践、对学生能力的培养以及面临的挑战和解决方案,强调了其在提升人才素质、满足行业需求方面的重要性,旨在为养老服务业的可持续发展提供有力的人才…...
go语言中使用WaitGroup和channel实现处理多线程问题
WaitGroup 背景 如果将一个任务分为任意个小任务,并且不关心小任务的执行顺序,并且希望等待全部的小任务执行完成后再去操作后面的逻辑,那我推荐你用sync.WaitGRoup 使用方法 比如,有一个任务需要执行 3 个子任务,…...
Open3D 计算点云的平均密度
目录 一、概述 1.1基于领域密度计算原理 1.2应用 二、代码实现 三、实现效果 2.1点云显示 2.2密度计算结果 一、概述 在点云处理中,点的密度通常表示为某个点周围一定区域内的点的数量。高密度区域表示点云较密集,低密度区域表示点云较稀疏。计算…...
C语言之数据在内存中的存储(1),整形与大小端字节序
目录 前言 一、整形数据在内存中的存储 二、大小端字节序 三、大小端字节序的判断 四、字符型数据在内存中的存储 总结 前言 本文主要讲述整型包括字符型是如何在内存中存储的,涉及到大小端字节序这一概念,还有如何判断大小端,希望对大…...
B端全局导航:左侧还是顶部?不是随随便便,有依据在。
一、什么是全局导航 B端系统的全局导航是指在B端系统中的主要导航菜单,它通常位于系统的顶部或左侧,提供了系统中各个模块和功能的入口。全局导航菜单可以帮助用户快速找到和访问系统中的各个功能模块,提高系统的可用性和用户体验。 全局导航…...
什么是海外仓管理自动化?策略及落地实施步骤指南
作为海外仓的管理者,你每天都面临提高海外仓运营效率、降低成本和满足客户需求的问题。海外仓自动化管理技术为这些问题提供了不错的解决思路,不过和任何新技术一样,从策略到落地实施,都有一个对基础逻辑的认识过程。 今天我们整…...
自定义控件三部曲之绘图篇(六)Paint之函数大汇总、ColorMatrix与滤镜效果、setColorFilter
在自定义控件的绘图篇中,Paint 类是核心的组成部分之一,它控制了在 Canvas 上绘制的内容的各种属性,包括颜色、风格、抗锯齿、透明度等等。下面将详细介绍 Paint 的主要功能以及如何使用 ColorMatrix 和 setColorFilter 来实现滤镜效果。 Pa…...
铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】
1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件(System Property Definition File),用于声明和管理 Bluetooth 模块相…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
