Kafka 详解:全面解析分布式流处理平台
Kafka 详解:全面解析分布式流处理平台
Apache Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流式应用。它具有高吞吐量、低延迟、高可用性和高可靠性的特点,广泛应用于日志收集、数据流处理、消息系统、实时分析等场景。
📢 Kafka 概述
Apache Kafka 是由 LinkedIn 开发并于 2011 年开源的一个分布式流处理平台,后来捐赠给 Apache 软件基金会。它设计用于高吞吐量、分布式系统,能够处理大规模的实时数据流。
核心概念
- Producer(生产者):负责发布消息到 Kafka 集群的客户端。
- Consumer(消费者):订阅和处理 Kafka 中消息的客户端。
- Broker(代理):Kafka 集群中的一个服务器节点。
- Topic(主题):消息的分类和管理单位,类似于消息队列的队列。
- Partition(分区):Topic 的子单位,用于并行处理和数据分布。
- Replica(副本):分区的副本,用于数据冗余和高可用性。
- Zookeeper:用于管理和协调 Kafka 集群的元数据和状态信息。
更多zookeeper相关知识,请点击:
Zookeeper 详解:分布式协调服务的核心概念与实践
📢 Kafka 架构
Kafka 的架构主要包括以下几个部分:
- 生产者:向 Kafka 主题发布消息。
- 消费者:从 Kafka 主题订阅和消费消息。
- 主题和分区:消息被发布到主题中,并分布在多个分区上。
- 代理(Broker):Kafka 集群中的服务器,负责存储消息和处理请求。
- Zookeeper:用于存储集群的元数据、配置和状态信息。
📢 Kafka 数据模型
消息
消息是 Kafka 中最小的数据单位,每条消息包含一个键值对和一些元数据,如时间戳。
主题(Topic)
主题是消息的分类单位。生产者将消息发送到主题,消费者从主题订阅消息。
分区(Partition)
每个主题被划分为多个分区,分区是 Kafka 并行处理和数据分布的基本单位。
副本(Replica)
每个分区有多个副本,以确保高可用性和数据冗余。
Kafka 集群
Kafka 集群由多个 Broker 组成,Broker 之间通过 Zookeeper 进行协调和管理。Zookeeper 负责存储集群的元数据,包括 Broker 信息、主题和分区的元数据等。
Broker
Broker 是 Kafka 集群中的一个节点,负责接收、存储和转发消息。Broker 通过 Zookeeper 协调和管理集群中的分区和副本。
Zookeeper
Zookeeper 是一个分布式协调服务,用于管理和协调 Kafka 集群的元数据和状态信息。Kafka 依赖 Zookeeper 来实现分布式协调、负载均衡和故障恢复。
📢 Kafka 安装与配置
环境准备
- 安装 Java(Kafka 依赖于 Java 运行环境)。
- 下载并安装 Kafka 和 Zookeeper。
配置文件
Kafka 的主要配置文件包括:
- server.properties:Broker 的配置文件。
- zookeeper.properties:Zookeeper 的配置文件。
启动 Kafka 和 Zookeeper
# 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 Kafka
bin/kafka-server-start.sh config/server.properties
📢 Kafka 生产者
生产者是向 Kafka 主题发布消息的客户端。生产者通过 Producer API 向 Kafka 发送消息。
生产者配置
主要配置选项包括:
- bootstrap.servers:Kafka 集群的地址。
- key.serializer 和 value.serializer:用于序列化键和值的类。
- acks:消息确认模式。
生产者示例
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));}producer.close();}
}
📢 Kafka 消费者
消费者是从 Kafka 主题订阅和消费消息的客户端。消费者通过 Consumer API 读取消息。
消费者配置
主要配置选项包括:
- bootstrap.servers:Kafka 集群的地址。
- group.id:消费者组 ID。
- key.deserializer 和 value.deserializer:用于反序列化键和值的类。
- auto.offset.reset:消费位移的重置策略。
消费者示例
import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}
📢 Kafka Topic
创建 Topic
可以使用 Kafka 提供的命令行工具创建 Topic。
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
查看 Topic 列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
删除 Topic
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
📢 Kafka 分区和副本
分区
分区是 Kafka 实现并行处理和数据分布的基本单位。每个分区在物理上是一个日志文件,分区内的消息是有序的,但分区之间是无序的。
副本
副本用于数据冗余和高可用性。每个分区有一个 leader 副本和多个 follower 副本。生产者和消费者只能与 leader 副本交互,follower 副本从 leader 副本同步数据。
副本分配策略
Kafka 使用一致性哈希算法将分区分配到不同的 Broker 上,以实现负载均衡和高可用性。
Kafka 数据持久化
Kafka 提供两种主要的数据持久化机制:日志段和索引文件。
日志段
每个分区的消息被分成多个日志段,日志段是顺序写入的。Kafka 通过滚动机制创建新的日志段,并删除旧的日志段。
索引文件
Kafka 为每个日志段创建索引文件,用于快速查找特定的消息偏移量。索引文件包括偏移量索引和时间戳索引。
📢 Kafka 高级功能
事务
Kafka 支持跨分区、跨主题的事务,保证消息的原子性和一致性。
压缩
Kafka 支持消息压缩,以减少网络带宽和存储空间。常见的压缩算法包括 Gzip、Snappy 和 LZ4。
ACL
Kafka 提供访问控制列表(ACL),用于控制用户和客户端对 Kafka 集群的访问权限。
📢 Kafka 调优
Broker 调优
- 调整文件描述符限制:增加 Broker 可用的文件描述符数量。
- 调整 JVM 参数:优化 JVM 的内存分配和垃圾回收策略。
- 调整网络参数:优化 Broker 的网络传输性能。
生产者调优
- 批量发送:启用消息批量发送,以提高吞吐量。
- 压缩:启用消息压缩,以减少网络带宽和存储空间。
消费者调优
- 并行消费:使用多个消费者实例并行消费消息,以提高消费速度。
- 自动提交位移:根据需求配置位移提交策略,平衡性能和数据一致性。
🔥 Kafka 常见问题
消息丢失
- 原因:可能由于网络故障、Broker 宕机或生产者/消费者配置不当。
- 解决:配置合适的 ack 策略、增加副本数量、优化网络和硬件环境。
消息重复
- 原因:可能由于生产者重试、消费者位移提交失败等。
- 解决:使用 Kafka 事务、配置幂等生产者、合理处理消费逻辑。
消息延迟
- 原因:可能由于网络延迟、Broker 负载过高、磁盘 I/O 性能不足等。
- 解决:优化网络和硬件配置、调整 Broker 和客户端参数、使用更高性能的存储设备。
通过这篇详解指南,你可以全面了解 Kafka 的基本原理、架构设计、安装配置、生产者和消费者的使用,以及高级功能和调优技巧。希望这能帮助你更好地使用和掌握 Kafka,构建高效、可靠的流处理系统。
相关文章:
Kafka 详解:全面解析分布式流处理平台
Kafka 详解:全面解析分布式流处理平台 Apache Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流式应用。它具有高吞吐量、低延迟、高可用性和高可靠性的特点,广泛应用于日志收集、数据流处理、消息系统、实时分析等场景。 &…...
RabbitMQ系列-rabbitmq无法重新加入集群,启动失败的问题
当前存在3个节点:rabbitmq5672、rabbitmq5673、rabbitmq5674 当rabbitmq5673节点掉线之后,重启失败 重启的时候5672节点报错如下: 解决方案 在集群中取消失败节点 rabbitmqctl forget_cluster_node rabbitrabbitmq5673删除失败节点5673的…...
postgresql之翻页优化
列表和翻页是所有应用系统里面必不可少的需求,但是当深度翻页的时候,越深越慢。下面是几种常用方式 准备工作 CREATE UNLOGGED TABLE data (id bigint GENERATED ALWAYS AS IDENTITY,value double precision NOT NULL,created timestamp with time zon…...
小白学Linux | 日志排查
一、windows日志分析 在【运行】对话框中输入【eventvwr】命令,打开【事件查看器】窗 口,查看相关的日志 管理员权限进入PowerShell 使用Get-EventLog Security -InstanceId 4625命令,可获取安全性日志下事 件 ID 为 4625(失败登…...
Spring6
一 概述 1.1、Spring是什么? Spring 是一款主流的 Java EE 轻量级开源框架 ,Spring 由“Spring 之父”Rod Johnson 提出并创立,其目的是用于简化 Java 企业级应用的开发难度和开发周期。Spring的用途不仅限于服务器端的开发。从简单性、可测…...
数字孪生概念、数字孪生技术架构、数字孪生应用场景,深度长文学习
一、数字孪生起源与发展 1.1 数字孪生产生背景 数字孪生的概念最初由Grieves教授于2003年在美国密歇根大学的产品全生命周期管理课程上提出,并被定义为三维模型,包括实体产品、虚拟产品以及二者间的连接,如下图所示: 2011年&…...
云服务对比:阿里云国际站和阿里云国内站有什么区别
阿里云国际站(Alibaba Cloud International)和阿里云国内站(Alibaba Cloud China)在许多方面存在明显区别,这些区别主要体现在服务范围、合规性、定价和支付方式、语言和客服支持、以及备案要求等方面。 首先…...
如何在npm上发布自己的包
如何在npm上发布自己的包 npm创建自己的包 一、一个简单的创建 1、创建npm账号 官网:https://www.npmjs.com/创建账号入口:https://www.npmjs.com/signup 注意:需要进入邮箱验证 2、创建目录及初始化 $ mkdir ufrontend-test $ cd ufron…...
SQL Chat:从SQL到SPEAKL的数据库操作新纪元
引言 SQL Chat是一款创新的、对话式的SQL客户端工具。 它采用自然语言处理技术,让你能够像与人交流一样,通过日常对话的形式对数据库执行查询、修改、创建及删除操作 极大地简化了数据库管理流程,提升了数据交互的直观性和效率。 在这个框…...
jmeter性能优化之mysql配置
一、连接数据库和grafana 准备:连接好数据库和启动grafana并导入mysql模板 大批量注册、登录、下单等,还有过节像618,双11和数据库交互非常庞大,都会存在数据库的某一张表里面,当用户在登录或者查询某一个界面时&…...
VueRouter3学习笔记
文章目录 1,入门案例2,一些细节高亮效果非当前路由会被销毁 3,嵌套路由4, 传递查询参数5,命名路由6,传递路径参数7,路径参数转props8,查询参数转props9,replace模式10&am…...
「前端+鸿蒙」鸿蒙应用开发-TS函数
在 TypeScript 中,函数是一等公民,这意味着函数可以作为参数传递、作为其他函数的返回值,甚至可以赋值给变量。TypeScript 为 JavaScript 的函数增加了类型系统,使得函数的参数和返回值都具有明确的类型。 TS快速入门-函数 基本函…...
python后端结合uniapp与uview组件tabs,实现自定义导航按钮与小标签颜色控制
实现效果(红框内): 后端api如下: task_api.route(/user/task/states_list, methods[POST, GET]) visitor_token_required def task_states(user):name_list [待接单, 设计中, 交付中, 已完成, 全部]data []color [#F04864, …...
mingw如何制作动态库附python调用
1.mingw和msvc g -fpic HelloWorld.cpp -shared -o test.dllg -L . -ltest .\test.cpp 注意-L后面的.挨不挨着都行,-l不需要-ltest.dll,只需要-ltest 2.dll.cpp extern "C" {__declspec(dllexport) int __stdcall add(int a, int b) {return…...
Vue学习|Vue快速入门、常用指令、生命周期、Ajax、Axios
什么是Vue? Vue 是一套前端框架,免除原生JavaScript中的DOM操作,简化书写 基于MVVM(Model-View-ViewModel)思想,实现数据的双向绑定,将编程的关注点放在数据上。官网:https://v2.cn.vuejs.org/ Vue快速入门 打开页面࿰…...
Python基础教程(八):迭代器与生成器编程
💝💝💝首先,欢迎各位来到我的博客,很高兴能够在这里和您见面!希望您在这里不仅可以有所收获,同时也能感受到一份轻松欢乐的氛围,祝你生活愉快! 💝Ὁ…...
Oracle10.2.0.1冷备迁移之_数据文件拷贝方式
由于阿里云机房要下架旧服务器,单位未购买整机迁移服务,且业务较老不兼容Oracle11g,所以新购买一台新服务器进行安装Oracle10.2.0.1 ,后续再将数据迁移到新服务器上。 id 数据库版本 操作系统版本 实例名 源库 115.28.242.25…...
智能合约中外部调用漏洞
外部调用 : 在智能合约开发中,调用不受信任的外部合约是一个常见的安全风险点。这是因为,当你调用另一个合约的函数时,你实际上是在执行那个合约的代码,而这可能会引入你未曾预料的行为,包括恶意行为。下面…...
转型AI产品经理(4):“认知负荷”如何应用在Chatbot产品
认知负荷理论主要探讨在学习过程中,人脑处理信息的有限容量以及如何优化信息的呈现方式以促进学习。认知负荷定律认为,学习者的工作记忆容量是有限的,而不同类型的认知任务会对工作记忆产生不同程度的负荷,从而影响学习效果。以下…...
【C++11】常见的c++11新特性(一)
文章目录 1. C11 简介2. 常见的c11特性3.统一的列表初始化3.1initializer_list 4. decltype与auto4.1decltype与auto的区别 5.nullptr6.右值引用和移动语义6.1左值和右值6.1.1左值的特点6.1.2右值的特点6.1.3右值的进一步分类 6.2左值引用和右值引用以及区别6.2.1左值引用6.2.2…...
牛客周赛 Round 46 题解 C++
目录 A 乐奈吃冰 B 素世喝茶 C 爱音开灯 D 小灯做题 E 立希喂猫 F 祥子拆团 A 乐奈吃冰 #include <iostream> #include <cstring> #include <algorithm> #include <cmath> #include <queue> #include <set> #include <vector>…...
9.3 Go 接口的多态性
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…...
Java通过字符串字段匹配形成树形结构
Java通过字符串字段匹配形成树形结构 文章目录 Java通过字符串字段匹配形成树形结构数据表模拟数据解决办法:1、domian 类:2、Node层(形成树形关系):3、controller 层4、Util 工具类1、BeanCopierUtil4、Mapper5、Manager(用来组装树形结构)6、测试:有的时候我们形成树形不…...
数字孪生智慧水利:精准管理与智能决策的新时代
图扑数字孪生技术在智慧水利中的应用,通过虚拟模型与真实水利系统的无缝连接,实现对水资源和水利工程的全面监控和精细管理。实时数据采集与动态模拟提升了水利系统的预测和响应能力,从洪水预警到水质监测,数字孪生助力各项决策更…...
基于ChatGLM3的本地问答机器人部署流程
基于ChatGLM3的本地问答机器人部署流程 前言一、确定文件结构1.新建文件夹储存本地模型2.下载源码和模型 二、Anaconda环境搭建1.创建anaconda环境2.安装相关库3.设置本地模型路径4.启动 三、构建本地知识库1.下载并安装postgresql2.安装c库3.配置向量插件 四、线上运行五、 全…...
归并排序——逆序数对的统计
逆序数对的统计 题目描述 运行代码 #include <iostream> using namespace std; #define LL long long const int N 1e5 5; int a[N], tmp[N]; LL merge_sort(int q[], int l, int r) {if (l > r)return 0; int mid l r >> 1; LL res merge_sort(q, l,…...
基于截图和模拟点击的自动化压测工具开发(MFC)
1.背景 想对一个MFC程序做自动压测功能,根据判断程序界面某块区域是否达到预定状态,来自动执行鼠标点击或者键盘输入的操作,以解决测试人员需要重复手动压测问题。 1.涉及的技术 串口控制,基于MFC橡皮筋类(CRectTracker)做一个…...
力扣每日一题 6/10
881.救生艇[中等] 题目: 给定数组 people 。people[i]表示第 i 个人的体重 ,船的数量不限,每艘船可以承载的最大重量为 limit。 每艘船最多可同时载两人,但条件是这些人的重量之和最多为 limit。 返回 承载所有人所需的最小船…...
[知识点] 内存顺序属性的用途和行为
C标准库中定义了以下几种内存顺序属性: std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_releasestd::memory_order_acq_relstd::memory_order_seq_cst 1. std::memory_order_relaxed 定义:不提供同步…...
JAVA Mongodb 深入学习(二)索引的创建和优化
一、常用索引类型 1、单个索引 单个索引的创建 db.你的表名.createIndex({"你的字段名":1}) 单个索引的创建且是唯一索引 db.你的表名.createIndex({"你的字段名":1}),{ unique: true }) 2、复合索引 将多个过滤的字段,做成索引,…...
网站建设-英九网络/竞价排名营销
有时候需要正无穷或负无穷来表示特殊情况,那正无穷和负无穷如何表示呢? 在C中, 如果是int,用INT_MAX表示正无穷,INT_MIN表示负无穷,需要包含头文件limits.h; 如果是double,用DBL_…...
php动态网站开发 求数值/seo外链增加
(接上文《Java并发基石——所谓“阻塞”:Object Monitor和AQS(2)》) 使用AQS实现的Mutex现在我们来看看在AbstractQueuedSynchronizer官方文档中(实际上就是JDK源代码上的注释说明)给出的另一个…...
专业做二手网站/品牌推广的概念
一. 面试题及剖析 1. 今日面试题 Redis缓存如何实现? 什么是热key? 怎么发现热key? 如何解决热key问题? 如何保证Redis缓存与MySQL数据库的一致性? 你遇到过哪些常见的缓存问题? 什么是缓存雪崩?缓存穿透怎么解决? 2. 题目剖析 在上一篇文章中,壹哥跟大家详细地讲解…...
利用社交网站做淘宝客/网上销售渠道
非阻塞通信:异步通信通常是使MPI应用程序实现高性能计算的关键,使用异步通信具有如下优势:1)函数是非阻塞的,这使得进程在与另一个进程通信的同时继续参与计算;2)如果应用适当,可以绕…...
成都网站建设外包/青岛网站优化公司哪家好
AndroidAnnotations是如何工作的 (2014年2月 By Hiperion) AndroidAnnotations工作在一个非常简单的方式。它会使用标准的Java注解处理工具自动添加一个额外的编译步骤生成的源代码。(译者注:即生成一个原有类名加“_”的类,这个…...
建设网站教学/小说搜索风云榜排名
9 月 19 日,RTE 2020 编程挑战赛秋季赛的决赛在线上圆满落幕了。本次秋季赛的赛题只有一个,参赛者可以根据自己的创意,基于声网Agora SDK、 声网Agora 实时消息 RTM SDK、云录制 SDK 等 SDK 实现实时互动应用,或在已有的项目中实现…...