16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例: Apache Hive
    • 1、支持的Hive版本
    • 2、依赖项
      • 1)、使用 Flink 提供的 Hive jar
      • 2)、用户定义的依赖项
      • 3)、移动 planner jar 包
    • 3、Maven 依赖
    • 4、连接到Hive
    • 5、DDL&DML


本文介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。
本文依赖环境是hadoop、zookeeper、hive、flink环境好用,本文内容以flink1.17版本进行介绍的,具体示例是在1.13版本中运行的(因为hadoop集群环境是基于jdk8的,flink1.17版本需要jdk11)。
更多的内容详见后续关于hive的介绍。

一、Table & SQL Connectors 示例: Apache Hive

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink 与 Hive 的集成包含两个层面。

一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

1、支持的Hive版本

Flink 支持以下的 Hive 版本。

  • 2.3
    2.3.0
    2.3.1
    2.3.2
    2.3.3
    2.3.4
    2.3.5
    2.3.6
    2.3.7
    2.3.8
    2.3.9
  • 3.1
    3.1.0
    3.1.1
    3.1.2
    3.1.3

某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

  • Hive 内置函数在使用 Hive-2.3.0 及更高版本时支持。
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-2.3.0 及更高版本时支持。
  • DATE列统计信息,在使用 Hive-2.3.0 及更高版时支持。

2、依赖项

要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。 或者,您可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的-C或-l选项将它们添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖,请参考 Providing Hadoop classes:

export HADOOP_CLASSPATH=`hadoop classpath`

有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar包。您可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包。如果您使用的 Hive 版本尚未在此处列出,则第二种方法会更适合。

注意:建议您优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足您的需求时,再考虑使用分开添加 jar 包的方式。

1)、使用 Flink 提供的 Hive jar

下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的/lib/ 目录中。
在这里插入图片描述

2)、用户定义的依赖项

您可以在下方找到不同Hive主版本所需要的依赖项。

  • Hive 2.3.4
/flink-1.17.1/lib// Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jarsflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-2.3.4.jar// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar
  • Hive 3.1.0
/flink-1.17.1/lib// Flink's Hive connectorflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-3.1.0.jarlibfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar

3)、移动 planner jar 包

把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移动到 FLINK_HOME/lib 下,并且将 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具体原因请参见 FLINK-25128。你可以使用如下命令来完成移动 planner jar 包的工作:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar

只有当要使用 Hive 语法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移动。 但是在集成 Hive 的时候,推荐进行上述的操作。

3、Maven 依赖

如果您在构建自己的应用程序,则需要在 mvn 文件中添加以下依赖项。 您应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们。

<!-- Flink Dependency -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><!-- Hive Dependency -->
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>${hive.version}</version><scope>provided</scope>
</dependency>

4、连接到Hive

通过 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog连接到现有的 Hive 集群。

