【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

何为FLINK-CDC?

CDC是Change Data Capture的缩写,中文意思是变更数据获取,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。
下图是flink-cdc最新支持的数据源类型:
在这里插入图片描述

对有记录事务操作的kafka数据源通过flink-cdc实现实时数据同步

kafka的数据源要通过flink-cdc进行实时数据同步,并更新到目标数据库:例如mysql、postgres、oracle等传统关系型数据库,或者是clickhouse、TiDb等关系型数据库,或者是其他,首先要符合以下条件:

  1. kafka的数据记录了事务操作
  2. kakfa的数据描述了主键
  3. kafka的数据有严格的更新时间先后顺序,即源端先更新(增、删、改)的数据会先进入kafka。
    符合以下几点的kafka数据即可以作为flink-cdc采集的数据源,并实时同步到目标库。

如何通过具体编程来实现以上思路(例子:kafka数据源通过flink-cdc实时入库mysql)

本文不掺杂任何代码,只提供思路,思路大致可按以下步骤实现:

  1. 新建一个kafka的topic,用于接收kafka格式转换后的数据,topic只设置一个分区(原因是保证cdc读取数据时是顺序读取)。
  2. 编写一个map函数,用于转换kafka的数据为debezium格式的json,或者转换为canal格式的json,flink的官网分别有这两种格式的样例
    debezium:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/formats/debezium/
    canal:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/formats/canal/
    函数的输入是源kafka-topic的一条数据,格式为字符串,函数的输出为经过转换后的kafka-topic的一条数据,格式为字符串。
  3. 将2的数据,即map输出的rdd,写入1中的kafka,控制写入的线程数为单线程(原因是保证数据顺序写入)。
  4. 编写flink-sql建表语句,使用flink的table api,选取kafka connectorjdbc connector作为转换后的kafka源端和mysql目标端的连接方式,如果kafka数据源转换为debezium格式则’value.format’ = ‘debezium-json’,如果kafka的数据源转换为canal格式则’value.format’ = ‘canal-json’。
  5. 编写flink-sql数据插入语句(由kafka数据源的flink表数据插入到mysql也就是jdbc数据源的数据库),再提交flink任务。

思路总结

以上思路使用了kafka数据源转换成flink-cdc可识别的kafka数据源的思想,再利用flink的table api,实现程序内数据源的指定,通过flink-sql,实现数据源之间(源端与目标端之间的数据同步),flink就会对目标端数据源进行实时增删改。

创作不易,各位看官且看且珍惜!如果该文章能让您在探索开发的路上有所帮助,麻烦点个关注支持一下,本人后续会持续有新的作品推出,各位的支持是本人持续创作最伟大的动力!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/168269.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过热敏电阻计算温度(二)---ODrive实现分析

文章目录 通过热敏电阻计算温度(二)---ODrive实现分析测量原理图计算分析计算拟合的多项式系数根据多项式方程计算温度的函数温度计算调用函数 通过热敏电阻计算温度(二)—ODrive实现分析 ODrive计算热敏电阻的温度采用的时B值的…

# 开发趋势 Java Lambda 表达式 第三篇

开发趋势 Java Lambda 表达式 第三篇 一&#xff0c;Lambda 整合集合常规操作 List Java Lambda 表达式可以与List集合和常规操作进行整合&#xff0c;以提供一种更简洁、更可读的代码编写方式。以下是几个示例&#xff1a; 集合遍历操作&#xff1a; List<String> n…

超声波清洗机频率如何选择?高频和低频有什么区别

超声波清洗原理就是在清洗液中产生“空化效应”&#xff0c;即清洗液产生拉伸和压缩现象&#xff0c;清洗液拉伸时会产生大量微小气泡&#xff0c;清洗液压缩时气泡会被压碎破裂。这些气泡产生和破裂的局部压强可达到上千个大气压的冲击力&#xff0c;这种极强大的压力足以使得…

HBuilder打包的安卓app开屏页广告如何关闭

HBuilder打包的安卓app开屏页广告如何关闭 如上图所示&#xff0c;在打包安卓app时会默认勾选 基础开屏广告 而且无法取消 解决办法 1. 登陆 uni-ad广告联盟 网站 2. 访问广告设置链接 3. 4. 选择你的项目 5. 6. 7.

二叉排序树(BST)

