当前位置: 首页 > news >正文

kafka安装和使用的入门教程

这篇文章简单介绍如何在ubuntu上安装kafka,并使用kafka完成消息的发送和接收。

一、安装kafka

访问kafka官网Apache Kafka,然后点击快速开始

紧接着,点击Download

最后点击下载链接下载安装包

如果下载缓慢,博主已经把安装包上传到百度网盘:

链接:https://pan.baidu.com/s/1nZ1duIt64ZVUsimaQ1meZA?pwd=3aoh
提取码:3aoh
--来自百度网盘超级会员V3的分享

二、启动kafka

经过上一步下载完成后,按照页面的提示启动kafka

1、通过远程连接工具,如finalshell、xshell上传kafka_2.13-3.6.0.tgz到服务器上的usr目录

2、切换到usr目录,解压kafka_2.13-3.6.0.tgz

cd /usrtar -zxzf kafka_2.13-3.6.0.tgz

3、启动zookeeper

修改配置文件confg/zookeeper.properties,修改一下数据目录

dataDir=/usr/local/zookeeper

然后通过以下命令启动kafka自带的zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

4、启动kafka

修改配置文件confg/server.properties,修改一下kafka保存日志的目录

log.dirs=/usr/local/kafka/logs

然后新开一个连接窗口,通过以下命令启动kafka

bin/kafka-server-start.sh config/server.properties

三、kafka发送、接收消息

创建topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092

生产消息

往刚刚创建的topic里发送消息,可以一次性发送多条消息,点击Ctrl+C完成发送

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello

消费消息

消费最新的消息

新开一个连接窗口,在命令行输入以下命令拉取topic为hello上的消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

消费之前的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic hello

指定偏移量消费

 指定从第几条消息开始消费,这里--offset参数设置的偏移量是从0开始的。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 1 --topic hello

消息的分组消费
每个消费者都可以指定一个消费者组, kafka 中的同一条消息,只能被同一个消费者组下的某一个消费 者消费。而不属于同一个消费者组的其他消费者,也可以消费到这一条消息。
通过以下命令在启动消费者时设置分组:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=helloGroup --topic hello

四、Java中使用kafka

通过maven官网搜索kafka的maven依赖版本

https://central.sonatype.com/search?q=kafkaicon-default.png?t=N7T8https://central.sonatype.com/search?q=kafka然后通过IntelliJ IDEA创建一个maven项目kafka,在pom.xml中添加kafka的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>kafka</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>3.6.0</version></dependency></dependencies>
</project>

创建消息生产者

