大数据Flink(一百一十四):PyFlink的作业开发入门案例

文章目录

PyFlink的作业开发入门案例

一、批处理的入门案例

1、示例

2、​​​​​​​​​​​​​​开发步骤

3、参考代码:基于DataStreamAPI编程 

二、​​​​​​​​​​​​​​流处理的入门案例

1、​​​​​​​​​​​​​​示例

2、​​​​​​​​​​​​​​开发步骤

3、​​​​​​​​​​​​​​参考代码:基于DataStreamAPI编程


PyFlink的作业开发入门案例

一、​​​​​​​批处理的入门案例

1、​​​​​​​示例

编写Flink程序,读取表中的数据,并根据表中的字段信息进行统计每个单词出现的数量。

2、​​​​​​​​​​​​​​开发步骤

  1. 创建批式处理的运行环境
  2. 构建数据源
  3. 对数据进行处理
  4. 对处理后的结果输出打印
  5. 启动执行
  6. 准备测试数据

3、参考代码:基于DataStreamAPI编程 

在pyflink_study项目下新建data文件夹,在data下新建input文件夹,在input下新建wordcount.txt文件,内容如下:

Total,time,BUILD,SUCCESS
Final,Memory,Finished,at
Total,time,BUILD,SUCCESS
Final,Memory,Finished,at
Total,time,BUILD,SUCCESS
Final,Memory,Finished,at
BUILD,SUCCESS
BUILD,SUCCESS
BUILD,SUCCESS
BUILD,SUCCESS
BUILD,SUCCESS
BUILD,SUCCESS

 在pyflink_study项目下新建Easy_Case_Batch_DataStream.py文件,代码如下:

from pyflink.common import Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSink, OutputFileConfig,RollingPolicy)def word_count():# 1. 创建流式处理环境env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)# 3. 将source添加到环境中,环境会生成一个datastream,也就是我们进行操作的数据类ds = env.read_text_file("D:\work_space\pyflink_study\data\input\wordcount.txt")# 4. transformdef split(line):yield from line.split(",")# compute word countds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# 5. sinkds.print()# 6. 真正执行代码env.execute("word_count")if __name__ == '__main__':word_count()

注意read_text_file后的地址要与实际地址对应。

运行结果如下:

二、​​​​​​​​​​​​​​流处理的入门案例

1、​​​​​​​​​​​​​​示例

编写Flink程序,接收socket的单词数据,并以逗号进行单词拆分打印。

2、​​​​​​​​​​​​​​开发步骤

  1. 获取流处理运行环境
  2. 构建socket流数据源,并指定IP地址和端口号
  3. 对接收到的数据进行拆分
  4. 对拆分后的单词,每个单词记一次数
  5. 对拆分后的单词进行分组
  6. 根据单词的次数进行聚合
  7. 打印输出
  8. 在云服务器中,使用nc -lk 端口号 监听端口
  9. 启动执行
  10. 向监听端口发送单词

3、​​​​​​​​​​​​​​参考代码:基于DataStreamAPI编程

新建Easy_Case_Stream_DataStream.py文件,代码如下:

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, DataStream, RuntimeExecutionModeif __name__ == '__main__':# 1. 创建流式处理环境env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)# 3. 创建sourceds = DataStream(env._j_stream_execution_environment.socketTextStream('8.140.192.198', 9999))# 4. transformdef split(line):yield from line.split(",")# compute word countds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# 5. sinkds.print()# 6. 真正执行代码env.execute("word_count")

注意:socketTextStream后的ip是云服务器ecs的公网ip。

先进入云服务器ECS开启netcat,监听9999端口号。(如果没有安装可以使用yum安装nc: yum install -y nc)

nc -lk 9999

 然后运行本地程序

 在ecs依次发送单词

flink,spark,hadoop
flink,flink,spark
flink,hadoop

 查看结果


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/420903.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【树和二叉树的相关定义】概念

1.回顾与概览 2.什么是树型结构 3.树的(递归)定义与基本术语 3.1树的定义 注意:除了根结点以外,任何一个结点都有且仅有一个前驱 3.2树的其他表示方式 3.3树的基本术语 结点:数据元素以及指向子树的分支根结点:非空…

人员随机分组

如何实现男女比例平均分组? 在团队活动中,合理地将人员分组是一项重要的组织工作,它有助于提高团队合作的效率和质量。云分组小程序提供了一个便捷的解决方案,通过智能算法帮助用户快速实现人员分组。本文将详细介绍如何使用云分组…

考试:软件工程(01)

软件开发生命周期 ◆软件定义时期:包括可行性研究和详细需求分析过程,任务是确定软件开发工程必须完成的总目标, 具体可分成问题定义、可行性研究、需求分析等。 ◆软件开发时期:就是软件的设计与实现,可分成概要设计…

【逐行注释】自适应Q的AUKF|MATLAB代码(附下载链接)

文章目录 逐行注释的说明运行结果自适应UKF介绍实现过程 部分代码各模块解释 逐行注释的说明 每一行都标有中文注释: 是我自己一个字一个字打的,如果有错别字等问题,欢迎指正。 运行结果 三轴的估计值、真值、滤波前的值对比&#xff1a…

