【Flume Kafaka实战】Using Kafka with Flume
一 目标
在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。
二 实战
2.1 Kafka Sink
第一步,在Cloudera Manager中安装Flume,安装时指定两个Agent。这一步很简单。
第二步,创建一个新Role Group。默认情况下,所有的Agent都处于一个叫Agent Default Group的角色组中,处于同一角色组中的Agent共享相同的配置。但是在我们这个例子中,两个Agent要完成不同的工作,需要不同的配置。所有新建一个Role Group,并把其中一个Agent移到到这个新的Group中,如下图所示。
第三步,分别编辑两个Agent的配置文件,我的第一个Agent名字为file2Kafka,配置文件内容如下。不难看出,这个配置的source就是去tail一个本地文件,然后写入到kafka的消息队列中。
即:Kafka Sink
# Name the components on this agent
file2Kafka.sources = file2Kafka_source
file2Kafka.sinks = file2Kafka_sink
file2Kafka.channels = file2Kafka_channel# Describe/configure the source
file2Kafka.sources.file2Kafka_source.type = exec
file2Kafka.sources.file2Kafka_source.command = tail -F /home/demo/flume-exec.txt# Describe the sink
file2Kafka.sinks.file2Kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
# topic前不加kafka
file2Kafka.sinks.file2Kafka_sink.topic = flumetest
file2Kafka.sinks.file2Kafka_sink.kafka.bootstrap.servers= slave1:9092,slave2:9092
file2Kafka.sinks.file2Kafka_sink.kafka.flumeBatchSize= 20# Use a channel which buffers events in memory
file2Kafka.channels.file2Kafka_channel.type = memory
file2Kafka.channels.file2Kafka_channel.capacity = 1000
file2Kafka.channels.file2Kafka_channel.transactionCapacity = 1000# Bind the source and sink to the channel
file2Kafka.sources.file2Kafka_source.channels = file2Kafka_channel
file2Kafka.sinks.file2Kafka_sink.channel = file2Kafka_channel
2.2 Kafka Source
第二Agent的名字是kafka2Hdfs,配置文件如下。这个配置的内容就是把Agent1中写到kafka的数据读出来,然后写入到HDFS中。注意hdfs.path这个配置,由于在Cloudera Manager中,Flume知道HDFS相关的配置,所以无需去加入hdfs://my-cluster这样的协议前缀。
# Name the components on this agent
kafka2Hdfs.sources = kafka2Hdfs_source
kafka2Hdfs.sinks = kafka2Hdfs_sink
kafka2Hdfs.channels = kafka2Hdfs_channel# Describe/configure the source
kafka2Hdfs.sources.kafka2Hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
kafka2Hdfs.sources.kafka2Hdfs_source.batchSize = 10
kafka2Hdfs.sources.kafka2Hdfs_source.batchDurationMillis = 1000
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.topics = flumetest
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.consumer.group.id = flume# Describe the sink
kafka2Hdfs.sinks.kafka2Hdfs_sink.type = hdfs
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.path = /flume/
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.fileType = DataStream
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.filePrefix=sxt
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollCount=0
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollInterval=0# Use a channel which buffers events in memory
kafka2Hdfs.channels.kafka2Hdfs_channel.type = memory
kafka2Hdfs.channels.kafka2Hdfs_channel.capacity = 1000
kafka2Hdfs.channels.kafka2Hdfs_channel.transactionCapacity = 100# Bind the source and sink to the channel
kafka2Hdfs.sources.kafka2Hdfs_source.channels = kafka2Hdfs_channel
kafka2Hdfs.sinks.kafka2Hdfs_sink.channel = kafka2Hdfs_channel
整个配置完成之后,Cloudera Manager中的界面如下图:
在运行中可能会出现一些目录读写的权限问题,需要去修改hdfs中相关目录的权限。比如我的配置中,数据是写到/flume这个目录下的,这个目录我是用root用户去创建的,但flume运行是使用一个叫flume的用户名来运行的,所以用hdfs dfs -chmod 777 /flume把这个目录的读写权限放开了。
这是一个例子,主要演示如何在cloudera manager中把两个flume的agent串联在一起使用。在现实的生产中,如果需要把一个文本数据通过kakfa写入到hdfs中,更合理的做法是使用一个agent,把kafka作为channel来使用。具体可以参考https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
2.3 Kafka Channel
# Name the components on this agent
kafkaCh.sources = src_1_file
kafkaCh.channels = ch_1_kafka
kafkaCh.sinks = sink_1_hdfs# Describe/configure the source
kafkaCh.sources.src_1_file.type = exec
kafkaCh.sources.src_1_file.command = tail -F /home/demo/flume-exec.txt# Define a kafka channel
kafkaCh.channels.ch_1_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
kafkaCh.channels.ch_1_kafka.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafkaCh.channels.ch_1_kafka.kafka.topic = kafka_channel
kafkaCh.channels.ch_1_kafka.kafka.consumer.group.id = flume-consumer# Describe the sink
kafkaCh.sinks.sink_1_hdfs.type = hdfs
kafkaCh.sinks.sink_1_hdfs.hdfs.path = /flume/kafka/channel
kafkaCh.sinks.sink_1_hdfs.hdfs.fileType = DataStream
kafkaCh.sinks.sink_1_hdfs.hdfs.filePrefix=sxt
kafkaCh.sinks.sink_1_hdfs.hdfs.rollCount=0
kafkaCh.sinks.sink_1_hdfs.hdfs.rollInterval=0# Bind the source and sink to the channel
kafkaCh.sources.src_1_file.channels = ch_1_kafka
kafkaCh.sinks.sink_1_hdfs.channel = ch_1_kafka
将上面两个Agent放在一个Agent中,用Kafka Channel实现。
注意:hdfs.path 必须存在,且有权限进行操作
相关文章:
【Flume Kafaka实战】Using Kafka with Flume
一 目标 在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。 二 实战 2.1 Kafka Sink 第一步࿰…...
5G NR物理信号
文章目录 NR 物理信号与LTE的区别上行参考信号DMRS (UL)SRSPT-RS(UL) 下行参考信号DMRS(DL)PT-RS(DL)CSI-RSPSSSSS NR 物理信号与LTE的区别 用SSS、CSI-RS和DMRS 取代了CRS信号。下行业务信道采用TM1波束赋形传输模式。基于SSB 或者CSI-RS进行RSRP和SINR测量。基于DMRS 进行共…...
Pikachu-Cross-Site Scripting-存储型xss
存储型xss ,随便输入点内容,都能保存下来;刷新后也不会丢失;输入特殊字符,也能原样返回; 查看代码,也可以看到输出结果直接原路返回,不做处理 构造payload <script>alert(1)…...
媲美GPT-4o mini的小模型,Meta Llama 3.2模型全面解读!
大家好,我是木易,一个持续关注AI领域的互联网技术产品经理,国内Top2本科,美国Top10 CS研究生,MBA。我坚信AI是普通人变强的“外挂”,专注于分享AI全维度知识,包括但不限于AI科普,AI工…...
【leetcode】 45.跳跃游戏 ||
如果我们「贪心」地进行正向查找,每次找到可到达的最远位置,就可以在线性时间内得到最少的跳跃次数。 例如,对于数组 [2,3,1,2,4,2,3],初始位置是下标 0,从下标 0 出发,最远可到达下标 2。下标 0 可到达的…...
coco(json)、yolo(txt)、voc(xml)标注格式的相互转换
一般都是用labeleme进行标注 标注格式都是json 然后根据不同的格式进行数据标注转换: 1.逐个json转xml: 当我们在使用数据集训练计算机视觉模型时,常常会遇到有的数据集只给了单个的json annotation文件,而模型所需要的annotation是基于每…...
以太网交换安全:端口安全
一、端口安全介绍 端口安全是一种网络设备防护措施,通过将接口学习到的动态MAC地址转换为安全MAC地址(包括安全动态MAC和Sticky MAC),阻止除安全MAC和静态MAC之外的主机通过本接口和设备通信,从而增强设备的安全性。以…...
[题解] Codeforces Round 976 (Div. 2) A ~ E
A. Find Minimum Operations 签到. void solve() {int n, k;cin >> n >> k;if (k 1) {cout << n << endl;return;}int ans 0;while (n) {ans n % k;n / k;}cout << ans << endl; }B. Brightness Begins 打表发现, 翻转完后的序列为: 0…...
【零基础入门产品经理】学习准备篇 | 需要学一些什么呢?
前言: 零实习转行产品经理经验分享01-学习准备篇_哔哩哔哩_bilibili 该篇内容主要是对bilibili这个视频的观后笔记~谢谢美丽滴up主友情分享。 全文摘要:如何在0实习且没有任何产品相关经验下,如何上岸产品经理~ 目录 一、想清楚为什么…...
第四届机器人、自动化与智能控制国际会议(ICRAIC 2024)征稿
第四届机器人、自动化与智能控制国际会议(ICRAIC 2024)由湖南第一师范学院主办,南京师范大学、山东女子学院、爱迩思出版社(ELSP)协办。 大会将专注于机器人、数字化、自动化、人工智能等技术的开发和融合,…...
[数据集][目标检测]电力场景防震锤缺陷检测数据集VOC+YOLO格式705张1类别
重要说明:防震锤缺陷图片太难找,数据集里面存在大量单一场景图片,请仔细查看图片预览谨慎下载,此外数据集均为小目标检测,如果训练map偏低属于正常现象 数据集格式:Pascal VOC格式YOLO格式(不包含分割路径…...
【SpringBoot】
目录 一、Spring Boot概要 1. SpringBoot介绍 2. SpringBoot优点 3. SpringBoot缺点 4. 时代背景-微服务 二、Spring Boot 核心配置 1. Spring Boot配置文件分类 1.1 application.properties 1.2 application.yml 1.3 小结 2. YAML概述 3. YAML基础语法 3.1 注意事…...
Linux操作系统中MongoDB
1、什么是MongoDB 1、非关系型数据库 NoSQL,泛指非关系型的数据库。随着互联网web2.0网站的兴起,传统的关系数据库在处理web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,出现了很多难以克服的问…...
2、.Net 前端框架:OpenAuth.Net - .Net宣传系列文章
OpenAuth.Net 是一个开源的身份验证框架,由开发者 Yubaolee 创建,它旨在简化 Web 应用和服务的安全授权过程。这个框架以其强大的功能和易用性,为开发人员提供了一种高效的方式来处理用户认证和授权问题。 OpenAuth.Net 的关键特性包括&#…...
unreal engine5制作动作类游戏时,我们使用刀剑等武器攻击怪物或敌方单位时,发现攻击特效、伤害等没有触发
UE5系列文章目录 文章目录 UE5系列文章目录前言一、问题分析二、解决方法1. 添加项目设置碰撞检测通道2.玩家角色碰撞设置3.怪物角色碰撞预设 最终效果 前言 在使用unreal engine5制作动作类游戏时,我们使用刀剑等武器攻击怪物或敌方单位时,发现攻击特效…...
数据权限的设计与实现系列11——前端筛选器组件Everright-filter集成功能完善2
筛选条件数据类型完善 文本类 筛选器组件给了一个文本类操作的范例,如下: Text: [{label: 等于,en_label: Equal,style: noop},{label: 等于其中之一,en_label: Equal to one of,value: one_of,style: tags},{label: 不等于,en_label: Not equal,v…...
C++ 游戏开发
C游戏开发 C 是一种高效、灵活且功能强大的编程语言,因其性能和控制能力而在游戏开发中被广泛应用。许多著名的游戏引擎,如 Unreal Engine、CryEngine 和 Godot 等,都依赖于 C 进行核心开发。本文将详细介绍 C 在游戏开发中的应用࿰…...
【历年CSP-S复赛第一题】暴力解法与正解合集(2019-2022)
P5657 [CSP-S2019] 格雷码P7076 [CSP-S2020] 动物园P7913 [CSP-S 2021] 廊桥分配P8817 [CSP-S 2022] 假期计划 P5657 [CSP-S2019] 格雷码 暴力50分 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(false),cin.tie(0),cout.tie(0) #define int long long #d…...
基于PyQt5和SQLite的数据库操作程序
基于PyQt5和SQLite的数据库操作程序:功能解析 在现代办公和数据处理中,数据库操作是不可或缺的一部分。然而,传统的数据库管理工具往往界面复杂,操作繁琐,对于非专业人士来说存在一定的学习曲线。为了解决这个问题,我们开发了一款基于PyQt5和SQLite的数据库操作程序。该…...
在Ubuntu 20.04中安装CARLA
0. 引言 CARLA (Car Learning to Act) 是一款开源自动驾驶模拟器,其支持自动驾驶系统全管线的开发、训练和验证(Development, Training, and Validation of autonomous driving systems)。Carla提供了丰富的数字资产,例如城市布局…...
【高中数学/对数/导数】曲线y=ln|x|过坐标原点的两切线方程为?
【问题】 曲线yln|x|过坐标原点的两切线方程为?(高考真题) 【出处】 《高考数学 函数与导数题型解题研究》P5第8题 中原教研工作室编著 【解答】 yln|x|的图线分两部分,y轴左边的部分是ylnx的镜像 所以知ylnx上切线过原点的…...
Qt CMake
使用 CMake 构建 CMake 是一款用于简化跨不同平台开发项目的构建流程的工具。 CMake 可自动生成构建系统,如 Makefile 和 Visual Studio 项目文件。 CMake 是一个第三方工具,有自己的文档。 本主题介绍如何在 Qt 5 中使用 CMake 3.1.0。 开始使用 CMak…...
制造企业各部门如何参与生产成本控制与管理?
国内制造业的分量可不轻,从日常生活用品到高端工业设备,中国制造几乎涵盖了各个领域。 不过很多制造业企业在管理方面确实存在一些难题:成本控制不容易,产品质量并不稳定,生产周期也常常较长。 一、中国制造业生产管…...
FireRedTTS - 小红书最新开源AI语音克隆合成系统 免训练一键音频克隆 本地一键整合包下载
小红书技术团队FireRed最近推出了一款名为FireRedTTS的先进语音合成系统,该系统能够基于少量参考音频快速模仿任意音色和说话风格,实现独特的音频内容创造。 FireRedTTS 只需要给定文本和几秒钟参考音频,无需训练,就可模仿任意音色…...
活体检测标签之2.4G有源RFID--SI24R2F+
首先从客户对食品安全和可追溯性的关注切入,引出活体标签这个解决方案。接着分别阐述活体标签在动物养殖和植物产品方面的应用,强调其像 “身份证” 一样记录重要信息,让客户能够了解食品的来源和成长历程,从而放心食用。最后呼吁…...
Web3Auth 如何工作?
Web3Auth 用作钱包基础设施,为去中心化应用程序 (dApp) 和区块链钱包提供增强的灵活性和安全性。在本文档中,我们将探索 Web3Auth 的功能,展示它如何为每个用户和应用程序生成唯一的加密密钥提供程序。 高级架构 Web3Auth SDK 完全存在于用…...
问:SQL中join语法的差异?
在SQL中,JOIN语法用于结合来自两个或多个表的数据。不同类型的JOIN会基于不同的条件来合并表中的数据。以下是几种常见的JOIN及其差异: 假设我们有两个表:employees 和 departments。 employees 表: employee_idnamedepartment_id1Alice10…...
计算机网络各层有哪些协议?计算机网络协议解析:从拟定到实现,全面了解各层协议的作用与区别
在数字化时代,计算机网络无处不在,已经成为不可或缺的一部分。为了让不同设备能够有效地进行通信,网络协议作为一种约定和规则,确保了数据在网络中的可靠传输。今天,我们将深入探讨计算机网络的各层协议,详…...
解决方案:机器学习中,基学习器 跟 弱学习器,有什么区别
文章目录 一、现象二、解决方案 一、现象 在工作中,在机器学习中,有时候会看到基学习器 跟 弱学习器,会容易混淆,所以整理一下 二、解决方案 在机器学习中,“基学习器”(Base Learner)和“弱…...
【Python】ftfy 使用指南:修复 Unicode 编码问题
ftfy(fixes text for you)是一个专为修复各种文本编码错误而设计的 Python 工具。它的主要目标是将损坏的 Unicode 文本恢复为正确的 Unicode 格式。ftfy 并非用于处理非 Unicode 编码,而是旨在修复因为编码不一致、解码错误或混合编码导致的…...
创建免费网站/短视频seo排名加盟
1. Java线程的状态 Java线程在某个时刻只能处于以下六个状态中的一个。 – New(新创建),一个线程刚刚被创建出来,还没有开始运行的状态,更通俗点说:还没有调用start方法; – Runnableÿ…...
中企做的网站太原/东莞百度seo推广公司
SQL Server的总结还没有做完,但剩下的我想慢慢来了。今天开始打算进行学生信息管理系统的实战。不知道就是想实战了,不动手学习不踏实。打开刚从师父那儿拿到的学生信息管理系统,有点庞大,此时我的胸腔里有一股嗜血的冲动…...
做平面设计兼职的网站有哪些/武汉建站公司
点击链接http://hi.baidu.com/rungok/item/1c20bceb876a4e355a2d64a9转载于:https://blog.51cto.com/rungok/960049...
wordpress 评论到微博/谷歌浏览器下载安装2023最新版
这些公司究竟怎么“区块链”,上交所和深交所也很好奇。 文 | 雪姣 运营 | 盖遥 编辑 | 卢晓明出品 | Odaily星球日报(ID:o-daily)区块链给 A 股“拉盘”,还管用吗?5 月 26 日(周日)…...
做网站云服务器选择多大带宽/商丘网站seo
审批流属于工作流的范畴,一提起工作流,一般都会觉得那东西太复杂了,确实要做到一个方便的比较通用的工作流不是件简单的事情,但也不是那么难的事情,只是要花多的时间去琢磨和研究。这里我想给大家分享一个项目中涉及的…...
济南智能网站建设服务/网络广告营销的特点
一、windows server 2003 3790版本识别 RTMrelease to manufacture (公开发行批量生产)是给硬件制造商的版本!是送去压盘的,不是拿去卖的。 OEMOriginal Equipment Manufacturer只能全新安装, 和RTM差不多,只是称呼不同…...