当前位置: 首页 > news >正文

Kafka java 配置

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {// 1. 配置属性参数Properties properties = new Properties();// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);// 设置消费者是否自动提交offset,true表示自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动提交offset的时间间隔(单位:毫秒)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置每次poll操作返回的最大记录数properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);// 根据配置属性创建Kafka消费者实例return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {// 创建Kafka消费者实例,通过getCustomer()方法获取KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();// 订阅要消费的主题,这里是 "test-topic"consumer.subscribe(Collections.singletonList("test-topic"));// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));for (ConsumerRecord<String, String> record : records) {// 处理消息的逻辑// 打印消息的offset、key和valueSystem.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。boolean flag = true;if (flag){// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息// 如果需要手动提交offset,可以取消注释下面的代码// consumer.commitAsync();// 由于flag为true,这里会跳出循环,不再处理后续的消息break;}}// 关闭消费者,释放资源consumer.close();// 打印结束消费的日志System.out.println("结束消费");
}

相关文章:

Kafka java 配置

前言&#xff1a; 大家好&#xff0c;大家在springboot项目中&#xff0c;经常采用 KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。 介绍&#xff1a; 我们已经集成spring-Kafka 就不需要再…...

网络安全现状:复杂的威胁形势导致压力水平飙升

《2024 年网络安全状况》报告深入分析了当前网络安全挑战和趋势。 该报告重点介绍了几个关键的关注领域&#xff0c;包括人员短缺、技能差距、不断演变的威胁和预算限制&#xff0c;同时还指出了取得进展的领域&#xff0c;例如对威胁响应能力的信心增强以及对网络风险评估的认…...

【机器学习】强化学习(1)——强化学习原理浅析(区分强化学习、监督学习和启发式算法)

文章目录 强化学习介绍强化学习和监督学习比较监督学习强化学习 强化学习的数学和过程表达动作空间序列决策策略&#xff08;policy&#xff09;价值函数&#xff08;value function&#xff09;模型&#xff08;model&#xff09; 强化学习和启发式算法比较强化学习步骤代码走…...

【SoC设计指南 基于Arm Cortex-M】学习笔记1——AMBA

AMBA简介 先进微控制器总线架构&#xff08;Advanced Microcontroller Bus Architecture&#xff0c;AMBA&#xff09;是用在arm处理器上的片上总线协议规范集。 AMBA总线协议规范集包含AHB、APB、AXI等。 AHB&#xff1a;先进高性能总线(Advanced High-performance Bus) APB&…...

flutter鸿蒙模拟器 Win环境调试报错问题记录(暂未解决)

前情提要&#xff1a; 1、flutter项目已经正确生成了ohos项目 2、flutter和鸿蒙的环境变量配置正确 3、ohos项目执行flutter build hap成功 4、没有真机&#xff0c;使用win环境创建的x86模拟器 问题状态 使用模拟器运行ohos&#xff0c;控制台提示“安装HAP 报 code:9568347错…...

详解Rust标准库:HashSet

## 查看本地官方文档安装rust后运行 rustup doc查看The Standard Library即可获取标准库内容 std::collections::hash_set::HashSet定义 HashSet是一种集合数据结构&#xff0c;它只存储唯一的元素。它主要用于检查元素是否存在于集合中&#xff0c;或者对元素进行去重操作&…...

记录学习react的一些内容

由于是在公司实际项目中学习&#xff0c;所以不是很完整 需要一点一点的学 1.React.useState 类似于vue中的ref 可以修改状态 但是是异步的 感觉不好用 const [wishData, setWishData] React.useState<any>(null); 只能使用setxxx来修改 2.useEffect(()>{},[]) 类…...

json绘制热力图

首先需要一段热力信息的json&#xff0c;我放在头部了。 然后就是需要de-geo库了。 实现代码如下&#xff1a; import * as d3geo from d3-geoimport trafficJSON from ../assets/json/traffic.jsonlet geoFun;// 地理投影函数// let info {max: Number.MIN_SAFE_INTEGER,mi…...

linux 下查看程序启动的目录

以azkaban为例 第一步、ps -ef | grep azkaban 查询出进程号 第二步、cd /proc/ 第三步 、cd 进程号 第四部 ll 查看详情 查看jar 位置 查看jar 启动命令...

书生浦语第四期基础岛L1G2000-玩转书生「多模态对话」与「AI搜索」产品