【教师节视频制作】飞机降落飞机机身AE模板修改文字软件生成器教程特效素材【AE模板】

教师节祝福视频制作教程飞机降落飞机机身AE模板修改文字特效广告生成神器素材祝福玩法AE模板工程 怎么如何做的【教师节视频制作】飞机降落飞机机身AE模板修改文字软件生成器教程特效素材【AE模板】 生日视频制作步骤: 下载AE模板 安装AE软件 把AE模板导入AE软件 …

客服宝:专业跨平台快捷回复软件

在这个信息爆炸的时代,客服工作的重要性不言而喻。然而,面对多渠道、高频率的咨询与互动,客服团队如何保持高效、专业且富有人情味的对话呢?客服宝——一款专业的跨平台快捷回复软件,以其独特的功能优势,为…

手机投屏到电脑怎么弄?

远程看看是一款免费的远程控制软件,它支持Windows、iOS和Android等多个系统,并且提供了文件传输、手机投屏、在线聊天等多种功能。我们可以使用远程看看软件进行手机投屏,从而帮助您的家人或朋友解决相应的手机问题。 1. 首先,将…

自闭症儿童特殊学校:为孩子的成长保驾护航

在自闭症儿童成长的道路上,每一步都充满了挑战与未知。为了给予这些特殊孩子最坚实的支持与最温暖的陪伴,自闭症儿童特殊学校应运而生,它们如同一座座灯塔,照亮了孩子们前行的方向。其中,星贝育园自闭症儿童寄宿制学校…

如何在SQL Server中恢复多个数据库?

一次性恢复多个 SQL数据库吗可以吗? "是的,可以一次性恢复多个 SQL 数据库。通常情况下,只要备份文件的名称与相应的数据库匹配,且没有附加的日期或时间信息,就可以通过有效的 T-SQL 脚本来完成恢复。如果你希望…

如何将写好的Java代码打成jar包放在hadoops上运行

1、打包java文件 2、jar包上传,hadoop执行 我们将打好的jar包上传到Linux,因为hadoop是安装在Linux上的,然后用hadoop执行,执行前要确保已经在Linux上配置了hadoop的环境变量,不然就要到hadoop的目录下执行该命令 执…

Java入门:08.Java中的static关键字

1 static关键字 可以修饰属性变量,方法和代码段 static修饰的属性称为静态属性或类属性, 在类加载时就在方法区为属性开辟存储空间,无论创建多少个对象,静态属性在内存中只有一份。 可以使用 类名.静态属性 的方式引用 static修饰…

Java Kafka生产者实现

💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storm…

MIT6.824 课程-MapReduce

MapReduce:在大型集群上简化数据处理 概要 MapReduce是一种编程模型,它是一种用于处理和生成大型数据集的实现。用户通过指定一个用来处理键值对(Key/Value)的map函数来生成一个中间键值对集合。然后,再指定一个reduce函数, 它用…

Linux环境基础开发工具使用(1)

个人主页:C忠实粉丝 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C忠实粉丝 原创 Linux环境基础开发工具使用(1) 收录于专栏[Linux学习] 本专栏旨在分享学习Linux的一点学习笔记,欢迎大家在评论区交流讨论💌 目录 Linux…

IP地址是怎么实现HTTPS访问的?

首先,需要明确的是,IP地址(Internet Protocol Address)是互联网上设备(如服务器、路由器等)的唯一标识符,它允许数据包在网络中正确地路由和传输。然而,IP地址本身并不直接支持HTTPS…

2024年T电梯修理证模拟考试题库及T电梯修理理论考试试题

题库来源:安全生产模拟考试一点通公众号小程序 2024年T电梯修理证模拟考试题库及T电梯修理理论考试试题是由安全生产模拟考试一点通提供,T电梯修理证模拟考试题库是根据T电梯修理最新版教材,T电梯修理大纲整理而成(含2024年T电梯…

Mysql基础练习题 1729.求关注者的数量 (力扣)

编写解决方案,对于每一个用户,返回该用户的关注者数量。 #按 user_id 的顺序返回结果表 题目链接: https://leetcode.cn/problems/find-followers-count/description/ 建表插入语句: Create table If Not Exists Followers(us…

【LabVIEW学习篇 - 22】:ActiveX

文章目录 ActiveXActiveX打开Windows颜色选择对话框ActiveX将浏览器嵌入到前面板 ActiveX ActiveX是微软推出的一个开放的技术集的统称,它是很早之前出现的OLE(object linking and Embedding)技术的扩展,它是基于COM(Component Object Model)技术而建立…

基于GPT3打造你的专属的个人知识库

DocsGPT是一个基于GPT3的知识库平台,其支持训练、本地部署,并支持结果导出 https://github.com/arc53/DocsGPT DocsGPT本地部署 前置依赖: pippython3.8版本以上(python3.7不支持langchain 0.0.100以上版本)如使用ma…

uniapp去除顶部标题栏

相信很多同学和我一样,刚学uniapp的时候想去除自带的这个标题栏不知道如何去除🤪 其实很简单,只需两个步骤即可彻底除掉,首先找到项目文件夹下的pages.json路由文件点开,在这个文件里可以看到你创建的所有页面&#x…