16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例:Elasticsearch
    • 1、maven依赖(java编码依赖)
    • 2、创建 Elasticsearch 表并写入数据
    • 3、连接器参数
    • 4、特性
      • 1)、Key 处理
      • 2)、动态索引
    • 5、数据类型映射
  • 二、Flink SQL示例:将kafka数据写入es
    • 1、依赖环境
    • 2、创建表并提交任务
    • 3、验证
      • 1)、创建es表
      • 2)、创建kafka表
      • 3)、提交任务
      • 4)、创建kafkatopic
      • 5)、往kafka topic中写入数据
      • 6)、查看es中的数据


本文介绍了Elasticsearch连接器的使用,并以2个示例完成了外部系统是Elasticsearch的介绍,即使用datagen作为数据源写入Elasticsearch和kafka作为数据源写入Elasticsearch中。
本文依赖环境是Flink、kafka、Elasticsearch、hadoop环境好用,如果是ha环境则需要zookeeper的环境。
本文分为2个部分,即Elasticsearch的基本介绍及示例和Elasticsearch与kafka的使用示例。

一、Table & SQL Connectors 示例:Elasticsearch

Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中(不支持读取,截至1.17版本)。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。

连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。

如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。

1、maven依赖(java编码依赖)

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>3.0.1-1.17</version>
</dependency>

2、创建 Elasticsearch 表并写入数据

本示例的Elasticsearch是7.6,故需要Elasticsearch7的jar文件

flink-sql-connector-elasticsearch7_2.11-1.13.6.jar

CREATE TABLE source_table (userId INT,age INT,balance DOUBLE,userName STRING,t_insert_time AS localtimestamp,WATERMARK FOR t_insert_time AS t_insert_time
) WITH ('connector' = 'datagen','rows-per-second'='5','fields.userId.kind'='sequence','fields.userId.start'='1','fields.userId.end'='5000','fields.balance.kind'='random','fields.balance.min'='1','fields.balance.max'='100','fields.age.min'='1','fields.age.max'='1000','fields.userName.length'='10'
);CREATE TABLE alan_flink_es_user_idx (userId INT,age INT,balance DOUBLE,userName STRING,t_insert_time AS localtimestamp,PRIMARY KEY (userId) NOT ENFORCED
) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://192.168.10.41:9200,http://192.168.10.42:9200,http://192.168.10.43:9200','index' = 'alan_flink_es_user_idx'
);INSERT INTO alan_flink_es_user_idx
SELECT userId,   age, balance , userName FROM source_table;---------------------具体操作如下-----------------------------------
Flink SQL> CREATE TABLE source_table (
>  userId INT,
>  age INT,
>  balance DOUBLE,
>  userName STRING,
>  t_insert_time AS localtimestamp,
>  WATERMARK FOR t_insert_time AS t_insert_time
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.userId.kind'='sequence',
>  'fields.userId.start'='1',
>  'fields.userId.end'='5000',
> 
>  'fields.balance.kind'='random',
>  'fields.balance.min'='1',
>  'fields.balance.max'='100',
> 
>  'fields.age.min'='1',
>  'fields.age.max'='1000',
> 
>  'fields.userName.length'='10'
> );
[INFO] Execute statement succeed.Flink SQL> 
> 
> CREATE TABLE alan_flink_es_user_idx (
>  userId INT,
>  age INT,
>  balance DOUBLE,
>  userName STRING,
>  t_insert_time AS localtimestamp,
>  PRIMARY KEY (userId) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://192.168.10.41:9200,http://192.168.10.42:9200,http://192.168.10.43:9200',
>   'index' = 'alan_flink_es_user_idx'
> );
[INFO] Execute statement succeed.Flink SQL> 
> 
> INSERT INTO alan_flink_es_user_idx
> SELECT userId,   age, balance , userName FROM source_table;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1163eb7a404c2678322adaa89409bcda
-----由于es的表不支持source,故不能查询,查询会报如下错误----
Flink SQL> select * from alan_flink_es_user_idx;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'elasticsearch-7' can only be used as a sink. It cannot be used as a source.

Elasticsearch结果如下图
在这里插入图片描述

3、连接器参数

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4、特性

1)、Key 处理

Elasticsearch sink 可以根据是否定义了一个主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。

在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。 Elasticsearch 连接器通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。 某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES,ROW,ARRAY,MAP 等。 如果未指定主键,Elasticsearch 将自动生成文档 id。

