0基础学习PyFlink——使用DataStream进行字数统计

大纲

  • source
  • Map
    • Splitting
    • Mapping
  • Reduce
    • Keying
    • Reducing
  • 完整代码
  • 结构
  • 参考资料

在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。
在这里插入图片描述
本节介绍的DataStream API,则使用了类似的结构。

source

为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)

可以使用下面指令输出source内容

    source.print()
A C B
A E B
E C D

Map

和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。

Splitting

    def split(line):for s in line.split():yield ssplitted = source.flat_map(split) 

上述splitted的结构输出是

A
C
B
A
E
B
E
C
D

Mapping

Mapping的操作就是将之前的数组结构转换成map结构

mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))

mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。

(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)

Reduce

Keying

这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。

    keyed=mapped.key_by(lambda i: i[0]) 

可以看到keyed数据已经经过排序和聚合了。

(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)

Reducing

 reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

reduce的方法有如下注释

Applies a reduce transformation on the grouped data stream grouped on by the given
key position. The ReduceFunction will receive input values based on the key value.
Only input values with the same key will go to the same reducer.

特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。

(A,2)
(B,2)
(C,2)
(D,1)
(E,2)

完整代码

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted = source.flat_map(split) # splitted.print()mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyed=mapped.key_by(lambda i: i[0]) # keyed.print()reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

结构

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/177566.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习之基于YoloV5火灾烟雾报警系统(GUI界面)

欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、火灾烟雾报警系统四. 总结 一项目简介 YoloV5 是深度学习中用于目标检测的一种算法,可以对输入的图像进行识别,标识出…

【计算机网络笔记】传输层——可靠数据传输之流水线机制与滑动窗口协议

系列文章目录 什么是计算机网络? 什么是网络协议? 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能(1)——速率、带宽、延迟 计算机网络性能(2)…

Linux的账号管理

本章的学习感觉如果不做系统管理员,作为简单了解就可以了 前面介绍了,用户,组,other三个角色, 每个用户创建都会有uid与之对应,创建的用户基本信息在一下两个文件中,也是我们要介绍的内容&…

【webrtc】 对视频质量的码率控制的测试与探索

目录 环境设置 transport-cc goog-remb (webrtc中的两种码率算法) 修改成remb算法 测试 效果 后续 可参考工程 环境设置 要到meshx上操作 telnet 112 然后执行factory_env show |grep meshx_ip 之后telnet meshx_ip 用户名admin 密码****.119 执行一下r…

Python自动化运维监控——批量监听页面发邮件(自由配置ini文件+smtplib)

一、程序样式 1.listen.ini配置文件 2.监控页面 3.日志 二、核心点 smtplib库:这里使用了smtp.qq.com与smtp.163.com两个发送邮件的地址,使用邮箱用户名与授权码来实现登录,端口都使用465,最后抛出异常,finally里…

计算机网络与技术——数据链路层

