当前位置: 首页 > news >正文

Kafka 详解:全面解析分布式流处理平台

Kafka 详解:全面解析分布式流处理平台

Apache Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流式应用。它具有高吞吐量、低延迟、高可用性和高可靠性的特点,广泛应用于日志收集、数据流处理、消息系统、实时分析等场景。

📢 Kafka 概述

Apache Kafka 是由 LinkedIn 开发并于 2011 年开源的一个分布式流处理平台,后来捐赠给 Apache 软件基金会。它设计用于高吞吐量、分布式系统,能够处理大规模的实时数据流。

核心概念

  • Producer(生产者):负责发布消息到 Kafka 集群的客户端。
  • Consumer(消费者):订阅和处理 Kafka 中消息的客户端。
  • Broker(代理):Kafka 集群中的一个服务器节点。
  • Topic(主题):消息的分类和管理单位,类似于消息队列的队列。
  • Partition(分区):Topic 的子单位,用于并行处理和数据分布。
  • Replica(副本):分区的副本,用于数据冗余和高可用性。
  • Zookeeper:用于管理和协调 Kafka 集群的元数据和状态信息。

更多zookeeper相关知识,请点击:Zookeeper 详解:分布式协调服务的核心概念与实践

📢 Kafka 架构

Kafka 的架构主要包括以下几个部分:

  • 生产者:向 Kafka 主题发布消息。
  • 消费者:从 Kafka 主题订阅和消费消息。
  • 主题和分区:消息被发布到主题中,并分布在多个分区上。
  • 代理(Broker):Kafka 集群中的服务器,负责存储消息和处理请求。
  • Zookeeper:用于存储集群的元数据、配置和状态信息。

📢 Kafka 数据模型

消息

消息是 Kafka 中最小的数据单位,每条消息包含一个键值对和一些元数据,如时间戳。

主题(Topic)

主题是消息的分类单位。生产者将消息发送到主题,消费者从主题订阅消息。

分区(Partition)

每个主题被划分为多个分区,分区是 Kafka 并行处理和数据分布的基本单位。

副本(Replica)

每个分区有多个副本,以确保高可用性和数据冗余。

Kafka 集群

Kafka 集群由多个 Broker 组成,Broker 之间通过 Zookeeper 进行协调和管理。Zookeeper 负责存储集群的元数据,包括 Broker 信息、主题和分区的元数据等。

Broker

Broker 是 Kafka 集群中的一个节点,负责接收、存储和转发消息。Broker 通过 Zookeeper 协调和管理集群中的分区和副本。

Zookeeper

Zookeeper 是一个分布式协调服务,用于管理和协调 Kafka 集群的元数据和状态信息。Kafka 依赖 Zookeeper 来实现分布式协调、负载均衡和故障恢复。

📢 Kafka 安装与配置

环境准备

  • 安装 Java(Kafka 依赖于 Java 运行环境)。
  • 下载并安装 Kafka 和 Zookeeper。

配置文件

Kafka 的主要配置文件包括:

  • server.properties:Broker 的配置文件。
  • zookeeper.properties:Zookeeper 的配置文件。

启动 Kafka 和 Zookeeper

#  启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 Kafka
bin/kafka-server-start.sh config/server.properties

📢 Kafka 生产者

生产者是向 Kafka 主题发布消息的客户端。生产者通过 Producer API 向 Kafka 发送消息。

生产者配置

主要配置选项包括:

  • bootstrap.servers:Kafka 集群的地址。
  • key.serializer 和 value.serializer:用于序列化键和值的类。
  • acks:消息确认模式。

生产者示例

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));}producer.close();}
}

📢 Kafka 消费者

消费者是从 Kafka 主题订阅和消费消息的客户端。消费者通过 Consumer API 读取消息。

消费者配置

主要配置选项包括:

  • bootstrap.servers:Kafka 集群的地址。
  • group.id:消费者组 ID。
  • key.deserializer 和 value.deserializer:用于反序列化键和值的类。
  • auto.offset.reset:消费位移的重置策略。

消费者示例

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

📢 Kafka Topic

创建 Topic

可以使用 Kafka 提供的命令行工具创建 Topic。

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

