当前位置: 首页 > news >正文

互联网全景消息(10)之Kafka深度剖析(中)

一、深入应用

1.1 SpringBoot集成Kafka

        引入对应的依赖。

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--swagger2增强,官方ui太low , 访问地址: /doc.html  --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>swagger-bootstrap-ui</artifactId><version>1.8.8</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency></dependencies>

        添加配置文件:

spring:application:name: demokafka:bootstrap-servers: 这里换成自己的kafka信息producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

        启动服务:

@SpringBootApplication
@RestController
public class Demo {public static void main(String[] args) {new SpringApplicationBuilder(Demo.class).run(args);}}

        启动信息如下:

1.2 消息发送 

 1.2.1 异步发送

        KafkaTemplate调用的send默认采用的是异步发送,如果需要同步发送获取发送结果,则需要调用get方法。 

@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}

        效果如下:

        同步发送代码如下:

    @GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}

1.2.2 序列化 

        序列化详解:

  • 前面我们用到的是Kafka自带的字符串序列化器,
    org.apache.kafka.common.serialization.StringDeserializer
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long等;
  • 这些序列化接器都实现了org.apache.kafka.common.serialization.Serializer 

        自定义序列化,自己实现序列化对应的接口即可,如下:

public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();}}

        然后在yaml配置自己的编辑器:

value-serializer: com.itheima.demo.config.MySerializer

         对应的我们在消费者消费消息的时候也需要按照我们自定义的方式进行解码,具体代码如下:

package com.itheima.demo.config;import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}

 1.2.3 分区策略

         分区策略决定了消息根据key投放到那个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定分区里面去;
  • 没有给定了分区号,给定数据的key值,通过key取上hashcode进行分区;
  • 既没有给定分区号,也没有给定key值,直接轮询进行分区;
  • 自定义分区策略,按照自定义需求选择分区号;

        生产者代码如下:

//测试分区发送
@RestController
public class PartitionProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;//    指定分区发送
//    不管你key是什么,到同一个分区@GetMapping("/kafka/partitionSend/{key}")public void setPartition(@PathVariable("key") String key) {kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");}//    指定key发送,不指定分区
//    根据key做hash,相同的key到同一个分区@GetMapping("/kafka/keysend/{key}")public void setKey(@PathVariable("key") String key) {kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");}// 什么也不指定@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}

        消费者代码如下:

//指定消费组消费
@Component
public class PartitionConsumer {private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);//分区消费@KafkaListener(topics = {"test"},topicPattern = "0")public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=0,message:[{}]", msg);}}@KafkaListener(topics = {"test"},topicPattern = "1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=1,message:[{}]", msg);}}
}

        1)测试默认分区策略,发送什么也不指定的消息

        可以发现发送的是同一条的数据,他是跟partition轮询发送的;

         2)测试指定分区

         3)按照key的hashcode来分区

1.3 消息消费 

 1.3.1 消费者分组

public class GroupConsumer {private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);//组1,消费者1@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-1 , message:{}", msg);}}//组1,消费者2@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-2 , message:{}", msg);}}//组2,只有一个消费者@KafkaListener(topics = {"test"},groupId = "group2")public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group2 , message:{}", msg);}}
}

         启动:

        需要注意的是,注意分区数与消费者数的搭配,如果(消费者数 > 分区数量),消费者将会出现闲置,浪费资源!

1.3.2 位移提交

        1)自动提交,我们在前面设置了以下两个选项,则kafka会延时设置自动提交:

        2)手动提交,有些时候我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复,如下我们配置手动提交:

@Configuration
public class MyOffsetConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 注意这里!!!设置手动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));// ack模式://          AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:////          RECORD//          每处理一条commit一次////          BATCH(默认)//          每次poll的时候批量提交一次,频率取决于每次poll的调用频率////          TIME//          每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)////          COUNT//          累积达到ackCount次的ack去commit////          COUNT_TIME//          ackTime或ackCount哪个条件先满足,就commit////          MANUAL//          listener负责ack,但是背后也是批量上去////          MANUAL_IMMEDIATE//          listner负责ack,每调用一次,就立即commitfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

相关文章:

互联网全景消息(10)之Kafka深度剖析(中)

一、深入应用 1.1 SpringBoot集成Kafka 引入对应的依赖。 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupI…...

Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步

