Flink SQL Redis Connector

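Anyone who develops with Flink regularly knows that connecting Flink to Redis is cumbersome, and parsing the Redis data is even more so. It would be far more convenient if Redis could be connected through Flink SQL like an ordinary database, with its data presented as a table.

After many days of work, I have finally finished a Flink SQL Redis connector and have tested that it can parse data with SQL. Below I show the code and the execution results directly; the complete code is on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git

The connector has been submitted to Flink; see: [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org)

I would appreciate it if you could fork and star the repository. I will keep updating the connector, and everyone is welcome to try it. If you run into problems, send me feedback or report them in the community, and feel free to contact me with any good ideas.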

1. Usage examples and explanation

1. Reading data

CREATE TABLE orders (
    `order_id` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hgetall',
    'key' = 'orders'
);

select * from orders

-- cluster mode
create table redis_sink (
    site_id STRING,
    inverter_id STRING,
    start_time STRING,
    PRIMARY KEY(site_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'cluster',
    'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003',
    'password' = '123123',
    'command' = 'hgetall',
    'key' = 'site_inverter'
)

cluster.nodes defines the host (or IP) and port of each cluster node, for example: host1:p1,host2:p2,host3:p3
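
To make the cluster.nodes format concrete, here is a minimal sketch of splitting such a value into host and port pairs. ClusterNodes and Node are hypothetical names used only for illustration, and the ',' and ':' separators are assumed from the example value above; the connector's own parsing may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the connector): parses a 'cluster.nodes' value. */
final class ClusterNodes {

    /** One "host:port" entry. */
    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits "host1:p1,host2:p2" on ',' and then each entry on ':' (assumed separators). */
    static List<Node> parse(String nodes) {
        List<Node> result = new ArrayList<>();
        for (String entry : nodes.split(",")) {
            String[] hostAndPort = entry.trim().split(":");
            if (hostAndPort.length != 2) {
                throw new IllegalArgumentException("Expected host:port but got: " + entry);
            }
            result.add(new Node(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
        }
        return result;
    }
}
```

Rejecting malformed entries up front gives a clearer error than an ArrayIndexOutOfBoundsException raised later while the connection is being opened.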

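Note: a Redis table must define a primary key, which can be a single column or a composite key.

Below is the result of the SQL read: the Redis data is parsed directly into the tabular form we need.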
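
As a rough illustration of how one Redis hash becomes one table row, the sketch below takes the primary key values from the Redis key and the remaining columns from the hash fields. It assumes the key layout table:pkName:pkValue (repeated for composite keys) and that the primary key columns come first in the schema; HashToRow is a hypothetical helper, not a class from the connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of the connector): maps a Redis hash to a row of strings. */
final class HashToRow {

    /**
     * Primary-key values come from the Redis key, assumed to look like
     * table:pkName1:pkValue1[:pkName2:pkValue2...]; other columns come from the hash.
     */
    static List<String> toRow(String redisKey, Map<String, String> hash,
                              List<String> columns, int primaryKeyCount) {
        String[] parts = redisKey.split(":");
        List<String> row = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            if (i < primaryKeyCount) {
                // each value follows its column name, so it sits at index 2 * (i + 1)
                row.add(parts[2 * (i + 1)]);
            } else {
                // null when the hash has no such field
                row.add(hash.get(columns.get(i)));
            }
        }
        return row;
    }
}
```

For example, the key orders:order_id:1001 with hash fields price and order_time yields a row whose first column is 1001, followed by the two field values.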

2. Writing data

-- 1. generate source data
CREATE TABLE order_source (
    `order_number` BIGINT,
    `price` DECIMAL(32,2),
    `order_time` TIMESTAMP(3),
    PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '5',
    'fields.order_number.min' = '1',
    'fields.order_number.max' = '20',
    'fields.price.min' = '1001',
    'fields.price.max' = '1100'
);

-- 2. define redis sink table
CREATE TABLE orders (
    `order_number` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hmset',
    'key' = 'orders'
);

-- 3. insert data to redis sink table (cast data type to string)
insert into orders
select
    cast(order_number as STRING) order_number,
    cast(price as STRING) price,
    cast(order_time as STRING) order_time
from order_source

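Redis does not store column data types, so every value must be cast to STRING before it is written. Below is the result of the write. The Redis key is built by concatenating the configured key, the primary key column name, and the primary key value, which guarantees that each record is unique; this is also why a Redis table must define a primary key.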
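
The key construction can be sketched as follows. This is a minimal standalone version under the assumption that ':' is the separator (as the comment in the sink code, table_name:primary key col name:primary key value, suggests); RedisKeyBuilder is a hypothetical name and not the connector's class.

```java
import java.util.List;

/** Hypothetical helper (not part of the connector): builds table:pkName:pkValue[...] keys. */
final class RedisKeyBuilder {

    /** Appends ":name:value" for every primary-key column, in schema order. */
    static String build(String tableKey, List<String> pkNames, List<String> pkValues) {
        if (pkNames.size() != pkValues.size()) {
            throw new IllegalArgumentException("primary key names and values differ in length");
        }
        StringBuilder redisKey = new StringBuilder(tableKey);
        for (int i = 0; i < pkNames.size(); i++) {
            redisKey.append(':').append(pkNames.get(i)).append(':').append(pkValues.get(i));
        }
        return redisKey.toString();
    }
}
```

With the sink table above, a row whose order_number is 7 would be stored under orders:order_number:7, so two rows collide only when their primary keys are equal.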

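3. Currently supported features

1. The connector currently supports several read and write commands:

        Read:    get    hget    hgetall    hscan    lrange    smembers    zrange

        Write:   set    hset    hmset    lpush    rpush    sadd

2. For the hash type, the most commonly used one, fuzzy matching is supported: supplying only the table name as the key queries the data of the whole table.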
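
The way an hgetall read decides between an exact lookup and a fuzzy match can be sketched like this. The three cases mirror the branches in the source function shown later in this post; HgetallLookup and its Mode enum are hypothetical names introduced only for this illustration.

```java
/** Hypothetical helper (not part of the connector): classifies the 'key' option for hgetall. */
final class HgetallLookup {

    enum Mode { KEY_LIST, SINGLE_KEY, PREFIX_SCAN }

    /**
     * KEY_LIST: several comma-separated keys.
     * SINGLE_KEY: one full key of the form table:pkName:pkValue[...].
     * PREFIX_SCAN: anything else, e.g. only the table name, matched as "key*".
     */
    static Mode classify(String key, int primaryKeyCount) {
        if (key.split(",").length > 1) {
            return Mode.KEY_LIST;
        }
        if (key.split(":").length == primaryKeyCount * 2 + 1) {
            return Mode.SINGLE_KEY;
        }
        return Mode.PREFIX_SCAN;
    }

    /** Glob pattern used for the fuzzy match. */
    static String pattern(String key) {
        return key + "*";
    }
}
```

One thing to keep in mind: the fuzzy match relies on the Redis KEYS command, which walks the whole keyspace, so on a large instance it can be slow.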

4. Connection options

Option                       Required   Default   Type      Description
connector                    required   none      String    connector name
mode                         required   none      String    Redis deployment mode (single or cluster)
single.host                  optional   none      String    host of the Redis server in single mode
single.port                  optional   none      Int       port of the Redis server in single mode
cluster.nodes                optional   none      String    cluster nodes in cluster mode, e.g. host1:p1,host2:p2,host3:p3
password                     optional   none      String    Redis database password
command                      required   none      String    Redis command used to read or write data
key                          required   none      String    Redis key
expire                       optional   none      Int       TTL to set on the key
field                        optional   none      String    hash field to fetch when using the hget command
cursor                       optional   none      Int       cursor for the hscan command (e.g. 1, 2)
start                        optional   0         Int       start index when reading with the lrange command (also used by zrange)
end                          optional   10        Int       end index when reading with the lrange command (also used by zrange)
connection.max.wait-mills    optional   none      Int       Redis connection parameter
connection.timeout-ms        optional   none      Int       Redis connection parameter
connection.max-total         optional   none      Int       Redis connection parameter
connection.max-idle          optional   none      Int       Redis connection parameter
connection.test-on-borrow    optional   none      Boolean   Redis connection parameter
connection.test-on-return    optional   none      Boolean   Redis connection parameter
connection.test-while-idle   optional   none      Boolean   Redis connection parameter
so.timeout-ms                optional   none      Int       Redis connection parameter
max.attempts                 optional   none      Int       Redis connection parameter
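
A note on start and end: Redis LRANGE treats both indexes as inclusive and counts negative indexes from the tail, so the defaults 0 and 10 return up to 11 elements. The sketch below mimics that behaviour on a plain Java list; LrangeSemantics is a hypothetical name for illustration, not connector code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not part of the connector): mimics Redis LRANGE index handling. */
final class LrangeSemantics {

    /** Both indexes are inclusive; negative indexes count from the end of the list. */
    static List<String> lrange(List<String> list, long start, long end) {
        int size = list.size();
        long from = start < 0 ? Math.max(size + start, 0) : start;
        long to = end < 0 ? size + end : Math.min(end, size - 1);
        if (from > to || from >= size) {
            return Collections.emptyList();
        }
        return new ArrayList<>(list.subList((int) from, (int) to + 1));
    }
}
```

To read an entire list regardless of its length, the usual choice is start = 0 and end = -1.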

2. Factory class for dynamic reading and writing

import org.apache.flink.common.RedisOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sink.RedisDynamicTableSink;
import org.apache.flink.source.RedisDynamicTableSource;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    private ReadableConfig options;

    public RedisSourceSinkFactory() {}

    public RedisSourceSinkFactory(ReadableConfig options) {
        this.options = options;
    }

    // Implementation of DynamicTableSourceFactory; this interface is required to read data with Flink SQL
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        // get() throws if the table declares no primary key, which Redis tables must define
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        return new RedisDynamicTableSource(options, columnNames, primaryKey);
    }

    // Implementation of DynamicTableSinkFactory; this interface is required to write data to Redis with Flink SQL
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options, columnNames, primaryKey);
    }

    // Value of the 'connector' option in the WITH clause
    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    // Required options of the SQL connector
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.PASSWORD);
        options.add(RedisOptions.KEY);
        options.add(RedisOptions.MODE);
        return options;
    }

    // Optional options of the SQL connector
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.SINGLE_HOST);
        options.add(RedisOptions.SINGLE_PORT);
        options.add(RedisOptions.CLUSTER_NODES);
        options.add(RedisOptions.FIELD);
        options.add(RedisOptions.CURSOR);
        options.add(RedisOptions.EXPIRE);
        options.add(RedisOptions.COMMAND);
        options.add(RedisOptions.START);
        options.add(RedisOptions.END);
        options.add(RedisOptions.CONNECTION_MAX_TOTAL);
        options.add(RedisOptions.CONNECTION_MAX_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
        options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
        options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
        options.add(RedisOptions.TTL_SEC);
        options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
        options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
        options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);
        return options;
    }
}

3. Redis source: reader classes

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.util.Preconditions;

import java.util.List;

public class RedisDynamicTableSource implements ScanTableSource {

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public DynamicTableSource copy() {
        return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table source";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
        return SourceFunctionProvider.of(redisSourceFunction, false);
    }
}

The source function below reads Redis string, list, set, zset, and hash data and parses it into RowData that is passed to Flink.

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanResult;

import java.util.*;
import java.util.function.BiFunction;

public class RedisSourceFunction extends RichSourceFunction<RowData> {

    private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;
    private String field;
    private String cursor;
    private Integer start;
    private Integer end;
    private GenericRowData rowData;

    public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null,please set value for key");
        // several keys may be given, separated by the same symbol that separates cluster nodes
        String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
        String command = options.get(RedisOptions.COMMAND);
        // check for null before the command is used below
        Preconditions.checkNotNull(command, "command is null,please set value for command");

        // write commands belong to the sink; there is nothing to read, so stop here
        List<String> sinkCommands = Arrays.asList(
                RedisCommandOptions.SET, RedisCommandOptions.HSET, RedisCommandOptions.HMSET,
                RedisCommandOptions.LPUSH, RedisCommandOptions.RPUSH, RedisCommandOptions.SADD);
        if (sinkCommands.contains(command.toUpperCase())) {
            return;
        }

        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null,please set value for mode");

        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        // next free column index when a row is filled element by element
        int position;

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedis.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedis.hget(key, field);
                    rowData = new GenericRowData(primaryKey.size() + 1);
                    fillPrimaryKey(rowData, key);
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        // several keys: one row per key
                        for (String str : keyArr) {
                            ctx.collect(readHashRow(str, jedis::hget));
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        // one fully qualified key: a single row
                        ctx.collect(readHashRow(key, jedis::hget));
                    } else {
                        // Fuzzy matching, gets the data of the entire table
                        String fuzzyKey = key + "*";
                        Set<String> keys = jedis.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            ctx.collect(readHashRow(keyStr, jedis::hget));
                        }
                    }
                    break;
                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    rowData = new GenericRowData(columns.size());
                    fillPrimaryKey(rowData, key);
                    position = primaryKey.size();
                    for (Map.Entry<String, String> entry : result) {
                        rowData.setField(position++, BinaryStringData.fromString(entry.getValue()));
                    }
                    break;
                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedis.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    for (String s : list) {
                        rowData.setField(position++, BinaryStringData.fromString(s));
                    }
                    break;
                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedis.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    for (String s : smembers) {
                        rowData.setField(position++, BinaryStringData.fromString(s));
                    }
                    break;
                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedis.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    for (String s : sets) {
                        rowData.setField(position++, BinaryStringData.fromString(s));
                    }
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            // HGETALL has already emitted its rows; skip when no row was built
            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL) && rowData != null) {
                ctx.collect(rowData);
            }
        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedisCluster.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedisCluster.hget(key, field);
                    rowData = new GenericRowData(primaryKey.size() + 1);
                    fillPrimaryKey(rowData, key);
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        for (String str : keyArr) {
                            ctx.collect(readHashRow(str, jedisCluster::hget));
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        ctx.collect(readHashRow(key, jedisCluster::hget));
                    } else {
                        // Fuzzy matching, gets the data of the entire table
                        String fuzzyKey = key + "*";
                        Set<String> keys = jedisCluster.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            ctx.collect(readHashRow(keyStr, jedisCluster::hget));
                        }
                    }
                    break;
                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    rowData = new GenericRowData(columns.size());
                    fillPrimaryKey(rowData, key);
                    position = primaryKey.size();
                    for (Map.Entry<String, String> entry : result) {
                        rowData.setField(position++, BinaryStringData.fromString(entry.getValue()));
                    }
                    break;
                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedisCluster.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    for (String s : list) {
                        rowData.setField(position++, BinaryStringData.fromString(s));
                    }
                    break;
                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedisCluster.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    for (String s : smembers) {
                        rowData.setField(position++, BinaryStringData.fromString(s));
                    }
                    break;
                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedisCluster.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    for (String s : sets) {
                        rowData.setField(position++, BinaryStringData.fromString(s));
                    }
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL) && rowData != null) {
                ctx.collect(rowData);
            }
        } else {
            LOG.error("Unsupport such {} mode", mode);
        }
    }

    // Builds one row from a hash: primary-key columns come from the Redis key,
    // the remaining columns are read field by field with hget
    private GenericRowData readHashRow(String redisKey, BiFunction<String, String, String> hget) {
        GenericRowData row = new GenericRowData(columns.size());
        fillPrimaryKey(row, redisKey);
        for (int i = primaryKey.size(); i < columns.size(); i++) {
            row.setField(i, BinaryStringData.fromString(hget.apply(redisKey, columns.get(i))));
        }
        return row;
    }

    // The key looks like table:pkName:pkValue[:pkName:pkValue...], so the value of
    // the i-th primary-key column sits at index 2 * (i + 1)
    private void fillPrimaryKey(GenericRowData row, String redisKey) {
        String[] keySplit = redisKey.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
        for (int i = 0; i < primaryKey.size(); i++) {
            row.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
        }
    }

    @Override
    public void cancel() {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}

4. Redis sink: writer classes

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class RedisDynamicTableSink implements DynamicTableSink {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    // The sink accepts every kind of change: inserts, deletes, and both sides of an update
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options, this.columns, this.primaryKey);
        return SinkFunctionProvider.of(myRedisSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}

package org.apache.flink.sink;

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.util.List;

public class RedisSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;

    public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    // NOTE: as written, the connection is created inside invoke(), i.e. once per record
    @Override
    public void invoke(RowData rowData, Context context) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null,please set value for key");
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command, "command is null,please set value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null,please set value for mode");

        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        String redisTableKey;

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedis.set(key, value);
                    break;
                case RedisCommandOptions.HSET:
                    String field = columns.get(1);
                    redisTableKey = buildRedisKey(key, rowData);
                    value = rowData.getString(1).toString();
                    jedis.hset(redisTableKey, field, value);
                    // without this break, HSET would fall through into HMSET
                    break;
                case RedisCommandOptions.HMSET:
                    redisTableKey = buildRedisKey(key, rowData);
                    // write every non-primary-key column as a hash field
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))) {
                            value = rowData.getString(i).toString();
                            jedis.hset(redisTableKey, columns.get(i), value);
                        }
                    }
                    break;
                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedis.lpush(key, value);
                    break;
                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedis.rpush(key, value);
                    break;
                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedis.sadd(key, value);
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedisCluster.set(key, value);
                    break;
                case RedisCommandOptions.HSET:
                    String field = columns.get(1);
                    redisTableKey = buildRedisKey(key, rowData);
                    value = rowData.getString(1).toString();
                    jedisCluster.hset(redisTableKey, field, value);
                    break;
                case RedisCommandOptions.HMSET:
                    redisTableKey = buildRedisKey(key, rowData);
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))) {
                            value = rowData.getString(i).toString();
                            jedisCluster.hset(redisTableKey, columns.get(i), value);
                        }
                    }
                    break;
                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.lpush(key, value);
                    break;
                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.rpush(key, value);
                    break;
                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedisCluster.sadd(key, value);
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
        } else {
            LOG.error("Unsupport such {} mode", mode);
        }
    }

    // Constructs the Redis key: table_name:primary key col name:primary key value,
    // repeated for each column of a composite primary key
    private String buildRedisKey(String key, RowData rowData) {
        StringBuilder redisTableKey = new StringBuilder(key);
        for (int i = 0; i < primaryKey.size(); i++) {
            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT)
                    .append(primaryKey.get(i))
                    .append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT)
                    .append(rowData.getString(i).toString());
        }
        return redisTableKey.toString();
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}

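If it is unclear why the code above is written this way, see my previous post:

Flink SQL: User-Defined Sources & Sinks (source tables and sink tables), CSDN blog

Finally, I again hope you will support the project on GitHub or in the community so that this connector can be officially open-sourced.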
