flink cdc oceanbase(binlog模式)

接上文:一文说清flink从编码到部署上线
环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。

预研初衷:现在很多项目有国产化的要求,操作系统、数据库需要国产化,然后就想着找既能开源免费,又能很好的兼容MySQL,还能很好支持flink。然后就在信创目录找到OceanBase数据库。

flink探索:flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南,官网论坛感觉比较靠谱,然而发现按照说明引入依赖后,相关语法是不支持的。也在网上找了比较多的其它资料,中间比较坎坷,都未解决,不再赘述。最后转换思路:既然OceanBase支持MySQL binlog,那就把OceanBase当MySQL用,使用MySQL CDC是不是可以,最后问题得到解决。下面展开说明。

1.OceanBase部署

1.1 obd 部署

官方文档:oceanbase部署

注意:①这个地方最好选择obd 图形化部署,docker部署虽然简单,但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了,好多yum源不能用了。可以使用“阿里龙蜥 7.9”。

部署完,记得保存相关账号信息(供参考):

[{"component": "oceanbase-ce","access_url": "10.86.97.168:2881","user": "root","password": "pwd","connect_url": "obclient -h10.86.97.168 -P2881 -uroot -p'pwd' -Doceanbase -A"},{"component": "obproxy-ce","access_url": "10.86.97.168:2883","user": "root@proxysys","password": "Y6.B4s)pt","connect_url": "obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A \n"},{"component": "ocp-express","access_url": "10.86.97.168:8180","user": "admin","password": "DSxF-{odkdX-bmL6fjrF2{3mLL","connect_url": "http://10.86.97.168:8180"}
]

这个“ocp-express”是个监控页面,能看到集群信息,访问“http://10.86.97.168:8180”:
在这里插入图片描述

1.2 常用命令

启动:obd cluster start myoceanbase(改成具体集群名称)常用命令:
# 查看集群列表
obd cluster list
# 查看集群状态,以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群,以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群,以部署名为 obtest 为例
obd cluster destroy obtest

2.obbinlog

2.1 部署

官方文档:obbinlog部署

部署过程中,会遇到这个错误:“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”

解决方法:修改“/etc/yum.repos.d/OceanBase.repo”中,“$releasever”改为“7”。
在这里插入图片描述
解决完上面这个错误,其它地方就比较顺利了。

查看是否安装成功:

netstat -anp | grep 2983

2.2 创建租户

由于“不可以用root@sys创建binlog任务”,所以要创建租户。

1.查看所有的租户信息:
SELECT * FROM oceanbase.DBA_OB_TENANTS;2.查看resource pool:
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;3.创建“资源规格(UNIT)”
CREATE RESOURCE UNIT S1_unit_flink_testMEMORY_SIZE = '2G',MAX_CPU = 1, MIN_CPU = 1,LOG_DISK_SIZE = '6G',MAX_IOPS = 10000, MIN_IOPS = 10000, IOPS_WEIGHT=1;4.创建resource pool(仅 sys 租户的 root 用户(root@sys)可以创建资源池,其他租户不支持创建资源池)
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNIT='sys_unit_config', UNIT_NUM=1, ZONE_LIST=('zone1');5.创建租户:创建一个名为  flink_test_tenant 的租户(默认为 MySQL 模式租户),副本数为1,资源池指定为 flink_test_tenant_pool_01,Primary Zone 为 zone1,允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant  PRIMARY_ZONE='zone1', RESOURCE_POOL_LIST=('tenant_pool_flink_test') set OB_TCP_INVITED_NODES='%';6.使用新创建的租户管理员登录:
用户名:root@flink_test_tenant
密码:默认为空(有需要可以自己设置密码)7.创建用户( CREATE USER 权限较大,默认仅集群管理员和租户管理员拥有此系统权限):
CREATE USER 'test' IDENTIFIED BY 'pwd';
GRANT ALL ON *.* TO 'test';8.常用命令
其它命令,删除用户:
drop user 'test';
删除“资源规格”:
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息:
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;

2.3 创建数据库

账号:test@flink_test_tenant
密码:pwd。
使用上面账号登录oceanbase后创建数据库。

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;SET FOREIGN_KEY_CHECKS=0;-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` varchar(10) NOT NULL ,`uuid` varchar(30) DEFAULT NULL,`report_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;2024-12-25	uid20241225	1735090201740
2024-12-26	uid20241226	1735090201741

2.4 创建binlog

进入binlog server服务:

cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983

创建binlog:

CREATE BINLOG FOR TENANT `myoceanbase`.`flink_test_tenant` TO USER `root` PASSWORD `pwd` WITH CLUSTER URL `http://10.86.97.168:8080/services?Action=ObRootServiceInfo&ObCluster=myoceanbase`,REPLICATE NUM 2;

