当前位置: 首页 > news >正文

大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 需求分析 指标口径
  • 日志数据采集 taildir source HDFS Sink Agent Flume
  • 优化配置

在这里插入图片描述

Flume的优化配置

Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。

使用如下的指令,启动Agent进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflum
e.roog.logger=INFO,console

启动后的截图如下所示:
在这里插入图片描述

查看刚才的Flume窗口:
在这里插入图片描述

查看HDFS的内容:
在这里插入图片描述

批量处理

  • 参数:batchSize
  • 作用:控制 Flume 在批量传输时每次传输的事件数量。
    配置建议:
  • Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000。
  • Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000。

压缩传输

  • 参数:compressionType
  • 作用:对事件进行压缩后传输,减少网络带宽消耗。
  • 支持的压缩类型:gzip、snappy、lz4 等。
  • 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型。

Source 优化

Taildir Source

  • 参数:batchSize 和 fileHeader
  • batchSize:设置单次从文件中读取的事件数量。
  • fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理。

Kafka Source

  • 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
  • kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms。
  • fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整。

Channel 优化

Memory Channel

  • 参数:capacity 和 transactionCapacity
  • capacity:Channel 中允许的最大事件数。
  • transactionCapacity:单次事务中允许的最大事件数。

File Channel

  • 参数:checkpointDir 和 dataDirs
  • checkpointDir:存储 Channel 状态的目录。
  • dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能。
  • 配置建议:确保磁盘 IO 性能足够,避免瓶颈。

Flume报错解决

向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
我这里直接放入:

vim /opt/wzk/logs/start/test.log2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

解决方案:
在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:

export JAVA_OPTS="-Xms4000m -Xmx4000m -
Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf flume-1.9/conf --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,consoleflume-ng agent --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:

  • 根据日志数据量大小,JVM堆一般要设置为4G或者更高
  • -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响

自定义拦截器

前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
Agent用于测试自定义拦截器,source => logger sink
flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器原理

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body (接收的字符串)
  • 获取 header 和 body
  • 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置到header中

自定义拦截器实现

自定义拦截器的实现:

  • 获取event的header
  • 获取event的body
  • 解析body获取json串
  • 解析json串获取时间戳
  • 将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置header中
  • 返回event

导入依赖

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
</dependencies>

编写代码

package icu.wzk;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class CustomerInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 这里是逐条处理String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);// 获取Event的HeaderMap<String, String> headerMap = event.getHeaders();// 解析Body获取JSON字符串String[] bodyArr = eventBody.split("\\s+");try {String jsonStr = bodyArr[6];// 解析JSON字符串获取时间戳JSONObject jsonObject = JSON.parseObject(jsonStr);String timestampStr = jsonObject.getJSONObject("app_active").getString("time");// 将时间戳转换字符串 yyyy-MM-dd// 将字符串转换为Longlong timestampLong = Long.parseLong(timestampStr);DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");Instant instant = Instant.ofEpochMilli(timestampLong);LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());String date = formatter.format(localDateTime);// 将转换后的字符串放置header中headerMap.put("logtime", date);event.setHeaders(headerMap);} catch (Exception e) {headerMap.put("logtime", "Unknown");event.setHeaders(headerMap);}return event;}@Overridepublic List<Event> intercept(List<Event> list) {List<Event> lstEvent = new ArrayList<>();for (Event event : list) {Event outEvent = intercept(event);if (outEvent != null) {lstEvent.add(outEvent);}}return lstEvent;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomerInterceptor();}@Overridepublic void configure(Context context) {}}}

相关文章:

大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…...

idea maven 重新构建索引

当设置maven仓库为离线模式的时候&#xff0c;会出现一些问题。 比如本地的仓库被各种方式手动更新之后&#xff0c; 举例&#xff1a;我需要一个spring的包&#xff0c;在pmo文件中写好了引入包的代码 但是由于是离线模式没有办法触发自动下载&#xff0c;那么这个时候我可以…...

C#桌面应用制作计算器

C#桌面应用制作简易计算器&#xff0c;可实现数字之间的加减乘除、AC按键清屏、Del按键清除末尾数字、/-按键取数字相反数、%按键使数字缩小100倍、按键显示运算结果等...... 页面实现效果 功能实现 布局 计算器主体使用Panel容器&#xff0c;然后将button控件排列放置Pane…...

