当前位置: 首页 > news >正文

Flume 简介及基本使用

1.Flume简介

Apache Flume 是一个分布式,高可用的数据收集系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前) 两个版本,NG 在 OG 的基础上进行了完全的重构,是目前使用最为广泛的版本。下面的介绍均以 NG 为基础。

2. Flume架构和基本概念

下图为 Flume 的基本架构图:

2.1 基本架构

外部数据源以特定格式向 Flume 发送 `events` (事件),当 `source` 接收到 `events` 时,它将其存储到一个或多个 `channel`,`channe` 会一直保存 `events` 直到它被 `sink` 所消费。`sink` 的主要功能从 `channel` 中读取 `events`,并将其存入外部存储系统或转发到下一个 `source`,成功后再从 `channel` 中移除 `events`。

2.2 基本概念

1. Event

`Event` 是 Flume NG 数据传输的基本单元。类似于 JMS 和消息系统中的消息。一个 `Event` 由标题和正文组成:前者是键/值映射,后者是任意字节数组。

2. Source 

数据收集组件,从外部数据源收集数据,并存储到 Channel 中。

3. Channel

`Channel` 是源和接收器之间的管道,用于临时存储数据。可以是内存或持久化的文件系统:

+ `Memory Channel` : 使用内存,优点是速度快,但数据可能会丢失 (如突然宕机);

+ `File Channel` : 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。

4. Sink

`Sink` 的主要功能从 `Channel` 中读取 `Event`,并将其存入外部存储系统或将其转发到下一个 `Source`,成功后再从 `Channel` 中移除 `Event`。

5. Agent

是一个独立的 (JVM) 进程,包含 `Source`、 `Channel`、 `Sink` 等组件。

2.3 组件种类

Flume 中的每一个组件都提供了丰富的类型,适用于不同场景:

- Source 类型 :内置了几十种类型,如 `Avro Source`,`Thrift Source`,`Kafka Source`,`JMS Source`;

- Sink 类型 :`HDFS Sink`,`Hive Sink`,`HBaseSinks`,`Avro Sink` 等;

- Channel 类型 :`Memory Channel`,`JDBC Channel`,`Kafka Channel`,`File Channel` 等。

对于 Flume 的使用,除非有特别的需求,否则通过组合内置的各种类型的 Source,Sink 和 Channel 就能满足大多数的需求。

3. Flume架构模式

Flume 支持多种架构模式,分别介绍如下

3.1 multi-agent flow

Flume 支持跨越多个 Agent 的数据传递,这要求前一个 Agent 的 Sink 和下一个 Agent 的 Source 都必须是 `Avro` 类型,Sink 指向 Source 所在主机名 (或 IP 地址) 和端口(详细配置见下文案例三)。

3.2 Consolidation

日志收集中常常存在大量的客户端(比如分布式 web 服务),Flume 支持使用多个 Agent 分别收集日志,然后通过一个或者多个 Agent 聚合后再存储到文件系统中。

3.3 Multiplexing the flow

Flume 支持从一个 Source 向多个 Channel,也就是向多个 Sink 传递事件,这个操作称之为 `Fan Out`(扇出)。默认情况下 `Fan Out` 是向所有的 Channel 复制 `Event`,即所有 Channel 收到的数据都是相同的。同时 Flume 也支持在 `Source` 上自定义一个复用选择器 (multiplexing selector) 来实现自定义的路由规则。

4.Flume配置格式

Flume 配置通常需要以下两个步骤:

1. 分别定义好 Agent 的 Sources,Sinks,Channels,然后将 Sources 和 Sinks 与通道进行绑定。需要注意的是一个 Source 可以配置多个 Channel,但一个 Sink 只能配置一个 Channel。基本格式如下:

<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

2. 分别定义 Source,Sink,Channel 的具体属性。基本格式如下:

<Agent>.sources.<Source>.<someProperty> = <someValue>properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

5. Flume使用案例

介绍几个 Flume 的使用案例:

+ 案例一:使用 Flume 监听文件内容变动,将新增加的内容输出到控制台。

