当前位置: 首页 > news >正文

【中间件】kafka

目录

  • 一、概述
  • 二、生产者
    • 1. 发送原理
    • 2. 生产者分区 Partition
      • 分区好处
      • 分区策略
    • 3. 生产者如何提高吞吐量
    • 4. 数据可靠性
      • ACK应答级别
      • 数据不丢失:ACK + ISR
      • 数据不重复:幂等性
      • 数据有序
  • 三、broker
    • 1. 工作流程
    • 2. 副本相关
    • 3. 底层存储
    • 4. 高效读写数据
  • 四、消费者
    • 1. 工作流程
    • 2. 分区分配和重平衡
    • 3. offset 位移

一、概述

  1. 定义:是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域

  2. 三大功能

    • 削峰: 高峰期的消息可以积压到消息队列中,随后平滑地处理完成,避免突发访问压力压垮系统
    • 解耦: 消息队列避免模块之间的相互调用,降低各个模块的耦合性,提高系统的可扩展性
    • 异步: 发送方把消息放在消息队列中,接收方无需立即处理,可以等待合适的时间处理
  3. 基础架构:

在这里插入图片描述

组件作用
Producer消息生产者,就是向 Kafka broker 发消息的客户端
Consumer消息消费者,向 Kafka broker 取消息的客户端
Consumer Group(CG)消费者组,由多个 consumer 组成。组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。消费者组是逻辑上的一个订阅者
Broker一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic
Topic消息主题(逻辑概念) ,生产者和消费者面向的都是一个 topic
Partition一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列
Replica副本。每个分区都有若干个副本,一个 Leader 和若干个Follower
Leader一组副本中的“主”,只有主和生产者消费者交互
Follower一组副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步
SegmentPartition 物理上被分成多个 Segment,每个 Segment 1个G
Zookeeper保存元信息,现已废除

二、生产者

1. 发送原理

在这里插入图片描述

涉及到了两个线程——main 线程和 Sender 线程

  • 在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给消息队列
  • 当消息队列内的消息达到一定大小,或者达到时间限制,会通知sender线程
  • Sender 线程不断从消息队列中拉取消息发送到 Kafka Broker
    • 可以选择是异步还是同步(同步就是sender等待收到broker的ack后,再去发送新消息)

2. 生产者分区 Partition

分区好处

  1. 便于合理使用存储资源,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  2. 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据

分区策略

在这里插入图片描述

生产者生产消息的时候:

  1. 指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0

  2. 没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值

    例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区

  3. 既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

    例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)

  4. 自定义分区:定义类实现 Partitioner 接口,重写 partition()方法,方法返回分区号

3. 生产者如何提高吞吐量

  • 提高main线程创建的消息队列大小:缓存大一点
  • 提高batchsize大小:多等一些数据再传
  • 调整等待时间:双刃剑,太短一次传的消息太少,太长有延迟
  • 对传输数据做压缩:能传更多的消息

4. 数据可靠性

ACK应答级别

0:生产者发送过来的数据,不需要等数据落盘应答
1:生产者发送过来的数据,Leader收到数据后应答
-1:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答

单纯用0或1都会导致丢数,而单纯用-1会导致多数重复

数据不丢失:ACK + ISR

ACK = -1 + 副本 >= 2 + ISR最小副本数量 >= 2

数据不重复:幂等性

  1. 数据语义

    • 最多一次:ACK = 0
    • 至少一次:ACK = -1 + 副本 >= 2 + ISR最小副本数量 >= 2
    • 精确一次:幂等性 + 至少一次
  2. 重复数据的判断标准:具有 <PID, Partition, SeqNumber> 相同主键的消息提交时,Broker只会持久化一条

    • PID是Kafka每次重启都会分配一个新的Producer ID
    • Partition 表示分区号
    • Sequence Number是单调自增的

    所以幂等性只能保证的是在单分区单会话内不重复

    全局不重复需要开启事务