查看 Topic 列表

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

删除 Topic

bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

📢 Kafka 分区和副本

分区

分区是 Kafka 实现并行处理和数据分布的基本单位。每个分区在物理上是一个日志文件,分区内的消息是有序的,但分区之间是无序的。

副本

副本用于数据冗余和高可用性。每个分区有一个 leader 副本和多个 follower 副本。生产者和消费者只能与 leader 副本交互,follower 副本从 leader 副本同步数据。

副本分配策略

Kafka 使用一致性哈希算法将分区分配到不同的 Broker 上,以实现负载均衡和高可用性。

Kafka 数据持久化

Kafka 提供两种主要的数据持久化机制:日志段和索引文件。

日志段

每个分区的消息被分成多个日志段,日志段是顺序写入的。Kafka 通过滚动机制创建新的日志段,并删除旧的日志段。

索引文件

Kafka 为每个日志段创建索引文件,用于快速查找特定的消息偏移量。索引文件包括偏移量索引和时间戳索引。

📢 Kafka 高级功能

事务

Kafka 支持跨分区、跨主题的事务,保证消息的原子性和一致性。

压缩

Kafka 支持消息压缩,以减少网络带宽和存储空间。常见的压缩算法包括 Gzip、Snappy 和 LZ4。

ACL

Kafka 提供访问控制列表(ACL),用于控制用户和客户端对 Kafka 集群的访问权限。

📢 Kafka 调优

Broker 调优

  • 调整文件描述符限制:增加 Broker 可用的文件描述符数量。
  • 调整 JVM 参数:优化 JVM 的内存分配和垃圾回收策略。
  • 调整网络参数:优化 Broker 的网络传输性能。

生产者调优

  • 批量发送:启用消息批量发送,以提高吞吐量。
  • 压缩:启用消息压缩,以减少网络带宽和存储空间。

消费者调优

  • 并行消费:使用多个消费者实例并行消费消息,以提高消费速度。
  • 自动提交位移:根据需求配置位移提交策略,平衡性能和数据一致性。

🔥 Kafka 常见问题

消息丢失

  • 原因:可能由于网络故障、Broker 宕机或生产者/消费者配置不当。
  • 解决:配置合适的 ack 策略、增加副本数量、优化网络和硬件环境。

消息重复

  • 原因:可能由于生产者重试、消费者位移提交失败等。
  • 解决:使用 Kafka 事务、配置幂等生产者、合理处理消费逻辑。

消息延迟

  • 原因:可能由于网络延迟、Broker 负载过高、磁盘 I/O 性能不足等。
  • 解决:优化网络和硬件配置、调整 Broker 和客户端参数、使用更高性能的存储设备。

通过这篇详解指南,你可以全面了解 Kafka 的基本原理、架构设计、安装配置、生产者和消费者的使用,以及高级功能和调优技巧。希望这能帮助你更好地使用和掌握 Kafka,构建高效、可靠的流处理系统。

相关文章:

Kafka 详解:全面解析分布式流处理平台

Kafka 详解&#xff1a;全面解析分布式流处理平台 Apache Kafka 是一个分布式流处理平台&#xff0c;主要用于构建实时数据管道和流式应用。它具有高吞吐量、低延迟、高可用性和高可靠性的特点&#xff0c;广泛应用于日志收集、数据流处理、消息系统、实时分析等场景。 &…...

RabbitMQ系列-rabbitmq无法重新加入集群,启动失败的问题

当前存在3个节点&#xff1a;rabbitmq5672、rabbitmq5673、rabbitmq5674 当rabbitmq5673节点掉线之后&#xff0c;重启失败 重启的时候5672节点报错如下&#xff1a; 解决方案 在集群中取消失败节点 rabbitmqctl forget_cluster_node rabbitrabbitmq5673删除失败节点5673的…...

postgresql之翻页优化

列表和翻页是所有应用系统里面必不可少的需求&#xff0c;但是当深度翻页的时候&#xff0c;越深越慢。下面是几种常用方式 准备工作 CREATE UNLOGGED TABLE data (id bigint GENERATED ALWAYS AS IDENTITY,value double precision NOT NULL,created timestamp with time zon…...

小白学Linux | 日志排查

