kafka-生产者监听器(SpringBoot整合Kafka)
文章目录
- 1、生产者监听器
- 1.1、创建生产者监听器
- 1.2、创建生产者拦截器
- 1.3、发送消息测试
- 1.4、使用Java代码创建主题分区副本
- 1.5、application.yml配置----v1版
- 1.6、屏蔽 kafka debug 日志 logback.xml
- 1.7、引入spring-kafka依赖
- 1.8、控制台日志
1、生产者监听器
1.1、创建生产者监听器
package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1 leader落盘 broker ack之后 才成功//ack为 -1 分区所有副本全部落盘 broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}
1.2、创建生产者拦截器
package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
1.3、发送消息测试
package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}
}
1.4、使用Java代码创建主题分区副本
package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}
1.5、application.yml配置----v1版
server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 0 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
1.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
1.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
1.8、控制台日志
生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717573749549
MyKafkaProducerListener消息发送成功:topic=my_topic1,partition = null,key = null,value = spring-kafka-生产者监听器,offset = 0
[[{"partition": 0,"offset": 0,"msg": "spring-kafka-生产者监听器","timespan": 1717573749549,"date": "2024-06-05 07:49:09"}]
]
相关文章:
kafka-生产者监听器(SpringBoot整合Kafka)
文章目录 1、生产者监听器1.1、创建生产者监听器1.2、创建生产者拦截器1.3、发送消息测试1.4、使用Java代码创建主题分区副本1.5、application.yml配置----v1版1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、控制台日志 1、生产者监听器 1.1、创建生产…...
3D感知视觉表示与模型分析:深入探究视觉基础模型的三维意识
在深度学习与大规模预训练的推动下,视觉基础模型展现出了令人印象深刻的泛化能力。这些模型不仅能够对任意图像进行分类、分割和生成,而且它们的中间表示对于其他视觉任务,如检测和分割,同样具有强大的零样本能力。然而࿰…...
VS2019+QT5.15调用动态库dll带有命名空间
VS2019QT5.15调用动态库dll带有命名空间 vs创建动态库 参考: QT调用vs2019生成的c动态库-CSDN博客 demo的dll头文件: // 下列 ifdef 块是创建使从 DLL 导出更简单的 // 宏的标准方法。此 DLL 中的所有文件都是用命令行上定义的 DLL3_EXPORTS // 符号…...
助力草莓智能自动化采摘,基于YOLOv5全系列【n/s/m/l/x】参数模型开发构建果园种植采摘场景下草莓成熟度智能检测识别系统
随着科技的飞速发展,人工智能(AI)技术已经渗透到我们生活的方方面面,从智能家居到自动驾驶,再到医疗健康,其影响力无处不在。然而,当我们把目光转向中国的农业领域时,一个令人惊讶的…...
C++中的生成器模式
目录 生成器模式(Builder Pattern) 实际应用 构建一辆汽车 构建一台计算机 构建一个房子 总结 生成器模式(Builder Pattern) 生成器模式是一种创建型设计模式,它允许你分步骤创建复杂对象。与其他创建型模式不同…...
基于python的PDF文件解析器汇总
基于python的PDF文件解析器汇总 大多数已发表的科学文献目前以 PDF 格式存在,这是一种轻量级、普遍的文件格式,能够保持一致的文本布局和格式。对于人类读者而言, PDF格式的文件内容展示整洁且一致的布局有助于阅读,可以很容易地…...
C++多线程同步总结
C多线程同步总结 关于C多线程同步 一、C11规范下的线程库 1、C11 线程库的基本用法:创建线程、分离线程 #include<iostream> #include<thread> #include<windows.h> using namespace std; void threadProc() {cout<<"this is in t…...
【机器学习】基于CNN-RNN模型的验证码图片识别
1. 引言 1.1. OCR技术研究的背景 1.1.1. OCR技术能够提升互联网体验 随着互联网应用的广泛普及,用户在日常操作中频繁遇到需要输入验证码的场景,无论是在登录、注册、支付还是其他敏感操作中,验证码都扮演着重要角色来确保安全性。然而&am…...
一文读懂Samtec分离式线缆组件选型 | 快速攻略
【摘要/前言】 2023年,全球线缆组件市场规模大致在2100多亿美元。汽车和电信行业是线缆组件最大的两个市场,中国和北美是最大的两个制造地区。有趣的是,特定应用(即定制)和矩形组件是两个最大的产品组。 【Samtec产品…...
批量申请SSL证书如何做到既方便成本又最低
假如您手头拥有1千个域名,并且打算为每一个域名搭建网站,那么在当前的网络环境下,您必须确保这些网站通过https的方式提供服务。这意味着,您将为每一个域名申请SSL证书,以确保网站数据传输的安全性和可信度。那么&…...
Python 设计模式(创建型)
文章目录 抽象工厂模式场景示例 单例模式场景实现方式 工厂方法模式场景示例 简单工厂模式场景示例 建造者模式场景示例 原型模式场景示例 抽象工厂模式 抽象工厂模式(Abstract Factory Pattern)是一种创建型设计模式,它提供了一种将一组相关…...
PyTorch 索引与切片-Tensor基本操作
以如下 tensor a 为例,展示常用的 indxing, slicing 及其他高阶操作 >>> a torch.rand(4,3,28,28) >>> a.shape torch.Size([4, 3, 28, 28])Indexing: 使用索引获取目标对象,[x,x,x,....] >>> a[0].shape torch.Size([3, 2…...
深入浅出 LangChain 与智能 Agent:构建下一代 AI 助手
我们小时候都玩过乐高积木。通过堆砌各种颜色和形状的积木,我们可以构建出城堡、飞机、甚至整个城市。现在,想象一下如果有一个数字世界的乐高,我们可以用这样的“积木”来构建智能程序,这些程序能够阅读、理解和撰写文本…...
scss是什么安装使⽤的步骤
当谈到SCSS时,我们首先需要了解它是什么。SCSS,也称为Sassy CSS,是Sass(Syntactically Awesome Stylesheets)的一种语法,它是CSS的预处理器,允许你使用变量、嵌套规则、混合(mixin&a…...
Pspark从hive读数据写到Pgsql数据库
前提条件 要使用PySpark从Hive读取数据并写入到PostgreSQL数据库,你需要确保以下几点: 你的PySpark环境已经配置好,并且能够连接到你的Hive数据。 PostgreSQL JDBC驱动程序已经添加到你的PySpark环境中。 你已经在PostgreSQL中创建好了相应…...
Pixi.js学习 (六)数组
目录 前言 一、数组 1.1 定义数组 1.2 数组存取与删除 1.3 使用数组统一操作敌机 二、实战 例题一:使用数组统一操作敌机 例题一代码: 总结 前言 为了提高作者的代码编辑水品,作者在使用博客的时候使用的集成工具为 HBuilderX。 下文所有截…...
操作系统复习-Linux的文件系统
文件系统概述 FAT FAT(File Allocation Table)FAT16、FAT32等,微软Dos/Windows使用的文件系统使用一张表保存盘块的信息 NTFS NTFS (New Technology File System)WindowsNT环境的文件系统NTFS对FAT进行了改进,取代了日的文件系统 EXT EXT(Extended…...
代码随想录算法训练营第三十六天| 860.柠檬水找零、 406.根据身高重建队列、 452. 用最少数量的箭引爆气球
LeetCode 860.柠檬水找零 题目链接:https://leetcode.cn/problems/lemonade-change/description/ 文章链接:https://programmercarl.com/0860.%E6%9F%A0%E6%AA%AC%E6%B0%B4%E6%89%BE%E9%9B%B6.html 思路 贪心算法:遇见20的时候有两种找零的…...
如何在C#中实现多线程
在C#中实现多线程有多种方式,包括使用System.Threading.Thread类、System.Threading.Tasks.Task类、System.Threading.Tasks.Parallel类以及异步编程模型(async和await)。下面我将为你展示每种方法的基本用法。 1. 使用System.Threading.Thread类 using System; using Syst…...
【LLM】快速了解Dify 0.6.10的核心功能:知识库检索、Agent创建和工作流编排(二)
【LLM】快速了解Dify 0.6.10的核心功能:知识库检索、Agent创建和工作流编排(二) 文章目录 【LLM】快速了解Dify 0.6.10的核心功能:知识库检索、Agent创建和工作流编排(二)一、创建一个简单的聊天助手&#…...
【介绍下Pandas,什么是Pandas?】
🌈个人主页: 程序员不想敲代码啊 🏆CSDN优质创作者,CSDN实力新星,CSDN博客专家 👍点赞⭐评论⭐收藏 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共…...
linux系统安装anaconda,并通过java程序调用python程序
虚拟环境准备 首先准备一块空的分区,安装anaconda至少要20g以上才能执行简单程序,这里准备20G的磁盘空间 创建分区,执行以下步骤,之后执行reboot重启 fdisk /dev/sda p n 回车 回车 w查看当前系统创建的分区,我这里是名为sda3的…...
Stable diffusion的SDXL模型,针不错!(含实操)
与之前的SD1.5大模型不同,这次的SDXL在架构上采用了“两步走”的生图方式: 以往SD1.5大模型,生成步骤为 Prompt → Base → Image,比较简单直接;而这次的SDXL大模型则是在中间加了一步 Refiner。Refiner的作用是什么呢…...
wordpress轻量免费主题
WordPress建站公司 适合提供WordPress建站服务的公司或个体(个人)工作室使用的WordPress建站公司主题模板。 https://www.jianzhanpress.com/?p545 首屏大图红色简洁wordpress主题 首屏大图红色简洁wordpress主题,非常地高端大气上档次,可用于多个行…...
Go AfterFunc 不触发
前言 函数原型为: func AfterFunc(d Duration, f func()) *TimerGo 的 time.AfterFunc 的作用是等待指定的时间间隔,然后在它自己的 goroutine 中调用 f。 现在有一个问题,我明明调用了 AfterFunc,但是它还没调用我指定的函数&…...
小程序视图渲染数据和部分事件的绑定
今天依旧使用这个目录进行教学 数据的渲染 在 index.js的 page中定义一个data对象结构是这样的 Page({data:{name:张三} }) 在index.wxml 中 利用模板语法进行渲染 <view >{{name}}</view> 注意这个模板里边不能使用js的方法 要循环渲染数组,如 在…...
“探索AIGC市场:腾讯元宝APP加入竞争,大模型产品的未来走向与个人选择“
文章目录 每日一句正能量前言使用体验分享独特优势和倾向选择字节豆包百度文心一言阿里通义千问腾讯元宝个人倾向选择结论 未来发展方向技术创新可持续可拓展性用户体验应用场景政府赋能数据安全与隐私保护伦理与社会责任国际合作与竞争结论 后记 每日一句正能量 不管现在有多么…...
node设置镜像源详细教程
在Node.js环境中,你可以通过设置npm或yarn的镜像源来加速依赖包的下载。以下是如何设置npm和yarn的镜像源的详细步骤: 使用npm设置镜像源 临时设置镜像源: 你可以在安装包时临时指定镜像源,例如: npm install package…...
四季变换,制氮机使用注意事项
随着四季的轮回变换,大自然展现着不同的风貌。对于制氮机而言,季节的变换同样会带来不同的使用挑战和注意事项。本文将为您揭示四季变换对制氮机使用的影响,帮助您更好地掌握制氮机的季节使用须知。 春季 温湿度变化:春季温湿度逐…...
如何实现办公终端安全
在网络安全日益严峻的当下,可信白名单作为一种高效的终端安全防护手段,正在逐渐受到业界的广泛关注和应用。本文将简要探讨可信白名单如何实现终端安全的原理、方法及其在实际应用中的优势与挑战。 首先,我们需要了解可信白名单的基本原理。可…...
苏州企业网站制作电话/东营网站建设哪家更好
2-3树真的是非常完美的平衡二叉树了,平衡二叉树(BST)的要点在于它的每条从根节点到叶子节点的路径都具有相同的长度,并且在数据结构中,对于查找和插入操作,最糟糕的情况下时间复杂度为O(log n)。与我在第四十五天讲的BST有什么不一…...
珠海市网站开发公司/营销网站建设制作
开发原因: 页面需要做页面统计,需要访问域。 访问域方法: using System.DirectoryServices.AccountManagement;命名空间负责管理。 MSDN:https://msdn.microsoft.com/en-us/library/system.directoryservices.accountmanagement%2…...
石家庄网站建设解决方案/域名归属查询
(From Swing)中的JFrame允许您设置菜单栏(使用JFrame.setMenuBar(mb)的MenuBar实例;).此菜单栏可以显示在不同的位置,具体取决于其运行的系统.如果运行应用程序的操作系统在屏幕顶部有一个菜单栏,则JFrame中设置的此菜单栏通常会出现在此菜单栏中.如果不支持,菜单栏将显示在框架…...
惠州网站建设制作/网站建设公司大型
一、链表题目 1、从尾到头打印链表 使用栈(也可以使用数组,逆序输出) /** * public class ListNode { * int val; * ListNode next null; * * ListNode(int val) { * this.val val; * } * …...
php网站制作教程/友情链接可以帮助店铺提高浏览量
官网:express 初始化:npm init -y安装:npm i -S express引包:var express require(express); app.js // 1. 引包 var express require(express);// 2. 创建你的服务器应用程序(也就是原来的 http.createServer&…...
一个ip做几个网站/网站申请流程
背景 首先我是个菜鸡,工资也低的一笔。 刚毕业时候在一家国企上班干 app 开发,干了快两年的时候,跳槽到了一家伪大厂干安全。投了不少简历都没有回音,只有这加伪大厂要我就来了。当时说好了会接触一些底层的东西,然而…...