文章目录 一、MindSearch二、书生浦语三、书生万象四、进阶任务 一、MindSearch MindSearch 是一个开源的 AI 搜索引擎。它会对你提出的问题进行分析并拆解为数个子问题&#xff0c;在互联网上搜索、总结得到各个子问题的答案&#xff0c;最后通过模型总结得到最终答案。书生浦…...

保护Kubernetes免受威胁:容器安全的有效实践

安全并非“放之四海而皆准”的解决方案&#xff0c;相反地&#xff0c;它更多的是一个范围&#xff0c;受其应用的特定上下文的影响。安全领域的专业人士很少宣称什么产品是完全安全的&#xff0c;但总有方法可以实现更强的安全性。在本文中&#xff0c;我们将介绍各种方法来支…...

【客观理性深入讨论国产中间件及数据库-科创基础软件】

随着国产化的进程&#xff0c;越来越多的国企央企开始要求软件产品匹配过程化的要求&#xff0c; 最近有一家银行保险的科技公司对行为验证码产品就要求匹配国产中间件&#xff0c; 于是开始了解国产中间件都有哪些厂家 一&#xff1a;国产中间件主要产品及厂商 1 东方通&…...

MFC中Excel的导入以及使用步骤

参考地址 在需要对EXCEL表进行操作的类中添加以下头文件&#xff1a;若出现大量错误将其放入stdafx.h中 #include "resource.h" // 主符号 #include "CWorkbook.h" //单个工作簿 #include "CRange.h" //区域类&#xff0c;对Excel大…...

AWS S3在客户端应用不能使用aws-sdk场景下的文件上传与下载

简介 通常情况下&#xff0c;应用程序上传文件到AWS S3&#xff0c;会使用aws-sdk&#xff0c;但是有些情况下&#xff0c;客户端应用会有安装限制&#xff0c;比如不能安装aws-sdk&#xff0c;此时我们就需要通过其他方式实现文件上传与下载。 这里我们提供一个服务端&#…...

深入解析 Transformers 框架(四):Qwen2.5/GPT 分词流程与 BPE 分词算法技术细节详解

前面我们已经通过三篇文章&#xff0c;详细介绍了 Qwen2.5 大语言模型在 Transformers 框架中的技术细节&#xff0c;包括包和对象加载、模型初始化和分词器技术细节&#xff1a; 深入解析 Transformers 框架&#xff08;一&#xff09;&#xff1a;包和对象加载中的设计巧思与…...

【Python-AI篇】K近邻算法(KNN)

0. 前置----机器学习流程 获取数据集数据基本处理特征工程机器学习模型评估在线服务 1. KNN算法概念 如果一个样本在特征空间中的K个最相似&#xff08;即特征空间中最邻近&#xff09;的样本中大多数属于某一个类别&#xff0c;则该样本也属于这一个类别 1.1 KNN算法流程总…...

aws xray如何实现应用log和trace的关联关系

参考资料 https://community.aws/tutorials/solving-problems-you-cant-see-using-aws-x-ray-and-cloudwatch-for-user-level-observability-in-your-serverless-microservices-applicationshttps://stackoverflow.com/questions/76000811/search-cloudwatch-logs-for-aws-xra…...

centos服务器登录失败次数设定

实现的效果 一台centos服务&#xff0c;如果被别人暴力或者登录次数超过多少次&#xff0c;就拒绝或者在规定时间内拒绝ip登录。这里使用的是fail2ban 安装fail2ban sudo yum install epel-release -y # 先安装 EPEL 源 sudo yum install fail2ban -y配置fail2ban # 复制默…...

实时高效,全面测评快递100API的物流查询功能

一、引言 你是否曾经在网购后焦急地等待包裹&#xff0c;频繁地手动刷新订单页面以获取最新的物流信息&#xff1f;或者作为一名开发者&#xff0c;正在为如何在自己的应用程序中高效地实现物流查询功能而发愁&#xff1f;其实&#xff0c;有一个非常好用的解决方案——快递10…...

第14张 GROUP BY 分组

一、分组功能介绍 使用group by关键字通过某个字段进行分组&#xff0c;对分完组的数据分别 “SELECT 聚合函数”查询结果。 1.1 语法 SELECT column, group_function(column) FROM table [WHERE condition] [GROUP BY group_by_expression] [ORDER BY column]; 明确&#…...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

【2025年】解决Burpsuite抓不到https包的问题

环境&#xff1a;windows11 burpsuite:2025.5 在抓取https网站时&#xff0c;burpsuite抓取不到https数据包&#xff0c;只显示&#xff1a; 解决该问题只需如下三个步骤&#xff1a; 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...