前言
前面四篇文字都在学习Doris的理论知识,也是比较枯燥,当然Doris的理论知识还很多,我们后面慢慢学,本篇文章我们尝试使用SpringBoot来整合Doris完成基本的CRUD。
由于 Doris 高度兼容 Mysql 协议,两者在 SQL 语法方面有着比较强的一致性,另外 Mysql 客户端也是 Doris 官方选择的客户端。因此,如需对 Mysql 进行数据分析,使用 Doris 的迁移成本较低。但是对于数据规模特别大的情况下Mysql的方式还是不太建议的,所以这里还会介绍一种 方式就是通过CDC+DorisConnector
一.整合Mybatis操作Doris
1.准备数据库
CREATE TABLE `doris_test` (`id` int NULL COMMENT "id",`price` decimal(10,2) NULL COMMENT "价格",`title` varchar(20)NULL COMMENT "标题") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 1PROPERTIES ("replication_num" = "1");
Query OK, 0 rows affected (0.06 sec)
2.搭建SpringBoot项目
第一步:创建SpringBoot项目,导入依赖,主要是Mybatis相关的依赖
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--如果要用传统的xml或properties配置,则需要添加此依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId></dependency><!--mybatisplus持久层依赖--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.70</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.23</version></dependency></dependencies>
3.整合Mybatis
创建好启动类,实体类,服务层,持久层,SQL映射文件等,这里和我们操作Mysql没有任何区别.
启动类如下,通过 @MapperScan 来扫描Mapper接口
@SpringBootApplication
@MapperScan(basePackages = "cn.whale.mapper")
public class DorisApplication {public static void main(String[] args) {SpringApplication.run(DorisApplication.class,args);}
}
服务层和持久层代码如下 , 省略了部分代码
@Service
public class DorisServiceImpl implements DorisService {@AutowiredDorisMapper dorisMapper;@Overridepublic List<Order> listDoris() {return dorisMapper.listDoris();}@Overridepublic int add(Order order) {return dorisMapper.add(order);}
}public interface DorisMapper {List<Order> listDoris();int add(Order order);}
下面是sql映射文件,和操作数据库没有任何区别
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.whale.mapper.DorisMapper"><select id="listDoris" resultType="cn.whale.domain.Order">select id,price,title from orders</select><insert id="add" parameterType="cn.whale.domain.Order">INSERT INTO orders(id,price,title) VALUES(#{id},#{price},#{title})</insert>
</mapper>
4.编写配置文件
server:port: 8080
spring:#Doris数据库连接配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.220.253:9030/app_db?characterEncoding=utf-8&useSSL=falseusername: rootpassword: 123456type: com.alibaba.druid.pool.DruidDataSourceinitial-size: 500min-idle: 500max-active: 500#mybatis的相关配置
mybatis:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.wudl.doris.domain.DorisTest
5.编写测试类
注入Service,完成查询和添加的测试
package cn.whale.service;import cn.whale.DorisApplication;
import cn.whale.domain.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.math.BigDecimal;
import java.util.List;@RunWith(SpringRunner.class)
@SpringBootTest(classes= DorisApplication.class)
public class DorisServiceTest {@Autowiredprivate DorisService dorisService;@Testpublic void listDoris() {List<Order> orders = dorisService.listDoris();orders.stream().forEach(System.out::println);}@Testpublic void addDoris() throws InterruptedException {for (int i = 100 ; i>0 ; i--){Order order = new Order();order.setId(Long.valueOf(i));order.setPrice(new BigDecimal(i));order.setTitle(i+"元");dorisService.add(order);}Thread.sleep(100000);}
}
二.SpringBoot整合FlinkCDC+doris-connector实时同步
上面通过Mysql客户端的方式来进行数据的同步方式虽然使用起来比较简单,但是不适合大量数据的实时同步,性能不是很好。Doris官方提供了doris-connector进行Doris读写,那么我们可以通过FlinkCDC监听Mysql数据变更,然后同步到SpringBoot项目中,对数据进行处理后,然后通过doris-connector同步到Doris。虽然我们之前也介绍过直接通过FlinkCDC同步Mysql到Doris,那种方式我们没办法对数据进行处理。
1.搭建项目导入依赖
版本兼容
- flink-connector-mysql-cdc :flinkCDC,实时监听Mysql增量或者全量同步
- flink-doris-connector :用来读写Doris的驱动
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.13</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><!--mysql -cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version><scope>provided</scope></dependency><!-- flink doris connector --><dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.13_2.12</artifactId><version>1.0.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.13.6</version><scope>provided</scope></dependency></dependencies>
2.定义消息序列化器
定义序列化器,当拿到Mysql的数据后我们可以通过虚拟化器对数据进行格式化
/*** @desc mysql消息读取自定义序列化**/
public class MysqlDeserialization implements DebeziumDeserializationSchema<String> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/**** 反序列化数据,转为变更JSON对象*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) {//拿到数据Struct struct = (Struct) sourceRecord.value();//输出数据collector.collect(getJsonObject(struct, AFTER).toJSONString());}/**** 从原数据获取出变更之前或之后的数据*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}
}
3.Mysql监听器
监听器的作用主要是监听Mysql的数据变更后,通过序列化器把数据进行处理后,然后同步到Doris中
/*** @desc mysql变更监听**/
@Component
public class MysqlEventListener implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//设置Mysql数据源DebeziumSourceFunction<String> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<String> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source").setParallelism(1);//streamSource.addSink(dataChangeSink);Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("strip_outer_array", "true");//配置Doris信息streamSource.addSink(DorisSink.sink(DorisExecutionOptions.builder().setBatchSize(3).setBatchIntervalMs(10L).setMaxRetries(3).setStreamLoadProp(pro).setEnableDelete(true).build(),DorisOptions.builder().setFenodes("192.168.220.253:8030").setTableIdentifier("app_db.orders").setUsername("root").setPassword("123456").build()));env.execute("mysql-stream-cdc");}/*** 构造变更数据源*/private DebeziumSourceFunction<String> buildDataChangeSource() {return MySqlSource.<String>builder().hostname("192.168.220.253").port(3307).databaseList("app_db").tableList("app_db.orders").username("root").password("123456")/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)* latest:只进行增量导入(不读取历史变化)* timestamp:指定时间戳进行数据导入(大于等于指定时间读取数据)*/.startupOptions(StartupOptions.latest())//序列化.deserializer(new MysqlDeserialization()).serverTimeZone("GMT+8").build();}
}
4.最后编写一个启动类
@SpringBootApplication
public class FlinkCdcApplication {public static void main(String[] args) {SpringApplication.run(FlinkCdcApplication.class, args);}}
5.准备好数据库
我的Mysql使用的是 :app_db.orders 对应的是Doris中的 app_db.orders ,字段类型,字段名需要一致哦。然后启动项目,修改Mysql的数据,Doris会自动同步过去
三.其他同步方式
Doris还支持其他的数据读写方式,具体的可以根据官网案例去尝试地址:https://doris.apache.org/zh-CN/docs/1.2/ecosystem/spark-doris-connector