有关 PRIMARY KEY 语法的更多详细信息,请参见 22、Flink 的table api与sql之创建表的DDL。

2)、动态索引

Elasticsearch sink 同时支持静态索引和动态索引。

如果你想使用静态索引,则 index 选项值应为纯字符串,例如 ‘myusers’,所有记录都将被写入到 “myusers” 索引中。

如果你想使用动态索引,你可以使用 {field_name} 来引用记录中的字段值来动态生成目标索引。 你也可以使用 ‘{field_name|date_format_string}’ 将 TIMESTAMP/DATE/TIME 类型的字段值转换为 date_format_string 指定的格式。 date_format_string 与 Java 的 DateTimeFormatter 兼容。 例如,如果选项值设置为 ‘myusers-{log_ts|yyyy-MM-dd}’,则 log_ts 字段值为 2020-03-27 12:25:55 的记录将被写入到 “myusers-2020-03-27” 索引中。

你也可以使用 ‘{now()|date_format_string}’ 将当前的系统时间转换为 date_format_string 指定的格式。now() 对应的时间类型是 TIMESTAMP_WITH_LTZ 。 在将系统时间格式化为字符串时会使用 session 中通过 table.local-time-zone 中配置的时区。 使用 NOW(), now(), CURRENT_TIMESTAMP, current_timestamp 均可以。

使用当前系统时间生成的动态索引时, 对于 changelog 的流,无法保证同一主键对应的记录能产生相同的索引名, 因此使用基于系统时间的动态索引,只能支持 append only 的流。

5、数据类型映射

Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。 Flink 为 Elasticsearch 连接器使用内置的 ‘json’ 格式。更多类型映射的详细信息,请参阅 35、Flink 的JSON Format。

二、Flink SQL示例:将kafka数据写入es

本示例是将kafka的数据通过Flink 的任务写入es中。

1、依赖环境

需要增加kafka和es相关的jar包,本示例用到如下:

flink-sql-connector-elasticsearch7_2.11-1.13.6.jar
flink-sql-connector-kafka_2.11-1.13.5.jar

2、创建表并提交任务

在flink sql中运行

CREATE TABLE alan_flink_es_kafka_user_idx (userId INT,age INT,balance DOUBLE,userName STRING,t_insert_time AS localtimestamp,PRIMARY KEY (userId) NOT ENFORCED
) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://192.168.10.41:9200','index' = 'alan_flink_es_kafka_user_idx_test'
);CREATE TABLE alanchan_kafka_table (`id` INT,name STRING,age INT,balance DOUBLE,ts BIGINT, -- 以毫秒为单位的时间t_insert_time AS TO_TIMESTAMP_LTZ(ts,3),WATERMARK FOR t_insert_time AS t_insert_time - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark
) WITH ('connector' = 'kafka','topic' = 't_kafkasource2','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092','format' = 'csv'
);INSERT INTO alan_flink_es_kafka_user_idx
SELECT id,   age, balance , name FROM alanchan_kafka_table;

3、验证

本示例没有特别说明,则是在flink sql cli中操作,kafka则是kafka的运行环境命令。

1)、创建es表

Flink SQL> CREATE TABLE alan_flink_es_kafka_user_idx (
>  userId INT,
>  age INT,
>  balance DOUBLE,
>  userName STRING,
>  t_insert_time AS localtimestamp,
>  PRIMARY KEY (userId) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://192.168.10.41:9200',
>   'index' = 'alan_flink_es_kafka_user_idx_test'
> );
[INFO] Execute statement succeed.

2)、创建kafka表

Flink SQL> CREATE TABLE alanchan_kafka_table (
>     `id` INT,
>     name STRING,
>     age INT,
>     balance DOUBLE,
>     ts BIGINT, -- 以毫秒为单位的时间
>     t_insert_time AS TO_TIMESTAMP_LTZ(ts,3),
>     WATERMARK FOR t_insert_time AS t_insert_time - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 't_kafkasource2',
>     'scan.startup.mode' = 'earliest-offset',
>     'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>     'format' = 'csv'
> );
[INFO] Execute statement succeed.

3)、提交任务

Flink SQL> INSERT INTO alan_flink_es_kafka_user_idx
> SELECT id,   age, balance , name FROM alanchan_kafka_table;
........
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: dc19c9b904f69985d40eca372af9553a

4)、创建kafkatopic

