【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式

一、背景:

CDC数据中包含了,数据的变更过程。当CDC写入传统数据库最终每一个primary key下会保存一条数据。当然可以使用特殊手段保存多分记录但是显然造成了数据膨胀。
另外数据湖Hudi(0.13.1)是不支持保存所有Changelog其Compaction机制会清除所有旧版本的内容。Iceberg支持TimeTravel,能查到某个时间点的数据状态,但是不能列举的单条记录的Change过程。
所以目前只能手动实现。
其实,实现思路很简单,将原PrimaryKey+Cdc的 ts_ms 一起作为新表的 PrimaryKey就可以了。但需要注意的是一条数据可能变更很多次,但一般需要保存近几次的变更,所以就需要删除部分旧变更记录。ts_ms 就是CDC数据中记录的日志实际产生的时间,具体参见debezium 。如果原表primarykey是联合主键,即有多个字段共同组成,则最好将这些字段拼接为一个字符串,方便后续关联。

本文思路
CDC --写入-> Phoenix + 定期删除旧版本记录

CDC数据写入略过,此处使用SQL模拟写入。

二、Phoenix旧版记录删除(DEMO)

phoenix doc

bin/sqlline.py www.xx.com:2181
-- 直接创建phoenix表
create table TEST.TEST_VERSION(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
) VERSIONS=5;

再去hbase shell中查看,hbase 关联表已经有phoenix创建了。

hbase(main):032:0> desc "TEST:TEST_VERSION"
Table TEST:TEST_VERSION is ENABLED
TEST:TEST_VERSION, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRe
gionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|80
5306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.builder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix
.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', T
TL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPE
N => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
-- 在phoenix中向表插入数据
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:00:00'),'zhangsan');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:00:00'),'lisi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 12:00:00'),'wangwu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 13:00:00'),'zhaoliu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 14:00:00'),'liuqi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 15:00:00'),'sunba');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 07:00:00'),'sunyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 08:00:00'),'chaoyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:00:00'),'xuri');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:30:00'),'chenxi');
-- OK再查询一下数据插入情况
SELECT * FROM TEST.TEST_VERSION;

以下假设每个PrimaryKey需要保留最新的3版本数据。所以红色框内是需要删除的数据。
在这里插入图片描述

现在需要使用row_number的函数给每个primarykey的不通version数据标识。但是phoenix并没有开窗函数。只有agg聚合函数。
phoenix对SQL的限制还是比较多的如:
(1)join 非等值连接不支持,如on a.id>s.id 是不支持的,也不支持数组比较连接,如on a.id = ARRAY[1,2,3]。 会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)where exists 格式的非等值连接不支持。select ... from A where exists (select 1 from B where A.id>B.id) 是不支持的。会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)没有开窗window函数
(3)DELETE FROM不支持JOIN

最终发下有一下函数可用
(1)NTH_VALUE 获取分组排序的第N个值。 返回原值的类型。
(2)FIRST_VALUESLAST_VALUES 获取分区排序后的前、后的N个值,返回ARRAY类型。
此三个函数官网doc中,案例是这样的 FIRST_VALUES( name, 3 ) WITHIN GROUP (ORDER BY salary DESC) 是全局分组,而实际使用中是需要搭配 GROUP BY 使用的。

所以可以获取到

-- 方案一:使用NTH_VALUE获取阈值
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES-- 方案二:使用FIRST_VALUES获取到一个ARRAY 
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS);

由于phoenix支持行子查询,以下是官方案例。这样就能绕过不使用DELETE … JOIN了。

Row subqueries
A subquery can return multiple fields in one row, which is considered returning a row constructor. The row constructor on both sides of the operator (IN/NOT IN, EXISTS/NOT EXISTS or comparison operator) must contain the same number of values, like in the below example:
SELECT column1, column2
FROM t1
WHERE (column1, column2) IN(SELECT column3, column4FROM t2WHERE column5 = ‘nowhere’);
This query returns all pairs of (column1, column2) that can match any pair of (column3, column4) in the second table after being filtered by condition: column5 = ‘nowhere’.

最终实现删除 除N个较新的以外的所有旧版本数据, SQL如下:

-- NTH_VALUE方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES
);-- FIRST_VALUES方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS)
);

删除后效果:
在这里插入图片描述

三、探索

3.1 Phoenix的Row Timestamp 探索

Phoenix的Row Timestamp是为了在meta中更快检索数据而设置的。不能实现hbase 中的versions 数据在phoenix中展现。
如下测试案例:
phoenix建表,并插入数据:

create table TEST.TEST_ROW_TIMESTAMP(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS ROW_TIMESTAMP)
) VERSIONS=5;UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 09:30:00'),'windows');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:30:00'),'mac');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'linux');

在hbase中查询表:

