互联网全景消息(10)之Kafka深度剖析(中)
一、深入应用
1.1 SpringBoot集成Kafka
引入对应的依赖。
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--swagger2增强,官方ui太low , 访问地址: /doc.html --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>swagger-bootstrap-ui</artifactId><version>1.8.8</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency></dependencies>
添加配置文件:
spring:application:name: demokafka:bootstrap-servers: 这里换成自己的kafka信息producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
启动服务:
@SpringBootApplication
@RestController
public class Demo {public static void main(String[] args) {new SpringApplicationBuilder(Demo.class).run(args);}}
启动信息如下:

1.2 消息发送
1.2.1 异步发送
KafkaTemplate调用的send默认采用的是异步发送,如果需要同步发送获取发送结果,则需要调用get方法。
@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}
效果如下:

同步发送代码如下:
@GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}
1.2.2 序列化
序列化详解:
- 前面我们用到的是Kafka自带的字符串序列化器,
org.apache.kafka.common.serialization.StringDeserializer
- 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long等;
- 这些序列化接器都实现了org.apache.kafka.common.serialization.Serializer
自定义序列化,自己实现序列化对应的接口即可,如下:
public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();}}
然后在yaml配置自己的编辑器:
value-serializer: com.itheima.demo.config.MySerializer
对应的我们在消费者消费消息的时候也需要按照我们自定义的方式进行解码,具体代码如下:
package com.itheima.demo.config;import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}
1.2.3 分区策略
分区策略决定了消息根据key投放到那个分区,也是顺序消费保障的基石。
- 给定了分区号,直接将数据发送到指定分区里面去;
- 没有给定了分区号,给定数据的key值,通过key取上hashcode进行分区;
- 既没有给定分区号,也没有给定key值,直接轮询进行分区;
- 自定义分区策略,按照自定义需求选择分区号;
生产者代码如下:
//测试分区发送
@RestController
public class PartitionProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;// 指定分区发送
// 不管你key是什么,到同一个分区@GetMapping("/kafka/partitionSend/{key}")public void setPartition(@PathVariable("key") String key) {kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");}// 指定key发送,不指定分区
// 根据key做hash,相同的key到同一个分区@GetMapping("/kafka/keysend/{key}")public void setKey(@PathVariable("key") String key) {kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");}// 什么也不指定@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}
消费者代码如下:
//指定消费组消费
@Component
public class PartitionConsumer {private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);//分区消费@KafkaListener(topics = {"test"},topicPattern = "0")public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=0,message:[{}]", msg);}}@KafkaListener(topics = {"test"},topicPattern = "1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=1,message:[{}]", msg);}}
}
1)测试默认分区策略,发送什么也不指定的消息

可以发现发送的是同一条的数据,他是跟partition轮询发送的;
2)测试指定分区

3)按照key的hashcode来分区

1.3 消息消费
1.3.1 消费者分组
public class GroupConsumer {private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);//组1,消费者1@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-1 , message:{}", msg);}}//组1,消费者2@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-2 , message:{}", msg);}}//组2,只有一个消费者@KafkaListener(topics = {"test"},groupId = "group2")public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group2 , message:{}", msg);}}
}
启动:

需要注意的是,注意分区数与消费者数的搭配,如果(消费者数 > 分区数量),消费者将会出现闲置,浪费资源!
1.3.2 位移提交
1)自动提交,我们在前面设置了以下两个选项,则kafka会延时设置自动提交:

