当前位置: 首页 > news >正文

Flink测试利器之DataGen初探 | 京东云技术团队

什么是 Flinksql

Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如过滤、聚合、连接和转换等。它还提供了窗口操作、时间处理和复杂事件处理等功能,以满足流式数据处理的需求。

Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。他是Flink最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。

使用 Flink SQL处理数据的基本步骤:

  1. 定义输入表:使用 CREATE TABLE 语句定义输入表,指定表的模式(字段和类型)和数据源(如 Kafka、文件等)。

  2. 执行 SQL 查询:使用 SELECT、INSERT INTO 等 SQL 语句来执行数据查询和操作。您可以在 SQL 查询中使用各种内置函数、聚合操作、窗口操作和时间属性等。

  3. 定义输出表:使用 CREATE TABLE 语句定义输出表,指定表的模式和目标数据存储(如 Kafka、文件等)。

  4. 提交作业:将 Flink SQL 查询作为 Flink 作业提交到 Flink 集群中执行。Flink会根据查询的逻辑和配置自动构建执行计划,并将数据处理任务分发到集群中的任务管理器进行执行。

总而言之,我们可以通过Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。

什么是 connector

Flink Connector 是指用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:

例如:

  1. JDBC :用于与关系型数据库(如 MySQL、PostgreSQL)建立连接,并支持在 Flink SQL 中读取和写入数据库表的数据。

  2. JDQ :用于与 JDQ 集成,可以读取和写入 JDQ 主题中的数据。

  3. Elasticsearch :用于与 Elasticsearch 集成,可以将数据写入 Elasticsearch 索引或从索引中读取数据。

  4. File Connector:用于读取和写入各种文件格式(如 CSV、JSON、Parquet)的数据。

还有如HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或将处理结果导出到外部系统。

DataGen Connector

DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。

使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。

demo

以下是一个使用 DataGen 函数的简单示例:

-- 创建输入表
CREATE TABLE input_table (order_number BIGINT,price DECIMAL(32,2),buyer ROW<first_name STRING, last_name STRING>,order_time TIMESTAMP(3)
) WITH ('connector' = 'datagen',
);

在上面的示例中,我们使用 DataGen 连接器创建了一个名为 `input_table` 的输入表。该表包含了 `order_number`、`price` 和 `buyer` ,`order_time`四个字段。默认是random随机生成对应类型的数据,生产速率是10000条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。

生成的数据样例:

{"order_number":-6353089831284155505,"price":253422671148527900374700392448,"buyer":{"first_name":"6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2","last_name":"d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"},"order_time":"2023-09-21 06:22:29.618"}
{"order_number":1102733628546646982,"price":628524591222898424803263250432,"buyer":{"first_name":"4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c","last_name":"7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"},"order_time":"2023-09-21 06:23:01.69"}

支持的类型

字段类型数据生成方式
BOOLEANrandom
CHARrandom / sequence
VARCHARrandom / sequence
STRINGrandom / sequence
DECIMALrandom / sequence
TINYINTrandom / sequence
SMALLINTrandom / sequence
INTrandom / sequence
BIGINTrandom / sequence
FLOATrandom / sequence
DOUBLErandom / sequence
DATErandom
TIMErandom
TIMESTAMPrandom
TIMESTAMP_LTZrandom
INTERVAL YEAR TO MONTHrandom
INTERVAL DAY TO MONTHrandom
ROWrandom
ARRAYrandom
MAPrandom
MULTISETrandom

连接器属性

属性是否必填默认值类型描述
connectorrequired(none)String‘datagen’.
rows-per-secondoptional10000Long数据生产速率
number-of-rowsoptional(none)Long指定生产的数据条数,默认是不限制。
fields.#.kindoptionalrandomString指定字段的生产数据的方式 random还是sequence
fields.#.minoptional(Minimum value of type)(Type of field)random生成器 指定字段 # 最小值, 支持数字类型
fields.#.maxoptional(Maximum value of type)(Type of field)random生成器的指定字段 # 最大值, 支持数字类型
fields.#.lengthoptional100Integerchar/varchar/string/array/map/multiset 类型的长度.
fields.#.startoptional(none)(Type of field)sequence生成器的开始值
fields.#.endoptional(none)(Type of field)sequence生成器的结束值

