当前位置: 首页 > news >正文

数据采集项目之业务数据(三)

1. Maxwell框架

开发公司为Zendesk公司开源,用java编写的MySQL变更数据抓取软件。内部是通过监控MySQL的Binlog日志,并将变更数据以JSON格式发送到Kafka等流处理平台。

1.1 MySQL主从复制

主机每次变更数据都会生成对应的Binlog日志,从机可以通过IO流的方式将Binlog日志下载到本地,可以通过它创造和主机一样的环境或者作为热备。

1.2 安装Maxwell

  1. 解压改名
  2. 启动MySQL Binlog, vim /etc/my.cnf. 增加如下配置:
    • binlog_format 日志类型的三种类型:
      • 基于语句:主机执行了什么语句,在从机里同样执行一遍。如果使用了random语句,会导致主从不一致。但是量级比较低
      • 基于行:主机被改动后,从机同步一份。不会有主从不一致的问题,但是量价比较大,需要将每行修改的数据都拿一份。
      • 混合模式:一般基于语句,但是如果基于语句会导致前后结果产生差异,自动转成基于行。
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall
  1. 重启MySQL服务
  2. 创建Maxwell所需所需的数据库和用户,用来存储断点续传所需的数据。
CREATE DATABASE maxwell;
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';//maxwell库的所有权限给maxwell
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';//其他库的查询、复制权限给maxwell
  1. 修改maxwell配置文件
    cp 配置文件,将会复制某个文件并且可以改名。
producer=kafka
# 目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_db
# MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true# 过滤gmall中的z_log表数据,该表是日志数据的备份,无须采集
filter=exclude:gmall.z_log
# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
producer_partition_by=primary_key

1.3 Maxwell的使用

  1. 启动zookeeper,kafka
  2. 启动maxwell, bin/maxwell --config config.properties --daemon
  3. 启动kafka消费者进程,用于消费maxwell添加到kafka的变更数据
  4. 启动数据生成jar包,查看消费者进程是否有新数据。
  5. 编写Maxwell启停脚本
#!/bin/bashMAXWELL_HOME=/opt/module/maxwellstatus_maxwell(){result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`return $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho "启动Maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho "Maxwell正在运行"fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho "停止Maxwell"ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9elseecho "Maxwell未在运行"fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac

1.4 Bootstrap全量同步

Maxwell获取的数据都是后期变更的数据,但没有获取到数据库在开启Binlog日志之前的原始数据。

全量同步命令:/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties

2. 数仓数据同步策略

2.1 用户行为数据

数据源:Kafka
目的地:HDFS
传输方式采用Flume, 其中source为Kafka source, channel为Memmory channel, sink为HDFS sink。

根据官网查找相应参数:

  1. Kafka Source
    • type = Kafka Source全类名
    • kafka.bootstrap.servers 连接地址
    • kafka.topics = topic_log
    • batchSize: 批次大小
    • batchDurationMillis: 批次间隔2s
  2. File Channel
    • type: file
    • dataDirs: 存储路径
    • checkpointDir: 偏移量存储地址
    • keep-alive: 管道满了后,生产者间隔多少秒再放数据
  3. HDFS Sink
    • hdfs.rollInterval : 文件滚动,解决小文件问题,每隔多久滚动一次
    • rollSize: 文件大小
    • hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d, 文件存放路径
    • hdfs.round = false, 不采用系统本地时间
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false # 是否获取本地时间a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.2 零点漂移问题

在HDFS系统存放文件时是按照时间进行分区存放的,存放时查看的是header中的timestamp,但是由于数据传输过程中也需要一段时间,header中的时间并不是数据的实际产生时间,这个就是零点漂移问题。

解决办法:借助拦截器,修改header中的timestamp的值。编写拦截器代码,需要在IDEA中创建对应的项目并打包。

  1. 导入依赖,flume-ng-core和JSON解析依赖fastjson (1.2.62)
  2. 创建包gmall.interceptor
  3. 创建类TimeStampInterceptor, 继承Interceptor接口
  4. 实现intercept(Event event)和intercept(Event events)
  5. 使用fastjson来解析json文件,得到jsonObject对象,用来获取时间戳ts。将获取到的时间戳覆盖header中的timestamp, 如果数据格式错误会抛异常,使用try-catch来捕获它,并过滤掉该条数据。注意此处不能使用for循环来一边遍历,一边删除集合数据