2.5 配置ODP

账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip='10.86.97.168:2983';

2.6 验证结果

obclient -h10.86.97.168 -P2883 -uroot@flink_test_tenant  -p -Doceanbase -A
默认密码为空(到输密码时直接回车就行)。SHOW MASTER STATUS;SHOW BINLOG EVENTS;

2.7 常见问题

问题描述:CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”

查看日志:“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn=3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”,提示没有binlog相关表。

日志路径:/home/ds/oblogproxy/log/logproxy.log

解决:重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json”
相关的数据表会重建。然后再执行“CREATE BINLOG”即可。

或者说:应该先创建数据库,再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表,如下:
在这里插入图片描述

3. fink CDC

3.1 核心代码

package com.zl.oceanbase;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;/*** 就当成MySQL使用就行。*/
public class OceanBaseCDCLikeMySQLExample {public static void main(String[] args) throws Exception {List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("10.86.97.168").port(2883)// oceanbase安装时obproxy-ce组件端口.databaseList("flink_test").tableList(String.join(",", SYNC_TABLES)).username("root@flink_test_tenant").password("")// 记得修改为实际密码.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample");// 如果不能正常读取mysql的binlog:①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).print();env.execute("Print MySQL Snapshot + Binlog");}
}

3.2 flink web

在这里插入图片描述

3.3 控制台日志

在这里插入图片描述

3.4 完整代码

完成代码见:flink-cdc-mysql

4.扩展

本文主要基于oceanbase oblogproxy的binlog模式。
其实oblogproxy还支持CDC模式,详见官网文档:CDC模式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/503970.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Web】0基础学Web—事件对象、事件委托(事件代理)——星级评论案例

0基础学Web—事件对象、事件委托&#xff08;事件代理&#xff09;——星级评论案例 事件对象关闭鼠标右键的点击事件关闭鼠标滚轮的事件点击的目标对象点击鼠标的左键0 滚轮1 右键2获得被点击的节点的名称或取相对于浏览器左上角的距离&#xff08;会受页面滚动条的影响&#…

el-table 多级表头

1.结构 <el-table:data"tableData"border:height"700"style"width: 100% !important; overflow: auto":header-cell-style"{ background: #becee1, color: #333 }":cell-style"{ padding: 5px }"><template v-for…

计算机网络基础——网络协议

""" 资料的搬运工 """ 网络协议是为计算机网络中进行数据交换而建立的规则、标准或者说是约定的集合。 1、网络层次划分 OSI七层网络模型 1&#xff09;物理层 确保原始的数据可在各种物理媒体上传输 中继器&#xff08;放大器&#xff09;和集…

源代码编译安装X11及相关库、vim,配置vim(2)

一、编译安装vim 编译时的cofigure选项如下.只有上一步的X11的包安装全了&#xff08;具体哪些是必须的&#xff0c;哪些是多余的没验证&#xff09;&#xff0c;configure才能认为X的库文件和头文件是可以用的。打开多个编程语言的支持特性。 ./configure --prefixpwd/mybui…

Numpy数组的属性

NumPy中最重要的一个特点就是其n维数组对象&#xff0c;即ndarray(别名array)对象&#xff0c;该对象具有矢量算术能力和复杂的广播能力&#xff0c;可以执行一些科学计算。不同于Python内置的数组类型&#xff0c; array对象拥有对高维数组的处理能力&#xff0c;这也是数值计…

如何隐藏 Nginx 版本号 并自定义服务器信息,提升安全性

&#x1f3e1;作者主页&#xff1a;点击&#xff01; Nginx-从零开始的服务器之旅专栏&#xff1a;点击&#xff01; &#x1f427;Linux高级管理防护和群集专栏&#xff1a;点击&#xff01;点击&#xff01;点击&#xff01; ⏰️创作时间&#xff1a;2025年1月8日8点14分…

ProtonBase 荣获 Datafun “数智技术最佳探索奖”

2024年&#xff0c;数智领域迎来技术创新的高峰&#xff0c;尖端技术和用户案例呈现井喷式增长&#xff0c;成为引领时代潮流的关键词。DataFun 社区作为数智前沿阵地&#xff0c;汇聚全球数智精英&#xff0c;推动技术革新和知识共享&#xff0c;助力技术加速发展。 由 DataFu…

用豆包MarsCode IDE打造精美数据大屏:从零开始的指南

原标题&#xff1a;用豆包MarsCode IDE&#xff0c;从0到1画出精美数据大屏&#xff01; 豆包MarsCode IDE 是一个云端 AI IDE 平台&#xff0c;通过内置的 AI 编程助手&#xff0c;开箱即用的开发环境&#xff0c;可以帮助开发者更专注于各类项目的开发。 作为一名前端开发工…

/src/utils/request.ts:axios 请求封装,适用于需要统一处理请求和响应的场景

文章目录 数据结构解释1. 核心功能2. 代码结构分析请求拦截器响应拦截器 3. 改进建议4. 总结 console.log(Intercepted Response:, JSON.stringify(response));{"data": {"code": 0,"msg": "成功","data": {"id":…

LabVIEW调用不定长数组 DLL数组

在使用 LabVIEW 调用 DLL 库函数时&#xff0c;如果函数中的结构体包含不定长数组&#xff0c;直接通过 调用库函数节点&#xff08;Call Library Function Node&#xff09; 调用通常会遇到问题。这是因为 LabVIEW 需要与 DLL 中的数据结构完全匹配&#xff0c;而包含不定长数…

课题推荐——基于GPS的无人机自主着陆系统设计

关于“基于GPS的无人机自主着陆系统设计”的详细展开&#xff0c;包括项目背景、具体内容、实施步骤和创新点。如需帮助&#xff0c;或有导航、定位滤波相关的代码定制需求&#xff0c;请点击文末卡片联系作者 文章目录 项目背景具体内容实施步骤相关例程MATLAB例程python例程 …

深入Android架构(从线程到AIDL)_18 SurfaceView的UI多线程02

目录 2、 使用SurfaceView画2D图 范例一 设计GameLoop(把小线程移出来) 范例二 2、 使用SurfaceView画2D图 范例一 以SurfaceView绘出Bitmap图像设计SpriteView类别来实作SurfaceHolder.Callback接口首先来看个简单的程序&#xff0c;显示出一个Bitmap图像。这个图像就构…

Redis Java 集成到 Spring Boot

Hi~&#xff01;这里是奋斗的明志&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f331;&#x1f331;个人主页&#xff1a;奋斗的明志 &#x1f331;&#x1f331;所属专栏&#xff1a;Redis &#x1f4da;本系列文章为个人学习笔…

鸿蒙开发(29)弹性布局 (Flex)

概述 弹性布局&#xff08;Flex&#xff09;提供更加有效的方式对容器中的子元素进行排列、对齐和分配剩余空间。常用于页面头部导航栏的均匀分布、页面框架的搭建、多行数据的排列等。 容器默认存在主轴与交叉轴&#xff0c;子元素默认沿主轴排列&#xff0c;子元素在主轴方…

LangChain速成课程_构建基于OpenAI_LLM的应用

思维导图 什么是LangChain 特点描述基于语言模型LangChain 是一个专为语言模型&#xff08;如 GPT-4&#xff09;设计的开发框架。模型输入/输出支持灵活的模型输入和输出处理&#xff0c;可以适应各种不同的应用需求。数据感知能够将语言模型与其他数据源&#xff08;如维基百…

将txt转成excel正则化公式的调整

将训练的结果转换成excel是送到画图的关键&#xff0c;但是在转的过程中出现了问题&#xff0c;发现是正则化公式的结果。 使用网站进行调试&#xff0c;最终可以转了。下面是调试的工具以及调试好的代码。 regex101: build, test, and debug regex 上面是正则化公式&#xf…

Linux的proc目录与什么有关?【以及它里面的文件各自记录着什么信息】

在 Linux 系统中&#xff0c;/proc 目录是一个虚拟文件系统&#xff0c;提供了关于内核、进程和系统状态的实时信息。它与系统的 内核 和 进程 运行状态紧密相关&#xff0c;是系统管理员、开发人员和用户了解系统运行状况的重要途径。 /proc 目录的名称来源于 “process”&am…

28、使用StreamPark管理作业中,关于默认环境变量设置和默认动态参数设置的修改

在使用过一段时间的streampark后&#xff0c;发现flink on k8s作业配置过于繁琐了&#xff0c;特别是pod-template.yaml的编写&#xff08;主要是环境变量设置&#xff0c;环境变量关系着前面的日志插件中通过环境变量获取作业名称&#xff09;&#xff0c;动态参数的编写&…

springboot + vue+elementUI图片上传流程

1.实现背景 前端上传一张图片&#xff0c;存到后端数据库&#xff0c;并将图片回显到页面上。上传组件使用现成的elementUI的el-upload。、 2.前端页面 <el-uploadclass"upload-demo"action"http://xxxx.xxx.xxx:9090/file/upload" :show-file-list&q…

《HeadFirst设计模式》笔记(上)

设计模式的目录&#xff1a; 1 设计模式介绍 要不断去学习如何利用其它开发人员的智慧与经验。学习前人的正统思想。 我们认为《Head First》的读者是一位学习者。 一些Head First的学习原则&#xff1a; 使其可视化将文字放在相关图形内部或附近&#xff0c;而不是放在底部…