hbase(main):050:0> desc 'TEST:TEST_ROW_TIMESTAMP'
Table TEST:TEST_ROW_TIMESTAMP is ENABLED
TEST:TEST_ROW_TIMESTAMP, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver|805306366|', coprocessor$3
=> '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|805306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.b
uilder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICAT
ION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.0235 secondshbase(main):049:0> scan 'TEST:TEST_ROW_TIMESTAMP'
ROW                                                            COLUMN+CELLrk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00              column=0:\x00\x00\x00\x00, timestamp=1577871000000, value=xrk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00              column=0:\x80\x0B, timestamp=1577871000000, value=windowsrk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00              column=0:\x00\x00\x00\x00, timestamp=1577874600000, value=xrk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00              column=0:\x80\x0B, timestamp=1577874600000, value=macrk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00           column=0:\x00\x00\x00\x00, timestamp=1577878200000, value=xrk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00           column=0:\x80\x0B, timestamp=1577878200000, value=linux
3 row(s)
Took 0.0072 seconds

如上查询结果,我们希望在hbase中只有一行数据,并保存为对多个版本,但实际查询到了多条数据,timestamp做为hbase表的rowkey的一部分了。phoenix在创建表时候没有使用hbase多版本保存机制。

3.2 phoenix 和 hbase表结构不一致

先创建hbase Table

create 'TEST:TEST_DIF_TS',{NAME => 'COLS', VERSIONS => 3}
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhangsan'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189085000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','lisi'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189090000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','wangwu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189095000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhaoliu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189105000get 'TEST:TEST_DIF_TS','001',{COLUMN=>'COLS:NAME',VERSIONS=>3}
# 结果:
COLUMN                                             CELLCOLS:NAME                                         timestamp=1695784642879, value=zhaoliuCOLS:NAME                                         timestamp=1695784642857, value=wangwuCOLS:NAME                                         timestamp=1695784642830, value=lisi

创建Phoenix Table

create table TEST.TEST_DIF_TS(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
);
UPSERT INTO TEST.TEST_DIF_TS(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'XXX');0: jdbc:phoenix:...> select * from  TEST.TEST_DIF_TS;
+--------+--------------------------+-------+
|   ID   |            TS            | NAME  |
+--------+--------------------------+-------+
| rk001  | 2020-01-01 11:30:00.000  | XXX   |
+--------+--------------------------+-------+

再翻查hbase Table数据

hbase(main):004:0> scan 'TEST:TEST_DIF_TS'
ROW                                                COLUMN+CELL001                                               column=COLS:NAME, timestamp=1695784642879, value=zhaoliu001                                               column=COLS:TS, timestamp=1695784643741, value=1695189105000rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x column=0:\x00\x00\x00\x00, timestamp=1695786568345, value=x00rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x column=0:\x80\x0B, timestamp=1695786568345, value=XXX

可以看到Phoenix只能查询到自己插入的数据,但是hbase可以查询到phoenix,所以phoenix会把不符合自己表结构的数据过滤掉。phoenix的会将自己所有的primary key字段拼接后作为hbase 的rowkey存入hbase。

参考文章:

Phoenix实践 —— Phoenix SQL常用基本语法总结小记
Phoenix 对 Hbase 中表的映射
phoenix使用详解
Phoenix 简介及使用方式
phoenix创建映射表和创建索引、删除索引、重建索引

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/142978.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

信息安全:恶意代码防范技术原理.

信息安全&#xff1a;恶意代码防范技术原理. 恶意代码的英文是 Malicious Code, 它是一种违背目标系统安全策略的程序代码&#xff0c;会造成目标系统信息泄露、资源滥用&#xff0c;破坏系统的完整性及可用性。 目录&#xff1a; 恶意代码概述&#xff1a; &#xff08;1&a…

基于体系结构-架构真题2022(四十一)

给定关系模式R&#xff08;U,F&#xff09;&#xff0c;其中U为属性集&#xff0c;F是U上的一组函数依赖&#xff0c;那么函数依赖的公理系统中分解规则是指&#xff08;&#xff09;为F所蕴含。 解析&#xff1a; 伪传递是x到y&#xff0c;wy到z&#xff0c;则xw到z 传递是z…

AVL Cruise 2020.1 安装教程

文章目录 安装包安装破解 安装包 链接&#xff1a;https://pan.baidu.com/s/1GxbeDj_SyvKFyPeTsstvTQ?pwd6666 提取码&#xff1a;6666 安装 安装文件&#xff1a; 双击setup.exe&#xff1a; 一直netx&#xff0c;中间要修改两次路径&#xff0c;第一次是安装位置&#xf…

Go-Ldap-Admin | Ldap 同步钉钉、企业微信、飞书组织架构实践和部分小坑

目录 一、Docker-compose快速拉起demo测试环境 二、原生部署流程 安装MySQL&#xff1a;5.7数据库 安装openLDAP 修改域名&#xff0c;新增con.ldif 创建一个组织 安装OpenResty 下载后端 下载前端 部署后端 部署前端 三、管理动态字段 钉钉 企业微信 飞书 四、…

计算机里的神灵(SCIP)

计算机程序的构造和解释 我找到计算机里的神灵了&#xff0c;开心一刻 下面是从MIT官网下载的 SCIP求值器&#xff08;解释器&#xff09;的代码&#xff0c;这个官网是个宝藏库 还有其他视频课程和 SCIP的问题答案和可运行代码 链接&#xff1a;https://ocw.mit.edu/courses/6…