二叉排序树 基本介绍 二叉排序树创建和遍历 class Node:"""创建 Node 节点"""value: int 0left Noneright Nonedef __init__(self, value: int):self.value valuedef add(self, node):"""添加节点node 表示要添加的节点&quo…

Linux高性能编程学习-TCP/IP协议族

一、TCP/IP协议族结构与主要协议 分层&#xff1a;数据链路层、网络层、传输层、应用层 1. 数据链路层 功能&#xff1a;实现网卡驱动程序&#xff0c;处理数据在不同物理介质的传输 协议&#xff1a; ARP&#xff1a;将目标机器的IP地址转成MAC地址RARP&#xff1a;将MAC地…

json-server工具准备后端接口服务环境

1.安装全局工具json-server&#xff08;全局工具仅需要安装一次&#xff09; 官网&#xff1a;json-server - npm 点击Getting started可以查看使用方法 在终端中输入yarn global add json-server或npm i json-server -g 如果输入json-server -v报错 再输入npm install -g j…

向量检索库Milvus架构及数据处理流程

文章目录 背景milvus想做的事milvus之前——向量检索的一些基础近似算法欧式距离余弦距离 常见向量索引1&#xff09; FLAT2&#xff09; Hash based3&#xff09; Tree based4&#xff09; 基于聚类的倒排5&#xff09; NSW&#xff08;Navigable Small World&#xff09;图 向…

基于卷积优化优化的BP神经网络(分类应用) - 附代码

基于卷积优化优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于卷积优化优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.卷积优化优化BP神经网络3.1 BP神经网络参数设置3.2 卷积优化算法应用 4.测试结果…

目标检测的方法

目标检测大致分为两个方向:基于传统的目标检测算法和基于深度学习的目标检测算法。 1.基于传统的目标检测算法 在利用深度学习做物体检测之前,传统算法对于目标检测通常分为3个阶段:区域选取、特征提取和体征分类。 2.基于深度学习的目标检测算法 目标检测任务可分为两

密码登录虽安全,但有时很麻烦!如何禁用或删除Windows 11中的密码登录

如果你想在Windows 11上自动登录,在本指南中,我们将向你展示如何删除你的帐户密码。 在Windows 11上,你可以至少通过三种方式从帐户中删除登录密码。在你的帐户上使用密码有助于保护你的计算机和文件免受来自internet或本地的未经授权的访问。然而,在某些情况下,密码可能…

使用Github.io创建自己的博客

文章目录 1.最终效果2.操作步骤2.1 前置操作2.2 按照自己需求修改内容2.2.1 基本修改2.2.2 额外添加知乎等社交网站链接 2.3 首页修改2.4 查看发布状态2.5 奇怪的错误(头像显示错误)2.6 本地调试2.7 后续修改 3. 项目设置为私密&#xff08;要付费升级账号才行❌&#xff09;3.…

Unity3D 在做性能优化时怎么准确判断是内存、CPU、GPU瓶颈详解

Unity3D是一款广泛应用于游戏开发的跨平台游戏引擎&#xff0c;但在开发过程中&#xff0c;我们经常会遇到性能瓶颈问题&#xff0c;如内存、CPU和GPU瓶颈。本文将详细介绍在Unity3D中如何准确判断和解决这些瓶颈问题&#xff0c;并给出相应的技术详解和代码实现。 对惹&#…

【大模型应用开发教程】02_LangChain介绍

LangChain介绍 什么是 LangChain1. 模型输入/输出2. 数据连接3. 链&#xff08;Chain&#xff09;4. 记忆&#xff08;Meomory&#xff09;5. 代理&#xff08;Agents&#xff09;6.回调&#xff08;Callback&#xff09;在哪里传入回调 ?你想在什么时候使用这些东西呢&#x…

imu预积分学习(更新中)

imu预积分学习&#xff08;更新中&#xff09; IMU预积分可以做什么&#xff1f; 以上面那个经典图片为例子&#xff0c;IMU可以通过六轴数据&#xff0c;拿到第i帧和第j帧之间的相对位姿&#xff0c;这样不就可以去用来添加约束了吗 但是有一个比较大的问题是&#xff1a; I…

大数据技术学习笔记(三)—— Hadoop 的运行模式

目录 1 本地模式2 伪分布式模式3 完全分布式模式3.1 准备3台客户机3.2 同步分发内容3.2.1 分发命令3.2.2 执行分发操作 3.3 集群配置3.3.1 集群部署规划3.3.2 配置文件说明3.3.3 修改配置文件3.3.4 分发配置信息 3.4 SSH无密登录配置3.4.1 配置ssh3.4.2 无密钥配置 3.5 单点启动…

【C++面向对象】1. 类、对象

文章目录 【 1. 类 & 对象的定义 】1.1 类的定义1.2 对象的定义 【 2. 类的成员 】2.1 数据成员2.2 成员函数类的内部定义成员函数类的外部定义成员函数成员函数的访问实例 【 3. 类的访问修饰符 】3.1 public 公有成员3.2 private 私有成员3.3 protected 保护成员3.4 继承…

生成对抗网络

目录 0. Abstract 1. Introduction 2. Relatedwork 3.Experiments 4.Advantages and disadvantages 5.Conclusions and future work&#xff08;idea&#xff09; 6. 网络训练源代码 0. Abstract 我们提出了一个新的框架&#xff0c;通过一个对抗的过程来估计生成模型&#xf…

每日一题 2678. 老人的数目(简单)

简单题&#xff0c;不多说 class Solution:def countSeniors(self, details: List[str]) -> int:ans 0for l in details:if int(l[11:13]) > 60:ans 1return ans

数据结构与算法-(10)---列表(List)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…