互联网全景消息(10)之Kafka深度剖析(中)
一、深入应用
1.1 SpringBoot集成Kafka
引入对应的依赖。
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--swagger2增强,官方ui太low , 访问地址: /doc.html --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>swagger-bootstrap-ui</artifactId><version>1.8.8</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency></dependencies>
添加配置文件:
spring:application:name: demokafka:bootstrap-servers: 这里换成自己的kafka信息producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
启动服务:
@SpringBootApplication
@RestController
public class Demo {public static void main(String[] args) {new SpringApplicationBuilder(Demo.class).run(args);}}
启动信息如下:

1.2 消息发送
1.2.1 异步发送
KafkaTemplate调用的send默认采用的是异步发送,如果需要同步发送获取发送结果,则需要调用get方法。
@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}
效果如下:

同步发送代码如下:
@GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}
1.2.2 序列化
序列化详解:
- 前面我们用到的是Kafka自带的字符串序列化器,
org.apache.kafka.common.serialization.StringDeserializer
- 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long等;
- 这些序列化接器都实现了org.apache.kafka.common.serialization.Serializer
自定义序列化,自己实现序列化对应的接口即可,如下:
public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();}}
然后在yaml配置自己的编辑器:
value-serializer: com.itheima.demo.config.MySerializer
对应的我们在消费者消费消息的时候也需要按照我们自定义的方式进行解码,具体代码如下:
package com.itheima.demo.config;import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}
1.2.3 分区策略
分区策略决定了消息根据key投放到那个分区,也是顺序消费保障的基石。
- 给定了分区号,直接将数据发送到指定分区里面去;
- 没有给定了分区号,给定数据的key值,通过key取上hashcode进行分区;
- 既没有给定分区号,也没有给定key值,直接轮询进行分区;
- 自定义分区策略,按照自定义需求选择分区号;
生产者代码如下:
//测试分区发送
@RestController
public class PartitionProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;// 指定分区发送
// 不管你key是什么,到同一个分区@GetMapping("/kafka/partitionSend/{key}")public void setPartition(@PathVariable("key") String key) {kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");}// 指定key发送,不指定分区
// 根据key做hash,相同的key到同一个分区@GetMapping("/kafka/keysend/{key}")public void setKey(@PathVariable("key") String key) {kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");}// 什么也不指定@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}
消费者代码如下:
//指定消费组消费
@Component
public class PartitionConsumer {private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);//分区消费@KafkaListener(topics = {"test"},topicPattern = "0")public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=0,message:[{}]", msg);}}@KafkaListener(topics = {"test"},topicPattern = "1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=1,message:[{}]", msg);}}
}
1)测试默认分区策略,发送什么也不指定的消息

可以发现发送的是同一条的数据,他是跟partition轮询发送的;
2)测试指定分区

3)按照key的hashcode来分区

1.3 消息消费
1.3.1 消费者分组
public class GroupConsumer {private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);//组1,消费者1@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-1 , message:{}", msg);}}//组1,消费者2@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-2 , message:{}", msg);}}//组2,只有一个消费者@KafkaListener(topics = {"test"},groupId = "group2")public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group2 , message:{}", msg);}}
}
启动:

需要注意的是,注意分区数与消费者数的搭配,如果(消费者数 > 分区数量),消费者将会出现闲置,浪费资源!
1.3.2 位移提交
1)自动提交,我们在前面设置了以下两个选项,则kafka会延时设置自动提交:

2)手动提交,有些时候我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复,如下我们配置手动提交:
@Configuration
public class MyOffsetConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 注意这里!!!设置手动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));// ack模式:// AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种://// RECORD// 每处理一条commit一次//// BATCH(默认)// 每次poll的时候批量提交一次,频率取决于每次poll的调用频率//// TIME// 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)//// COUNT// 累积达到ackCount次的ack去commit//// COUNT_TIME// ackTime或ackCount哪个条件先满足,就commit//// MANUAL// listener负责ack,但是背后也是批量上去//// MANUAL_IMMEDIATE// listner负责ack,每调用一次,就立即commitfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}相关文章:
互联网全景消息(10)之Kafka深度剖析(中)
一、深入应用 1.1 SpringBoot集成Kafka 引入对应的依赖。 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupI…...
Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步
Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步 目录 Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动…...
pytorch小记(一):pytorch矩阵乘法:torch.matmul(x, y)
pytorch小记(一):pytorch矩阵乘法:torch.matmul(x, y)/ x y 代码代码 1:torch.matmul(x, y)输入张量:计算逻辑:输出结果: 代码 2:y y.view(4,1)…...
PyTorch环境配置常见报错的解决办法
目标 小白在最基础的环境配置里一般都会出现许多问题。 这里把一些常见的问题分享出来。希望可以节省大家一些时间。 最终目标是可以在cmd虚拟环境里进入jupyter notebook,new的时候有对应的环境,并且可以跑通所有的import code。 第一步:…...
罗永浩再创业,这次盯上了 AI?
罗永浩,1972年7月9日生于中国延边朝鲜族自治州的一个军人家庭,是一名朝鲜族人;早年在新东方授课,2004年当选 “网络十大红人” ;2006年8月1日,罗永浩创办牛博网;2008年5月,罗永浩注册…...
VUE3 provide 和 inject,跨越多层级组件传递数据
provide 和 inject 是 Vue 3 提供的 API,主要用于实现祖先组件与后代组件之间的依赖注入。它们可以让你在组件树中,跨越多层组件传递数据,而不需要通过 props 或事件的方式逐层传递。这个机制主要用于状态共享、插件系统或某些跨层级的功能。…...
git打补丁
1、应用场景 跨仓库升级 开发项目B使用的是开源项目A。开源项目A发现漏洞,作者进行了修复,我们可以通过使用git补丁的方式,将作者修改的内容复制到我 们的项目B中。 2、TortoiseGit方式 源仓库 格式化补丁 根据提交数量,生成…...
机械燃油车知识图谱、知识大纲、知识结构(持续更新...)
一、发动机 曲柄连杆机构 配气机构 点火系统 起动系统 燃油供给系统 润滑系统 冷却系统 二、底盘 (一)传动系统 1、离合器 2、变速器 3、万向传动装置 4、驱动桥 (二)行驶系统 1、车架 2、车桥 3、悬架 4、车轮 &a…...
Vue3学习总结
一、Vue 3 基础搭建与核心语法 1.创建 Vue 3 应用 在项目的入口文件 main.js 中,通过以下代码创建 Vue 3 应用实例: import { createApp } from vue; import App from ./App.vue;const app createApp(App); app.mount(#app); 这几行代码的作用是引入…...
Type-C双屏显示器方案
在数字化时代,高效的信息处理和视觉体验已成为我们日常生活和工作的关键需求。随着科技的进步,一款结合了便携性和高效视觉输出的设备——双屏便携屏,逐渐崭露头角,成为追求高效工作和娱乐体验人群的新宠。本文将深入探讨双屏便携…...
【读书与思考】焦虑与内耗
【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】【读书与思考】 导言 今天一个朋友和我说,最近比较焦虑和内耗,无心工作和学习,我问他你焦虑内耗的时候,时间主要花在哪了,他告诉我说主要花在看有关移…...
基于python的网页表格数据下载--转excel
基于 Python 的网页表格数据爬取与下载:以维基百科为例 目录 基于 Python 的网页表格数据爬取与下载:以维基百科为例1. 背景介绍2. 工具与环境3. 操作步骤1. 获取网页内容2. 定位表格元素3. 表格变身 Pandas DataFrame4. 检查数据,收工!5. 进阶玩法与优化6. 完整代码4. 结果…...
Vue.js开发入门:从零开始搭建你的第一个项目
前言 嘿,小伙伴们!今天咱们来聊聊 Vue.js,一个超火的前端框架。如果你是编程小白,别怕,跟着我一步步来,保证你能轻松上手,搭建起属于自己的第一个 Vue 项目。Vue.js 可能听起来有点高大上&#…...
LS1046+XILINX XDMA PCIE调通
欢迎点赞收藏,欢迎私下讨论技术,分享技术 硬件平台 :NXP LS1046 XILINX FPGA 软件平台:LINUX 4.19.68 buildroot LS1046 PEX3 接 XILINX FPGA,linux使用designware的PCI主控制器。下载XILINX DMA驱动,解…...
HarmonyOS:@LocalBuilder装饰器: 维持组件父子关系
一、前言 当开发者使用Builder做引用数据传递时,会考虑组件的父子关系,使用了bind(this)之后,组件的父子关系和状态管理的父子关系并不一致。为了解决组件的父子关系和状态管理的父子关系保持一致的问题,引入LocalBuilder装饰器。…...
YOLOv10-1.1部分代码阅读笔记-downloads.py
downloads.py ultralytics\utils\downloads.py 目录 downloads.py 1.所需的库和模块 2.def is_url(url, checkFalse): 3.def delete_dsstore(path, files_to_delete(".DS_Store", "__MACOSX")): 4.def zip_directory(directory, compressTrue, ex…...
计算机图形学【绘制立方体和正六边形】
工具介绍 OpenGL:一个跨语言的图形API,用于渲染2D和3D图形。它提供了绘制图形所需的底层功能。 GLUT:OpenGL的一个工具库,简化了窗口创建、输入处理和其他与图形环境相关的任务。 使用的函数 1. glClear(GL_COLOR_BUFFER_BIT |…...
基于django中医药数据可视化平台(源码+lw+部署文档+讲解),源码可白嫖!
摘要 时代在飞速进步,每个行业都在努力发展现在先进技术,通过这些先进的技术来提高自己的水平和优势,中医药管理平台当然不能排除在外。中医药数据可视化平台是在实际应用和软件工程的开发原理之上,运用Python语言、ECharts技术、…...
kafka消费堆积问题探索
背景 我们的商城项目用PHP写的,原本写日志方案用的是PHP的方案,但是,这个方案导致资源消耗一直降不下来,使用了20个CPU。后面考虑使用通过kafka的方案写日志,商城中把产生的日志丢到kafka中,在以go写的项目…...
Vue.js 使用插槽(Slots)优化组件结构
Vue.js 使用插槽(Slots)优化组件结构 今天我们聊聊 Vue.js 的一个超实用功能——插槽(Slots)。插槽是 Vue 组件开发中的神器,用好它,你可以让组件变得更灵活、更可复用,还能写出优雅的代码结构…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
el-switch文字内置
el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...
基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
【WebSocket】SpringBoot项目中使用WebSocket
1. 导入坐标 如果springboot父工程没有加入websocket的起步依赖,添加它的坐标的时候需要带上版本号。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dep…...
