Kafka安装与使用
Kafka是一种高吞吐量的分布式发布订阅消息系统,因为其高吞吐量、分布式可扩展性等等强大功能使得在目前互联网系统中广泛使用。该篇博客入门了解一下Kafka的安装及使用。
Kafka概念
Kafk是分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接收者称为Consumer。此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。其中每个Topic都由若干个partition组成,partition是topic物理上的分组,每个partition是一个有序的队列。Kafka的消费端有位移(offset)的概念,每条消息在某个partition的位移是固定的,相当于在分区当中的唯一编号。无论是kafka集群,还是consumer都依赖于Zookeeper集群保存一些meta信息,来保证系统可用性。
Kafka集群配置
因为本机是Windows系统,测试方便就单机配置Kafka的集群,但是配置都是共通的,在服务器上也基本一样。
配置版本
- Kafka 2.2.0
- Zookeeper 3.5.2
- Windows 7
- Java 8
配置Zookeeper
1、官网根据版本下载Zookeeper
2、解压Zookeeper的下载包,修改zoo.cfg中的dataDir地址,也可修改端口
3、点击zkServer.cmd,启动Zookeeper
配置Kafka
1、官网根据版本下载Kafka
2、解压Kafka的下载包,并复制三份,用于配置集群
本机的目录
D:\kafka\KafkaCluster\kafka_9020
D:\kafka\KafkaCluster\kafka_9021
D:\kafka\KafkaCluster\kafka_9022
3、配置server.properties
broker.id三份都需要唯一,目前设置为0,1,2
broker.id=0
配置服务器端口,因为是单机所以IP地址一样,需要端口不一样。分别设置9020、9021、9022
listeners=PLAINTEXT://:9020
设置log地址,分别设置/kafka_9020/、/kafka_9021/、/kafka_9022/
log.dirs=D:/kafka/KafkaCluster/kafka_9020/kafka-logs
并添加配置可删除Topic,如果不配置,Kafka只是标记删除
delete.topic.enable=true
4、启动三个Kafka服务器
分别在主目录/kafka_9020/、/kafka_9021/、/kafka_9022/主目录CMD窗口运行
.\bin\windows\kafka-server-start.bat .\config\server.properties
上述正常即可配置成功。
测试Kafka
配置是否成功,我们可以使用命令行操作查看。本机是Windows所以使用的都是bat文件,若到Linux则用sh文件。
创建Topic
kafka-topics.bat --create --zookeeper [Zookeeper地址] --partitions [分区数] --replication-factor [副本集数] --topic [topic名称]
注意,副本集数不能大于不能大于Broker数,这里Broker数为3
测试
D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --partitions 1
--replication-factor 2 --topic kafka-topic-testCreated topic kafka_topic_test.
查看Topic列表
kafka-topics.bat --list --zookeeper [Zookeeper地址]
测试
D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181
kafka-topic-test
查看Topic详情
kafka-topics.bat --zookeeper [Zookeeper地址] --describe --topic [topic名称]
测试
D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --zookeeper localh
ost:2181 --describe --topic kafka_topic_test
Topic:kafka_topic_test PartitionCount:1 ReplicationFactor:2 Configs:
Topic:kafka_topic_test Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
删除Topic
kafka-topics.bat --delete --zookeeper [Zookeeper地址] --topic [topic名称]
测试
D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --delete --zookeeper localhost:2181 --topic kafka_topic_test
Topic kafka_topic_test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181
__consumer_offsets
kafka_topic_test - marked for deletion
消息生产和消费
启动生产端
kafka-console-producer.bat --broker-list [broker地址] --topic [topic名称]
启动消费端
kafka-console-consumer.bat --zookeeper [Zookeeper地址] --from-beginning --topic [topic名称]
测试
生产端9020
D:\kafka\KafkaCluster\kafka_9020\bin\windows>kafka-console-producer.bat --broker-list localhost:9020 --topic kafka_topic_test
>hello
>world
>kafka
>
消费端9021
D:\kafka\KafkaCluster\kafka_9021\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9020 --from-beginning --topic kafka_topic_test
hello
world
kafka
消费端9022
D:\kafka\KafkaCluster\kafka_9022\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9020 --from-beginning --topic kafka_topic_test
hello
world
kafka
Java操作Kafka
引入Jar
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version>
</dependency>
消费端
import java.util.Properties;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class CustomerProducer {public static void main(String[] args) {//http://kafka.apache.org/documentation/#producerconfigs 更多配置可以访问此地址//配置信息Properties props = new Properties();//设置kafka集群的地址 -- localhost:9020,localhost:9021,localhost:9022props.put("bootstrap.servers", "localhost:9020,localhost:9021,localhost:9022");//ack模式,all是最慢但最安全的// 0 不等待成功返回 // 1 等Leader写成功返回 //all 等Leader和所有ISR中的Follower写成功返回,all也可以用-1代替props.put("acks", "all");//失败重试次数props.put("retries", 0);//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端props.put("batch.size", 16384);//请求的最大字节数,该值要比batch.size大//不建议去更改这个值,如果设置不好会导致程序不报错,但消息又没有发送成功//props.put("max.request.size",1048576);//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端//数据在缓冲区中保留的时长,0表示立即发送//为了减少网络耗时,需要设置这个值,太大可能容易导致缓冲区满,阻塞消费者,太小容易频繁请求服务端props.put("linger.ms", 1);//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端//buffer.memory要大于batch.size,否则会报申请内存不足的错误//不要超过物理内存,根据实际情况调整props.put("buffer.memory", 33554432);//序列化器props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者对象KafkaProducer<String,String> producer = new KafkaProducer<>(props);//循环发送消息for(int i=10;i<20;i++){producer.send(new ProducerRecord<String, String>("kafka-topic-test", Integer.toString(i)),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println(metadata.partition()+" - "+metadata.offset());}else{System.out.println("发送失败");}}});}//关闭资源producer.close();}}
生产端
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class CustomerConsumer {public static void main(String[] args) {Properties props = new Properties();//设置kafka集群的地址props.put("bootstrap.servers", "localhost:9020,localhost:9021,localhost:9022");//消费者组IDprops.put("group.id", "test-consumer-group");//设置自动提交offsetprops.put("enable.auto.commit", "true");//自动提交间隔props.put("auto.commit.interval.ms", "1000");//earliest //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费//latest//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据//none//topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常//默认建议用earliest。设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。props.put("auto.offset.reset", "earliest");//Consumer session 过期时间props.put("session.timeout.ms", "30000");//反序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//创建消费者对象@SuppressWarnings("resource")KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);//指定Topic//consumer.subscribe(Arrays.asList("first","second","third"));consumer.subscribe(Collections.singletonList("kafka-topic-test"));while (true) {//获取数据ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {System.out.println(consumerRecord.topic()+":"+consumerRecord.partition()+":"+consumerRecord.value());}}}
}
相关文章:
Kafka安装与使用
Kafka是一种高吞吐量的分布式发布订阅消息系统,因为其高吞吐量、分布式可扩展性等等强大功能使得在目前互联网系统中广泛使用。该篇博客入门了解一下Kafka的安装及使用。 Kafka概念 Kafk是分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息…...
php出现SSL certificate problem: unable to get local issuer certificate的解决办法
当在本地使用curl或者一些其它封装好的http类库或组件(如php界 知名的 http客户端 Guzzle)需要访问https时,如果本地没有配置证书,会出现SSL certificate problem: unable to get local issuer certificate的报错信息。 解决办法一…...
Flask狼书笔记 | 07_留言板
文章目录 7 留言板7.1 使用包组织代码7.2 Web开发流程7.3 使用Bootstrap-Flask7.4 Flask-Moment本地化日期和时间7.5 使用Faker生成虚拟数据7.6 Flask_DebugToolbar调试程序7.7 Flask配置的两种组织形式小结 7 留言板 这是一个简单的程序,涉及到的大部分是之前所学…...
文件导入之Validation校验List对象数组
背景: 我们的接口是一个List对象,对象里面的数据基本都有一些基础数据校验的注解,我们怎么样才能校验这些基础规则呢? 我们在导入excel文件进行数据录入的时候,数据录入也有基础的校验规则,这个时候我们又…...
【Linux】文件系统
磁盘及文件系统 文件的增删查改 重新认识目录 目录是文件嘛? 是的。 目录有iNode嘛? 有 目录有内容嘛? 有 任何一个文件,一定在一个目录内部,所以一个目录的内容是什么? 需要数据块,目录的数据…...
1.5 空间中的平面与直线
空间中的平面和直线 知识点1 平面方程 1.平面的法向量与法式 定义1 若向量n 垂直与平面N,则称向量n为平面N的法向量。 设一平面通过一直点 M 0 ( x 0 , y 0 , z 0 ) M_0(x_0,y_0,z_0) M0(x0,y0,z0)求垂直于非零向量 n ⃗ \vec{n} n (A,B,C),求改平面N的…...
【深度学习】实验06 使用TensorFlow完成线性回归
文章目录 使用TensorFlow完成线性回归1. 导入TensorFlow库2. 构造数据集3. 定义基本模型4. 训练模型5. 线性回归图 附:系列文章 使用TensorFlow完成线性回归 TensorFlow是由Google开发的一个开源的机器学习框架。它可以让开发者更加轻松地构建和训练深度学习模型&a…...
2023国赛 C题论文 蔬菜类商品自动定价与补货策略
因为一些不可抗力,下面仅展示小部分论文,其余看文末 一、问题重述 在生鲜超市管理领域,涉及一系列复杂问题,包括供应链管理、定价策略以及市场需求分析等方面。以蔬菜类商品为案例,这些商品在生鲜商超中具有较短的保…...
使用 【jacoco】对基于 SpringBoot 和 Dubbo RPC 的项目生成测试覆盖率报告:实践+原理
基于 Dubbo RPC 的项目中有一个提供者项目backend、一个消费者项目gateway、以及注册中心nacos。本篇文章记录在windows本地对该框架的测试过程,以及介绍jacoco的基本原理 测试过程 官网下载安装包解压到本地,https://www.jacoco.org/jacoco/ 只需要用…...
Mac OS合集
MacOS 10.15os 提取码:u12a 如不能点击跳转请复制此链接到浏览器:https://pan.baidu.com/s/1UgPNYprBgJrc25v5ushWxQ?pwdu12a MacOS 11.0 提取码:y77y 如不能点击跳转请复制此链接到浏览器打开:https://pan.baidu.com/s/1srmibmCi2T7UVGvHkCzGKA?pwdy7…...
算法之位运算
前言 位运算在我们的学习中占有很重要的地位,从二进制中数的存储等都需要我们进行位运算 一、位运算复习 1.位运算复习 按位与(&):如果两个相应的二进制位都为1,则该位的结果值才为1,否则为0 按位或( | ):如果…...
flask使用Flask-Mail实现邮件发送
Flask-Mail可以实现邮件的发送,并且可以和 Flask 集成,让我们更方便地实现此功能。 1、安装 使用pip安装: $ pip install Flask-Mail或下载源码安装: $ git clone https://github.com/mattupstate/flask-mail.git $ cd flask-…...
React refers to UMD global, but the current file is a module vite初始化react项目
vite搭建react项目 初始化项目 npm create vite 在执行完上面的命令后,npm 首先会自动下载create-vite这个第三方包,然后执行这个包中的项目初始化逻辑。输入项目名称之后按下回车,此时需要选择构建的前端框架: ✔ Project na…...
vscode 调试 ROS2
1、在下列目录同层级找到.vscode文件夹 . ├── build ├── install ├── log └── src 2、 安装ros插件 3、创建tasks.json文件,添加下列内容 //代替命令行进行编译 {"version": "2.0.0","tasks": [{"label": &…...
TuyaOS开发学习笔记(2)——NB-IoT开发SDK架构、运行流程
一、SDK架构 1.1 架构框图 基于 TuyaOS 系统,可以裁剪得到的适用于 NB-IoT 协议产品接入的 SDK。SDK 将设备配网、上下行数据通信、产测授权、固件 OTA 升级等接口进行封装,并提供相关函数。 1.2 目录结构 1.2.1 TuyaOS目录说明 adapter:T…...
Qt应用开发(基础篇)——普通按钮类 QPushButton QCommandLinkButton
一、前言 QPushButton类继承于QAbstractButton,是一个命令按钮的小部件。 按钮基类 QAbstractButton 按钮或者命令按钮是所有图形界面框架最常见的部件,当按下按钮的时候触发命令、执行某些操作或者回答一个问题,典型的按钮有OK,A…...
Data Structures Fan(cf)
考察异或运算以及前缀和 题意大概:给你一个长度为n的a数组,一个长度为n的01字符串,会询问q次 当x的值为1 给出 l r 将 l r 区间中的0 改变为1,1改变为0 。当x的值为2是 若随后的数为0 则输出当前字符串中 是0 的a数组中的数异或 …...
BIOS < UEFI
Basic Input Output System (BIOS) Unified Extensible Firmware Interface (UEFI)...
微信最新更新隐私策略(2023-08-15)
1、manifest.json 配置修改 在mp-weixin: 参数修改(没有就添加) "__usePrivacyCheck__": true, ***2、注意 微信开发者工具调整 不然一直报错 找不到 getPrivacySetting 废话不多说 上代码 3、 编辑首页 或者用户授权界面 <uni-popup…...
Java中xml转javaBean
Java中xml转javaBean maven坐标 <dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.13.4</version></dependency>代码测试 import cn.hutool.js…...
Spring Boot集成JPA和ClickHouse数据库
简介 Spring Boot是一个用于创建独立的、基于Spring的应用程序的框架。它具有快速开发特性,可以大大减少开发人员的工作量。JPA(Java Persistence API)是Java中处理关系型数据库持久化的标准规范,而ClickHouse是一个高性能、分布…...
Hadoop生态圈中的Hive数据仓库技术
Hadoop生态圈中的Hive数据仓库技术 一、Hive数据仓库的基本概念二、Hive的架构组成三、Hive和数据库的区别四、Hive的安装部署五、Hive的基本使用六、Hive的元数据库的配置问题七、Hive的相关配置项八、Hive的基本使用方式1、Hive的命令行客户端的使用2、使用hiveserver2方法操…...
idea配置gitLab
前言:网上有很多类似的文章,但描述不够详细 步骤1:安装git 如果安装成功再次点击TEST按钮展示如下:git版本 步骤2:idea配置gitlab 查看当前项目管理的 远程仓库再git的地址,该地址可是gitLab的࿰…...
工程可以编译通过,但是Vscode依然有波浪线提示
前言 (1)我们在使用Vscode进行开发的时候,命名文件成功编译通过了,但是Vscode还是有波浪线的提示。 (2)其实成功编译通过就行,但是肯定还会存在一些强迫症患者,硬要消除这个报错。接…...
黑马JVM总结(二)
(1)栈 栈帧对应一次方法的调用,线程是要执行代码的,这些代码都是由一个个方法组成,线程运行的时候每个方法需要的内存叫做一个栈帧 (2)栈的演示 Frames:相当有栈 方法相当于栈帧…...
《Effective C++中文版,第三版》读书笔记7
条款41: 了解隐式接口和编译期多态 隐式接口: 仅仅由一组有效表达式构成,表达式自身可能看起来很复杂,但它们要求的约束条件一般而言相当直接而明确。 显式接口: 通常由函数的签名式(也就是函数名…...
脚本:python实现动态爱心
文章目录 效果代码Reference python实现dynamic heart 效果 代码 import turtle as tu import random as ratu.setup(0.5, 0.5) # 设置画板大小(小数表示比例,整数表示大小) tu.screensize(1.0, 1.0) # 设置屏幕大小 tu.bgcolor(black) #…...
【李宏毅】深度学习6:机器学习任务攻略
如果在测试集上的效果不佳,应该要做什么?Optimization 如何选择?解决 overfitting 的方法? 测试集上的效果不佳 看训练数据的loss,是不是模型本身就没训练好? 问题:model 太简单了,…...
如何使用SQL SERVER的OpenQuery
如何使用SQL SERVER的OpenQuery 一、OpenQuery使用说明二、 OpenQuery语法2.1 参数说明2.2注解 三、示例3.1 执行 SELECT 传递查询3.2 执行 UPDATE 传递查询3.3 执行 INSERT传递查询3.4 执行 DELETE 传递查询 一、OpenQuery使用说明 在指定的链接服务器上执行指定的传递查询。 …...
element-tree树结构-默认选中第一个节点高亮-根据id选中节点高亮
前言 tree树结构是在开发中经常使用的组件,比如区域树,楼层树,组织架构树,等等包含节点关系 实际开发可能需要我们一进到页面选中树形结构第一个节点,并且调用数据,来达到用户体验 在用户选择之后&#x…...
二手设备回收做哪个网站好/哪里有培训网
对于开发人员,在日常工作中经常也会处理许多文档格式,将图像转换为PDF也是常有的事。那么,在Java语言开发中,如何将图像转换为PDF呢? Spire.PDF for Java(点击下载)支持将多种图像格式…...
石家庄做网站汉狮网络/2023年7月最新新闻摘抄
缺省情况下,应用程序使用缓冲池 IBMDEFAULTBP,它是在创建数据库时创建的。当 SYSCAT.BUFFERPOOLS 目录表中该缓冲池的 NPAGES 值为 -1 时,DB2 数据库配置参数 BUFFPAGE 控制着缓冲池的大小。否则会忽略 BUFFPAGE 参数,并且用 NPAG…...
上海专业网站制作设计公司/广州seo网站排名
有些软件包是以.src.rpm结尾的,这类软件包是包含了源代码的rpm包,在安装时需要进行编译。这类软件包有多种安装方法,以redhat为例说明如下: 注意: 如果没有rpmbuild可以从系统安装光盘的Package中找到 rpm-build-versi…...
seo在线网站诊断推推蛙/今天的新闻 最新消息摘抄
2019独角兽企业重金招聘Python工程师标准>>> Bean的作用域Spring 3中为Bean定义了5中作用域,分别为singleton(单例)、prototype(原型)、request、session和global session,5种作用域说明如下&am…...
中国建设网银登录/昆明百度关键词优化
Elastricsearch学习 及 head插件 文章目录Elastricsearch学习 及 head插件一、Elastricsearch简单概念索引 index(库)类型 type(表)文档 document(一条记录)字段 Field(字段)映射 mapping二、安装安装elast…...
宿州公司网站建设/代运营靠谱吗
转自:http://blog.csdn.net/kay_wyong/article/details/6631870 大体的思路:系统启动成功后SystemServer调用wm.systemReady()通知WindowManagerService,进而调用PhoneWindowManager,最终通过LockPatternKeyguardView显示解锁界面…...