pyflink 环境测试以及测试案例

1. py 的 环境以来采用Anaconda环境包

安装版本:https://www.anaconda.com/distribution/#download-section
Python3.8.8版本:Anaconda3-2021.05-Linux-x86_64.sh
下载地址
https://repo.anaconda.com/archive/

2. 安装

bash Anaconda3-2021.05-Linux-x86_64.sh

2.1 如图

在这里插入图片描述

在这里插入图片描述

3. 配置配置anaconda的环境变量:

vim /etc/profile
##增加如下配置
export ANACONDA_HOME=/root/anaconda3/bin
export PATH=$PATH:$ANACONDA_HOME/bin
重新加载环境变量: source /etc/profile

4. 修改bashrc文件

sudo vim ~/.bashrc
添加如下内容:
export PATH=~/anaconda3/bin:$PATH

说明:

profile
其实看名字就能了解大概了, profile 是某个用户唯一的用来设置环境变量的地方, 因为用户可以有多个 shell 比如 bash, sh, zsh 之类的, 但像环境变量这种其实只需要在统一的一个地方初始化就可以了, 而这就是 profile.
bashrc
bashrc 也是看名字就知道, 是专门用来给 bash 做初始化的比如用来初始化 bash 的设置, bash 的代码补全, bash 的别名, bash 的颜色. 以此类推也就还会有 shrc, zshrc 这样的文件存在了, 只是 bash 太常用了而已.

5. 启动anaconda并测试

注意: 请将当前连接node1的节点窗口关闭,然后重新打开,否则无法识别
如图:
在这里插入图片描述

如果没有可以重启服务器。

如果大家发现命令行最前面出现了 (base) 信息, 可以通过以下方式, 退出Base环境

vim ~/.bashrc
拉到文件的最后面: 输入 i 进入插入模式
将以下内容添加:
conda deactivate

6. Anaconda相关组件命令

地址:https://www.continuum.io/downloads

安装包:pip install xxx,conda install xxx
卸载包:pip uninstall xxx,conda uninstall xxx
升级包:pip install upgrade xxx,conda update xxx

6.1 功能:

Anaconda自带,无需单独安装
实时查看运行过程
基本的web编辑器(本地)
ipynb 文件分享
可交互式
记录历史运行结果
修改jupyter显示的文件路径:
通过jupyter notebook --generate-config命令创建配置文件,之后在进入用户文件夹下面查看.jupyter隐藏文件夹,修改其中文件jupyter_notebook_config.py的202行为计算机本地存在的路径。

IPython:

命令:ipython,其功能如下
1.Anaconda自带,无需单独安装
2.Python的交互式命令行 Shell
3.可交互式
4.记录历史运行结果
5.及时验证想法

7. Anaconda中的conda命令做详细介绍和配置。

** 7.1. conda命令及pip命令**

conda install  包名    pip install 包名
conda uninstall 包名   pip uninstall 包名
conda install -U 包名   pip install -U 包名

7.2 Anaconda设置为国内下载镜像

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes

7.3 conda创建虚拟环境

conda env list
conda create py_env python=3.8.8 #创建python3.8.8环境
activate py_env   #激活环境
deactivate py_env #退出环境

----------------------------------------------- Pyflink 环境安装-------------------------------------

8. pyflink 环境安装

激活虚拟环境

source ~/Documents/install/miniconda/bin/activate

创建 pyflink 虚拟环境

conda create --name py310_pyflink171_venv -y -q python=3.10.8
conda activate py310_pyflink171_venv
pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.17.1 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

或者 flink 14.5

# 创建 pyflink 虚拟环境
conda create --name py314_pyflink171_venv -y -q python=3.8.8
conda activate py314_pyflink171_venv
pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.14.5 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

9. 官网测试例子

地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/api/python/examples/table/word_count.html

