kafka-消费者-指定offset消费(SpringBoot整合Kafka)
文章目录
- 1、指定offset消费
- 1.1、创建消费者监听器‘
- 1.2、application.yml配置
- 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
- 1.4、创建生产者发送消息
- 1.4.1、分区0中的数据
- 1.5、创建SpringBoot启动类
- 1.6、屏蔽 kafka debug 日志 logback.xml
- 1.7、引入spring-kafka依赖
- 1.8、消费者控制台:
1、指定offset消费
1.1、创建消费者监听器‘
package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaPartitionListener {//初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "2")})}, groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}
1.2、application.yml配置
server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}
}


1.4、创建生产者发送消息
package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);}}}


1.4.1、分区0中的数据
[[{"partition": 0,"offset": 0,"msg": "指定offset消费0","timespan": 1717660785962,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 1,"msg": "指定offset消费3","timespan": 1717660785974,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 2,"msg": "指定offset消费6","timespan": 1717660785975,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 3,"msg": "指定offset消费9","timespan": 1717660785975,"date": "2024-06-06 07:59:45"}]
]
1.5、创建SpringBoot启动类
package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}
1.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
1.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
1.8、消费者控制台:
. ____ _ __ _ _/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) )' |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot :: (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9
此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费
. ____ _ __ _ _/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) )' |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot :: (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9
相关文章:
kafka-消费者-指定offset消费(SpringBoot整合Kafka)
文章目录 1、指定offset消费1.1、创建消费者监听器‘1.2、application.yml配置1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本1.4、创建生产者发送消息1.4.1、分区0中的数据 1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1…...
JavaWeb2-Vue
Vue 前端框架,免除原生JS中的DOM操作简化书写 (以前学过又忘了,现在才知道原来vue是前端的) 基于MVVM思想(model-view -viewModel)实现数据双向绑定 model是数据模型 view负责数据展示 即DOM 中间这个负责…...
《广告数据定量分析》读书笔记之统计原理2
3.相关分析:描述的是两个数值变量间关系的强度。(两个数值型变量之间的关系) (1)图表表示:散点图 (2)衡量关系强度指标:相关系数r。 (r的取值为-1到 1&…...
计算机视觉与模式识别实验2-2 SIFT特征提取与匹配
文章目录 🧡🧡实验流程🧡🧡SIFT算法原理总结:实现SIFT特征检测和匹配通过RANSAC 实现图片拼接更换其他图片再次测试效果(依次进行SIFT特征提取、RANSAC 拼接) 🧡🧡全部代…...
kerberos: Clock skew too great (37) - PROCESS_TGS
kerberos认证失败错误信息: Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Clock skew too great (37) - PROCESS_TGS)at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:772)at sun.security.j…...
【MATLAB高级编程】入门篇 | 向量化编程
【入门篇】向量化编程 1. 什么是向量?2. 向量的创建2.1 行向量2.2 列向量2.3 使用冒号运算符2.4 使用`linspace`和`logspace`3. 向量的基本操作3.1 向量元素访问3.2 向量的长度3.3 向量的加法和减法3.4 向量的点乘和叉乘3.5 向量的元素乘法和除法4. 向量的高级操作4.1 逻辑索引…...
Debezium日常分享系列之:Debezium 2.7.0.Beta1发布
Debezium日常分享系列之:Debezium 2.7.0.Beta1发布 一、重大变化1.快照工件2.Oracle 二、新功能和改进1.在 z/OS 上支持 Db22.NATS JetStream 接收器身份验证改进3.JDBC 接收器 MariaDB 方言支持4.JMX 导出器添加到 Debezium 服务器5.使用 Debezium Operator 启用 J…...
eNSP学习——RIP的水平分割和触发更新
目录 主要命令 原理概述 实验目的 实验内容 实验拓扑 实验编址 实验步骤 1、基本配置 2、搭建RIP网络 3、验证触发更新 4.验证水平分割 5、验证毒性逆转 需要eNSP各种配置命令的点击链接自取:华为eNSP各种设备配置命令大全PDF版_…...
华为面经整理
文章目录 实习第一面准备提问相关算法相关 第一面结果提问环节 总结 实习 第一面准备 提问相关 操作系统有哪些功能 进程管理: 进程调度、进程同步和通信、多任务处理 内存管理: 内存分配、虚拟内存技术、内存保护 文件系统管理: 文件存储…...
数据恢复工具推荐:电脑回收站删除的文件怎么恢复?8个回收站恢复软件,收藏!
当文件从电脑的回收站被删除后,许多用户可能认为这些文件已永久丢失。然而,实际上,在数据被新数据覆盖之前,这些删除的文件仍然可以通过使用专门的数据恢复软件来恢复。本文将介绍8款顶级的文件恢复软件,恢复电脑回收站…...
前端之npm运行时配置文件.npmrc(可用于配置npm淘宝源)
文章目录 前端之npm运行时配置文件.npmrc什么是.npmrc设置项目配置文件设置用户配置文件设置全局配置文件给npm 命令添加注册源选项 前端之npm运行时配置文件.npmrc 什么是.npmrc 官网:https://nodejs.cn/npm/cli/v7/configuring-npm/npmrc/ .npmrc,可…...
如何充分利用代理IP扩大网络接触面
目录 前言 第一部分:什么是代理IP? 第二部分:如何获取代理IP? 1. IP质量 2. 匿名性 3. 限制 第三部分:如何使用代理IP? 第四部分:如何充分利用代理IP? 总结: 前…...
StableDiffusion Windows本地部署
检查电脑环境 启动CMD命令窗。 如上图,在CMD窗口输入python命令,可查看本地安装的python版本信息等。输入exit()退出python命令行 执行where命令,可查看python安装目录。 必须安装Python3.10.x,因为stable-diffusion-webui的一…...
OpenCV学习(4.5) 图像的形态转换
1.目标 在本教程中: 我们将学习不同的形态操作,如腐蚀、膨胀、开、闭等。我们将看到不同的函数,如: cv.erode()**、 **cv.dilate()**、 **cv.morphologyEx() 等。 理论: 图像的形态转换是图像处理中的一个重要领域…...
MFC设置窗口在Z轴上的位置
函数原型: BOOL CWnd::SetWindowPos(const CWnd* pWndInsertAfter, int x, int y, int cx, int cy, UINT nFlags);返回值: 如果函数成功,则返回非零值;否则返回0。 参数: pWndInsertAfter:标识了在Z轴次…...
STM32项目分享:智能门禁锁系统
目录 一、前言 二、项目简介 1.功能详解 2.主要器件 三、原理图设计 四、PCB硬件设计 1.PCB图 2.PCB板及元器件图 五、程序设计 六、实验效果 七、资料内容 项目分享 一、前言 项目成品图片: 哔哩哔哩视频链接: https://www.bilibili.c…...
PostgreSQL中有没有类似Oracle的dba_objects系统视图
PostgreSQL中有没有类似Oracle的dba_objects系统视图 在PostgreSQL中,没有一个完全集成了所有对象信息的视图(类似于Oracle中的DBA_OBJECTS)。但是,PostgreSQL提供了一些系统目录表和视图,可以用来获取数据库对象的信…...
【kubernetes】探索k8s集群的配置资源(secret和configma)
目录 一、Secret 1.1Secret 有四种类型 1.2Pod 有 3 种方式来使用 secret 1.3应用场景:凭据 1.4创建 Secret 1.4.1用kubectl create secret命令创建Secret 1.4.2内容用 base64 编码,创建Secret 1.4.2.1Base64编码 1.4.2.2创建YAML文件 1.4.2.3…...
基于springboot实现社区养老服务系统项目【项目源码+论文说明】计算机毕业设计
基于springboot实现社区养老服务系统演示 摘要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本社区养老服务系统就是在这样的大环境下诞生,其可以帮助…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...
Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
【Linux】自动化构建-Make/Makefile
前言 上文我们讲到了Linux中的编译器gcc/g 【Linux】编译器gcc/g及其库的详细介绍-CSDN博客 本来我们将一个对于编译来说很重要的工具:make/makfile 1.背景 在一个工程中源文件不计其数,其按类型、功能、模块分别放在若干个目录中,mak…...
通过 Ansible 在 Windows 2022 上安装 IIS Web 服务器
拓扑结构 这是一个用于通过 Ansible 部署 IIS Web 服务器的实验室拓扑。 前提条件: 在被管理的节点上安装WinRm 准备一张自签名的证书 开放防火墙入站tcp 5985 5986端口 准备自签名证书 PS C:\Users\azureuser> $cert New-SelfSignedCertificate -DnsName &…...
【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...