数据有序

  • 生产者有序发送消息
    • 一个一个消息的发:一个 Topic 下的同一个 Partition 一定是有序的
    • 不是一个一个发:需要开启幂等性且一次发不能超过5个,这样如果乱序到达的话,broker会自己排序
  • 消费者有序消费
    • 一个分区只让一个消费者来消费,即能保证

三、broker

1. 工作流程

  1. 生产者将消息发送给分区 Leader
  2. Leader 将消息写入本地文件
  3. 对应的 Follower 从 Leader 拉取消息并写入本地文件
  4. Follower 向 Leader 发送 ACK
  5. Leader向生产者回复
  • leader的维护由保存在paitition内的Controller来做,Controller也是分布式的,他会监听brokers节点的变化,在节点挂掉的时候辅助选举新leader,选举规则:在ids列表内按顺序选择

2. 副本相关

  1. 定义:每个partition都有多份,叫副本,来提高可靠性

    • 副本分为Leader和Follower,只有Leader和生产者和消费者交互
    • 副本AR = ISR + OSR
  2. Leader 和 Follower 故障处理

    • Follower故障:被踢出ISR,恢复后再加入ISR
    • Leader故障:从ISR中选出一个新的Leader,恢复后去除旧数据,和新Leader进行同步(只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复)
  3. 副本分区分配

    尽可能的把Leader散开,否则会对某一个broker产生很大的压力

3. 底层存储

partition下进一步将数据分为Segment,每个1G

  • Segment分为

    • log:存具体数据,以追加的方式
    • index:索引,稀疏索引,4KB记一条索引
    • 时间戳:过期删除用的
      在这里插入图片描述
  • 删除方法

    • 删除:直接删除
    • 压缩:相同key只保留最新的

4. 高效读写数据

  • Kafka 本身是分布式集群,可以采用分区技术,并行度高
  • 读数据采用稀疏索引,可以快速定位要消费的数据
  • 顺序写磁盘
  • 页缓存 + 零拷贝技术
    • 页缓存PageCache:重度依赖底层操作系统提供的PageCache功能,写的时候直接交给页缓存,读的时候先读页缓存,没有再读磁盘
    • 零拷贝:消息从磁盘里读出来之后不走应用层代码,直接走网卡,不占用CPU
      在这里插入图片描述

四、消费者

1. 工作流程

在这里插入图片描述

  • 消费者可以分组,一个分区只能由组内的一个消费者消费,消费者组是逻辑上的一个订阅者
  • 用offset标识消费的位置,由消费者提交,保存在主题内,由coordinator管理,这也是个分布式

在这里插入图片描述
主要就是从broker里拉取数据

2. 分区分配和重平衡

分区分配问题:一个consumer group中有多个consumer组成,一个 topic有多个partition组成,问题是,到底由哪个consumer来消费哪个partition的数据

  • 分区分配策略

    • Range:对每个 topic 而言, partitions数/consumer数来决定,会产生数据倾斜
    • RoundRobin:针对集群中所有Topic而言,所有的 partition轮询分配
    • Sticky:尽量均匀地分配分区,根据上次的分配结果尽量减少变动

3. offset 位移

  1. 位移保存方式:存在__consumer_offsets里,采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值

  2. 位移的提交方式

    • 自动提交(可能造成重复消费)

      重复消费:已经消费了数据,但是 offset 没提交
      比如每隔5s,下一轮过了2s挂了,会重复消费这2s的内容

    • 手动提交(可能造成漏消费)

      漏消费:先提交 offset 后消费,有可能会造成数据的漏消费
      比如消费者取了,还在内存里,刚提交还没来得及落盘就挂了,没落盘的就漏消费了

    不管是重复消费还是漏消费,都是提交和落盘的间隙出现宕机的情况,可以开启事务,把这两个动作原子绑定

相关文章:

【中间件】kafka

目录 一、概述二、生产者1. 发送原理2. 生产者分区 Partition分区好处分区策略 3. 生产者如何提高吞吐量4. 数据可靠性ACK应答级别数据不丢失&#xff1a;ACK ISR数据不重复&#xff1a;幂等性数据有序 三、broker1. 工作流程2. 副本相关3. 底层存储4. 高效读写数据 四、消费者…...