细说STM32单片机DMA中断收发RTC实时时间并改善其鲁棒性的方法

目录 一、DMA基础知识 1、DMA简介 (1)DMA控制器 (2)DMA流 (3)DMA请求 (4)仲裁器 (5)DMA传输属性 2、源地址和目标地址 3、DMA传输模式 4、传输数据量的大小 5、数据宽度 6、地址指针递增 7、DMA工作模式 8、DMA流的优先级别 9、FIFO或直接模式 10、单次传输或突…...

【Unity/Animator动画系统】多层动画状态机实现角色的基本移动

文章目录 前言实现顶层地面状态四方向混合树计算动画所需参数 空中状态分层动画 前言 最近打算做个Rougelike RPG 塔科夫 混搭风格的冒险游戏。暂且就当是一个有随机元素&#xff0c;有基地&#xff0c;死亡会掉落物品的近战塔科夫罢。 花了三天时间&#xff0c;整合了Mixa…...

每日算法一练:剑指offer——栈与队列篇(1)

1.图书整理II 读者来到图书馆排队借还书&#xff0c;图书管理员使用两个书车来完成整理借还书的任务。书车中的书从下往上叠加存放&#xff0c;图书管理员每次只能拿取书车顶部的书。排队的读者会有两种操作&#xff1a; push(bookID)&#xff1a;把借阅的书籍还到图书馆。pop…...

【Java】ArrayList与LinkedList详解!!!

目录 一&#x1f31e;、List 1&#x1f345;.什么是List&#xff1f; 2&#x1f345;.List中的常用方法 二&#x1f31e;、ArrayList 1&#x1f34d;.什么是ArrayList? 2&#x1f34d;.ArrayList的实例化 3&#x1f34d;.ArrayList的使用 4&#x1f34d;.ArrayList的遍…...

怎么用VIM查看UVM源码

利用ctags工具可以建立源码的索引表&#xff0c;在使用VIM或其他文本编辑器时&#xff0c;就可以跳转查看所调用的UVM或VIP的funtcion/task/class等源码了。 首先需要确认ctags安装&#xff0c;一般安装VIM后都有&#xff0c;如果没有可以手动安装。在VIM中可以输入:help ctag…...

数据结构C语言描述3(图文结合)--双链表、循环链表、约瑟夫环问题

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;有C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…...

第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令

文章目录 第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令TCP 设备的 READ 命令READ 修改 $ZA 和 $ZB$ZA 和 READ 命令 第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令 TCP 设备的 READ 命令 从服务器或客户端发出 READ 命令以读取客户端或服务器设置的…...

【C++】哈希表的实现详解

哈希表的实现详解 一、哈希常识1.1、哈希概念1.2、哈希冲突1.3、哈希函数&#xff08;直接定执 除留余数&#xff09;1.4、哈希冲突解决闭散列&#xff08;线性探测 二次探测&#xff09;开散列 二、闭散列哈希表的模拟实现2.1、框架2.2、哈希节点状态的类2.3、哈希表的扩容2…...

高阶C语言之五:(数据)文件

目录 文件名 文件类型 文件指针 文件的打开和关闭 文件打开模式 文件操作函数&#xff08;顺序&#xff09; 0、“流” 1、字符输出函数fputc 2、字符输入函数fgetc 3、字符串输出函数fputs 4、 字符串输入函数fgets 5、格式化输入函数fscanf 6、格式化输出函数fpr…...

服务器上部署并启动 Go 语言框架 **GoZero** 的项目

要在服务器上部署并启动 Go 语言框架 **GoZero** 的项目&#xff0c;下面是一步步的操作指南&#xff1a; ### 1. 安装 Go 语言环境 首先&#xff0c;确保你的服务器上已安装 Go 语言。如果还没有安装&#xff0c;可以通过以下步骤进行安装&#xff1a; #### 1.1 安装 Go 语…...

【Java SE 】继承 与 多态 详解

&#x1f525;博客主页&#x1f525;&#xff1a;【 坊钰_CSDN博客 】 欢迎各位点赞&#x1f44d;评论✍收藏⭐ 目录 1. 继承 1.1 继承的原因 1.2 继承的概念 1.3 继承的语法 2. 子类访问父类 2.1 子类访问父类成员变量 2.1.1 子类与父类不存在同名成员变量 2.1.2 子类…...

