Video: 尚硅谷大数据项目《在线教育之实时数仓》on Bilibili (Atguigu big-data project: real-time data warehouse for online education)
Contents
Chapter 8 Data Warehouse Development: DIM Layer
P024
P025
P026
P027
P028
P029
P030
Chapter 8 Data Warehouse Development: DIM Layer
P024
package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // configuration table data pre-loaded in open(), used before the broadcast state is populated
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // read the whole configuration table from MySQL once at start-up
        Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
                "user=root&password=123456&useUnicode=true&" +
                "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");
        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value JSON pushed directly by Flink CDC
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 parse the configuration-table change record
        JSONObject jsonObject = JSON.parseObject(value);
        String type = jsonObject.getString("op");
        BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        if ("d".equals(type)) {
            // remove the table from the broadcast state
            DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
            tableConfigState.remove(before.getSourceTable());
            // remove the table from configMap as well
            configMap.remove(before.getSourceTable());
        } else {
            DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
            // TODO 3 write the config into the broadcast state
            tableConfigState.put(after.getSourceTable(), after);
            // TODO 2 check whether the table exists in Phoenix; create it if it does not
            String sinkTable = after.getSinkTable();
            String sinkColumns = after.getSinkColumns();
            String sinkPk = after.getSinkPk();
            String sinkExtend = after.getSinkExtend();
            // checkTable(...) is defined in the following step (P025)
            checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
        }
    }

    /**
     * @param value JSON generated by Maxwell in Kafka
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 read the broadcast configuration
        // TODO 2 keep only the required dimension columns
        // TODO 3 add the output fields
    }
}
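The DIM application class that connects this function to its two input streams is not pasted in these notes. Below is a minimal wiring sketch of Flink's broadcast-state pattern that it relies on; the class name, the literal sample records, and the in-line sources are placeholders (in the real job the two streams come from Kafka/Maxwell and Flink CDC), and running it still requires the course's MySQL config database because open() connects to it.

// Hypothetical wiring sketch, not the course's DimApp code.
package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DimWiringSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder main stream: one literal Maxwell-style record instead of the real Kafka source
        SingleOutputStreamOperator<JSONObject> mainStream = env
                .fromElements("{\"table\":\"base_province\",\"type\":\"insert\",\"data\":{\"id\":\"1\",\"name\":\"北京\"}}")
                .map(new MapFunction<String, JSONObject>() {
                    @Override
                    public JSONObject map(String value) {
                        return JSON.parseObject(value);
                    }
                });

        MapStateDescriptor<String, DimTableProcess> tableProcessState =
                new MapStateDescriptor<>("table-process-state", String.class, DimTableProcess.class);

        // placeholder config stream: one literal Flink CDC-style record, broadcast with the same descriptor
        BroadcastStream<String> broadcastStream = env
                .fromElements("{\"op\":\"r\",\"after\":{\"sourceTable\":\"base_province\",\"sinkTable\":\"dim_base_province\",\"sinkColumns\":\"id,name\",\"sinkPk\":\"id\"}}")
                .broadcast(tableProcessState);

        // connect the main stream with the broadcast stream and apply the function above
        mainStream.connect(broadcastStream)
                .process(new DimBroadcastProcessFunction(tableProcessState))
                .print();

        env.execute("dim_wiring_sketch");
    }
}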
P025
8.3.2 Split the stream dynamically according to the MySQL configuration table
6) Create the connection pool utility class
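The heading refers to a connection-pool utility (DruidDSUtil) whose source is not pasted in these notes. The following is only a rough sketch of what such a class might look like: the driver class and ZooKeeper address are taken from the sqlline session further below, while the pool sizes and the auto-commit flag are assumptions, not the course values.

package com.atguigu.edu.realtime.util;

import com.alibaba.druid.pool.DruidDataSource;

// Rough sketch of the pool utility referenced by PhoenixUtil (P029); not the course source.
public class DruidDSUtil {

    private static DruidDataSource druidDataSource;

    public static DruidDataSource getDruidDataSource() {
        if (druidDataSource == null) {
            druidDataSource = new DruidDataSource();
            // Phoenix JDBC driver and ZooKeeper address (same as ./sqlline.py node001:2181)
            druidDataSource.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
            druidDataSource.setUrl("jdbc:phoenix:node001:2181");
            // assumed pool sizing
            druidDataSource.setInitialSize(5);
            druidDataSource.setMaxActive(20);
            druidDataSource.setMinIdle(5);
            druidDataSource.setMaxWait(60 * 1000);
            // Phoenix upserts only become visible after commit; PhoenixUtil.executeDML does not call
            // commit() explicitly, so auto-commit is enabled here (assumed setting)
            druidDataSource.setDefaultAutoCommit(true);
        }
        return druidDataSource;
    }
}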
package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // configuration table data pre-loaded in open(), used before the broadcast state is populated
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // read the whole configuration table from MySQL once at start-up
        Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
                "user=root&password=123456&useUnicode=true&" +
                "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");
        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value JSON pushed directly by Flink CDC
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 parse the configuration-table change record
        JSONObject jsonObject = JSON.parseObject(value);
        String type = jsonObject.getString("op");
        BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        if ("d".equals(type)) {
            // remove the table from the broadcast state
            DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
            tableConfigState.remove(before.getSourceTable());
            // remove the table from configMap as well
            configMap.remove(before.getSourceTable());
        } else {
            DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
            // TODO 3 write the config into the broadcast state
            tableConfigState.put(after.getSourceTable(), after);
            // TODO 2 check whether the table exists in Phoenix; create it if it does not
            String sinkTable = after.getSinkTable();
            String sinkColumns = after.getSinkColumns();
            String sinkPk = after.getSinkPk();
            String sinkExtend = after.getSinkExtend();
            checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
        }
    }

    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        // create table if not exists table (id string pk, name string...)
        // assemble the CREATE TABLE statement
        StringBuilder sql = new StringBuilder();
        sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
        // if no primary key is configured, default to id
        if (sinkPk == null) {
            sinkPk = "id";
        }
        if (sinkExtend == null) {
            sinkExtend = "";
        }
        // iterate over the columns and append them to the statement
        String[] split = sinkColumns.split(",");
        for (int i = 0; i < split.length; i++) {
            sql.append(split[i] + " varchar");
            if (split[i].equals(sinkPk)) {
                sql.append(" primary key");
            }
            if (i < split.length - 1) {
                sql.append(",\n");
            }
        }
        sql.append(") ");
        sql.append(sinkExtend);
        PhoenixUtil.executeDDL(sql.toString());
    }

    /**
     * @param value JSON generated by Maxwell in Kafka
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 read the broadcast configuration
        // TODO 2 keep only the required dimension columns
        // TODO 3 add the output fields
    }
}
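For illustration only, assuming EduConfig.HBASE_SCHEMA is the EDU_REALTIME schema created in the sqlline session below, a hypothetical table_process row with sink_table=dim_base_province, sink_columns=id,name,region_id, sink_pk=id, and an empty sink_extend would make checkTable issue roughly:

create table if not exists EDU_REALTIME.dim_base_province(
id varchar primary key,
name varchar,
region_id varchar)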
P026
P027
Start Hadoop, ZooKeeper, Kafka, and HBase (p41).
[atguigu@node001 ~]$ myhadoop.sh start
================ 启动 hadoop集群 ================
---------------- 启动 hdfs ----------------
Starting namenodes on [node001]
Starting datanodes
Starting secondary namenodes [node003]
--------------- 启动 yarn ---------------
Starting resourcemanager
Starting nodemanagers
--------------- 启动 historyserver ---------------
[atguigu@node001 ~]$ zookeeper.sh start
---------- zookeeper node001 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node002 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node003 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[atguigu@node001 ~]$
[atguigu@node001 ~]$
[atguigu@node001 ~]$ kafka.sh start
--------------- node001 Kafka 启动 ---------------
--------------- node002 Kafka 启动 ---------------
--------------- node003 Kafka 启动 ---------------
[atguigu@node001 ~]$
[atguigu@node001 ~]$
[atguigu@node001 ~]$ start-hbase.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hbase/hbase-2.0.5/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node001.out
node002: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node002.out
node003: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node003.out
node001: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node001.out
node002: running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node002.out
[atguigu@node001 ~]$ jpsall
================ node001 ================
4880 HMaster
4615 Kafka
4183 QuorumPeerMain
3017 DataNode
2858 NameNode
5083 HRegionServer
3676 JobHistoryServer
3454 NodeManager
5406 Jps
================ node002 ================
2080 ResourceManager
4050 Jps
3397 Kafka
3574 HRegionServer
2966 QuorumPeerMain
3719 HMaster
1833 DataNode
2233 NodeManager
================ node003 ================
3265 Kafka
2833 QuorumPeerMain
3634 Jps
2067 SecondaryNameNode
2245 NodeManager
1941 DataNode
3430 HRegionServer
[atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
[atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ cd bin
[atguigu@node001 bin]$ pwd
/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin
[atguigu@node001 bin]$ ./sqlline.py node001:2181
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix:node001:2181 none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:node001:2181
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
23/10/26 20:07:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 5.0)
Driver: PhoenixEmbeddedDriver (version 5.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:node001:2181> create schema EDU_REALTIME;
No rows affected (3.039 seconds)
0: jdbc:phoenix:node001:2181> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_RO |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
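(Only the SYSTEM tables exist at this point. The second listing below was presumably taken after the DIM application had run and checkTable had created the configured sink tables, which is why the EDU_REALTIME dimension tables now appear.)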
0: jdbc:phoenix:node001:2181> !tables
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE |
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | |
| | EDU_REALTIME | DIM_BASE_CATEGORY_INFO | TABLE | | | | | |
| | EDU_REALTIME | DIM_BASE_PROVINCE | TABLE | | | | | |
| | EDU_REALTIME | DIM_BASE_SOURCE | TABLE | | | | | |
| | EDU_REALTIME | DIM_BASE_SUBJECT_INFO | TABLE | | | | | |
| | EDU_REALTIME | DIM_CHAPTER_INFO | TABLE | | | | | |
| | EDU_REALTIME | DIM_COURSE_INFO | TABLE | | | | | |
| | EDU_REALTIME | DIM_KNOWLEDGE_POINT | TABLE | | | | | |
| | EDU_REALTIME | DIM_TEST_PAPER | TABLE | | | | | |
| | EDU_REALTIME | DIM_TEST_PAPER_QUESTION | TABLE | | | | | |
| | EDU_REALTIME | DIM_TEST_POINT_QUESTION | TABLE | | | | | |
| | EDU_REALTIME | DIM_TEST_QUESTION_INFO | TABLE | | | | | |
| | EDU_REALTIME | DIM_TEST_QUESTION_OPTION | TABLE | | | | | |
| | EDU_REALTIME | DIM_USER_INFO | TABLE | | | | | |
| | EDU_REALTIME | DIM_VIDEO_INFO | TABLE | | | | | |
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
0: jdbc:phoenix:node001:2181>
P028
package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // configuration table data pre-loaded in open(), used before the broadcast state is populated
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // read the whole configuration table from MySQL once at start-up
        Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
                "user=root&password=123456&useUnicode=true&" +
                "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");
        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value JSON pushed directly by Flink CDC
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 parse the configuration-table change record
        JSONObject jsonObject = JSON.parseObject(value);
        String type = jsonObject.getString("op");
        BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        if ("d".equals(type)) {
            // remove the table from the broadcast state
            DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
            tableConfigState.remove(before.getSourceTable());
            // remove the table from configMap as well
            configMap.remove(before.getSourceTable());
        } else {
            DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
            // TODO 3 write the config into the broadcast state
            tableConfigState.put(after.getSourceTable(), after);
            // TODO 2 check whether the table exists in Phoenix; create it if it does not
            String sinkTable = after.getSinkTable();
            String sinkColumns = after.getSinkColumns();
            String sinkPk = after.getSinkPk();
            String sinkExtend = after.getSinkExtend();
            checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
        }
    }

    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        // create table if not exists table (id string pk, name string...)
        // assemble the CREATE TABLE statement
        StringBuilder sql = new StringBuilder();
        sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
        // if no primary key is configured, default to id
        if (sinkPk == null) {
            sinkPk = "id";
        }
        if (sinkExtend == null) {
            sinkExtend = "";
        }
        // iterate over the columns and append them to the statement
        String[] split = sinkColumns.split(",");
        for (int i = 0; i < split.length; i++) {
            sql.append(split[i] + " varchar");
            if (split[i].equals(sinkPk)) {
                sql.append(" primary key");
            }
            if (i < split.length - 1) {
                sql.append(",\n");
            }
        }
        sql.append(") ");
        sql.append(sinkExtend);
        PhoenixUtil.executeDDL(sql.toString());
    }

    /**
     * @param value JSON generated by Maxwell in Kafka
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 read the broadcast configuration
        ReadOnlyBroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        DimTableProcess tableProcess = tableConfigState.get(value.getString("table"));
        // fallback: the Kafka data may arrive before the broadcast state is populated,
        // so look up the pre-loaded configMap to avoid dropping records
        if (tableProcess == null) {
            tableProcess = configMap.get(value.getString("table"));
        }
        if (tableProcess != null) {
            String type = value.getString("type");
            if (type == null) {
                System.out.println("maxwell采集的数据不完整...");
            } else {
                JSONObject data = value.getJSONObject("data");
                // TODO 2 keep only the required dimension columns
                String sinkColumns = tableProcess.getSinkColumns();
                filterColumns(data, sinkColumns);
                // TODO 3 add the output fields
                data.put("sink_table", tableProcess.getSinkTable());
                // add the operation type of the record
                data.put("type", type);
                out.collect(data);
            }
        }
    }

    private void filterColumns(JSONObject data, String sinkColumns) {
        Set<Map.Entry<String, Object>> entries = data.entrySet();
        List<String> stringList = Arrays.asList(sinkColumns.split(","));
        entries.removeIf(entry -> !stringList.contains(entry.getKey()));
    }
}
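To make the data flow concrete, here is a hypothetical example (field values invented for illustration), assuming a config entry that maps base_province to dim_base_province with sink_columns id,name,region_id.

Maxwell record read from Kafka:
{"database":"edu","table":"base_province","type":"insert","data":{"id":"1","name":"北京","region_id":"1","area_code":"110000"}}

Record collected downstream (area_code dropped by filterColumns; sink_table and type added):
{"id":"1","name":"北京","region_id":"1","sink_table":"dim_base_province","type":"insert"}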
P029
package com.atguigu.edu.realtime.util;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.common.EduConfig;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.InvocationTargetException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PhoenixUtil {

    private static DruidDataSource druidDataSource = DruidDSUtil.getDruidDataSource();

    public static void executeDDL(String sqlString) {
        DruidPooledConnection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = druidDataSource.getConnection();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("连接池获取连接异常...");
        }
        try {
            preparedStatement = connection.prepareStatement(sqlString);
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("编译sql异常...");
        }
        try {
            preparedStatement.execute();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("建表语句错误...");
        }
        // release resources
        try {
            preparedStatement.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
        try {
            connection.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    public static void executeDML(String sinkTable, JSONObject jsonObject) {
        // TODO 2 assemble the upsert statement
        StringBuilder sql = new StringBuilder();
        Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
        ArrayList<String> columns = new ArrayList<>();
        ArrayList<Object> values = new ArrayList<>();
        StringBuilder symbols = new StringBuilder();
        for (Map.Entry<String, Object> entry : entries) {
            columns.add(entry.getKey());
            values.add(entry.getValue());
            symbols.append("?,");
        }
        sql.append("upsert into " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(");
        // join the column names and the placeholders
        String columnsStrings = StringUtils.join(columns, ",");
        String symbolStr = symbols.substring(0, symbols.length() - 1);
        sql.append(columnsStrings).append(")values(").append(symbolStr).append(")");
        DruidPooledConnection connection = null;
        try {
            connection = druidDataSource.getConnection();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("连接池获取连接异常...");
        }
        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement(sql.toString());
            // bind the parameters
            for (int i = 0; i < values.size(); i++) {
                preparedStatement.setObject(i + 1, values.get(i) + "");
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("编译sql异常...");
        }
        try {
            preparedStatement.executeUpdate();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("写入phoenix错误...");
        }
        // release resources
        try {
            preparedStatement.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
        try {
            connection.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    public static <T> List<T> queryList(String sql, Class<T> clazz) {
        ArrayList<T> resultList = new ArrayList<>();
        DruidPooledConnection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            ResultSet resultSet = preparedStatement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                // map each row to an instance of the target class via reflection
                T obj = clazz.newInstance();
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    String columnName = metaData.getColumnName(i);
                    Object columnValue = resultSet.getObject(i);
                    BeanUtils.setProperty(obj, columnName, columnValue);
                }
                resultList.add(obj);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
        return resultList;
    }
}
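A quick usage sketch of executeDML (hypothetical values; EduConfig.HBASE_SCHEMA is assumed to be the EDU_REALTIME schema created in the sqlline session above):

// hypothetical call; the table name and values are illustrative only
JSONObject row = new JSONObject();
row.put("id", "1");
row.put("name", "北京");
row.put("region_id", "1");
// builds roughly: upsert into EDU_REALTIME.DIM_BASE_PROVINCE(id,name,region_id)values(?,?,?)
// (column order follows the JSONObject entry set, so it may differ)
PhoenixUtil.executeDML("DIM_BASE_PROVINCE", row);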
package com.atguigu.edu.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DimUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class DimPhoenixSinkFunc implements SinkFunction<JSONObject> {
    @Override
    public void invoke(JSONObject jsonObject, Context context) throws Exception {
        // TODO 1 get the target table name
        String sinkTable = jsonObject.getString("sink_table");
        // String type = jsonObject.getString("type");
        // String id = jsonObject.getString("id");
        jsonObject.remove("sink_table");
        // jsonObject.remove("type");
        // TODO 2 write the record with the utility class
        PhoenixUtil.executeDML(sinkTable, jsonObject);
        // TODO 3 if the operation type is update, invalidate the corresponding Redis cache
        // if ("update".equals(type)) {
        //     DimUtil.deleteCached(sinkTable, id);
        // }
    }
}
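Continuing the hypothetical wiring sketch from P024 (same placeholder variable names; this is not the course's DimApp code), the stream produced by DimBroadcastProcessFunction is written to Phoenix by attaching this sink:

// appended to the main() of the earlier DimWiringSketch, replacing the .print() call
SingleOutputStreamOperator<JSONObject> dimStream = mainStream
        .connect(broadcastStream)
        .process(new DimBroadcastProcessFunction(tableProcessState));

dimStream.addSink(new DimPhoenixSinkFunc());

env.execute("dim_app_sketch");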
P030
Start Hadoop, ZooKeeper, Kafka, HBase, and Maxwell.
org.apache.phoenix.schema.ColumnNotFoundException: ERROR 504 (42703): Undefined column. columnName=EDU_REALTIME.DIM_BASE_PROVINCE.TYPE
    at org.apache.phoenix.schema.PTableImpl.getColumnForColumnName(PTableImpl.java:828)
    at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.resolveColumn(FromCompiler.java:477)
    at org.apache.phoenix.compile.UpsertCompiler.compile(UpsertCompiler.java:452)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:784)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:770)
    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:401)
    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:391)
    at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
    at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:390)
    at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:378)
    at org.apache.phoenix.jdbc.PhoenixPreparedStatement.executeUpdate(PhoenixPreparedStatement.java:206)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeUpdate(DruidPooledPreparedStatement.java:255)
    at com.atguigu.edu.realtime.util.PhoenixUtil.executeDML(PhoenixUtil.java:105)
    at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:19)
    at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:7)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:148)
    at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:21)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.processElement1(CoBroadcastWithNonKeyedOperator.java:110)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
写入phoenix错误...
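The ColumnNotFoundException comes from the extra type field: processElement puts data.put("type", type) into every output record, but the Phoenix DIM tables are created only from the configured sink columns, so DIM_BASE_PROVINCE has no TYPE column and the upsert fails. Re-enabling the commented-out jsonObject.remove("type") in DimPhoenixSinkFunc (keeping the value in a local variable if it is still needed for the later Redis cache invalidation) strips the field before the upsert and resolves the error.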