Html5版音乐游戏制作及分享(H5音乐游戏)

这里实现了Html5版的音乐游戏的核心玩法。 游戏的制作借鉴了&#xff0c;很多经典的音乐游戏玩法&#xff0c;通过简单的代码将音乐的节奏与操作相结合。 可以通过手机进行游戏&#xff0c;准确点击下落时的目标&#xff0c;进行得分。 点击试玩 游戏内的下落数据是通过手打记…...

Python基于Pytorch Transformer实现对iris鸢尾花的分类预测,分别使用CPU和GPU训练

1、鸢尾花数据iris.csv iris数据集是机器学习中一个经典的数据集&#xff0c;由英国统计学家Ronald Fisher在1936年收集整理而成。该数据集包含了3种不同品种的鸢尾花&#xff08;Iris Setosa&#xff0c;Iris Versicolour&#xff0c;Iris Virginica&#xff09;各50个样本&am…...

【运动规划算法项目实战】如何实现简单的状态机

文章目录 简介一、状态机1.1 简介1.2 原理介绍1.3 使用方法二、行为树2.1 简介2.2 原理介绍2.3 使用方法三、如何实现一个简单的状态机四、其他的决策模型简介四、总结简介 在机器人算法中,状态机和行为树是常用的两种设计模式。它们能够帮助机器人在复杂的环境中更好地执行任…...

JavaScript实现用while语句计算1+n的和的代码

以下为用while语句计算1n的和实现结果的代码和运行截图 目录 前言 一、实现用while语句计算1n的和 1.1运行流程及思想 1.2代码段 1.3 JavaScript语句代码 1.4运行截图 【附加】用while计算110的和 1.1代码段 1.3 运行截图 前言 1.若有选择&#xff0c;您可以在目录里…...

Three.js教程:顶点索引复用顶点数据

推荐&#xff1a;将 NSDT场景编辑器 加入你3D工具链 其他工具系列&#xff1a; NSDT简石数字孪生 顶点索引复用顶点数据 通过几何体BufferGeometry的顶点索引属性BufferGeometry.index可以设置几何体顶点索引数据&#xff0c;如果你有WebGL基础很容易理解顶点索引的概念&#…...

机器学习中的数学——学习曲线如何区别欠拟合与过拟合

通过这篇博客&#xff0c;你将清晰的明白什么是如何区别欠拟合与过拟合。这个专栏名为白话机器学习中数学学习笔记&#xff0c;主要是用来分享一下我在 机器学习中的学习笔记及一些感悟&#xff0c;也希望对你的学习有帮助哦&#xff01;感兴趣的小伙伴欢迎私信或者评论区留言&…...

【Java】类和对象,封装

目录 1.类和对象的定义 2.关键字new 3.this引用 4.对象的构造及初始化 5.封装 //包的概念 //如何访问 6.static成员 7.代码块 8.对象的打印 1.类和对象的定义 对象&#xff1a;Java中一切皆对象。 类&#xff1a;一般情况下一个Java文件一个类&#xff0c;每一个类…...

Python小姿势 - 知识点:

知识点&#xff1a; Python的字符串格式化 标题&#xff1a; Python字符串格式化实例解析 顺便介绍一下我的另一篇专栏&#xff0c; 《100天精通Python - 快速入门到黑科技》专栏&#xff0c;是由 CSDN 内容合伙人丨全站排名 Top 4 的硬核博主 不吃西红柿 倾力打造。 基础知识…...

【Python】【进阶篇】9、Django路由系统精讲

目录 Django路由系统精讲1. Django 路由系统应用1&#xff09;配置第一个URL实现页面访问2&#xff09;正则与正则分组使用3&#xff09;正则捕获组使用 2. path()与re_path() Django路由系统精讲 在《URL是什么》一节中&#xff0c;我们对 URL 有了基本的认识&#xff0c;在本…...

在Linux操作系统上部署wgcloud监控

