SeaTunnel扩展Transform插件,自定义转换插件

代码结构

在seatunnel-transforms-v2中新建数据包名,新建XXXTransform,XXXTransformConfig,XXXTransformFactory三个类

自定义转换插件功能说明

这是个适配KafkaSource的转换插件,接收到的原文格式为:

{"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}

需要转换为只保留cont里面的数据

{"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"999"}

任务配置文件

env {# You can set engine configuration here STREAMING BATCHexecution.parallelism = 1job.mode = "STREAMING"#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"}source {# This is a example source plugin **only for test and demonstrate the feature source plugin**Kafka {bootstrap.servers = "xxxxx:9092"topic = "test_in2"consumer.group = "167321237613format="text"result_table_name="kafka"}}transform {ExtractFromCJ {source_table_name="kafka"result_table_name="kafka1"schema = {fields {NAME = "string"TABLE = "string"create_time = "string"ID="string"}}}}sink {kafka {source_table_name="kafka1"topic = "test_out2"bootstrap.servers = "xxxx:9092"kafka.request.timeout.ms = 60000semantics = EXACTLY_ONCE}}

代码说明

XXXConfig代码,这个类主要用来保存transform的配置项

package org.apache.seatunnel.transform.extract;import lombok.Getter;import lombok.Setter;import org.apache.seatunnel.api.configuration.Option;import org.apache.seatunnel.api.configuration.Options;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import java.io.Serializable;import java.util.Map;@Getter@Setterpublic class ExtractFromCJTransformConfig implements Serializable {public static final Option<Map<String, String>> SCHEMA =Options.key("schema.fields").mapType().noDefaultValue().withDescription("Specify the field mapping relationship between input and output");private Map<String, String> fieldColumns;public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {ExtractFromCJTransformConfig extractFromCJTransformConfig = new ExtractFromCJTransformConfig();Map<String, String> fieldColumns = config.get(SCHEMA);extractFromCJTransformConfig.setFieldColumns(fieldColumns);return extractFromCJTransformConfig;}}

XXXTransformFactory说明,工厂类,主要用来初始化具体的转换类

package org.apache.seatunnel.transform.extract;import com.google.auto.service.AutoService;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.OptionRule;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.connector.TableTransform;import org.apache.seatunnel.api.table.factory.Factory;import org.apache.seatunnel.api.table.factory.TableFactoryContext;import org.apache.seatunnel.api.table.factory.TableTransformFactory;@AutoService(Factory.class)public class ExtractFromCJTransformFactory implements TableTransformFactory {@Overridepublic String factoryIdentifier() {return  "ExtractFromCJ";}@Overridepublic OptionRule optionRule() {return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();}@Overridepublic TableTransform createTransform(TableFactoryContext context) {CatalogTable catalogTable = context.getCatalogTable();ReadonlyConfig options = context.getOptions();ExtractFromCJTransformConfig extractFromCJTransformConfig =ExtractFromCJTransformConfig.of(options);return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);}}

XXXXTransform,具体的转换类,主要用于对source数据的处理,还有数据结构类型的保存

package org.apache.seatunnel.transform.extract;import cn.hutool.core.collection.CollUtil;import cn.hutool.json.JSONObject;import cn.hutool.json.JSONUtil;import com.google.auto.service.AutoService;import lombok.NoArgsConstructor;import lombok.NonNull;import lombok.extern.slf4j.Slf4j;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.ConfigValidator;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;import org.apache.seatunnel.api.table.catalog.Column;import org.apache.seatunnel.api.table.catalog.ConstraintKey;import org.apache.seatunnel.api.table.catalog.PhysicalColumn;import org.apache.seatunnel.api.table.catalog.PrimaryKey;import org.apache.seatunnel.api.table.catalog.TableIdentifier;import org.apache.seatunnel.api.table.catalog.TableSchema;import org.apache.seatunnel.api.table.type.SeaTunnelDataType;import org.apache.seatunnel.api.table.type.SeaTunnelRow;import org.apache.seatunnel.api.table.type.SeaTunnelRowType;import org.apache.seatunnel.api.transform.SeaTunnelTransform;import org.apache.seatunnel.shade.com.typesafe.config.Config;import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;@AutoService(SeaTunnelTransform.class)@NoArgsConstructor@Slf4jpublic class ExtractFromCJTransform extends AbstractCatalogSupportTransform {private ExtractFromCJTransformConfig config;protected SeaTunnelRowType inputRowType;@Overridepublic String getPluginName() {return "ExtractFromCJ";}public ExtractFromCJTransform(@NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {super(catalogTable);this.config = config;}@Overrideprotected void setConfig(Config pluginConfig) {ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig)).validate(new ExtractFromCJTransformFactory().optionRule());this.config = ExtractFromCJTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));}@Overrideprotected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {return inputRowType;}@Overrideprotected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {Object content = inputRow.getFields()[0];String data = content.toString();Object[] outputDataArray = new Object[0];if (JSONUtil.isJson(data)) {JSONObject cont = JSONUtil.parseObj(data).getJSONObject("cont");if (!cont.isEmpty()) {if (!CollUtil.isEmpty(this.config.getFieldColumns())) {outputDataArray = new Object[this.config.getFieldColumns().size()];int t = 0;for (String key : this.config.getFieldColumns().keySet()) {String value = cont.getStr(key);outputDataArray[t] = value;t++;}} else {outputDataArray = new Object[1];outputDataArray[0] = JSONUtil.toJsonStr(cont);}}}SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);outputRow.setRowKind(inputRow.getRowKind());outputRow.setTableId(inputRow.getTableId());return outputRow;}@Overrideprotected TableSchema transformTableSchema() {List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();List<ConstraintKey> outputConstraintKeys =inputCatalogTable.getTableSchema().getConstraintKeys().stream().map(ConstraintKey::copy).collect(Collectors.toList());PrimaryKey copiedPrimaryKey =inputCatalogTable.getTableSchema().getPrimaryKey() == null? null: inputCatalogTable.getTableSchema().getPrimaryKey().copy();if (CollUtil.isEmpty(this.config.getFieldColumns())) {return TableSchema.builder().primaryKey(copiedPrimaryKey).columns(inputColumns).constraintKey(outputConstraintKeys).build();} else {List<Column> transformColumns = new ArrayList<>();for (String key : this.config.getFieldColumns().keySet()) {SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(this.config.getFieldColumns().get(key));transformColumns.add(PhysicalColumn.of(key, dataType, 0, true, null, null));}return TableSchema.builder().primaryKey(copiedPrimaryKey).columns(transformColumns).constraintKey(outputConstraintKeys).build();}}@Overrideprotected TableIdentifier transformTableIdentifier() {return inputCatalogTable.getTableId().copy();}}

