尚硅谷 Big Data Project 《在线教育之实时数仓》 (Real-Time Data Warehouse for Online Education), Notes 004

Video link: 尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

Contents

Chapter 8 Data Warehouse Development: DIM Layer

P024

P025

P026

P027

P028

P029

P030


Chapter 8 Data Warehouse Development: DIM Layer

P024

DimBroadcastProcessFunction connects the Maxwell data stream with the broadcast config stream; at this point processElement is still a stub (it is filled in at P028):

package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Snapshot of the config table, loaded once in open(); serves as a fallback
    // for records that arrive before the broadcast state is populated
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Pre-load the routing config straight from MySQL
        Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
                "user=root&password=123456&useUnicode=true&" +
                "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");
        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value config-table change record, a JSON string emitted by Flink CDC
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 parse the config-table record
        JSONObject jsonObject = JSON.parseObject(value);
        String type = jsonObject.getString("op");
        BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        if ("d".equals(type)) {
            // config row deleted: remove the table from broadcast state ...
            DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
            tableConfigState.remove(before.getSourceTable());
            // ... and from the local configMap
            configMap.remove(before.getSourceTable());
        } else {
            DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
            // TODO 3 write the config into broadcast state
            tableConfigState.put(after.getSourceTable(), after);
            // TODO 2 make sure the sink table exists in Phoenix; create it if not
            // (checkTable is implemented in the next step, P025)
            String sinkTable = after.getSinkTable();
            String sinkColumns = after.getSinkColumns();
            String sinkPk = after.getSinkPk();
            String sinkExtend = after.getSinkExtend();
            checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
        }
    }

    /**
     * @param value JSON produced by Maxwell in Kafka
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 read the broadcast config
        // TODO 2 keep only the configured dimension columns
        // TODO 3 enrich the output record
    }
}
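How the function is wired into the DIM application is not shown in this part of the notes; a hedged sketch, assuming a Flink CDC config stream configStream and a Maxwell data stream dataStream (both names illustrative):

MapStateDescriptor<String, DimTableProcess> tableProcessState =
        new MapStateDescriptor<>("table-process-state", String.class, DimTableProcess.class);
// broadcast the config stream, then connect it with the dimension data stream
BroadcastStream<String> broadcastStream = configStream.broadcast(tableProcessState);
SingleOutputStreamOperator<JSONObject> dimStream = dataStream
        .connect(broadcastStream)
        .process(new DimBroadcastProcessFunction(tableProcessState));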

P025

8.3.2 Dynamic stream splitting driven by the MySQL config table

6) Create the connection pool utility class
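The notes reference DruidDSUtil.getDruidDataSource() but never show the class itself. A minimal sketch, assuming it lazily builds a Druid pool over the Phoenix thick client (the URL and pool sizes below are assumptions, not the project's actual values):

package com.atguigu.edu.realtime.util;

import com.alibaba.druid.pool.DruidDataSource;

public class DruidDSUtil {
    private static DruidDataSource druidDataSource;

    public static DruidDataSource getDruidDataSource() {
        if (druidDataSource == null) {
            druidDataSource = new DruidDataSource();
            // Phoenix thick-client driver and ZooKeeper quorum URL (assumed values)
            druidDataSource.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
            druidDataSource.setUrl("jdbc:phoenix:node001,node002,node003:2181");
            // Pool sizing (assumed values)
            druidDataSource.setInitialSize(5);
            druidDataSource.setMaxActive(20);
            druidDataSource.setMinIdle(5);
            druidDataSource.setMaxWait(-1);
        }
        return druidDataSource;
    }
}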

DimBroadcastProcessFunction then gains the checkTable method, which assembles and executes the Phoenix DDL; the rest of the class is unchanged from P024:

    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        // Target shape: create table if not exists SCHEMA.table (id varchar primary key, name varchar, ...)
        StringBuilder sql = new StringBuilder();
        sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
        // Guard against null primary-key / table-extension config values
        if (sinkPk == null) {
            sinkPk = "";
        }
        if (sinkExtend == null) {
            sinkExtend = "";
        }
        // Append "<column> varchar" for every configured column,
        // marking the configured primary-key column
        String[] split = sinkColumns.split(",");
        for (int i = 0; i < split.length; i++) {
            sql.append(split[i] + " varchar");
            if (split[i].equals(sinkPk)) {
                sql.append(" primary key");
            }
            if (i < split.length - 1) {
                sql.append(",\n");
            }
        }
        sql.append(") ");
        sql.append(sinkExtend);
        PhoenixUtil.executeDDL(sql.toString());
    }
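As a worked example, a hypothetical config row with sink_table=dim_base_province, sink_columns=id,name,region_id, sink_pk=id and an empty sink_extend would make checkTable issue the following DDL (assuming EduConfig.HBASE_SCHEMA is EDU_REALTIME, which matches the schema created in P027 below):

create table if not exists EDU_REALTIME.dim_base_province(
id varchar primary key,
name varchar,
region_id varchar)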

P026

P027

Start Hadoop, ZooKeeper, Kafka, and HBase. (p41)

[atguigu@node001 ~]$ myhadoop.sh start
================ Starting Hadoop cluster ================
---------------- Starting HDFS ----------------
Starting namenodes on [node001]
Starting datanodes
Starting secondary namenodes [node003]
--------------- Starting YARN ---------------
Starting resourcemanager
Starting nodemanagers
--------------- Starting historyserver ---------------
[atguigu@node001 ~]$ zookeeper.sh start
---------- Starting zookeeper on node001 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- Starting zookeeper on node002 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- Starting zookeeper on node003 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ kafka.sh start
--------------- Starting Kafka on node001 ---------------
--------------- Starting Kafka on node002 ---------------
--------------- Starting Kafka on node003 ---------------
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ start-hbase.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hbase/hbase-2.0.5/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node001.out
node002: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node002.out
node003: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node003.out
node001: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node001.out
node002: running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node002.out
[atguigu@node001 ~]$ jpsall 
================ node001 ================
4880 HMaster
4615 Kafka
4183 QuorumPeerMain
3017 DataNode
2858 NameNode
5083 HRegionServer
3676 JobHistoryServer
3454 NodeManager
5406 Jps
================ node002 ================
2080 ResourceManager
4050 Jps
3397 Kafka
3574 HRegionServer
2966 QuorumPeerMain
3719 HMaster
1833 DataNode
2233 NodeManager
================ node003 ================
3265 Kafka
2833 QuorumPeerMain
3634 Jps
2067 SecondaryNameNode
2245 NodeManager
1941 DataNode
3430 HRegionServer
[atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
[atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ cd bin
[atguigu@node001 bin]$ pwd
/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin
[atguigu@node001 bin]$ ./sqlline.py node001:2181
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix:node001:2181 none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:node001:2181
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
23/10/26 20:07:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 5.0)
Driver: PhoenixEmbeddedDriver (version 5.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:node001:2181> create schema EDU_REALTIME;
No rows affected (3.039 seconds)
0: jdbc:phoenix:node001:2181> !tables 
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
| TABLE_CAT  | TABLE_SCHEM  | TABLE_NAME  |  TABLE_TYPE   | REMARKS  | TYPE_NAME  | SELF_REFERENCING_COL_NAME  | REF_GENERATION  | INDEX_STATE  | IMMUTABLE_RO |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
|            | SYSTEM       | CATALOG     | SYSTEM TABLE  |          |            |                            |                 |              | false        |
|            | SYSTEM       | FUNCTION    | SYSTEM TABLE  |          |            |                            |                 |              | false        |
|            | SYSTEM       | LOG         | SYSTEM TABLE  |          |            |                            |                 |              | true         |
|            | SYSTEM       | SEQUENCE    | SYSTEM TABLE  |          |            |                            |                 |              | false        |
|            | SYSTEM       | STATS       | SYSTEM TABLE  |          |            |                            |                 |              | false        |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
After the DIM application has processed the config table, the EDU_REALTIME dimension tables have been created:

0: jdbc:phoenix:node001:2181> !tables 
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
| TABLE_CAT  |  TABLE_SCHEM  |        TABLE_NAME         |  TABLE_TYPE   | REMARKS  | TYPE_NAME  | SELF_REFERENCING_COL_NAME  | REF_GENERATION  | INDEX_STATE  |
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
|            | SYSTEM        | CATALOG                   | SYSTEM TABLE  |          |            |                            |                 |              |
|            | SYSTEM        | FUNCTION                  | SYSTEM TABLE  |          |            |                            |                 |              |
|            | SYSTEM        | LOG                       | SYSTEM TABLE  |          |            |                            |                 |              |
|            | SYSTEM        | SEQUENCE                  | SYSTEM TABLE  |          |            |                            |                 |              |
|            | SYSTEM        | STATS                     | SYSTEM TABLE  |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_BASE_CATEGORY_INFO    | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_BASE_PROVINCE         | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_BASE_SOURCE           | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_BASE_SUBJECT_INFO     | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_CHAPTER_INFO          | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_COURSE_INFO           | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_KNOWLEDGE_POINT       | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_PAPER            | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_PAPER_QUESTION   | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_POINT_QUESTION   | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_QUESTION_INFO    | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_QUESTION_OPTION  | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_USER_INFO             | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_VIDEO_INFO            | TABLE         |          |            |                            |                 |              |
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
0: jdbc:phoenix:node001:2181> 

P028

Only processElement is filled in at this step, plus the new filterColumns helper; the rest of DimBroadcastProcessFunction is unchanged from P025:

    /**
     * @param value JSON produced by Maxwell in Kafka
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 read the broadcast config
        ReadOnlyBroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        DimTableProcess tableProcess = tableConfigState.get(value.getString("table"));
        // Fallback: Kafka data may arrive before the broadcast state is populated;
        // use the snapshot loaded in open() so those records are not lost
        if (tableProcess == null) {
            tableProcess = configMap.get(value.getString("table"));
        }
        if (tableProcess != null) {
            String type = value.getString("type");
            if (type == null) {
                System.out.println("Incomplete Maxwell record...");
            } else {
                JSONObject data = value.getJSONObject("data");
                // TODO 2 keep only the configured dimension columns
                String sinkColumns = tableProcess.getSinkColumns();
                filterColumns(data, sinkColumns);
                // TODO 3 enrich the output record
                data.put("sink_table", tableProcess.getSinkTable());
                // carry the operation type downstream
                data.put("type", type);
                out.collect(data);
            }
        }
    }

    private void filterColumns(JSONObject data, String sinkColumns) {
        Set<Map.Entry<String, Object>> entries = data.entrySet();
        List<String> stringList = Arrays.asList(sinkColumns.split(","));
        // removeIf on the entry set mutates the underlying JSONObject
        entries.removeIf(entry -> !stringList.contains(entry.getKey()));
    }
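A quick standalone trace of the filterColumns logic (the record is hypothetical):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Arrays;
import java.util.List;

public class FilterColumnsDemo {
    public static void main(String[] args) {
        JSONObject data = JSON.parseObject("{\"id\":\"1\",\"name\":\"Beijing\",\"create_time\":\"2023-10-26\"}");
        List<String> keep = Arrays.asList("id,name".split(","));
        // same removeIf as filterColumns: drop every key not in the configured column list
        data.entrySet().removeIf(e -> !keep.contains(e.getKey()));
        System.out.println(data); // prints {"id":"1","name":"Beijing"} (key order may vary)
    }
}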

P029

package com.atguigu.edu.realtime.util;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.common.EduConfig;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.InvocationTargetException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PhoenixUtil {

    private static DruidDataSource druidDataSource = DruidDSUtil.getDruidDataSource();

    public static void executeDDL(String sqlString) {
        DruidPooledConnection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = druidDataSource.getConnection();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("Failed to get a connection from the pool...");
        }
        try {
            preparedStatement = connection.prepareStatement(sqlString);
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("Failed to prepare the SQL...");
        }
        try {
            preparedStatement.execute();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("DDL statement error...");
        }
        // release resources
        try {
            preparedStatement.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
        try {
            connection.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    public static void executeDML(String sinkTable, JSONObject jsonObject) {
        // TODO 2 build the upsert statement
        StringBuilder sql = new StringBuilder();
        Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
        ArrayList<String> columns = new ArrayList<>();
        ArrayList<Object> values = new ArrayList<>();
        StringBuilder symbols = new StringBuilder();
        for (Map.Entry<String, Object> entry : entries) {
            columns.add(entry.getKey());
            values.add(entry.getValue());
            symbols.append("?,");
        }
        sql.append("upsert into " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(");
        // join the column names and placeholders
        String columnsStrings = StringUtils.join(columns, ",");
        String symbolStr = symbols.substring(0, symbols.length() - 1);
        sql.append(columnsStrings).append(")values(").append(symbolStr).append(")");
        DruidPooledConnection connection = null;
        try {
            connection = druidDataSource.getConnection();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("Failed to get a connection from the pool...");
        }
        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement(sql.toString());
            // bind the parameters
            for (int i = 0; i < values.size(); i++) {
                preparedStatement.setObject(i + 1, values.get(i) + "");
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("Failed to prepare the SQL...");
        }
        try {
            preparedStatement.executeUpdate();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("Phoenix write error...");
        }
        try {
            preparedStatement.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
        try {
            connection.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    public static <T> List<T> queryList(String sql, Class<T> clazz) {
        ArrayList<T> resultList = new ArrayList<>();
        DruidPooledConnection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            ResultSet resultSet = preparedStatement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                T obj = clazz.newInstance();
                // copy every column of the row onto the target bean
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    String columnName = metaData.getColumnName(i);
                    Object columnValue = resultSet.getObject(i);
                    BeanUtils.setProperty(obj, columnName, columnValue);
                }
                resultList.add(obj);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
        return resultList;
    }
}
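A hedged usage sketch of executeDML (table name and values are illustrative; in the project it is driven by DimPhoenixSinkFunc below):

JSONObject row = new JSONObject();
row.put("id", "1");
row.put("name", "Beijing");
// upserts into EDU_REALTIME.DIM_BASE_PROVINCE through the Druid pool
PhoenixUtil.executeDML("DIM_BASE_PROVINCE", row);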
package com.atguigu.edu.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DimUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class DimPhoenixSinkFunc implements SinkFunction<JSONObject> {
    @Override
    public void invoke(JSONObject jsonObject, Context context) throws Exception {
        // TODO 1 read the target table name
        String sinkTable = jsonObject.getString("sink_table");
//        String type = jsonObject.getString("type");
//        String id = jsonObject.getString("id");
        jsonObject.remove("sink_table");
//        jsonObject.remove("type");
        // TODO 2 write the record out via the utility class
        PhoenixUtil.executeDML(sinkTable, jsonObject);
        // TODO 3 if the operation type is update, invalidate the Redis cache
//        if ("update".equals(type)) {
//            DimUtil.deleteCached(sinkTable, id);
//        }
    }
}
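In the DIM application the sink is attached to the processed dimension stream, roughly like this (the stream variable is illustrative):

// write every routed dimension record into its Phoenix table
dimStream.addSink(new DimPhoenixSinkFunc());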

P030

Start Hadoop, ZooKeeper, Kafka, HBase, and Maxwell.

org.apache.phoenix.schema.ColumnNotFoundException: ERROR 504 (42703): Undefined column. columnName=EDU_REALTIME.DIM_BASE_PROVINCE.TYPE
    at org.apache.phoenix.schema.PTableImpl.getColumnForColumnName(PTableImpl.java:828)
    at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.resolveColumn(FromCompiler.java:477)
    at org.apache.phoenix.compile.UpsertCompiler.compile(UpsertCompiler.java:452)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:784)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:770)
    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:401)
    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:391)
    at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
    at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:390)
    at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:378)
    at org.apache.phoenix.jdbc.PhoenixPreparedStatement.executeUpdate(PhoenixPreparedStatement.java:206)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeUpdate(DruidPooledPreparedStatement.java:255)
    at com.atguigu.edu.realtime.util.PhoenixUtil.executeDML(PhoenixUtil.java:105)
    at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:19)
    at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:7)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:148)
    at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:21)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.processElement1(CoBroadcastWithNonKeyedOperator.java:110)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
Phoenix write error...

Root cause: processElement puts a type field on every outgoing record, and the jsonObject.remove("type") line in DimPhoenixSinkFunc is commented out, so the field reaches Phoenix, where the dimension table has no TYPE column; hence Undefined column. columnName=EDU_REALTIME.DIM_BASE_PROVINCE.TYPE.
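The fix is already sketched by the commented lines in DimPhoenixSinkFunc: strip the extra fields before the upsert (the Redis invalidation can stay commented out until caching is introduced):

    // inside DimPhoenixSinkFunc.invoke(...)
    String sinkTable = jsonObject.getString("sink_table");
    String type = jsonObject.getString("type");
    jsonObject.remove("sink_table");
    jsonObject.remove("type");  // TYPE is not a column of the Phoenix dimension table
    PhoenixUtil.executeDML(sinkTable, jsonObject);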
