kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)
文章目录
- 1、kafka数据传递语义
- 2、kafka生产者事务
- 3、事务消息发送
- 3.1、application.yml配置
- 3.2、创建生产者监听器
- 3.3、创建生产者拦截器
- 3.4、发送消息测试
- 3.5、使用Java代码创建主题分区副本
- 3.6、屏蔽 kafka debug 日志 logback.xml
- 3.7、引入spring-kafka依赖
- 3.8、控制台日志
1、kafka数据传递语义
kafka发送消息时是否需要重试
仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高
至少一次:生产者发送消息后重试,可能重试多次 效率差
精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现
kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)
kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息
2、kafka生产者事务
保证消息生产的幂等性
一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费
配置ack为-1 分区所有副本均落盘成功
配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)
需要给事务分配事务id(区分一个事务中的多条消息)
3、事务消息发送
3.1、application.yml配置
server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 1 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)transaction-id-prefix: tx_ # 事务id前缀:配置后producer自动开启事务batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
3.2、创建生产者监听器
package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1 leader落盘 broker ack之后 才成功//ack为 -1 分区所有副本全部落盘 broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}
3.3、创建生产者拦截器
package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
3.4、发送消息测试
package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}//kafka事务支持spring-tx的事务注解//单元测试中的事务会自动回滚@Testvoid testTransaction() throws IOException {//多个消息的发送在一个事务中执行kafkaTemplate.executeInTransaction((var1) -> {//通过一个事务中的operations对象来发送消息,执行事务操作var1.send("my_topic1",0,"", "spring-kafka-事务1");var1.send("my_topic1",0,"", "spring-kafka-事务2");int i = 1/0;var1.send("my_topic1",0,"", "spring-kafka-事务3");return "发送消息失败";});System.in.read();}
}
3.5、使用Java代码创建主题分区副本
package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}
3.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
3.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3.8、控制台日志
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
异常信息:Failing batch since transaction was aborted
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
异常信息:Failing batch since transaction was abortedjava.lang.ArithmeticException: / by zero
相关文章:
kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)
文章目录 1、kafka数据传递语义2、kafka生产者事务3、事务消息发送3.1、application.yml配置3.2、创建生产者监听器3.3、创建生产者拦截器3.4、发送消息测试3.5、使用Java代码创建主题分区副本3.6、屏蔽 kafka debug 日志 logback.xml3.7、引入spring-kafka依赖3.8、控制台日志…...
免费!GPT-4o发布,实时语音视频丝滑交互
We’re announcing GPT-4o, our new flagship model that can reason across audio, vision, and text in real time. 5月14日凌晨,OpenAI召开了春季发布会,发布会上公布了新一代旗舰型生成式人工智能大模型【GPT-4o】,并表示该模型对所有免费…...
DevOps的原理及应用详解(四)
本系列文章简介: 在当今快速变化的商业环境中,企业对于软件交付的速度、质量和安全性要求日益提高。传统的软件开发和运维模式已经难以满足这些需求,因此,DevOps(Development和Operations的组合)应运而生,成为了解决这些问题的有效方法。 DevOps是一种强调软件开发人员(…...
关于选择,关于处事
一个人选择应该选择的是勇敢,选择不应该选择的是无奈。放弃,不该放弃的是懦夫,不放弃应该放弃的是睿智。所以,碰到事的时候要先静,先不管什么事,先静下来,先淡定,先从容。在生活里要…...
大话设计模式解读02-策略模式
本篇文章,来解读《大话设计模式》的第2章——策略模式。并通过Qt和C代码实现实例代码的功能。 1 策略模式 策略模式作为一种软件设计模式,指对象有某个行为,但是在不同的场景中,该行为有不同的实现算法。 策略模式的特点&#…...
展会邀请 | 龙智即将亮相2024上海国际嵌入式展,带来安全合规、单一可信数据源、可追溯、高效协同的嵌入式开发解决方案
2024年6月12日至14日,备受全球嵌入式系统产业和社群瞩目的2024上海国际嵌入式展(embedded world china 2024)即将盛大开幕,龙智将携行业领先的嵌入式开发解决方案亮相 640展位 。 此次参展,龙智将全面展示专为嵌入式行…...
codeforce round951 div2
A guess the maximum 问题: 翻译一下就是求所有相邻元素中max - 1的最小值 代码: #include <iostream> #include <algorithm>using namespace std;const int N 5e4;int a[N]; int n;void solve() {cin >> n;int ans 0x3f3f3f3f;…...
arcgis开发记录
目录 文章目录 [toc]**arcgis JavaScript API安装**1. arcgisAPI下载地址:https://developers.arcgis.com/downloads/2. 4.4版本API:本地配置3. 3.18版本修改方法 **angular2中加载arcgis JS API**** arcgis加载图层 并显示图层上点的信息****使用图层上…...
RPA-UiBot6.0数据整理机器人—杂乱数据秒变报表
前言 友友们是否常常因为杂乱的数据而烦恼?数据分类、排序、筛选这些繁琐的任务是否占据了友友们的大部分时间?这篇博客将为友友们带来一个新的解决方案,让我们共同学习如何运用RPA数据整理机器人,实现杂乱数据的快速整理,为你的工作减负增效! 在这里,友友们将了…...
Application UI
本节包含关于如何用DevExpress控件模拟许多流行的应用程序ui的教程。 Windows 11 UI Windows 11和最新一代微软Office产品启发的UI。 Office Inspired UI Word、Excel、PowerPoint和Visio等微软Office应用程序启发的UI。 如何:手动构建Office风格的UI 本教程演示…...
关于 Redis 中集群
哨兵机制中总结到,它并不能解决存储容量不够的问题,但是集群能。 广义的集群:只要有多个机器,构成了分布式系统,都可以称之为一个“集群”,例如主从结构中的哨兵模式。 狭义的集群:redis 提供的…...
C++必修:探索C++的内存管理
✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C学习 贝蒂的主页:Betty’s blog 1. C/C的内存分布 我们首先来看一段代码及其相关问题 int globalVar 1; static…...
python列表---基本语法(浅拷贝,深拷贝等)
文章目录 引言:列表的注意事项1 list中的浅拷贝与深拷贝1.1浅拷贝(Shallow Copy)浅拷贝的方法浅拷贝的效果1.2深拷贝(Deep Copy)深拷贝的方法深拷贝的效果1.3 总结:浅拷贝 vs 深拷贝1.4 为什么浅拷贝顶层元素如果是不可变数据就不能共享,不是传的是引用就相当于传的是地…...
go语言接口之sort.Interface接口
排序操作和字符串格式化一样是很多程序经常使用的操作。尽管一个最短的快排程序只要15 行就可以搞定,但是一个健壮的实现需要更多的代码,并且我们不希望每次我们需要的时候 都重写或者拷贝这些代码。 幸运的是,sort包内置的提供了根据一些排序…...
android:text 总为大写字母的原因
当设置某个 Button 的 text 为英文时,界面上显示的是该英文的大写形式(uppercase)。例如: <Buttonandroid:id"id/btn"android:layout_width"wrap_content"android:layout_height"wrap_content"…...
CISCN2024 初赛 wp 部分复现(Re)
Misc 1. 火锅链观光打卡 答题即可 Re 1. asm_re 感谢智谱清言,可以读出大致加密算法 这是输入 这是加密部分 这里判断 找到疑似密文的部分,手动改一下端序 #asm_wp def dec(char):return (((char - 0x1E) ^ 0x4D) - 0x14) // 0x50 #return (ord(cha…...
YOLOv10、YOLOv9 和 YOLOv8 在实际视频中的对比
引言 目标检测技术是计算机视觉领域的核心任务之一,YOLO(You Only Look Once)系列模型凭借其高效的检测速度和准确率成为了业界的宠儿。本文将详细对比YOLOv10、YOLOv9和YOLOv8在实际视频中的表现,探讨它们在性能、速度和实际应用…...
热题系列章节5
169. 多数元素 给定一个大小为 n 的数组,找到其中的多数元素。多数元素是指在数组中出现次数大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例 1: 输入: [3,2,3] 输出: 3 示例 2: 输入: [2,2,1,1,1,2,2] 输出:…...
ArcGIS for js 4.x 加载图层
二维: 1、创建vue项目 npm create vitelatest 2、安装ArcGIS JS API依赖包 npm install arcgis/core 3、引入ArcGIS API for JavaScript模块 <script setup> import "arcgis/core/assets/esri/themes/light/main.css"; import Map from arcgis…...
Three.js和Babylon.js,webGL中的对比效果分析!
hello,今天分享一些three.js和babylon.js常识,为大家选择three.js还是babylon.js做个分析,欢迎点赞评论转发。 一、Babylon.js是什么 Babylon.js是一个基于WebGL技术的开源3D游戏引擎和渲染引擎。它提供了一套简单易用的API,使开发…...
flask实现抽奖程序(一)
后端代码E:\LearningProject\lottery\app.py from flask import Flask, render_template import randomapp Flask(__name__)employees [赵一, 钱二, 孙三, 李四, 周五, 吴六, 郑七, 王八]app.route(/) def hello_world():return render_template(index.html, employeesemplo…...
Python中数据库连接的管理
在现代应用程序中,数据库是一个至关重要的组件。无论是小型应用还是大型分布式系统,良好的数据库连接管理都是确保系统高效、可靠运行的关键。本文将详细介绍在Python中管理数据库连接的最佳实践和技术,包括连接池、ORM(对象关系映…...
【JAVA技术】mybatis 数据库敏感字段加解密方案
引言:自从有公司项目前2年做了三级等保,每年一度例行公事,昨天继续配合做等保测试。这2天比较忙,这里整理之前写的一篇等保技术文章。 正文: 现在公司项目基本用mybatis实现,但由于项目跨度年份比较久&…...
Collections工具类及其案例
package exercise;public class Demo1 {public static void main(String[] args) {//可变参数//方法形参的个数是可以发生变化的//格式:属性类型...名字//int...argsint sum getSum(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);System.out.println(sum);}//底层:可…...
Duck Bro的第512天创作纪念日
Tips:发布的文章将会展示至 里程碑专区 ,也可以在 专区 内查看其他创作者的纪念日文章 我的创作纪念日第512天 文章目录 我的创作纪念日第512天一、与CSDN平台的相遇1. 为什么在CSDN这个平台进行创作?2. 创作这些文章是为了赚钱吗?…...
【机器学习】GPT-4中的机器学习如何塑造人类与AI的新对话
🚀时空传送门 🔍引言📕GPT-4概述🌹机器学习在GPT-4中的应用🚆文本生成与摘要🎈文献综述与知识图谱构建🚲情感分析与文本分类🚀搜索引擎优化💴智能客服与虚拟助手…...
晨控CK-UR12-E01与欧姆龙NX/NJ系列EtherNet/IP通讯手册
晨控CK-UR12-E01与欧姆龙NX/NJ系列EtherNet/IP通讯手册 晨控CK-UR12-E01 是天线一体式超高频读写器头,工作频率默认为902MHz~928MHz,符合EPC Global Class l Gen 2/IS0-18000-6C 标准,最大输出功率 33dBm。读卡器同时…...
模板显式、隐式实例化和(偏)特化、具体化的详细分析
最近看了<The C Programing Language>看到了模板的特化,突然想起来<C Primer>上说的显式具体化、隐式具体化、特化、偏特化、具体化等概念弄得头晕脑胀,我在网上了找了好多帖子,才把概念给理清楚。 看着这么多叫法,其…...
软件设计师笔记-计算机系统基础知识
CPU的功能 CPU(中央处理器)是计算机的核心部件,负责执行计算机的指令和处理数据。它的功能主要可以分为程序控制、操作控制、时间控制和数据处理四个方面: 程序控制:CPU的首要任务是执行存储在内存中的程序。程序控制功能确保CPU能够按照程序的指令序列,一条一条地执行。…...
flink 作业动态维护更新,不重启flink,不提交作业
Flink任务实时获取并更新规则_flink任务流实时变更-CSDN博客 一种动态更新flink任务配置的方法_flink 数据源 动态更新-CSDN博客 Flink CEP在实时风控场景的落地与优化 最佳实践 - 在SQL任务中使用Flink CEP - 《实时计算用户手册-v4.5.0》 Flink SQL CEP详解-CSDN博客 如…...
新品发布会领导致辞/贵州网站seo
对于做板式家具、全屋定制家具的客户来说,对于雕刻机早已经不陌生,关于雕刻机主要部件你了解多少呢?一起来看下吧雕刻机控制系统一、控制系统一般在雕刻机开料机上用的控制系统会有以下几种宝元2000、北京凯恩帝、山龙、新代、维宏、锐志天宏…...
wordpress数字资源下载会员/泉州百度首页优化
开源软件越来越受欢迎。Forrester Research 最近的研究发现,开源模型现在在应用程序开发中处于突出状态,而自定义编写的代码现在通常只占大多数应用的 10% 到 20%。 随着开源模式成为开发人员的实际标准,对其安全性的担忧变得更加突出。正如…...
pcb高端网站建设/深圳关键词优化
C/C程序的内存分区 1)、栈区(stack)— 由编译器自动分配释放 ,存放函数的参数值,局部变量的值等。其 操作方式类似于数据结构中的栈。 2)、堆区(heap) — 一般由程序员分配释放&a…...
东营网站建设规划书/长沙专业竞价优化公司
43、replace()函数描述:把str.中的 old 替换成 new,如果 count 指定,则替换不超过 count次.。语法:str.replace(old, new, count)参数:old —— 将被替换的子字符串。new —— 新子字符串,用于替换old子字符串。count …...
网站怎么做要钱吗/长尾关键词挖掘站长工具
图像处理之霍夫变换(直线检測算法)霍夫变换是图像变换中的经典手段之中的一个,主要用来从图像中分离出具有某种同样特征的几何形状(如,直线,圆等)。霍夫变换寻找直线与圆的方法相比与其他方法能够更好的降低噪声干扰。经典的霍夫变换经常使用…...
高端网站建设服务/媒体发稿平台
欢迎关注”生信修炼手册”!tophat-fusion 是一款利用RNA_seq 数据鉴定融合基因的工具,官网链接如下:http://ccb.jhu.edu/software/tophat/fusion_index.shtml该软件是集成在tophat软件中的,只需要安装好tophat之后就可以使用了,使用方法也比较…...