+ 案例二:使用 Flume 监听指定目录,将目录下新增加的文件存储到 HDFS。

+ 案例三:使用 Avro 将本服务器收集到的日志数据发送到另外一台服务器。

5.1 案例一

需求: 监听文件内容变动,将新增加的内容输出到控制台。

实现: 主要使用 `Exec Source` 配合 `tail` 命令实现。

1. 配置

新建配置文件 `exec-memory-logger.properties`,其内容如下:

#指定agentsources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  #配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c#将sourceschannels进行绑定
a1.sources.s1.channels = c1#配置sink 
a1.sinks.k1.type = logger#将sinkschannels进行绑定  
a1.sinks.k1.channel = c1  #配置channel类型
a1.channels.c1.type = memory

2. 启动

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console

3. 测试

向文件中追加数据:

控制台的显示:

5.2 案例二

需求: 监听指定目录,将目录下新增加的文件存储到 HDFS。

实现:使用 `Spooling Directory Source` 和 `HDFS Sink`。

1. 配置

#指定agentsources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  #配置sources属性
a1.sources.s1.type =spooldir  
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName 
#将sourceschannels进行绑定  
a1.sources.s1.channels =c1 #配置sink 
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream  
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#将sinkschannels进行绑定  
a1.sinks.k1.channel = c1#配置channel类型
a1.channels.c1.type = memory

2. 启动

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console

3. 测试

拷贝任意文件到监听目录下,可以从日志看到文件上传到 HDFS 的路径:

cp log.txt logs/

查看上传到 HDFS 上的文件内容与本地是否一致:

hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801

5.3 案例三

需求: 将本服务器收集到的数据发送到另外一台服务器。

实现:使用 `avro sources` 和 `avro Sink` 实现。

1. 配置日志收集Flume

新建配置 `netcat-memory-avro.properties`,监听文件内容变化,然后将新的文件内容通过 `avro sink` 发送到 hadoop001 这台服务器的 8888 端口:

#指定agentsources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2. 配置日志聚合Flume

使用 `avro source` 监听 hadoop001 服务器的 8888 端口,将获取到内容输出到控制台:

#指定agentsources,sinks,channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2#配置sources属性
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888#将sourceschannels进行绑定
a2.sources.s2.channels = c2#配置sink
a2.sinks.k2.type = logger#将sinkschannels进行绑定
a2.sinks.k2.channel = c2#配置channel类型
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

3. 启动

启动日志聚集 Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 -Dflume.root.logger=INFO,console

在启动日志收集 Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console

这里建议按以上顺序启动,原因是 `avro.source` 会先与端口进行绑定,这样 `avro sink` 连接时才不会报无法连接的异常。但是即使不按顺序启动也是没关系的,`sink` 会一直重试,直至建立好连接。

4.测试

向文件 `tmp/log.txt` 中追加内容:

可以看到已经从 8888 端口监听到内容,并成功输出到控制台:

相关文章:

Flume 简介及基本使用

1.Flume简介 Apache Flume 是一个分布式&#xff0c;高可用的数据收集系统。它可以从不同的数据源收集数据&#xff0c;经过聚合后发送到存储系统中&#xff0c;通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前) 两个版本&#xff0c;NG 在 OG 的基础上进行了完全的重构…...

行业追踪,2023-10-11

自动复盘 2023-10-11 凡所有相&#xff0c;皆是虚妄。若见诸相非相&#xff0c;即见如来。 k 线图是最好的老师&#xff0c;每天持续发布板块的rps排名&#xff0c;追踪板块&#xff0c;板块来开仓&#xff0c;板块去清仓&#xff0c;丢弃自以为是的想法&#xff0c;板块去留让…...

Linux:进程控制

目录 一、进程创建 写时拷贝 二、进程终止 echo $? 如何终止进程 _exit与exit 三、进程等待 进程等待的必要性 进程等待的操作 wait waitpid status 异常退出情况 status相关宏 options 四、进程程序替换 1、关于进程程序替换 2、如何进行进程程序替换 程序…...