[alanchan@server3 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource2 --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic t_kafkasource2.>

5)、往kafka topic中写入数据

[alanchan@server3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource2
>1,alan,15,100,1692593500222
>2,alanchan,20,200,1692593501230
>3,alanchanchn,25,300,1692593502242
>4,alan_chan,30,400,1692593503256
>5,alan_chan_chn,500,45,1692593504270
>

6)、查看es中的数据

在这里插入图片描述
以上,完成了外部系统是Elasticsearch的介绍,使用了2个示例,即使用datagen作为数据源写入Elasticsearch和kafka作为数据源写入Elasticsearch中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/105732.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

cortex-A7核PWM实验--STM32MP157

实验目的&#xff1a;驱动风扇&#xff0c;蜂鸣器&#xff0c;马达进行工作 目录 一&#xff0c;PWM相关概念 有源蜂鸣器和无源蜂鸣器 二&#xff0c;分析电路图&#xff0c;框图 三&#xff0c;分析RCC章节 1&#xff0c;确定总线连接 2&#xff0c;根据总线内容确定基…

Python案例|Matplotlib库实现的数据分析

数据展示是数据分析和挖掘中的重要环节&#xff0c;通过图形的形式可以直观、清晰地呈现数据内在的规律。 本文所用数据采用上一篇案例实现后的数据表&#xff0c;数据存储在newbj_lianJia.csv文件中&#xff0c;具体代码如下。 import pandas as pd #导入库 import matplot…

Linux安装Redis数据库,无需公网IP实现远程连接

文章目录 1. Linux(centos8)安装redis数据库2. 配置redis数据库3. 内网穿透3.1 安装cpolar内网穿透3.2 创建隧道映射本地端口 4. 配置固定TCP端口地址4.1 保留一个固定tcp地址4.2 配置固定TCP地址4.3 使用固定的tcp地址连接 Redis作为一款高速缓存的key value键值对的数据库,在…

AI 时代,程序员无需焦虑 | 《服务端开发:技术、方法与实用解决方案》(文末送书福利4.0)

文章目录 &#x1f4cb;前言&#x1f3af;程序员会被 AI 取代么&#xff1f;&#x1f3af;服务端开发尚难被 AI 取代&#x1f3af; 服务端开发何去何从&#xff1f;&#x1f3af;业界首部体系化、全景式解读服务端开发的著作&#x1f4ac;读者对象&#x1f4da;本书优势&#x…

Vue3学习

Proxy和definedProperty对比 Proxy 作为新标准将受到浏览器厂商重点持续的性能优化。Proxy 不兼容IE&#xff0c;也没有 polyfill, defineProperty 能支持到IE9。Proxy 能观察的类型比 defineProperty 更丰富。Object.definedProperty 是劫持对象的属性&#xff0c;新增元素需…

FPGA应用于图像处理

FPGA应用于图像处理 FPGA&#xff08;Field-Programmable Gate Array&#xff09;直译过来就是现场可编程门阵列。是一种可以编程的逻辑器件&#xff0c;具有高度的灵活性&#xff0c;可以根据具体需求就像编程来实现不同的功能。 FPGA器件属于专用的集成电流中的一种半定制电…

适配小程序隐私保护指引设置

由于小程序发布了一个公告&#xff0c;那么接下来就是怎么改简单的问题了。毕竟不太想大的改动历史上的代码。尽量简单的适配隐私策略就可以了。 整体思路也是参考现在App普遍的启动就让用户同意隐私策略&#xff0c;不同意不让用&#xff0c;同意了之后才能够继续使用。 公告…

SpringBoot入门篇1 - 简介和工程创建

目录 SpringBoot是由Pivotal团队提供的全新框架&#xff0c; 其设计目的是用来简化Spring应用的初始搭建以及开发过程。 1.创建入门工程案例 ①创建新模块&#xff0c;选择Spring初始化&#xff0c;并配置模块相关基础信息 ②开发控制器类 controller/BookController.jav…

第P2周:彩色图片识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 我的环境&#xff1a; 语言环境&#xff1a;Python3.10.7编译器&#xff1a;VScode深度学习环境&#xff1a;TensorFlow 2.13.0 一、前期工作&#xff1a; …

2023京东酒类市场数据分析(京东数据开放平台)

