当前位置: 首页 > news >正文

kafka-重试和死信主题(SpringBoot整合Kafka)

文章目录

  • 1、重试和死信主题
  • 2、死信队列
  • 3、代码演示
    • 3.1、appication.yml
    • 3.2、引入spring-kafka依赖
    • 3.3、创建SpringBoot启动类
    • 3.4、创建生产者发送消息
    • 3.5、创建消费者消费消息

1、重试和死信主题

kafka默认支持重试和死信主题

  1. 重试主题:当消费者消费消息异常时,手动ack如果有异常继续向后消费,为了保证消息尽可能成功消费,可以将消费异常的消息扔到重试主题,再通过重试主题的消费者尝试消费
  2. 死信主题:当一个消息消费多次仍然失败,可以将该消息存到死信主题。避免漏消息,以后可以人工介入手动解决消息消费失败的问题

2、死信队列

在Kafka中,DLT通常指的是 Dead Letter Topic(死信队列)

Dead Letter Topic(DLT)的定义与功能

  1. 背景:原生Kafka是不支持Retry Topic和DLT的,但Spring Kafka在客户端实现了这两个功能。
  2. 功能:当消息在Kafka中被消费时,如果消费逻辑抛出异常,并且重试策略(如默认重试10次后)仍无法成功处理该消息,Spring Kafka会将该消息发送到DLT中。
  3. 自定义处理:可以通过自定义SeekToCurrentErrorHandler来控制消费失败后的处理逻辑,例如添加重试间隔、设置重试次数等。如果在重试后消息仍然消费失败,Spring Kafka会将该消息发送到DLT。

DLT的使用与意义

  1. 错误处理:DLT为Kafka提供了一种机制来处理那些无法被成功消费的消息,从而避免了消息的丢失或阻塞。
  2. 监控与告警:通过对DLT的监控,可以及时发现并处理那些无法被成功消费的消息,从而保障Kafka系统的稳定性和可靠性。
  3. 二次处理:对于发送到DLT的消息,可以进行二次处理,如手动干预、修复数据等,以确保这些消息能够最终得到处理。

总之,在Kafka中,DLT是一个用于处理无法被成功消费的消息的特殊Topic,它提供了一种灵活且可靠的机制来保障Kafka系统的稳定性和可靠性。

3、代码演示

3.1、appication.yml

server:port: 8120
# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量# 调用ack方法时才会提交ack给kafka
#      enable-auto-commit: false# 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始# earliest:从最早的消息开始消费# latest:第一次从LEO位置开始消费# none:如果主题分区没有偏移量,则抛出异常auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer# json反序列化器
#      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:spring.json.trusted.packages: "*"listener:# 手动ack:manual手动ack时 如果有异常会尝试一直消费
#      ack-mode: manual# 手动ack:消费有异常时停止ack-mode: manual_immediate

3.2、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.3、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

3.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.time.LocalDateTime;
@SpringBootTest
public class KafkaProducerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid sendRertyMsg(){kafkaTemplate.send("retry_topic",0,"","重试机制和死信队列"+"_"+ LocalDateTime.now());}
}

在这里插入图片描述

没有指定分区,默认会创建一个分区,即使此时有3个kafka实例,也只会用一个,因为只有一个分区

在这里插入图片描述
在这里插入图片描述

[[{"partition": 0,"offset": 0,"msg": "重试机制和死信队列_2024-06-07T15:49:37.398841600","timespan": 1717746578357,"date": "2024-06-07 07:49:38"}]
]

3.5、创建消费者消费消息