【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法

【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法 目录 文章目录 【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法目录摘要&#xff1a;研究背景&#xff1a;问题与挑战&#xff1a;如何解…...

秋招大概到此结束了

1、背景 学院本&#xff0c;软工&#xff0c;秋招只有同程&#xff0c;快手和网易面试&#xff0c;后两家kpi&#xff08;因为面试就很水&#xff09;&#xff0c;秋招情况&#xff1a;哈啰&#xff08;实习转正ing&#xff09;&#xff0c;同程测开offer。 2、走测开的原因 很…...

华为OD机试真题---字符串化繁为简

华为OD机试真题中的“字符串化繁为简”题目是一个涉及字符串处理和等效关系传递的问题。以下是对该题目的详细解析&#xff1a; 一、题目描述 给定一个输入字符串&#xff0c;字符串只可能由英文字母&#xff08;a~z、A~Z&#xff09;和左右小括号&#xff08;(、)&#xff0…...

概念解读|K8s/容器云/裸金属/云原生...这些都有什么区别?

随着容器技术的日渐成熟&#xff0c;不少企业用户都对应用系统开展了容器化改造。而在容器基础架构层面&#xff0c;很多运维人员都更熟悉虚拟化环境&#xff0c;对“容器圈”的各种概念容易混淆&#xff1a;容器就是 Kubernetes 吗&#xff1f;容器云又是什么&#xff1f;容器…...

初识Arkts

创建对象&#xff1a; 类&#xff1a; 类声明引入一个新类型&#xff0c;并定义其字段、方法和构造函数。 定义类后&#xff0c;可以使用关键字new创建实例 可以使用对象字面量创建实例 在以下示例中&#xff0c;定义了Person类&#xff0c;该类具有字段name和surname、构造函…...

基本的SELECT语句

1.SQL概述 SQL&#xff08;Structured Query Language&#xff09;是一种用于管理和操作关系数据库的编程语言。它是一种标准化的语言&#xff0c;用于执行各种数据库操作&#xff0c;包括创建、查询、插入、更新和删除数据等。 SQL语言具有简单、易学、高效的特点&#xff0c;…...

51c自动驾驶~合集30

我自己的原文哦~ https://blog.51cto.com/whaosoft/12086789 #跨越微小陷阱&#xff0c;行动更加稳健 目前四足机器人的全球市场上&#xff0c;市场份额最大的是哪个国家的企业&#xff1f;A.美国 B.中国 C.其他 波士顿动力四足机器人 云深处 绝影X30 四足机器人 &#x1f…...

Python Tutor网站调试利器

概述 本文主要是推荐一个网站:Python Tutor. 网站首页写道: Online Compiler, Visual Debugger, and AI Tutor for Python, Java, C, C++, and JavaScript Python Tutor helps you do programming homework assignments in Python, Java, C, C++, and JavaScript. It contai…...

h5小游戏实现获取本机图片

