TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

  • 一、技术流程
  • 二、搭建环境
  • 三、创建Kafka changefeed
  • 四、写入数据以产生变更日志
  • 五、配置 Flink 消费 Kafka 数据

一、技术流程

  • 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群
  • 创建 changefeed,将 TiDB 增量数据输出至 Kafka
  • 使用 go-tpc 写入数据到上游 TiDB
  • 使用 Kafka console consumer 观察数据被写入到指定的 Topic
  • (可选)配置 Flink 集群消费 Kafka 内数据

二、搭建环境

部署包含 TiCDC 的 TiDB 集群

在实验或测试环境中,可以使用 TiUP Playground 功能,快速部署 TiCDC,命令如下:

tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# 查看集群状态
tiup status

三、创建Kafka changefeed

1.创建 changefeed 配置文件

根据 Flink 的要求和规范,每张表的增量数据需要发送到独立的 Topic 中,并且每个事件需要按照主键值分发 Partition。因此,需要创建一个名为 changefeed.conf 的配置文件,填写如下内容:

[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
]

2.创建一个 changefeed,将增量数据输出到 Kafka

tiup ctl:v<CLUSTER_VERSION> cdc changefeed 
create --server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" 
--changefeed-id="kafka-changefeed" 
--config="changefeed.conf"

如果命令执行成功,将会返回被创建的 changefeed 的相关信息,包含被创建的 changefeed 的 ID 以及相关信息,内容如下:

Create changefeed successfully!
ID: kafka-changefeed
Info: {... changfeed info json struct ...}

如果命令长时间没有返回,你需要检查当前执行命令所在服务器到 sink-uri 中指定的 Kafka 机器的网络可达性,保证二者之间的网络连接正常。

生产环境下 Kafka 集群通常有多个 broker 节点,你可以在 sink-uri 中配置多个 broker 的访问地址,这有助于提升 changefeed 到 Kafka 集群访问的稳定性,当部分被配置的 Kafka 节点故障的时候,changefeed 依旧可以正常工作。假设 Kafka 集群中有 3 个 broker 节点,地址分别为 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以参考如下 sink-uri 创建 changefeed:

tiup ctl:v<CLUSTER_VERSION> cdc changefeed create 
--server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" 
--config="changefeed.conf"

3.Changefeed 创建成功后,执行如下命令,查看 changefeed 的状态

tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"

四、写入数据以产生变更日志

完成以上步骤后,TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Kafka,下面对 TiDB 写入数据,以产生增量数据变更日志。

1.模拟业务负载

在测试实验环境下,可以使用 go-tpc 向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。如下命令,首先在上游 TiDB 创建名为 tpcc 的数据库,然后使用 TiUP bench 写入数据到这个数据库中。

tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

2.消费 Kafka Topic 中的数据