@Overridepublic Event intercept(Event event) {//1、获取header和body的数据Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);try {//2、将body的数据类型转成jsonObject类型(方便获取数据)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;} catch (Exception e) {e.printStackTrace();return null;}
@Override
public List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();//必须使用迭代器删除}}return list;
}
  1. 打包时注意要带上fastjson依赖,需要在maven中添加配置打包插件。依赖中有flume和fastjson,但在虚拟机上有flume,没有fastjson,所以需要排除flume。可以使用provided标签来排除让打包时排除依赖。

    • compile:在单元测试、编译、运行三种方式都会使用compile表明的依赖;
    • test:在单元测试才会使用test表明的依赖;
    • provided:在编译才会使用test表明的依赖;
  2. Flume配置文件中添加拦截器

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder # 全类名建议在IDEA中复制,Builder也需要根据自己的代码函数名修改
  1. 重新生成数据,查看是否根据数据本身的时间戳存放到对应的HDFS分区文件中。

3. 业务数据同步

3.1 同步策略

  1. 全量同步:每天将所有数据同步一份,业务数据量小,优先考虑全量同步。
  2. 增量同步:每天只将新增和变化进行同步,业务数据量大,优先考虑增量同步。

3.2 数据同步工具

全量:DataX、Sqoop
增量:Maxwell、Canal

3.3 DataX

是一个数据同步工具,致力于实现包括关系型数据库HDFS、Hive、ODPS、HBase、MySQL等等数据源之间的互传。

  1. 架构= reader + framework + writer
  2. 运行流程
    • job: 单个数据同步的作业,会启动一个进程。
    • Task: 根据不同数据源的切分策略,一个Job会切分为多个Task,Task是DataX作业的最小单元,每个Task负责一部分,由一个线程执行。
  3. 调度策略:会根据系统资源设置并发度,并发度为线程同时执行的个数,任务会按照并发度一组一组执行。

3.4 DataX安装

  1. 下载解压DataX安装包
  2. bin/datax.py job/job.json测试安装包是否完整
  3. MySQL Reader配置文件的书写
  4. HDFS Writer配置文件的书写
  5. 执行datax命令python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2022-06-08" /opt/module/datax/job/import/gmall.activity_info.json
  6. 执行完后可以使用hadoop fs cat 路径名 | zcat,来查看压缩文件是否正确

相关文章:

数据采集项目之业务数据(三)

1. Maxwell框架 开发公司为Zendesk公司开源&#xff0c;用java编写的MySQL变更数据抓取软件。内部是通过监控MySQL的Binlog日志&#xff0c;并将变更数据以JSON格式发送到Kafka等流处理平台。 1.1 MySQL主从复制 主机每次变更数据都会生成对应的Binlog日志&#xff0c;从机可…...

vuedraggable影响点击事件的解决办法

在工作中有很多场景需要针对广告、商品、信息推广等进行一个排序,或者对展示的顺序做出调整,方便放用户第一眼看到自己感兴趣的信息,因此避免不了需要用到排序的插件,这里以vue为例子,采用的插件是vuedraggable,这个插件针对于排序的功能相对完善,官网地址:vuedraggable官网 但…...

Linux 中的 grep 命令

Linux 中的 grep 命令是一个强大的文本搜索工具&#xff0c;它允许用户在文件中查找指定的文本模式&#xff0c;并将匹配的行打印出来。grep 是“Global Regular Expression Print”的缩写&#xff0c;它使用正则表达式来进行文本搜索&#xff0c;因此具有强大的灵活性和功能。…...

阶段五-Day03-Ajax

一、JavaWeb中路径的说明 1. JavaWeb中的路径 在JavaWeb中, 路径分为相对路劲和绝对路径两种: 相对路径: ./ 表示当前目录 ../ 表示当前文件所在目录的上一级目录 绝对路径: 完整的路径名 2. 在JavaWeb中/的不同意义 /斜杠如果被浏览器解析,得到的是 协议本地ip端口号…...

EPOLL单线程版本 基于reactor 的 httpserver文件下载 支持多个客户端同时处理

之前写了一个httpserver的问价下载服务器 如果有多个客户端请求过来只能串行处理必须得等当前的操作完成之后才会处理 另外还存在 文件大的时候 会出错 处理不了 原因就是 sendfile是在一个while循环中处理的 当调用send失败返回-1之后 就 结束了 而一般来讲 se…...

uniapp实现微信小程序隐私协议组件封装

uniapp实现微信小程序隐私协议组件封装。 <template><view class"diygw-modal basic" v-if"showPrivacy" :class"showPrivacy?show:" style"z-index: 1000000"><view class"diygw-dialog diygw-dialog-modal bas…...