一、windows日志分析 在【运行】对话框中输入【eventvwr】命令&#xff0c;打开【事件查看器】窗 口&#xff0c;查看相关的日志 管理员权限进入PowerShell 使用Get-EventLog Security -InstanceId 4625命令&#xff0c;可获取安全性日志下事 件 ID 为 4625&#xff08;失败登…...

Spring6

一 概述 1.1、Spring是什么&#xff1f; Spring 是一款主流的 Java EE 轻量级开源框架 &#xff0c;Spring 由“Spring 之父”Rod Johnson 提出并创立&#xff0c;其目的是用于简化 Java 企业级应用的开发难度和开发周期。Spring的用途不仅限于服务器端的开发。从简单性、可测…...

数字孪生概念、数字孪生技术架构、数字孪生应用场景,深度长文学习

一、数字孪生起源与发展 1.1 数字孪生产生背景 数字孪生的概念最初由Grieves教授于2003年在美国密歇根大学的产品全生命周期管理课程上提出&#xff0c;并被定义为三维模型&#xff0c;包括实体产品、虚拟产品以及二者间的连接&#xff0c;如下图所示&#xff1a; 2011年&…...

云服务对比:阿里云国际站和阿里云国内站有什么区别

阿里云国际站&#xff08;Alibaba Cloud International&#xff09;和阿里云国内站&#xff08;Alibaba Cloud China&#xff09;在许多方面存在明显区别&#xff0c;这些区别主要体现在服务范围、合规性、定价和支付方式、语言和客服支持、以及备案要求等方面。 首先&#xf…...

如何在npm上发布自己的包

如何在npm上发布自己的包 npm创建自己的包 一、一个简单的创建 1、创建npm账号 官网&#xff1a;https://www.npmjs.com/创建账号入口&#xff1a;https://www.npmjs.com/signup 注意&#xff1a;需要进入邮箱验证 2、创建目录及初始化 $ mkdir ufrontend-test $ cd ufron…...

SQL Chat:从SQL到SPEAKL的数据库操作新纪元

引言 SQL Chat是一款创新的、对话式的SQL客户端工具。 它采用自然语言处理技术&#xff0c;让你能够像与人交流一样&#xff0c;通过日常对话的形式对数据库执行查询、修改、创建及删除操作 极大地简化了数据库管理流程&#xff0c;提升了数据交互的直观性和效率。 在这个框…...

jmeter性能优化之mysql配置

一、连接数据库和grafana 准备&#xff1a;连接好数据库和启动grafana并导入mysql模板 大批量注册、登录、下单等&#xff0c;还有过节像618&#xff0c;双11和数据库交互非常庞大&#xff0c;都会存在数据库的某一张表里面&#xff0c;当用户在登录或者查询某一个界面时&…...

VueRouter3学习笔记

文章目录 1&#xff0c;入门案例2&#xff0c;一些细节高亮效果非当前路由会被销毁 3&#xff0c;嵌套路由4&#xff0c; 传递查询参数5&#xff0c;命名路由6&#xff0c;传递路径参数7&#xff0c;路径参数转props8&#xff0c;查询参数转props9&#xff0c;replace模式10&am…...

「前端+鸿蒙」鸿蒙应用开发-TS函数

在 TypeScript 中&#xff0c;函数是一等公民&#xff0c;这意味着函数可以作为参数传递、作为其他函数的返回值&#xff0c;甚至可以赋值给变量。TypeScript 为 JavaScript 的函数增加了类型系统&#xff0c;使得函数的参数和返回值都具有明确的类型。 TS快速入门-函数 基本函…...

python后端结合uniapp与uview组件tabs,实现自定义导航按钮与小标签颜色控制

实现效果&#xff08;红框内&#xff09;&#xff1a; 后端api如下&#xff1a; task_api.route(/user/task/states_list, methods[POST, GET]) visitor_token_required def task_states(user):name_list [待接单, 设计中, 交付中, 已完成, 全部]data []color [#F04864, …...

mingw如何制作动态库附python调用