gitee-快速设置

快速设置— 如果你知道该怎么操作&#xff0c;直接使用下面的地址 HTTPS SSH: gitgitee.com:liuzl33078235/esp-idf.git 我们强烈建议所有的git仓库都有一个README, LICENSE, .gitignore文件 初始化 readme 文件 Git入门&#xff1f;查看 帮助 , Visual Studio / TortoiseG…

mysql实际调优

一般实际调优的情况就不需要去考虑mysql数据库结构或者命名优化那些。做这些优化是大动作&#xff0c;也不是咱们一般人去接触到的。 所以我们针对mysql的调优其实大部分还是针对索引进行优化。 我们刚接触这个表的话可以先查询当前表中所有的索引 使用 SHOW INDEX FROM yo…

指针笔试题详解

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力&#xff0c;一起奔赴大厂。 目录 1.前言 2.指针题写出下列程序的结…

第1章 数据结构绪论

1.1 开场白 1.2 你数据结构怎么学的 1.3 数据结构起源 早期人们都把计算机理解为数值计算工具&#xff0c;就是感觉计算机当然是用来计算的&#xff0c;所以计算机解决问题&#xff0c;应该是先从具体问题中抽象出一个适当的数据模型&#xff0c;设计出一个解此数据模型的算…

栈(Java)

目录 1.什么是栈 2.栈的使用 3.栈的模拟实现 1.什么是栈 栈&#xff1a;是一种特殊的线性表&#xff0c;只允许在其固定的一端进行插入和删除操作。栈中的元素遵循先进后出&#xff08;后进先出&#xff09;原则 栈顶&#xff1a;进行插入和删除数据的一端 栈底&#xff1a…

怎么压缩word文档的大小?

怎么压缩word文档的大小&#xff1f;Word文件压缩成一个普遍存在的挑战&#xff0c;现在看来至少是这样的。最近&#xff0c;我们接到了许多用户的疑问&#xff0c;他们想知道如何压缩Word文件大小。这个问题似乎广泛存在于办公场景中&#xff0c;因此我们需要找到解决方案。导…

测试用例:在线音乐播放器

从 功能测试、界面测试、性能测试、兼容性测试、易用性测试、安全测试、弱网测试等 七个方面对在线音乐播放器进行设计测试用例

【广州华锐互动】利用VR开展工业事故应急救援演练,确保救援行动的可靠性和有效性

在工业生产中&#xff0c;事故的突发性与不可预测性常常带来巨大的损失。传统的应急演练方式往往存在场地限制、成本高、效果难以衡量等问题。然而&#xff0c;随着虚拟现实&#xff08;VR&#xff09;技术的快速发展&#xff0c;VR工业事故应急救援演练应运而生&#xff0c;为…

Cannot find module ‘core-js/modules/es6.regexp.constructor‘

npm run dev 之后报如下错误 解决方法&#xff1a;npm install core-js2 如果超时或者下载时间慢可以尝试 用cnpm install core-js2

Exception in thread “main“ java.sql.SQLException: No suitable driver

详细报错信息如下&#xff1a; Exception in thread "main" java.sql.SQLException: No suitable driver at java.sql.DriverManager.getDriver(DriverManager.java:315) at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverC…

宝塔nginx搭建Ftp文件服务器

一&#xff1a;创建FTP 填入账号密码后&#xff0c;选择根目录&#xff0c;这个根目录就是nginx要代理的目录 二&#xff1a;配置nginx root的地址就是上面填的FTP根目录 三&#xff1a;http访问 服务器ip端口号加图片 例如我放了一个320.jp 我服务器ip是110.120.120.120 那…

PSINS工具箱学习(二)姿态的表示:姿态阵、四元数、欧拉角、等效旋转矢量的概念和转换

原始 Markdown文档、Visio流程图、XMind思维导图见&#xff1a;https://github.com/LiZhengXiao99/Navigation-Learning 文章目录 一、基础概念1、坐标系定义1. 惯性坐标系&#xff08; i 系 &#xff09;2. 地心地固坐标系&#xff08; e 系 )3. 导航坐标系&#xff08; n 系&…

Multisim14.0仿真(二十五)高频小信号调谐放大器

一、仿真原理图&#xff1a; 二、仿真效果图&#xff1a;

API(十一) 获取openresty编译信息

一 ngx.config 说明&#xff1a; 不常用,了解即可 ngx.config.subsystem 说明&#xff1a; 用的四层还是七层代理 ngx.config.debug 说明&#xff1a; 返回的是boolean类型, openresty rpm安装一般没有 --with-debug编译选项对比&#xff1a; nginx rpm 安装一般携带 --wi…

面试算法13:二维子矩阵的数字之和

题目 输入一个二维矩阵&#xff0c;如何计算给定左上角坐标和右下角坐标的子矩阵的数字之和&#xff1f;对于同一个二维矩阵&#xff0c;计算子矩阵的数字之和的函数可能由于输入不同的坐标而被反复调用多次。例如&#xff0c;输入图2.1中的二维矩阵&#xff0c;以及左上角坐标…