当前位置: 首页 > news >正文

Kafka安装与使用

Kafka是一种高吞吐量的分布式发布订阅消息系统,因为其高吞吐量、分布式可扩展性等等强大功能使得在目前互联网系统中广泛使用。该篇博客入门了解一下Kafka的安装及使用。

Kafka概念

Kafk是分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接收者称为Consumer。此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。其中每个Topic都由若干个partition组成,partition是topic物理上的分组,每个partition是一个有序的队列。Kafka的消费端有位移(offset)的概念,每条消息在某个partition的位移是固定的,相当于在分区当中的唯一编号。无论是kafka集群,还是consumer都依赖于Zookeeper集群保存一些meta信息,来保证系统可用性。

Kafka集群配置

因为本机是Windows系统,测试方便就单机配置Kafka的集群,但是配置都是共通的,在服务器上也基本一样。

配置版本

  • Kafka 2.2.0
  • Zookeeper 3.5.2
  • Windows 7
  • Java 8

配置Zookeeper

1、官网根据版本下载Zookeeper

2、解压Zookeeper的下载包,修改zoo.cfg中的dataDir地址,也可修改端口

3、点击zkServer.cmd,启动Zookeeper

配置Kafka

1、官网根据版本下载Kafka

2、解压Kafka的下载包,并复制三份,用于配置集群

本机的目录

D:\kafka\KafkaCluster\kafka_9020
D:\kafka\KafkaCluster\kafka_9021
D:\kafka\KafkaCluster\kafka_9022

3、配置server.properties

broker.id三份都需要唯一,目前设置为0,1,2

broker.id=0

配置服务器端口,因为是单机所以IP地址一样,需要端口不一样。分别设置9020、9021、9022

listeners=PLAINTEXT://:9020

设置log地址,分别设置/kafka_9020/、/kafka_9021/、/kafka_9022/

log.dirs=D:/kafka/KafkaCluster/kafka_9020/kafka-logs

并添加配置可删除Topic,如果不配置,Kafka只是标记删除

delete.topic.enable=true

4、启动三个Kafka服务器

分别在主目录/kafka_9020/、/kafka_9021/、/kafka_9022/主目录CMD窗口运行

.\bin\windows\kafka-server-start.bat .\config\server.properties

上述正常即可配置成功。

测试Kafka

配置是否成功,我们可以使用命令行操作查看。本机是Windows所以使用的都是bat文件,若到Linux则用sh文件。

创建Topic

kafka-topics.bat --create --zookeeper [Zookeeper地址] --partitions [分区数] --replication-factor [副本集数] --topic [topic名称]

注意,副本集数不能大于不能大于Broker数,这里Broker数为3

测试

D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --partitions 1 
--replication-factor 2 --topic kafka-topic-testCreated topic kafka_topic_test.

查看Topic列表

kafka-topics.bat --list --zookeeper [Zookeeper地址]

测试

D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181
kafka-topic-test

查看Topic详情

kafka-topics.bat --zookeeper [Zookeeper地址] --describe --topic [topic名称]

测试

D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --zookeeper localh
ost:2181 --describe --topic kafka_topic_test
Topic:kafka_topic_test  PartitionCount:1        ReplicationFactor:2     Configs:
Topic:kafka_topic_test  Partition: 0    Leader: 1       Replicas: 1,2  Isr: 1,2

删除Topic

kafka-topics.bat --delete --zookeeper [Zookeeper地址] --topic [topic名称]

测试

D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --delete --zookeeper localhost:2181 --topic kafka_topic_test
Topic kafka_topic_test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181
__consumer_offsets
kafka_topic_test - marked for deletion

消息生产和消费

启动生产端

kafka-console-producer.bat --broker-list [broker地址] --topic [topic名称]

启动消费端

kafka-console-consumer.bat --zookeeper [Zookeeper地址] --from-beginning --topic [topic名称]

测试

生产端9020

D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-console-producer.bat --broker-list localhost:9020 --topic kafka_topic_test
>hello
>world
>kafka
>

消费端9021

D:\kafka\KafkaCluster\kafka_9021\bin\windows>kafka-console-consumer.bat --bootstrap-server  localhost:9020 --from-beginning --topic kafka_topic_test
hello
world
kafka

消费端9022

D:\kafka\KafkaCluster\kafka_9022\bin\windows>kafka-console-consumer.bat --bootstrap-server  localhost:9020 --from-beginning --topic kafka_topic_test
hello
world
kafka

Java操作Kafka

引入Jar

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version>
</dependency>

消费端