vi word_count2.py

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import argparse
import logging
import sysfrom pyflink.table import TableEnvironment, EnvironmentSettings, TableDescriptor, Schema,\DataTypes, FormatDescriptor
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udfwords = ["flink", "window", "timer", "event_time", "processing_time", "state","connector", "pyflink", "checkpoint", "watermark", "sideoutput", "sql","datastream", "broadcast", "asyncio", "catalog", "batch", "streaming"]max_word_id = len(words) - 1def streaming_word_count(output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# define the source# randomly select 5 words per second from a predefined listt_env.create_temporary_table('source',TableDescriptor.for_connector('datagen').schema(Schema.new_builder().column('word_id', DataTypes.INT()).build()).option('fields.word_id.kind', 'random').option('fields.word_id.min', '0').option('fields.word_id.max', str(max_word_id)).option('rows-per-second', '5').build())tab = t_env.from_path('source')# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udf(result_type=DataTypes.STRING())def id_to_word(word_id):return words[word_id]# compute word counttab.select(id_to_word(col('word_id'))).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)streaming_word_count(known_args.output)

10.执行命令

 python word_count2.py

11. 执行结果

在这里插入图片描述

12 实例2

12.1 安装mysql docker 安装
https://blog.csdn.net/wudonglianga/article/details/133927305
12.2 创建表语句

CREATE TABLE `bigdatauser` (`id` int(32) NOT NULL AUTO_INCREMENT,`name` varchar(64) DEFAULT NULL,`age` int(32) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4

12.3 python 脚本

from pyflink.table import EnvironmentSettings, TableEnvironmentprint('step 01')
# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
print('step 02')table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-connector-jdbc_2.12-1.14.5.jar;file:/opt/flink/lib/mysql-connector-java-5.1.49.jar")
print('step 03')# 2. create source Table=
table_env.execute_sql("""CREATE TABLE products (id INT,name STRING,age INT,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'jdbc',  'url' = 'jdbc:mysql://192.168.43.185:3306/wudl','username'='root','password'='123456','table-name' = 'bigdatauser')
""")# 3. create sink Table
table_env.execute_sql("""CREATE TABLE dproducts (id int,name STRING,age int) WITH ('connector' = 'print')
""")print('step 04')
table_env.execute_sql("INSERT INTO dproducts SELECT  p.id, p.name, p.age FROM products AS p").wait()
print('step 06')

12.4 执行结果
在这里插入图片描述

13. 任务提交服务器运行

打包运行环境

# 找到 minconda(安装路径 envs目录下) 或者对应虚拟环境安装目录
# 打包 py310_pyflink171_venv 虚拟环境
cd ~/Documents/install/miniconda/env
zip -r py310_pyflink171_venv.zip py310_pyflink171_venv

** 1.提交至 jobmanager**

./flink run \
--jobmanager localhost:8081 \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-py /workplace/src/word_count.py

2. 带目录,指定入口模块提交

./flink run \
--jobmanager localhost:8081 \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyfs /workplace/src \
-pym word_count

3. 提交至 yarn 集群管理
提交运行
3.1.本地 py虚拟环境

./flink run -m yarn-cluster \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-py word_count.py

3.2. hdfs py虚拟环境

./flink run  -m yarn-cluster \
-pyarch hdfs://dae-ns/py_env/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv \
-py word_count.py

3.3.带目录 src

./bin/flink run-application -t yarn-application \
-Dyarn.application.name=wordcount \
-Dyarn.ship-files=/workplace/src \
-pyarch shipfiles/py310_pyflink171_venv.zip \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyfs src \
-pym word_count

注意:

虚拟环境打包,该虚拟环境创建方式建议使用 conda,或者virtualenv --always-copy 方式创建,这样打的虚拟环境更全
提交虚拟环境地址:py310_pyflink171_venv.zip/py310_pyflink171_venv 注意这个地址是双层

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/163172.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

送水订水商城小程序的作用是什么

无论瓶装水还是桶装水在市场中的需求度总是很高,相关送水公司或零售水企业也不少,消费者的购物方式一般是品牌直售或通过经销商,零售水则是超市/商场等场景。随着人们健康品质生活提升,家庭或办公等场所对桶装水或瓶装水的需求日益…

群硕与Microsoft Dynamics全球团队密切协作,加速ERP产品迭代

群硕具备强大的软件研发能力,搭建自动化测试平台,保证高质量交付。 ERP系统的引入被视为企业走向数字化转型的关键一步。 此系统有助于实现企业内部资源与外部资源的整合,通过软件把人、财、物、产、供、销及相应的物流、信息流、资金流、管…

找不到msvcr120.dll怎么办?msvcr120.dll丢失如何修复?

MSVCR120.dll是一个动态链接库文件,它是Microsoft Visual C 2012 Redistributable Package的一部分。这个文件包含了许多用于运行C应用程序的函数和类。当我们的计算机上缺少这个文件时,就会导致一些程序无法正常运行,甚至会出现系统崩溃的情…

基于当量因子法、InVEST、SolVES模型等多技术融合在生态系统服务功能社会价值评估中的应用及论文写作、拓展分析

查看原文>>>基于当量因子法、InVEST、SolVES模型等多技术融合在生态系统服务功能社会价值评估中的应用及论文写作、拓展分析 生态系统服务是人类从自然界中获得的直接或间接惠益,可分为供给服务、文化服务、调节服务和支持服务4类,对提升人类福…

黑豹程序员-架构师学习路线图-百科:Maven

文章目录 1、什么是maven官网下载地址 2、发展历史3、Maven的伟大发明 1、什么是maven Apache Maven is a software project management and comprehension tool. Based on the concept of a project object model (POM), Maven can manage a project’s build, reporting and…

“智能+”时代,深维智信如何借助阿里云打造AI内容生成系统

云布道师 前言: 随着数字经济的发展,线上数字化远程销售模式越来越成为一种主流,销售流程也演变为线上视频会议、线下拜访等多种方式的结合。根据 Gartner 报告,到 2025 年 60% 的 B2B 销售组织将从基于经验和直觉的销售转变为数…

window.location对象实例详解

一、前言 Window.location 只读属性返回一个 Location 对象,其中包含当前标签页文档的网页地址信息。 Window.location 是一个只读 Location 对象,但是我们仍然可以去重新赋值更改对象值。 下面就让我们详细介绍一下location的常用属性和方法&#xf…

【Machine Learning】01-Supervised learning

01-Supervised learning 1. 机器学习入门1.1 What is Machine Learning?1.2 Supervised learning1.3 Unsupervised learning 2. Supervised learning2.1 单元线性回归模型2.1.1 Linear Regression Model(线性回归模型)2.1.2 Cost Function(代…

asp.net酒店管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net酒店管理系统是一套完善的web设计管理系统,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为vs2010,数据库为sqlserver2008,使用c#语言开发 asp.net 酒店管理系统1 二、功能介绍 …

redhat配置本地yum源(超详细,超简单)

目录 ​编辑 1、硬件配置 2、配置本地yum源 1、硬件配置 注意这里要使用is

有什么小程序可以下载视频号的视频?

​最近有一些朋友问我,【视频号下载助手】和【视频下载bot】小程序,有什么作用? 首先视频号下载助手是协助用户进行下载的,但由于下载要符合平台规定,我们就将视频下载助手与视频下载bot小程序想结合的模式&#xff0…

93. 递归实现组合型枚举

题目: 93. 递归实现组合型枚举 - AcWing题库 思路: 1.从n个数中选择m个数,问有多少种选法。---->抽象为有m个坑位(设置kenway[N]表示),其中填入编号为1~n的萝卜,问有几种填法。这里我们可…

EthernetIP 转MODBUS RTU协议网关连接FANUC机器人作为EthernetIP通信从站

远创智控YC-EIPM-RTU网关产品是一款高效的数据采集工具,它可以通过各种数据接口与工业领域的仪表、PLC、计量设备等产品连接,实时采集这些设备中的运行数据、状态数据等信息。采集到的数据经过整合和运算等操作后,可以被传输到其他设备或者云…

你的DOT即将解锁,请注意以下事项

作者: David 还记得两年前Polkadot平行链卡槽拍卖质押吗? 参与平行链众贷,质押DOT两年,选择投票的项目方,获得相应token奖励。当年质押的DOT即将解锁,就在十月底,10月24日请注意。 第一批解锁…

【TES720D】青翼科技基于复旦微的FMQL20S400全国产化ARM核心模块

板卡概述 TES720D是一款基于上海复旦微电子FMQL20S400的全国产化核心模块。该核心模块将复旦微的FMQL20S400(兼容FMQL10S400)的最小系统集成在了一个50*70mm的核心板上,可以作为一个核心模块,进行功能性扩展,特别是用…

【数据结构】队列的实现与优化指南

一、前言 队列是一种重要的数据结构,它按照“先入先出”(FIFO)的原则管理数据。本文将介绍队列的概念、应用场景,以及如何使用数组实现普通队列和环形队列。 二、内容 2.1 概述 (1)什么是队列&#xff1…

《软件方法》2023版第1章(10)应用UML的建模工作流-大图

DDD领域驱动设计批评文集 做强化自测题获得“软件方法建模师”称号 《软件方法》各章合集 1.4 应用UML的建模工作流 1.4.1 概念 我用类图表示建模工作流相关概念如图1-16。 图1-16 建模工作流相关概念 图1-16左侧灰色部分定义了“游戏规则”,右侧则是在“游戏规…

30W网络对讲广播一体音柱

SV-7042T 30W网络对讲广播一体音柱 一、描述 SV-7042T是深圳锐科达电子有限公司的一款壁挂式网络有源音柱,具有10/100M以太网接口,可将网络音源通过自带的功放和喇叭输出播放,其采用防水设计,功率可以从20W到40W。SV-7042T作为网…

pycharm操作git

pycharm操作git 之前用命令做的所有操作,使用pychrm点点就可以完成 克隆代码 上方工具栏Git ⇢ \dashrightarrow ⇢ Clone ⇢ \dashrightarrow ⇢ 填写地址(http、ssh) 提交到暂存区,提交到版本库,推送到远程 直接…

淘宝拍立淘接口,按图搜索商品接口,图片识别接口,图片上传搜索接口,图片搜索API接口,以图搜货接口

淘宝拍立淘图片搜索接口可以通过上传或输入图片链接的方式,调用淘宝的图片搜索引擎,返回与该图片相关的所有淘宝商品。 使用该接口需要先申请淘宝开放平台的App Key和App Secret,获取相应的API访问权限。在调用接口时,需要传入商…