PyFlink使用教程,Flink,Python,Java

环境准备

环境要求

Java 11
Python 3.7, 3.8, 3.9 or 3.10

文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/

打开 Anaconda3 Prompt

> java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)> python --version
Python 3.8.8> conda env list
# conda environments:
#
base                  *  d:\ProgramData\Anaconda3
tensorflow2.4            d:\ProgramData\Anaconda3\envs\tensorflow2.4> conda create -n PyFlink-1.17.1 python==3.8.8> conda activate PyFlink-1.17.1> python -m pip install apache-flink==1.17.1> conda list
# packages in environment at d:\ProgramData\Anaconda3\envs\PyFlink-1.17.1:
#
# Name                    Version                   Build  Channel
apache-beam               2.43.0                   pypi_0    pypi
apache-flink              1.17.1                   pypi_0    pypi
apache-flink-libraries    1.17.1                   pypi_0    pypi
avro-python3              1.9.2.1                  pypi_0    pypi
ca-certificates           2023.12.12           haa95532_0    defaults
certifi                   2023.11.17               pypi_0    pypi
charset-normalizer        3.3.2                    pypi_0    pypi
cloudpickle               2.2.0                    pypi_0    pypi
crcmod                    1.7                      pypi_0    pypi
dill                      0.3.1.1                  pypi_0    pypi
docopt                    0.6.2                    pypi_0    pypi
fastavro                  1.4.7                    pypi_0    pypi
fasteners                 0.19                     pypi_0    pypi
grpcio                    1.60.0                   pypi_0    pypi
hdfs                      2.7.3                    pypi_0    pypi
httplib2                  0.20.4                   pypi_0    pypi
idna                      3.6                      pypi_0    pypi
numpy                     1.21.6                   pypi_0    pypi
objsize                   0.5.2                    pypi_0    pypi
openssl                   1.1.1w               h2bbff1b_0    defaults
orjson                    3.9.12                   pypi_0    pypi
pandas                    1.3.5                    pypi_0    pypi
pip                       23.3.1           py38haa95532_0    defaults
proto-plus                1.23.0                   pypi_0    pypi
protobuf                  3.20.3                   pypi_0    pypi
py4j                      0.10.9.7                 pypi_0    pypi
pyarrow                   8.0.0                    pypi_0    pypi
pydot                     1.4.2                    pypi_0    pypi
pymongo                   3.13.0                   pypi_0    pypi
pyparsing                 3.1.1                    pypi_0    pypi
python                    3.8.0                hff0d562_2    defaults
python-dateutil           2.8.2                    pypi_0    pypi
pytz                      2023.3.post1             pypi_0    pypi
regex                     2023.12.25               pypi_0    pypi
requests                  2.31.0                   pypi_0    pypi
setuptools                68.2.2           py38haa95532_0    defaults
six                       1.16.0                   pypi_0    pypi
sqlite                    3.41.2               h2bbff1b_0    defaults
typing-extensions         4.9.0                    pypi_0    pypi
urllib3                   2.1.0                    pypi_0    pypi
vc                        14.2                 h21ff451_1    defaults
vs2015_runtime            14.27.29016          h5e58377_2    defaults
wheel                     0.41.2           py38haa95532_0    defaults
zstandard                 0.22.0                   pypi_0    pypi

下载的包存储在Anaconda3\envs\PyFlink-1.17.1\Lib\site-packages

PyFlink 案例

从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试 PyFlink 作业了。

打开 VSCode 切换到 PyFlink-1.17.1 环境,按照 教程 写一个 Table API 的示例

learn_pyflink/tableAPIJob.py

import argparse
import logging
import sysfrom pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtfword_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one filet_env.get_config().set("parallelism.default", "1")# define the sourceif input_path is not None:t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")tab = t_env.from_elements(map(lambda i: (i,), word_count_data),DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])def split(line: Row):for s in line[0].split():yield Row(s)# compute word counttab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout,level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

要在PyFlink-1.17.1环境下运行