【Node.js】NPM 和 package.json

NPM npm 是 Node.js 的包管理工具&#xff0c;基于命令行&#xff0c;用于安装、升级、移除、管理依赖项。 常用命令&#xff1a; npm init&#xff1a;初始化一个新的 npm 项目&#xff0c;创建 package.json 文件。&#xff08;括号里为默认值&#xff09; description&am…...

周总结【java项目】

项目进度&#xff1a; 学习了JavaFX&#xff0c;下载了sceneBuilder辅助工具构建窗口&#xff08;目前建立了登陆&#xff0c;注册&#xff0c;忘记密码的界面&#xff09;&#xff0c;然后是学习了MySQL的连接&#xff0c;现在的项目是刚连上数据库&#xff1b; 下一步&…...

《深度不确定条件下的决策:从理论到实践》PDF

制定未来计划时需要预测变化&#xff0c;尤其是制定长期计划或针对罕见事件的计划时。当这些变化存在高度不确定性的时候&#xff0c;这种预期就变得越来越困难。 今天给大家介绍的这本《深度不确定条件下的决策&#xff1a;从理论到实践》正是解决以上问题的良方。完整书籍文…...

【MySQL】表的基础增删改查

前面我们已经知道怎么来创建表了&#xff0c;接下来就来对创建的表进行一些基本操作。 这里先将上次创建的表删除掉&#xff1a; mysql> use test; Database changedmysql> show tables; ---------------- | Tables_in_test | ---------------- | student | -----…...

第11章 Redis(二)

11.11 Redis 哨兵机制和集群有什么区别 难度:★★★ 重点:★★ 白话解析 前面的题目都是Redis的原理,接下来就是实际使用的问题了,首先Redis为了保证高可用,在微服务场景下必须是部署集群的,而Redis的集群部署通常就两种方式:主从和Redis Cluster。 参考答案 1、主从…...

mybatis配置entity下不同文件夹同类型名称的多个类型时启动springboot项目出现TypeException源码分析

记录问题&#xff1a;当配置了 mybatis.type-aliases-packagecom.runjing.erp.entity 配置项时&#xff0c;如果entity文件夹下存在不同子文件夹下的同名类型时&#xff0c;mybatis初始化加载映射时会爆出org.apache.ibatis.type.TypeException&#xff1a; The alias TestDemo…...

淘宝商品评论数据分析接口,淘宝商品评论接口

淘宝商品评论数据分析接口可以通过淘宝开放平台API获取。 通过构建合理的请求URL&#xff0c;可以向淘宝服务器发起HTTP请求&#xff0c;获取商品评论数据。接口返回的数据一般为JSON格式&#xff0c;包含了商品的各种评价信息。 获取到商品评论数据后&#xff0c;可以对其进…...

RK3288 android7.1 修改双屏异触usb tp触摸方向

一&#xff0c;问题描述&#xff1a; android机器要求接两个屏&#xff08;lvdsmipi&#xff09;两个usb tp要实现双屏异触。由于mipi的方向和lvds方向转成一样的了。两个usb tp的方向在异显示的时候也要作用一样。这个时候要根据pid和vid修改触摸上报的数据。usb tp有通用的触…...

软考 系统架构设计师系列知识点之软件架构风格(8)

接前一篇文章&#xff1a;软考 系统架构设计师系列知识点之软件架构风格&#xff08;7&#xff09; 这个十一注定是一个不能放松、保持“紧”的十一。由于报名了全国计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff0c;11月4号就要考试&#xff0c;因此…...

ubuntu安装ssh

安装 OpenSSH 服务器&#xff08;如果尚未安装&#xff09;&#xff1a; apt-get update && apt-get upgrade -y sudo apt-get install -y openssh-server 检查 SSH 服务是否正在运行&#xff1a; sudo service ssh status 如果 SSH 服务未运行&#xff0c;请通过以…...

webpack不同环境下使用CSS分离插件mini-css-extract-plugin

1.背景描述 使用mini-css-extract-plugin插件来打包css文件&#xff08;从css文件中提取css代码到单独的文件中&#xff0c;对css代码进行代码压缩等&#xff09;。 本次采用三个配置文件&#xff1a; 公共配置文件&#xff1a;webpack.common.jsdev开发环境配置文件&#x…...

[MongoDB]-权限验证管理

[MongoDB]-权限验证管理 senge | 2023年9月 背景说明&#xff1a;现有两套MongoDB副本集群给开发人员使用时未开启认证。 产生影响&#xff1a;用户若输入账号以及密码则会进行校验&#xff0c;但用户可以在不输入用户名和密码的情况下也可直接登录。 倘若黑客借此进行攻击勒索…...

