当前位置: 首页 > news >正文

网站设计师和网页设计师/营销型网站外包

网站设计师和网页设计师,营销型网站外包,台州网站哪家专业,武汉互联网营销推广用户行为表数据同步 2.1.4 日志消费Flume测试 [gpbhadoop104 ~]$ cd /opt/module/flume/ [gpbhadoop104 flume]$ cd job/ [gpbhadoop104 job]$ rm file_to_kafka.confcom.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder #定义组件 a1.sourcesr1 a1.channelsc1…

用户行为表数据同步

  • 2.1.4 日志消费Flume测试

[gpb@hadoop104 ~]$ cd /opt/module/flume/
[gpb@hadoop104 flume]$ cd job/
[gpb@hadoop104 job]$ rm file_to_kafka.conf

com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 3#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.1.3 日志消费Flume配置实操
1)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf 
2)配置文件内容如下#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置优化
1)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
2HDFS Sink优化
(1HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。(2HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
3)编写Flume拦截器
(1)数据漂移问题(2)在com.atguigu.gmall.flume.interceptor包下创建TimestampInterceptor类
package com.atguigu.gmall.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取header和body的数据Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);//2、将body的数据类型转成jsonObject类型(方便获取数据)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}3)重新打包(4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面。

2.1.4 日志消费Flume测试

1)启动Zookeeper、Kafka集群
2)启动日志采集Flume
[atguigu@hadoop102 ~]$ f1.sh start
3)启动hadoop104的日志消费Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4)生成模拟数据
[atguigu@hadoop102 ~]$ lg.sh 
5)观察HDFS是否出现数据
2.1.5 日志消费Flume启停脚本
若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh
[atguigu@hadoop102 bin]$ vim f2.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
"start")echo " --------启动 hadoop104 日志数据flume-------"ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")echo " --------停止 hadoop104 日志数据flume-------"ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f2.sh
3)f2启动
[atguigu@hadoop102 module]$ f2.sh start
4)f2停止
[atguigu@hadoop102 module]$ f2.sh stop

相关文章:

离线数仓同步数据1

用户行为表数据同步 2.1.4 日志消费Flume测试 [gpbhadoop104 ~]$ cd /opt/module/flume/ [gpbhadoop104 flume]$ cd job/ [gpbhadoop104 job]$ rm file_to_kafka.confcom.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder #定义组件 a1.sourcesr1 a1.channelsc1…...

c语言开篇---跟着视频学C语言

标识符 标识符必须声明定义&#xff0c;可以是变量、函数或其他实体。 Int是标识符吗&#xff1f; 不是&#xff0c;int是c语言关键词&#xff0c;不是随意命名的 C语言关键词如下&#xff1a; 常量 不需要被声明&#xff0c;不能赋值更改。 printf函数 printf是由print打印…...

本地yum源-如学

学不学&#xff1f; 如学&#xff5e; 到底学不学&#xff1f; 如学&#xff5e; 学&#xff1f; 如学&#xff5e; 配置本地的镜像yum 使用到的 rpm 包 是根据centos8 里面自带的 在 /dev/cdrom 中包含着 一些系统自带的 rpm # 先将 /dev/cdrom 设备进行挂载 mkdir /up # 在…...

【实训】“宅急送”订餐管理系统(程序设计综合能力实训)

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 前言 大一小学期&#xff0c;我迎来了人生中的第一次实训…...

openeuler上安装polarismesh集群

1、安装MySQL数据库 数据库连接地址10.10.10.168 用户root 密码123456 MySQL安装参考搭建DSS环境&#xff08;六&#xff09;之安装基础环境MySQL_linux安装dss_青春不流名的博客-CSDN博客 2、安装Redis集群 IPResid PORTSentinel PORTPASSWORDCluster NAME10.10.10.110637…...

Java基础——stream

流 stream是什么&#xff1f;stream优点stream和集合的区别stream的创建steam的操作从steam中取值 stream是什么&#xff1f; stream可以简化对集合的操作&#xff0c;具体操作由流内部实现&#xff0c;而无需用户自行实现过程 stream优点 对于以下ArrayList List<Strin…...

Spring Quartz 持久化解决方案

