Kafka3.1部署和Topic主题数据生产与消费
文章目录
- 前言
- 一、Kafka3.1X版本在Windows11主机部署
- 二、Kafk生产Topic主题数据
- 1.kafka生产数据
- 2.JAVA kafka客户端消费数据
- 总结
前言
本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用:
一、Kafka3.1X版本在Windows11主机部署
1.安装JDK配置环境变量
2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目录位置:D:\setup\apache-zookeeper-3.7.1
3.安装Kafka3.1X
3.1 下载包(kafka_2.12-3.1.2.tgz)
Kafka

3.2、 解压并进入Kafka目录:
根目录:D:\setup\kafka3.1.2
3、 编辑config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 为根目录下的\logs
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs
4.运行Zookeeper
Zookeeper安装目录D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右键,选择“打开命令窗口”选项,打开命令行
.\zkServer.cmd;

5.运行Kafka
Kafka安装目录D:\setup\kafka3.1.2,按下Shift+右键,选择“打开命令窗口”选项,打开命令行
.\bin\windows\kafka-server-start.bat .\config\server.properties

二、Kafk生产Topic主题数据
1.kafka生产数据
创建Topic主题heima
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.
查看Topic主题heima
.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic heima

Topic主题heima生产数据
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima
在 > 符号后输入数据:
{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}

2.JAVA kafka客户端消费数据
2.1 pom.xml文件配置kafka客户端-kafka-clients-2.0.1版本
<!-- kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.1</version></dependency>
2.2 JAVA数据读取文件
package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** Kafka服务器操作与数据读取*/
public class KafkaUtilDemo {public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);public static final Properties props = new Properties();
// protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);public static void init(String kafakservers) {// 配置Kafka消费者属性props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");}/*** 持续监听并处理kafa消息,当手机号mobilePhone非空时进入数据同步操作* @param kafaktopic* @return*/public static String poll(String kafaktopic) {String msg = "";try {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(kafaktopic));log.info("Kafka消费者订阅指定主题,持续监听并处理消息");while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));for (ConsumerRecord<String, String> record : records) {log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());msg = record.value();if (!StringUtils.isBlank(record.value())) {JSONObject jsonObject = JSONObject.parseObject(record.value());String mobilePhone = jsonObject.getString("mobilePhone");if (StringUtils.isBlank(mobilePhone)) {log.error("Kafka消费者手机号mobilePhone为空");} else {KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();kafkaUtil.syncSystemInfoTask(jsonObject);}}}}} catch (Exception e) {log.error("Kafka消费者订阅指定主题,持续监听并处理消息 error msg=" + e.getMessage());}return msg;}public boolean syncSystemInfoTask(JSONObject jsonObject) {boolean repsBln = true;try {String mobilePhone = jsonObject.getString("mobilePhone");String roleType = jsonObject.getString("roleType");String roleCode = jsonObject.getString("roleCode");log.info("业务数据同步操作................");} catch (Exception e) {repsBln = false;log.error("Kafka消费者同步入库异常,error msg=" + e.getMessage());}return repsBln;}public static void main(String[] args) {try {String kafakservers = "localhost:9092";String kafaktopic = "heima";init(kafakservers);poll(kafaktopic);} catch (Exception e) {log.error("error msg=" + e.getMessage());}}}
3 执行KafkaUtilDemo 文件,查看消费数据。