Oracle Dataguard&#xff08;主库为双节点集群&#xff09;配置详解&#xff08;5&#xff09;&#xff1a;将主库复制到备库并启动同步 目录 Oracle Dataguard&#xff08;主库为双节点集群&#xff09;配置详解&#xff08;5&#xff09;&#xff1a;将主库复制到备库并启动…...

pytorch小记(一):pytorch矩阵乘法:torch.matmul(x, y)

pytorch小记&#xff08;一&#xff09;&#xff1a;pytorch矩阵乘法&#xff1a;torch.matmul&#xff08;x, y&#xff09;/ x y 代码代码 1&#xff1a;torch.matmul(x, y)输入张量&#xff1a;计算逻辑&#xff1a;输出结果&#xff1a; 代码 2&#xff1a;y y.view(4,1)…...

PyTorch环境配置常见报错的解决办法

目标 小白在最基础的环境配置里一般都会出现许多问题。 这里把一些常见的问题分享出来。希望可以节省大家一些时间。 最终目标是可以在cmd虚拟环境里进入jupyter notebook&#xff0c;new的时候有对应的环境&#xff0c;并且可以跑通所有的import code。 第一步&#xff1a;…...

罗永浩再创业,这次盯上了 AI?

罗永浩&#xff0c;1972年7月9日生于中国延边朝鲜族自治州的一个军人家庭&#xff0c;是一名朝鲜族人&#xff1b;早年在新东方授课&#xff0c;2004年当选 “网络十大红人” &#xff1b;2006年8月1日&#xff0c;罗永浩创办牛博网&#xff1b;2008年5月&#xff0c;罗永浩注册…...

VUE3 provide 和 inject,跨越多层级组件传递数据

provide 和 inject 是 Vue 3 提供的 API&#xff0c;主要用于实现祖先组件与后代组件之间的依赖注入。它们可以让你在组件树中&#xff0c;跨越多层组件传递数据&#xff0c;而不需要通过 props 或事件的方式逐层传递。这个机制主要用于状态共享、插件系统或某些跨层级的功能。…...

git打补丁

1、应用场景 跨仓库升级 开发项目B使用的是开源项目A。开源项目A发现漏洞&#xff0c;作者进行了修复&#xff0c;我们可以通过使用git补丁的方式&#xff0c;将作者修改的内容复制到我 们的项目B中。 2、TortoiseGit方式 源仓库 格式化补丁 根据提交数量&#xff0c;生成…...

机械燃油车知识图谱、知识大纲、知识结构(持续更新...)

一、发动机 曲柄连杆机构 配气机构 点火系统 起动系统 燃油供给系统 润滑系统 冷却系统 二、底盘 &#xff08;一&#xff09;传动系统 1、离合器 2、变速器 3、万向传动装置 4、驱动桥 &#xff08;二&#xff09;行驶系统 1、车架 2、车桥 3、悬架 4、车轮 &a…...

Vue3学习总结

一、Vue 3 基础搭建与核心语法 1.创建 Vue 3 应用 在项目的入口文件 main.js 中&#xff0c;通过以下代码创建 Vue 3 应用实例&#xff1a; import { createApp } from vue; import App from ./App.vue;const app createApp(App); app.mount(#app); 这几行代码的作用是引入…...

Type-C双屏显示器方案

在数字化时代&#xff0c;高效的信息处理和视觉体验已成为我们日常生活和工作的关键需求。随着科技的进步&#xff0c;一款结合了便携性和高效视觉输出的设备——双屏便携屏&#xff0c;逐渐崭露头角&#xff0c;成为追求高效工作和娱乐体验人群的新宠。本文将深入探讨双屏便携…...

【读书与思考】焦虑与内耗

【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】【读书与思考】 导言 今天一个朋友和我说&#xff0c;最近比较焦虑和内耗&#xff0c;无心工作和学习&#xff0c;我问他你焦虑内耗的时候&#xff0c;时间主要花在哪了&#xff0c;他告诉我说主要花在看有关移…...

基于python的网页表格数据下载--转excel

基于 Python 的网页表格数据爬取与下载:以维基百科为例 目录 基于 Python 的网页表格数据爬取与下载:以维基百科为例1. 背景介绍2. 工具与环境3. 操作步骤1. 获取网页内容2. 定位表格元素3. 表格变身 Pandas DataFrame4. 检查数据,收工!5. 进阶玩法与优化6. 完整代码4. 结果…...