以下是如何连接到 Hive 的示例:

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");----------------------示例----------------------------
import java.util.List;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;/*** @author alanchan**/
public class TestHiveCatalogDemo {/*** @param args* @throws DatabaseNotExistException * @throws CatalogException */public static void main(String[] args) throws CatalogException, DatabaseNotExistException {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);String name = "alan_hive";// testhive 数据库名称String defaultDatabase = "testhive";String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);tenv.registerCatalog("alan_hive", hiveCatalog);// 使用注册的catalogtenv.useCatalog("alan_hive");List<String> tables = hiveCatalog.listTables(defaultDatabase); for (String table : tables) {System.out.println("Database:testhive  tables:" + table);}}}
  • sql
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'mydatabase','hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;------------------具体示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in setFlink SQL> CREATE CATALOG alan_hivecatalog WITH (
>     'type' = 'hive',
>     'default-database' = 'testhive',
>     'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.Flink SQL> show catalogs;
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in setFlink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.Flink SQL> show tables;
+-----------------------------------+
|                        table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
|                         apachelog |
|                          col2row1 |
|                          col2row2 |
|                       cookie_info |
|                              dual |
|                         dw_zipper |
|                               emp |
|                          employee |
|                  employee_address |
|               employee_connection |
|                 ods_zipper_update |
|                          row2col1 |
|                          row2col2 |
|                            singer |
|                           singer2 |
|                           student |
|                      student_dept |
|               student_from_insert |
|                      student_hdfs |
|                    student_hdfs_p |
|                      student_info |
|                     student_local |
|                 student_partition |
|              t_all_hero_part_msck |
|                     t_usa_covid19 |
|                   t_usa_covid19_p |
|                              tab1 |
|                         tb_dept01 |
|                    tb_dept_bucket |
|                            tb_emp |
|                          tb_emp01 |
|                     tb_emp_bucket |
|                     tb_json_test1 |
|                     tb_json_test2 |
|                          tb_login |
|                      tb_login_tmp |
|                          tb_money |
|                      tb_money_mtn |
|                            tb_url |
|              the_nba_championship |
|                             tmp_1 |
|                        tmp_zipper |
|                         user_dept |
|                     user_dept_sex |
|                             users |
|                 users_bucket_sort |
|                   website_pv_info |
|                  website_url_info |
+-----------------------------------+
49 rows in set
  • ymal
execution:...current-catalog: alan_hivecatalog  # set the HiveCatalog as the current catalog of the sessioncurrent-database: testhivecatalogs:- name: alan_hivecatalog  type: hivehive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf

下表列出了通过 YAML 文件或 DDL 定义 HiveCatalog 时所支持的参数。

在这里插入图片描述

5、DDL&DML

在 Flink 中执行 DDL 操作 Hive 的表、视图、分区、函数等元数据时,参考:33、Flink之hive
Flink 支持 DML 写入 Hive 表,请参考:33、Flink之hive
以上,介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/121295.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决六大痛点促进企业更好使用生成式AI,亚马逊云科技顾凡采访分享可用方案

亚马逊云科技大中华区战略业务发展部总经理顾凡在接受21世纪经济报道记者专访时表示&#xff0c;生成式人工智能将从四个方面为企业带来机遇&#xff1a;第一是创造全新的客户体验&#xff1b;第二是提高企业内部员工的生产力&#xff1b;第三是帮助企业提升业务运营效率&#…

lement-ui 加载本地图片

实现图片展示时&#xff0c;发先本地的图片加载不了。 代码&#xff1a; <template><div><el-image src"../assets/logo.png" ></el-image></div> </template>结果发现不对&#xff0c;加载不出来&#xff0c;一查资料&#xf…

el-select 加多选框使用

解决方法&#xff1a; el-select 添加属性 multiple&#xff0c; <el-form-item label"订单来源&#xff1a;"><el-selectv-model"tableFrom.userType"clearablemultipleplaceholder"请选择"class"selWidth"><el-opt…

Beats:安装及配置 Metricbeat (二)- 8.x

这篇文章是继文章 “Beats&#xff1a;安装及配置 Metricbeat &#xff08;一&#xff09;- 8.x” 的续篇。你可以先阅读之前的那篇文章再继续阅读这篇文章。我们在这篇文章中继续之前的探讨。 使用 fingerprint 来代替证书 在实际的使用中&#xff0c;我们需要从 Elasticsear…

时序预测 | MATLAB实现基于PSO-GRU、GRU时间序列预测对比

时序预测 | MATLAB实现基于PSO-GRU、GRU时间序列预测对比 目录 时序预测 | MATLAB实现基于PSO-GRU、GRU时间序列预测对比效果一览基本描述程序设计参考资料 效果一览 基本描述 MATLAB实现基于PSO-GRU、GRU时间序列预测对比。 1.MATLAB实现基于PSO-GRU、GRU时间序列预测对比&…

软件测试/测试开发丨Web自动化测试 关键数据记录

点此获取更多相关资料 本文为霍格沃兹测试开发学社学员学习笔记分享 原文链接&#xff1a;https://ceshiren.com/t/topic/27105 记录关键数据的作用 内容作用日志1、记录代码执行情况&#xff0c;方便复现场景&#xff0c;也可以作为bug依据截图1、断言失败或成功的截图&#…

分类算法系列②:KNN算法

目录 KNN算法 1、简介 2、原理分析 数学原理 相关公式及其过程分析 距离度量 k值选择 分类决策规则 3、API 4、⭐案例实践 4.1、分析 4.2、代码 5、K-近邻算法总结 &#x1f343;作者介绍&#xff1a;准大三网络工程专业在读&#xff0c;努力学习Java&#xff0c;涉…

go语言基本操作---三

变量的内存和变量的地址 指针是一个代表着某个内存地址的值。这个内存地址往往是在内存中存储的另一个变量的值的起始位置。Go语言对指针的支持介于java语言和C/C语言之间&#xff0c;它即没有想Java语言那样取消了代码对指针的直接操作的能力&#xff0c;也避免了C/C语言中由…

MySQL - Left Join和Inner Join的效率对比,以及优化

最近在写代码的时候&#xff0c;遇到了需要多表连接的一个问题&#xff0c;初始sql类似于&#xff1a; select * from a left join b on a.id b.aid left join c on c.bid b.id left join d on d.cid c.id 这样的多个left join组合&#xff0c;总觉得这种写法是有问题…

华为Mate 60系列发售,北斗卫星通信技术进一步深入大众消费市场

近日&#xff0c;华为Mate 60系列手机在没有举办发布会的情况下在官方商城突然上架开售&#xff0c;人气火爆。 值得一提的是&#xff0c;华为Mate60 Pro支持卫星通话&#xff0c;无地面网络时&#xff0c;也能拨打和接听卫星电话&#xff0c;还可自由编辑卫星消息。华为 Mate6…

机器人中的数值优化(九)——拟牛顿方法(下)、BB方法

本系列文章主要是我在学习《数值优化》过程中的一些笔记和相关思考&#xff0c;主要的学习资料是深蓝学院的课程《机器人中的数值优化》和高立编著的《数值最优化方法》等&#xff0c;本系列文章篇数较多&#xff0c;不定期更新&#xff0c;上半部分介绍无约束优化&#xff0c;…

【Unity编辑器扩展】 | 编辑器扩展入门基础

前言 【Unity编辑器扩展】 | 编辑器扩展入门基础一、基本概念二、核心知识点 简述三、相关API 总结 前言 当谈到游戏开发工具&#xff0c;Unity编辑器是一个备受赞誉的平台。它为开发者提供了一个强大且灵活的环境&#xff0c;使他们能够创建令人惊叹的游戏和交互式体验。然而…

0829|C++day7 auto、lambda、C++数据类型转换、C++标准模板库(STL)、list、文件操作

一、思维导图 二、【试编程】将实例化类对象写入容器后&#xff0c;写入.txt文本中&#xff0c;再重新定义一个类容器&#xff0c;将.txt中的内容读取出来&#xff0c;输出到终端 封装一个学生的类&#xff0c;定义一个学生这样类的vector容器, 里面存放学生对象&#xff08;至…

字节前端实习的两道算法题,看看强度如何

最长严格递增子序列 题目描述 给你一个整数数组nums&#xff0c;找到其中最长严格递增子序列的长度。 子序列是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7…

Hadoop 集群一直处于安全模式,强制退出后出现数据丢失警告。解决方法

文章目录 安全模式相关命令分析集群为什么一直处于安全模式解决方法 安全模式相关命令 # 查看安全模式状态 hdfs dfsadmin -safemode get# 进入安全模式 hdfs dfsadmin -safemode enter# 离开安全模式 hdfs dfsadmin -safemode leave# 强制退出安全模式 hdfs dfsadmin -safemo…

【补充】助力工业物联网,工业大数据之AirFlow安装

【补充】助力工业物联网&#xff0c;工业大数据之AirFlow安装 直接在node1上安装 1、安装Python 安装依赖 yum -y install zlib zlib-devel bzip2 bzip2-devel ncurses ncurses-devel readline readline-devel openssl openssl-devel openssl-static xz lzma xz-devel sqlit…

iOS实时监控与报警器

在现代信息化社会中&#xff0c;即使我们不在电脑前面也能随时获取到最新的数据。而苹果公司提供的iOS推送通知功能为我们带来了一种全新的方式——通过手机接收实时监控和报警信息。 首先让我们了解一下iOS推送通知。它是一个强大且灵活可定制化程度高、适用于各类应用场景&a…

图片转pdf软件有哪些?这几款收藏下来

图片转pdf软件有哪些&#xff1f;图片转PDF的需求很常见。有时候我们需要将一些图片文件合并成一个PDF文件&#xff0c;方便浏览和共享。比如说&#xff0c;你可能需要将一份报告或者简历的图片转换成PDF文件&#xff0c;以便于分享给其他人。此外&#xff0c;将图片转换成PDF文…

elasticsearch的搜索补全提示

当用户在搜索框输入字符时&#xff0c;我们应该提示出与该字符有关的搜索项 拼音分词器 下载 要实现根据字母做补全&#xff0c;就必须对文档按照拼音分词&#xff0c;GitHub上有拼音分词插件 GitHub - medcl/elasticsearch-analysis-pinyin: This Pinyin Analysis plugin…