采用Flink CDC操作SQL Server数据库获取增量变更数据
Flink CDC 1.12版本引入了对SQL Server的支持,包括SqlServerCatalog和SqlServerTable。在SqlServerCatalog中,你可以根据表名获取对应的字段和字段类型。
SQL Server 从 2008 版本开始支持变更数据捕获 (CDC) 功能。CDC 会记录表中数据的变更,这样你就可以直接查询发生变更的数据,而不需要扫描整个表。
1、准备工作
软件版本
Flink 1.17.1
数据库版本 Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)
1.1、数据库准备 启动CDC
-- Enable CDC at the database level.
-- Run this while connected to the database that should be captured.
EXEC sys.sp_cdc_enable_db;

-- List the databases together with their is_cdc_enabled flag to check the result.
SELECT name,
       is_cdc_enabled
FROM sys.databases;
1.2、开启SQL Server代理
打开 SQL Server配置管理器 => 选择SQL Server服务 => 选择SQL Server代理 右击开启
1.3、为需要跟踪更改的表启用 CDC。
-- Enable CDC at the table level.
-- SQL Server Agent has to be started first; then run the statement below.
EXEC sys.sp_cdc_enable_table
    @source_schema        = 'dbo',                   -- source_schema
    @source_name          = 'AIR_STATION_HOUR_DATA', -- table_name
    @capture_instance     = NULL,                    -- capture_instance
    @supports_net_changes = 1,                       -- supports_net_changes
    @role_name            = NULL;                    -- role_name

-- Verify that CDC was enabled for the table.
EXEC sys.sp_cdc_help_change_data_capture;
2、代码编写
2.1、引入依赖
<!-- Single place to pin the Flink version used by the org.apache.flink artifacts below. -->
<properties>
    <flink.version>1.17.1</flink.version>
</properties>

<!--
  NOTE(review): the deserializer shown later in this article calls JSONObject#toJSONString(),
  but no JSON library is declared here. Confirm which artifact is expected to provide it.
-->
<dependencies>
    <!-- SQL Server JDBC driver -->
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.1.jre8</version>
    </dependency>

    <!-- Lombok (@Data, @Slf4j) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>

    <!-- Flink core / streaming / client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- SQL Server CDC connector -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>

    <!-- Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Table planner loader -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2.2、代码编写
2.2.1 、数据库配置文件编写
/**
 * Connection settings for the SQL Server instance that is monitored through CDC.
 *
 * <p>Every constant is declared on its own line so that a trailing {@code //} comment cannot
 * swallow the declarations that follow it. The values are the article's placeholders; replace
 * them with the real connection data.
 */
public final class SQLServerConstant {

    /** Database host address. */
    public static final String SQLSERVER_HOST = "0.0.0.0";

    /** Database port. */
    public static final Integer SQLSERVER_PORT = 1433;

    /** Database name. */
    public static final String SQLSERVER_DATABASE = "HBDC_AQI";

    /** Table(s) to monitor, in {@code schema.table} form. */
    public static final String SQLSERVER_TABLE_LIST = "dbo.AIR_STATION_HOUR_DATA";

    /** Login user. */
    public static final String SQLSERVER_USER_NAME = "sa";

    /** Login password (masked placeholder). */
    public static final String SQLSERVER_PASSWORD = "*******";

    /** Constants holder; not meant to be instantiated. */
    private SQLServerConstant() {
    }
}
2.2.2 CDC数据实体类
/**
 * Carries one captured change: where it happened, when, what kind of change it was,
 * and the row content before and after the change.
 */
@Data
public class DataChangeInfo implements Serializable {

    /** Database name. */
    private String database;

    /** Table name. */
    private String tableName;

    /** Time of the change. */
    private LocalDateTime changeTime;

    /** Change type: 1 = insert, 2 = update, 3 = delete. */
    private Integer eventType;

    /** Row data before the change. */
    private String beforeData;

    /** Row data after the change. */
    private String afterData;
}
2.2.2 、SQLServer消息读取自定义反序列化
@Slf4j
public class SQLServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {public static final String TS_MS = "ts_ms";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String CREATE = "CREATE";public static final String UPDATE = "UPDATE";@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {try {String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct = (Struct) sourceRecord.value();final Struct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo = new DataChangeInfo();dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());// 获取操作类型 CREATE UPDATE DELETE 1新增 2修改 3删除Envelope.Operation operation = Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 
2 : 3;dataChangeInfo.setEventType(eventType);dataChangeInfo.setDatabase(database);dataChangeInfo.setTableName(tableName);ZoneId zone = ZoneId.systemDefault();Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));//7.输出数据collector.collect(dataChangeInfo);} catch (Exception e) {log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage());}}/**** 从源数据获取出变更之前或之后的数据*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<DataChangeInfo> getProducedType() {return TypeInformation.of(DataChangeInfo.class);}
}
2.2.3 、功能工具类
public class FlinkSourceUtil {/*** 构造SQL Server CDC数据源*/public static DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {String[] tables = SQLSERVER_TABLE_LIST.replace(" ", "").split(",");return SqlServerSource.<DataChangeInfo>builder().hostname(SQLSERVER_HOST).port(SQLSERVER_PORT).database(SQLSERVER_DATABASE) // monitor sqlserver database.tableList(tables) // monitor products table.username(SQLSERVER_USER_NAME).password(SQLSERVER_PASSWORD)/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)* latest:只进行增量导入(不读取历史变化)*/.startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.initial()).deserializer(new SQLServerJsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<DataChangeInfo> streamSource = env.addSource(dataChangeInfoMySqlSource, "SQLServer-source").setParallelism(1);streamSource.print();env.execute("SQLServer-stream-cdc");}
}