【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式
一、背景:
CDC数据中包含了,数据的变更过程。当CDC写入传统数据库最终每一个primary key下会保存一条数据。当然可以使用特殊手段保存多分记录但是显然造成了数据膨胀。
另外数据湖Hudi(0.13.1)是不支持保存所有Changelog其Compaction机制会清除所有旧版本的内容。Iceberg支持TimeTravel,能查到某个时间点的数据状态,但是不能列举的单条记录的Change过程。
所以目前只能手动实现。
其实,实现思路很简单,将原PrimaryKey+Cdc的 ts_ms
一起作为新表的 PrimaryKey就可以了。但需要注意的是一条数据可能变更很多次,但一般需要保存近几次的变更,所以就需要删除部分旧变更记录。ts_ms
就是CDC数据中记录的日志实际产生的时间,具体参见debezium 。如果原表primarykey是联合主键,即有多个字段共同组成,则最好将这些字段拼接为一个字符串,方便后续关联。
本文思路
CDC --写入-> Phoenix + 定期删除旧版本记录
CDC数据写入略过,此处使用SQL模拟写入。
二、Phoenix旧版记录删除(DEMO)
phoenix doc
bin/sqlline.py www.xx.com:2181
-- 直接创建phoenix表
create table TEST.TEST_VERSION(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
) VERSIONS=5;
再去hbase shell中查看,hbase 关联表已经有phoenix创建了。
hbase(main):032:0> desc "TEST:TEST_VERSION"
Table TEST:TEST_VERSION is ENABLED
TEST:TEST_VERSION, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRe
gionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|80
5306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.builder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix
.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', T
TL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPE
N => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
-- 在phoenix中向表插入数据
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:00:00'),'zhangsan');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:00:00'),'lisi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 12:00:00'),'wangwu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 13:00:00'),'zhaoliu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 14:00:00'),'liuqi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 15:00:00'),'sunba');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 07:00:00'),'sunyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 08:00:00'),'chaoyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:00:00'),'xuri');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:30:00'),'chenxi');
-- OK再查询一下数据插入情况
SELECT * FROM TEST.TEST_VERSION;
以下假设每个PrimaryKey需要保留最新的3版本数据。所以红色框内是需要删除的数据。
现在需要使用row_number的函数给每个primarykey的不通version数据标识。但是phoenix并没有开窗函数。只有agg聚合函数。
phoenix对SQL的限制还是比较多的如:
(1)join 非等值连接不支持,如on a.id>s.id
是不支持的,也不支持数组比较连接,如on a.id = ARRAY[1,2,3]
。 会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)where exists 格式的非等值连接不支持。select ... from A where exists (select 1 from B where A.id>B.id)
是不支持的。会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)没有开窗window函数
(3)DELETE FROM不支持JOIN
最终发下有一下函数可用
(1)NTH_VALUE
获取分组排序的第N个值。 返回原值的类型。
(2)FIRST_VALUES
和 LAST_VALUES
获取分区排序后的前、后的N个值,返回ARRAY类型。
此三个函数官网doc中,案例是这样的 FIRST_VALUES( name, 3 ) WITHIN GROUP (ORDER BY salary DESC)
是全局分组,而实际使用中是需要搭配 GROUP BY
使用的。
所以可以获取到
-- 方案一:使用NTH_VALUE获取阈值
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES-- 方案二:使用FIRST_VALUES获取到一个ARRAY
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS);
由于phoenix支持行子查询,以下是官方案例。这样就能绕过不使用DELETE … JOIN了。
Row subqueries
A subquery can return multiple fields in one row, which is considered returning a row constructor. The row constructor on both sides of the operator (IN/NOT IN, EXISTS/NOT EXISTS or comparison operator) must contain the same number of values, like in the below example:
SELECT column1, column2
FROM t1
WHERE (column1, column2) IN(SELECT column3, column4FROM t2WHERE column5 = ‘nowhere’);
This query returns all pairs of (column1, column2) that can match any pair of (column3, column4) in the second table after being filtered by condition: column5 = ‘nowhere’.
最终实现删除 除N个较新的以外的所有旧版本数据, SQL如下:
-- NTH_VALUE方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES
);-- FIRST_VALUES方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS)
);
删除后效果:
相关文章:
【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式
一、背景: CDC数据中包含了,数据的变更过程。当CDC写入传统数据库最终每一个primary key下会保存一条数据。当然可以使用特殊手段保存多分记录但是显然造成了数据膨胀。 另外数据湖Hudi(0.13.1)是不支持保存所有Changelog其Compaction机制会清除所有旧版…...
阿里云服务器开放的一个新端口,重启防火墙,端口未启动
问题: 阿里云网页开放的一个新端口后,重启防火墙,端口未启动,之前配置的也都停止了。 解决: 原因可能是阿里的服务控制了,只能一个个端口开启了。把新配置新端口也单独启用。 开启80端口指令 firewall-cm…...
【PHPCUSTOM】打包PHP程序为EXE
目录 一、下载PHPCUSTOM 二、PHP网站打包 1、打开PHPCUSTOM 2、配置参数 3、生成exe文件 网上很多PHP程序打包成EXE的文章,但是都不能用,最后找到了PHPCUSTOM,使用PHPCUSTOM可以把PHP程序打包成exe。我们都知道PHP是服务端语言ÿ…...
药品咨询报告合集整理平台打包(一共36597份)【专题推荐】
<医药行业从业者必看>笔者今天分享高价值医药行业报告36500余份的获取/下载方法,报告涵盖了医药细分领域研究报告药品报告(所有上市药品)医药行业分析报告医药环境观察报告药品市场调研报告药品靶点研究报告医药白皮书;数据…...
数字化管理新革命,AI数字人CEO登场引领变革!
王一博老板乐华娱乐CEO杜华推出了她的双生数字人华华子,专门替自己直播卖货。在没有任何宣传的情况下,仅仅在短短的10分钟直播时间内,观众人数就飙升至30万人!同时,“杜华AI华华子直播”更是迅速登上了微博热搜榜。这一…...
FPGA/数字IC(芯海科技2022)面试题 2(解析版)
以下仅为学习参考(非原创),如有疑惑欢迎评论区指出! 一、单选题(共20题,每题3分,共60分) 1. D触发器:Tsetup3ns,Thold1ns,Tck2q1ns, 该D触发器最大可运行时…...
SpringMVC之JSON数据返回与异常处理机制---全方面讲解
一,JSON数据返回的理解 在Spring MVC中,当需要将数据以JSON格式返回给客户端时,可以使用ResponseBody注解或RestController注解将Controller方法的返回值直接转化为JSON格式并返回。这使得开发者可以方便地将Java对象转换为JSON,并…...
信息化发展53
数据标准化 1 、数据标准化是实现数据共享的基础。 2 、数据标准化的主要内容包括元数据标准化、数据元标准化、数据模式标准化、数据分类与编码标准化和数据标准化管理。 元数据标准化 1 、元数据是关于数据的数据( Data About Data )。其实质是用于…...
Java学习笔记——字符/字符串
在 Java 语言中,字符串都被设计成「不可变」的类型,即无法直接修改字符串的某一位字符,需要新建一个字符串实现 StringBuilder 字符 字符是用单引号括起来的单个字母,在Java中,表示字符的数据类型为char。一个字符…...
数据结构与算法基础-(1)
🌈write in front🌈 🧸大家好,我是Aileen🧸.希望你看完之后,能对你有所帮助,不足请指正!共同学习交流. 🆔本文由Aileen_0v0🧸 原创 CSDN首发🐒 如…...
华为云云耀云服务器L实例评测|轻量级应用服务器对决:基于 STREAM 深度测评华为云云耀云服务器L实例的内存性能
本文收录在专栏:#云计算入门与实践 - 华为云 专栏中,本系列博文还在更新中 相关华为云云耀云服务器L实例评测文章列表如下: 华为云云耀云服务器L实例评测 | 从零开始:云耀云服务器L实例的全面使用解析指南华为云云耀云服务器L实…...
Windows安装Neo4j
图数据库概述 图数据库是基于图论实现的一种NoSQL数据库,其数据存储结构和数据查询方式都是以图论(它以图为研究对象图论中的图是由若干给定的点及连接两点的线所构成的图形)为基础的, 图数据库主要用于存储更多的连接数据。 Neo…...
vue3开发必备核心要点
1、route/router的区别 ● $route 表示当前激活的路由的状态信息,包含了当前URL解析得到的信息,包含当前的path路径,params参数,query对象,name路径名等属性 ● r o u t e r 路由器对象( n e w 的路由器对…...
针对敏感数据的安全转录服务
即便在新冠肺炎疫情期间,继续保持了最高级别的机密性 新冠肺炎疫情带来的各种限制向所有服务提供商提出了挑战,促使提供商们想方设法采取更富想象力的新方法来满足客户的需求。澳鹏采用了一种由两种方案组成的工作机制,服务于客户机密材料的…...
leetcode 10. 正则表达式匹配
2023.9.20 感觉是目前做过dp题里最难的一题了... 本题首要的就是需要理解题意,翻了评论区我才发现之前一直理解的题意是错的。 我原来理解的 “ *匹配0次” 是指:*直接消失,不会影响到前面的字符。 但是*和前一个字符其实是连体的࿰…...
Vue前端开发中的输入限制与输入规则探究
前言 在Vue前端开发中,我们经常需要对用户的输入进行限制和规范,以确保数据的准确性和安全性。本文将介绍如何使用Vue的el-input组件来实现输入限制和输入规则,并提供相应的代码示例。 一、输入限制 最大长度限制 我们可以使用maxlength属…...
自己封装 vue3+ts 组件库并且发布到 NPM
自己封装 vue3ts 组件库并且发布到 NPM 创建项目 pnpm create vite配置 package.json 按照提示创建好项目,然后再 package.json 中进行如下配置: {"name": "tribiani-vue-tools","private": false,"version"…...
MySQL学习系列(6)-每天学习10个知识
目录 1. 管理和维护大量的数据库表和数据2. 检测和修复MySQL性能瓶颈3. MySQL的视图缓存4. 处理MySQL并发问题5. 函数索引和全文索引6. UNION ALL 和 UNION 的区别7. 存储引擎的选择8. 存储过程和触发器9. 数据表管理和优化10. 数据库安全性和一致性 👍 点赞&#x…...
“毛细血管”的进化:华为分销业务如何让伙伴也有“高能级”
作者 | 曾响铃 文 | 响铃说 数字化蓬勃发展的大时代,除了那些中、大型企业,数量更为庞大的小微企业同样有借助数字化产品、服务来提升企业经营的需求,由此也带来了广袤的数字化分销市场。 这里处在聚光灯之外,很少被数字化时代…...
警惕!多本SCI/SSCI被剔除,9月SCI/SSCI期刊目录已更新~(附下载)
【SciencePub学术】 2023年9月20日,科睿唯安更新了Web of Science核心期刊目录。 继上次SCI期刊目录和SSCI期刊目录更新之后,本次9月更新共有9本期刊发生变动: • SCIE:有3本期刊不再被SCIE期刊目录收录(Editorial De-listing/Pr…...
一点整理
(1) 美国在2010年以后开始流行数字化转型的。 在2010年以前, 2006年社交网络FB “YOU”:在2004-2006 Web2.0热之前,企业是无法直接触达到每个消费者的2006年Amazon电子商务:这个是我瞎凑的,但因…...
Vulnhub系列靶机---Deathnote: 1死亡笔记
文章目录 信息收集主机发现端口扫描目录扫描dirsearchgobusterdirb扫描 漏洞利用wpscan扫描Hydra爆破 总结 靶机文档:Deathnote: 1 下载地址:Download (Mirror) 难易程度:so Easy 信息收集 主机发现 端口扫描 访问靶机的80端口,报…...
从基础到高阶:史上最小白的Attention机制详解——揭秘人工智能中的核心技术
1. Encoder-Decoder 想象一下你正在和一个会说多种语言的朋友对话。你用中文对他说了一句话,他将其“编码”成他的“内部语言”,然后再“解码”成英语给你回复。在这个过程中,“编码”就是Encoder,而“解码”就是Decoder。 在机…...
9.20金融科技(比特币)
比特币的起源和发展 2008年爆发全球金融危机,同年11月1日,一个自称中本聪(Satoshi Nakamoto)的人在P2P foundation网站上发布了比特币白皮书《比特币:一种点对点的电子现金系 ,陈述了他对电子货币的新设…...
什么是内存碎片?
在嵌入式系统中,内存是十分有限而且是十分珍贵的,用一块内存就少了一块内存,而在分配中随着内存不断被分配和释放,整个系统内存区域会产生越来越多的碎片。 因为在使用过程中,申请了一些内存,其中一些释放…...
C语言堆排序
堆排序(Heapsort)是一种在时间复杂度上达到了最优的基于比较的排序算法。堆排序算法是指利用堆这种数据结构所设计的一种排序算法。堆积是一个近似完全二叉树的结构,并同时满足堆积的性质:即子节点的键值或索引总是小于࿰…...
【学习笔记】CF573E Bear and Bowling
感觉贪心的做法比较自然🤔,推荐 这篇博客 非常经典牛逼的贪心思路: 考虑每次加入一个数,位置 i i i的贡献为 V i k i a i b i V_ik_i\times a_ib_i Vikiaibi,其中 k i k_i ki表示 i i i以前被选的位置的…...
函数扩展之——内存函数
前言:小伙伴们又见面啦。 本篇文章,我们将讲解C语言中比较重要且常用的内存函数,并尝试模拟实现它们的功能。 让我们一起来学习叭。 目录 一.什么是内存函数 二.内存函数有哪些 1.memcpy (1)库函数memcpy &…...
【在线机器学习】River对流数据进行机器学习
River是一个用于在线机器学习的Python库。它旨在成为对流数据进行机器学习的最用户友好的库。River是crme和scikit-multiflow合并的结果。 https://github.com/online-ml/river 举个简单示例,将训练逻辑回归来对网站网络钓鱼数据集进行分类。下面介绍了数据集中的…...
第 4 章 串(串的块链存储实现)
1. 背景说明 该实现和链表的实现极为相似,只是将链接的内存拆分为具体的大小的块。 2. 示例代码 1). status.h /* DataStructure 预定义常量和类型头文件 */#ifndef STATUS_H #define STATUS_H#define CHECK_NULL(pointer) if (!(pointer)) { \printf("FuncN…...
网站管理页面/如何学会推广和营销
这里介绍两种产生sdp文件的方法,仅供参考,欢迎补充。 1、环境 操作系统 :CentOS6.2_64 内核版本 :2.6.32-220.23.1.el6.x86_64 Darwin Streaming Server 版本:6.0.3 mpeg4ip版本:1.6.1 ffmpeg版本ÿ…...
榆林建设银行的网站/如何营销推广
题解: 时间超时的看这里: 用java 一不注意就容易时间超时,内存不足。所以如果没有高级函数之类的语法,可以用c和python 来做。除了比赛外,走java方向的,推荐用java 淦她。尽量用空间换时间,循环…...
wordpress会员页面/金华seo
学了C基本的语法都知道继承可以让子类拥有更多的功能,除了继承还有组合,委托,也能让一个类的功能增加。设计模式,这个设计是设计继承,组合,委托,之间相互叠加的方式,让其符合业务需求…...
一级a做爰网站中国/如何注册域名网站
把数据库从oracle迁移到PPASPPAS有两个迁移工具,一个图形界面的,一个命令行的,下面以图形界面为例。1首先需要在目标数据库系统PPAS上建立和源库对应的用户和对等的权限,再建立目标数据库。create user " USERNAMEXXX "…...
拉萨北京网站建设/免费seo推广计划
简介 Redis是一个使用ANSI C编写的开源、支持网络、基于内存、可选持久性的键值对存储数据库。从2015年6月开始,Redis的开发由Redis Labs赞助,而2013年5月至2015年6月期间,其开发由Pivotal赞助。[1]在2013年5月之前,其开发由VMwar…...
友情链接购买平台/seo 优化 工具
题目 正常情况下,数据包由起始码(16bit)、数据段(n字节,n<256)、结束码(16bit)三部分构成。起始码为0xFF00,结束码为0xFF01。在一个完整的数据包中,数据段部分不会出现起始码和结束码。请设计一个电路,在码流中检测完整且有效的数据包&am…...