Environment Setup
Requirements
Java 11
Python 3.7, 3.8, 3.9 or 3.10
Documentation: https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/
Open the Anaconda3 Prompt:
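The two requirements above can be checked from a script before installing anything. The following is a minimal sketch, not part of the Flink tooling: the helper names `python_supported` and `java_major` are made up for illustration, and the version range is simply the one listed above.

```python
import re
import sys

# Supported range taken from the requirements above (Python 3.7 to 3.10).
MIN_PYTHON = (3, 7)
MAX_PYTHON = (3, 10)


def python_supported(version=None):
    """Return True if (major, minor) lies in the supported range."""
    major, minor = (version or sys.version_info)[:2]
    return MIN_PYTHON <= (major, minor) <= MAX_PYTHON


def java_major(version_output):
    """Extract the Java major version from `java -version` output.

    Handles the modern scheme ("11.0.22") and the legacy scheme
    ("1.8.0_292", which means Java 8). Returns None if no version
    string is found.
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        return None
    first = int(match.group(1))
    if first == 1 and match.group(2) is not None:
        return int(match.group(2))
    return first


if __name__ == "__main__":
    print("Python supported:", python_supported())
    sample = 'java version "11.0.22" 2024-01-16 LTS'
    print("Java major version:", java_major(sample))
```

`java_major` only parses text; to use it on a real machine you would pass it the output of `java -version`, which the JVM writes to stderr.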
> java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)

> python --version
Python 3.8.8

> conda env list
# conda environments:
#
base * d:\ProgramData\Anaconda3
tensorflow2.4 d:\ProgramData\Anaconda3\envs\tensorflow2.4

> conda create -n PyFlink-1.17.1 python==3.8.8
> conda activate PyFlink-1.17.1
> python -m pip install apache-flink==1.17.1
> conda list
# packages in environment at d:\ProgramData\Anaconda3\envs\PyFlink-1.17.1:
#
# Name Version Build Channel
apache-beam 2.43.0 pypi_0 pypi
apache-flink 1.17.1 pypi_0 pypi
apache-flink-libraries 1.17.1 pypi_0 pypi
avro-python3 1.9.2.1 pypi_0 pypi
ca-certificates 2023.12.12 haa95532_0 defaults
certifi 2023.11.17 pypi_0 pypi
charset-normalizer 3.3.2 pypi_0 pypi
cloudpickle 2.2.0 pypi_0 pypi
crcmod 1.7 pypi_0 pypi
dill 0.3.1.1 pypi_0 pypi
docopt 0.6.2 pypi_0 pypi
fastavro 1.4.7 pypi_0 pypi
fasteners 0.19 pypi_0 pypi
grpcio 1.60.0 pypi_0 pypi
hdfs 2.7.3 pypi_0 pypi
httplib2 0.20.4 pypi_0 pypi
idna 3.6 pypi_0 pypi
numpy 1.21.6 pypi_0 pypi
objsize 0.5.2 pypi_0 pypi
openssl 1.1.1w h2bbff1b_0 defaults
orjson 3.9.12 pypi_0 pypi
pandas 1.3.5 pypi_0 pypi
pip 23.3.1 py38haa95532_0 defaults
proto-plus 1.23.0 pypi_0 pypi
protobuf 3.20.3 pypi_0 pypi
py4j 0.10.9.7 pypi_0 pypi
pyarrow 8.0.0 pypi_0 pypi
pydot 1.4.2 pypi_0 pypi
pymongo 3.13.0 pypi_0 pypi
pyparsing 3.1.1 pypi_0 pypi
python 3.8.0 hff0d562_2 defaults
python-dateutil 2.8.2 pypi_0 pypi
pytz 2023.3.post1 pypi_0 pypi
regex 2023.12.25 pypi_0 pypi
requests 2.31.0 pypi_0 pypi
setuptools 68.2.2 py38haa95532_0 defaults
six 1.16.0 pypi_0 pypi
sqlite 3.41.2 h2bbff1b_0 defaults
typing-extensions 4.9.0 pypi_0 pypi
urllib3 2.1.0 pypi_0 pypi
vc 14.2 h21ff451_1 defaults
vs2015_runtime 14.27.29016 h5e58377_2 defaults
wheel 0.41.2 py38haa95532_0 defaults
zstandard 0.22.0 pypi_0 pypi
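The downloaded packages are stored in Anaconda3\envs\PyFlink-1.17.1\Lib\site-packages.
PyFlink Example
Starting with Flink 1.11, PyFlink jobs can run on Windows, so you can also develop and debug PyFlink jobs on Windows.
Open VSCode, switch to the PyFlink-1.17.1 environment, and follow the tutorial to write a Table API example.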
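To confirm from inside the activated environment where packages land and which apache-flink version was installed, something like the following can be used. It is a small sketch using only the standard library; the helper names are illustrative, and `importlib.metadata` requires Python 3.8 or newer.

```python
import sysconfig
from importlib import metadata  # standard library since Python 3.8


def site_packages_dir():
    """Return the site-packages directory of the running interpreter."""
    return sysconfig.get_paths()["purelib"]


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("site-packages:", site_packages_dir())
    print("apache-flink:", installed_version("apache-flink"))
```

If the second line prints `None`, the interpreter being run is probably not the one from the PyFlink-1.17.1 environment.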
learn_pyflink/tableAPIJob.py
import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .build())
                           .option('path', input_path)
                           .format('csv')
                           .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .option('path', output_path)
                           .format(FormatDescriptor.for_format('canal-json')
                                   .build())
                           .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)
Run it in the PyFlink-1.17.1 environment:
> python tableAPIJob.py
Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
+I[To, 1]
+I[be,, 1]
+I[or, 1]
+I[not, 1]
+I[to, 1]
+I[be,--that, 1]
+I[is, 1]
+I[the, 1]
+I[question:--, 1]
+I[Whether, 1]
+I['tis, 1]
+I[nobler, 1]
+I[in, 1]
-U[the, 1]
+U[the, 2]
+I[mind, 1]
-U[to, 1]
+U[to, 2]
.
.
.
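The prefixes in this output are changelog row kinds: `+I` is an insert, `-U` retracts the previous value of an updated row, and `+U` carries its new value. Because the job runs in streaming mode, every repeated word produces a `-U`/`+U` pair instead of a single final count. The following is a pure-Python sketch that mimics this behaviour without Flink; the function names are illustrative only and the code is a simulation, not how Flink implements aggregation.

```python
def word_count_changelog(lines):
    """Yield (kind, word, count) tuples mimicking the print sink output."""
    counts = {}
    for line in lines:
        # Like the UDTF in the job, split on whitespace only,
        # so punctuation stays attached to the word ("be," != "be").
        for word in line.split():
            old = counts.get(word)
            if old is None:
                counts[word] = 1
                yield ("+I", word, 1)
            else:
                counts[word] = old + 1
                yield ("-U", word, old)      # retract the old count
                yield ("+U", word, old + 1)  # emit the new count


def materialize(changelog):
    """Replay a changelog into the final word -> count table."""
    table = {}
    for kind, word, count in changelog:
        if kind in ("+I", "+U"):
            table[word] = count
        elif kind == "-U":
            table.pop(word, None)
    return table


if __name__ == "__main__":
    sample = ["To be, or not to be,--that is the question:--",
              "Whether 'tis nobler in the mind to suffer"]
    for kind, word, count in word_count_changelog(sample):
        print(f"{kind}[{word}, {count}]")
```

Replaying the changelog with `materialize` gives the table a sink would hold once all updates are applied.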
Submitting a PyFlink Job to Flink
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/cli/#submitting-pyflink-jobs
My Flink is installed on WSL, so the environment has to be prepared there as well.
Download JDK 11 for Linux: https://download.oracle.com/otn/java/jdk/11.0.22%2B9/8662aac2120442c2a89b1ee9c67d7069/jdk-11.0.22_linux-x64_bin.tar.gz
> tar -zxf jdk-11.0.22_linux-x64_bin.tar.gz -C /usr/lib/jdk

# generate the JRE (run from inside the extracted JDK directory)
> bin/jlink --module-path jmods --add-modules java.desktop --output jre

> vi /etc/profile
export JAVA_HOME=/usr/lib/jdk/jdk-11.0.22
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

> source /etc/profile

> ls -l /usr/bin/python*
lrwxrwxrwx 1 root root 9 Mar 26 2019 /usr/bin/python3 -> python3.7
lrwxrwxrwx 1 root root 16 Mar 26 2019 /usr/bin/python3-config -> python3.7-config
-rwxr-xr-x 1 root root 1018 Mar 4 2018 /usr/bin/python3-jsondiff
-rwxr-xr-x 1 root root 3661 Mar 4 2018 /usr/bin/python3-jsonpatch
-rwxr-xr-x 1 root root 1342 May 2 2016 /usr/bin/python3-jsonpointer
-rwxr-xr-x 1 root root 398 Nov 22 2018 /usr/bin/python3-jsonschema
-rwxr-xr-x 2 root root 4877888 Apr 3 2019 /usr/bin/python3.7
lrwxrwxrwx 1 root root 33 Apr 3 2019 /usr/bin/python3.7-config -> x86_64-linux-gnu-python3.7-config
-rwxr-xr-x 2 root root 4877888 Apr 3 2019 /usr/bin/python3.7m
lrwxrwxrwx 1 root root 34 Apr 3 2019 /usr/bin/python3.7m-config -> x86_64-linux-gnu-python3.7m-config
lrwxrwxrwx 1 root root 10 Mar 26 2019 /usr/bin/python3m -> python3.7m
lrwxrwxrwx 1 root root 17 Mar 26 2019 /usr/bin/python3m-config -> python3.7m-config

> python --version

Command 'python' not found, but can be installed with:

apt install python3 # version 3.7.3-1, or
apt install python # version 2.7.16-1
apt install python-minimal # version 2.7.16-1

You also have python3 installed, you can run 'python3' instead.

> python3 --version
Python 3.7.3

# python-3.7.3 is already installed, so creating a symlink is enough
> ln -s /usr/bin/python3.7 /usr/local/bin/python
> python --version
Python 3.7.3

# set a mirror index, otherwise downloads are very slow
> python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
> pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

# start Flink-1.17.1
> bin/start-cluster.sh
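In addition, the libraries this Python script depends on, namely apache-flink, must also be installed on WSL. Flink has to call the python command to parse the Python script, which involves communication between Python and Java. That part happens only on the Flink client (bin/flink run ...). The TaskManager also needs to invoke the Python interpreter when it runs the job, because the code above contains a UDF, which does not exist on the Java side. I will write a separate post on the internals of how Flink supports Python jobs.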
> python -m pip install apache-flink==1.17.1
> pip list
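Since both the client and the TaskManagers need a Python interpreter, the Flink CLI also lets you name one explicitly through the `-pyclientexec` and `-pyexec` options, which can serve as an alternative to the `python` symlink created earlier. The sketch below only assembles such a command line; the function name, the default `./bin/flink` path and the script name `job.py` are assumptions for illustration, and the options should be checked against the CLI documentation for your Flink version.

```python
import shlex


def build_flink_run_command(script, python_exec=None, flink_bin="./bin/flink"):
    """Assemble a `flink run` command line for a PyFlink script.

    python_exec, if given, is passed both as the client-side interpreter
    (-pyclientexec) and as the interpreter for Python UDF workers (-pyexec).
    """
    cmd = [flink_bin, "run"]
    if python_exec:
        cmd += ["-pyclientexec", python_exec, "-pyexec", python_exec]
    cmd += ["--python", script]
    return cmd


if __name__ == "__main__":
    # "job.py" is a placeholder script name.
    cmd = build_flink_run_command("job.py", python_exec="/usr/bin/python3.7")
    print(" ".join(shlex.quote(part) for part in cmd))
```

The returned list can be passed directly to `subprocess.run` from the Flink installation directory.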
Then remove the .wait() call from the code:
tab.flat_map(split).alias('word') \
    .group_by(col('word')) \
    .select(col('word'), lit(1).count) \
    .execute_insert('sink')
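Rather than editing the script each time it moves between the local mini-cluster and a remote cluster, the blocking call could be made optional. This is a minimal sketch under assumptions: the `--no-wait` flag and both helper names are hypothetical, and the only PyFlink behaviour relied on is that the object returned by `execute_insert` has a `wait()` method.

```python
import argparse


def parse_wait_flag(argv):
    """Return False if --no-wait was passed, True otherwise."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--no-wait", dest="wait", action="store_false",
                        help="Do not block until the job finishes.")
    args, _ = parser.parse_known_args(argv)
    return args.wait


def finish(table_result, wait_for_completion):
    """Block on the job only when requested (e.g. local mini-cluster runs)."""
    if wait_for_completion:
        table_result.wait()
    return table_result
```

In the job, the pipeline would then end with `finish(tab....execute_insert('sink'), parse_wait_flag(sys.argv[1:]))`, where the elided part is the same chain as above.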
Submit the job:
> ./bin/flink run --python /mnt/d/dev/php/magook/trunk/server/learn-python/learn_pyflink/tableAPIJob.py
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a18e581d16785a9872336073efdf5df0
Go to the web UI to check the job.
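The same information the web UI shows is also exposed by Flink's REST API, so the job state can be polled from a script using the JobID printed at submission. The sketch below assumes the REST endpoint is reachable at the default address http://localhost:8081; the helper names are illustrative.

```python
import json
from urllib.request import urlopen


def job_url(job_id, base_url="http://localhost:8081"):
    """Build the REST URL for a single job's details."""
    return f"{base_url.rstrip('/')}/jobs/{job_id}"


def parse_job_state(payload):
    """Extract the 'state' field from a job-details JSON document."""
    return json.loads(payload).get("state")


def fetch_job_state(job_id, base_url="http://localhost:8081", timeout=5):
    """Query the cluster and return the job state string."""
    with urlopen(job_url(job_id, base_url), timeout=timeout) as resp:
        return parse_job_state(resp.read().decode("utf-8"))


if __name__ == "__main__":
    # JobID taken from the submission output above.
    print(job_url("a18e581d16785a9872336073efdf5df0"))
```

`fetch_job_state` needs a running cluster; the other two helpers work offline.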