当前位置: 首页 > news >正文

Kafka java 配置

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {// 1. 配置属性参数Properties properties = new Properties();// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);// 设置消费者是否自动提交offset,true表示自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动提交offset的时间间隔(单位:毫秒)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置每次poll操作返回的最大记录数properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);// 根据配置属性创建Kafka消费者实例return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {// 创建Kafka消费者实例,通过getCustomer()方法获取KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();// 订阅要消费的主题,这里是 "test-topic"consumer.subscribe(Collections.singletonList("test-topic"));// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));for (ConsumerRecord<String, String> record : records) {// 处理消息的逻辑// 打印消息的offset、key和valueSystem.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。boolean flag = true;if (flag){// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息// 如果需要手动提交offset,可以取消注释下面的代码// consumer.commitAsync();// 由于flag为true,这里会跳出循环,不再处理后续的消息break;}}// 关闭消费者,释放资源consumer.close();// 打印结束消费的日志System.out.println("结束消费");
}

相关文章:

Kafka java 配置

前言&#xff1a; 大家好&#xff0c;大家在springboot项目中&#xff0c;经常采用 KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。 介绍&#xff1a; 我们已经集成spring-Kafka 就不需要再…...

网络安全现状:复杂的威胁形势导致压力水平飙升

《2024 年网络安全状况》报告深入分析了当前网络安全挑战和趋势。 该报告重点介绍了几个关键的关注领域&#xff0c;包括人员短缺、技能差距、不断演变的威胁和预算限制&#xff0c;同时还指出了取得进展的领域&#xff0c;例如对威胁响应能力的信心增强以及对网络风险评估的认…...

【机器学习】强化学习(1)——强化学习原理浅析(区分强化学习、监督学习和启发式算法)

文章目录 强化学习介绍强化学习和监督学习比较监督学习强化学习 强化学习的数学和过程表达动作空间序列决策策略&#xff08;policy&#xff09;价值函数&#xff08;value function&#xff09;模型&#xff08;model&#xff09; 强化学习和启发式算法比较强化学习步骤代码走…...

【SoC设计指南 基于Arm Cortex-M】学习笔记1——AMBA

AMBA简介 先进微控制器总线架构&#xff08;Advanced Microcontroller Bus Architecture&#xff0c;AMBA&#xff09;是用在arm处理器上的片上总线协议规范集。 AMBA总线协议规范集包含AHB、APB、AXI等。 AHB&#xff1a;先进高性能总线(Advanced High-performance Bus) APB&…...

flutter鸿蒙模拟器 Win环境调试报错问题记录(暂未解决)

前情提要&#xff1a; 1、flutter项目已经正确生成了ohos项目 2、flutter和鸿蒙的环境变量配置正确 3、ohos项目执行flutter build hap成功 4、没有真机&#xff0c;使用win环境创建的x86模拟器 问题状态 使用模拟器运行ohos&#xff0c;控制台提示“安装HAP 报 code:9568347错…...

详解Rust标准库:HashSet

## 查看本地官方文档安装rust后运行 rustup doc查看The Standard Library即可获取标准库内容 std::collections::hash_set::HashSet定义 HashSet是一种集合数据结构&#xff0c;它只存储唯一的元素。它主要用于检查元素是否存在于集合中&#xff0c;或者对元素进行去重操作&…...

记录学习react的一些内容

由于是在公司实际项目中学习&#xff0c;所以不是很完整 需要一点一点的学 1.React.useState 类似于vue中的ref 可以修改状态 但是是异步的 感觉不好用 const [wishData, setWishData] React.useState<any>(null); 只能使用setxxx来修改 2.useEffect(()>{},[]) 类…...

json绘制热力图

首先需要一段热力信息的json&#xff0c;我放在头部了。 然后就是需要de-geo库了。 实现代码如下&#xff1a; import * as d3geo from d3-geoimport trafficJSON from ../assets/json/traffic.jsonlet geoFun;// 地理投影函数// let info {max: Number.MIN_SAFE_INTEGER,mi…...

linux 下查看程序启动的目录

以azkaban为例 第一步、ps -ef | grep azkaban 查询出进程号 第二步、cd /proc/ 第三步 、cd 进程号 第四部 ll 查看详情 查看jar 位置 查看jar 启动命令...

书生浦语第四期基础岛L1G2000-玩转书生「多模态对话」与「AI搜索」产品

文章目录 一、MindSearch二、书生浦语三、书生万象四、进阶任务 一、MindSearch MindSearch 是一个开源的 AI 搜索引擎。它会对你提出的问题进行分析并拆解为数个子问题&#xff0c;在互联网上搜索、总结得到各个子问题的答案&#xff0c;最后通过模型总结得到最终答案。书生浦…...