1.wgcloud监控介绍 1.1 介绍 ​ 这是一款开源的主机监控系统&#xff0c;可以支持主机各种指标监测&#xff08;cpu使用率&#xff0c;cpu温度&#xff0c;内存使用率&#xff0c;磁盘容量空间&#xff0c;磁盘IO&#xff0c;硬盘SMART健康状态&#xff0c;系统负载&#xff…...

浙大的SAMTrack,自动分割和跟踪视频中的任何内容

Meta发布的SAM之后&#xff0c;Meta的Segment Anything模型(可以分割任何对象)体验过感觉很棒&#xff0c;既然能够在图片上面使用&#xff0c;那肯定能够在视频中应用&#xff0c;毕竟视频就是一帧一帧的图片的组合。 果不其然浙江大学就发布了这个SAMTrack&#xff0c;就是在…...

Spring第三方资源配置管理

Spring第三方资源配置管理 1. 管理DataSource连接池对象1.1 管理Druid连接池【重点】1.2 管理c3p0连接池 2. 加载properties属性文件【重点】2.1 基本用法2.2 配置不加载系统属性2.3 加载properties文件写法 说明&#xff1a;以管理DataSource连接池对象为例讲解第三方资源配置…...

网络编程代码实例:多进程版

文章目录 前言代码仓库内容代码&#xff08;有详细注释&#xff09;server.cclient.cMakefile 结果总结参考资料作者的话 前言 网络编程代码实例&#xff1a;多进程版。 代码仓库 yezhening/Environment-and-network-programming-examples: 环境和网络编程实例 (github.com)E…...

一家传统制造企业的上云之旅,怎样成为了数字化转型典范?

众所周知&#xff0c;中国是一个制造业大国。在想要上云以及正在上云的企业当中&#xff0c;传统制造企业也占据了相当大的比例。 那么这类企业在实施数字化转型的时候&#xff0c;应该如何着手&#xff1f;我们不妨来看看一家传统制造企业的现身说法。 国茂股份的数字化转型诉…...

C++入门(C++)

目录 命名空间 1、命名空间的定义 2、命名空间的使用 1、加名空间名称和作用域限定符 2、使用using namespace 命名空间引入 3、使用using将命名空间中某个成员引入 C的输入与输出 缺省参数 1、缺省参数的概念 2、缺省参数分类 1、全缺省参数 2、半缺省参数 函数重载 1、函数重…...

Linux 利用网络同步时间

yum -y install ntp ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime ntpdate ntp1.aliyun.com 创建加入crontab echo "*/20 * * * * /usr/sbin/ntpdate -u ntp.api.bz >/dev/null &" >> /var/spool/cron/rootntp常用服务器 中国国家授…...

炫技亮点 SpringBoot下消灭If Else,让你的代码更亮眼

文章目录 背景案例第一阶段 萌芽第二阶段 屎上雕花第三阶段 策略工厂模式重构第四阶段 优化 总结 背景 大家好&#xff0c;我是大表哥laker。今天&#xff0c;我要和大家分享一篇关于如何使用策略模式和工厂模式消除If Else耦合问题的文章。这个方法能够让你的代码更加优美、简…...

免费ChatGPT接入网站-网站加入CHATGPT自动生成关键词文章排名

网站怎么接入chatGPT 要将ChatGPT集成到您的网站中&#xff0c;需要进行以下步骤&#xff1a; 注册一个OpenAI账户&#xff1a;访问OpenAI网站并创建一个账户。这将提供访问API密钥所需的身份验证凭据。 获取API密钥&#xff1a;在您的OpenAI控制台中&#xff0c;您可以找到您…...

PostgreSQL的数据类型有哪些?

数据类型分类 分类名称说明与其他数据库的对比布尔类型PG支持SQL标准的boolean数据类型与MySQL中的bool、boolean类型相同&#xff0c;占用1字节存储空间数值类型整数类型有2字节的smallint、4字节的int、8字节的bigint&#xff1b;精确类型的小数有numeric&#xff1b;非精确…...