Vue.js开发入门:从零开始搭建你的第一个项目

前言 嘿&#xff0c;小伙伴们&#xff01;今天咱们来聊聊 Vue.js&#xff0c;一个超火的前端框架。如果你是编程小白&#xff0c;别怕&#xff0c;跟着我一步步来&#xff0c;保证你能轻松上手&#xff0c;搭建起属于自己的第一个 Vue 项目。Vue.js 可能听起来有点高大上&#…...

LS1046+XILINX XDMA PCIE调通

欢迎点赞收藏&#xff0c;欢迎私下讨论技术&#xff0c;分享技术 硬件平台 &#xff1a;NXP LS1046 XILINX FPGA 软件平台&#xff1a;LINUX 4.19.68 buildroot LS1046 PEX3 接 XILINX FPGA&#xff0c;linux使用designware的PCI主控制器。下载XILINX DMA驱动&#xff0c;解…...

HarmonyOS:@LocalBuilder装饰器: 维持组件父子关系

一、前言 当开发者使用Builder做引用数据传递时&#xff0c;会考虑组件的父子关系&#xff0c;使用了bind(this)之后&#xff0c;组件的父子关系和状态管理的父子关系并不一致。为了解决组件的父子关系和状态管理的父子关系保持一致的问题&#xff0c;引入LocalBuilder装饰器。…...

YOLOv10-1.1部分代码阅读笔记-downloads.py

downloads.py ultralytics\utils\downloads.py 目录 downloads.py 1.所需的库和模块 2.def is_url(url, checkFalse): 3.def delete_dsstore(path, files_to_delete(".DS_Store", "__MACOSX")): 4.def zip_directory(directory, compressTrue, ex…...

计算机图形学【绘制立方体和正六边形】

工具介绍 OpenGL&#xff1a;一个跨语言的图形API&#xff0c;用于渲染2D和3D图形。它提供了绘制图形所需的底层功能。 GLUT&#xff1a;OpenGL的一个工具库&#xff0c;简化了窗口创建、输入处理和其他与图形环境相关的任务。 使用的函数 1. glClear(GL_COLOR_BUFFER_BIT |…...

基于django中医药数据可视化平台(源码+lw+部署文档+讲解),源码可白嫖!

摘要 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;中医药管理平台当然不能排除在外。中医药数据可视化平台是在实际应用和软件工程的开发原理之上&#xff0c;运用Python语言、ECharts技术、…...

kafka消费堆积问题探索

背景 我们的商城项目用PHP写的&#xff0c;原本写日志方案用的是PHP的方案&#xff0c;但是&#xff0c;这个方案导致资源消耗一直降不下来&#xff0c;使用了20个CPU。后面考虑使用通过kafka的方案写日志&#xff0c;商城中把产生的日志丢到kafka中&#xff0c;在以go写的项目…...

Vue.js 使用插槽(Slots)优化组件结构

Vue.js 使用插槽&#xff08;Slots&#xff09;优化组件结构 今天我们聊聊 Vue.js 的一个超实用功能——插槽&#xff08;Slots&#xff09;。插槽是 Vue 组件开发中的神器&#xff0c;用好它&#xff0c;你可以让组件变得更灵活、更可复用&#xff0c;还能写出优雅的代码结构…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

HTML 列表、表格、表单

1 列表标签 作用&#xff1a;布局内容排列整齐的区域 列表分类&#xff1a;无序列表、有序列表、定义列表。 例如&#xff1a; 1.1 无序列表 标签&#xff1a;ul 嵌套 li&#xff0c;ul是无序列表&#xff0c;li是列表条目。 注意事项&#xff1a; ul 标签里面只能包裹 li…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

ABAP设计模式之---“简单设计原则(Simple Design)”

“Simple Design”&#xff08;简单设计&#xff09;是软件开发中的一个重要理念&#xff0c;倡导以最简单的方式实现软件功能&#xff0c;以确保代码清晰易懂、易维护&#xff0c;并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计&#xff0c;遵循“让事情保…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息&#xff0c;对客户进行统一管理&#xff0c;可以把所有客户信息录入系统&#xff0c;进行维护和统计功能。可通过文件的方式保存相关录入数据&#xff0c;对…...

springboot整合VUE之在线教育管理系统简介

可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生&#xff0c;小白用户&#xff0c;想学习知识的 有点基础&#xff0c;想要通过项…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...