HTTP中的GET方法与POST方法

1、GET 和 POST方法之间的区别 根据 RFC 规范&#xff0c;GET 的语义是从服务器获取指定的资源&#xff0c;这个资源可以是静态的文本、页面、图片视频等。GET 请求的参数位置一般是写在 URL 中&#xff0c;URL 规定只能支持 ASCII&#xff0c;所以 GET 请求的参数只允许 ASCI…...

2023年10月16日-10月22日,(光追+ue+osg继续按部就班进行即可。)

根据月计划&#xff0c; 本周计划如下&#xff1a; 2023年10月16日-10月22日&#xff0c;光追10.7-10.13&#xff0c;ue rpg(p47-p53),ue5底层渲染01A19-01B4,osg29,osg30,filament文档每天看 落实到天就是 2023年10月16日光追10.7&#xff0c;ue rpg(p47),ue5底层渲染01A19,o…...

【Docker】命令使用大全

【Docker】命令使用大全 目录 【Docker】命令使用大全 简述 Docker 的主要用途 基本概念 容器周期管理 run start/stop/restart kill rm pause/unpause create exec 容器操作 ps inspect top attach events logs wait export port 容器 rootfs 命令 c…...

查找算法:二分查找、插值查找、斐波那契查找

二分查找 查找的前提是数组有序 思路分析 代码实现 # 二分查找&#xff08;递归法实现&#xff09; # 找到一个相等的值就返回该值的下标 def binary_search(arr: list, find_val: int, left: int, right: int):mid (left right) // 2 # 寻找数组中间位置的下标if left &…...

python+django高校教室资源预约管理系统lqg8u

技术栈 后端&#xff1a;pythondjango 前端&#xff1a;vueCSSJavaScriptjQueryelementui 开发语言&#xff1a;Python 框架&#xff1a;django/flask Python版本&#xff1a;python3.7.7 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat 开发软件&#xff1a;PyChar…...

Potato靶机

信息搜集 设备发现 扫描端口 综合扫描 开放了80端口的HTTP服务和7120端口的SSH服务 目录扫描 扫描目录 看看这个info.php&#xff0c;发现只有php的版本信息&#xff0c;没有可以利用的注入点 SSH突破 hydra 爆破 考虑到 7120 端口是 ssh 服务&#xff0c;尝试利用 hydra …...

【环境搭建】linux docker-compose安装gitlab和redis

gitlab需要redis&#xff0c;一起安装了 新建gitlab和redis挂载目录 mkdir -p /data/docker/redis/data mkdir -p /data/docker/redis/logs mkdir -p /data/docker/redis/confmkdir -p /data/docker/gitlab/data mkdir -p /data/docker/gitlab/logs mkdir -p /data/docker/gi…...

JAVAEE初阶相关内容第十三弹--文件操作 IO

写在前 终于完成了&#xff01;&#xff01;&#xff01;&#xff01;内容不多就是本人太拖拉&#xff01; 这里主要介绍文件input&#xff0c;output操作。File类&#xff0c;流对象&#xff08;分为字节流、字符流&#xff09; 需要掌握每个流对象的使用方式&#xff1a;打…...

POI报表的高级应用

POI报表的高级应用 掌握基于模板打印的POI报表导出理解自定义工具类的执行流程 熟练使用SXSSFWorkbook完成百万数据报表打印理解基于事件驱动的POI报表导入 模板打印 概述 自定义生成Excel报表文件还是有很多不尽如意的地方&#xff0c;特别是针对复杂报表头&#xff0c;单…...

【计算机毕设选题推荐】超市管理系统SpringBoot+SSM+Vue

前言&#xff1a;我是IT源码社&#xff0c;从事计算机开发行业数年&#xff0c;专注Java领域&#xff0c;专业提供程序设计开发、源码分享、技术指导讲解、定制和毕业设计服务 项目名 基于SpringBoot的超市管理系统 技术栈 SpringBootVueMySQLMaven 文章目录 一、超市管理系统…...