😊计算机网络与技术——数据链路层 🚀前言☃️基本概念🥏封装成帧🥏透明传输🥏差错检测 ☃️点对点协议PPP🥏PPP协议的特点🥏PPP协议的帧格式🔍PPP异步传输时透明传输(字…

Linux纯C串口开发

为什么要用纯C语言 为了数据流动加速,实现低配CPU建立高速数据流而不用CPU干预,避免串口数据流多次反复上升到软件应用层又下降低到硬件协议层。 关于termios.h 麻烦的是,在 Linux 中使用串口并不是一件最简单的事情。在处理 termios.h 标头…

Vue入门——核心知识点

简介 Vue是一套用于构建用户界面的渐进式JS框架。 构建用户界面:就是将后端返回来的数据以不同的形式(例如:列表、按钮等)显示在界面上。渐进式:就是可以按需加载各种库。简单的应用只需要一个核心库即可,复杂的应用可以按照需求…

Golang Gin 接口返回 Excel 文件

文章目录 1.Web 页面导出数据到文件由后台实现还是前端实现?2.Golang Excel 库选型3.后台实现示例4.xlsx 库的问题5.小结参考文献 1.Web 页面导出数据到文件由后台实现还是前端实现? Web 页面导出表数据到 Excel(或其他格式)可以…

【算法挑战】设计一个支持增量操作的栈(含解析、源码)

1381.设计一个支持增量操作的栈 https://leetcode-cn.com/problems/design-a-stack-with-increment-operation/ 1381.设计一个支持增量操作的栈 题目描述方法 1: 用数组或链表模拟栈 数组复杂度分析链表复杂度分析代码 方法 2: 空间换时间 图解复杂度分析代码 题目描述 请…

与云栖的浪漫邂逅:记一段寻找云端之美的旅程

云端之旅 2023 年的云栖大会如约而至,这次云栖大会也是阿里新任掌门蔡老板当任阿里巴巴董事局主席以来的第一次。大会与以往有很多不一样的地方,其中 AIGC 更是本届大会的重点议题!你会感叹,阿里还是猛啊! 我逛了下展…

算法学习打卡day40|343. 整数拆分、96.不同的二叉搜索树

343. 整数拆分 力扣题目链接 题目描述: 给定一个正整数 n ,将其拆分为 k 个 正整数 的和( k > 2 ),并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 示例 1: 输入: n 2 输出: 1 解释: 2 1 …

一条 SQL 是如何在 MyBatis 中执行的

前言 MyBatis 执行 SQL 的核心接口为 SqlSession 接口,该接口提供了一些 CURD 及控制事务的方法,另外还可以通过 SqlSession 先获取 Mapper 接口的实例,然后通过 Mapper 接口执行 SQL,Mapper 接口方法的执行最终还是委托到 SqlSe…

Unity屏幕中涂鸦

LineRenderer LineRenderer是Unity中的一个组件,用于在场景中绘制简单的线段。 LineRenderer组件允许你通过设置一系列顶点来定义线段的形状和外观。它会根据这些顶点自动在场景中绘制出线段。 下面是LineRenderer的一些重要属性和方法: positionCou…

栈及其栈的模拟实现和使用

1. 栈(Stack) 1.1 概念 栈 :一种特殊的线性表,其 只允许在固定的一端进行插入和删除元素操作 。进行数据插入和删除操作的一端称为栈顶,另一端称为栈底。栈中的数据元素遵守后进先出LIFO ( Last In First Out )的原则…

初识FFmpeg

前言 无意间见到群里的小伙伴展示视频工具。功能比较多,包括视频编码修改,画质处理,比例处理、名称提取,剪辑、标题拆解。因此开始了FFmpeg学习。以下摘自百度百科的解释。 FFmpeg是一套可以用来记录、转换数字音频、视频&#xf…

【Proteus仿真】【Arduino单片机】简易电子琴

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器,使用无源蜂鸣器、按键等。 主要功能: 系统运行后,按下K1-K7键发出不同音调。 二、软件设计 /* 作者:嗨小易&a…

视频平台跨网级联视频压缩解决方案

一、 简介 视频监控领域对带宽有着较大的需求,这是因为视频流需要实时占用网络带宽资源。视频监控的传输带宽是组网结构的基础保障,关系到视频监控的稳定性、可靠性和可拓展性等因素。例如,720P的视频格式每路摄像头的比特率为2Mbps&#xff…

【机器学习合集】模型设计之网络宽度和深度设计 ->(个人学习记录笔记)

文章目录 网络宽度和深度设计1. 什么是网络深度1.1 为什么需要更深的模型浅层学习的缺陷深度网络更好拟合特征学习更加简单 2. 基于深度的模型设计2.1 AlexNet2.2 AlexNet工程技巧2.3 VGGNet 3. 什么是网络宽度3.1 为什么需要足够的宽度 4. 基于宽度模型的设计4.1 经典模型的宽…

在IDEA运行spark程序(搭建Spark开发环境)

建议大家写在Linux上搭建好Hadoop的完全分布式集群环境和Spark集群环境,以下在IDEA中搭建的环境仅仅是在window系统上进行spark程序的开发学习,在window系统上可以不用安装hadoop和spark,spark程序可以通过pom.xml的文件配置,添加…