changefeed 正常运行时,会向 Kafka Topic 写入数据,你可以通过由 Kafka 提供的 kafka-console-consumer.sh,观测到数据成功被写入到 Kafka Topic 中:

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`

至此,TiDB 的增量数据变更日志就实时地复制到了 Kafka。下一步,你可以使用 Flink 消费 Kafka 数据。当然,你也可以自行开发适用于业务场景的 Kafka 消费端。

五、配置 Flink 消费 Kafka 数据

1.安装 Flink Kafka Connector

在 Flink 生态中,Flink Kafka Connector 用于消费 Kafka 中的数据并输出到 Flink 中。Flink Kafka Connector 并不是内建的,因此在 Flink 安装完毕后,还需要将 Flink Kafka Connector 及其依赖项添加到 Flink 安装目录中。下载下列 jar 文件至 Flink 安装目录下的 lib 目录中,如果你已经运行了 Flink 集群,请重启集群以加载新的插件。

  • flink-connector-kafka-1.17.1.jar
  • flink-sql-connector-kafka-1.17.1.jar
  • kafka-clients-3.5.1.jar

2.创建一个表

可以在 Flink 的安装目录执行如下命令,启动 Flink SQL 交互式客户端:

[root@flink flink-1.15.0]# ./bin/sql-client.sh

随后,执行如下语句创建一个名为 tpcc_orders 的表:

CREATE TABLE tpcc_orders (o_id INTEGER,o_d_id INTEGER,o_w_id INTEGER,o_c_id INTEGER,o_entry_d STRING,o_carrier_id INTEGER,o_ol_cnt INTEGER,o_all_local INTEGER
) WITH (
'connector' = 'kafka',
'topic' = 'tidb_tpcc_orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest'
)

请将 topic 和 properties.bootstrap.servers 参数替换为环境中的实际值。

3.查询表内容

执行如下命令,查询 tpcc_orders 表中的数据:

SELECT * FROM tpcc_orders;

执行成功后,可以观察到有数据输出,如下图

在这里插入图片描述
至此,就完成了 TiDB 与 Flink 的数据集成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/92909.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

三种MMIC放大器偏置电压顺序

HBT自偏置放大器偏置顺序 1、有两种HBT放大器&#xff0c;自偏置和带电流控制的自偏置&#xff0c;下图是HBT自偏置放大器最简单的偏置。这些放大器只需要接通集电极电压。有一个设置电流的偏置电阻。放大器有一个电流反射镜来控制基极电压。 2、 电阻值的计算 3、打开电源并…

【BASH】回顾与知识点梳理(三十一)

【BASH】回顾与知识点梳理 三十一 三十一. 进程的管理31.1 给进程发送讯号kill -signal PIDlinux系统后台常驻进程killall -signal 指令名称 31.2 关于进程的执行顺序Priority 与 Nice 值nice &#xff1a;新执行的指令即给予新的 nice 值renice &#xff1a;已存在进程的 nice…

机器学习笔记:线性链条件随机场(CRF)

0 引入&#xff1a;以词性标注为例 比如我们要对如下句子进行标注&#xff1a; “小明一把把把把住了”那么我么可能有很多种词性标注的方法&#xff0c;中间四个“把”&#xff0c;可以是“名词名词动词名词”&#xff0c;可以是“名词动词动词名词”等多种形式。 那么&#…

图片转换成pdf格式?这几种转换格式方法了解一下

图片转换成pdf格式&#xff1f;将图片转换成PDF格式的好处有很多。首先&#xff0c;PDF格式具有通用性&#xff0c;可以在几乎任何设备上查看。其次&#xff0c;PDF格式可以更好地保护文件&#xff0c;防止被篡改或者复制。此外&#xff0c;PDF格式还可以更好地压缩文件大小&am…

django-基本环境配置

文章目录 django 环境安装1. 安装环境1.1 安装 Python (配置虚拟环境)1.1.1 步骤 1.2 Conda配置环境参考 django 环境安装 1. 安装环境 1.1 安装 Python (配置虚拟环境) 由于国外源速度慢&#xff0c;可以pip添加清华源 pip config set global.index-url https://pypi.tuna.…

汽车级36V、4A同步降压转换器MAX20404AFOD/VY、MAX20404AFOC/VY、MAX20404AFOA/VY开关稳压器

MAX20404是小型同步降压转换器&#xff0c;集成了高端和低端开关。这些IC均设计为可在3V到36V的宽输入电压范围内提供高达4A的电流。电压质量可以通过观察PGOOD信号来监测。该器件可以在99%的占空比下运行&#xff0c;非常适合汽车和工业应用。 MAX20404提供可编程输出电压或5…

使用RoBERT进行fine tune来复现GLUE的效果

文章目录 一. 参考博客or文献二. Proprocess GLUE task data2.1 下载GLUE的数据集2.2 预处理GLUE的数据集2.2.1 算法思路与整体代码以及运行结果图2.2.2 完整代码与处理结果 三. 使用预处理好的数据集进行 finetune3.1 将RoBERTa的模型下载到本地3.2 微调任务之RTE(句子二分类任…

python编程需要的电脑配置,python编程对电脑的要求

大家好&#xff0c;给大家分享一下python编程用什么笔记本电脑&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 不打游戏&#xff0c;只学编程。刚开始自学 Python小发猫伪原创,python下载需要花钱吗。 如果不搞机器学习的话&#xff0c;也…

TypeScript 语法

环境搭建 以javascript为基础构建的语言&#xff0c;一个js的超集&#xff0c;可以在任何支持js的平台中执行&#xff0c;ts扩展了js并且添加了类型&#xff0c;但是ts不能被js解析器直接执行&#xff0c;需要编译器编译为js文件&#xff0c;然后引入到 html 页面使用。 ts增…

Blender增强现实3D模型制作指南【AR】

推荐&#xff1a;用 NSDT编辑器 快速搭建可编程3D场景 将静态和动画 3D 内容集成到移动增强现实 (AR) 体验中是增强用户沉浸感和参与度的高效方法。 然而&#xff0c;为 AR 创建 3D 对象可能相当艰巨&#xff0c;尤其是对于那些缺乏 3D 建模经验的人来说。 与添加视频或照片 AR…

Mariadb高可用MHA (四十二)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、概述 1.1 概念 1.2 组成 1.3 特点 1.4 工作原理 二、构建MHA 2.1 ssh免密登录 2.2 主从复制 2.3 MHA安装 2.3.1所有节点安装perl环境 2.3..2 node 2.3.…

零售行业供应链管理核心KPI指标(三)

完美订单满足率和退货率 完美订单满足率有三个方面的因素影响&#xff1a;订单按时、足量、无损交货。通常情况下零售企业追求线上订单履行周期慢慢达到行业平均水平&#xff0c;就是交付的速度变快了&#xff0c;这个肯定是一件好事情&#xff0c;趋势越来越好。 同时&#…

Vim的插件管理器之Vundle

1、安装Vundle插件管理器 Vim可以安装插件&#xff0c;但是需要手动安装比较麻烦&#xff0c;Vim本身没有提供插件管理器&#xff0c;所以会有很多的第三方的插件管理器&#xff0c;有一个vim的插件叫做 “vim-easymotion”&#xff0c;在它的github的安装说明里有列出对于不同…

log4j:WARN No appenders could be found for logger问题

本文将idea场景下的使用。 IDEA中&#xff0c;将配置文件命名为log4j.properties&#xff08;该命名才会被自动加载&#xff09;&#xff0c; 并放到某个目录下&#xff08;通常放到resources目录&#xff09;&#xff0c;并在resources上右键&#xff0c;找到Mark Directory a…

Nginx转发请求到后端服务报400 Bad Request

问题描述 系统部署好后&#xff0c;进行测试时发现有部分接口出错&#xff0c;项目采用Nginx作为后端代理服务器&#xff0c;有Nginx统一将请求转发到后端的网关服务&#xff0c;再由网关服务路由到具体的服务上&#xff0c;发布好后&#xff0c;大部分接口都是正常的&#xff…

POSTGRESQL 关于2023-08-14 数据库自动启动文章中使用KILL 来进行配置RELOAD的问题解释...

开头还是介绍一下群&#xff0c;如果感兴趣Polardb ,mongodb ,MySQL ,Postgresql ,redis &#xff0c;SQL SERVER ,ORACLE,Oceanbase 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请加 liuaustin3微信号 &…

常见排序集锦-C语言实现数据结构

目录 排序的概念 常见排序集锦 1.直接插入排序 2.希尔排序 3.选择排序 4.堆排序 5.冒泡排序 6.快速排序 hoare 挖坑法 前后指针法 非递归 7.归并排序 非递归 排序实现接口 算法复杂度与稳定性分析 排序的概念 排序 &#xff1a;所谓排序&#xff0c;就是使一串记录&#…

【计算机网络】13、ARP 包:广播自己的 mac 地址和 ip

机器启动时&#xff0c;会向外广播自己的 mac 地址和 ip 地址&#xff0c;这个即称为 arp 协议。范围是未经过路由器的部分&#xff0c;如下图的蓝色部分&#xff0c;范围内的设备都会在本地记录 mac 和 ip 的绑定信息&#xff0c;若有重复则覆盖更新&#xff08;例如先收到 ma…

ESP32+VSCode开发环境搭建(全网最强最终解决方案)

文章目录 1 安装步骤2 开发机器环境准备3 安装ESP-IDF-tools离线包4 创建VSCode配置文件(纯净的开发环境)5 安装espressif IDF 插件6 程序测试7 常见问题7.1环境变量设置问题&#xff1f;问题1&#xff1a;到底是设置IDF_TOOLS_PATH和IDF_PATH还是只配置一个IDF_TOOLS_PATH? 7…

Spring的简介ioc容器及注入方式

一.Spring的简介 1.Spring的特性 Spring是一个开源框架&#xff0c;它由Rod Johnson创建。它是为了解决企业应用开发的复杂性而创建的。 Spring使用基本的JavaBean来完成以前只可能由EJB完成的事情。 然而&#xff0c;Spring的用途不仅限于服务器端的开发。从简单性、可测试性…