五.海量数据实时分析-FlinkCDC+DorisConnector实现数据的全量增量同步

前言

前面四篇文字都在学习Doris的理论知识,也是比较枯燥,当然Doris的理论知识还很多,我们后面慢慢学,本篇文章我们尝试使用SpringBoot来整合Doris完成基本的CRUD。

由于 Doris 高度兼容 Mysql 协议,两者在 SQL 语法方面有着比较强的一致性,另外 Mysql 客户端也是 Doris 官方选择的客户端。因此,如需对 Mysql 进行数据分析,使用 Doris 的迁移成本较低。但是对于数据规模特别大的情况下Mysql的方式还是不太建议的,所以这里还会介绍一种 方式就是通过CDC+DorisConnector

一.整合Mybatis操作Doris

1.准备数据库

CREATE TABLE `doris_test` (`id` int NULL COMMENT "id",`price` decimal(10,2) NULL COMMENT "价格",`title` varchar(20)NULL COMMENT "标题") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 1PROPERTIES ("replication_num" = "1");
Query OK, 0 rows affected (0.06 sec)

2.搭建SpringBoot项目

第一步:创建SpringBoot项目,导入依赖,主要是Mybatis相关的依赖

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--如果要用传统的xml或properties配置,则需要添加此依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId></dependency><!--mybatisplus持久层依赖--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.70</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.23</version></dependency></dependencies>

3.整合Mybatis

创建好启动类,实体类,服务层,持久层,SQL映射文件等,这里和我们操作Mysql没有任何区别.

启动类如下,通过 @MapperScan 来扫描Mapper接口

@SpringBootApplication
@MapperScan(basePackages = "cn.whale.mapper")
public class DorisApplication {public static void main(String[] args) {SpringApplication.run(DorisApplication.class,args);}
}

服务层和持久层代码如下 , 省略了部分代码

@Service
public class DorisServiceImpl implements DorisService {@AutowiredDorisMapper dorisMapper;@Overridepublic List<Order> listDoris() {return dorisMapper.listDoris();}@Overridepublic int add(Order order) {return dorisMapper.add(order);}
}public interface DorisMapper {List<Order> listDoris();int add(Order order);}

下面是sql映射文件,和操作数据库没有任何区别

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.whale.mapper.DorisMapper"><select id="listDoris" resultType="cn.whale.domain.Order">select id,price,title from orders</select><insert id="add" parameterType="cn.whale.domain.Order">INSERT INTO orders(id,price,title) VALUES(#{id},#{price},#{title})</insert>
</mapper>

4.编写配置文件

server:port: 8080
spring:#Doris数据库连接配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.220.253:9030/app_db?characterEncoding=utf-8&useSSL=falseusername: rootpassword: 123456type: com.alibaba.druid.pool.DruidDataSourceinitial-size: 500min-idle: 500max-active: 500#mybatis的相关配置
mybatis:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.wudl.doris.domain.DorisTest

5.编写测试类

注入Service,完成查询和添加的测试

package cn.whale.service;import cn.whale.DorisApplication;
import cn.whale.domain.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.math.BigDecimal;
import java.util.List;@RunWith(SpringRunner.class)
@SpringBootTest(classes= DorisApplication.class)
public class DorisServiceTest {@Autowiredprivate DorisService dorisService;@Testpublic void listDoris() {List<Order> orders = dorisService.listDoris();orders.stream().forEach(System.out::println);}@Testpublic void addDoris() throws InterruptedException {for (int i = 100 ; i>0 ; i--){Order order = new Order();order.setId(Long.valueOf(i));order.setPrice(new BigDecimal(i));order.setTitle(i+"元");dorisService.add(order);}Thread.sleep(100000);}
}

二.SpringBoot整合FlinkCDC+doris-connector实时同步

上面通过Mysql客户端的方式来进行数据的同步方式虽然使用起来比较简单,但是不适合大量数据的实时同步,性能不是很好。Doris官方提供了doris-connector进行Doris读写,那么我们可以通过FlinkCDC监听Mysql数据变更,然后同步到SpringBoot项目中,对数据进行处理后,然后通过doris-connector同步到Doris。虽然我们之前也介绍过直接通过FlinkCDC同步Mysql到Doris,那种方式我们没办法对数据进行处理。

1.搭建项目导入依赖

版本兼容
在这里插入图片描述

  • flink-connector-mysql-cdc :flinkCDC,实时监听Mysql增量或者全量同步
  • flink-doris-connector :用来读写Doris的驱动
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.13</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><!--mysql -cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version><scope>provided</scope></dependency><!-- flink doris connector --><dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.13_2.12</artifactId><version>1.0.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.13.6</version><scope>provided</scope></dependency></dependencies>

2.定义消息序列化器

定义序列化器,当拿到Mysql的数据后我们可以通过虚拟化器对数据进行格式化