文中的转换实现的是AbstractCatalogSupportTransform类,Seatunel还提供SingleFieldOutputTransform和MultipleFieldOutputTransform,分别对应单字段和多字段的数据处理,具体扩展可根据需求来实现对应的类

执行结果

来源消息

结果消息

以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/120623.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARP欺骗

1.定义 ARP欺骗&#xff08;英语&#xff1a;ARP spoofing&#xff09;&#xff0c;又称ARP毒化&#xff08;ARP poisoning&#xff0c;网络上多译为ARP病毒&#xff09;或ARP攻击&#xff0c;是针对以太网地址解析协议&#xff08;ARP&#xff09;的一种攻击技术&#xff0c;…

【GAMES202】Real-Time Global Illumination(in 3D)—实时全局光照(3D空间)

一、SH for Glossy transport 1.Diffuse PRT回顾 上篇我们介绍了PRT&#xff0c;并以Diffuse的BRDF作为例子分析了预计算的部分&#xff0c;包括Lighting和Light transport&#xff0c;如上图所示。 包括我们还提到了SH&#xff0c;可以用SH的有限阶近似拟合球面函数&#xff…

PHP8函数包含文件-PHP8知识详解

在php中&#xff0c;可以使用以下函数来包含其他文件&#xff1a;include()、include_once()、require()、require_once()。 1、include(): 包含并运行指定文件中的代码。如果文件不存在或包含过程中出现错误&#xff0c;将发出警告。 <?php include filename.php; ?>…

【mybatis-plus进阶】多租户场景中多数据源自定义来源dynamic-datasource实现

Springbootmybatis-plusdynamic-datasourceDruid 多租户场景中多数据源自定义来源dynamic-datasource实现 文章目录 Springbootmybatis-plusdynamic-datasourceDruid 多租户场景中多数据源自定义来源dynamic-datasource实现0.前言1. 作者提供了接口2. 基于此接口的抽象类实现自…

macOS通过钥匙串访问找回WiFi密码

如果您忘记了Mac电脑上的WiFi密码&#xff0c;可以通过钥匙串访问来找回它。具体步骤如下&#xff1a; 1.打开Mac电脑的“启动台”&#xff0c;然后在其他文件中找到“钥匙串访问”。 2.运行“钥匙串访问”应用程序&#xff0c;点击左侧的“系统”&#xff0c;然后在右侧找到…

Gin学习记录3——模版与渲染

模版与渲染 一. 返回二. 模版2.1 基础模版2.2 同名模版2.3 模版继承2.4 模版语法 一. 返回 如果只是想返回数据&#xff0c;可以使用以下函数&#xff1a; func (c *Context) JSON(code int, obj any) func (c *Context) JSONP(code int, obj any) func (c *Context) String(…

Shotcut for Mac:一款强大而易于使用的视频编辑器

随着数码相机的普及&#xff0c;视频编辑已成为我们日常生活的一部分。对于许多专业和非专业用户来说&#xff0c;找到一个易于使用且功能强大的视频编辑器是至关重要的。今天&#xff0c;我们将向您介绍Shotcut——一款专为Mac用户设计的强大视频编辑器。 什么是Shotcut&…

POI基于Excel模板导出数据

