消息队列 Kafka
Kafka
Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域
为什么使用消息队列MQ
在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many connection错误,引发雪崩效应。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多
我们使用消息队列,通过异步请求,缓解系统压力,消息队列经常应用于异步处理,流量削峰,应用解耦,消息通讯等场景
当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等
使用消息队列的好处
- 解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
- 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂了,加入队列的消息仍然可以在系统恢复后被处理
- 缓冲
有助于控制和优化数据流结果系统的速度,解决生产消息和消费消息的处理速度不一致的情况
- 灵活性,峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见
如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费
使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
- 异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它
想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们
消息队列的两种模式
- 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到消息队列中,然后消息消费者从消息队列中取出并且消费消息
消息被消费以后,消息队列中不再有存储,所以消息消费者不可能消费到已经被消费的消息消息队列支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
- 发布/订阅模式(一对多,又叫观察者模式,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息和点对点方式不同,发布到 topic 的消息会被所有订阅者消费
发布/订阅模式是定义对象间一种一对多的依赖关系,使得每当一个对象(目对标象)的状态发生改变,则所有依赖于它的对象(观察者对象)都会得到通知并自动更新
Kafka 概述
基于 Zookeeper
Kafka 是最初由 Linkedin 公司开发,是一个分布式、支持分区的(partition)、多副本的(replicar 协调的分布式消息中间件系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于 hadoop 的批处理系统、低延迟的实时系统、Spark/Flink 流式处理引擎,nginx 访问日志,消息服务等等,用 scala 语言编写
Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目
Kafka 特性
- 高吞吐量、低延迟
Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。每个 topic 可以分多个 Partition,Consumer Group 对 Partition 进行消费操作,提高负载均衡能力和消费能力。
- 可扩展性
kafka 集群支持热扩展
- 持久性、可靠性
消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性
允许集群中节点失败(多副本情况下,若副本数量为 n,则允许 n-1 个节点失败)
- 高并发
支持数千个客户端同时读写
Kafka 系统架构
- Broker 服务器
一台 kafka 服务器就是一个 broker
一个集群由多个 broker 组成,一个 broker 可以容纳多个 topic
- Topic 主题
可以理解为一个队列,生产者和消费者面向的都是一个 topic
类似于数据库的表名或者 ES 的 index
物理上不同 topic 的消息分开存储
- Partition 分区
为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上
一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列
Kafka 只保证 partition 内的记录是有序的,而不保证 topic 中不同 partition 的顺序
每个 topic 至少有一个 partition,当生产者产生数据的时候,会根据分配策略选择分区
然后将消息追加到指定的分区的队列末尾
分区的原因
- 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了
- 可以提高并发,因为可以以Partition为单位读写了
基础架构
1、Replica
副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失
且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本
一个 leader 和若干个 follower
2、Leader
每个 partition 有多个副本,其中有且仅有一个作为 Leader
Leader 是当前负责数据的读写的 partition
3、Follower
Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower
Follower 与 Leader 保持数据同步。Follower 只负责备份,不负责数据的读写。
如果 Leader 故障,则从 Follower 中选举出一个新的 Leader
当 Follower 挂掉、卡住或者同步太慢,Leader 会把这个 Follower 从 ISR
(Leader 维护的一个和 Leader 保持同步的 Follower 集合) 列表中删除,重新创建一个 Follower
4、producer
生产者即数据的发布者,该角色将消息 push 发布到 Kafka 的 topic 中。
broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中
生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition
5、Consumer
消费者可以从 broker 中 pull 拉取数据。消费者可以消费多个 topic 中的数据
6、Consumer Group(CG)
消费者组,由多个 consumer 组成
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
可为每个消费者指定组名,若不指定组名则属于默认的组
将多个消费者集中到一起去处理某一个 Topic 的数据,可以更快的提高数据的消费能力。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,
防止数据被重复读取,消费者组之间互不影响
7、offset 偏移量
可以唯一的标识一条消息
偏移量决定读取数据的位置,不会有线程安全的问题
消费者通过偏移量来决定下次读取的消息(即消费位置)
消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 Kafka 的消息
某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制
消息最终是会还被删除的,默认生命周期为 1 周(7*24小时)
8、Zookeeper
Kafka 通过 Zookeeper 来存储集群的 meta 信息
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,
需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,
以便故障恢复后继续消费
部署 Kafka 集群
安装 Kafka
//官方下载地址:http://kafka.apache.org/downloads.html
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz//安装 Kafka
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka//修改配置文件
cd /usr/local/kafka/config/
cp server.properties{,.bak}vim server.properties
修改 Kafka 配置文件
broker.id=0
#21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置
broker.id=1、broker.id=2listeners=PLAINTEXT://192.168.10.17:9092
#31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小
socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小
log.dirs=/usr/local/kafka/logs #60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量log.retention.hours=168
#103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除log.segment.bytes=1073741824
#110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件zookeeper.connect=192.168.54.10:2181,192.168.154.20:2181,192.168.154.30:2181
#123行,配置连接Zookeeper集群地址
修改环境变量
//修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/binsource /etc/profile
配置 Zookeeper 启动脚本
//设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka//分别启动 Kafka
service kafka start
Kafka 命令行操作
创建topic
查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181
查看某个 topic 详情
kafka-topics.sh --describe --zookeeper
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181
发布消息
kafka-console-producer.sh --broker-list
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181 --topic test
消费消息
kafka-console-consumer.sh --bootstrap-server
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181
--topic test --from-beginning
--from-beginning:会把主题中以往所有的数据都读取出来
修改分区数
kafka-topics.sh --zookeeper
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181
--alter --topic test --partitions 6
删除 topic
kafka-topics.sh --delete --zookeeper
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181 --topic test
相关文章:

消息队列 Kafka
Kafka Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域 为什么使用消息队列MQ 在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many conne…...

抽象轻松的java-mybatis简单入门
第一步:用IDEA新建一个java包 第二步:在IDEA中添加数据库(ps:自己百度) 点击数据库 第二步,新建数据库 选择你使用的数据库 用户与密码根据自己的设置进行配置 为了更方便的查看数据库,可以像图…...

012-第二代硬件选型
第二代硬件选型 文章目录 第二代硬件选型项目介绍重新换平台缘由X86 && Arm 架构切换 ARM Linux 硬件选型系统确定Qt 版本确定总结一下 关键字: Qt、 Qml、 Arm、 X86、 linux 项目介绍 欢迎来到我们的 QML & C 项目!这个项目结合了 QM…...

Spring中的设计模式
目录 工厂模式 组合模式 适配器模式 代理模式 单例模式 观察者模式 模板方法模式 责任链模式 Spring有着非常优雅的设计,很多地方都遵循SOLID原则,里面的设计模式更是数不胜数大概有以下几种: 工厂模式 所谓的工厂模式,核…...

软考 系统架构设计师系列知识点之软件质量属性(1)
这个十一注定是一个不能放松、保持“紧”的十一。由于报名了全国计算机技术与软件专业技术资格(水平)考试,11月4号就要考试,因此8天长假绝不能荒废,必须要好好利用起来。现在将各个核心知识点一一进行提炼并做记录。 所…...

GPT系列论文解读:GPT-1
GPT系列 GPT(Generative Pre-trained Transformer)是一系列基于Transformer架构的预训练语言模型,由OpenAI开发。以下是GPT系列的主要模型: GPT:GPT-1是于2018年发布的第一个版本,它使用了12个Transformer…...

数学分析:含参变量的积分
同样很多收敛性的证明不是重点,但里面的知识还是需要适当掌握,知道中间的大致思考和解决路径即可。 本质还是极限的可交换性,求导可以换到积分里面去操作。 这里要注意变量的区别,首先积分的被积变量是x,但是函数的变量…...

关于一篇ElementUI之CUD+表单验证
目录 一.CUD增删改查简述 1.1.增删改功能实现 二.表单验证 前端所有代码: 好啦今天就分享到这了,希望能帮到你哦!!! 以下的代码基于我博客中的代码进行续写 : 关于ElementUI之动态树数据表格分页实例 一.CUD增删改…...

VUE模板编译的实现原理
前言 在Vue.js 2.0中,模板编译是通过将模板转换为渲染函数来实现的。渲染函数是一个函数,它返回虚拟DOM节点,用于渲染实际的DOM。Vue.js的模板编译过程可以分为以下几个步骤: 将模板解析为抽象语法树(AST)…...

基础算法之——【动态规划之路径问题】1
今天更新动态规划路径问题1,后续会继续更新其他有关动态规划的问题!动态规划的路径问题,顾名思义,就是和路径相关的问题。当然,我们是从最简单的找路径开始! 动态规划的使用方法: 1.确定状态并…...

三十三、【进阶】索引的分类
1、索引的分类 (1)总分类 主键索引、唯一索引、常规索引、全文索引 (2)InnoDB存储引擎中的索引分类 2、 索引的选取规则(InnoDB存储引擎) 如果存在主键,主键索引就是聚集索引; 如果不存在主键ÿ…...

VBox启动失败、Genymotion启动失败、Vagrant迁移
VBox启动失败、Genymotion启动失败、Vagrant迁移 2023.10.9 最新版本vbox7.0.10、Genymotion3.5.0 Vbox启动失败 1、查看日志 Error -610 in supR3HardenedMainInitRuntime! (enmWhat4) Failed to locate ‘vcruntime140.dll’ 日志信息查看方法->找到虚拟机所在位置->…...

一篇短小精悍的文章让你彻底明白KMP算法中next数组的原理
以后保持每日一更,由于兴趣较多,更新内容不限于数据结构,计算机组成原理,数论,拓扑学......,所谓:深度围绕职业发展,广度围绕兴趣爱好。往下看今日内容 一.什么是KMP算法 KMP&#x…...

CSS盒子定位的扩张
定位的扩展 绝对定位(固定定位)会完全压住盒子 浮动元素不会压住下面标准流的文字,而绝对定位或固定位会压住下面标准流的所有内容 如果一个盒子既有向左又有向右,则执行左,同理执行上 显示隐藏 display: none&…...

SpringBoot整合POI实现Excel文件读写操作
1.环境准备 1、导入sql脚本: create database if not exists springboot default charset utf8mb4;use springboot;create table if not exists user (id bigint(20) primary key auto_increment comment 主键id,username varchar(255) not null comment 用…...

从零开始的力扣刷题记录-第八十七天
力扣每日四题 129. 求根节点到叶节点数字之和-中等130. 被围绕的区域-中等437. 路径总和 III-中等376. 摆动序列-中等总结 129. 求根节点到叶节点数字之和-中等 题目描述: 给你一个二叉树的根节点 root ,树中每个节点都存放有一个 0 到 9 之间的数字。 …...

【1】c++设计模式——>UML类图的画法
UML介绍 UML:unified modeling language 统一建模语言 面向对象设计主要就是使用UML类图,类图用于描述系统中所包含的类以及他们之间的相互关系,帮助人们简化对系统的理解,他是系统分析和设计阶段的重要产物,也是系统编码和测试的…...

SAP UI5 指定 / 变更版本
SAP UI5 指定 / 变更版本 Currently, SAP Fiori tools support SAP Fiori elements and SAPUI5 freestyle projects with minimum SAPUI5 versions 1.65 or higher. In case there’s a need to test an existing projects with a lower SAPUI5 version, the following worka…...

SpringMVC中异常处理详解
单个控制器异常处理 // 添加ExceptionHandler,表示该方法是处理异常的方法,属性为处理的异常类ExceptionHandler({java.lang.NullPointerException.class,java.lang.ArithmeticException.class})public String exceptionHandle1(Exception ex, Model mo…...

PPT课件培训视频生成系统实现全自动化
前言 困扰全动自化的重要环节,AI语音合成功能,终于可以实现自动化流程,在此要感谢团队不懈的努力和韧性的精神! 实现原理 请参照我的文章《Craneoffice云PPT课件培训视频生成系统》 基本流程 演示视频 PPT全自动 总结 过去实…...

基于腾讯云的OTA远程升级
一、OTA OTA即over the air,是一种远程固件升级技术,它允许在设备已经部署在现场运行时通过网络远程更新其固件或软件。OTA技术有许多优点,比如我们手机系统有个地方做了优化,使用OTA技术我们就不用召回每部手机,直接通过云端就可…...

如何在VS2022中进行调试bug,调试的快捷键,debug与release之间有什么区别
什么是bug 在学习编程的过程中,应该都听说过bug吧,那么bug这个词究竟是怎么来的呢? 其实Bug的本意是“虫子”或者“昆虫”,在1947年9月9日,格蕾丝赫柏,一位为美国海军工作的电脑专家,也是最早…...

初识jmeter及简单使用
目录 1、打开页面: 2、添加线程组: 3、线程组中设置参数: 4、添加请求 5、添加一个http请求后,设置请求内容 6、添加察看结果树 7、执行,查看结果 一般步骤是:在测试计划下面新建一个线程组…...

Spring 在多线程环境下如何确保事务一致性
问题在现 如何解决异步执行 多线程环境下如何确保事务一致性 事务王国回顾 事务实现方式回顾 编程式事务 利用编程式事务解决问题 问题分析完了,那么如何解决问题呢? 小结 问题在现 我先把问题抛出来,大家就明白本文目的在于解决什…...

[Machine Learning] Learning with Noisy Data
文章目录 Probabilistic Perspective of NoiseBias and VarianceRobustness among Surrogate Loss FunctionsNMF Probabilistic Perspective of Noise 假设数据来源于一个确定的函数,叠加了高斯噪声。我们有: y h ( x ) ϵ y h(x) \epsilon yh(x)ϵ…...

C++中有哪些常用的标准库?
C中有许多常用的标准库,这些库提供了丰富的功能和工具,方便开发人员进行各种任务。以下是一些常见的C标准库: iostream:用于输入和输出操作,包括cin、cout和cerr等类和函数。algorithm:提供了许多常用的算…...

软考-信息安全工程师概述
本文为作者学习文章,按作者习惯写成,如有错误或需要追加内容请留言(不喜勿喷) 本文为追加文章,后期慢慢追加 2023年10月 信息考试大纲 通过本考试的合格人员能够掌握网络信息安全的基础知识和技术原理,…...

2023-2024年华为ICT网络赛道模拟题库
2023-2024年网络赛道模拟题库上线啦,全面覆盖网络,安全,vlan考点,都是带有解析 参赛对象及要求: 参赛对象:现有华为ICT学院及未来有意愿成为华为ICT学院的本科及高职院校在校学生。 参赛要求:…...

英特尔参与 CentOS Stream 项目
导读红帽官方发布公告欢迎英特尔参与进 CentOS Stream 项目,并表示 “这一举措不仅进一步深化了我们长期的合作关系,也构建在英特尔已经在 Fedora 项目中积极贡献的基础之上。” 目前,CentOS Stream 共包括以下特别兴趣小组(SIG&a…...