总结
pom.xml文件在引入spring-kafka 会由于版本问题出现
org.apache.kafka
kafka-clients
2.0.1
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.8.RELEASE</version></dependency>
相关文章:
Kafka3.1部署和Topic主题数据生产与消费
文章目录 前言一、Kafka3.1X版本在Windows11主机部署二、Kafk生产Topic主题数据1.kafka生产数据2.JAVA kafka客户端消费数据 总结 前言 本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用: 一、Kafka3.1X版本在Windows11主机部署 1.安装JDK配…...
ICIF2023化工展首亮相,宏工科技解决方案助力制造升级
ICIF China 2023中国国际化工展览会于9月4日-6日在上海新国际博览中心举办。宏工科技携化工物料处理一站式解决方案首次亮相,同化工行业全产业链共叙物料处理自动化未来。 宏工科技是一家提供物料处理自动化设备、系统与服务的国家级高新技术企业,业务覆…...
本地部署kubesphere集群
本地部署kubesphere集群 本文采用一主两从结构 1.前置硬件准备 准备最少3台机器,本人分配如下 IP:192.168.58.10 (主) 192.168.58.11 (节点1) 192.168.58.12 (节点2) 系统镜像…...
HNU小学期工训-STC15单片机模型大作业实验报告
STC15单片机模型大作业实验报告 全称:基于STC15单片机与OLED显示模块&PC端演示的多功能声光温振时钟智能手表模型 计科210X 甘晴void 202108010XXX 【请注意:本作业入选优秀范例,直接照抄源码有很大风险】 【建议理解原理之后作改动】 …...
【计算机网络】 TCP协议头相关知识点
文章目录 TCP协议头 TCP协议头 我们来看一下TCP协议头里都有什么东西,研究一下为什么TCP协议是可靠的呢 TCP协议可靠是因为在协议头里带着一些校验的数据 首先是源端口和目的端口,这两个是UDP中也有的,但是UDP中只有这两个,没有…...
深度学习相关VO梳理
相关论文 基于学习的VO 相关: DeepVO Towards End-to-End Visual Odometry with Deep Recurrent Convolutional Neural Networks(ICRA,2017) TartanVO: A Generalizable Learning-based VO(CoRL2021) SimVODIS: Simultaneous Vis…...
SpringMVC---CRUD实现
思路分析 搭建环境逆向生层对应的类(model、mapper.xml、mapper.java)编写业务逻辑层编写web层(控制器)前端页面 一、环境搭建 1.1、导入项目所需依赖(pom.xml) <project xmlns"http://maven.apache.org/POM/4.0.0"…...
vue+elementUI el-select 自定义搜索逻辑(filter-method)
下拉列表的默认搜索是搜索label显示label,我司要求输入id显示label名称 <el-form-item label"部门:"><el-select v-model"form.region1" placeholder"请选择部门" filterable clearable:filter-method"dataFilter&qu…...
数据库——事务
事务是指作为一个整体被执行的一系列操作。在数据库管理系统中,事务是指一组数据库操作(如插入、更新、删除等)的逻辑单元,也就是说事务的本质是把多个操作打包成一个操作,并且它要么完全执行,要么完全不执…...
echarts折线图每段显示不同的颜色
效果图 配置项: zqChartFour: {title: {text: "一天用电量分布",subtext: "纯属虚构",},tooltip: {trigger: "axis",axisPointer: {type: "cross",},},toolbox: {show: true,feature: {saveAsImage: {},},},xAxis: {type:…...
设计模式-单例模式(Singleton)
文章目录 前言一、单例模式的概念二、单例模式的实现三、单例模式的应用场景四、单例模式优缺点优点:缺点:总结 前言 单例模式(Singleton Pattern)是一种创建型设计模式,它确保一个类只有一个实例,并提供一…...
优漫动游 常见的AI视频生成网站的官方网站:
1、Lumen5 Lumen5是一款在线视频制作工具,利用人工智能技术能够迅速将文本、和音乐转换为视频。它可以帮助你把博客文章、社交媒体内容等转化为吸引人的视频,从而提高你的品牌曝光率和社交媒体的参与度。 2.Animoto Animoto是一个视频制作平台&…...
Vue中数据可视化关系图展示与关系图分析
Vue中数据可视化关系图展示与关系图分析 数据可视化是现代Web应用程序的重要组成部分之一,它可以帮助我们以图形的方式呈现和分析复杂的数据关系。Vue.js是一个流行的JavaScript框架,它提供了强大的工具来构建数据可视化应用。本文将介绍如何使用Vue.js…...
【启扬方案】基于启扬安卓屏一体机的医疗手推车解决方案
医疗手推车作为医院基础设施的一部分,被广泛应用于医院内部,包括急诊室、手术室、病房和其他临床部门。伴随着互联网技术的发展和行业的渗透,智慧医疗受到越来越多的青睐,这也使得很多医疗设施得到了改进,医疗手推车也…...
JavaScript实现MD5加密的6种方式
关于MD5: MD5.js是通过前台js加密的方式对用户信息,密码等私密信息进行加密处理的工具,也可称为插件。 在本案例中 可以看到MD5共有6种加密方法: 1, hex_md5(value) 2, b64_md5(value) 3, …...
腾讯云和阿里云2核2G服务器租用价格表对比
2核2G云服务器可以选择阿里云服务器或腾讯云服务器,腾讯云轻量2核2G3M带宽服务器95元一年,阿里云轻量2核2G3M带宽优惠价108元一年,不只是轻量应用服务器,阿里云还可以选择ECS云服务器u1,腾讯云也可以选择CVM标准型S5云…...
抖音无需API开发连接Stable Diffusion,实现自动根据评论区的指令生成图像并返回
抖音用户使用场景: 随着AI绘图的热度不断升高,许多抖音达人通过录制视频介绍不同的AI工具,包括产品背景、使用方法以及价格等,以吸引更多的用户。其中,Stable Diffusion这款产品受到了许多博主达人的青睐。在介绍这款产…...
MySQL(三)
DDL(数据定义语言) 库 /* 创建数据库testone */ create database testone; /* 查询数据库testone */ show databases; /* 选择数据库testone */ use testone; /* 删除数据库testone */ drop database testone; 表 创建表 create table table_name (…...
汽车级肖特基二极管DSS220-Q 200V 2A
DSS220-Q是什么二极管?贵司有生产吗? 肖特基二极管DSS220-Q符合汽车级AEC Q101标准吗? DSS220-Q贴片肖特基二极管参数是什么封装?正向电流和反向电压是多大? DSS220-Q肖特基二极管需要100KK,有现货吗&#…...
maven jetty post 上传长度设置
maven jetty post 上传长度设置 <plugin><groupId>org.eclipse.jetty</groupId><artifactId>jetty-maven-plugin</artifactId><version>9.4.8.v20171121</version><configuration><scanIntervalSeconds>1</scanInter…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...
HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...
从实验室到产业:IndexTTS 在六大核心场景的落地实践
一、内容创作:重构数字内容生产范式 在短视频创作领域,IndexTTS 的语音克隆技术彻底改变了配音流程。B 站 UP 主通过 5 秒参考音频即可克隆出郭老师音色,生成的 “各位吴彦祖们大家好” 语音相似度达 97%,单条视频播放量突破百万…...