Android 9.0 系统开机自启动第三方app

1.前言 在9.0的系统rom定制化开发中,在framework定制话的功能开发中,在内置的app中,有时候在系统开机以后会要求启动第三方app的功能,所以这就需要在监听开机完成的广播,然后在启动第三方app就可以了,接下来就需要在系统类中监听开机完成的广播流程来实现功能 2.系统开…...

一些想法:关于学习一门新的编程语言

很多人可能长期使用一种编程语言&#xff0c;并感到很有成就感和舒适感&#xff0c;发现学习一种新的编程语言的想法令人生畏而痛苦。或者可能知道并使用多种编程语言&#xff0c;但有一段时间没有学习新的语言。更或者可能只是好奇别人是如何潜心学习新的编程语言并迅速取得成…...

线性代数——矩阵

文章目录 版权声明基础概念矩阵的运算矩阵的加法数与矩阵相乘矩阵的乘法矩阵的转置 矩阵和方程组方阵和行列式伴随矩阵可逆矩阵分块矩阵矩阵的初等变换初等矩阵等价矩阵行阶梯矩阵行最简矩阵初等变换在矩阵求解中的应用 矩阵的秩 版权声明 本文大部分内容皆来自李永乐老师考研…...

taro之小程序持续集成

小程序持续集成 Taro 小程序端构建后支持 CI&#xff08;持续集成&#xff09;的插件 tarojs/plugin-mini-ci。 目前已支持&#xff08;企业&#xff09;微信、京东、字节、支付宝、钉钉、百度小程序 功能包括&#xff1a; 构建完毕后自动唤起小程序开发者工具并打开项目上传…...

Ceph入门到精通-Ceph 编排器简介

第 1 章 Ceph 编排器简介 作为存储管理员&#xff0c;您可以将 Ceph 编排器与 Cephadm 实用程序搭配使用&#xff0c;能够发现设备并在 Red Hat Ceph Storage 集群中创建服务。 1.1. 使用 Ceph Orchestrator Red Hat Ceph Storage Orchestrators 是经理模块&#xff0c;主要…...

【Feign扩展】OpenFeign日志打印Http请求参数和响应数据

SpringBoot使用log4j2 在Spring Boot中所有的starter 都是基于spring-boot-starter-logging的&#xff0c;默认使用Logback。使用Log4j2的话&#xff0c;你需要排除 spring-boot-starter-logging 的依赖&#xff0c;并添加 spring-boot-starter-log4j2的依赖。 配置依赖 <…...

MongoDB (零) 安装和简单使用

1.安装(Ubuntu) 1.1.安装gnupg sudo apt-get install gnupg1.2.获取GPG Key curl -fsSL https://pgp.mongodb.com/server-6.0.asc | \sudo gpg -o /usr/share/keyrings/mongodb-server-6.0.gpg \--dearmor1.3.创建本地文件 echo "deb [ archamd64,arm64 signed-by/usr…...

Java中的异常是什么?

Java中的异常是指在程序运行时发生的错误或异常情况。这些异常可能会导致程序崩溃或无法正确执行&#xff0c;因此需要在代码中进行处理。Java中的异常机制可以帮助程序员捕获并处理异常&#xff0c;从而保证程序的稳定性和可靠性。 Java中的异常分为两种类型&#xff1a;受检…...

微短剧“小阳春”,“爱优腾芒”抢滩登陆?

降本增效一整年&#xff0c;长视频平台们似乎扭转了市场对于它们“烧钱”的印象。 爱奇艺宣布2022全年盈利&#xff0c;腾讯视频宣布从去年10月起开始盈利&#xff0c;视频平台们结束了一场“无限战争”。 与此同时&#xff0c;随着短视频平台的崛起&#xff0c;视频内容的形…...

C++菱形继承(再剖析)

当子类对象给父类对象的时候&#xff0c;怎么找公共的虚基类&#xff08;A&#xff09; 就得通过偏移量来算虚基类的位置 ---------------------------------------------------------------------------------------------------------------------------- 我们来分析一下B…...