kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)
文章目录
- 1、kafka数据传递语义
- 2、kafka生产者事务
- 3、事务消息发送
- 3.1、application.yml配置
- 3.2、创建生产者监听器
- 3.3、创建生产者拦截器
- 3.4、发送消息测试
- 3.5、使用Java代码创建主题分区副本
- 3.6、屏蔽 kafka debug 日志 logback.xml
- 3.7、引入spring-kafka依赖
- 3.8、控制台日志
1、kafka数据传递语义
kafka发送消息时是否需要重试
仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高至少一次:生产者发送消息后重试,可能重试多次 效率差精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现
kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)
kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息
2、kafka生产者事务
保证消息生产的幂等性
一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费
配置ack为-1 分区所有副本均落盘成功配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)需要给事务分配事务id(区分一个事务中的多条消息)
3、事务消息发送
3.1、application.yml配置
server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 1 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)transaction-id-prefix: tx_ # 事务id前缀:配置后producer自动开启事务batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
3.2、创建生产者监听器
package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1 leader落盘 broker ack之后 才成功//ack为 -1 分区所有副本全部落盘 broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}
3.3、创建生产者拦截器
package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
3.4、发送消息测试
package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}//kafka事务支持spring-tx的事务注解//单元测试中的事务会自动回滚@Testvoid testTransaction() throws IOException {//多个消息的发送在一个事务中执行kafkaTemplate.executeInTransaction((var1) -> {//通过一个事务中的operations对象来发送消息,执行事务操作var1.send("my_topic1",0,"", "spring-kafka-事务1");var1.send("my_topic1",0,"", "spring-kafka-事务2");int i = 1/0;var1.send("my_topic1",0,"", "spring-kafka-事务3");return "发送消息失败";});System.in.read();}
}
3.5、使用Java代码创建主题分区副本
package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}
3.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
3.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3.8、控制台日志
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
异常信息:Failing batch since transaction was aborted
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
异常信息:Failing batch since transaction was abortedjava.lang.ArithmeticException: / by zero