给一个消费者配重试主题的时候,死信主题消费者一般不写

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.annotation.Backoff;
@Component
public class MyKafkaListenerAck {/*** 自动ack可能会导致漏消息*      spring-kafka:*                     自动ack 如果有异常,会死循环获取消息重新消费*                        不能继续向后消费消息,会导致消息积压**                    手动ack 配置了手动ack,且ack-mode为manual_immediate时,*                        如果消息消费失败,会继续向后消费* @param record*///指定消费者消费异常使用重试+死信主题   必须结合手动ack使用@RetryableTopic(attempts = "3",numPartitions = "3",  //重试主题的分区数量backoff =@Backoff(value = 2_000L),  //重试的时间间隔autoCreateTopics ="true",retryTopicSuffix = "-myRetry",  //指定重试主题创建时的后缀名:使用原主题的名称拼接后缀生成名称 -retrydltTopicSuffix = "-myDlt" //指定DLT主题创建时的后缀名:使用原主题的名称拼接后缀生成名称 -dlt)@KafkaListener(groupId = "my_group1",topicPartitions = {@TopicPartition(topic = "retry_topic",partitions = {"0","1","2"})})public void consumeByRetry(ConsumerRecord<String, String> record, Acknowledgment ack) {System.out.println("consumeByRetry消费者获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());int i = 1/0;//手动ackack.acknowledge();}/* //死信队列默认名称在原队列后拼接'-dlt'@KafkaListener(groupId = "my_group2",topicPartitions = {@TopicPartition(topic = "topic_dlt",partitions = {"0","1","2"})})public void consumeByDLT(ConsumerRecord<String, String> record, Acknowledgment ack) {System.out.println("consumeByDLT消费者获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());//手动ackack.acknowledge();}*/}

此时运行SpringKafkaConsumerApplication,控制台会报错,因为我们手动写了异常 int i=1/0

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

[[{"partition": 0,"offset": 0,"msg": "重试机制和死信队列_2024-06-07T15:49:37.398841600","timespan": 1717746979414,"date": "2024-06-07 07:56:19"}]
]

我们发现原本在重试主题中的消息,死信主题也有一份

相关文章:

kafka-重试和死信主题(SpringBoot整合Kafka)

文章目录 1、重试和死信主题2、死信队列3、代码演示3.1、appication.yml3.2、引入spring-kafka依赖3.3、创建SpringBoot启动类3.4、创建生产者发送消息3.5、创建消费者消费消息 1、重试和死信主题 kafka默认支持重试和死信主题 重试主题&#xff1a;当消费者消费消息异常时&…...

electron-Vue: Module parse failed: Unexpected character ‘ ‘

​ electron-Vue项目中&#xff0c;我自己写了一个node的C扩展&#xff08;xx.node&#xff09;&#xff0c;然后在.vue文件里import它&#xff0c;然后运行npm run electron:serve&#xff0c;报错如下: ​​ electron-Vue打包默认使用webpack&#xff0c;默认情况下webpack没…...

贪心算法-数组跳跃游戏(mid)

目录 一、问题描述 二、解题思路 1.回溯法 2.贪心算法 三、代码实现 1.回溯法实现 2.贪心算法实现 四、刷题链接 一、问题描述 二、解题思路 1.回溯法 使用递归的方式&#xff0c;找到所有可能的走步方式&#xff0c;并记录递归深度&#xff08;也就是走步次数&#x…...

C++经典150题

经典150题 数组/字符串 文章目录 经典150题数组/字符串88. 合并两个有序数组27.移除元素26.删除有序数组中的重复项80.删除有序数组重点重复项II169.多数元素189.轮转数组121.买卖股票的最佳时机123.买卖股票的最佳时机 III55.跳跃游戏45.跳跃游戏II 88. 合并两个有序数组 给…...

超详解——Python 序列详解——基础篇

目录 1. 序列的概念 字符串&#xff08;String&#xff09; 列表&#xff08;List&#xff09; 元组&#xff08;Tuple&#xff09; 2. 标准类型操作符 连接操作符&#xff08;&#xff09; 重复操作符&#xff08;*&#xff09; 索引操作符&#xff08;[]&#xff09; …...

DVWA-DC-6

靶机IP:192.168.20.140 kaliIP:192.168.20.128 网络有问题的可以看下搭建Vulnhub靶机网络问题(获取不到IP) 信息收集 nmap扫描靶机端口及版本信息 dirsearch扫目录 发现是个wordpress建站 我们去访问前端界面 存在重定向&#xff0c;修改hosts文件&#xff0c;加入192.168…...

ubuntu早期版本以及18.04后的版本,通过rc.local配置开机自启

在ubuntu早期版本以及18.04后的版本&#xff0c;还是支持在rc.local中进行操作开机自启。 1、编辑rc.local文件 cat <<EOF >/etc/rc.local #!/bin/sh -e # rc.local # This script is executed at the end of each multiuser runlevel. # Make sure that the script…...

【环境搭建】1.阿里云ECS服务器 安装jdk8

在阿里云服务器上安装 JDK 8 可以通过以下步骤完成。假设你使用的是 CentOS 或者其他基于 Red Hat 的发行版或Alibaba Cloud Linux 3.2104 LTS 64位。 1.更新系统软件包 sudo yum update -y2.安装 OpenJDK 8 使用 yum 包管理器安装 OpenJDK 8 sudo yum install -y java-1.8…...

idea插件开发之定义侧边栏

写在前面 看下如何在侧边栏定义窗口&#xff0c;如下的效果&#xff1a; 1&#xff1a;正戏 先来定义UI&#xff0c;随便拖拽个组件&#xff0c;就看个效果&#xff1a; 接着定义一个工厂类来创建这个UI&#xff0c;需要实现接口com.intellij.openapi.wm.ToolWindowFactor…...

HarmonyOS未来五年的市场展望

一、引言 随着科技的不断进步和消费者对于智能化设备需求的日益增长&#xff0c;操作系统作为连接硬件与软件的核心平台&#xff0c;其重要性愈发凸显。HarmonyOS&#xff08;鸿蒙系统&#xff09;&#xff0c;作为华为自主研发的分布式操作系统&#xff0c;自诞生以来便备受瞩…...

R语言:什么是向量化操作(Vectorization)?

在R语言中&#xff0c;向量化操作是一个非常重要且强大的概念。它不仅提高了代码的简洁性和可读性&#xff0c;还大大提升了代码的执行效率。本文将详细介绍什么是向量化操作&#xff0c;并通过几个示例来展示其应用。 什么是向量化操作&#xff1f; 向量化操作是指在不使用显…...

Python 机器学习 基础 之 【实战案例】中药数据分析项目实战

Python 机器学习 基础 之 【实战案例】中药数据分析项目实战 目录 Python 机器学习 基础 之 【实战案例】中药数据分析项目实战 一、简单介绍 二、中药数据分析项目实战 三、数据处理与分析实战 1、数据读取 2、中药材数据集的数据处理与分析 2.1数据清洗 2.2、 提取别…...

python中报错“ModuleNotFoundError: No module named ‘docx2txt‘”

python中from langchain_community.document_loaders import Docx2txtLoader报错“ModuleNotFoundError: No module named ‘docx2txt’” 问题描述: python中from langchain_community.document_loaders import Docx2txtLoader报错“ModuleNotFoundError: No module named ‘…...

json.dumps参数

json.dumps&#xff08;&#xff09;是 Python 中json 模块的一个函数&#xff0c;用于将 Python 对象编码成 JSON格式的字符串。这个函数有几个常用的参数&#xff0c;下面是一些主要的参数及其描述&#xff1a; 1. **obj**&#xff1a; 必需。要转换的 Python 对象。 2. *…...

未来已来,划时代革命性产品——全息数字人管家系统,全网首发

尊敬的投资人、亲爱的网友们&#xff1a; 大家好&#xff0c;我是数字人管家项目总设计师&#xff0c;我叫William wang。在这个科技日新月异的时代&#xff0c;我们正站在一个前所未有的交汇点上&#xff0c;数字与现实的边界日益模糊&#xff0c;智能技术正以前所未有的方式…...

psql导入数据报错排查

问题&#xff1a;采用pg_dump导出表数据后&#xff0c;用psql导入表数据&#xff0c;导入时报错 无效的命令 \N定位该问题的方法 --进入psql \set ON_ERROR_STOP on --退出psqlpsql -U postgres -d test -v ON_ERROR_STOPon < /home/postgres/test.dmp参考文章&#xff1a…...

项目:双人五子棋对战-对战模块(6)

完整代码见: 邹锦辉个人所有代码: 测试仓库 - Gitee.com 当玩家进入到游戏房间后, 就要开始一局紧张而又刺激的五子棋对战了, 本文将就前端后端的落子与判断胜负的部分作详细讲解. 模块详细讲解 约定前后端交互的接口 首先是建立连接后, 服务器需要生成一些游戏的初始信息(可…...

LeakSearch:针对网络公开凭证的安全扫描与检测工具

关于LeakSearch 在红队演戏过程中&#xff0c;往往需要获取到针对目标域的访问权限。在这个过程中&#xff0c;很多红队人员会选择使用暴露在互联网上的代理服务器来实现目标域的访问&#xff0c;那么此时就需要在互联网上收集公开暴露的凭证信息。 对于蓝队来说&#xff0c;…...

ArcoDesgin a-model中自定义样式model-class无效

增加黄框代码解决 原因是&#xff0c;动态加载的组件默认渲染在body下面&#xff0c;与#app平级。样式设置无效 加上:render-to-body“false”&#xff0c;让组件不渲染到body下&#xff0c;渲染在app下面&#xff0c;样式设置生效...

持续总结中!2024年面试必问 20 道分布式、微服务面试题(十)

上一篇地址&#xff1a;持续总结中&#xff01;2024年面试必问 20 道分布式、微服务面试题&#xff08;九&#xff09;-CSDN博客 十九、请描述一种微服务部署策略。 微服务部署策略是确保微服务架构中各个独立服务能够高效、稳定地部署到生产环境中的方法。以下是一些常见的微…...

北航第四次数据结构与程序设计编程题复习

到期末了&#xff0c;所以再来复习一下以前的作业。 北航第四次数据结构与程序设计编程题 一、栈操作&#xff08;栈-基本题&#xff09;二、C程序括号匹配检查三、计算器&#xff08;表达式计算-后缀表达式实现&#xff0c;结果为浮点&#xff09;四、文本编辑操作模拟&#…...

golang调用外部程序包os/exec中的 Command和CommandContext 函数创建的Cmd对象的区别

在go语言中&#xff0c;我们可以通过os/exec包中的Command和CommandContext 函数创建对应的外部程序执行Cmd对象&#xff0c; 这2个函数创建的cmd命令执行对象是有区别的&#xff0c;CommandContext创建的对象可以携带上下文&#xff0c;这个主要用于我们通过cancel函数给对应的…...

Redis进阶知识个人汇总

持久化 三种方式实现它的持久化&#xff1a; RDB持久化 全称Redis数据备份文件&#xff0c;又称Redis数据快照 这种就是将Redis内存中所有数据记录到磁盘中&#xff0c;当实例出故障后&#xff0c;从磁盘中读快照文件进行恢复数据。 一般使用bgsave指令实现 复制主线程得到一…...

从中序与后序遍历序列构造二叉树-力扣

中序遍历序列存放节点的顺序是左中右&#xff0c;后序遍历存放节点的顺序是左右中后序遍历序列的最后一个节点即为二叉树的根节点由于每个值在二叉树中都是唯一的&#xff0c;那么根据根节点的值&#xff0c;就可以将中序遍历序列一分为二&#xff0c;前部分存储的是根节点左子…...

操作系统期末复习(大题)

1. 进程调度 周转时间作业完成时刻-作业到达时刻 带权周转时间周转时间/服务时间 平均周转时间各个作业周转时间之和/作业个数 操作系统&#xff1a;周转时间和其他时间_系统为作业提供的时间-CSDN博客 2. 进程调度 3. 调度算法 4. 临界区互斥访问问题 即证明是否满足互斥&a…...

解决富文本中抖音视频无法播放的问题——403

问题 富文本中的抖音视频无法播放&#xff0c;资源状态码是403禁止访问打开控制台&#xff0c;可以看到在项目中打开&#xff0c;数据请求的请求头多了一个Referer: http://localhost:3000/而复制链接在新窗口直接打开&#xff0c;请求头中并不会携带Referer 解决方案 在ind…...

2024最新华为OD机试(C卷+D卷)真题目录+使用说明+在线评测

文章目录 &#x1f4d2;声明&#x1f39a;专栏介绍&#x1f4d6;试读文章&#x1f380;关于华为OD &#x1f9f7;真题目录2024最新 C卷 & D卷 目录(实时跟新中~)2024最新 C卷 & D卷 100分题目 (实时跟新中~)2024最新 C卷 & D卷 200分题目 (实时跟新中~) &#x1f4…...

hana 中的缓存视图功能,类似ORACLE 中的 物化视图功能

为什么启用物化视图、缓存视图这里就不过多解释了。 参考官方文章&#xff1a; Static Result Cache | SAP Help Portal 在 HANA中&#xff0c;视图的缓存分 静态结果缓存 和 动态结果缓存。 静态结果缓存和动态结果缓存是缓存查询结果以获得性能优势的可配置应用程序。 缓…...

express入门02静态资源托管

目录 1 搭建静态资源结构2 代码助手3 多目录托管4 服务器热启动总结 上一篇我们讲解了使用express搭建服务器的过程&#xff0c;服务器搭建好了之后&#xff0c;除了在地址栏里输入URL发起get请求或者post请求外&#xff0c;通常我们还需要访问静态资源&#xff0c;比如html、c…...

Java常见的引用类型

1、强引用&#xff1a;普通的变量引用&#xff0c;Student sutnew Student(); 2、软引用&#xff1a;堆内对象若未被引用&#xff0c;GC不会立刻删除&#xff0c;而是在堆内存空间不足时才会进行删除。 3、弱引用&#xff1a;GC触发&#xff0c;会立刻删除。 4、虚引用&am…...

怎么查询网站的域名/南通百度网站快速优化

一、商品模块数据库&代码工具准备 1.数据库准备&#xff1a; 2.中导入工具类&#xff1a; 二、商品品牌 - 后端实现 1.集成模板生成代码 - 配置query和controller - 具体可看项目源码 // 调整 query 生成目录演示 focList.add(new FileOutConfig("/templates/query…...

制作网页图片/windows优化大师官方免费

1、什么叫WebInternet是一个连接世界上计算机的物理网络Web是建立在Internet上的其中一种服务(Service)Web是Internet上多种不同的服务之一&#xff0c;其他还包括E-mail&#xff0c;流媒体&#xff0c;FTP等2、Web工作原理作为一种服务&#xff0c;Web定义两个方面&#xff1a…...

成都住建局官网有问题怎么办/seo搜索引擎优化技术教程

一 、HandlerThread简介 从字面意思上看&#xff0c;它既与Handler有关系又与Thread有联系。确实如此&#xff0c;它继承自Thread&#xff0c;是一个线程类&#xff0c;同时又内嵌Looper对象。因此&#xff0c;用它开启的线程的内部可以直接创建一个Handler&#xff0c;并可以…...

淄博网站关键字优化/福州seo招聘

关于数据库调优主要分以下几块&#xff1a; 服务器内核优化&#xff1a;此优化一般由系统运维人员完成数据库配置参数优化:此优化需根据压力测试结果来调整参数sql语句及表优化索引优化架构优化分库分表 sql和表优化 若是能够恰当的使用sql规则&#xff0c;便能极大提高系统…...

wordpress 小人/重庆seo结算

看着ie9 、firefox &#xff0c;opera都效仿tab风格&#xff0c;对比了下&#xff0c;chrome的窗口上面部分还不够小&#xff0c;干脆把地址栏去掉&#xff0c;按F10或ALT键&#xff0c;显示隐藏地址栏&#xff0c;这里是我编译好的程序&#xff0c;喜欢的拿走&#xff08;26M&…...

怎么对网站做压力测试/广州seo工资

2019独角兽企业重金招聘Python工程师标准>>> http://blog.csdn.net/yuzhiqiang_1993/article/details/78292004 http://blog.csdn.net/qq_25943493/article/details/52065096 转载于:https://my.oschina.net/u/3698786/blog/1557293...