1、基于模板导出列表数据 1.1、需求 注意&#xff1a;使用附件的形式下载&#xff0c;前端访问必须通过window.open(),否则附件可能无法下载。 按照以下样式导出excel 1.2、思路 首先准备一个excel模板&#xff0c;这个模板把复杂的样式和固定的内容先准备好并且放入到项…

佳作导读 | 《C++ Core Guidelines》

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 佳作导读 | 《C Core Guidelines》 《C Core Guidelines》由Bjarne Stroustrup和Herb Sutter等共同编写关于使用C编程语言的指南&#xff1b;旨在提供关于如何使用C进…

在Ubuntu上安装CUDA和cuDNN以及验证安装步骤

在Ubuntu上安装CUDA和cuDNN以及验证安装步骤 本教程详细介绍了如何在Ubuntu操作系统上安装CUDA&#xff08;NVIDIA的并行计算平台&#xff09;和cuDNN&#xff08;深度神经网络库&#xff09;&#xff0c;以及如何验证安装是否成功。通过按照这些步骤操作&#xff0c;您将能够…

小黑受到了未来的焦虑,周四继续参加团跑活动仰山跑,跑奥森的坡,越跑越上瘾更加热爱生活的leetcode之旅:LCR 008. 长度最小的子数组

小黑代码1 class Solution:def minSubArrayLen(self, target: int, nums: List[int]) -> int:# 数组长度n len(nums)# 双指针head 0tail 0# 中间变量sum_ 0# 结果变量res n1# 开始双指针迭代while tail < n:sum_ nums[tail]tail 1while sum_ > target:if tail…

0010Java程序设计-springboot+vue影院售票系统设计与实现

摘 要目 录系统实现开发环境 摘 要 看电影已经成为了人们生活中不可缺少的一部分&#xff0c;电影院售票及管理系统是电影院的日常管理及售票任务的核心&#xff0c; 在电影院中&#xff0c; 工作人员并非只是放映电影&#xff0c; 还有诸如票房统计、影片放映、影片场次安排、…

动态规划:路径和子数组问题(C++)

动态规划&#xff1a;路径和子数组问题 路径问题1.不同路径&#xff08;中等&#xff09;2.不同路径II&#xff08;中等&#xff09;3.下降路径最⼩和&#xff08;中等&#xff09;4.地下城游戏&#xff08;困难&#xff09; 子数组问题1.最大子数组和&#xff08;中等&#xf…

一篇文章教会你SpringMVC

目录 1.什么是SpringMVC 2.SpringMVC工作流程 3.SpringMVC核心组件 4.SpringMVC的配置流程 4.1导入POM依赖 4.2在WEB-INF下添加springmvc-servlet.xml(spring-mvc.xml) 4.3 修改web.xml 创建一个Controller用来存放web层的方法和内容 创建一个前端页面用来做测试展示 前言…

04 Linux补充|C/C++

目录 Linux补充 C语⾔ C语言中puts和printf的区别&#xff1f; Linux补充 (1)ubuntu安装ssh服务端openssh-server命令&#xff1a; ubuntu安装后默认只有ssh客户端&#xff0c;只能去连其它ssh服务器&#xff1b;其它客户端想要连接这个ubuntu系统&#xff0c;需要安装部署…

进制转换(二进制、八进制、十六进制、十进制)

一、进制表示 二进制&#xff1a;每一位只有两种符号表示 -> 0,1 例如 (101011)₂&#xff0c;也可写作101011B&#xff0c;其中B是Binary英文的缩写。八进制&#xff1a; 每一位有8种符号表示(0~7)&#xff0c;例如(1652)₈&#xff0c;也可写作1652O&#xff0c;其中O是O…

STL常用容器 (C++核心基础教程之STL容器详解)String的API

在C的标准模板库&#xff08;STL&#xff09;中&#xff0c;有多种容器可供使用。以下是一些常见的容器类型&#xff1a; 序列容器&#xff08;Sequential Containers&#xff09;&#xff1a; std::vector&#xff1a;动态数组&#xff0c;支持快速随机访问。 std::list&…

CS420 课程笔记 P7 - 虚拟内存 多级指针寻址

文章目录 IntroPointersMemory leaksPointer pathPointer scanningExample! Intro 上节课我们学习了静态地址&#xff0c;这节课我们将着手关注动态地址&#xff0c;我们需要了解一个叫做指针的东西 Pointers 简单地说&#xff0c;指针是对象之间的单向连接 Pointers are co…

vue集成mars3d后,basemaps加不上去

首先&#xff1a; <template> <div id"centerDiv" class"mapcontainer"> <mars-map :url"configUrl" οnlοad"onMapload" /> </div> </template> <script> import MarsMap from ../component…

C到C++的升级

C和C的关系 C继承了所有C语言的特性&#xff1b;C在C的基础上提供了更多的语法和特性&#xff0c;C语言去除了一些C语言的不好的特性。C的设计目标是运行效率与开发效率的统一。 变化一&#xff1a;所有变量都可以在使用时定义 C中更强调语言的实用性&#xff0c;所有的变量…