bootstrapjs开发环境搭建

Bootstrapjs是一个web前端页面应用开发框架&#xff0c;其提供功能丰富的JavaScript工具集以及用户界面元素或组件的样式集&#xff0c;本文主要描述bootstrapjs的开发环境搭建。 如上所示&#xff0c;使用nodejs运行时环境、使用npm包管理工具、使用npm初始化一个项目工程test…...

远程实时监控管理:5G物联网技术助力配电站管理

配电站远程监控管理系统是基于物联网和大数据处理等技术的一种创新解决方案。该系统通过实时监测和巡检配电场所设备的状态、环境情况、安防情况以及火灾消防等信息&#xff0c;实现对配电站的在线实时监控与现场设备数据采集。 配电站远程监控管理系统通过回传数据进行数据系…...

ubuntu 23.04安装中文输入法

使用ubuntu 23.04安装中文输入法&#xff0c;尝试了最新的搜狗&#xff0c;谷歌拼音&#xff0c;fcitx的原始拼音&#xff0c;最终的结果就是使用了谷歌拼音。 搜狗输入法&#xff1a;好用&#xff0c;但是用了没几天发现各种闪退&#xff0c;一打开就闪烁&#xff0c;根本无法…...

java:解析json的几种方式

Java是一种流行的编程语言&#xff0c;它提供了很多实用的库和工具&#xff0c;在处理JSON数据时也不例外。在本文中&#xff0c;我们将介绍Java中如何解析JSON数据。 JSON是一种轻量级的数据交换格式&#xff0c;它已经成为Web应用程序中最流行的数据格式之一。Java提供了许多…...

pytorch_神经网络构建1

文章目录 pytorch简介神经网络基础分类问题分析:逻辑回归模型逻辑回归实现多层神经网络多层网络搭建保存模型 pytorch简介 为什么神经网络要自定义数据类型torch.tensor? tensor可以放在gpu上训练,支持自动求导,方便快速训练,同时支持numpy的运算,是加强版,numpy不支持这些 为…...

Android 多线程并发详解

一&#xff0c;基础概念 1.1什么是CPU 中央处理器&#xff08;CPU&#xff09;&#xff0c;是电子计算机的主要设备之一&#xff0c;电脑中的核心配件。其功能主要是解释计算机指令以及处理计算机软件中的数据。CPU是计算机中负责读取指令&#xff0c;对指令译码并执行指令的…...

系统架构设计:8 论软件架构风格

目录 一 软件架构风格 1 数据流风格 (1)批处理风格 (2)管道-过滤器风格...

[Elasticsearch] 邻近匹配 (一) - 短语匹配以及slop参数

本文翻译自Elasticsearch官方指南的Proximity Matching一章。 邻近匹配(Proximity Matching) 使用了TF/IDF的标准全文搜索将文档&#xff0c;或者至少文档中的每个字段&#xff0c;视作"一大袋的单词"(Big bag of Words)。match查询能够告诉我们这个袋子中是否包含了…...

Bootstrap中让元素尽可能往父容器的左侧靠近或右侧造近(左浮动和右浮动)

在Bootstrap中&#xff0c;float-left是一个用于浮动元素的CSS类。它的作用是将一个元素向左浮动&#xff0c;使其在父容器内尽可能靠近左侧边缘&#xff0c;同时允许其他元素在其右侧排列。 使用float-left类可以创建多列布局&#xff0c;将元素水平排列在一行上&#xff0c;…...

网络流量安全分析-工作组异常

在网络中&#xff0c;工作组异常分析具有重要意义。以下是网络中工作组异常分析的几个关键点&#xff1a; 检测网络攻击&#xff1a;网络中的工作组异常可能是由恶意活动引起的&#xff0c;如网络攻击、病毒感染、黑客入侵等。通过对工作组异常的监控和分析&#xff0c;可以快…...

Flink之Watermark源码解析

1. WaterMark源码分析 在Flink官网中介绍watermark和数据是异步处理的,通过分析源码得知这个说法不够准确或者说不够详细,这个异步处理要分为两种情况: watermark源头watermark下游 这两种情况的处理方式并不相同,在watermark的源头确实是异步处理的,但是在下游只是做的判断,这…...

基于支持向量机SVM和MLP多层感知神经网络的数据预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 一、支持向量机&#xff08;SVM&#xff09; 二、多层感知器&#xff08;MLP&#xff09; 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 .…...