保护Kubernetes免受威胁:容器安全的有效实践

安全并非“放之四海而皆准”的解决方案&#xff0c;相反地&#xff0c;它更多的是一个范围&#xff0c;受其应用的特定上下文的影响。安全领域的专业人士很少宣称什么产品是完全安全的&#xff0c;但总有方法可以实现更强的安全性。在本文中&#xff0c;我们将介绍各种方法来支…...

【客观理性深入讨论国产中间件及数据库-科创基础软件】

随着国产化的进程&#xff0c;越来越多的国企央企开始要求软件产品匹配过程化的要求&#xff0c; 最近有一家银行保险的科技公司对行为验证码产品就要求匹配国产中间件&#xff0c; 于是开始了解国产中间件都有哪些厂家 一&#xff1a;国产中间件主要产品及厂商 1 东方通&…...

MFC中Excel的导入以及使用步骤

参考地址 在需要对EXCEL表进行操作的类中添加以下头文件&#xff1a;若出现大量错误将其放入stdafx.h中 #include "resource.h" // 主符号 #include "CWorkbook.h" //单个工作簿 #include "CRange.h" //区域类&#xff0c;对Excel大…...

AWS S3在客户端应用不能使用aws-sdk场景下的文件上传与下载

简介 通常情况下&#xff0c;应用程序上传文件到AWS S3&#xff0c;会使用aws-sdk&#xff0c;但是有些情况下&#xff0c;客户端应用会有安装限制&#xff0c;比如不能安装aws-sdk&#xff0c;此时我们就需要通过其他方式实现文件上传与下载。 这里我们提供一个服务端&#…...

深入解析 Transformers 框架(四):Qwen2.5/GPT 分词流程与 BPE 分词算法技术细节详解

前面我们已经通过三篇文章&#xff0c;详细介绍了 Qwen2.5 大语言模型在 Transformers 框架中的技术细节&#xff0c;包括包和对象加载、模型初始化和分词器技术细节&#xff1a; 深入解析 Transformers 框架&#xff08;一&#xff09;&#xff1a;包和对象加载中的设计巧思与…...

【Python-AI篇】K近邻算法(KNN)

0. 前置----机器学习流程 获取数据集数据基本处理特征工程机器学习模型评估在线服务 1. KNN算法概念 如果一个样本在特征空间中的K个最相似&#xff08;即特征空间中最邻近&#xff09;的样本中大多数属于某一个类别&#xff0c;则该样本也属于这一个类别 1.1 KNN算法流程总…...

aws xray如何实现应用log和trace的关联关系

参考资料 https://community.aws/tutorials/solving-problems-you-cant-see-using-aws-x-ray-and-cloudwatch-for-user-level-observability-in-your-serverless-microservices-applicationshttps://stackoverflow.com/questions/76000811/search-cloudwatch-logs-for-aws-xra…...

centos服务器登录失败次数设定

实现的效果 一台centos服务&#xff0c;如果被别人暴力或者登录次数超过多少次&#xff0c;就拒绝或者在规定时间内拒绝ip登录。这里使用的是fail2ban 安装fail2ban sudo yum install epel-release -y # 先安装 EPEL 源 sudo yum install fail2ban -y配置fail2ban # 复制默…...

实时高效,全面测评快递100API的物流查询功能

一、引言 你是否曾经在网购后焦急地等待包裹&#xff0c;频繁地手动刷新订单页面以获取最新的物流信息&#xff1f;或者作为一名开发者&#xff0c;正在为如何在自己的应用程序中高效地实现物流查询功能而发愁&#xff1f;其实&#xff0c;有一个非常好用的解决方案——快递10…...

第14张 GROUP BY 分组

一、分组功能介绍 使用group by关键字通过某个字段进行分组&#xff0c;对分完组的数据分别 “SELECT 聚合函数”查询结果。 1.1 语法 SELECT column, group_function(column) FROM table [WHERE condition] [GROUP BY group_by_expression] [ORDER BY column]; 明确&#…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

从WWDC看苹果产品发展的规律

WWDC 是苹果公司一年一度面向全球开发者的盛会&#xff0c;其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具&#xff0c;对过去十年 WWDC 主题演讲内容进行了系统化分析&#xff0c;形成了这份…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...

CSS | transition 和 transform的用处和区别

省流总结&#xff1a; transform用于变换/变形&#xff0c;transition是动画控制器 transform 用来对元素进行变形&#xff0c;常见的操作如下&#xff0c;它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...