相关文章:
kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)
文章目录 1、kafka数据传递语义2、kafka生产者事务3、事务消息发送3.1、application.yml配置3.2、创建生产者监听器3.3、创建生产者拦截器3.4、发送消息测试3.5、使用Java代码创建主题分区副本3.6、屏蔽 kafka debug 日志 logback.xml3.7、引入spring-kafka依赖3.8、控制台日志…...
免费!GPT-4o发布,实时语音视频丝滑交互
We’re announcing GPT-4o, our new flagship model that can reason across audio, vision, and text in real time. 5月14日凌晨,OpenAI召开了春季发布会,发布会上公布了新一代旗舰型生成式人工智能大模型【GPT-4o】,并表示该模型对所有免费…...
DevOps的原理及应用详解(四)
本系列文章简介: 在当今快速变化的商业环境中,企业对于软件交付的速度、质量和安全性要求日益提高。传统的软件开发和运维模式已经难以满足这些需求,因此,DevOps(Development和Operations的组合)应运而生,成为了解决这些问题的有效方法。 DevOps是一种强调软件开发人员(…...
关于选择,关于处事
一个人选择应该选择的是勇敢,选择不应该选择的是无奈。放弃,不该放弃的是懦夫,不放弃应该放弃的是睿智。所以,碰到事的时候要先静,先不管什么事,先静下来,先淡定,先从容。在生活里要…...
大话设计模式解读02-策略模式
本篇文章,来解读《大话设计模式》的第2章——策略模式。并通过Qt和C代码实现实例代码的功能。 1 策略模式 策略模式作为一种软件设计模式,指对象有某个行为,但是在不同的场景中,该行为有不同的实现算法。 策略模式的特点&#…...
展会邀请 | 龙智即将亮相2024上海国际嵌入式展,带来安全合规、单一可信数据源、可追溯、高效协同的嵌入式开发解决方案
2024年6月12日至14日,备受全球嵌入式系统产业和社群瞩目的2024上海国际嵌入式展(embedded world china 2024)即将盛大开幕,龙智将携行业领先的嵌入式开发解决方案亮相 640展位 。 此次参展,龙智将全面展示专为嵌入式行…...
codeforce round951 div2
A guess the maximum 问题: 翻译一下就是求所有相邻元素中max - 1的最小值 代码: #include <iostream> #include <algorithm>using namespace std;const int N 5e4;int a[N]; int n;void solve() {cin >> n;int ans 0x3f3f3f3f;…...
arcgis开发记录
目录 文章目录 [toc]**arcgis JavaScript API安装**1. arcgisAPI下载地址:https://developers.arcgis.com/downloads/2. 4.4版本API:本地配置3. 3.18版本修改方法 **angular2中加载arcgis JS API**** arcgis加载图层 并显示图层上点的信息****使用图层上…...
RPA-UiBot6.0数据整理机器人—杂乱数据秒变报表
前言 友友们是否常常因为杂乱的数据而烦恼?数据分类、排序、筛选这些繁琐的任务是否占据了友友们的大部分时间?这篇博客将为友友们带来一个新的解决方案,让我们共同学习如何运用RPA数据整理机器人,实现杂乱数据的快速整理,为你的工作减负增效! 在这里,友友们将了…...
Application UI
本节包含关于如何用DevExpress控件模拟许多流行的应用程序ui的教程。 Windows 11 UI Windows 11和最新一代微软Office产品启发的UI。 Office Inspired UI Word、Excel、PowerPoint和Visio等微软Office应用程序启发的UI。 如何:手动构建Office风格的UI 本教程演示…...
关于 Redis 中集群
哨兵机制中总结到,它并不能解决存储容量不够的问题,但是集群能。 广义的集群:只要有多个机器,构成了分布式系统,都可以称之为一个“集群”,例如主从结构中的哨兵模式。 狭义的集群:redis 提供的…...
C++必修:探索C++的内存管理
✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C学习 贝蒂的主页:Betty’s blog 1. C/C的内存分布 我们首先来看一段代码及其相关问题 int globalVar 1; static…...
python列表---基本语法(浅拷贝,深拷贝等)
文章目录 引言:列表的注意事项1 list中的浅拷贝与深拷贝1.1浅拷贝(Shallow Copy)浅拷贝的方法浅拷贝的效果1.2深拷贝(Deep Copy)深拷贝的方法深拷贝的效果1.3 总结:浅拷贝 vs 深拷贝1.4 为什么浅拷贝顶层元素如果是不可变数据就不能共享,不是传的是引用就相当于传的是地…...
go语言接口之sort.Interface接口
排序操作和字符串格式化一样是很多程序经常使用的操作。尽管一个最短的快排程序只要15 行就可以搞定,但是一个健壮的实现需要更多的代码,并且我们不希望每次我们需要的时候 都重写或者拷贝这些代码。 幸运的是,sort包内置的提供了根据一些排序…...
android:text 总为大写字母的原因
当设置某个 Button 的 text 为英文时,界面上显示的是该英文的大写形式(uppercase)。例如: <Buttonandroid:id"id/btn"android:layout_width"wrap_content"android:layout_height"wrap_content"…...
CISCN2024 初赛 wp 部分复现(Re)
Misc 1. 火锅链观光打卡 答题即可 Re 1. asm_re 感谢智谱清言,可以读出大致加密算法 这是输入 这是加密部分 这里判断 找到疑似密文的部分,手动改一下端序 #asm_wp def dec(char):return (((char - 0x1E) ^ 0x4D) - 0x14) // 0x50 #return (ord(cha…...
YOLOv10、YOLOv9 和 YOLOv8 在实际视频中的对比
引言 目标检测技术是计算机视觉领域的核心任务之一,YOLO(You Only Look Once)系列模型凭借其高效的检测速度和准确率成为了业界的宠儿。本文将详细对比YOLOv10、YOLOv9和YOLOv8在实际视频中的表现,探讨它们在性能、速度和实际应用…...
热题系列章节5
169. 多数元素 给定一个大小为 n 的数组,找到其中的多数元素。多数元素是指在数组中出现次数大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例 1: 输入: [3,2,3] 输出: 3 示例 2: 输入: [2,2,1,1,1,2,2] 输出:…...
ArcGIS for js 4.x 加载图层
二维: 1、创建vue项目 npm create vitelatest 2、安装ArcGIS JS API依赖包 npm install arcgis/core 3、引入ArcGIS API for JavaScript模块 <script setup> import "arcgis/core/assets/esri/themes/light/main.css"; import Map from arcgis…...
Three.js和Babylon.js,webGL中的对比效果分析!
hello,今天分享一些three.js和babylon.js常识,为大家选择three.js还是babylon.js做个分析,欢迎点赞评论转发。 一、Babylon.js是什么 Babylon.js是一个基于WebGL技术的开源3D游戏引擎和渲染引擎。它提供了一套简单易用的API,使开发…...
DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...
comfyui 工作流中 图生视频 如何增加视频的长度到5秒
comfyUI 工作流怎么可以生成更长的视频。除了硬件显存要求之外还有别的方法吗? 在ComfyUI中实现图生视频并延长到5秒,需要结合多个扩展和技巧。以下是完整解决方案: 核心工作流配置(24fps下5秒120帧) #mermaid-svg-yP…...
Java详解LeetCode 热题 100(26):LeetCode 142. 环形链表 II(Linked List Cycle II)详解
文章目录 1. 题目描述1.1 链表节点定义 2. 理解题目2.1 问题可视化2.2 核心挑战 3. 解法一:HashSet 标记访问法3.1 算法思路3.2 Java代码实现3.3 详细执行过程演示3.4 执行结果示例3.5 复杂度分析3.6 优缺点分析 4. 解法二:Floyd 快慢指针法(…...
数据可视化交互
目录 【实验目的】 【实验原理】 【实验环境】 【实验步骤】 一、安装 pyecharts 二、下载数据 三、实验任务 实验 1:AQI 横向对比条形图 代码说明: 运行结果: 实验 2:AQI 等级分布饼图 实验 3:多城市 AQI…...
leetcode 386. 字典序排数 中等
给你一个整数 n ,按字典序返回范围 [1, n] 内所有整数。 你必须设计一个时间复杂度为 O(n) 且使用 O(1) 额外空间的算法。 示例 1: 输入:n 13 输出:[1,10,11,12,13,2,3,4,5,6,7,8,9]示例 2: 输入:n 2…...