> python tableAPIJob.pyUsing Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
+I[To, 1]
+I[be,, 1]
+I[or, 1]
+I[not, 1]
+I[to, 1]
+I[be,--that, 1]
+I[is, 1]
+I[the, 1]
+I[question:--, 1]
+I[Whether, 1]
+I['tis, 1]
+I[nobler, 1]
+I[in, 1]
-U[the, 1]
+U[the, 2]
+I[mind, 1]
-U[to, 1]
+U[to, 2]
.
.
.

提交 PyFlink 作业到 Flink

参考:https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/cli/#submitting-pyflink-jobs

我的 Flink 是安装在 WSL 上面的,因此也要准备环境。

下载 java-11-linux:https://download.oracle.com/otn/java/jdk/11.0.22%2B9/8662aac2120442c2a89b1ee9c67d7069/jdk-11.0.22_linux-x64_bin.tar.gz

> tar -zxf jdk-11.0.22_linux-x64_bin.tar.gz -C /usr/lib/jdk# 生成 jre
> bin/jlink --module-path jmods --add-modules java.desktop --output jre> vi /etc/profileexport JAVA_HOME=/usr/lib/jdk/jdk-11.0.22
export JRE_HOME=${JAVA_HOME}/jre    
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib    
export PATH=${JAVA_HOME}/bin:$PATH> source /etc/profile > ls -l /usr/bin/python*lrwxrwxrwx 1 root root       9 Mar 26  2019 /usr/bin/python3 -> python3.7
lrwxrwxrwx 1 root root      16 Mar 26  2019 /usr/bin/python3-config -> python3.7-config
-rwxr-xr-x 1 root root    1018 Mar  4  2018 /usr/bin/python3-jsondiff
-rwxr-xr-x 1 root root    3661 Mar  4  2018 /usr/bin/python3-jsonpatch
-rwxr-xr-x 1 root root    1342 May  2  2016 /usr/bin/python3-jsonpointer
-rwxr-xr-x 1 root root     398 Nov 22  2018 /usr/bin/python3-jsonschema
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7
lrwxrwxrwx 1 root root      33 Apr  3  2019 /usr/bin/python3.7-config -> x86_64-linux-gnu-python3.7-config
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7m
lrwxrwxrwx 1 root root      34 Apr  3  2019 /usr/bin/python3.7m-config -> x86_64-linux-gnu-python3.7m-config
lrwxrwxrwx 1 root root      10 Mar 26  2019 /usr/bin/python3m -> python3.7m
lrwxrwxrwx 1 root root      17 Mar 26  2019 /usr/bin/python3m-config -> python3.7m-config> python --versionCommand 'python' not found, but can be installed with:apt install python3         # version 3.7.3-1, or
apt install python          # version 2.7.16-1
apt install python-minimal  # version 2.7.16-1You also have python3 installed, you can run 'python3' instead.> python3 --version
Python 3.7.3# 已经安装了 python-3.7.3,创建一个软连接即可
> ln -s /usr/bin/python3.7 /usr/local/bin/python> python --version
Python 3.7.3# 设置镜像源,否则会非常慢
> python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
> pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple# 启动 Flink-1.17.1
> bin/start-cluster.sh

除此之外,在 WSL 上还需要安装有此python脚本依赖的库,也就是 apache-flink 库。因为 Flink 需要调用 python 命令来解析 pytion 脚本,这里面涉及到 python 和 java 之间的通讯。这一块还只是在 Flink 客户端上面(bin/flink run ...),而 Flink 的 TaskManager 在运行此任务的时候还需要调用 python 解释器,因为上面代码中有UDF函数,这个函数在Java中是不存在的,关于 Flink 支持 Python 任务的内部原理后面再写一篇。

> python -m pip install apache-flink==1.17.1
> pip list

然后将代码中的.wait()调用删掉

tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink')

提交任务

> ./bin/flink run --python /mnt/d/dev/php/magook/trunk/server/learn-python/learn_pyflink/tableAPIJob.pyExecuting word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a18e581d16785a9872336073efdf5df0

来到 webUI 查看任务

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/249496.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自动驾驶:Apollo如何塑造人类的未来出行

前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家:https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言1. 什么是自定义指令?2. Apollo中的自定义指令2.1 查询中的自定…

7款免费的Midjourney平替平台

AI艺术生成器正在改变设计和内容的制作方式。像Midjourney这样的工具已经将困难的想法转化为令人惊叹的视觉效果,改变了创造力的运作方式。但是,AI艺术涵盖了许多风格和需求。这就是Midjourney替代方案变得重要的原因(特别是免费的替代方案&a…

2024年,AI 掀起数据与分析市场的新风暴

2024 年伊始,Kyligence 联合创始人兼 CEO 韩卿在其公司内部的飞书订阅号发表了多篇 Rethink Data & Analytics 的内部信,分享了对数据与分析行业的一些战略思考,尤其是 AI 带来的各种变化和革命,是如何深刻地影响这个行业乃至…

【极数系列】Flink环境搭建Linux版本 (03)

文章目录 引言01 Linux部署JDK11版本1.下载Linux版本的JDK112.创建目录3.上传并解压4.配置环境变量5.刷新环境变量6.检查jdk安装是否成功 02 Linux部署Flink1.18.0版本1.下载Flink1.18.0版本包2.上传压缩包到服务器3.修改flink-config.yaml配置4.启动服务5.浏览器访问6.停止服务…

python爬虫-多线程-数据库——WB用户

数据库database的包: Python操作Mysql数据库-CSDN博客 效果: 控制台输出: 数据库记录: 全部代码: import json import os import threading import tracebackimport requests import urllib.request from utils im…

【数据结构 08】红黑树

一、概述 红黑树,是一种二叉搜索树,每一个节点上有一个存储位表示节点的颜色,可以是Red或Black。 通过对任何一条从根到叶子的路径上各个节点着色方式的限制,红黑树确保没有一条路径会比其他路径长上两倍,因而是接进…

-1- Python环境安装

1、Python安装 1.1、Windows安装Python 进入python官网:Welcome to Python.org点击 download——>all releases;建议选择3.7.2版本(网页链接:Python Release Python 3.7.2 | Python.org);下拉&#xf…

Unity 自动轮播、滑动轮播

如图所示,可设置轮播间隔,可左右滑动进行轮播 1.在UGUI创建个Image,添加自动水平组件 2.添加并配置脚本 3.代码如下,都有注释 using UnityEngine; using UnityEngine.UI;public class IndicatorManager : MonoBehaviour {public …

【RTP】webrtc 学习2: webrtc对h264的rtp打包

切片只是拷贝帧的split的各个部分到新的rtp 包的封装中。并没有在rtp包本身标记是否为关键帧FU-A 切片 输入的H.264 数据进行split :SplitNalu SplitNalu : 按照最大1200字节进行切分 切分后会返回一个数组 对于FU-A :split的数据总大小是 去掉一个字节的nalu header size …

C语言指针学习(1)

前言 指针是C语言中一个重要概念,也是C语言的一个重要特色,正确而灵活地运用指针可以使程序简洁、紧凑、高效。每一个学习和使用C语言的人都应当深入的学习和掌握指针,也可以说不掌握指针就没有掌握C语言的精华。 一、什么是指针 想弄清楚什…

Apache Commons Collection3.2.1反序列化分析(CC1)

Commons Collections简介 Commons Collections是Apache软件基金会的一个开源项目,它提供了一组可复用的数据结构和算法的实现,旨在扩展和增强Java集合框架,以便更好地满足不同类型应用的需求。该项目包含了多种不同类型的集合类、迭代器、队…

Linux逻辑卷(LV)扩容

Linux逻辑卷(LV)扩容 1、准备物理磁盘(分区和不分区都行),可以使用lsblk命令查看新增的磁盘,如下图sde就是我们新增磁盘,容量为600G。 2、将新磁盘变成物理卷(PV) pvcr…

解释性人工智能(XAI)

引言 解释性人工智能(XAI)是指一类旨在使人能够理解和解释机器学习模型的方法和技术。XAI的目标是提高AI系统的透明度和可理解性,让人们能够理解机器学习模型的决策过程、推理方式和结果。这对于社会应用和用户信任非常重要,因为A…

【八大排序】直接插入排序 | 希尔排序 + 图文详解!!

📷 江池俊: 个人主页 🔥个人专栏: ✅数据结构冒险记 ✅C语言进阶之路 🌅 有航道的人,再渺小也不会迷途。 文章目录 一、排序的概念二、直接插入排序2.1 基本思想2.2 适用说明2.3 过程图示2.4 代码实现2.…

华为配置ARP安全综合功能实验

配置ARP安全综合功能示例 组网图形 图1 配置ARP安全功能组网图 ARP安全简介配置注意事项组网需求配置思路操作步骤配置文件 ARP安全简介 ARP(Address Resolution Protocol)安全是针对ARP攻击的一种安全特性,它通过一系列对ARP表项学习和A…

Docker 基础篇

目录 一、Docker 简介 1. Docker 2. Linux 容器 3. 传统虚拟机和容器的对比 4. Docker 的作用 5. Docker 的基本组成(Docker 三要素) 6. Docker 工作原理 7. Docker 架构 8. Docker 下载 二、Docker 安装 1. CentOS Docker 安装 2. CentOS8 …

beep蜂鸣器驱动实验-创建蜂鸣器的设备节点

一. 简介 前面我借助 pinctrl 和 gpio 子系统编写了 LED 灯驱动。 I.MX6U-ALPHA 开发板上还有一个蜂鸣器,从软件的角度考虑,蜂鸣器驱动和 LED 灯驱动其实是相同的,都是控制 IO 输出高低电平。接下来我们就来学习编写蜂鸣器的 Linux 驱动。…

5、应急响应-拒绝服务钓鱼识别DDOS压力测试邮件反制分析应用日志

目录 前言: 1、#内网应急-日志分析-爆破&横向&数据库 2、#红队APT-钓鱼邮件识别-内容&发信人&附件 3、#拒绝服务攻击-DDOS&CC-代理&防火墙防御 用途:个人学习笔记,欢迎指正! 前言: 了解和…

上海纽约大学信息技术部高级主任常潘:解密大数据引领的未来教育革命

大数据产业创新服务媒体 ——聚焦数据 改变商业 在数字化时代,大数据技术的应用已经深刻地改变着各行各业。特别是在教育领域,智慧校园建设作为现代化校园的代名词,正迎来大数据技术的巨大机遇。 1月17日,上海纽约大学信息技术部…

数据结构之红黑树

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 红黑树基础 前言一、什么是红黑树二、左旋和右旋实现三、插入的调整四、红黑树的删除1.引入库2.读入数据 总结 前言 提示:这里可以添加本文要记录的大概内容&…