厦门酒店网站建设/和生活爱辽宁免费下载安装
2.3. Pulsar Adaptors适配器
2.3.1.kafka适配器
2.3.2.Spark适配器
2.3. Pulsar Adaptors适配器
2.3.1.kafka适配器
Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的平台中, 那么Pulsar adaptor on kafka就变的非常的有用了, 它可以帮助我们在不改变原有kafka的代码基础上, 即可接入pulsar, 但是需要注意, 相关配置信息需要进行一些调整, 例如: 地址与topic
- 1- 需要导入Pulsar集成kafka的依赖包, 删除掉原有Kafka-client包
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-kafka</artifactId> <version>2.8.0</version>
</dependency>
注: 目前Pulsar并在Maven中央仓库中并没有提供Pulsar-client-kafka 2.8.1的包, 故此处导入2.8.0
- 2-编写生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaAdaptorProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 创建kafka生产者的核心类对象: KafkaProducer// 1.1: 创建生产者配置对象: 设置相关配置Properties props = new Properties();props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");// 消息的确认方案props.put("acks", "all");// key序列化类型props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value 序列化类型props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props); //2. 发送数据 for (int i = 0; i < 10; i++) { //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); producer.send(producerRecord).get(); }//3. 释放资源 producer.close();}}
- 3-编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaAdaptorConsumer {public static void main(String[] args) {//1. 创建kafka的消费者的核心对象: KafkaConsumer//1.1: 创建消费者配置对象, 并设置相关的参数:Properties props = new Properties();props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");//消费者组的 idprops.setProperty("group.id", "test");//是否启动消费者自动提交消费偏移量props.setProperty("enable.auto.commit", "true");//每间隔多长时间提交一次偏移量:单位 毫秒props.setProperty("auto.commit.interval.ms","1000");//key 反序列化props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//val 发序列化props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//2. 给消费者设置订阅topic:consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));//3. 循环获取相关的消息数据while (true) {//3.1: 从kafka中获取消息数据: 参数表示等待超时时间//注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息for (ConsumerRecord<String, String> record : records) {String massage = record.value();System.out.println("消息数据为:"+massage);}} }
}
- 4- 先运行消费者, 进行监听, 然后运行生产者, 观察消费者是否可以正常消费到数据
2.3.2.Spark适配器
Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从 Pulsar 接
收原始数据。
应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可
以通过多种方式对其进行处理。
- 1-导入相关的依赖包
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>2.8.0</version>
</dependency>
- 2-编写spark的流式代码
String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
相关文章:

11_Pulsar Adaptors适配器、kafka适配器、Spark适配器
2.3. Pulsar Adaptors适配器 2.3.1.kafka适配器 2.3.2.Spark适配器 2.3. Pulsar Adaptors适配器 2.3.1.kafka适配器 Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。 在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的…...

jupyter文档转换成markdown
背景 上一篇文章**《如何优雅地用python生成模拟数据》**我就使用jupyter写的,这个真的是万能的,可以插入markdown格式的内容,也可写代码,关键是像ipython一样,可以分步执行。 我可以这样自由的写我的博客内容&#x…...

日志框架及其使用方法
log4j和logBack,同一个人写的,logBack为log4j的升级版,SpringBoot中默认集成logBack 作用:记录软件发布后的一些bug,以及数据是怎样被操作的 传统开发弊端: 1.日志直接输出在控制台,关闭控制台后,日志消…...

ZIG:理解未来编程语言的视角
文章目录 摘要:引言:性能简洁性和模块化避免常见错误和陷阱总结:参考资料📑: 摘要: 本文介绍了新兴编程语言ZIG的目标和特点,包括高性能、简洁性和模块化,并分析了这些特点是如何通过语言设计来…...

让三驾马车奔腾:华为如何推动空间智能化发展?
上个月,国务院常务会议审议通过了《关于促进家居消费的若干措施》,其中明确提出了“推动单品智能向全屋智能发展创新培育智能消费”“开展数字家庭建设试点”等推动全屋智能拼配发展的建议与方案。 可以说,以整屋为单位的空间智能品类&#x…...

2022年03月 Python(一级)真题解析#中国电子学会#全国青少年软件编程等级考试
一、单选题(共25题,每题2分,共50分) 第1题 已知a“161”,b“16”,c“8”,执行语句da>b and a>c,变量d的值为是? A:0 B:1 C:True D&am…...

WIN大恒工业相机SDK开发
大恒工业相机SDK开发概览 一、开发环境搭建1、C# 环境配置(VS2019)2、C 环境配置(VS2019)3、python 环境配置(Pycharm) 二、相机二次开发流程三、相机相机属性参数配置四、图像采集单帧采集回调采集 注意事…...

qt qml中各种Layout之间是如何对齐的?
问题描述: qt qml中下一个RowLayout如何对齐顶部到上方的ColumnLayout的底部略低一些间隔的位置? 我们怎么使用achors去锚定位置? 这些都是可以用anchors锚定属性,以及margin来设置的。 解决办法: 要实现将下一个R…...

Immutable.js 进行js的复制
介绍 在提供不可变(Immutable)数据结构的支持。不可变数据是指一旦创建后就不能被修改的数据,每次对数据进行更新都会返回一个新的数据对象,而原始数据保持不变。 使用 日常中我们使用的拷贝 (1) var arr { } ; arr2 arr ; …...

java动态生成excel并且需要合并单元格
java动态生成excel并且需要合并单元格 先上图看一下预期效果 集成poi <dependency><groupId>cn.afterturn</groupId><artifactId>easypoi-base</artifactId><version>4.0.0</version> </dependency> <dependency><…...

JMeter启动时常见的错误
很多小伙伴在学工具这一块时,安装也是很吃力的一个问题,之前记得有说过怎么安装jmeter这个工具。那么你要启动jmeter的时候,一些粉丝就会碰到如下几个问题。 1.解压下载好的jmeter安装,Windows 平台,双击 jmeter/bin …...

python pandas 排序
Series的排序: Series.sort_values(ascendingTrue, inplaceFalse) 参数说明: ascending:默认为True升序排序,为False降序排序inplace:是否修改原始Series DataFrame的排序: DataFrame.sort_values(by, as…...

前后端分离式项目架构流程复盘之宿舍管理系统
文章目录 🐒个人主页🏅JavaEE系列专栏📖前言:【🎇前端】先创建Vue-cli项目(版本2.6.10,仅包含babel),请选择此项目并创建 【整理简化项目模板】【🎀创建路由】…...

Linux nohup 命令详解
nohup是Linux/Unix系统中非常有用的命令之一。它允许您在后台运行命令或脚本,并且在退出终端会话后仍然保持运行。这对于长时间运行的任务或进程非常有用,特别是当您需要离开终端但希望任务继续运行时。 nohup命令语法 nohup命令的基本语法如下&#x…...

VoxWeekly|The Sandbox 生态周报|20230731
欢迎来到由 The Sandbox 发布的《VoxWeekly》。我们会在每周发布,对上一周 The Sandbox 生态系统所发生的事情进行总结。 如果你喜欢我们内容,欢迎与朋友和家人分享。请订阅我们的 Medium 、关注我们的 Twitter,并加入 Discord 社区…...

编程导航算法村第九关 | 二分查找
编程导航算法村第九关 | 二分查找 LeetCode852.这个题的要求有点啰嗦,核心意思就是在数组中的某位位置i开始,从0到i是递增的,从i1 到数组最后是递减的,让你找到这个最高点。 详细要求是:符合下列属性的数组 arr 称为山…...

linux 下安装部署flask项目
FlaskDemo 命名为test.py # codingutf-8 from flask import Flaskapp Flask(__name__)app.route("/") def index():return "test"if __name__ __main__:app.debug True# 这里host一定要写0.0.0.0 写127.0.0.1的无法访问 ——_——app.run(host"0.…...

在Vue里,将当前窗口截图,并将数据base64转为png格式传给服务器
目录 前言 1、将当前窗口截图,并将数据存储下来 2、定义将base64转png的方法 3、完整代码 总结 前言 记录来源于需求 1、将当前窗口截图,并将数据存储下来 export default { data() {return {image: // 存储数据} }mounted() {setTimeout(() >…...

Echarts图表Java后端生成Base64图片格式,POI写入Base64图片到Word中
Echarts图表Java后端生成请看上篇,此篇为Base64图片插入Word文档中Java后台生成ECharts图片,并以Base64字符串返回_青冘的博客-CSDN博客 try {XWPFParagraph xwpfParagraphimage doc.createParagraph(); // 创建图片段落xwpfParagraphimage.setAlignment(Paragraph…...

【AI】《动手学-深度学习-PyTorch版》笔记(十二):从零开始实现softmax回归
AI学习目录汇总 1、什么是特征? 对于图像算法,每个像素可以视为一个特征,例如图像的分辨率为28x28,则有784个特征。而且常常将二维的图像像素矩阵展开为长度为784的向量。 2、权重和偏置的规模 本例中,将使用Fashion-MNIST数据集,它是一个服装分类数据集,可以将服装…...

汽车用功率电感器
支持车载用被动元件的可靠性认证测试标准“AEC-Q200”的绕线铁氧体功率电感器 LCXH 系列实现商品化,推出了“LCXHF3030QK”等 6 个尺寸的 64 款商品。 这些商品均是用于汽车车身类及信息娱乐等信息类的电源电路用扼流线圈及噪音滤波器的功率电感器。 LCXH 系列与民生…...

上传图片视频
分布式文件系统MinIo MinIO提供多个语言版本SDK的支持,下边找到java版本的文档: 地址:https://docs.min.io/docs/java-client-quickstart-guide.html MinIO测试(上传、删除、下载) public class MinioTest {MinioC…...

【UE5】UE5与Python Socket通信中文数据接收不全
最近在使用UE的Socket模块与Python服务器进行通信时遇到了一些坑,特此记录一下。 先来复现一下问题,这里只截取关键代码。 UE端: bool ASoc::SendMsg(const FString& Msg) {TSharedRef<FInternetAddr> TargetAddr ISocketSubsy…...

一些有难度的c++题目思路讲解--第一期2023/8/8 小Q的修炼与旷野大计算
说明: 本期博客将分为10篇讲解一些有点挑战的题目,第一期是所有人都可以看到,但后面的关注我才能看到哦!有望大家的支持!谢谢! 题目链接(按顺序) [NOI2013] 小Q的修炼 - 洛谷 小Q的修炼[NOI2013] 小Q的修炼 - 洛谷 [NOI2016] 旷野大计算 - 洛谷旷野大计算[NOI2016] 旷野…...

Node.js:path文件路径操作模块
path 用于文件路径操作 官方文档 https://nodejs.org/api/path.html 一个不错的解释 ┌─────────────────────┬────────────┐│ dir │ base │├──────┬ ├──────┬─────┤│ ro…...

基于 CentOS 7 构建 LVS-DR 群集
文章目录 一、LVS-DR集群介绍1.LVS的基本工作原理2. LVS-DR模式工作原理 二、 LVS-DR模式应用特点三、LVS – DR 模式集群构建1.前期环境准备2.配置LVS3.配置RS 一、LVS-DR集群介绍 1.LVS的基本工作原理 当用户向负载均衡调度器(Director Server)发起请…...
机器学习笔记 - 使用 Tensorflow 从头开始构建您自己的对象检测器
一、简述 之前的文章是利用了VGG16的预训练模型,然后构造完全连接的层标头以输出预测的边界框坐标,但是不包含对象标签的分类。 机器学习笔记 - 使用Keras、TensorFlow框架进行自定义数据集目标检测训练_keras 制作 目标检测 数据集_坐望云起的博客-CSDN博客学习如何训练自定…...

IELAB-网络工程师的路由答疑10问(2)
各位小伙伴们,接下来的问题可能有些难度,你们做好准备了吗? 7. 动态路由协议做了啥? 这次咱们先解决第一个比较棘手的问题--路由协议,相信初学的同学对于路由协议的学习总是或多或少有些问题,呐ÿ…...

聚观早报|iPhone 15预计9月22日上市;一加Open渲染图曝光
【聚观365】8月7日消息 iPhone 15预计9月22日上市一加Open渲染图曝光Redmi K60至尊版细节曝光小米14 Pro屏幕细节曝光vivo V3正式发布,执着自研“影像芯片” iPhone 15预计9月22日上市 上周有多位消息人士透露,多家合作的电信运营商已要求员工不要在9月…...

react-use-gesture
介绍 react-use-gesture 是一个基于 React Hooks 的库,用于处理手势事件。它提供了一种简单且灵活的方式来处理用户的手势操作,例如拖动、缩放、旋转等。 使用 安装 react-use-gesture: npm install react-use-gesture 导入所需的模块和钩…...