【算法1-4】递推与递归-P1002 [NOIP2002 普及组] 过河卒

## 题目描述 棋盘上 A 点有一个过河卒&#xff0c;需要走到目标 B 点。卒行走的规则&#xff1a;可以向下、或者向右。同时在棋盘上 C 点有一个对方的马&#xff0c;该马所在的点和所有跳跃一步可达的点称为对方马的控制点。因此称之为“马拦过河卒”。 棋盘用坐标表示&#…...

浅谈压力测试的作用是什么

随着现代应用程序变得越来越复杂&#xff0c;用户的期望也在不断提高&#xff0c;对性能和可靠性的要求变得更加苛刻。在应用程序开发和维护的过程中&#xff0c;压力测试是一项至关重要的活动&#xff0c;它可以帮助发现潜在的问题、评估系统的性能极限&#xff0c;以及确保在…...

互联网Java工程师面试题·Java 总结篇·第一弹

目录 1、面向对象的特征有哪些方面&#xff1f; 2、访问修饰符 public,private,protected,以及不写&#xff08;默认&#xff09;时的区别&#xff1f; 3、String 是最基本的数据类型吗&#xff1f; 4、float f3.4;是否正确&#xff1f; 5、short s1 1; s1 s1 1;有错吗…...

Anylogic 读取和写入Excel文件

1、选择面板-连接-Excel文件&#xff0c;拖入到视图中 然后在excel文件的属性中进行绑定外部excel文件。 绑定完之后&#xff0c;在你需要读取的地方进行写代码&#xff0c; //定义开始读取的行数 //这里设为2&#xff0c;是因为第一行是数据名称 int row12; //读取excel文件信…...

茶百道全链路可观测实战

作者&#xff1a;山猎 茶百道是四川成都的本土茶饮连锁品牌&#xff0c;创立于 2008 年 。经过 15 年的发展&#xff0c;茶百道已成为餐饮标杆品牌&#xff0c;全国门店超 7000 家&#xff0c;遍布全国 31 个省市&#xff0c;实现中国大陆所有省份及各线级城市的全覆盖。2021 …...

Java-JDBC

JDBC JDBC英文名为&#xff1a;Java Data Base Connectivity(Java数据库连接)&#xff0c;官方解释它是Java编程语言和广泛的数据库之间独立于数据库的连接标准的Java API 根本上说JDBC是一种规范&#xff0c;它提供的接口&#xff0c;一套完整的&#xff0c;允许便捷式访问底…...

【ROS】Nav2源码之nav2_planner详解

【ROS】郭老二博文之:ROS目录 1、简述 nav2_planner是路径规划器,把起始位置、姿势的信息输入nav2_planner模块,将会生成可行路径。 nav2_planner路径规划器和nav2_controller控制器相似,也使用插件的形式加载不同的路径规划器。 常用的路径规划器插件有: 1)NavFn Plan…...

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

黑马Mybatis

Mybatis 表现层&#xff1a;页面展示 业务层&#xff1a;逻辑处理 持久层&#xff1a;持久数据化保存 在这里插入图片描述 Mybatis快速入门 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6501c2109c4442118ceb6014725e48e4.png //logback.xml <?xml ver…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

系统设计 --- MongoDB亿级数据查询优化策略

系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log&#xff0c;共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题&#xff0c;不能使用ELK只能使用…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器&#xff1a;Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

从零开始了解数据采集(二十八)——制造业数字孪生

近年来&#xff0c;我国的工业领域正经历一场前所未有的数字化变革&#xff0c;从“双碳目标”到工业互联网平台的推广&#xff0c;国家政策和市场需求共同推动了制造业的升级。在这场变革中&#xff0c;数字孪生技术成为备受关注的关键工具&#xff0c;它不仅让企业“看见”设…...

Python爬虫实战:研究Restkit库相关技术

1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的有价值数据。如何高效地采集这些数据并将其应用于实际业务中,成为了许多企业和开发者关注的焦点。网络爬虫技术作为一种自动化的数据采集工具,可以帮助我们从网页中提取所需的信息。而 RESTful API …...