代码结构
在 seatunnel-transforms-v2 模块中新建一个包(package),并在其中新建 XXXTransform、XXXTransformConfig、XXXTransformFactory 三个类
自定义转换插件功能说明
这是一个适配 Kafka Source 的转换插件,接收到的原始消息格式如下:
{"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}
转换后只保留 cont 字段中的数据,结果如下:
{"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"1"}
任务配置文件
env {
  # Engine configuration; job.mode is either STREAMING or BATCH
  execution.parallelism = 1
  job.mode = "STREAMING"
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  Kafka {
    bootstrap.servers = "xxxxx:9092"
    topic = "test_in2"
    consumer.group = "167321237613"
    format = "text"
    result_table_name = "kafka"
  }
}

transform {
  ExtractFromCJ {
    source_table_name = "kafka"
    result_table_name = "kafka1"
    # Read by the transform as the "schema.fields" option: output field name -> type name
    schema = {
      fields {
        NAME = "string"
        TABLE = "string"
        create_time = "string"
        ID = "string"
      }
    }
  }
}

sink {
  kafka {
    source_table_name = "kafka1"
    topic = "test_out2"
    bootstrap.servers = "xxxx:9092"
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
  }
}
代码说明
XXXTransformConfig 类:主要用于保存 transform 的配置项
package org.apache.seatunnel.transform.extract;

import lombok.Getter;
import lombok.Setter;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import java.io.Serializable;
import java.util.Map;

/**
 * Configuration holder for the {@code ExtractFromCJ} transform.
 *
 * <p>The only option is {@link #SCHEMA}, a map read from {@code schema.fields} whose keys are
 * output field names and whose values are type names.
 */
@Getter
@Setter
public class ExtractFromCJTransformConfig implements Serializable {

    /** The {@code schema.fields} option: output field name -> type name; no default. */
    public static final Option<Map<String, String>> SCHEMA =
            Options.key("schema.fields")
                    .mapType()
                    .noDefaultValue()
                    .withDescription(
                            "Specify the field mapping relationship between input and output");

    /** Value of {@link #SCHEMA}; may be null when the option is not configured. */
    private Map<String, String> fieldColumns;

    /**
     * Builds a config object from the given plugin options.
     *
     * @param config the transform's plugin options
     * @return a new config whose {@code fieldColumns} is the value of {@link #SCHEMA}
     */
    public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {
        ExtractFromCJTransformConfig result = new ExtractFromCJTransformConfig();
        result.setFieldColumns(config.get(SCHEMA));
        return result;
    }
}
XXXTransformFactory 说明:工厂类,主要用于创建具体的转换类实例
package org.apache.seatunnel.transform.extract;import com.google.auto.service.AutoService;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.OptionRule;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.connector.TableTransform;import org.apache.seatunnel.api.table.factory.Factory;import org.apache.seatunnel.api.table.factory.TableFactoryContext;import org.apache.seatunnel.api.table.factory.TableTransformFactory;@AutoService(Factory.class)public class ExtractFromCJTransformFactory implements TableTransformFactory {@Overridepublic String factoryIdentifier() {return "ExtractFromCJ";}@Overridepublic OptionRule optionRule() {return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();}@Overridepublic TableTransform createTransform(TableFactoryContext context) {CatalogTable catalogTable = context.getCatalogTable();ReadonlyConfig options = context.getOptions();ExtractFromCJTransformConfig extractFromCJTransformConfig =ExtractFromCJTransformConfig.of(options);return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);}}
XXXTransform:具体的转换类,主要负责处理 source 传来的数据,并维护输出数据的结构(schema)
package org.apache.seatunnel.transform.extract;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Transform that unwraps the {@code cont} member of a JSON message held in the first input field.
 *
 * <p>If {@code schema.fields} is configured, each output row holds one value per configured field,
 * looked up by name inside {@code cont}. Otherwise each output row holds a single field containing
 * {@code cont} re-serialized as JSON.
 */
@AutoService(SeaTunnelTransform.class)
@NoArgsConstructor
@Slf4j
public class ExtractFromCJTransform extends AbstractCatalogSupportTransform {

    /** Name of the member inside the raw message that carries the payload. */
    private static final String CONTENT_KEY = "cont";

    private ExtractFromCJTransformConfig config;

    // NOTE(review): never assigned or read in this class; kept only for interface compatibility.
    protected SeaTunnelRowType inputRowType;

    @Override
    public String getPluginName() {
        return "ExtractFromCJ";
    }

    /**
     * @param config parsed transform options
     * @param catalogTable the upstream table this transform reads from
     */
    public ExtractFromCJTransform(
            @NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {
        super(catalogTable);
        this.config = config;
    }

    /** Validates the raw plugin config against the factory's option rule, then parses it. */
    @Override
    protected void setConfig(Config pluginConfig) {
        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
        ConfigValidator.of(readonlyConfig)
                .validate(new ExtractFromCJTransformFactory().optionRule());
        this.config = ExtractFromCJTransformConfig.of(readonlyConfig);
    }

    /**
     * Returns the output row type.
     *
     * <p>When {@code schema.fields} is configured, the rows produced by {@link #transformRow}
     * contain exactly those fields. The row type therefore lists them in the same order, which
     * keeps it consistent with {@link #transformTableSchema}. Without the option, the input type
     * is passed through.
     */
    @Override
    protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
        Map<String, String> fieldColumns = this.config.getFieldColumns();
        if (CollUtil.isEmpty(fieldColumns)) {
            return inputRowType;
        }
        String[] fieldNames = new String[fieldColumns.size()];
        SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType<?>[fieldColumns.size()];
        int i = 0;
        for (Map.Entry<String, String> field : fieldColumns.entrySet()) {
            fieldNames[i] = field.getKey();
            fieldTypes[i] = CatalogTableUtil.parseDataType(field.getValue());
            i++;
        }
        return new SeaTunnelRowType(fieldNames, fieldTypes);
    }

    /**
     * Extracts {@code cont} from the first input field and builds the output row.
     *
     * <p>The output row always has the arity of the declared output schema: the number of
     * configured fields, or 1 when none are configured. If the input is null, is not a JSON object,
     * or has a missing or empty {@code cont}, the row is filled with nulls instead of having zero
     * fields, which would break sinks that read by index.
     */
    @Override
    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
        Map<String, String> fieldColumns = this.config.getFieldColumns();
        boolean projectFields = !CollUtil.isEmpty(fieldColumns);
        Object[] outputDataArray = new Object[projectFields ? fieldColumns.size() : 1];

        JSONObject cont = extractCont(inputRow);
        if (cont != null && !cont.isEmpty()) {
            if (projectFields) {
                // Same map iteration order as transformRowType/transformTableSchema.
                int t = 0;
                for (String key : fieldColumns.keySet()) {
                    // NOTE(review): values are always read as strings; declared non-string types
                    // in schema.fields are not converted here.
                    outputDataArray[t++] = cont.getStr(key);
                }
            } else {
                outputDataArray[0] = JSONUtil.toJsonStr(cont);
            }
        }

        SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);
        outputRow.setRowKind(inputRow.getRowKind());
        outputRow.setTableId(inputRow.getTableId());
        return outputRow;
    }

    /**
     * Parses the first field of {@code inputRow} as a JSON object and returns its {@code cont}
     * member.
     *
     * @return the payload object, or {@code null} if the field is missing or null, is not a JSON
     *     object, or lacks {@code cont}
     */
    private static JSONObject extractCont(SeaTunnelRow inputRow) {
        Object[] fields = inputRow.getFields();
        if (fields == null || fields.length == 0 || fields[0] == null) {
            log.warn("ExtractFromCJ: input row has no content field, emitting null values");
            return null;
        }
        String data = fields[0].toString();
        // isJson also accepts JSON arrays, which parseObj cannot handle.
        if (!JSONUtil.isJson(data) || !data.trim().startsWith("{")) {
            log.warn("ExtractFromCJ: content is not a JSON object, emitting null values");
            return null;
        }
        // In the sample message "cont" is a string holding JSON; getJSONObject is presumably
        // relied on to parse that form as well as an embedded object. NOTE(review): a "cont"
        // string that is not valid JSON is not handled here and may throw.
        JSONObject cont = JSONUtil.parseObj(data).getJSONObject(CONTENT_KEY);
        if (cont == null) {
            log.warn("ExtractFromCJ: message has no '{}' member, emitting null values", CONTENT_KEY);
        }
        return cont;
    }

    /**
     * Builds the output table schema. It contains the configured {@code schema.fields} columns
     * (all nullable) or, when none are configured, the input columns unchanged.
     */
    @Override
    protected TableSchema transformTableSchema() {
        TableSchema inputSchema = inputCatalogTable.getTableSchema();
        List<ConstraintKey> outputConstraintKeys =
                inputSchema.getConstraintKeys().stream()
                        .map(ConstraintKey::copy)
                        .collect(Collectors.toList());
        PrimaryKey copiedPrimaryKey =
                inputSchema.getPrimaryKey() == null ? null : inputSchema.getPrimaryKey().copy();

        Map<String, String> fieldColumns = this.config.getFieldColumns();
        List<Column> outputColumns;
        if (CollUtil.isEmpty(fieldColumns)) {
            outputColumns = inputSchema.getColumns();
        } else {
            outputColumns = new ArrayList<>(fieldColumns.size());
            for (Map.Entry<String, String> field : fieldColumns.entrySet()) {
                SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(field.getValue());
                outputColumns.add(PhysicalColumn.of(field.getKey(), dataType, 0, true, null, null));
            }
        }
        // NOTE(review): the primary key and constraint keys are copied from the input even when
        // columns are projected, so they may reference columns absent from the output.
        return TableSchema.builder()
                .primaryKey(copiedPrimaryKey)
                .columns(outputColumns)
                .constraintKey(outputConstraintKeys)
                .build();
    }

    /** The output table keeps the input table's identifier. */
    @Override
    protected TableIdentifier transformTableIdentifier() {
        return inputCatalogTable.getTableId().copy();
    }
}
本文的转换类继承的是 AbstractCatalogSupportTransform。SeaTunnel 还提供了 SingleFieldOutputTransform 和 MultipleFieldOutputTransform,分别用于单字段输出和多字段输出的处理,可根据需求继承相应的类进行扩展。
执行结果
来源消息
结果消息
以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~