/*** @desc mysql消息读取自定义序列化**/
public class MysqlDeserialization implements DebeziumDeserializationSchema<String> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/**** 反序列化数据,转为变更JSON对象*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) {//拿到数据Struct struct = (Struct) sourceRecord.value();//输出数据collector.collect(getJsonObject(struct, AFTER).toJSONString());}/**** 从原数据获取出变更之前或之后的数据*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}
}

3.Mysql监听器

监听器的作用主要是监听Mysql的数据变更后,通过序列化器把数据进行处理后,然后同步到Doris中


/*** @desc mysql变更监听**/
@Component
public class MysqlEventListener implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//设置Mysql数据源DebeziumSourceFunction<String> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<String> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source").setParallelism(1);//streamSource.addSink(dataChangeSink);Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("strip_outer_array", "true");//配置Doris信息streamSource.addSink(DorisSink.sink(DorisExecutionOptions.builder().setBatchSize(3).setBatchIntervalMs(10L).setMaxRetries(3).setStreamLoadProp(pro).setEnableDelete(true).build(),DorisOptions.builder().setFenodes("192.168.220.253:8030").setTableIdentifier("app_db.orders").setUsername("root").setPassword("123456").build()));env.execute("mysql-stream-cdc");}/*** 构造变更数据源*/private DebeziumSourceFunction<String> buildDataChangeSource() {return MySqlSource.<String>builder().hostname("192.168.220.253").port(3307).databaseList("app_db").tableList("app_db.orders").username("root").password("123456")/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)* latest:只进行增量导入(不读取历史变化)* timestamp:指定时间戳进行数据导入(大于等于指定时间读取数据)*/.startupOptions(StartupOptions.latest())//序列化.deserializer(new MysqlDeserialization()).serverTimeZone("GMT+8").build();}
}

4.最后编写一个启动类

@SpringBootApplication
public class FlinkCdcApplication {public static void main(String[] args) {SpringApplication.run(FlinkCdcApplication.class, args);}}

5.准备好数据库

我的Mysql使用的是 :app_db.orders 对应的是Doris中的 app_db.orders ,字段类型,字段名需要一致哦。然后启动项目,修改Mysql的数据,Doris会自动同步过去

三.其他同步方式

Doris还支持其他的数据读写方式,具体的可以根据官网案例去尝试地址:https://doris.apache.org/zh-CN/docs/1.2/ecosystem/spark-doris-connector

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/438764.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis数据库与GO(二):list,set

一、list&#xff08;列表&#xff09; list&#xff08;列表&#xff09;是简单的字符串列表&#xff0c;按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。List本质是个链表&#xff0c; list是一个双向链表&#xff0c;其元素是有序的&#xff0c;元…

GS-SLAM论文阅读笔记-CaRtGS

前言 这篇文章看起来有点像Photo-slam的续作&#xff0c;行文格式和图片类型很接近&#xff0c;而且貌似是出自同一所学校的&#xff0c;所以推测可能是Photo-slam的优化与改进方法&#xff0c;接下来具体看看改进了哪些地方。 文章目录 前言1.背景介绍GS-SLAM方法总结 2.关键…

uniapp+Android面向网络学习的时间管理工具软件 微信小程序

目录 项目介绍支持以下技术栈&#xff1a;具体实现截图HBuilderXuniappmysql数据库与主流编程语言java类核心代码部分展示登录的业务流程的顺序是&#xff1a;数据库设计性能分析操作可行性技术可行性系统安全性数据完整性软件测试详细视频演示源码获取方式 项目介绍 用户功能…

STM32F103C8----3-3 蜂鸣器(跟着江科大学STM32)

一&#xff0c;电路图 &#xff08;接线图&#xff09; 面包板的的使用请参考&#xff1a;《面包板的使用_面包板的详细使用方法-CSDN博客》 二&#xff0c;目的/效果 3-3 蜂鸣器 三&#xff0c;创建Keil项目 详细参考&#xff1a;《STM32F103C8----2-1 Keil5搭建STM32项目模…

Linux ssh 免密登录配置

参考资料 ~/.ssh/configについて~/.ssh/configを使ってSSH接続を楽にする.ssh/configファイルでSSH接続を管理する 目录 一. 密钥生成1.1 生成工具1.1.1 OpenSSH1.1.2 Git 1.2 生成命令1.3 注意事项1.4 解决路径中的用户名乱码 二. 将公钥配置到目标服务&#xff0c;免密登录2…

Spring Boot集成encache快速入门Demo

1.什么是encache EhCache 是一个纯 Java 的进程内缓存框架&#xff0c;具有快速、精干等特点&#xff0c;是 Hibernate 中默认的 CacheProvider。 Ehcache 特性 优点 快速、简单支持多种缓存策略&#xff1a;LRU、LFU、FIFO 淘汰算法缓存数据有两级&#xff1a;内存和磁盘&a…

Linux bash脚本 远程开发环境配置

参考资料 太香了&#xff0c;VSCode远程开发插件&#xff0c;值得一试Visual Studio Code で Remote SSH する。Managing extensions 目录 一. 远程开发必备二. 连接远程开发服务器三. 安装远程开发插件 一. 远程开发必备 ⏹ VSCode插件 Remote - SSH 通过使用 SSH 链接虚拟…

