Java版Flink使用指南——从RabbitMQ中队列中接入消息流
大纲
- 创建RabbitMQ队列
- 新建工程
- 新增依赖
- 编码
- 设置数据源配置
- 读取、处理数据
- 完整代码
- 打包、上传和运行任务
- 测试
- 工程代码
在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而现实中,数据往往来源于外部。本文我们将尝试Flink从RabbitMQ中读取数据,然后输出到日志中。
关于RabbitMQ的知识可以参阅《RabbitMQ实践》。
创建RabbitMQ队列
我们创建一个Classic队列data.from.rbtmq。注意要选择Durable类型,这是后续用的默认连接器的限制。
具体方法见《RabbitMQ实践——在管理后台测试消息收发功能》。
后续我们将在后台通过默认交换器,给这个队列新增消息。
新建工程
我们在IntelliJ中新建一个工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入与Flink的版本:1.19.1
新增依赖
在pom.xml中新增RabbitMQ连接器
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
编码
设置数据源配置
String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());
读取、处理数据
下面代码通过addSource添加RabbitMQ数据源。注意,不能使用fromSource方法,是因为RMQSource没有实现SourceFunction方法。
final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);
完整代码
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}
打包、上传和运行任务
测试
在RabbitMQ后台的默认交换器中,发布一条消息到data.from.rbtmq
然后使用下面指令可以看到Flink读取到消息并执行了print方法
tail log/flink-*-taskexecutor-*.out
==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default
工程代码
https://github.com/f304646673/FlinkDemo
相关文章:
Java版Flink使用指南——从RabbitMQ中队列中接入消息流
大纲 创建RabbitMQ队列新建工程新增依赖编码设置数据源配置读取、处理数据完整代码 打包、上传和运行任务测试 工程代码 在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而…...
Python酷库之旅-第三方库Pandas(013)
目录 一、用法精讲 31、pandas.read_feather函数 31-1、语法 31-2、参数 31-3、功能 31-4、返回值 31-5、说明 31-6、用法 31-6-1、数据准备 31-6-2、代码示例 31-6-3、结果输出 32、pandas.DataFrame.to_feather函数 32-1、语法 32-2、参数 32-3、功能 32-4、…...
Linux 高级 Shell 脚本编程:掌握 Shell 脚本精髓,提升工作效率
【Linux】 高级 Shell 脚本编程:掌握 Shell 脚本精髓,提升工作效率 Shell 脚本编程是 Linux 系统管理员和开发人员的必备技能。通过学习高级 Shell 脚本编程,你可以编写更高效、更灵活和更易于维护的脚本。本文将介绍 Shell 脚本编程中的函数…...
【ARMv8/v9 GIC 系列 1.5 -- Enabling the distribution of interrupts】
请阅读【ARM GICv3/v4 实战学习 】 文章目录 Enabling the distribution of interruptsGIC Distributor 中断组分发控制CPU Interface 中断组分发控制Physical LPIs 的启用Summary Enabling the distribution of interrupts 在ARM GICv3和GICv4体系结构中,中断分发…...
《mysql篇》--索引事务
索引 索引的介绍 索引是帮助MySQL高效获取数据的数据结构,是一种特殊的文件,包含着对数据表里所有记录的引用指针,因为索引本身也比较大,所以索引一般是存储在磁盘上的,索引的种类有很多,不过如果没有特殊…...
科研绘图系列:R语言STAMP图(STAMP Plot)
介绍 STAMP图(STAMP plot)并非一个广泛认知的、具有特定名称的图表类型,而是可能指在STAMP(Statistical Analysis of Metagenomic Profiles:“STAMP: statistical analysis of taxonomic and functional profiles”)软件使用过程中生成的各种统计和可视化图表的总称。ST…...
运维团队如何应对动环监控与IT监控分离的挑战
IT与机房动环监控的一体化是当下及未来的必然趋势,这一模式显著节省了运维过程中的时间与成本。一体化平台不仅消除了频繁切换系统的繁琐,更在一个统一界面上实现了多元化的管理运维功能,极大地提升了工作效率。 在机房升级或新建项目中&…...
深入解析大数据核心概念:数据平台、数据中台、数据湖与数据仓库的异同与应用
大数据领域内的诸多概念常常让人困惑,其中数据平台、数据中台、数据湖和数据仓库是最为关键的几个。 1. 数据平台 定义: 数据平台是一个综合性的技术框架,旨在支持整个数据生命周期的管理和使用。它包含数据采集、存储、处理、分析和可视化…...
开发指南040-业务操作日志
平台所有业务操作都存储在核心库,以便统一分析处理。各业务微服务通过feign调用核心日志服务。底层提供了API: <dependency><groupId>org.qlm</groupId><artifactId>qlm-api</artifactId><version>1.0-SNAPSHOT<…...
如何构建数据驱动的企业?爬虫管理平台是关键桥梁吗?
一、数据驱动时代:为何选择爬虫管理平台? 在信息爆炸的今天,数据驱动已成为企业发展的核心战略之一。爬虫管理平台,作为数据采集的第一站,它的重要性不言而喻。这类平台通过自动化手段,从互联网的各个角落…...
多线程Thread
线程Thread简介 任务、线程、金城、多线程 多任务:短时间切换不同得任务 多线程:通过同一条道路,增加道多条道路,提高使用率,解决堵塞问题 普通方法调多线程只有主线一台执行路径是主线程调run()方法,方…...
计算机网络之WPAN 和 WLAN
上一篇文章内容:无线局域网 1.WPAN(无线个人区域网) WPAN 是以个人为中心来使用的无线个人区域网,它实际上就是一个低功率、小范围、低速率和低价格的电缆替代技术。 (1) 蓝牙系统(Bluetooth) &#…...
TikTok海外运营,云手机多种变现方法
从现阶段来看,TikTok 的用户基数不断增长,已然成为全球创业者和品牌的全新竞争舞台。其用户数量近乎 20 亿,年轻用户占据主导,市场渗透率也逐年提高。不管是大型企业、著名品牌,还是个体创业者,都绝不能小觑…...
kubekey在ubuntu24实现kubernetes快速安装
基于Ubunut24.04安装 设置主机名 hostnamectl set-hostname kkmain hostnamectl set-hostname kknode1 hostnamectl set-hostname kknode2关闭swap sudo swapoff -a sudo sed -i s/.*swap.*/#&/ /etc/fstab安装kubekey export KKZONEcn curl -sfL https://get-kk.kubes…...
根据关键词query获取google_img(api方式)
文章目录 说明代码第一部分:链接保存为Json第二部分:链接转换为img 说明 根据关键词query获取google_img USERNAME “xxx” PASSWORD “xxx” 官网申请。 代码 首先获取图片链接,保存为json之后下载。 第一部分:链接保存为…...
西安明德理工学院师生莅临泰迪智能科技开展参观见习活动
为进一步深化校企合作,落实高校应用型人才培养。7月8日,西安明德理工学院与广东泰迪智能科技股份有限公司联合开展学生企业见习活动。西安明德理工学院金融产业学院副院长刘敏、金融学专业负责人张莉萍、金融学专业教师曹艳飞、赵浚妤、泰迪智能科技董事…...
通用机器人里程碑!MIT提出策略组合框架PoCo,解决数据源异构难题,实现机器人多任务灵活执行
18 位人形机器人充当「迎宾」人员,整齐划一向嘉宾挥手,这是 2024 世界人工智能大会上的一个震撼场景,让人们直观感受到了今年机器人的飞速发展。 图源:甲子光年 1954 年,世界上第一台可编程机器人「尤尼梅特」在通用汽…...
基于Java中的SSM框架实现疫情冷链追溯系统项目【项目源码+论文说明】
基于Java中的SSM框架实现疫情冷链追溯系统演示 摘要 近几年随着城镇化发展和居民消费水平的不断提升,人们对健康生活方式的追求意识逐渐加强,生鲜食品逐渐受到大众青睐,诸如盒马鲜生、7-fresh等品牌生鲜超市,一时间如雨后春笋般迅…...
想在vue中预览doxc,excel,pdf文件? vue-office提供包支持
在浩瀚的Vue生态中,vue-office犹如一颗璀璨的星辰,以其独特的魅力照亮了开发者处理多种文件格式的预览之路。这款精心打造的Vue组件库,不仅拥抱了Vue2的经典,也紧密跟随Vue3的步伐,展现了卓越的技术前瞻性和兼容性。它…...
PostgreSQL16安装Mac(brew)
问题 最近需要从MySQL切换到PostgreSQL。我得在本地准备一个PostgreSQL。 步骤 使用brew安装postgresql16: arch -arm64 brew install postgresql16启动postgresql16: brew services start postgresql16配置postgresql环境变量,打开环境变量文件: …...
【语音识别算法】深度学习语音识别算法与传统语音识别算法的区别、对比及联系
深度学习语音识别算法与传统语音识别算法在理论基础、实现方式、性能表现等方面存在显著区别,同时也有一些联系。下面将从几个方面详细比较这两种方法,并给出应用实例和代码示例。 一、理论基础与实现方式 1.传统语音识别算法: 特征提取&a…...
图片批量重命名bat,一个脚本快速搞定图片批量重命名
BAT 批处理 是一种在 Microsoft Windows 操作系统中使用的脚本语言,用于自动执行一系列预定义的命令或任务。这些命令集合通常存储在一个文本文件中,文件扩展名为 .bat 或 .cmd。批处理脚本可以包含简单的命令,如文件复制、移动、删除&#x…...
基于stm32单片机的智能手环的设计
摘 要 随着科技的飞速发展和人们生活水平的提高,健康与科技日益融合,智能可穿戴设备已成为现代人生活中不可或缺的一部分。智能手环,作为一种便携、实用且功能丰富的可穿戴设备,受到越来越多用户的喜爱。它不仅能够实时监测用户的…...
雷池WAF动态防护功能初体验
一、 介绍 大名鼎鼎的雷池WAF最近新上了个名为 动态防护 的功能 所谓动态防护,是在用户浏览到的网页内容不变的情况下,将网页赋予动态特性,即使是静态页面,也会具有动态的随机性。 说白了就是给你网站的 html 和 js 代码加上加密…...
持安科技CEO何艺荣获中国信通院2023-2024年度标准卓越贡献奖
近日,由中国信息通信研究院、中国通信标准化协会承办的“全球数字经济大会—云和软件安全论坛”暨“2024第二届SecGo云和软件安全大会”胜利召开,零信任办公安全技术创新企业持安科技创始人兼CEO何艺获评为2023-2024年度零信任领域标准卓越贡献者。 由中…...
gitee上传和下载idea项目的流程
环境:idea2022 一、上传项目 1、在gitee中新建一个仓库。 2、打开所要上传的项目的文件夹,点击Git Bash,生成.git文件夹。 3、在idea中打开所要上传的项目,在控制台的Terminal菜单中,输入git add . (注意…...
【Numpy】np.loadtxt 读取单行数据时报错。(零维数组)
np.loadtxt 读取单行数据时遇到了报错 代码: import numpy as nplabelPath"./name.names" names np.loadtxt(labelPath, dtypestr)print(names[0])names中的数据: 报错: IndexError: too many indices for array: array is 0-…...
Unity之OpenXR+XR Interaction Toolkit实现 Gaze眼部追踪
使用 Unity OpenXR 实现Gaze眼部追踪 在虚拟现实(VR)和增强现实(AR)应用中,眼动追踪是一项强大而受欢迎的技术。它可以让开发者更好地理解用户的注意力和行为,并创造出更加沉浸和智能的体验。在本文中,我们将探讨如何使用 Unity OpenXR 实现Gaze眼部追踪功能。 Unity …...
自然语言处理(NLP)与大语言模型(LLM) 主要差异
一、简述 NLP 和 LLM 技术是大规模分析和生成人类语言的核心。随着它们的日益普及,区分 LLM 与 NLP 变得越来越重要。 NLP 包含一套用于理解、操纵和生成人类语言的算法。自 20 世纪 50 年代诞生以来,NLP 已发展到分析文本关系的阶段。它使用词性标注、命…...
智能车载防窒息系统设计
摘要 随着汽车行业的快速发展,车辆安全问题越来越受到人们的关注。其中,车载防窒息系统是一项重要的安全设备。本论文基于STM32单片机,设计了一种智能车载防窒息系统。该系统主要包括氧气浓度检测模块、温湿度检测模块、声音检测模块、光线检…...
个人接做政府网站/网站排名前十
环境为Ubuntu 12.10,机器为3网卡,需要再eth2上配置一个dhcp服务。 因为eth2后接一个交换机,交换机上的机器需要自动分配网址,并且对于某一台或多台机器需要固定网址,例如打印机。 1. 安装DHCP服务 sudo apt-get inst…...
灵雀云 wordpress/seo网页优化平台
Dubbo分布式服务子系统划分需要把控系统的数量 过多: 可能划分过细,破坏业务子系统的独立性 部署维护工作量大,独立进程占用内存多 过少: 没能很好的解耦 开发维护不好分工 升级维护影响面大 服务子系统划分注意事项&#…...
运动服饰网站建设目的/2022年小学生新闻摘抄十条
text-align 属性规定元素中的文本的水平对齐方式。该属性通过指定行框与哪个点对齐,从而设置块级元素内文本的水平对齐方式。通过允许用户代理调整行内容中字母和字之间的间隔,可以支持值 justify;不同用户代理可能会得到不同的结果。 <!…...
网站备案核/磁力搜索器在线
Java 发送会议邀请到 OutlookJava 发送会议邀请到 Outlook1.发件服务器配置2.发送邮件代码3.测试代码Java 发送会议邀请到 Outlook 系统:Win10 IDE:IntelliJ IDEA 2017.3.7 JDK:1.8.0_121 Outlook:Microsoft Office 2016 1.发件服…...
做美篇发网站/外链是什么意思
又来到了总结知识的时间了,今天又学了一些新的知识,是多线程和GDI的一些运用。 理论: 在学习多线程之前,首先要了解一下什么是进程? 进程:(关键字Process)进程是一个具有一定独立功能的程序关于某个数据集合…...
php可以独立做网站吗/域名注册平台有哪些
Vue 中的 key 是用来做什么的?为什么不推荐使用 index 作为 key?常常听说这样的问题,本篇文章带你从原理来一探究竟。示例以这样一个列表为例:1 2那么它的 vnode 也就是虚拟 dom 节点大概是这样的。{ tag: ul, children: [ …...