大数据之Kafka
Kafka概述
传统定义:一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
最新定义:一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。最主要的功能是做数据的缓冲,相较于flume的channel, 能力更强。
应用场景:
- 缓冲/消峰:解决生产消息和消费消息的处理速度不一致的情况。
- 解耦:只需知道如何连接kafka,作用类似于交换机。
- 异步通信:可以将事务给kafka,自己去处理其他事务。
消息队列的两种模式:
- 点对点模式:消费者拉取数据后删除数据。优点是简单速度快,缺点是不方便实现多用户需要获取同一数据的情况。
- 发布/订阅模式:可以有多个topic主题,消费者拉取数据后不删除数据。
Kafka的基础架构
- 为方便扩展,并提高吞吐量,一个topic分为多个partition
- 配合分区的设计,提出消费者组的概念,组内内个消费者并行消费,以线程为单位。
- 为提高可用性,为每个partiton增加若干副本,类似NameNode HA
- 借助zookeeper来实现leader和follower的选举机制,leader是原数据,follower是副本数据。leader主要用于发送和传输,follower主要作为副本保证安全性。
Kafka的安装部署
官网下载地址:http://kafka.apache.org/downloads.html
- 上传安装包
- 解压安装包
- 修改配置文件
- 配置环境变量
- 编写群启群关脚本kf.sh
#! /bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac
kafka主题相关操作
kafka-topics.sh脚本里面定义了对应相关操作。
- 增加主题,
kafka-topics.sh --bootstrap-server hadoop102:9092 -- create --topic second --replication-facotr 1 --partitions 1 - 删除,是标记删除,预计在1分钟后完全删除。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first - 修改,只能增加分区数量。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3 - 查看,
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
生产和消费
- 启动生产者:
kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first - 启动消费者:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first - 如果单独启动生产者,发送数据,之后再启动消费者,默认不发送之前发送的数据。在消费者启动命令后面添加
--from-beginning关键字可以修改为从头拿取数据。
发送流程
- Kafka Producer生产者
- main线程Producer的生产方法
- interceptors拦截器
- Serializer序列化器
- Partitioner分区器,按照批次随机分区。每个批次默认是16K,默认的等待时间是0ms。
- RecordAccumulator里面创建双端队列,队列个数等于分区个数
- sender进程复制双端队列中的数据发送到Kafka集群,如果成功接收则返回ack应答,否则重新发送,最多重试21亿次。
异步发送和同步发送
sender进程发送请求默认是异步执行,即向kafka集群发送时不管是否收到回复,一直发送,由selector来接收ack和关闭对应的请求进程。在Producer的send方法中有一个Callback对象参数,该对象需要实现一个onCompletetion方法。可以在里面查看到对应方法参数中的元数据的值,里面有主题名称、分区号和偏移量。异步执行时可以发现同一批次分区号是一样的,同步时由于需要等待ack,同一批次的分区号是不同的。
生产者分区
- 分区策略
- 默认分区器
- 如果指定了分区号,到指定分区
- 如果是key-value,使用key进行hash分区
- 粘性分区,如果上一个有分区,跟上一个分区一样,直到数据达到分区容量上限或者等待时间上限进行随机更换分区。
- UniformStickyPartition分区器:如果key值是固定的,可以使用该分区器
- 轮询分区器:需要维护一个列表,效率更低。
- 默认分区器
生产者如何提高吞吐量
- 修改从双端队列拿取数据的等待时间,从0ms修改为5-100ms
- compression.type: 压缩snappy
- 修改批次大小:默认为16K,修改为32KB.
数据可靠性
ack应答级别:
- 0:生产者发送过来的数据,不需要等数据落盘应答,也就是最多一次。
- 1:生产者发送过来的数据,Leader收到后应答
- -1(all): 生产者发送过来的数据,Leader和isr队列里面的所有节点收齐数据并落盘后应答。Leader维护了一个动态的in-sync replica set ISR, 如果有某个节点30s内没有回复,则认为该节点死亡。数据有可能
重复。
数据的去重
幂等性
指producer不论向Broker发送多少次重复数据,Broker都只会持久化一条,保证精准一次。重复数据的判断标准,根据sqlNumber来判断,重发的数据其seqNumber是一样的。
缺点:如果生产者中途宕机,然后重新建立会话时,不能保证不同会话时PID是一样,这时候重新发送重复数据时无法保证幂等性。
解决方案:在Kafka集群中将生产者的信息保存到集群中的某个主题中,如果生产者宕机后重启需要先去读取Kafka集群的状态信息,以保证多会话情况下的幂等性。
数据的有序性
- 因为不能保证多分区之间是有序的,只能指定单分区。
- 开启幂等性,且元数据request个数小于5个,如果发送失败导致顺序异常,Kafka会按照SeqNumber重新排序。
Flume和Kafka
为何Kafka全方面碾压flume,还会有人使用flume?
这是由于flume使用上只需配置一个文件即可使用,无需编写代码。并且可以使用flume将数据灌入到kafka中,既简单又利用到了kafka的性能,flume和kafka结合使用才是日常开发的常用操作。
Kafka Broker总体工作流程
- broker在zk中注册
- controller谁先注册,谁说了算
- 由选举出来的Controller监听brokers节点变化
- Controller决定Leader选举:在isr存活为前提,轮询选举
- Controller将节点信息上传到zk
- 其他controller从zk同步相关信息
Broker节点的服役和退役
- 启动新主机的zookeeper和kafka
- 创建一个要均衡的主题vim topics-to-move.json
{"topics": [{"topic": "first"}],"version": 1
}
- 生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate - 保存生成的计划到文件中
- 执行负载均衡计划
- 在kafka/datas目录下查看是否正确。
kafka副本
为了提高数据可靠性,副本数量一般设置为两个。
Follower故障处理
LEO(log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1
HW (High Watermark): 所有副本中最小的LEO
高效读取
1. 多分区
2. 稀疏索引
3. 顺序写磁盘
4. 页缓存和零拷贝
页缓存:其实就是把尽可能多的空闲内存当做磁盘缓存来使用。
零拷贝:数据加工处理操作交给生产者和消费者
相关文章:
大数据之Kafka
Kafka概述 传统定义:一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。 最新定义:一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。最主要的功能是做数据的…...
灵活运用OSI模型提升排错能力
1.OSI模型有什么实际价值? 2.二层和三层网络的区别和应用; 3.如何通过OSI模型提升组网排错能力? -- OSI - 开放式系统互联 - OSI参考模型 - 一个互联标准 -- 软件硬件 - 定义标准 数据通信的标准 -- 厂商 思科 华为 华三…...
【最新!企知道AES加密分析】使用Python实现完整解密算法
文章目录 1. 写在前面2. 过debugger3. 抓包分析4. 断点分析5. Python实现解密算法1. 写在前面 最近华为各方面传递出来的消息无不体现出华为科技实力与技术处于遥遥领先的地位。所以出于好奇想要了解一下咱们国内这些互联网科技企业有哪些技术专利,于是就有了这篇文章! 分析目…...
前端架构师之11_JavaScript事件
1 事件处理 1.1 事件概述 在学习事件前,有几个重要的概念需要了解: 事件事件处理程序事件驱动式事件流 事件 可被理解为是JavaScript侦测到的行为。 这些行为指的就是页面的加载、鼠标单击页面、鼠标滑过某个区域等。 事件处理程序 指的就是Java…...
文本过滤工具:grep
什么是grep? grep是一个命令行文本搜索工具,它的名称来源于"Global Regular Expression Print"(全局正则表达式打印)。它的主要功能是在文本文件中查找特定模式或字符串,并将匹配的行打印到终端或输出到文件…...
【Linux】生产者和消费者模型
生产者和消费者概念基于BlockingQueue的生产者消费者模型全部代码 生产者和消费者概念 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。 生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待…...
开发APP的费用是多少
开发一款APP的费用可以因多种因素而异,包括项目的规模、功能、复杂性、技术选择、地理位置等。北京是中国的大城市,APP开发的费用也会受到北京的物价水平和市场竞争的影响。以下是一些可以影响APP开发费用的因素,希望对大家有所帮助。北京木奇…...
start()方法源码分析
当我们创建好一个线程之后,可以调用.start()方法进行启动,start()方法的内部其实是调用本地的start0()方法, 其实Thread.java这个类中的方法在底层的Thread.c文件中都是一一对应的,在Thread.c中start0方法的底层调用了jvm.cpp文件…...
VUE_history模式下页面404错误
uniapp 的history 把#去掉了,但是当刷新页面的时候出现404错误 解决方案:需要服务端支持 如果 URL 匹配不到任何静态资源,则应该返回同一个 index.html 页面 Apache <IfModule mod_rewrite.c>RewriteEngine OnRewriteBase /RewriteRu…...
现代数据架构-湖仓一体
当前的数据架构已经从数据库、数据仓库,发展到了数据湖、湖仓一体架构,本篇文章从头梳理了一下数据行业发展的脉络。 上世纪,最早出现了关系型数据库,也就是DBMS,有商业的Oracle、 IBM的DB2、Sybase、Informix、 微软…...
最新AI写作系统ChatGPT源码/支持GPT4.0+GPT联网提问/支持ai绘画Midjourney+Prompt应用+MJ以图生图+思维导图生成
一、智能创作系统 SparkAi创作系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图文教程吧&…...
Python机器学习实战-特征重要性分析方法(5):递归特征消除(附源码和实现效果)
实现功能 递归地删除特征并查看它如何影响模型性能。删除时会导致更大下降的特征更重要。 实现代码 from sklearn.ensemble import RandomForestClassifier from sklearn.feature_selection import RFE import pandas as pd from sklearn.datasets import load_breast_cance…...
如何快速走出网站沙盒期(关于优化百度SEO提升排名)
网站沙盒期是指新建立的网站在百度搜索引擎中无法获得好的排名,甚至被完全忽略的现象。这个现象往往发生在新建立的网站上,因为百度需要时间来评估网站的质量和内容。蘑菇号www.mooogu.cn 为了快速走出网站沙盒期,需要优化百度SEO。以下是5个…...
ATA-8000系列射频功率放大器——应用场景介绍
ATA-8000系列是一款射频功率放大器。其P1dB输出功率500W,饱和输出功率最大1000W。增益数控可调,一键保存设置,提供了方便简洁的操作选择,可与主流的信号发生器配套使用,实现射频信号的放大。 图:ATA-8000系…...
2009-2018年各省涉农贷款数据(wind)
2009-2018年各省涉农贷款数据(wind) 1、时间::209-2018年 2、范围:31省 3、来源:wind 4、指标:涉农贷款 指标解释 :在涉农贷款的分类上,按照城乡地域将涉农贷款分为农村贷款和城…...
window.print()打印及出现的问题
<template><transition name"el-zoom-in-center"><div class"JNPF-preview-main"><div class"JNPF-common-page-header"><el-page-header back"goBack" :content"打印通知书" /><div clas…...
Fedora Linux 39 Beta 预估 10 月底发布正式版
Fedora 39 Beta 镜像于今天发布,用户可以根据自己的使用偏好,下载 KDE Plasma,Xfce 和 Cinnamon 等不同桌面环境版本,正式版预估将于 10 月底发布 Fedora 39 Beta 版本主要更新了 DNF 软件包管理器,并优化了 Anaconda …...
【zookeeper】基于Linux环境安装zookeeper集群
前提,需要有几台linux机器,我们可以准备好诸如finalshell来连接linux并且上传文件; 其次Linux需要安装上ssh,并且在/etc/hosts文件中写好其他几台机器的名字和Ip 127.0.0.1 localhost localhost.localdomain localhost4 localh…...
什么是IoT数字孪生?
数字孪生是资产或系统的实时虚拟模型,它使用来自连接的物联网传感器的数据来创建数字表示。数字孪生允许您从任何地方实时监控设备、资产或流程。数字孪生用于多种目的,例如分析性能、监控问题或在实施之前运行测试。从物联网数字孪生中获得的见解使用户…...
俄罗斯四大平台速卖通、Joom、Ozon 和 UMKA中国卖家如何脱颖而出!
随着全球化的不断推进,越来越多的中国卖家将目光投向了俄罗斯这个广阔的市场。在众多的跨境电商平台中,速卖通、Joom、Ozon 和 UMKA 无疑是最受关注的四个平台。本文将从卖家的角度,详细分析这四大平台的特点和优势,帮助找到最…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序
一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