C++之多态篇(超详细版)

1.多态概念 多态就是多种形态&#xff0c;表示去完成某个行为时&#xff0c;当不同的人去完成时会有不同的形态&#xff0c;举个例子在车站买票&#xff0c;可以分为学生票&#xff0c;普通票&#xff0c;军人票&#xff0c;每种票的价格是不一样的&#xff0c;当你是不同的身…

如何高效删除 MySQL 日志表中的历史数据?实战指南

在处理高并发的物联网平台或者其他日志密集型应用时&#xff0c;数据库中的日志表往往会迅速增长&#xff0c;数据量庞大到数百GB甚至更高&#xff0c;严重影响数据库性能。如何有效管理这些庞大的日志数据&#xff0c;特别是在不影响在线业务的情况下&#xff0c;成为了一项技…

使用Windows远程桌面连接Linux

要在Kali Linux上使用Windows远程桌面连接&#xff08;MSTSC.exe&#xff09;&#xff0c;你可以通过配置xrdp服务来实现。以下是在Kali Linux上设置xrdp以便Windows远程桌面连接的具体步骤&#xff1a; 一、安装xrdp和Xfce桌面环境 更新软件包列表&#xff1a; 打开终端&…

Python和C++混淆矩阵地理学医学物理学视觉语言模型和算法模型评估工具

&#x1f3af;要点 优化损失函数评估指标海岸线检测算法评估遥感视觉表征和文本增强乳腺癌预测模型算法液体中闪烁光和切伦科夫光分离多标签分类任务性能评估有向无环图、多路径标记和非强制叶节点预测二元分类评估特征归因可信性评估马修斯相关系数对比其他准确度 Python桑…

基于C++和Python的进程线程CPU使用率监控工具

文章目录 0. 概述1. 数据可视化示例2. 设计思路2.1 系统架构2.2 设计优势 3. 流程图3.1 C录制程序3.2 Python解析脚本 4. 数据结构说明4.1 CpuUsageData 结构体 5. C录制代码解析5.1 主要模块5.2 关键函数5.2.1 CpuUsageMonitor::Run()5.2.2 CpuUsageMonitor::ComputeCpuUsage(…

大数据技术:Hadoop、Spark与Flink的框架演进

大数据技术&#xff0c;特别是Hadoop、Spark与Flink的框架演进&#xff0c;是过去二十年中信息技术领域最引人注目的发展之一。这些技术不仅改变了数据处理的方式&#xff0c;而且还推动了对数据驱动决策和智能化的需求。在大数据处理领域&#xff0c;选择合适的大数据平台是确…

有些硬盘录像机接入视频汇聚平台EasyCVR后通道不显示/显示不全,该如何处理?

EasyCVR视频监控汇聚管理平台是一款针对大中型项目设计的跨区域网络化视频监控集中管理平台。该平台不仅具备视频资源管理、设备管理、用户管理、运维管理和安全管理等功能&#xff0c;还支持多种主流标准协议&#xff0c;如GB28181、RTSP/Onvif、RTMP、部标JT808、GA/T 1400协…

排序算法剖析

文章目录 排序算法浅谈参考资料评价指标可视化工具概览 插入排序折半插入排序希尔排序冒泡排序快速排序简单选择排序堆排序归并排序基数排序 排序算法浅谈 参考资料 数据结构与算法 评价指标 稳定性&#xff1a;两个相同的关键字排序过后相对位置不发生变化时间复杂度空间复…

数据挖掘-padans初步使用

目录标题 Jupyter Notebook安装启动 Pandas快速入门查看数据验证数据建立索引数据选取⚠️注意&#xff1a;排序分组聚合数据转换增加列绘图line 或 **&#xff08;默认&#xff09;&#xff1a;绘制折线图。bar&#xff1a;绘制条形图。barh&#xff1a;绘制水平条形图。hist&…

QT将QBytearray的data()指针赋值给结构体指针变量后数据不正确的问题

1、问题代码 #include <QCoreApplication>#pragma pack(push, 1) typedef struct {int a; // 4字节float b; // 4字节char c; // 1字节int *d; // 8字节 }testStruct; #pragma pack(pop)#include <QByteArray> #include <QDebug>int main() {testStruct …

leetcode练习 路径总和II

给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22 输出&a…

告别传统互动:AI数字人正全面进入人类社会

大家好&#xff0c;我是Shelly&#xff0c;一个专注于输出AI工具和科技前沿内容的AI应用教练&#xff0c;体验过300款以上的AI应用工具。关注科技及大模型领域对社会的影响10年。关注我一起驾驭AI工具&#xff0c;拥抱AI时代的到来。 今天我们来聊聊AI数字人&#xff01; 当我…

全网最详细kubernetes中的资源

1、资源管理介绍 在kubernetes中&#xff0c;所有的内容都抽象为资源&#xff0c;用户需要通过操作资源来管理kubernetes。 kubernetes的本质上就是一个集群系统&#xff0c;用户可以在集群中部署各种服务。 所谓的部署服务&#xff0c;其实就是在kubernetes集群中运行一个个的…