DataGen使用

了解了dategen的基本使用方法,那么下面来结合其他类型的连接器实践一下吧。

场景1 生成一亿条数据到hive表

CREATE TABLE dataGenSourceTable(order_number BIGINT,price DECIMAL(10, 2),buyer STRING,order_time TIMESTAMP(3))
WITH( 'connector'='datagen', 'number-of-rows'='100000000','rows-per-second' = '100000') ;CREATECATALOG myhive
WITH ('type'='hive','default-database'='default'
);
USECATALOG myhive;
USE dev;
SETtable.sql-dialect=hive;
CREATETABLEifnotexists shipu3_test_0932 (order_number BIGINT,price DECIMAL(10, 2),buyer STRING,order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file'
);
SETtable.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number,price,buyer,order_time, cast( CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

当每秒生产10万条数据的时候,17分钟左右就可以完成,当然我们可以通过增加Flink任务的计算节点、并行度、提高生产速率’rows-per-second’的值等来更快速的完成大数据量的生产。

场景2 持续每秒生产10万条数到消息队列

CREATE TABLE dataGenSourceTable (order_number BIGINT,price INT,buyer ROW< first_name STRING, last_name STRING >,order_time TIMESTAMP(3),col_array ARRAY < STRING >,col_map map < STRING, STRING >)
WITH( 'connector'='datagen', --连接器类型'rows-per-second'='100000', --生产速率'fields.order_number.kind'='random', --字段order_number的生产方式'fields.order_number.min'='1', --字段order_number最小值'fields.order_number.max'='1000', --字段order_number最大值'fields.price.kind'='sequence', --字段price的生产方式'fields.price.start'='1', --字段price开始值'fields.price.end'='1000', --字段price最大值'fields.col_array.element.length'='5', --每个元素的长度'fields.col_map.key.length'='5', --map key的长度'fields.col_map.value.length'='5' --map value的长度) ;
CREATE TABLE jdqsink1(order_number BIGINT,price DECIMAL(32, 2),buyer ROW< first_name STRING, last_name STRING >,order_time TIMESTAMP(3),col_ARRAY ARRAY < STRING >,col_map map < STRING, STRING >)
WITH('connector'='jdq','topic'='jrdw-fk-area_info__1','jdq.client.id'='xxxxx','jdq.password'='xxxxxxx','jdq.domain'='db.test.group.com','format'='json') ;
INSERTINTO jdqsink1
SELECT*FROM dataGenSourceTable;

思考

通过以上案例可以看到,通过Datagen结合其他连接器可以模拟各种场景的数据

  • 性能测试:我们可以利用Flink的高处理性能,来调试任务的外部依赖的阈值(超时,限流等)到一个合适的水位,避免自己的任务有过多的外部依赖出现木桶效应;
  • 边界条件测试:我们通过使用 Flink DataGen 生成特殊的测试数据,如最小值、最大值、空值、重复值等来验证 Flink 任务在边界条件下的正确性和鲁棒性;
  • 数据完整性测试:我们通过Flink DataGen 可以生成包含错误或异常数据的数据集,如无效的数据格式、缺失的字段、重复的数据等。从而可以测试 Flink 任务对异常情况的处理能力,验证 Flink任务在处理数据时是否能够正确地保持数据的完整性。

总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。

作者:京东零售 石朴

来源:京东云开发者社区 转载请注明来源

相关文章:

Flink测试利器之DataGen初探 | 京东云技术团队

什么是 Flinksql Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的&#xff0c;支持ANSI SQL 标准&#xff0c;允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL&#xff0c;可以以声明式的方式描述数据处理逻辑&#xff0c;而无需编写显式的代码…...

linux更换常用软件的默认缓存路径(.conda, .huggingface等)

在使用linux的过程中&#xff0c;我们往往会使用软件安装很多packages&#xff0c;其中的大多数软件&#xff08;例如conda&#xff09;会把当前安装的packages缓存起来&#xff0c;以加速之后的相同package的安装。 而很多软件的默认缓存路径是user自己的home路径。下面罗列几…...

Kafka消费者使用案例

本文代码链接&#xff1a;https://download.csdn.net/download/shangjg03/88422633 1.消费者和消费者群组 在 Kafka 中&#xff0c;消费者通常是消费者群组的一部分&#xff0c;多个消费者群组共同读取同一个主题时&#xff0c;彼此之间互不影响。Kafka 之所以要引入消费者群组…...

SpringMVC全注解开发

在学习过程中&#xff0c;框架给我们最大的作用&#xff0c;就是想让开发人员尽可能地只将精力放在具体业务功能的实现之上&#xff0c;而对于各种映射关系的配置&#xff0c;统统由框架来进行完成&#xff0c;由此&#xff0c;注解就很好的将映射功能进行实现&#xff0c;并且…...

解决 android Cannot access ‘<init>‘: it is private in

最近要在2个非直接依赖module使用单例&#xff0c;有一种注入依赖的方式可以&#xff0c;但是报了如下错误&#xff1a; Cannot access <init>: it is private in 经过查阅资料&#xff0c;原来是依赖的单例类的构造函数不能使用private&#xff0c;这里做个记录&#…...

不容易解的题10.15

395.至少有K个重复字符的最长字串 395. 至少有 K 个重复字符的最长子串 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/longest-substring-with-at-least-k-repeating-characters/description/?envTypelist&envIdZCa7r67M自认为是不好做的题。尤其…...

Megatron-LM GPT 源码分析(二) Sequence Parallel分析

引用 本文基于开源代码 https://github.com/NVIDIA/Megatron-LM &#xff0c;延续上一篇Megatron-LM GPT 源码分析&#xff08;一&#xff09; Tensor Parallel分析 通过对GPT的模型运行示例&#xff0c;从三个维度 - 模型结构、代码运行、代码逻辑说明 对其源码做深入的分析。…...

DNA序列(DNA Consensus String, ACM/ICPC Seoul 2006, UVa1368) rust解法

输入m个长度均为n的DNA序列&#xff0c;求一个DNA序列&#xff0c;到所有序列的总Hamming距离尽量小。两个等长字符串的Hamming距离等于字符不同的位置个数&#xff0c;例如&#xff0c;ACGT和GCGA的Hamming距离为2&#xff08;左数第1, 4个字符不同&#xff09;。 输入整数m和…...

如何使用Jmeter进行http接口测试?

前言&#xff1a; 本文主要针对http接口进行测试&#xff0c;使用Jmeter工具实现。 Jmter工具设计之初是用于做性能测试的&#xff0c;它在实现对各种接口的调用方面已经做的比较成熟&#xff0c;因此&#xff0c;本次直接使用Jmeter工具来完成对Http接口的测试。 一、开发接…...

bash一行输入,多行回显demo脚本

效果图&#xff1a; 脚本&#xff1a; #!/bin/bash # 定义一个变量&#xff0c;用来存储输入的内容 input"" # 定义一个变量&#xff0c;用来存储输入的字符 char""# 为了让read能读到空格键 IFS_store$IFS IFS# 提示内容&#xff0c;在while循环中也有&a…...

IDEA spring-boot项目启动,无法加载或找到启动类问题解决

问题描述&#xff1a;找不到或无法加载主类 xxx.xxx.xxx.Classname 解决方案&#xff1a; 1.检查启动设置&#xff1a; 启动类所在包运行环境&#xff08;一般选择默认即可&#xff09;设置完成即可进行运行测试 2.如果第一步没有解决问题&#xff0c;试着第二步&#xff1a…...

【LeetCode刷题(数据结构与算法)】:完全二叉树的节点个数

完全二叉树 的定义如下&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能没填满外&#xff0c;其余每层节点数都达到最大值&#xff0c;并且最下面一层的节点都集中在该层最左边的若干位置。若最底层为第 h 层&#xff0c;则该层包含 1~ 2h 个节点 输入&#xff1a;r…...

【代码随想录】算法训练营 第一天 第一章 数组 Part 1

目录 数组基础知识补充 704. 二分查找 题目 左闭右闭方法 思路 代码 左闭右开方法 思路 代码 27. 移除元素 题目 暴力解法 思路 代码 双指针法 思路 代码 数组基础知识补充 1. 在leecode中&#xff0c;数组一般是以vector容器的形式出现的&#xff0c;虽然ve…...

286_C++_定时器的其中一个操作,定时重载接口—startTimer循环执行回调(未完全)

1、启动一个定时器,允许在一定时间间隔内执行回调函数startTimer 1、接口函数参数详解 /*** @brief startTimer 定时重载接口* @param interval 定时器触发间隔,单位毫秒 (ms)* @param notify 定时时间到后需要触发的回调* @param type 回调驱动方…...

自动驾驶学习笔记(四)——变道绕行仿真

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《2023星火培训【感知专项营】》免费课程—>传送门 文章目录 前言 仿真内容 启动Dreamview 开启Sim…...

C++位图,布隆过滤器

本期我们来学习位图&#xff0c;布隆过滤器等相关知识&#xff0c;以及模拟实现&#xff0c;需求前置知识 C-哈希Hash-CSDN博客 C-封装unordered_KLZUQ的博客-CSDN博客 目录 位图 布隆过滤器 海量数据面试题 全部代码 位图 我们先来看一道面试题 给 40 亿个不重复的无符号…...

Python多种方法实现九九乘法表

你好&#xff0c;我是悦创。 九九乘法表是一种常见的算术学习工具&#xff0c;通常用于帮助学生记住乘法的基本运算。以下是使用Python实现九九乘法表的几种方法&#xff1a; 1. 使用两个嵌套循环 for i in range(1, 10):for j in range(1, i 1):print(f"{j}x{i}{i * …...

【力扣1876】长度为三且各字符不同的子字符串

&#x1f451;专栏内容&#xff1a;力扣刷题⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、题目描述二、题目分析 一、题目描述 题目链接&#xff1a;长度为三且各字符不同的子字符串 如果一个字符串不含有任何…...

HSN:微调预训练ViT用于目标检测和语义分割,华南理工和阿里巴巴联合提出

今天跟大家分享华南理工大学和阿里巴巴联合提出的将ViT模型用于下游任务的高效微调方法HSN&#xff0c;该方法在迁移学习、目标检测、实例分割、语义分割等多个下游任务中表现优秀&#xff0c;性能接近甚至在某些任务上超越全参数微调。 论文标题&#xff1a;Hierarchical Side…...

机器学习的原理是什么?

训过小狗没? 没训过的话总见过吧? 你要能理解怎么训狗&#xff0c;就能非常轻易的理解机器学习的原理. 比如你想教小狗学习动作“坐下”一开始小狗根本不知道你在说什么。但是如果你每次都说坐下”然后帮助它坐下&#xff0c;并给它一块小零食作为奖励&#xff0c;经过多次…...

在软件开发中正确使用MySQL日期时间类型的深度解析

在日常软件开发场景中&#xff0c;时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志&#xff0c;到供应链系统的物流节点时间戳&#xff0c;时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库&#xff0c;其日期时间类型的…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

【JavaSE】多线程基础学习笔记

多线程基础 -线程相关概念 程序&#xff08;Program&#xff09; 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序&#xff0c;比如我们使用QQ&#xff0c;就启动了一个进程&#xff0c;操作系统就会为该进程分配内存…...

PostgreSQL——环境搭建

一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在&#xff0…...

Qemu arm操作系统开发环境

使用qemu虚拟arm硬件比较合适。 步骤如下&#xff1a; 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载&#xff0c;下载地址&#xff1a;https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...