kafka集群搭建-使用zookeeper
1.环境准备:
使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。
172.2.1.69:8093
172.2.1.70:8093
172.2.1.71:8093
2.下载地址
去官网下载,或者使用如下仓库地址下载,本次使用的时kafka_2.13-3.6.1.tgz ,即3.6.1版本,前面的2.13是scala版本,该版本是较新的版本,可以使用zookeeper,也可以不使用zookeeper搭建集群,本次记录使用了zk,zk集群的部署可以参考上一篇记录。
# 软件包下载地址,可以切到/kafka/路径,选择自己需要的版本
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
3.软件包下载解压
在上面3台服务器上分别执行wget下载,或者本地下载后上传,本次使用的环境为堡垒机接入,如果使用的是宿主机账密登陆,可以下载配置一台,其余使用SCP命令拷贝过去即可。
cd /usr/local/wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgztar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafka
4.修改配置
需要修改logs路径的话,可以在/kafka路径下新建logs路径,并配置到server.properties中,这里使用默认的/tmp/kafka-logs路径。
cd kafka
vim conf/server.properties
修改内容如下,
# 每个节点唯一的id,这里.69、.70、.71服务器分别设置为了1、2、3
broker.id=1# 默认为9092,云服务器开放端口问题,改为了8093
port=8093# 上一篇记录博客搭建的zk集群地址
zookeeper.connect=172.2.1.69:8092,172.2.1.70:8092,172.2.1.71:8092# 配置监听访问、绑定地址,这里都是PLAINTEXT协议,不需要认证(相当于内网访问)
listeners=PLAINTEXT://0.0.0.0:8093
advertised.listeners=PLAINTEXT://172.2.1.71:8093# 日志路径
log.dirs=/tmp/kafka-logs
5.启动kafka集群
分别在每个节点的bin路径下执行启动脚本
# 在3个节点分别执行如下命令,-daemon表示后台启动,不带该参数前台启动
./bin/kafka-server-start.sh -daemon config/server.properties
使用jps命令,或者去kafka启动日志,查看kafka是否启动成功。
6.创建topic`
# 创建topic,在任一节点执行都可以。./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --create --topic topic-demo --partitions=3 --replication-factor=3
# 查看topic是否创建成功,在任一节点执行
./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo --list
7.模拟生产消费消息
需要注意的是网上搜到的一些老的博客kafka命令在高版本中是不再支持的,如:
sh ./bin/kafka-topics.sh --zookeeper=zk集群地址,可能出现命令无法识别:zookeeper is not a recognized option
需要替换为:sh ./bin/kafka-topics.sh --bootstrap-server=kafka集群地址,注意–bootstrap-server后面跟的是kafka集群地址,不是zookeeper地址。
# 在2个节点启动消费者模拟客户端接收消息,在第3个节点启动生产者模拟客户端发送消息
./bin/kafka-console-consumer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
# 在第3个节点启动生产者客户端模拟发送消息:hello
./bin/kafka-console-producer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
生产者客户端发送hello
此时可以看到2个消费者模拟客户端都受到了消息:hello
8.集成springboot
坐标如下:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
kafka配置类:
package com.example.kafka.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
//@RefreshScope
public class KafkaConfig {@Value("${xxxx:172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093}")private String kafkaServers;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);//props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}
}
消费者客户端:
@KafkaListener(topics = {"topic-demo"},groupId = "test1",properties = {"auto-offset-reset:latest", "enable.auto.commit:true"})public void listen(ConsumerRecord<String, String> consumerRecord) {log.info("consumer Received: " + consumerRecord);}
生产者发送消息:
@RestController
@RequiredArgsConstructor
@RequestMapping("producer")
public class ProducerController {private final KafkaTemplate<String, String> kafkaTemplate;@PostMapping(path = "/sendCommonMsg")public String sendCommonMsg(String topic, String msg) {ListenableFuture<SendResult<String, String>> hello_kafka = this.kafkaTemplate.send(topic, "hello kafka");SendResult<String, String> sendResult = hello_kafka.completable().join();System.out.println(sendResult);return "send topic: " + topic + ", msg: " + msg;}
}
发送测试:
消费者可以接收到消息:
consumer Received: ConsumerRecord(topic = topic-demo, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1721720091071, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
9.IDEA客户端工具
可以使用kafkalytic工具本地开发环境可视化操作kafka服务器,如查看topic,创建topic
相关文章:
kafka集群搭建-使用zookeeper
1.环境准备: 使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。 172.2.1.69:8093 172.2.1.70:8093 172.2.1.71:8093 2.下载地址 去官网下载,或者使用如…...
【python】Numpy运行报错分析:IndexError与形状不匹配问题
✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…...
你有多自律就有多自由
当你失去对时间的控制权,生活也就失去了平衡。 真正对自己有要求的人,都是高度自律的人。 追求自己想要的生活,任何时候开始都不会晚,关键在于你能够坚持下去,以高度自律的精神,日复一日、年复一年的坚持下…...
Codeforces Round 959 (Div. 1 + Div. 2 ABCDEFG 题) 文字讲解+视频讲解
Problem A. Diverse Game Statement 给定 n m n\times m nm 的矩形 a a a, a a a 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互不相同。求出 n m n\times m nm 的矩形 b b b, b b b 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互…...
WSL2 Centos7 Docker服务启动失败怎么办?
wsl 安装的CentOS7镜像,安装了Docker之后,发现用systemctl start docker 无法将docker启动起来。 解决办法 1、编辑文件 vim /usr/lib/systemd/system/docker.service将13行注释掉,然后在下面新增14行的内容。然后保存退出。 2、再次验证 可以发现,我们已经可以正常通过s…...
分布式锁-redisson锁重试和WatchDog机制
抢锁过程中,获得当前线程,通过tryAcquire进行抢锁,该抢锁逻辑和之前逻辑相同。 1、先判断当前这把锁是否存在,如果不存在,插入一把锁,返回null 2、判断当前这把锁是否是属于当前线程,如果是&a…...
ESP8266模块(2)
实例1 查看附近的WiFi 步骤1:进入AT指令模式 使用USB转串口适配器将ESP8266模块连接到电脑。打开串口终端软件,并设置正确的串口和波特率(通常为115200)。输入以下命令并按回车确认: AT如果模块响应OK,…...
Docker安装笔记
1. Mac安装Docker 1.1 Docker安装包下载 1.1.1 阿里云 对于10.10.3以下的用户 推荐使用 对于10.10.3以上的用户 推荐使用 1.1.2 官网下载 系统和芯片选择适合自己的安装包 1.2 镜像加速 【推荐】阿里镜像 登陆后,左侧菜单选中镜像加速器就可以看到你的专属地…...
《昇思25天学习打卡营第21天|Pix2Pix实现图像转换》
Pix2Pix 是一种图像转换模型,使用条件生成对抗网络(Conditional Generative Adversarial Networks,cGANs)实现图像到图像的转换。它主要由生成器(Generator)和判别器(Discriminator)…...
Python和MATLAB网络尺度结构和幂律度大型图生成式模型算法
🎯要点 🎯算法随机图模型数学概率 | 🎯图预期度序列数学定义 | 🎯生成具有任意指数的大型幂律网络,数学计算幂律指数和平均度 | 🎯随机图分析中巨型连接分量数学理论和推论 | 🎯生成式多层网络…...
在jsPsych中使用Vue
jspsych 介绍 jsPsych是一个非常好用的心理学实验插件,可以用来构建心理学实验。具体的就不多介绍了,大家可以去看官网:https://www.jspsych.org/latest/ 但是大家在使用时就会发现,这个插件只能使用js绘制界面,或者…...
机器学习·概率论基础
概率论 概率基础 这部分太简单,直接略过 条件概率 独立性 独立事件A和B的交集如下 非独立事件 非独立事件A和B的交集如下 贝叶斯定理 先验 事件 后验 在概率论和统计学中,先验概率和后验概率是贝叶斯统计的核心概念 简单来说后验概率就是结合了先验概…...
c生万物系列(面向对象:封装)
本系列博客主要介绍c语言的一些屠龙技,里面包含了笔者本人的一些奇思妙想。 该系列博客笔者只是用作记录。如果你偶然找到了这篇博客,但是发现不知所云,请不要过多投入时间,可能笔者本人那时候也看不懂了。 笔者决定用c语言模仿…...
当当网数据采集:Scrapy框架的异步处理能力
在互联网数据采集领域,Scrapy框架以其强大的异步处理能力而著称。Scrapy利用了Python的异步网络请求库,如twisted,来实现高效的并发数据采集。本文将深入探讨Scrapy框架的异步处理能力,并展示如何在当当网数据采集项目中应用这一能…...
React——useEffect和自定义useUpdateEffect
useEffect 是React的一个内置Hook,用于在组件渲染后执行副作用(例如数据获取、订阅或手动更改DOM)。它将在第一次渲染后和每次更新后都会执行。 useEffect(() > {// 这里的代码将在组件挂载和更新时执行。 }, [dependencies]); // depend…...
Hadoop大数据处理架构中ODB、DIM、DWD、DWS
在Hadoop的大数据处理架构中,ODS、DIM、DWD和DWS分别代表了数据仓库体系中不同的层次和功能。下面解释这几个概念: ODS (Operational Data Store) 想象你有一家超市,每天营业结束后,你会把当天所有的销售记录、顾客信息、商品库…...
【刷题汇总 -- 爱丽丝的人偶、集合、最长回文子序列】
C日常刷题积累 今日刷题汇总 - day0211、爱丽丝的人偶1.1、题目1.2、思路1.3、程序实现 2、集合2.1、题目2.2、思路2.3、程序实现 -- set 3、最长回文子序列3.1、题目3.2、思路3.3、程序实现 -- dp 4、题目链接 今日刷题汇总 - day021 1、爱丽丝的人偶 1.1、题目 1.2、思路 …...
基于vue3 + vite产生的 TypeError: Failed to fetch dynamically imported module
具体参考这篇衔接: Vue3报错:Failed to fetch dynamically imported module-CSDN博客 反正挺扯淡的,错误来源于基于ry-vue-plus来进行二次开发的时候遇到的问题。 错误起因 我创建了一个广告管理页面。然后发现访问一直在加载中。报的是这样…...
批量自动添加好友,高效拓展人脉圈.
随着微信使用数量的不断增加,手动添加好友成为了一项耗时且繁琐的任务。为了帮助大家解决这个问题,下面分享一款高效的微信管理系统,它能够帮助你实现批量自动添加好友,极大提升了人脉拓展的效率。 这款微信管理系统可以同时管理多…...
Web开发:一个可拖拽的模态框(HTML、CSS、JavaScript)
目录 一、需求描述 二、实现效果 三、完整代码 四、实现过程 1、HTML 页面结构 2、CSS 元素样式 3、JavaScript动态控制 (1)获取元素 (2)显示\隐藏遮罩层与模态框 (3)实现模态框拖动效果 一、需求…...
【深度学习】fooocusapi,docker,inpainting图像
基础镜像制作来源 fooocusapi接口官方写的: docker run -d --gpusall \-e NVIDIA_DRIVER_CAPABILITIEScompute,utility \-e NVIDIA_VISIBLE_DEVICESall \-p 8888:8888 konieshadow/fooocus-api会下载一些模型,下载完后推这个镜像 docker commit 4dfd1…...
算法017:二分查找
二分查找. - 备战技术面试?力扣提供海量技术面试资源,帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/binary-search/ 二分查找,其实是双指针的一种特殊情况,但是时间复杂度极低&#…...
谷粒商城实战笔记-37-前端基础-Vue-基本语法插件安装
文章目录 一,v-model1,双向绑定2,vue的双向绑定2.1 html元素上使用指令v-model2.2 model中声明对应属性2.3,验证view绑定modelmodel绑定view 完整代码 二,v-on1,指令简介2,在button按钮中添加v-…...
mybatis中的缓存(一级缓存、二级缓存)
文章目录 前言一、MyBatis 缓存概述二、一级缓存1_初识一级缓存2_一级缓存命中原则1_StatementId相同2_查询参数相同3_分页参数相同4_sql 语句5_环境 3_一级缓存的生命周期1_缓存的产生2_缓存的销毁3_网传的一些谣言 4_一级缓存核心源码5_总结 三、二级缓存1_开启二级缓存2_二级…...
实现自动化采购:食堂采购系统源码开发详解
本篇文章,笔者将详细介绍食堂采购系统的开发过程,从需求分析、系统设计到实现和测试,为您全面解析如何构建一个高效的自动化采购系统。 一、需求分析 1.采购计划管理 2.供应商管理 3.订单管理 4.库存管理 5.财务管理 6.数据分析与报告 …...
linux、windows、macos清空本地DNS缓存
文章目录 Linux:Windows:macOS: Linux: 对于使用systemd的操作系统(如CentOS 7、Ubuntu 16.04),可以使用以下命令重启systemd-resolved服务来清除缓存: sudo systemctl restart sys…...
领夹麦克风哪个品牌好,电脑麦克风哪个品牌好,热门麦克风推荐
在信息快速传播的时代,直播和视频创作成为了表达与交流的重要方式。对于追求卓越声音品质的创作者而言,一款性能卓越的无线麦克风宛如一把利剑。接下来,我要为大家介绍几款备受好评的无线麦克风,这些都是我在实际使用中体验良好…...
【第5章】Spring Cloud之Nacos服务注册和服务发现
文章目录 前言一、提供者1. 引入依赖2.配置 Nacos Server 地址3. 开启服务注册 二、消费者1. 引入依赖2.配置 Nacos Server 地址3. 开启服务注册 三、服务列表四、服务发现1. 获取服务列表2. 测试2.1 获取所有服务2.2 根据服务名获取服务信息 五、更多配置项总结 前言 本节通过…...
Springboot 启动时Bean的创建与注入(一)-面试热点-springboot源码解读-xunznux
Springboot 启动时Bean的创建与注入,以及对应的源码解读 文章目录 Springboot 启动时Bean的创建与注入,以及对应的源码解读构建Web项目流程图:堆栈信息:堆栈信息简介堆栈信息源码详解1、main:10, DemoApplication (com.xun.demo)2…...
单调栈(随缘复习到了,顺手刷了)
也是不知道为什么突然又复习到单调栈了,所以顺手刷了三道题,总结一下 P6503 [COCI2010-2011#3] DIFERENCIJA 思路:这题是要求每个子区间里面的最大值和最小值的差,我们一开始想的必然是纯暴力呀,但是一看这数据&#…...
深圳定制型网站建设/seo整站优化新站快速排名
1 被动模式 zabbix默认采用被动模式。就是agent等待server采集数据。 在items中,type为zabbix agent就是指被动模式。 流程为:agent周期性收集数据,server打开一个tcp高位端口,去连接agnet的10050监听端口,请求数据&am…...
手机论坛网站模板/app推广方案范例
1.左旋转字符串 题目: 字符串的左旋转操作是把字符串前面的若干个字符转移到字符串的尾部。请定义一个函数实现字符串左旋转操作的功能。比如,输入字符串"abcdefg"和数字2,该函数将返回左旋转两位得到的结果"cdefgab"。…...
阅读转发网站那些做的比较好/天津网站快速排名提升
板子,全是板子 该模板目前行数(Markdown下包括文字):4799 行 更新日志(从2018.11.19开始) 2019.04.05 : 更新了数据结构->左偏树 2019.04.02 : 更新了数据结构->平衡树->WBLT 2019.03.26 : 更新了…...
jsp网站开发学习心得/网站流量来源
本节书摘来自异步社区《Adobe Premiere Pro CS5经典教程》一书中的第1课,第1.3节,作者 【美】Adobe公司 ,译者 许伟民,袁鹏飞,更多章节内容可以访问云栖社区“异步社区”公众号查看。 1.3 提供标准的数字视频工作流 用…...
淄博做网站/网络营销的认识
经过查找,找到原因是应为数据库字段是varchar2,java映射的类型是Long型导致报此错误。...
济南网站制作企业/湛江seo
2019独角兽企业重金招聘Python工程师标准>>> public class Test { public static void main(String[] args) { int i 0; i i ;System.out.println(i);} } 答案是 0 如果是 i i 就会是1 参考 https://blog.csdn.net/lxlmycsdnfree/article/details/80578222 其…...