生产者工厂类
package producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/*** 消息生产者工厂类* @author heyunlin* @version 1.0*/
public class MessageProducerFactory {private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";public static Producer<String, String> getProducer() {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<>(props);}}

测试发送消息
package producer;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;/*** @author heyunlin* @version 1.0*/
public class MessageProducer {private static final String TOPIC = "hello";public static void main(String[] args) {ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", "Message From Producer.");Producer<String, String> producer = MessageProducerFactory.getProducer();// 同步发送消息producer.send(record);// 异步发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {String topic = recordMetadata.topic();long offset = recordMetadata.offset();int partition = recordMetadata.partition();String message = recordMetadata.toString();System.out.println("topic = " + topic);System.out.println("offset = " + offset);System.out.println("message = " + message);System.out.println("partition = " + partition);}});// 加上这行代码才会发送消息producer.close();}}

创建消息消费者

消费者工厂类
package consumer;import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;/*** 消息生产者工厂类* @author heyunlin* @version 1.0*/
public class MessageConsumerFactory {private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";public static Consumer<String, String> getConsumer() {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "helloGroup");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<>(props);}}

测试消费消息
package consumer;import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.time.Duration;
import java.util.Collections;/*** @author heyunlin* @version 1.0*/
public class MessageConsumer {private static final String TOPIC = "hello";public static void main(String[] args) {Consumer<String, String> consumer = MessageConsumerFactory.getConsumer();consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key() + ": " + record.value());}// 提交偏移量,避免消息重复推送consumer.commitSync(); // 同步提交// consumer.commitAsync(); // 异步提交}}}

五、springboot整合kafka

开始前的准备工作

然后通过IntelliJ IDEA创建一个springboot项目springboot-kafka,在pom.xml中添加kafka的依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

然后修改application.yml,添加kafka相关配置

spring:kafka:bootstrap-servers: 192.168.254.128:9092producer:acks: 1retries: 3batch-size: 16384properties:linger:ms: 0buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: helloGroupenable-auto-commit: falseauto-commit-interval: 1000auto-offset-reset: latestproperties:request:timeout:ms: 18000session:timeout:ms: 12000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

创建消息生产者

package com.example.springboot.kafka.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;/*** @author heyunlin* @version 1.0*/
@RestController
@RequestMapping(path = "/producer", produces = "application/json;charset=utf-8")
public class KafkaProducer {private final KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic KafkaProducer(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@RequestMapping(value = "/sendMessage", method = RequestMethod.GET)public String sendMessage(String message) {kafkaTemplate.send("hello", message);return "发送成功~";}}

创建消息消费者

package com.example.springboot.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author heyunlin* @version 1.0*/
@Component
public class KafkaConsumer {@KafkaListener(topics = "hello")public void receiveMessage(ConsumerRecord<String, String> record) {String topic = record.topic();long offset = record.offset();int partition = record.partition();System.out.println("topic = " + topic);System.out.println("offset = " + offset);System.out.println("partition = " + partition);}}

然后访问网址http://localhost:8080/producer/sendMessage?message=hello往topic为hello的消息队列发送消息。控制台打印了参数,成功监听到发送的消息。

 

文章涉及的项目已经上传到gitee,按需获取~

Java中操作kafka的基本项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/kafka.git

springboot整合kafka案例项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/springboot-kafka.git

相关文章:

kafka安装和使用的入门教程

这篇文章简单介绍如何在ubuntu上安装kafka&#xff0c;并使用kafka完成消息的发送和接收。 一、安装kafka 访问kafka官网Apache Kafka&#xff0c;然后点击快速开始 紧接着&#xff0c;点击Download 最后点击下载链接下载安装包 如果下载缓慢&#xff0c;博主已经把安装包上传…...

享搭低代码平台:加速企业应用开发,轻松搭建表单和报表

在当今快节奏的商业环境中&#xff0c;企业需要快速响应市场需求并提供高效的解决方案。然而&#xff0c;传统的应用开发过程繁琐、耗时&#xff0c;并且需要专业的编程技能。为了解决这些问题&#xff0c;享搭低代码平台应运而生。本文将详细介绍享搭低代码平台的特点和优势&a…...

华为云应用中间件DCS系列—Redis实现(社交APP)实时评论

云服务、API、SDK&#xff0c;调试&#xff0c;查看&#xff0c;我都行 阅读短文您可以学习到&#xff1a;应用中间件系列之Redis实现&#xff08;社交APP&#xff09;实时评论 1 什么是DEVKIT 华为云开发者插件&#xff08;Huawei Cloud Toolkit&#xff09;&#xff0…...

01-spring源码概述

文章目录 1. Spring两大主要功能2. Bean的生命周期&#xff08;部分生命周期&#xff0c;不包括销毁&#xff09;2.1 两个重要接口及Aware接口2.2 创建对象的过程2.3 Bean的scope作用域2.4 Bean的类型2.5 获得反射对象的三种方式 3. 涉及的接口汇总4. 涉及设计模式 1. Spring两…...

datax 同步本地csv到mysql

csv 文件 /root/tempdata/us_population.csv NY,New York,8143197 CA,Los Angeles,3844829 IL,Chicago,2842518 TX,Houston,2016582 PA,Philadelphia,1463281 AZ,Phoenix,1461575 TX,San Antonio,1256509 CA,San Diego,1255540 TX,Dallas,1213825 CA,San Jose,912332csv2mysq…...

国内原汁原味的免费sd训练工具--哩布哩布AI

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 公众号&#xff1a;网络豆云计算学堂 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a; 网络豆的主页​​​​​ 目录 写在前面 一.体验与操作 1.注册 2.为何可…...

组合数(1) 用Vector实现获取所有组合数列表的QT实现

1.工程文件 QT coreCONFIG c17 cmdline# You can make your code fail to compile if it uses deprecated APIs. # In order to do so, uncomment the following line. #DEFINES QT_DISABLE_DEPRECATED_BEFORE0x060000 # disables all the APIs deprecated before Qt 6.…...

Ultra-Fast-Lane-Detection-v2 裁剪数据增强

目录 标注拆分为独立文件加载并数据增强 Ultra-Fast-Lane-Detection-v2 裁剪数据增强 main方法是调用示例...

从零开始学习调用百度地图网页API:三、鼠标点击绘图功能

目录 代码功能问题注意addEventListenerplot_line 代码 <!DOCTYPE html> <html> <head><meta http-equiv"Content-Type" content"text/html; charsetutf-8" /><meta name"viewport" content"initial-scale1.0,…...

强化学习案例复现(1)--- MountainCar基于Q-learning

1 搭建环境 1.1 gym自带 import gym# Create environment env gym.make("MountainCar-v0")eposides 10 for eq in range(eposides):obs env.reset()done Falserewards 0while not done:action env.action_space.sample()obs, reward, done, action, info env.…...

BUUCTF学习(6): 命令执行ip

1、介绍 2、hackbar安装 BUUCTF学习(四): 文件包含tips-CSDN博客 ?ip127.0.0.1;ag;cat$IFS$9fla$a.php 空格过滤 $IFS$9 检查源代码 结束...

javaweb:mybatis:mapper(sql映射+代理开发+配置文件之设置别名、多环境配置、顺序+注解开发)

1.0版本 sql映射文件实现 流程 首先程序进入启动类MyBatisDemo.java中&#xff0c;读取配置文件mybatis-config.xml 再由mybatis-config的mappers属性 <mappers><mapper resource"UserMapper.xml"></mapper></mappers>找到sql映射文件Use…...

JavaScript基础知识——练习巩固(2)

写一个程序&#xff0c;要求如下 需求1&#xff1a;让用户输入五个有效年龄&#xff08;0-100之间&#xff09;&#xff0c;放入数组中 必须输入五个有效年龄年龄&#xff0c;如果是无效年龄&#xff0c;则不能放入数组中 需求2&#xff1a;打印出所有成年人的年龄 (数组筛选)…...

FutureTask的测试使用和方法执行分析

FutureTask类图如下 java.util.concurrent.FutureTask#run run方法执行逻辑如下 public void run() {if (state ! NEW ||!RUNNER.compareAndSet(this, null, Thread.currentThread()))return;try {Callable<V> c callable;if (c ! null && state NEW) {V res…...

SpringMVC的请求处理

目录 请求映射路径的配置 请求数据的接收 接收Restful风格的数据 什么是Restful风格&#xff1f; 接收上传文件 获取headers头信息和cookie信息 JavaWeb常用对象获取 请求静态资源 注解驱动标签 请求映射路径的配置 请求映射路径的配置主要是通过RequestMapping注解实现…...

260. 只出现一次的数字 III

给你一个整数数组 nums&#xff0c;其中恰好有两个元素只出现一次&#xff0c;其余所有元素均出现两次。 找出只出现一次的那两个元素。你可以按 任意顺序 返回答案。 你必须设计并实现线性时间复杂度的算法且仅使用常量额外空间来解决此问题。 示例 1&#xff1a; 输入&…...

家政预约接单系统,家政保洁小程序开发;

家政预约接单系统&#xff0c;家政保洁维修小程序开发&#xff0c;阿姨管理&#xff0c;家政保险&#xff0c;合同管理&#xff0c;资金管理&#xff0c;营销推广等功能&#xff0c;包括&#xff1a;推广、营销、管理、培训、周边服务等等 家政系统详细功能介绍&#xff1a; 家…...

网络安全工程师需要学什么?零基础怎么从入门到精通,看这一篇就够了

网络安全工程师需要学什么&#xff1f;零基础怎么从入门到精通&#xff0c;看这一篇就够了 我发现关于网络安全的学习路线网上有非常多看似高大上却无任何参考意义的回答。大多数的路线都是给了一个大概的框架&#xff0c;告诉你那些东西要考&#xff0c;以及建议了一个学习顺…...

出差学知识No3:ubuntu查询文件大小|文件包大小|磁盘占用情况等

1、查询单个文件占用内存大小2、显示一个目录下所有文件和文件包的大小3、显示ubuntu所有磁盘的占用情况4、查看ubuntu单个包的占用情况 1、查询单个文件占用内存大小 使用指令&#xff1a;ls -lh 文件 2、显示一个目录下所有文件和文件包的大小 指令&#xff1a;du -sh* 3…...

详解cv2.copyMakeBorder函数【OpenCV图像边界填充Python版本】

文章目录 简介函数原型代码示例参考资料 简介 做深度学习图像数据集时&#xff0c;有时候需要调整一张图片的长和宽。如果直接使用cv2.resize函数会造成图像扭曲失真&#xff0c;因此我们可以采取填充图像短边的方法解决这个问题。cv2.copyMakeBorder函数提供了相关操作。本篇…...

前端技术-并发请求

并发请求 代码解释 定义了一个函数 concurRequest&#xff0c;用于并发请求多个 URL 并返回它们的响应结果。 function concurRequest(urls, maxNum) {return new Promise((resolve, reject) > {if (urls.length 0) {resolve([]);return;}const results [];let index …...

面试题-React(十三):React中获取Refs的几种方式

一、Refs的基本概念 Refs是React提供的一种访问DOM元素或组件实例的方式。通过Refs&#xff0c;我们可以在React中获取到底层的DOM节点或组件实例&#xff0c;并进行一些操作。Refs的使用场景包括但不限于&#xff1a;访问DOM属性、调用组件方法、获取输入框的值等。 二、获取…...

Linux CentOS 7升级curl8.4.0使用编译安装方式

1、查看当前版本 # curl --version curl 7.29.0 (x86_64-redhat-linux-gnu) libcurl/7.29.0 NSS/3.19.1 Basic ECC zlib/1.2.7 libidn/1.28 libssh2/1.4.3 Protocols: dict file ftp ftps gopher http https imap imaps ldap ldaps pop3 pop3s rtsp scp sftp smtp smtps tel…...

探寻JWT的本质:它是什么?它有什么作用?

JWT&#xff08;JSON Web Token&#xff09;是一种基于 JSON 格式的轻量级令牌&#xff08;token&#xff09;协议&#xff0c;它被广泛应用于网络应用程序的身份验证和授权。相较于传统的 session-based 认证机制&#xff0c;JWT 具有更好的扩展性和互操作性&#xff0c;同时也…...

关于雅思听力答案限定字数的解释。

1. No more than three words and/or a number&#xff1a;31&#xff0c;可以填3/2/1个单词&#xff1b;1个数字&#xff1b;3/2/1个单词1个数字 2. No more than three words and/or numbers&#xff1a;3n&#xff0c;可以填3/2/1个单词&#xff1b;n个数字&#xff1b;3/2…...

化工python | CSTR连续搅拌反应器系统

绝热连续搅拌釜反应器 (CSTR) 是过程工业中常见的化学系统。 容器中发生单个一级放热且不可逆的反应 A → B,假定容器始终完全混合。 试剂 A 的入口流以恒定的体积速率进入罐。 产物流B以相同的体积速率连续排出,液体密度恒定。 因此,反应液体的体积是恒定的。 在反应器中发…...

交通物流模型 | 基于自监督学习的交通流预测模型

交通物流模型 | 基于自监督学习的交通流预测模型 在智能交通系统中,准确预测不同时间段的城市交通流量是至关重要的。现有的方法存在两个关键的局限性:1、大多数模型集中预测所有区域的交通流量,而没有考虑空间异质性,即不同区域的交通流量分布可能存在偏差;2、现有模型无…...

343. 整数拆分 96.不同的二叉搜索树

343. 整数拆分 设dp[i]表示拆分 数字i 出来的正整数相乘值最大的值 (i - j) * j,和dp[i - j] * j是获得dp[i]的两种乘法&#xff0c;在里面求最大值可以得到当前dp[i]的最大值&#xff0c;但是这一次的得出的最大值如果赋值给dp[i]&#xff0c;可能没有没赋值的dp[i]大&#…...

Vue3理解(9)

侦听器 1.计算属性允许我们声明性地计算衍生值,而在有些情况下&#xff0c;我们需要状态变化时执行一些方法例如修改DOM。 2.侦测数据源类型&#xff0c;watch的第一个参数可以市不同形式的‘数据源’&#xff0c;它可以市一个ref(包括计算属性)&#xff0c;一个响应式对象&…...

CRM系统中的销售漏斗有什么作用?

随着数字化发展&#xff0c;越来越多的企业使用CRM销售管理系统提高销售管理水平&#xff0c;提升盈利能力。在这个过程中&#xff0c;销售漏斗起到了非常重要的作用。下面就来说说&#xff0c;CRM系统中的销售漏斗有什么作用&#xff1f; 一、销售数据可视化 CRM销售漏斗通过…...

用vsweb做购物网站/网站客服

之前的文章中我们介绍了如何在.NET下运用相关类库进行多线程编程的基础&#xff0c;我们知道.NET 4.0已经正式推出了&#xff0c;带来的重要特性是并行库。本文就谈谈对并行计算的一些理解和看法。并行计算不是一个很新的概念&#xff0c;其实它就是通过多线程把同一个任务分割…...

wordpress 下载短代码/全国十大教育机构

期货交易服务市场的企业竞争态势 该报告涉及的主要国际市场参与者有Daniels Trading、Saxo、Tradovate、NinjaTrader、AGT Futures、CQG、Gain Capital Group、ABLWSYS、SmartQuant、E-Futures、TransAct Futures、Trade Navigator、MultiCharts等。这些参与者的市场份额、收入…...

调节wordpress手机样式/web前端培训费用大概多少

数据 能被计算操作&#xff0c;并识别的的客观事物的符号。数据包括整形&#xff0c;实型&#xff08;小数&#xff09;&#xff0c;字符&#xff0c;声音、图片等是一个笼统的范畴。 数据结构的概念 数据结构是计算机存储、组织数据的方式。是相互之间存在一种或多种特定关…...

crm管理系统开发语言/搜索引擎快速优化排名

requests: import requests # 返回一个Response对象 r requests.get("http://www.baidu.com") # 状态码 200 表示成功 code r.status_code print(code) # encoding表示网页编码&#xff0c;从HTTP header中的charset中猜测出来&#xff0c;没有charset的话就会默…...

做公司网站建设价格/合肥关键词排名技巧

1, 数据 函数程序 都定义在 实例化对象中通过调用 实例化对象的函数方法 调用 实例化对象中 存储的数据执行程序 实现效果2, this指向必须是实例化对象匿名函数 --- 箭头函数回调函数 通过 bind() 语法修改设定 this指向提前定义一个变量 存储this指向一个函数中 要是用 多个…...

电子商务网站建设花费/关键词挖掘工具

hello 我是涤生,以下为笔者自己见解&#xff0c;如有错误&#xff0c;请大家务必指出&#xff0c;谢谢♪(&#xff65;ω&#xff65;)&#xff89; 首先来说说 int main() 、void main()、void main(void)这几个吧 以前我也不知道这为什么&#xff0c;上网一搜&#xff0c;好…...