大数据项目实战之数据仓库:用户行为采集平台——第4章 用户行为数据采集模块
第4章 用户行为数据采集模块
4.1 数据通道

4.2 环境准备
4.2.1 集群所有进程查看脚本
1)在/home/atguigu/bin目录下创建脚本xcall
[atguigu@hadoop102 bin]$ vim xcall
2)在脚本中编写如下内容
#! /bin/bashfor i in hadoop102 hadoop103 hadoop104
doecho --------- $i ----------ssh $i "$*"
done
3)修改脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 xcall
4)启动脚本
[atguigu@hadoop102 bin]$ xcall jps
4.2.2 Hadoop安装
1)安装步骤
详见:尚硅谷大数据技术之Hadoop(入门)
(1)集群规划
| hadoop102 | hadoop103 | hadoop104 | |
|---|---|---|---|
| HDFS | NameNode DataNode | DataNode | DataNode SecondaryNameNode |
| YARN | NodeManager | Resourcemanager NodeManager | NodeManager |
注意:尽量使用离线方式安装
2)项目经验
(1)项目经验之HDFS存储多目录
①生产环境服务器磁盘情况

②在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题
HDFS的DataNode节点保存数据的路径由dfs.datanode.data.dir参数决定,其默认值为file://${hadoop.tmp.dir}/dfs/data,若服务器有多个磁盘,必须对该参数进行修改。如服务器磁盘如上图所示,则该参数应修改为如下的值。
<property><name>dfs.datanode.data.dir</name><value>file:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
</property>
注意:因为每台服务器节点的磁盘情况不同,所以这个配置配完之后,不需要分发
(2)项目经验之集群数据均衡
①节点间数据均衡
开启数据均衡命令:
start-balancer.sh -threshold 10
对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。
停止数据均衡命令
stop-balancer.sh
注意:于HDFS需要启动单独的Rebalance Server来执行Rebalance操作,所以尽量不要在NameNode上执行start-balancer.sh,而是找一台比较空闲的机器。
②磁盘间数据均衡
生成均衡计划(我们只有一块磁盘,不会生成计划)
hdfs diskbalancer -plan hadoop103
执行均衡计划
hdfs diskbalancer -execute hadoop103.plan.json
查看当前均衡任务的执行情况
hdfs diskbalancer -query hadoop103
取消均衡任务
hdfs diskbalancer -cancel hadoop103.plan.json
(3)项目经验之Hadoop参数调优
①HDFS参数调优hdfs-site.xml
The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。
对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。
<property><name>dfs.namenode.handler.count</name><value>10</value>
</property>
dfs.namenode.handler.count=20×〖log〗_e^(Cluster Size),比如集群规模为8台时,此参数设置为41。可通过简单的python代码计算该值,代码如下。
[atguigu@hadoop102 ~]$ python
Python 2.7.5 (default, Apr 11 2018, 07:36:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import math
>>> print int(20*math.log(8))
41
>>> quit()
②YARN参数调优yarn-site.xml
情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive
面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。
解决办法:
内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。
(a)yarn.nodemanager.resource.memory-mb
表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
(b)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是8192(MB)。
4.2.3 Zookeeper安装
1)安装步骤
详见:尚硅谷大数据技术之Zookeeper
4.2.4 Kafka安装
1)安装步骤
详见:尚硅谷大数据技术之Kafka
4.2.5 Flume安装
按照采集通道规划,需在hadoop102,hadoop103,hadoop104三台节点分别部署一个Flume。可参照以下步骤先在hadoop102安装,然后再进行分发。
1)安装步骤
详见:尚硅谷大数据技术之Flume
2)分发Flume
[atguigu@hadoop102 ~]$ xsync /opt/module/flume/
3)项目经验
(1)堆内存调整
Flume堆内存通常设置为4G或更高,配置方式如下:
修改/opt/module/flume/conf/flume-env.sh文件,配置如下参数**(虚拟机环境暂不配置)**
export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
注:
-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;
-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。
4.3 日志采集Flume
4.3.1 日志采集Flume配置概述
按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。
选择TailDirSource和KafkaChannel的原因如下:
1)TailDirSource
TailDirSource相比ExecSource、SpoolingDirectorySource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
2)KafkaChannel
采用Kafka Channel,省去了Sink,提高了效率。
日志采集Flume关键配置如下:

