离线数仓同步数据1
用户行为表数据同步
- 2.1.4 日志消费Flume测试
[gpb@hadoop104 ~]$ cd /opt/module/flume/
[gpb@hadoop104 flume]$ cd job/
[gpb@hadoop104 job]$ rm file_to_kafka.conf
com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 3#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.1.3 日志消费Flume配置实操
1)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf
2)配置文件内容如下#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置优化
1)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
2)HDFS Sink优化
(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
3)编写Flume拦截器
(1)数据漂移问题(2)在com.atguigu.gmall.flume.interceptor包下创建TimestampInterceptor类
package com.atguigu.gmall.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取header和body的数据Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);//2、将body的数据类型转成jsonObject类型(方便获取数据)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}
(3)重新打包(4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面。
2.1.4 日志消费Flume测试
1)启动Zookeeper、Kafka集群
2)启动日志采集Flume
[atguigu@hadoop102 ~]$ f1.sh start
3)启动hadoop104的日志消费Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4)生成模拟数据
[atguigu@hadoop102 ~]$ lg.sh
5)观察HDFS是否出现数据
2.1.5 日志消费Flume启停脚本
若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh
[atguigu@hadoop102 bin]$ vim f2.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
"start")echo " --------启动 hadoop104 日志数据flume-------"ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")echo " --------停止 hadoop104 日志数据flume-------"ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f2.sh
3)f2启动
[atguigu@hadoop102 module]$ f2.sh start
4)f2停止
[atguigu@hadoop102 module]$ f2.sh stop
相关文章:
离线数仓同步数据1
用户行为表数据同步 2.1.4 日志消费Flume测试 [gpbhadoop104 ~]$ cd /opt/module/flume/ [gpbhadoop104 flume]$ cd job/ [gpbhadoop104 job]$ rm file_to_kafka.confcom.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder #定义组件 a1.sourcesr1 a1.channelsc1…...
c语言开篇---跟着视频学C语言
标识符 标识符必须声明定义,可以是变量、函数或其他实体。 Int是标识符吗? 不是,int是c语言关键词,不是随意命名的 C语言关键词如下: 常量 不需要被声明,不能赋值更改。 printf函数 printf是由print打印…...
本地yum源-如学
学不学? 如学~ 到底学不学? 如学~ 学? 如学~ 配置本地的镜像yum 使用到的 rpm 包 是根据centos8 里面自带的 在 /dev/cdrom 中包含着 一些系统自带的 rpm # 先将 /dev/cdrom 设备进行挂载 mkdir /up # 在…...
【实训】“宅急送”订餐管理系统(程序设计综合能力实训)
👀樊梓慕:个人主页 🎥个人专栏:《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 🌝每一个不曾起舞的日子,都是对生命的辜负 前言 大一小学期,我迎来了人生中的第一次实训…...
openeuler上安装polarismesh集群
1、安装MySQL数据库 数据库连接地址10.10.10.168 用户root 密码123456 MySQL安装参考搭建DSS环境(六)之安装基础环境MySQL_linux安装dss_青春不流名的博客-CSDN博客 2、安装Redis集群 IPResid PORTSentinel PORTPASSWORDCluster NAME10.10.10.110637…...
Java基础——stream
流 stream是什么?stream优点stream和集合的区别stream的创建steam的操作从steam中取值 stream是什么? stream可以简化对集合的操作,具体操作由流内部实现,而无需用户自行实现过程 stream优点 对于以下ArrayList List<Strin…...
Spring Quartz 持久化解决方案
Quartz是实现了序列化接口的,包括接口,所以可以使用标准方式序列化到数据库。 而Spring2.5.6在集成Quartz时却未能考虑持久化问题。 Spring对JobDetail进行了封装,却未实现序列化接口,所以持久化的时候会产生NotSerializable问题&…...
基于Java+SpringBoot+Vue前后端分离火锅店管理系统设计和实现
博主介绍:✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专…...
Unity——导航系统补充说明
一、导航系统补充说明 1、导航与动画 我们可以通过设置动画状态机的变量,让动画匹配由玩家直接控制的角色的移动。那么自动导航的角色如何与动画系统结合呢? 有两个常用的属性可以获得导航代理当前的状态: 一是agent.velocity,…...
nginx实现负载均衡load balance
目录 nginx实现负载均衡load balance相关算法负载均衡https的访问后端的real server是否知道真正访问的用户的IP地址健康检查提升负载均衡的并发数量七层负载均衡和四层负载均衡七层负载均衡四层负载均衡四层和七层的区别502错误 nginx实现负载均衡load balance 准备ÿ…...
淘宝订单接口:连接消费者与商家的桥梁
当我们谈论淘宝订单接口时,我们谈论的是淘宝网为卖家和买家提供的一个用于处理订单的核心系统。通过这个接口,卖家可以接收订单、处理订单状态,并更新买家和平台的状态信息;买家则可以实时追踪自己的订单状态,更好地掌…...
数据结构-第一期——数组(Python)
目录 00、前言: 01、一维数组 一维数组的定义和初始化 一维变长数组 一维正向遍历 一维反向遍历 一维数组的区间操作 竞赛小技巧:不用从a[0]开始,从a[1]开始 蓝桥杯真题练习1 读入一维数组 例题一 例题二 例题三 实战训…...
八 动手学深度学习v2 ——卷积神经网络之卷积+填充步幅+池化+LeNet
目录 1. 图像卷积总结2. 填充和步幅 padding和stride3. 多输入多输出通道4. 池化层5. LeNet 1. 图像卷积总结 二维卷积层的核心计算是二维互相关运算。最简单的形式是,对二维输入数据和卷积核执行互相关操作,然后添加一个偏置。核矩阵和偏移是可学习的参…...
SparkCore
第1章 RDD概述 1.1 什么是RDD RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 RDD类比工厂生产。 …...
配置 Windows 系统环境变量
直接按键盘上面的 WINS 打开 Windows 搜索 搜索“编辑系统环境变量” 也可以右键此电脑->属性->高级系统设置打开相同的界面 点击环境变量 一般添加就是添加在框出的 Path 里面,双击可以看到现有的环境变量并进行编辑 例如我在博客中写把 Java 的 jdk 解压好…...
【计算机视觉】图片文件格式的讲解
文章目录 一、图片的压缩二、计算机表示颜色三、JPG和PNG3.1 JPG3.2 PNG 一、图片的压缩 图片文件格式有可能会对图片的文件大小进行不同程度的压缩,图片的压缩分为有损压缩和无损压缩两种。 有损压缩。指在压缩文件大小的过程中,损失了一部分图片的信…...
2023最全的性能测试种类介绍,这6个种类特别重要!
系统的性能是一个很大的概念,覆盖面非常广泛,包括执行效率、资源占用、系统稳定性、安全性、兼容性、可靠性、可扩展性等,性能测试就是描述测试对象与性能相关的特征并对其进行评价而实施的一类测试。 性能测试是一个统称,它其实包…...
代码随想录算法训练营19期第43天
1049. 最后一块石头的重量 II 视频讲解:动态规划之背包问题,这个背包最多能装多少?LeetCode:1049.最后一块石头的重量II_哔哩哔哩_bilibili 代码随想录 初步思路:动态规划。 总结:套用01背包 dp[j…...
微信小程序wx.previewImage实现图片预览
在微信小程序中,wx.previewImage函数用于预览图片,可以将一组图片以轮播的方式展示给用户,并支持用户手势操作进行切换。 使用wx.previewImage函数需要传入一个参数对象,该对象包含以下属性: current: String&#x…...
Java实现Modbus读写数据
背景 由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题: 关闭连接NioEventLoop没有释放导致oom设计思想是一个设备一个连接,而不是一个网关一个连接连接断开后客户端无从感知 前两个问题解…...
Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
Android15默认授权浮窗权限
我们经常有那种需求,客户需要定制的apk集成在ROM中,并且默认授予其【显示在其他应用的上层】权限,也就是我们常说的浮窗权限,那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
图表类系列各种样式PPT模版分享
图标图表系列PPT模版,柱状图PPT模版,线状图PPT模版,折线图PPT模版,饼状图PPT模版,雷达图PPT模版,树状图PPT模版 图表类系列各种样式PPT模版分享:图表系列PPT模板https://pan.quark.cn/s/20d40aa…...