1.mingw和msvc g -fpic HelloWorld.cpp -shared -o test.dllg -L . -ltest .\test.cpp 注意-L后面的.挨不挨着都行&#xff0c;-l不需要-ltest.dll&#xff0c;只需要-ltest 2.dll.cpp extern "C" {__declspec(dllexport) int __stdcall add(int a, int b) {return…...

Vue学习|Vue快速入门、常用指令、生命周期、Ajax、Axios

什么是Vue? Vue 是一套前端框架&#xff0c;免除原生JavaScript中的DOM操作&#xff0c;简化书写 基于MVVM(Model-View-ViewModel)思想&#xff0c;实现数据的双向绑定&#xff0c;将编程的关注点放在数据上。官网:https://v2.cn.vuejs.org/ Vue快速入门 打开页面&#xff0…...

Python基础教程(八):迭代器与生成器编程

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…...

Oracle10.2.0.1冷备迁移之_数据文件拷贝方式

由于阿里云机房要下架旧服务器&#xff0c;单位未购买整机迁移服务&#xff0c;且业务较老不兼容Oracle11g&#xff0c;所以新购买一台新服务器进行安装Oracle10.2.0.1 &#xff0c;后续再将数据迁移到新服务器上。 id 数据库版本 操作系统版本 实例名 源库 115.28.242.25…...

智能合约中外部调用漏洞

外部调用 &#xff1a; 在智能合约开发中&#xff0c;调用不受信任的外部合约是一个常见的安全风险点。这是因为&#xff0c;当你调用另一个合约的函数时&#xff0c;你实际上是在执行那个合约的代码&#xff0c;而这可能会引入你未曾预料的行为&#xff0c;包括恶意行为。下面…...

转型AI产品经理(4):“认知负荷”如何应用在Chatbot产品

认知负荷理论主要探讨在学习过程中&#xff0c;人脑处理信息的有限容量以及如何优化信息的呈现方式以促进学习。认知负荷定律认为&#xff0c;学习者的工作记忆容量是有限的&#xff0c;而不同类型的认知任务会对工作记忆产生不同程度的负荷&#xff0c;从而影响学习效果。以下…...

【C++11】常见的c++11新特性(一)

文章目录 1. C11 简介2. 常见的c11特性3.统一的列表初始化3.1initializer_list 4. decltype与auto4.1decltype与auto的区别 5.nullptr6.右值引用和移动语义6.1左值和右值6.1.1左值的特点6.1.2右值的特点6.1.3右值的进一步分类 6.2左值引用和右值引用以及区别6.2.1左值引用6.2.2…...

牛客周赛 Round 46 题解 C++

目录 A 乐奈吃冰 B 素世喝茶 C 爱音开灯 D 小灯做题 E 立希喂猫 F 祥子拆团 A 乐奈吃冰 #include <iostream> #include <cstring> #include <algorithm> #include <cmath> #include <queue> #include <set> #include <vector>…...

9.3 Go 接口的多态性

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…...

Java通过字符串字段匹配形成树形结构

Java通过字符串字段匹配形成树形结构 文章目录 Java通过字符串字段匹配形成树形结构数据表模拟数据解决办法:1、domian 类:2、Node层(形成树形关系):3、controller 层4、Util 工具类1、BeanCopierUtil4、Mapper5、Manager(用来组装树形结构)6、测试:有的时候我们形成树形不…...

数字孪生智慧水利:精准管理与智能决策的新时代

图扑数字孪生技术在智慧水利中的应用&#xff0c;通过虚拟模型与真实水利系统的无缝连接&#xff0c;实现对水资源和水利工程的全面监控和精细管理。实时数据采集与动态模拟提升了水利系统的预测和响应能力&#xff0c;从洪水预警到水质监测&#xff0c;数字孪生助力各项决策更…...

基于ChatGLM3的本地问答机器人部署流程

基于ChatGLM3的本地问答机器人部署流程 前言一、确定文件结构1.新建文件夹储存本地模型2.下载源码和模型 二、Anaconda环境搭建1.创建anaconda环境2.安装相关库3.设置本地模型路径4.启动 三、构建本地知识库1.下载并安装postgresql2.安装c库3.配置向量插件 四、线上运行五、 全…...

归并排序——逆序数对的统计