Quartz是实现了序列化接口的&#xff0c;包括接口&#xff0c;所以可以使用标准方式序列化到数据库。 而Spring2.5.6在集成Quartz时却未能考虑持久化问题。 Spring对JobDetail进行了封装&#xff0c;却未实现序列化接口&#xff0c;所以持久化的时候会产生NotSerializable问题&…...

基于Java+SpringBoot+Vue前后端分离火锅店管理系统设计和实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…...

Unity——导航系统补充说明

一、导航系统补充说明 1、导航与动画 我们可以通过设置动画状态机的变量&#xff0c;让动画匹配由玩家直接控制的角色的移动。那么自动导航的角色如何与动画系统结合呢&#xff1f; 有两个常用的属性可以获得导航代理当前的状态&#xff1a; 一是agent.velocity&#xff0c;…...

nginx实现负载均衡load balance

目录 nginx实现负载均衡load balance相关算法负载均衡https的访问后端的real server是否知道真正访问的用户的IP地址健康检查提升负载均衡的并发数量七层负载均衡和四层负载均衡七层负载均衡四层负载均衡四层和七层的区别502错误 nginx实现负载均衡load balance 准备&#xff…...

淘宝订单接口:连接消费者与商家的桥梁

当我们谈论淘宝订单接口时&#xff0c;我们谈论的是淘宝网为卖家和买家提供的一个用于处理订单的核心系统。通过这个接口&#xff0c;卖家可以接收订单、处理订单状态&#xff0c;并更新买家和平台的状态信息&#xff1b;买家则可以实时追踪自己的订单状态&#xff0c;更好地掌…...

数据结构-第一期——数组(Python)

目录 00、前言&#xff1a; 01、一维数组 一维数组的定义和初始化 一维变长数组 一维正向遍历 一维反向遍历 一维数组的区间操作 竞赛小技巧&#xff1a;不用从a[0]开始&#xff0c;从a[1]开始 蓝桥杯真题练习1 读入一维数组 例题一 例题二​ 例题三 实战训…...

八 动手学深度学习v2 ——卷积神经网络之卷积+填充步幅+池化+LeNet

目录 1. 图像卷积总结2. 填充和步幅 padding和stride3. 多输入多输出通道4. 池化层5. LeNet 1. 图像卷积总结 二维卷积层的核心计算是二维互相关运算。最简单的形式是&#xff0c;对二维输入数据和卷积核执行互相关操作&#xff0c;然后添加一个偏置。核矩阵和偏移是可学习的参…...

SparkCore

第1章 RDD概述 1.1 什么是RDD RDD&#xff08;Resilient Distributed Dataset&#xff09;叫做弹性分布式数据集&#xff0c;是Spark中最基本的数据抽象。代码中是一个抽象类&#xff0c;它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 RDD类比工厂生产。 …...

配置 Windows 系统环境变量

直接按键盘上面的 WINS 打开 Windows 搜索 搜索“编辑系统环境变量” 也可以右键此电脑->属性->高级系统设置打开相同的界面 点击环境变量 一般添加就是添加在框出的 Path 里面&#xff0c;双击可以看到现有的环境变量并进行编辑 例如我在博客中写把 Java 的 jdk 解压好…...

【计算机视觉】图片文件格式的讲解

文章目录 一、图片的压缩二、计算机表示颜色三、JPG和PNG3.1 JPG3.2 PNG 一、图片的压缩 图片文件格式有可能会对图片的文件大小进行不同程度的压缩&#xff0c;图片的压缩分为有损压缩和无损压缩两种。 有损压缩。指在压缩文件大小的过程中&#xff0c;损失了一部分图片的信…...

2023最全的性能测试种类介绍,这6个种类特别重要!

系统的性能是一个很大的概念&#xff0c;覆盖面非常广泛&#xff0c;包括执行效率、资源占用、系统稳定性、安全性、兼容性、可靠性、可扩展性等&#xff0c;性能测试就是描述测试对象与性能相关的特征并对其进行评价而实施的一类测试。 性能测试是一个统称&#xff0c;它其实包…...

代码随想录算法训练营19期第43天

1049. 最后一块石头的重量 II 视频讲解&#xff1a;动态规划之背包问题&#xff0c;这个背包最多能装多少&#xff1f;LeetCode&#xff1a;1049.最后一块石头的重量II_哔哩哔哩_bilibili 代码随想录 初步思路&#xff1a;动态规划。 总结&#xff1a;套用01背包 dp[j…...