2)手动提交,有些时候我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复,如下我们配置手动提交:
@Configuration
public class MyOffsetConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 注意这里!!!设置手动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));// ack模式:// AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种://// RECORD// 每处理一条commit一次//// BATCH(默认)// 每次poll的时候批量提交一次,频率取决于每次poll的调用频率//// TIME// 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)//// COUNT// 累积达到ackCount次的ack去commit//// COUNT_TIME// ackTime或ackCount哪个条件先满足,就commit//// MANUAL// listener负责ack,但是背后也是批量上去//// MANUAL_IMMEDIATE// listner负责ack,每调用一次,就立即commitfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}相关文章:
互联网全景消息(10)之Kafka深度剖析(中)
一、深入应用 1.1 SpringBoot集成Kafka 引入对应的依赖。 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupI…...
Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步
Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步 目录 Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动…...
pytorch小记(一):pytorch矩阵乘法:torch.matmul(x, y)
pytorch小记(一):pytorch矩阵乘法:torch.matmul(x, y)/ x y 代码代码 1:torch.matmul(x, y)输入张量:计算逻辑:输出结果: 代码 2:y y.view(4,1)…...
PyTorch环境配置常见报错的解决办法
目标 小白在最基础的环境配置里一般都会出现许多问题。 这里把一些常见的问题分享出来。希望可以节省大家一些时间。 最终目标是可以在cmd虚拟环境里进入jupyter notebook,new的时候有对应的环境,并且可以跑通所有的import code。 第一步:…...
罗永浩再创业,这次盯上了 AI?
罗永浩,1972年7月9日生于中国延边朝鲜族自治州的一个军人家庭,是一名朝鲜族人;早年在新东方授课,2004年当选 “网络十大红人” ;2006年8月1日,罗永浩创办牛博网;2008年5月,罗永浩注册…...
VUE3 provide 和 inject,跨越多层级组件传递数据
provide 和 inject 是 Vue 3 提供的 API,主要用于实现祖先组件与后代组件之间的依赖注入。它们可以让你在组件树中,跨越多层组件传递数据,而不需要通过 props 或事件的方式逐层传递。这个机制主要用于状态共享、插件系统或某些跨层级的功能。…...
git打补丁
1、应用场景 跨仓库升级 开发项目B使用的是开源项目A。开源项目A发现漏洞,作者进行了修复,我们可以通过使用git补丁的方式,将作者修改的内容复制到我 们的项目B中。 2、TortoiseGit方式 源仓库 格式化补丁 根据提交数量,生成…...
机械燃油车知识图谱、知识大纲、知识结构(持续更新...)
一、发动机 曲柄连杆机构 配气机构 点火系统 起动系统 燃油供给系统 润滑系统 冷却系统 二、底盘 (一)传动系统 1、离合器 2、变速器 3、万向传动装置 4、驱动桥 (二)行驶系统 1、车架 2、车桥 3、悬架 4、车轮 &a…...
Vue3学习总结
一、Vue 3 基础搭建与核心语法 1.创建 Vue 3 应用 在项目的入口文件 main.js 中,通过以下代码创建 Vue 3 应用实例: import { createApp } from vue; import App from ./App.vue;const app createApp(App); app.mount(#app); 这几行代码的作用是引入…...
Type-C双屏显示器方案
在数字化时代,高效的信息处理和视觉体验已成为我们日常生活和工作的关键需求。随着科技的进步,一款结合了便携性和高效视觉输出的设备——双屏便携屏,逐渐崭露头角,成为追求高效工作和娱乐体验人群的新宠。本文将深入探讨双屏便携…...
【读书与思考】焦虑与内耗
【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】【读书与思考】 导言 今天一个朋友和我说,最近比较焦虑和内耗,无心工作和学习,我问他你焦虑内耗的时候,时间主要花在哪了,他告诉我说主要花在看有关移…...
基于python的网页表格数据下载--转excel
基于 Python 的网页表格数据爬取与下载:以维基百科为例 目录 基于 Python 的网页表格数据爬取与下载:以维基百科为例1. 背景介绍2. 工具与环境3. 操作步骤1. 获取网页内容2. 定位表格元素3. 表格变身 Pandas DataFrame4. 检查数据,收工!5. 进阶玩法与优化6. 完整代码4. 结果…...
Vue.js开发入门:从零开始搭建你的第一个项目
前言 嘿,小伙伴们!今天咱们来聊聊 Vue.js,一个超火的前端框架。如果你是编程小白,别怕,跟着我一步步来,保证你能轻松上手,搭建起属于自己的第一个 Vue 项目。Vue.js 可能听起来有点高大上&#…...
LS1046+XILINX XDMA PCIE调通
欢迎点赞收藏,欢迎私下讨论技术,分享技术 硬件平台 :NXP LS1046 XILINX FPGA 软件平台:LINUX 4.19.68 buildroot LS1046 PEX3 接 XILINX FPGA,linux使用designware的PCI主控制器。下载XILINX DMA驱动,解…...
HarmonyOS:@LocalBuilder装饰器: 维持组件父子关系
一、前言 当开发者使用Builder做引用数据传递时,会考虑组件的父子关系,使用了bind(this)之后,组件的父子关系和状态管理的父子关系并不一致。为了解决组件的父子关系和状态管理的父子关系保持一致的问题,引入LocalBuilder装饰器。…...
YOLOv10-1.1部分代码阅读笔记-downloads.py
downloads.py ultralytics\utils\downloads.py 目录 downloads.py 1.所需的库和模块 2.def is_url(url, checkFalse): 3.def delete_dsstore(path, files_to_delete(".DS_Store", "__MACOSX")): 4.def zip_directory(directory, compressTrue, ex…...
计算机图形学【绘制立方体和正六边形】
工具介绍 OpenGL:一个跨语言的图形API,用于渲染2D和3D图形。它提供了绘制图形所需的底层功能。 GLUT:OpenGL的一个工具库,简化了窗口创建、输入处理和其他与图形环境相关的任务。 使用的函数 1. glClear(GL_COLOR_BUFFER_BIT |…...
基于django中医药数据可视化平台(源码+lw+部署文档+讲解),源码可白嫖!
摘要 时代在飞速进步,每个行业都在努力发展现在先进技术,通过这些先进的技术来提高自己的水平和优势,中医药管理平台当然不能排除在外。中医药数据可视化平台是在实际应用和软件工程的开发原理之上,运用Python语言、ECharts技术、…...
kafka消费堆积问题探索
背景 我们的商城项目用PHP写的,原本写日志方案用的是PHP的方案,但是,这个方案导致资源消耗一直降不下来,使用了20个CPU。后面考虑使用通过kafka的方案写日志,商城中把产生的日志丢到kafka中,在以go写的项目…...
Vue.js 使用插槽(Slots)优化组件结构
Vue.js 使用插槽(Slots)优化组件结构 今天我们聊聊 Vue.js 的一个超实用功能——插槽(Slots)。插槽是 Vue 组件开发中的神器,用好它,你可以让组件变得更灵活、更可复用,还能写出优雅的代码结构…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
遍历 Map 类型集合的方法汇总
1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
ETLCloud可能遇到的问题有哪些?常见坑位解析
数据集成平台ETLCloud,主要用于支持数据的抽取(Extract)、转换(Transform)和加载(Load)过程。提供了一个简洁直观的界面,以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
爬虫基础学习day2
# 爬虫设计领域 工商:企查查、天眼查短视频:抖音、快手、西瓜 ---> 飞瓜电商:京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空:抓取所有航空公司价格 ---> 去哪儿自媒体:采集自媒体数据进…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
