【企业级分布式系统】 Kafka集群
文章目录
- Kafka
- Kafka 概述
- 使用消息队列的好处
- Kafka 的特性
- Kafka 系统架构
- Kafka 的应用场景
- Kafka 的优缺点
- Kafka 集群部署
- 下载安装包
- 安装 Kafka
- Kafka 命令行操作
- Kafka 架构深入
- Filebeat+Kafka+ELK 部署指南~
- 部署 Zookeeper+Kafka 集群
- 部署 Filebeat
- 部署 ELK(Logstash 配置)
- Kibana 配置与查看日志
Kafka
Kafka 概述
Kafka 是一个分布式、基于发布/订阅模式的消息队列系统,由 Linkedin 开发并贡献给 Apache 基金会,现已成为顶级开源项目。它主要应用于大数据领域的实时计算以及日志收集,具有高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性和高并发的特性。
使用消息队列的好处
- 解耦:允许独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 可恢复性:系统的一部分组件失效时,不会影响到整个系统。
- 缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 灵活性 & 峰值处理能力:使用消息队列能够使关键组件顶住突发的访问压力。
- 异步通信:允许用户把一个消息放入队列,但并不立即处理它。
Kafka 的特性
- 高吞吐量、低延迟:每秒可以处理几十万条消息,延迟最低只有几毫秒。
- 可扩展性:Kafka 集群支持热扩展。
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
- 容错性:允许集群中节点失败。
- 高并发:支持数千个客户端同时读写。
Kafka 系统架构
- Broker:
- 一台 Kafka 服务器就是一个 broker。
- 一个集群由多个 broker 组成。
- 一个 broker 可以容纳多个 topic。
- Topic:
- 可以理解为一个队列,生产者和消费者面向的都是一个 topic。
- 类似于数据库的表名或者 ES 的 index。
- 物理上不同 topic 的消息分开存储。
- Partition:
- 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker 上。
- 一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。
- Kafka 只保证 partition 内的记录是有序的。
- 数据路由规则:指定了 partition 则直接使用;未指定但指定 key,通过对 key 的 value 进行 hash 取模选出一个 partition;都未指定,使用轮询选出一个 partition。
- 每个 partition 中的数据使用多个 segment 文件存储。
- Replica:
- 副本机制,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作。
- 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
- Leader:
- 当前负责数据的读写的 partition。
- Follower:
- 跟随 Leader,所有写请求都通过 Leader 路由。
- 数据变更会广播给所有 Follower,与 Leader 保持数据同步。
- 只负责备份,不负责数据的读写。
- 如果 Leader 故障,则从 Follower 中选举出一个新的 Leader。
- Producer:
- 数据的发布者,将消息 push 发布到 Kafka 的 topic 中。
- Consumer:
- 从 broker 中 pull 拉取数据。
- 可以消费多个 topic 中的数据。
- Consumer Group(CG):
- 由多个 consumer 组成。
- 所有的消费者都属于某个消费者组。
- 消费者组内每个消费者负责消费不同分区的数据,防止数据被重复读取。
- 消费者组之间互不影响。
- Offset 偏移量:
- 唯一标识一条消息。
- 决定读取数据的位置。
- 消费者通过偏移量来决定下次读取的消息。
- 消息被消费之后,并不被马上删除。
- 某一个业务也可以通过修改偏移量达到重新读取消息的目的。
- 消息默认生命周期为 1 周(7*24小时)。
- Zookeeper:
- 在 Kafka 中,ZooKeeper 负责维护 Kafka 集群的一些元数据和 leader 选举等协调工作。
- 元数据存储:存储主题、分区、Broker 节点等信息。
- Leader 选举:参与领导者选举的过程。
- 健康监控:进行集群的健康监控。
- 消费者组协调:协调和追踪消费者的位置信息。
Kafka 的应用场景
- 日志收集:Kafka 可以被用作日志收集系统,将各种应用的日志数据集中收集起来,方便后续的处理和分析。
- 实时计算:Kafka 可以作为实时计算系统的数据源,如 Spark Streaming、Flink 等,用于实时数据处理和分析。
- 消息通讯:Kafka 可以作为消息通讯系统,实现不同系统之间的数据交换和通信。
- 流量削峰:在高并发场景下,Kafka 可以作为流量削峰的工具,将大量的请求缓存到 Kafka 中,然后按照一定的速率进行处理,避免系统崩溃。
Kafka 的优缺点
优点:
- 高吞吐量、低延迟。
- 可扩展性强。
- 持久性、可靠性高。
- 支持多副本、容错性强。
- 社区活跃、生态丰富。
缺点:
- 依赖 Zookeeper,如果 Zookeeper 出现故障,会影响 Kafka 的正常运行。
- 数据一致性方面,虽然 Kafka 提供了多副本机制,但是在极端情况下,仍然可能存在数据丢失的风险。
- 消息顺序问题,如果生产者发送消息到多个分区,那么消费者消费时可能无法保证消息的顺序性。
Kafka 集群部署
下载安装包
- 官方下载地址:Apache Kafka 下载页面
- 步骤:
- 切换到
/opt
目录。 - 使用
wget
从清华大学镜像站下载 Kafka 2.7.1 版本。
- 切换到
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
安装 Kafka
- 步骤:
- 解压 Kafka 压缩包。
- 将解压后的目录移动到
/usr/local/kafka
。 - 备份并编辑
server.properties
文件,配置 Kafka。
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafkacd /usr/local/kafka/config/
cp server.properties{,.bak}
vim server.properties
-
关键配置项:
broker.id
:每个 Kafka 实例的唯一标识,集群中每个实例的broker.id
必须不同。listeners
:指定 Kafka 监听的 IP 和端口。num.network.threads
和num.io.threads
:分别设置处理网络请求和磁盘 IO 的线程数。log.dirs
:Kafka 数据和日志的存放路径。zookeeper.connect
:指定 Zookeeper 集群的地址。
-
环境变量配置:
- 将 Kafka 的 bin 目录添加到 PATH 环境变量中。
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
- 配置启动脚本:
- 创建一个 Kafka 的启动脚本,并设置开机自启。
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esacchmod +x /etc/init.d/kafka
chkconfig --add kafka
service kafka start
Kafka 命令行操作
- 创建 topic:
kafka-topics.sh --create --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --replication-factor 2 --partitions 3 --topic test
- 查看 topic:
kafka-topics.sh --list --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181
kafka-topics.sh --describe --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
- 发布和消费消息:
# 生产者
kafka-console-producer.sh --broker-list 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test# 消费者
kafka-console-consumer.sh --bootstrap-server 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test --from-beginning
- 修改和删除 topic:
# 修改分区数
kafka-topics.sh --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --alter --topic test --partitions 6# 删除 topic
kafka-topics.sh --delete --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
Kafka 架构深入
-
工作流程及文件存储机制:
- Kafka 以 topic 对消息进行分类,producer 和 consumer 都是面向 topic 的。
- Topic 是逻辑概念,partition 是物理概念,每个 partition 对应一个 log 文件。
- 为防止 log 文件过大,Kafka 采用分片和索引机制,将每个 partition 分为多个 segment,每个 segment 包含
.index
和.log
文件。
-
数据可靠性保证:
- Kafka 通过 ack 应答机制保证数据可靠性,producer 发送数据后需要等待 broker 的确认。
-
数据一致性问题:
- LEO:每个副本的最大 offset。
- HW:消费者能见到的最大 offset,所有副本中最小的 LEO。
- Leader 和 follower 故障时的数据恢复和同步机制。
-
ack 应答机制:
- Kafka 提供了三种可靠性级别(acks=0, 1, -1),用户可以根据需求选择。
- 幂等性:在 0.11 版本及以后,Kafka 引入了幂等性特性,保证 producer 发送重复数据时,server 端只持久化一条。
注释:
- Kafka 的安装和配置需要根据集群的实际环境进行调整,特别是 IP 地址和端口号。
- 在生产环境中,通常需要配置更多的参数以优化性能和可靠性。
- Kafka 的数据可靠性和一致性机制是其核心特性之一,理解这些机制对于保证数据的安全性和一致性至关重要。
Filebeat+Kafka+ELK 部署指南~
部署 Zookeeper+Kafka 集群
- 目的:搭建消息队列系统,用于日志数据的传输。
- 步骤:
- 安装并配置 Zookeeper 集群。
- 安装并配置 Kafka 集群,指定 Zookeeper 集群地址。
- 启动 Zookeeper 和 Kafka 服务,确保集群正常运行。
部署 Filebeat
- 目的:收集服务器上的日志数据。
- 步骤:
- 下载并解压 Filebeat 到指定目录(如
/usr/local/filebeat
)。 - 编辑
filebeat.yml
配置文件:filebeat.prospectors: - type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]# 添加输出到 Kafka 的配置 output.kafka:enabled: truehosts: ["192.168.80.10:9092","192.168.80.11:9092","192.168.80.12:9092"]topic: "httpd"
- 启动 Filebeat,开始收集日志并发送到 Kafka。
- 下载并解压 Filebeat 到指定目录(如
部署 ELK(Logstash 配置)
- 目的:从 Kafka 拉取日志数据,并处理、存储到 Elasticsearch 中。
- 步骤:
- 在 Logstash 组件所在节点上,新建一个 Logstash 配置文件
kafka.conf
:input {kafka {bootstrap_servers => "192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092"topics => "httpd"type => "httpd_kafka"codec => "json"auto_offset_reset => "latest"decorate_events => true} }output {if "access" in [tags] {elasticsearch {hosts => ["192.168.80.30:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.80.30:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug } }
- 启动 Logstash,开始从 Kafka 拉取日志并存储到 Elasticsearch。
- 在 Logstash 组件所在节点上,新建一个 Logstash 配置文件
Kibana 配置与查看日志
- 目的:通过 Kibana 可视化界面查看日志数据。
- 步骤:
- 在浏览器中访问 Kibana(如
http://192.168.80.30:5601
)。 - 登录 Kibana(如果设置了登录认证)。
- 单击“Create Index Pattern”按钮,添加索引模式,例如
httpd_access-*
和httpd_error-*
(注意:这里应与 Logstash 配置中的 index 名称匹配,但原笔记中的filebeat_test-*
是不正确的)。 - 单击“create”按钮创建索引模式。
- 单击“Discover”按钮,可查看图表信息及日志信息。
- 在浏览器中访问 Kibana(如
注释:
- 在配置 Filebeat 和 Logstash 时,确保 Kafka 集群的地址和 topic 名称正确无误。
- Logstash 的
auto_offset_reset
参数决定了从 Kafka 拉取数据的起始位置,latest
表示从最新的数据开始拉取,earliest
表示从头开始拉取。 - Kibana 中的索引模式应与 Logstash 配置中的 index 名称一致,以便正确显示日志数据。
- 在实际部署中,还需要考虑安全性、性能优化等方面的问题。
相关文章:

【企业级分布式系统】 Kafka集群
文章目录 KafkaKafka 概述使用消息队列的好处 Kafka 的特性Kafka 系统架构Kafka 的应用场景Kafka 的优缺点 Kafka 集群部署下载安装包安装 KafkaKafka 命令行操作Kafka 架构深入 FilebeatKafkaELK 部署指南~部署 ZookeeperKafka 集群部署 Filebeat部署 ELK(Logstash…...

MySQL 中有哪几种锁?
在 MySQL 中,锁(Locks)是为了保证数据的一致性和完整性而设计的机制。常见的锁可以从粒度和操作类型两个角度分类。以下是详细介绍: 按 粒度 分类 1. 全局锁 描述:锁定整个数据库实例。用途:主要用于备份…...

kafka中节点如何服役和退役
节点服役(添加新节点) 1.准备新节点: 安装 Kafka 和相关依赖。 配置 Kafka Broker 的 server.properties 文件,确保 broker.id 是唯一的,并且配置正确的 zookeeper.connect 地址。 重启网卡 2.启动新节点ÿ…...

HTML5实现剪刀石头布小游戏(附源码)
文章目录 1.设计来源1.1 主界面1.2 皮肤风格1.2 游戏中界面 2.效果和源码源码下载万套模板,程序开发,在线开发,在线沟通 作者:xcLeigh 文章地址:https://blog.csdn.net/weixin_43151418/article/details/143798520 HTM…...

集群聊天服务器(3)muduo网络库
目录 基于muduo的客户端服务器编程 muduo只能装在linux中,依赖boost库 客户端并不需要高并发 基于muduo的客户端服务器编程 支持epoll线程池,muduo封装了线程池 而且还有完善的日志系统 使用muduo库代码非常固定,基本就只有chatserver的类名…...

解决在Ubuntu 20.04中使用PyCharm时无法输入中文的问题
解决在Ubuntu 20.04中使用PyCharm时无法输入中文的问题 要解决在Ubuntu 20.04中使用PyCharm时无法输入中文的问题,特别是当使用IBus作为输入法框架时,我们需要通过设置适当的环境变量来确保PyCharm可以正确调用IBus输入法。下面将详细说明原因及解决步骤…...

【jvm】HotSpot中方法区的演进
目录 1. 说明2. JDK1.6及以前3. JDK1.74. JDK1.8及以后 1. 说明 1.在HotSpot虚拟机中,方法区(Method Area)的演进是一个重要的内存管理优化过程。2.从JDK1.6到JDK1.8,HotSpot虚拟机中的方法区经历了从永久代到元空间的重大变化。…...

Win10/11 安装使用 Neo4j Community Edition
如果你下载的是 Neo4j Community Edition 的压缩包,意味着你需要手动解压并配置 Neo4j。以下是详细的使用步骤: 0. 下载压缩包 访问Neo4j官网,找到 Community Edition 版本并选择 4.x 或者 5.x 下载:https://neo4j.com/deployme…...

Ubuntu 22.04 上快速搭建 Samba 文件共享服务器
Samba 简介 Samba 是一个开源软件,它扮演着不同操作系统间沟通的桥梁。通过实现 SMB(Server Message Block)协议,Samba 让文件和打印服务在 Windows、Linux 和 macOS 之间自由流动。 以下是 Samba 的特点: 跨平台兼…...

JQuery 基础知识学习(详尽版)2024.11.17
一、jQuery简介及使用详解 1.1 jQuery简介 写更少的代码,做更多的事;jQuery可以做:HTML 元素选取 , HTML 元素操作 ,CSS 操作 ,HTML 事件函数 ,JavaScript 特效和动画 ,HTML DOM 遍…...

Spring Validation参数校验
Validation Validation是Spring提供的一个参数校验框架,使用预定义的注解完成参数校验 使用步骤 引入Spring Validation起步依赖在需要校验的参数所在的类上添加Validated注解在需要校验的参数前面加上Pattern注解 <!--参数校验依赖--><dependency>&l…...

高斯数据库Postgresql死锁和锁表解决方法
解决死锁进方法: 查询死锁进程列表 select * from pg_stat_activity where waiting‘t’ 发现有好几条挂起的记录,记录下所有或需要解锁的pid 解决死锁进程 select pg_cancel_backend(‘pid值’) 解决完后,刷新后测试,恢复正…...

【设计模式】模板方法模式 在java中的应用
设计模式: 设计模式是对软件设计中普遍存在(反复出现)的各种问题,所提出的解决方案。这个术语是由Erich Gamma等人在1995年的书《设计模式:可复用面向对象软件的基础》中首次引入的。设计模式可以加快开发过程&#x…...

PVE纵览-安装系统卡“Loading Driver”的快速解决方案
PVE纵览-安装系统卡“Loading Driver”的快速解决方案 文章目录 PVE纵览-安装系统卡“Loading Driver”的快速解决方案摘要通过引导参数解决PVE安装卡在“Loading Driver”问题官方解决方法 关键字: PVE、 显卡、 Loading、 Driver、 nomodeset 摘要 在虚拟机…...

Lua资料
Lua脚本语言 cheet sheet Lua & c Lua与C API交互全面解析 Lua语言:和C语言的交互 Lua进阶用法之Lua和C的接口设计 Lua C API 简介 C和Lua之间的相互调用 深入Lua:用户数据userdata 基本数据类型 之 UserData calling-lua-from-c/ Embedding Lua i…...

【C语言】值传递和地址传递
值传递 引用传递(传地址,传引用)的区别 传值,是把实参的值赋值给行参 ,那么对行参的修改,不会影响实参的值。 传地址,是传值的一种特殊方式,只是他传递的是地址,不是普通…...

PyTorch 中使用自动求导计算梯度
使用 PyTorch 进行自动求导和梯度计算 在 PyTorch 中,张量的 requires_grad 属性决定了是否需要计算该张量的梯度。设置为 True 的张量会在计算过程中记录操作,以便在调用 .backward() 方法时自动计算梯度。通过构建计算图,PyTorch 能够有效…...

Oracle Instant Client 23.5安装配置完整教程
Oracle Instant Client 23.5安装配置完整教程 简介环境要求安装步骤1. 准备工作目录2. 下载Oracle Instant Client3. 解压Instant Client4. 安装依赖包5. 配置系统环境5.1 配置库文件路径5.2 配置环境变量 6. 配置Oracle钱包(可选) 验证安装常见问题解决…...

【jvm】方法区的理解
目录 1. 说明2. 方法区的演进3. 内部结构4. 作用5.内存管理 1. 说明 1.方法区用于存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码缓存等数据。它是各个线程共享的内存区域。2.尽管《Java虚拟机规范》中把方法区描述为堆的一个逻辑部分,但它却…...

ES-针对某个字段去重后-获取某个字段值的所有值
针对上面表的数据,现在想根据age分组,并获取每个分组后的name有哪些(去重后)。 select age, GROUP_CONCAT(DISTINCT(name)) from testtable group by age ; 结果: 如果想要增加排序: SELECT age, GROUP_CONCAT(DISTINCT name)…...

百度 2025届秋招提前批 文心一言大模型算法工程师
文章目录 个人情况一面/技术面 1h二面/技术面 1h三面/技术面 40min 个人情况 先说一下个人情况: 学校情况:211本中9硕,本硕学校都一般,本硕都是计算机科班,但研究方向并不是NLP,而是图表示学习论文情况&a…...

sglang 部署Qwen2VL7B,大模型部署,速度测试,深度学习
sglang 项目github仓库: https://github.com/sgl-project/sglang 项目说明书: https://sgl-project.github.io/start/install.html 资讯: https://github.com/sgl-project/sgl-learning-materials?tabreadme-ov-file#the-first-sglang…...

fastadmin操作数据库字段为json、查询遍历each、多级下拉、union、php密码设置、common常用函数的使用小技巧
数据库中遇到的操作 查询字段是json的某个值 //获取数据库中某个字段是json中得某个值,进行查询,goods是表中字段,brand_id是json中要查詢的字段。//数据类型一定要对应要不然查询不出来。$map[json_extract(goods, "$.brand_id")]…...

UniApp在Vue3的setup语法糖下自定义组件插槽详解
UniApp在 Vue3的 setup 语法糖下自定义组件插槽详解 UniApp 是一个基于 Vue.js 的跨平台开发框架,可以用来开发微信小程序、H5、App 等多种平台的应用。Vue 3 引入了 <script setup> 语法糖,使得组件的编写更加简洁和直观。本文将详细介绍如何在 …...

springboot上传下载文件
RequestMapping(“bigJson”) RestController Slf4j public class TestBigJsonController { Resource private BigjsonService bigjsonService;PostMapping("uploadJsonFile") public ResponseResult<Long> uploadJsonFile(RequestParam("file")Mul…...

Python学习从0到1 day29 Python 高阶技巧 ⑦ 正则表达式
目录 一、正则表达式 二、正则表达式的三个基础方法 1.match 从头匹配 2.search(匹配规则,被匹配字符串) 3.findall(匹配规则,被匹配字符串) 三、元字符匹配 单字符匹配: 注: 示例&a…...

机器学习-web scraping
Web Scraping,通常称为网络抓取或数据抓取,是一种通过自动化程序从网页中提取数据的技术。以下是对Web Scraping的详细解释: 一、定义与原理 Web Scraping是指采用技术手段从大量网页中提取结构化和非结构化信息,并按照一定的规…...

移远通信5G RedCap模组RG255C-CN通过中国电信5G Inside终端生态认证
近日,移远通信5G RedCap模组RG255C-CN荣获中国电信颁发的5G Inside终端生态认证证书。这表明,该产品在5G基本性能、网络兼容性、安全特性等方面已经过严格评测且表现优异,将进一步加速推动5G行业终端规模化应用。 中国电信5G Inside终端生态认…...

Javaweb梳理17——HTMLCSS简介
Javaweb梳理17——HTML&CSS简介 17 HTML&CSS简介17.1 HTML介绍17.2 快速入门17.3 基础标签17.3 .1 标题标签17.3.2 hr标签17.3.3 字体标签17.3.4 换行17.3.8 案例17.3.9 图片、音频、视频标签17.3.10 超链接标签17.3.11 列表标签17.3.12 表格标签17.3.11 布局标签17.3.…...

【Android、IOS、Flutter、鸿蒙、ReactNative 】自定义View
Android Java 自定义View 步骤 创建一个新的Java类,继承自View、ViewGroup或其他任何一个视图类。 如果需要,重写构造函数以支持不同的初始化方式。 重写onMeasure方法以提供正确的测量逻辑。 重写onDraw方法以实现绘制逻辑。 根据需要重写其他方法&…...