import java.util.Properties;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class CustomerProducer {public static void main(String[] args) {//http://kafka.apache.org/documentation/#producerconfigs 更多配置可以访问此地址//配置信息Properties props = new Properties();//设置kafka集群的地址 -- localhost:9020,localhost:9021,localhost:9022props.put("bootstrap.servers", "localhost:9020,localhost:9021,localhost:9022");//ack模式,all是最慢但最安全的// 0  不等待成功返回  // 1  等Leader写成功返回  //all 等Leader和所有ISR中的Follower写成功返回,all也可以用-1代替props.put("acks", "all");//失败重试次数props.put("retries", 0);//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端props.put("batch.size", 16384);//请求的最大字节数,该值要比batch.size大//不建议去更改这个值,如果设置不好会导致程序不报错,但消息又没有发送成功//props.put("max.request.size",1048576);//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端//数据在缓冲区中保留的时长,0表示立即发送//为了减少网络耗时,需要设置这个值,太大可能容易导致缓冲区满,阻塞消费者,太小容易频繁请求服务端props.put("linger.ms", 1);//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端//buffer.memory要大于batch.size,否则会报申请内存不足的错误//不要超过物理内存,根据实际情况调整props.put("buffer.memory", 33554432);//序列化器props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者对象KafkaProducer<String,String> producer = new KafkaProducer<>(props);//循环发送消息for(int i=10;i<20;i++){producer.send(new ProducerRecord<String, String>("kafka-topic-test", Integer.toString(i)),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println(metadata.partition()+" - "+metadata.offset());}else{System.out.println("发送失败");}}});}//关闭资源producer.close();}}

生产端

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class CustomerConsumer {public static void main(String[] args) {Properties props = new Properties();//设置kafka集群的地址props.put("bootstrap.servers", "localhost:9020,localhost:9021,localhost:9022");//消费者组IDprops.put("group.id", "test-consumer-group");//设置自动提交offsetprops.put("enable.auto.commit", "true");//自动提交间隔props.put("auto.commit.interval.ms", "1000");//earliest //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费//latest//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据//none//topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常//默认建议用earliest。设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。props.put("auto.offset.reset", "earliest");//Consumer session 过期时间props.put("session.timeout.ms", "30000");//反序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//创建消费者对象@SuppressWarnings("resource")KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);//指定Topic//consumer.subscribe(Arrays.asList("first","second","third"));consumer.subscribe(Collections.singletonList("kafka-topic-test"));while (true) {//获取数据ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {System.out.println(consumerRecord.topic()+":"+consumerRecord.partition()+":"+consumerRecord.value());}}}
}

相关文章:

Kafka安装与使用

Kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;因为其高吞吐量、分布式可扩展性等等强大功能使得在目前互联网系统中广泛使用。该篇博客入门了解一下Kafka的安装及使用。 Kafka概念 Kafk是分布式消息队列。Kafka对消息保存时根据Topic进行归类&#xff0c;发送消息…...

php出现SSL certificate problem: unable to get local issuer certificate的解决办法

当在本地使用curl或者一些其它封装好的http类库或组件&#xff08;如php界 知名的 http客户端 Guzzle&#xff09;需要访问https时&#xff0c;如果本地没有配置证书&#xff0c;会出现SSL certificate problem: unable to get local issuer certificate的报错信息。 解决办法一…...

Flask狼书笔记 | 07_留言板

文章目录 7 留言板7.1 使用包组织代码7.2 Web开发流程7.3 使用Bootstrap-Flask7.4 Flask-Moment本地化日期和时间7.5 使用Faker生成虚拟数据7.6 Flask_DebugToolbar调试程序7.7 Flask配置的两种组织形式小结 7 留言板 这是一个简单的程序&#xff0c;涉及到的大部分是之前所学…...

文件导入之Validation校验List对象数组

背景&#xff1a; 我们的接口是一个List对象&#xff0c;对象里面的数据基本都有一些基础数据校验的注解&#xff0c;我们怎么样才能校验这些基础规则呢&#xff1f; 我们在导入excel文件进行数据录入的时候&#xff0c;数据录入也有基础的校验规则&#xff0c;这个时候我们又…...

【Linux】文件系统

磁盘及文件系统 文件的增删查改 重新认识目录 目录是文件嘛&#xff1f; 是的。 目录有iNode嘛&#xff1f; 有 目录有内容嘛&#xff1f; 有 任何一个文件&#xff0c;一定在一个目录内部&#xff0c;所以一个目录的内容是什么&#xff1f; 需要数据块&#xff0c;目录的数据…...

1.5 空间中的平面与直线

空间中的平面和直线 知识点1 平面方程 1.平面的法向量与法式 定义1 若向量n 垂直与平面N&#xff0c;则称向量n为平面N的法向量。 设一平面通过一直点 M 0 ( x 0 , y 0 , z 0 ) M_0(x_0,y_0,z_0) M0​(x0​,y0​,z0​)求垂直于非零向量 n ⃗ \vec{n} n (A,B,C),求改平面N的…...

【深度学习】实验06 使用TensorFlow完成线性回归

文章目录 使用TensorFlow完成线性回归1. 导入TensorFlow库2. 构造数据集3. 定义基本模型4. 训练模型5. 线性回归图 附&#xff1a;系列文章 使用TensorFlow完成线性回归 TensorFlow是由Google开发的一个开源的机器学习框架。它可以让开发者更加轻松地构建和训练深度学习模型&a…...

2023国赛 C题论文 蔬菜类商品自动定价与补货策略

因为一些不可抗力&#xff0c;下面仅展示小部分论文&#xff0c;其余看文末 一、问题重述 在生鲜超市管理领域&#xff0c;涉及一系列复杂问题&#xff0c;包括供应链管理、定价策略以及市场需求分析等方面。以蔬菜类商品为案例&#xff0c;这些商品在生鲜商超中具有较短的保…...

使用 【jacoco】对基于 SpringBoot 和 Dubbo RPC 的项目生成测试覆盖率报告:实践+原理

基于 Dubbo RPC 的项目中有一个提供者项目backend、一个消费者项目gateway、以及注册中心nacos。本篇文章记录在windows本地对该框架的测试过程&#xff0c;以及介绍jacoco的基本原理 测试过程 官网下载安装包解压到本地&#xff0c;https://www.jacoco.org/jacoco/ 只需要用…...

Mac OS合集

MacOS 10.15os 提取码:u12a 如不能点击跳转请复制此链接到浏览器&#xff1a;https://pan.baidu.com/s/1UgPNYprBgJrc25v5ushWxQ?pwdu12a MacOS 11.0 提取码:y77y 如不能点击跳转请复制此链接到浏览器打开&#xff1a;https://pan.baidu.com/s/1srmibmCi2T7UVGvHkCzGKA?pwdy7…...

算法之位运算

前言 位运算在我们的学习中占有很重要的地位&#xff0c;从二进制中数的存储等都需要我们进行位运算 一、位运算复习 1.位运算复习 按位与(&)&#xff1a;如果两个相应的二进制位都为1&#xff0c;则该位的结果值才为1&#xff0c;否则为0 按位或( | )&#xff1a;如果…...

flask使用Flask-Mail实现邮件发送

Flask-Mail可以实现邮件的发送&#xff0c;并且可以和 Flask 集成&#xff0c;让我们更方便地实现此功能。 1、安装 使用pip安装&#xff1a; $ pip install Flask-Mail或下载源码安装&#xff1a; $ git clone https://github.com/mattupstate/flask-mail.git $ cd flask-…...

React refers to UMD global, but the current file is a module vite初始化react项目

vite搭建react项目 初始化项目 npm create vite 在执行完上面的命令后&#xff0c;npm 首先会自动下载create-vite这个第三方包&#xff0c;然后执行这个包中的项目初始化逻辑。输入项目名称之后按下回车&#xff0c;此时需要选择构建的前端框架&#xff1a; ✔ Project na…...

vscode 调试 ROS2

1、在下列目录同层级找到.vscode文件夹 . ├── build ├── install ├── log └── src 2、 安装ros插件 3、创建tasks.json文件&#xff0c;添加下列内容 //代替命令行进行编译 {"version": "2.0.0","tasks": [{"label": &…...

TuyaOS开发学习笔记(2)——NB-IoT开发SDK架构、运行流程

一、SDK架构 1.1 架构框图 基于 TuyaOS 系统&#xff0c;可以裁剪得到的适用于 NB-IoT 协议产品接入的 SDK。SDK 将设备配网、上下行数据通信、产测授权、固件 OTA 升级等接口进行封装&#xff0c;并提供相关函数。 1.2 目录结构 1.2.1 TuyaOS目录说明 adapter&#xff1a;T…...

Qt应用开发(基础篇)——普通按钮类 QPushButton QCommandLinkButton

一、前言 QPushButton类继承于QAbstractButton&#xff0c;是一个命令按钮的小部件。 按钮基类 QAbstractButton 按钮或者命令按钮是所有图形界面框架最常见的部件&#xff0c;当按下按钮的时候触发命令、执行某些操作或者回答一个问题&#xff0c;典型的按钮有OK&#xff0c;A…...

Data Structures Fan(cf)

考察异或运算以及前缀和 题意大概&#xff1a;给你一个长度为n的a数组&#xff0c;一个长度为n的01字符串&#xff0c;会询问q次 当x的值为1 给出 l r 将 l r 区间中的0 改变为1&#xff0c;1改变为0 。当x的值为2是 若随后的数为0 则输出当前字符串中 是0 的a数组中的数异或 …...

BIOS < UEFI

Basic Input Output System &#xff08;BIOS&#xff09; Unified Extensible Firmware Interface &#xff08;UEFI&#xff09;...

微信最新更新隐私策略(2023-08-15)

1、manifest.json 配置修改 在mp-weixin: 参数修改&#xff08;没有就添加&#xff09; "__usePrivacyCheck__": true, ***2、注意 微信开发者工具调整 不然一直报错 找不到 getPrivacySetting 废话不多说 上代码 3、 编辑首页 或者用户授权界面 <uni-popup…...

Java中xml转javaBean

Java中xml转javaBean maven坐标 <dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.13.4</version></dependency>代码测试 import cn.hutool.js…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖

在前面的练习中&#xff0c;每个页面需要使用ref&#xff0c;onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入&#xff0c;需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

java 实现excel文件转pdf | 无水印 | 无限制

文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...