h5小游戏实现获取本机图片 本文使用cocos引擎 1.1 需求 用户通过文件选择框选择图片。将图片内容转换为Cocos Creator的纹理 (cc.Texture2D),将纹理设置到 cc.SpriteFrame 并显示到节点中。 1.2 实现步骤 创建文件输入框用于获取文件 let input document.createElement(&quo…...

前端 javascript a++和++a的区别

前端 javascript a和a的区别 a 是先执行表达式后再自增&#xff0c;执行表达式时使用的是a的原值。a是先自增再执行表达示&#xff0c;执行表达式时使用的是自增后的a。 var a0 console.log(a); // 输出0 console.log(a); // 输出1var a0 console.log(a); // 输出1 console.l…...

OceanBase V4.x应用实践:如何排查表被锁问题

DBA在日常工作中常常会面临以下两种常见情况&#xff1a; 业务人员会提出问题&#xff1a;“表被锁了&#xff0c;导致业务受阻&#xff0c;请帮忙解决。” 业务人员还会反馈&#xff1a;“某个程序通常几秒内就能执行完毕&#xff0c;但现在却运行了好几分钟&#xff0c;不清楚…...

ctfshow-web入门-SSRF(web351-web360)

目录 1、web351 2、web352 3、web353 4、web354 5、web355 6、web356 7、web357 8、web358 9、web359 10、web360 1、web351 看到 curl_exec 函数&#xff0c;很典型的 SSRF 尝试使用 file 协议读文件&#xff1a; urlfile:///etc/passwd 成功读取到 /etc/passwd 同…...

【日常记录-Git】如何为post-checkout脚本传递参数

1. 简介 在Git中&#xff0c;post-checkout 钩子是一个在git checkout 或git switch命令成功执行后自动调用的脚本。该脚本不接受任何来自Git命令的直接参数&#xff0c;因为Git设计该钩子是为了在特定的版本控制操作后执行一些预定义的任务&#xff0c;而不是作为一个通用的脚…...

《机器人控制器设计与编程》考试试卷**********大学2024~2025学年第(1)学期

消除误解&#xff0c;课程资料逐步公开。 复习资料&#xff1a; Arduino-ESP32机器人控制器设计练习题汇总_arduino编程语言 题-CSDN博客 试卷样卷&#xff1a; 开卷考试&#xff0c;时间&#xff1a; 2024年11月16日 001 002 003 004 005 ……………………装………………………...

后台管理系统(开箱即用)

很久没有更新博客了&#xff0c;给大家带上一波福利吧,大佬勿扰 现在市面上流行的后台管理模板很多,若依,芋道等,可是这些框架对我们来说可能会有点重,所以我自己从0到1写了一个后台管理模板,你们使用时候可扩展性也会更高 项目主要功能: 成员管理&#xff0c;部门管理&#…...

5G CPE与4G CPE的主要区别有哪些

什么是CPE&#xff1f; CPE是Customer Premise Equipment&#xff08;客户前置设备&#xff09;的缩写&#xff0c;也可称为Customer-side Equipment、End-user Equipment或On-premises Equipment。CPE通常指的是位于用户或客户处的网络设备或终端设备&#xff0c;用于连接用户…...

国内做外单的网站有哪些资料/百度推广登录入口下载

Nginx 的 ngx_http_auth_basic_module模块允许通过使用“HTTP基本认证”协议验证用户名和密码来限制对资源的访问。 配置举例&#xff1a; location / {auth_basic "closed site";auth_basic_user_file conf/htpasswd; } #生成htpasswd密码文件&#xff0c…...

天猫网站左侧导航用js怎么做/seo推广怎么样

方法有点笨&#xff0c;但是&#xff0c;没有找到好一点的办法&#xff0c;就这样先装着&#xff0c;看朋友们是否也有需要&#xff0c;记录一下 CentOS 下安装MySQL5.7的时候出现各种问题&#xff0c;各种报错&#xff0c;试过无数办法&#xff0c;今天终于安装上去&#xff0…...

wordpress 笔记插件/nba排名赛程

2015年5月8日&#xff0c;国务院公布《中国制造2025》,这是中国版的“工业4.0”规划。该规划提到“加快推动新一代信息技术与制造技术融合发展&#xff0c;把智能制造作为两化深度融合的主攻方向。大数据时代&#xff0c;利用大数据驱动业务发展&#xff0c;打造企业新型能力势…...

做网站订金是多少/简述网站推广的方法

4、新建Makefile.am新建Makefile.am文件&#xff0c;命令&#xff1a;$ vi Makefile.am 内容如下:AUTOMAKE_OPTIONSforeignbin_PROGRAMShelloworldhelloworld_SOURCEShelloworld.c automake会根据你写的Makefile.am来自动生成Makefile.in。Makefile.am中定义的宏和目标,会指导a…...

自己做时时彩网站/手机制作网页

https://blog.csdn.net/z13615480737/article/details/78906598 使用yum&#xff0c;比较简单&#xff0c;不用考虑版本依赖问题转载于:https://www.cnblogs.com/a1304908180/p/10161299.html...

广告设计公司网/seo软文是什么

Openmeetings 当前作为Apache下的一个项目&#xff0c;基于JAVA开发&#xff0c;主要用于提供视频会议、即时通讯、白板、协作文档等群件工具&#xff0c;通过使用Red 5流媒体服务器处理媒体流。 Openmeetings的主要功能和特性: 音频、视频会议 会议前可选择音频、视频、音…...