微信小程序wx.previewImage实现图片预览

在微信小程序中&#xff0c;wx.previewImage函数用于预览图片&#xff0c;可以将一组图片以轮播的方式展示给用户&#xff0c;并支持用户手势操作进行切换。 使用wx.previewImage函数需要传入一个参数对象&#xff0c;该对象包含以下属性&#xff1a; current: String&#x…...

Java实现Modbus读写数据

背景 由于当时项目周期赶&#xff0c;引入了一个PLC4X组件&#xff0c;上手快。接下来就是使用这个组件遇到的一些问题&#xff1a; 关闭连接NioEventLoop没有释放导致oom设计思想是一个设备一个连接&#xff0c;而不是一个网关一个连接连接断开后客户端无从感知 前两个问题解…...

C++11新特性⑤ | 仿函数与lambda表达式

目录 1、引言 2、仿函数 3、lambda表达式 3.1、lambda表达式的一般形式 3.2、返回类型说明 3.3、捕获列表的规则 3.4、可以捕获哪些变量 3.5、lambda表达式给编程带来的便利 VC常用功能开发汇总&#xff08;专栏文章列表&#xff0c;欢迎订阅&#xff0c;持续更新...&a…...

解决websocket不定时出现1005错误

后台抛出异常如下&#xff1a; Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005 Caused by: java.lang.IllegalArgume…...

文章内容生成随机图像,并将这些图像上链

一、需求背景 在当前的互联网时代,信息越来越快速地传播,一篇好的文章不仅需要有吸引人的文字内容,还需要有精美的配图。但是,对于某些只有文字,而没有图片的文章,我们可以使用程序去生成随机的图片来作为文章的配图。 本文将详细介绍如何使用Java语言实现文章内容生成…...

l8-d9 UDP通信实现

一、函数接口扩展与UDP通信实现流程 1.write/read到send/recv 函数原型&#xff1a; ssize_t send(int sockfd, const void *buf, size_t len, int flags); ssize_t recv(int sockfd, void *buf, size_t len, int flags); 前三个参数同read/write一样&#xff1b; ssize_t rea…...

MongoDB复杂聚合查询与java中MongoTemplate的api对应

MongoDB聚合json脚本 db.getCollection("202303_refund").aggregate([{"$match": {"courseType": "常规班课","teacherRefundReasonCheck": true,"teacherId": {"$in": [7544]},"createTime"…...

WireShark抓包工具的安装

1.下载安装包 在官网或者电脑应用商城都可以下载 2.安装 打开安装包&#xff0c;点击next 点击next 选择UI界面&#xff0c;两种都装上 根据习惯选择 选择安装位置点击安装 开始安装安装成功...

审计智能合约的成本是多少?如何审计智能合约?

审计智能合约的成本是多少&#xff1f;如何审计智能合约&#xff1f; 智能合约安全审计在去中心化金融 (DeFi) 生态系统中非常普遍。如果您投资了一个区块链项目&#xff0c;您的决定可能部分基于智能合约代码审查的结果。 虽然大多数人都了解审计对网络安全的重要性&#xff…...

9.7 校招 内推 面经

绿泡*泡&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 1、校招 | Momenta 2024校招火热进行中&#xff01;新增招聘岗位&#xff08;内推&#xff09; 校招 | Momenta 2024校招火热进行中&#xff01;新增招聘岗位&#xff08;内推&#xff09; 2、…...

【网络编程】IO多路复用

IO多路复用是一种高效的I/O处理方式&#xff0c;它允许单个进程能够同时监视多个文件描述符&#xff08;sockets、文件等&#xff09;&#xff0c;并在其中任何一个文件描述符准备好进行I/O操作时进行处理。它的核心在于使用少量的线程或进程来管理多个I/O操作&#xff0c;以提…...

MySQL与postgreSQL数据库的区别

MySQL 是一个流行的开源关系型数据库管理系统&#xff0c;具有以下优势&#xff1a; 开源和免费&#xff1a;MySQL 是一个开源软件&#xff0c;允许用户免费下载、使用和修改。它的免费版本&#xff08;Community Edition&#xff09;提供了广泛的功能&#xff0c;适用于大多数…...