逆序数对的统计 题目描述 运行代码 #include <iostream> using namespace std; #define LL long long const int N 1e5 5; int a[N], tmp[N]; LL merge_sort(int q[], int l, int r) {if (l > r)return 0; int mid l r >> 1; LL res merge_sort(q, l,…...

基于截图和模拟点击的自动化压测工具开发(MFC)

1.背景 想对一个MFC程序做自动压测功能&#xff0c;根据判断程序界面某块区域是否达到预定状态&#xff0c;来自动执行鼠标点击或者键盘输入的操作&#xff0c;以解决测试人员需要重复手动压测问题。 1.涉及的技术 串口控制&#xff0c;基于MFC橡皮筋类(CRectTracker)做一个…...

力扣每日一题 6/10

881.救生艇[中等] 题目&#xff1a; 给定数组 people 。people[i]表示第 i 个人的体重 &#xff0c;船的数量不限&#xff0c;每艘船可以承载的最大重量为 limit。 每艘船最多可同时载两人&#xff0c;但条件是这些人的重量之和最多为 limit。 返回 承载所有人所需的最小船…...

[知识点] 内存顺序属性的用途和行为

C标准库中定义了以下几种内存顺序属性&#xff1a; std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_releasestd::memory_order_acq_relstd::memory_order_seq_cst 1. std::memory_order_relaxed 定义&#xff1a;不提供同步…...

JAVA Mongodb 深入学习(二)索引的创建和优化

一、常用索引类型 1、单个索引 单个索引的创建 db.你的表名.createIndex({"你的字段名":1}) 单个索引的创建且是唯一索引 db.你的表名.createIndex({"你的字段名":1}),{ unique: true }) 2、复合索引 将多个过滤的字段&#xff0c;做成索引&#xff0c;…...

wordpress文章 页面模板/东莞有限公司seo

今年6月是全国第19个“安全生产月”&#xff0c;连日来&#xff0c;全省消防救援部门紧紧围绕“消除事故隐患&#xff0c;筑牢安全防线”这一主题&#xff0c;开展了一系列助力平安活动&#xff0c;进一步普及消防安全知识&#xff0c;有效提高广大群众面对突发事故的自救和应急…...

企业网站管理制度建设/关于友情链接的作用有

网络协议系列文章 网络协议(一)&#xff1a;基本概念、计算机之间的连接方式 网络协议(二)&#xff1a;MAC地址、IP地址、子网掩码、子网和超网 网络协议(三)&#xff1a;路由器原理及数据包传输过程 网络协议(四)&#xff1a;网络分类、ISP、上网方式、公网私网、NAT 网络…...

成人高考考试时间/seo优化网站教程百度

使用merge merge into 表名t1using (select 数据数据 字段1,数据数据 字段2 from dual) t2on(t1.字段1 t2.字段1)when matched thenupdate set t1.字段2t2.字段2when not matched theninsert values (t2.字段1, t2.字段2) 转载于:https://www.cnblogs.com/pangkang/p/8342258.…...

wordpress 图标不显示/市场营销方案

1.控制台&#xff1a;执行过 Hibernate: select count(*) as y0_ from V_TRANSDETAIL_INFO this_ 后报错&#xff1a;Caused by: java.sql.SQLSyntaxErrorException: ORA-01031: 权限不足分析&#xff1a;debug&#xff0c;跟进去确实是因为查询存储过程V_TRANSDETAIL_INFO…...

webmin 添加网站/网站查找工具

最近在弄zabbix监控MySQL和Redis事宜&#xff0c;发现shell脚本无法解决字符串转换为整数操作&#xff0c;于是想到了Python&#xff0c;这里就用Python3环境 首先安装pymysql pip install pymysql 1 #!/usr/bin/env python2 # -*- coding: utf8 -*-3 import pymysql4 mysql_co…...

建设银行u盾官方网站首页/网站建设推广公司

&#x1f4d6;摘要 今天分享下 —— RSA加密 请求报错&#xff1a;javax.crypto.BadPaddingException: Decryption error&#xff0c;欢迎关注&#xff01; 相关文章阅读&#xff1a;springbootsecurity基于前后端分离的RSA密码加密登录流程 &#x1f302;解决方法 在登录方法里…...