Requirement
Numeric cells in an Excel file need to be written to string-type columns in a database.
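Root cause
When SeaTunnel reads a field value, it always obtains it with a direct cast to the Java type implied by the declared field type.
If the runtime type of the value differs from that type (for example, a Double where a String is expected), the cast throws an error.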
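The failure described above can be reproduced outside SeaTunnel with a few lines of plain Java. This is only a minimal sketch: the class and method names are hypothetical, and it assumes that a numeric Excel cell arrives as a Double, which is the case the modified code below mentions.

```java
public class CastFailureDemo {

    // Mimics the pattern `(String) row.getField(i)`: a direct cast on an Object.
    static String readWithCast(Object fieldValue) {
        return (String) fieldValue;
    }

    // Tolerant variant: keep Strings as-is, otherwise use the value's string form.
    static String readWithToString(Object fieldValue) {
        return fieldValue instanceof String ? (String) fieldValue : String.valueOf(fieldValue);
    }

    public static void main(String[] args) {
        // Assumption: the numeric cell is represented as a Double at runtime.
        Object excelCell = 12.5d;

        try {
            readWithCast(excelCell);
        } catch (ClassCastException e) {
            // The direct cast is what fails when the target column is a string.
            System.out.println("cast failed: " + e.getClass().getSimpleName());
        }

        // prints "12.5"
        System.out.println(readWithToString(excelCell));
    }
}
```

The second method is the same idea as the toString() fallback used in the fix: the value is converted instead of being cast.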
Files to modify
org/apache/seatunnel/api/table/type/SeaTunnelRow.java
org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
Modified code
// In AbstractJdbcRowConverter.java
@Override
public PreparedStatement toExternal(
        TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
        throws SQLException {
    SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
    for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
        SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
        int statementIndex = fieldIndex + 1;
        Object fieldValue = row.getField(fieldIndex);
        if (fieldValue == null) {
            statement.setObject(statementIndex, null);
            continue;
        }
        switch (seaTunnelDataType.getSqlType()) {
            case STRING:
                // TODO wxt
                // region
                try {
                    // A direct cast fails when the value is not a String,
                    // e.g. a Double cannot be cast to String.
                    statement.setString(statementIndex, (String) row.getField(fieldIndex));
                } catch (Exception e) {
                    // Falling back to toString() works around the type mismatch.
                    statement.setString(statementIndex, row.getField(fieldIndex).toString());
                }
                // endregion
                break;
            case BOOLEAN:
                statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex));
                break;
            case TINYINT:
                statement.setByte(statementIndex, (Byte) row.getField(fieldIndex));
                break;
            case SMALLINT:
                statement.setShort(statementIndex, (Short) row.getField(fieldIndex));
                break;
            case INT:
                statement.setInt(statementIndex, (Integer) row.getField(fieldIndex));
                break;
            case BIGINT:
                statement.setLong(statementIndex, (Long) row.getField(fieldIndex));
                break;
            case FLOAT:
                statement.setFloat(statementIndex, (Float) row.getField(fieldIndex));
                break;
            case DOUBLE:
                statement.setDouble(statementIndex, (Double) row.getField(fieldIndex));
                break;
            case DECIMAL:
                statement.setBigDecimal(statementIndex, (BigDecimal) row.getField(fieldIndex));
                break;
            case DATE:
                LocalDate localDate = (LocalDate) row.getField(fieldIndex);
                statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
                break;
            case TIME:
                writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex));
                break;
            case TIMESTAMP:
                LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
                statement.setTimestamp(
                        statementIndex, java.sql.Timestamp.valueOf(localDateTime));
                break;
            case BYTES:
                statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex));
                break;
            case NULL:
                statement.setNull(statementIndex, java.sql.Types.NULL);
                break;
            case ARRAY:
                Object[] array = (Object[]) row.getField(fieldIndex);
                if (array == null) {
                    statement.setNull(statementIndex, java.sql.Types.ARRAY);
                    break;
                }
                statement.setObject(statementIndex, array);
                break;
            case MAP:
            case ROW:
            default:
                throw new JdbcConnectorException(
                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                        "Unexpected value: " + seaTunnelDataType);
        }
    }
    return statement;
}
// In SeaTunnelRow.java
private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
    if (v == null) {
        return 0;
    }
    SqlType sqlType = dataType.getSqlType();
    switch (sqlType) {
        case STRING:
            // region
            // TODO avoid failures caused by the direct cast
            try {
                return ((String) v).length();
            } catch (Exception e) {
                return v.toString().length();
            }
            // endregion
        case BOOLEAN:
        case TINYINT:
            return 1;
        case SMALLINT:
            return 2;
        case INT:
        case FLOAT:
            return 4;
        case BIGINT:
        case DOUBLE:
            return 8;
        case DECIMAL:
            return 36;
        case NULL:
            return 0;
        case BYTES:
            return ((byte[]) v).length;
        case DATE:
            return 24;
        case TIME:
            return 12;
        case TIMESTAMP:
            return 48;
        case ARRAY:
            return getBytesForArray(v, ((ArrayType) dataType).getElementType());
        case MAP:
            int size = 0;
            MapType<?, ?> mapType = ((MapType<?, ?>) dataType);
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
                size +=
                        getBytesForValue(entry.getKey(), mapType.getKeyType())
                                + getBytesForValue(entry.getValue(), mapType.getValueType());
            }
            return size;
        case ROW:
            int rowSize = 0;
            SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType);
            SeaTunnelDataType<?>[] types = rowType.getFieldTypes();
            SeaTunnelRow row = (SeaTunnelRow) v;
            for (int i = 0; i < types.length; i++) {
                rowSize += getBytesForValue(row.fields[i], types[i]);
            }
            return rowSize;
        default:
            throw new UnsupportedOperationException("Unsupported type: " + sqlType);
    }
}
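One thing to keep in mind with the toString() fallback above is that Double.toString() keeps a trailing ".0" and switches to scientific notation for large values, so a cell containing 12345678 may end up in the database as "1.2345678E7". The following is only a sketch of one possible alternative, not SeaTunnel code: the class and the helper name asString are hypothetical, and whether plain notation is actually wanted depends on the target column.

```java
import java.math.BigDecimal;

public class StringFieldConverter {

    // Hypothetical helper (not part of SeaTunnel): converts any field value
    // to the text that would be stored in a string column.
    static String asString(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String) {
            return (String) value;
        }
        if (value instanceof BigDecimal) {
            return ((BigDecimal) value).toPlainString();
        }
        if (value instanceof Double || value instanceof Float) {
            double d = ((Number) value).doubleValue();
            if (Double.isNaN(d) || Double.isInfinite(d)) {
                // BigDecimal cannot represent NaN or infinity.
                return value.toString();
            }
            // Plain notation without a trailing ".0" or an exponent.
            return new BigDecimal(value.toString()).stripTrailingZeros().toPlainString();
        }
        return value.toString();
    }

    public static void main(String[] args) {
        System.out.println(asString("abc"));        // prints "abc"
        System.out.println(asString(42.0d));        // prints "42"
        System.out.println(asString(12.5d));        // prints "12.5"
        System.out.println(asString(1.2345678E7d)); // prints "12345678"
    }
}
```

With a helper like this, the STRING branches could call it directly instead of wrapping a cast in try/catch, which also avoids catching unrelated exceptions thrown by the JDBC driver.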
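How to build from source
Packaging with the Maven plugin is enough.
Once the build finishes, the output is located under the dist directory, as shown in the screenshot.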