4.3.2 日志采集Flume配置实操
1)创建Flume配置文件
在hadoop102节点的Flume的job目录下创建file_to_kafka.conf
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ vim job/file_to_kafka.conf
2)配置文件内容如下
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1
3)编写Flume拦截器
(1)创建Maven工程flume-interceptor
(2)创建包:com.atguigu.gmall.flume.interceptor
(3)在pom.xml文件中添加如下配置
<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
</dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
(4)在com.atguigu.gmall.flume.utils包下创建JSONUtil类
package com.atguigu.gmall.flume.utils;import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true 不是:返回false
* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}
(5)在com.atguigu.gmall.flume.interceptor包下创建ETLInterceptor类
package com.atguigu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}
}
(6)打包

(7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。
4.3.3 日志采集Flume测试
1)启动Zookeeper、Kafka集群
2)启动hadoop102的日志采集Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
3)启动一个Kafka的Console-Consumer
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
4)生成模拟数据
[atguigu@hadoop102 ~]$ lg.sh
5)观察Kafka消费者是否能消费到数据
4.3.4 日志采集Flume启停脚本
1)分发日志采集Flume配置文件和拦截器
若上述测试通过,需将hadoop102节点的Flume的配置文件和拦截器jar包,向另一台日志服务器发送一份。
[atguigu@hadoop102 flume]$ scp -r job hadoop103:/opt/module/flume/
[atguigu@hadoop102 flume]$ scp lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop103:/opt/module/flume/lib/
2)方便起见,此处编写一个日志采集Flume进程的启停脚本
在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh
[atguigu@hadoop102 bin]$ vim f1.sh
在脚本中填写如下内容
#!/bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"done
};;
"stop"){for i in hadoop102 hadoop103doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done};;
esac
3)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f1.sh
4)f1启动
[atguigu@hadoop102 module]$ f1.sh start
5)f2停止
[atguigu@hadoop102 module]$ f1.sh stop
相关文章:
大数据项目实战之数据仓库:用户行为采集平台——第4章 用户行为数据采集模块
第4章 用户行为数据采集模块 4.1 数据通道 4.2 环境准备 4.2.1 集群所有进程查看脚本 1)在/home/atguigu/bin目录下创建脚本xcall [atguiguhadoop102 bin]$ vim xcall2)在脚本中编写如下内容 #! /bin/bashfor i in hadoop102 hadoop103 hadoop104 d…...
《统计学习方法》(李航)——学习笔记
第一章 概论统计学习,又称统计机器学习(机器学习),现在提到的 机器学习 往往指的就是 统计机器学习。统计学习研究的对象是数据,其对数据的基本假设是同类数据存在一定的统计规律性,因此可以用概率统计方法…...
阿里云EMR集群搭建及使用
目录 1.简介 1.什么是EMR 2.组成 3.与自建hadoop集群对比 4.产品架构 2.使用 1.创建EMR集群 1.登录EMR on ECS控制台 2.软件设置 3.硬件设置 3.基础配置 2.配置 1.组件配置 2.用户管理 3.安全组 4.Gateway 3.组件UI 1.简介 1.什么是EMR EMR是运行在阿里云平台…...
学习streamlit-4
st.slider 今天学习st.slider滑块组件的使用。 st.slider滑块组件通常被用来作为应用的输入,支持整数、浮点数、日期、时间和日期时间。 下面的示例程序包含以下简单功能,以演示st.slider滑块组件: 用户通过调整滑块选择值应用打印出所选…...
高级Oracle DBA面试题及答案
作为高级 Oracle DBA,您将负责 Oracle 数据库基础架构的设计、安装、配置、监控和维护。您还将负责制定和实施备份和恢复计划,并确保数据的安全性和完整性。要成功担任此职位,您需要对 Oracle 数据库架构有深入的了解,并能够有效地…...
程序员成长路线
程序员在成长的过程中,不同的阶段,需要关注的问题点一会都会有所不同,今天给大家分享下自己的感受。 0-1年,入门,掌握语言基础、提高工具的使用熟练度。 工作第一年,主要围绕ssm三件套、mysql、red…...
【Galois工具开发之路】关于类的重新装载思路
思路 当一个java的类文件发生变更,如果动态的热更新这个新的类文件?目前来说,有两种可能的方式 新增一个自定义ClassLoader,名为NC,让NC去load这个新的类文件,这样就完成了新的类定义的替换 但目前Java有…...
哪款蓝牙耳机音质好?内行推荐四款高音质蓝牙耳机
蓝牙耳机经过近几年的快速发展,在音质上的表现也越来越好。哪款蓝牙耳机音质好?最近看到很多人问。接下来,我来给大家推荐四款高音质蓝牙耳机,可以当个参考。 一、南卡小音舱蓝牙耳机 参考价:246 发声单元ÿ…...
Android程序自动在线升级安装
安卓小白分享: Android程序自动在线升级安装.(通过GetSharedDownloadsPath方法) 1>.修改AndroidManifest.template.xml ( 此文件在你DELPHI项目的目录中,如找不到就文件查找吧) 最好把此文件拖到DELPHI, 用DELPHI打开,(这样,它会一行一行格式清楚) 找到文字<%u…...
JS的BroadcastChannel与MessageChannel
BroadcastChannel与MessageChannel BroadcastChannel BroadcastChannel以广播的形式进行通信 BroadcastChannel用于创建浏览器标签页之间的通信 使用BroadcastChannel的浏览器标签页面必须要遵循同源策略 页面1使用BroadcastChannel创建一个频道,页面2使用Broadc…...
nextjs开发 + vercel 部署 ssr ssg
前言 最近想实践下ssr 就打算用nextjs 做一个人博客 , vercel 部署 提供免费域名,来学习实践下ssr ssg nextjs 一个轻量级的react服务端渲染框架 vercel 由 Next.js 的创建者制作 支持nextjs 部署 免费静态网站托管 初始化项目 npx create-next-app p…...
Good Idea, 利用MySQL JSON特性优化千万级文库表
👳我亲爱的各位大佬们好😘😘😘 ♨️本篇文章记录的为 利用MySQL JSON特性优化千万级文库表 相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉…...
【python游戏制作】快来跟愤怒的小鸟一起攻击肥猪们的堡垒吧
前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 为了防止/报复偷走鸟蛋的肥猪们,鸟儿以自己的身体为武器, 仿佛炮弹一样去攻击肥猪们的堡垒,保卫自己的鸟蛋 这个游戏大家没玩过的想必也听说过~ 今天就给大家分享一下用python写的愤怒的…...
ARM 学习(一)
ARM 处理器的运行模式ARM处理器共有7种运行模式,如下表所示:处理器模式描述用户模式(User)正常程序运行模式中断模式(IRQ)用于通常的中断处理快速中断模式(FIQ)用于高速传输和通道处…...
深入分析Java的序列化与反序列化
序列化是一种对象持久化的手段。普遍应用在网络传输、RMI等场景中。本文通过分析ArrayList的序列化来介绍Java序列化的相关内容。主要涉及到以下几个问题: 怎么实现Java的序列化 为什么实现了java.io.Serializable接口才能被序列化 transient的作用是什么 怎么自…...
、Tomcat源码分析-类加载器
接下来,我们再来看下 tomcat 是如何创建 common 类加载器的。关键代码如下所示,在创建类加载器时,会读取相关的路径配置,并把路径封装成 Repository 对象,然后交给 ClassLoaderFactory 创建类加载器。 Bootstrap.java…...
反转链表相关的练习(下)
目录 一、回文链表 二、 重排链表 三、旋转链表 一、回文链表 给你一个单链表的头节点 head ,请你判断该链表是否为回文链表。如果是,返回 true ;否则,返回 false 。 示例 1: 输入:head [1,2,2,1] 输…...
2.进程和线程
1.进程1.1 终止正常退出(自愿)出错退出(自愿)严重错误(非自愿)被其他进程杀死(非自愿)1.2 状态就绪态:可运行,但因为其他进程正在运行而暂时停止阻塞态:除非某种外部事件发生,否则进程不能运行1.3 实现一个进程在执行过程中可能被…...
C++回顾(十四)—— 函数模板
14.1 概述 所谓函数模板(function template),实际上是建立一个通用函数,其函数类型和形参类型不具体指定,用一个虚拟的类型来代表。这个通用函数就称为函数模板。凡是函数体相同的函数都可以用这个模板来代替,不必定义多个函数&a…...
如何做好项目各干系人的管理及应对?
如何更好地识别、分析和管理项目关系人?主要有以下几个方面: 1、项目干系人的分析 一般对项目干系人的分析有2种方法, 方法一:权利(影响),即对项目可以产生影响的人; 方法二…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列,以便知晓哪些列包含有价值的数据,…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
【Veristand】Veristand环境安装教程-Linux RT / Windows
首先声明,此教程是针对Simulink编译模型并导入Veristand中编写的,同时需要注意的是老用户编译可能用的是Veristand Model Framework,那个是历史版本,且NI不会再维护,新版本编译支持为VeriStand Model Generation Suppo…...
boost::filesystem::path文件路径使用详解和示例
boost::filesystem::path 是 Boost 库中用于跨平台操作文件路径的类,封装了路径的拼接、分割、提取、判断等常用功能。下面是对它的使用详解,包括常用接口与完整示例。 1. 引入头文件与命名空间 #include <boost/filesystem.hpp> namespace fs b…...