根据鲸参谋平台的数据统计&#xff0c;今年7月份京东平台酒类环比集体下滑&#xff0c;接下来我们一起来看白酒、啤酒、葡萄酒的详情数据。 首先来看白酒市场。 鲸参谋数据显示&#xff0c;7月份京东平台白酒的销量为210万&#xff0c;环比下滑约49%&#xff1b;销售额将近19…

前端工程化之规范化

规范化是我们践行前端工程化中重要的一部分。 为什么要有规范化标准 俗话说&#xff0c;无规矩不成方圆&#xff0c;尤其是在开发行业中&#xff0c;更是要有严谨的工作态度&#xff0c;我们都知道大多数软件开发都不是一个人的工作&#xff0c;都是需要多人协同的&#xff0…

2023MySQL+MyBatis知识点整理

文章目录 主键 外键 的区别&#xff1f;什么是范式&#xff1f;什么是反范式&#xff1f;什么是事务&#xff1f;MySQL事务隔离级别&#xff1f;MySQL事务默认提交模式&#xff1f;MySQL中int(1)和int(10)的区别MySQL 浮点数会丢失精度吗&#xff1f;MySQL支持哪几种时间类型&a…

矢量调制分析基础

前言 本文介绍VSA 的矢量调制分析和数字调制分析测量能力。某些扫频调谐频谱分析仪也能通过使用另外的数字无线专用软件来提供数字调制分析。然而&#xff0c;VSA 通常在调制格式和解调算法配置等方面提供更大的测量灵活性&#xff0c;并提供更多的数据结果和轨迹轨迹显示。本…

nginx(七十八)nginx配置http2

一 ngx_http_v2模块 1、本文不讲解HTTP2的知识2、只讲解nginx中如何配置HTTP2 ① 前置条件 1、openssl的版本必须在1.0.2e及以上2、开启https加密,目前http2.0只支持开启了https的网站编译选项&#xff1a;--with-http_ssl_module --with-http_v2_module 特点&#xff1a…

W6100-EVB-PICO进行UDP组播数据回环测试(九)

前言 上一章我们用我们的开发板作为UDP客户端连接服务器进行数据回环测试&#xff0c;那么本章我们进行UDP组播数据回环测试。 什么是UDP组播&#xff1f; 组播是主机间一对多的通讯模式&#xff0c; 组播是一种允许一个或多个组播源发送同一报文到多个接收者的技术。组播源将…

【文心一言】如何申请获得体验资格,并简单使用它的强大功能

目录 一、文心一言1.1、它能做什么1.2、技术特点1.3、申请方法 二、功能体验2.1、文心一言2.2、写冒泡排序代码 测试代码2.3、画一个爱心2.4、画一个星空 三、申请和通过3.1、申请时间3.2、通过时间 文心一言&#xff0c;国内首个大型人工智能对话模型&#xff0c;发布已经快一…

AndroidStudio升级后总是Read Time Out的解决办法

AndroidStudio升级后在gradle的时候总是Time out&#xff0c;遇到过多次&#xff0c;总结一下解决办法 1、gradle下载超时 在工程目录../gradle/wrapper/gradle-wrapper.properties中找到gradle版本的下载链接&#xff0c;如下图&#xff1a; 将其复制到迅雷里下载&#xff0…

让eslint的错误信息显示在项目界面上

1.需求描述 效果如下 让eslint中的错误&#xff0c;显示在项目界面上 2.问题解决 1.安装 vite-plugin-eslint 插件 npm install vite-plugin-eslint --save-dev2.配置插件 // vite.config.js import { defineConfig } from vite import vue from vitejs/plugin-vue import e…

MySQL高级篇——MySQL架构篇1(Linux下MySQL8的安装与使用)

目录 0 安装前0.1 Linux系统及工具的准备0.2 查看是否安装过MySQL0.3 MySQL的卸载 1 MySQL8的Linux版安装1.1 MySQL的4大版本1.2 下载MySQL指定版本1.3 CentOS7下检查MySQL依赖1.4 CentOS7下MySQL安装过程 2 MySQL登录2.1 首次登录2.2 修改密码2.3 设置远程登录 3 MySQL 8 的密…

【Linux】动态库和静态库

动态库和静态库 软链接硬链接硬链接要注意 自定义实现一个静态库(.a)解决、使用方法静态库的内部加载过程 自定义实现一个动态库&#xff08;.so&#xff09;动态库加载过程 静态库和动态库的特点 软链接 命令:ln -s 源文件名 目标文件名 软链接是独立连接文件的&#xff0c;他…