大数据学习之Flink算子、了解(Transformation)转换算子(基础篇三)

Transformation转换算子(基础篇三)


目录

Transformation转换算子(基础篇三)

三、转换算子(Transformation)

1.基本转换算子

1.1 映射(Map)

1.2 过滤(filter)

1.3 扁平映射(flatmap)

1.4基本转换算子的例子

2.聚合算子(Aggregation)

2.1 按键分区(keyBy)

2.2 简单聚合

2.3 归约聚合(reduce)

3.用户自定义函数(UDF)

3.1 函数类(Function Classes)

3.2 富函数类(Rich Function Classes)

4.物理分区(Physical Partitioning)

4.1 随机分区(shuffle)

4.2 轮询分区(Round-Robin)

4.3 重缩放分区(rescale)

4.4 广播(broadcast)

4.5 全局分区(global)

4.6 自定义分区(Custom)


三、转换算子(Transformation)

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream,如图所示。一个Flink程序的核心,其实就是所有的转换操作,它们决定了处理的业务逻辑。

1.基本转换算子

1.1 映射(Map)

map算子接收一个函数作为参数,并把这个函数应用于DataStream的每个元素,最后将函数的返回结果作为结果DataStream中对应元素的值,即将DataStream的每个元素转换成新的元素。

1.2 过滤(filter)

filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。

1.3 扁平映射(flatmap)

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理

1.4基本转换算子的例子

代码如下:

import org.apache.flink.streaming.api.scala._object Practice_of_Simple_Operators {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) //设置并行度为1//常见的简单算子 有:map、flatmap、filter//map//从集合中获取不同数据类型数据val dataStream1 = env.fromCollection(List(1,2,3))//对每一个数 都乘以2val resultStream1 = dataStream1.map(data => data * 2)resultStream1.print("resultStream1")//flatmapval dataStream2 = env.fromCollection(List("hello word","hello flink","hello spark"))val resultStream2 = dataStream2.flatMap(_.split(" "))resultStream2.print("resultStream2")//filterval resultStream3 = dataStream1.filter(_%2==0)resultStream3.print("resultStream3")env.execute("Stream Transform")//启动flink作业}
}

运行结果:

2.聚合算子(Aggregation)

  • 直观上看,基本转换算子确实是在“转换”——因为它们都是基于当前数据,去做了处理和输出。

  • 而在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的信息。比如之前 word count 程序中,要对每个词出现的频次进行叠加统计。这种操作,计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。

2.1 按键分区(keyBy)
  • 对于 Flink 而言,DataStream是没有直接进行聚合的API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区; 这个操作就是通过keyBy来完成的。

  • keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。

  • 基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot 中进行处理了。

  • keyBy算子主要作用于元素类型是元组或数组的DataStream上。使用该算子可以将DataStream中的元素按照指定的key(字段)进行分组,具有相同key的元素将进入同一个分区中(不进行聚合),并且不改变原来元素的数据结构。在逻辑上将流划分为不相交的分区,在内部是通过哈希分区实现的。

//配置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//fromElements --> 给一个固定的元素集合 创建一个数据流(DataStream)
//数据流是以键值对的形式存在的
val source = env.fromElements((1, 2), (2, 1),(1, 6), (1, 9), (1, 7),  (2, 2), (2, 10), (3, 1))
//keyby算子
source.keyBy(temp => temp._1).print("result")// 执行 Flink 作业
env.execute("Flink FromElements Example")

运行结果:

2.2 简单聚合
  • 有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们 内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

    • sum():在输入流上,对指定的字段做叠加求和的操作。

    • min():在输入流上,对指定的字段求最小值。

    • max():在输入流上,对指定的字段求最大值。

    • minBy():与 min()类似,在输入流上针对指定字段求最小值。

      不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;

      而 minBy()则会返回包含字段最小值的整条数据。

    • maxBy():与 max()类似,在输入流上针对指定字段求最大值。

      不同的是,max()只计算指定字段的最大值,其他字段会保留最初第一个数据的值;

      而 maxBy()则会返回包含字段最大值的整条数据。

  • 简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数; 但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字 段的方式有两种:指定位置,和指定名称。 对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字 段的名称,是以1、2、_3、…来命名的。 例如,下面就是对元组数据流进行聚合的测试:

  • 对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字 段的名称,是以1、2、_3、…来命名的。

测试:

import org.apache.flink.streaming.api.scala._object TransTupleAggregation {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream = env.fromElements(("a", 1), ("a", 3), ("b", 3), ("b", 4))stream.print("原数据")stream.keyBy(_._1).sum(1).print() //对元组的索引 1 位置数据求和stream.keyBy(_._1).sum("_2").print() //对元组的第 2 个位置数据求和stream.keyBy(_._1).max(1).print() //对元组的索引 1 位置求最大值stream.keyBy(_._1).max("_2").print() //对元组的第 2 个位置数据求最大值stream.keyBy(_._1).min(1).print() //对元组的索引 1 位置求最小值stream.keyBy(_._1).min("_2").print() //对元组的第 2 个位置数据求最小值stream.keyBy(_._1).maxBy(1).print() //对元组的索引 1 位置求最大值stream.keyBy(_._1).maxBy("_2").print() //对元组的第 2 个位置数据求最大值stream.keyBy(_._1).minBy(1).print() //对元组的索引 1 位置求最小值stream.keyBy(_._1).minBy("_2").print() //对元组的第 2 个位置数据求最小值env.execute()}
}

而如果数据流的类型是样例类,那么就只能通过字段名称来指定,不能通过位置来指定了。

import org.apache.flink.streaming.api.scala._
object TransAggregationCaseClass {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream = env.fromElements(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L))// 使用 user 作为分组的字段,并计算最大的时间戳stream.keyBy(_.user).max("timestamp").print()env.execute()}
}

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。 所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值 的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子, 应该只用在含有有限个 key 的数据流上。

2.3 归约聚合(reduce)
  • 与简单聚合类似,reduce()操作也会将 KeyedStream 转换为 DataStream。它不会改变流的 元素数据类型,所以输出类型和输入类型是一样的。

  • 调用 KeyedStream 的 reduce()方法时,需要传入一个参数,实现 ReduceFunction 接口。接 口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处 理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再 将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据, 这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果” 作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

下面我们来看一个稍复杂的例子。

我们将数据流按照用户 id 进行分区,然后用一个 reduce()算子实现 sum()的功能,统计每 个用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce()算子实现 maxBy()的功 能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。

import org.apache.flink.streaming.api.scala._object TransReduce {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.addSource(new ClickSource).map(r => (r.user, 1L))//按照用户名进行分组.keyBy(_._1)//计算每个用户的访问频次.reduce((r1, r2) => (r1._1, r1._2 + r2._2))//将所有数据都分到同一个分区.keyBy(_ => true)//通过 reduce 实现 max 功能,计算访问频次最高的用户.reduce((r1, r2) => if (r1._2 > r2._2) r1 else r2).print()env.execute()}
}

reduce()同简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以 我们需要将 reduce()算子作用在一个有限 key 的流上。

3.用户自定义函数(UDF)

3.1 函数类(Function Classes)
3.2 富函数类(Rich Function Classes)

4.物理分区(Physical Partitioning)

4.1 随机分区(shuffle)

4.2 轮询分区(Round-Robin)

4.3 重缩放分区(rescale)

4.4 广播(broadcast)
4.5 全局分区(global)
4.6 自定义分区(Custom)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/244362.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

教学改进措施及方法

在教育的世界里&#xff0c;每一位教师都是一位探险家&#xff0c;探索着如何更好地点燃学生的求知欲望&#xff0c;帮助他们展翅飞翔。我&#xff0c;作为一位拥有多年教学经验的教师&#xff0c;也在这条路上不断摸索。今天&#xff0c;我想分享一些我在教学实践中的改进措施…

数字与数学高频问题(算法村第十三关白银挑战)

数组实现加法专题 数组实现整数加法 66. 加一 - 力扣&#xff08;LeetCode&#xff09; 给定一个由 整数 组成的 非空 数组所表示的非负整数&#xff0c;在该数的基础上加一。 最高位数字存放在数组的首位&#xff0c; 数组中每个元素只存储单个数字。 你可以假设除了整数…

如何在 Ubuntu 20.04 上安装 Nginx

前些天发现了一个人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;最重要的屌图甚多&#xff0c;忍不住分享一下给大家。点击跳转到网站。 如何在 Ubuntu 20.04 上安装 Nginx 介绍 Nginx是世界上最受欢迎的 Web 服务器之一&#xff0c;负责托管互联网…

Mac Idea安装后无法启动

1、起因 想安装一个新版的idea2023.3.2&#xff0c;结果安装完之后直接无法启动 以为是卸载不干净&#xff0c;下载了一个腾讯柠檬&#xff0c;结果将2018版也一并卸载了 好家伙&#xff0c;彻底没得用 2、找原因 1&#xff09;查看idea报错信息 网上找了一圈&#xff0c;其…

C++笔试强训选择题1

选择题 1.选择表达式 11|10 的结果&#xff08;本题数值均为十进制&#xff09;&#xff08;&#xff09; A.11 B.10 C.8 D.2 11的二进制为1011&#xff0c;10的二进制为1010 1011 |&#xff08;或&#xff09;1010 1011 | 按位或 有1为1&#xff0c;& 按位…

归并排序详解

基本思想&#xff1a; 归并排序&#xff08;MERGE-SORT&#xff09;是建立在归并操作上的一种有效的排序算法&#xff0c;该算法是采用分治法&#xff08;Divide and Conquer&#xff09;的一个非常典型的应用。将已有序的子序列合并&#xff0c;得到完全有序的序列&#xff1…

MySQL也开始支持JavaScript了

2023 年 12 月 16 日&#xff0c;Oracle 公司在一篇名为 《Introducing JavaScript support in MySQL》的文章中宣布 MySQL 数据库服务器将开始支持 JavaScript 语言。 这个举措标志着继PostgreSQL之后&#xff0c; MySQL 也支持使用 JavaScript 编写函数和存储过程了。作为最…

逻辑分析仪软件PulseView 下载链接及使用,zadig更改USB端口名称

1、打开zadig&#xff0c;List All Devices 2、选择Cypress FX2LP No EEPROM Device&#xff0c;点击Edit&#xff0c;重新命名成fx2lafw 3、打开PulseView。 PulseView 64位版本的&#xff08;pulseview-0.4.1-64bit-static-release-installer.exe&#xff09; 下载链接&…

OpenKruiseGame × KubeSphere 联合发布游戏服运维控制台,推动云原生游戏落地

作者&#xff1a;云原生游戏社区 近日&#xff0c;云原生游戏开源社区旗下 OpenKruiseGame&#xff08;以下简称&#xff1a;OKG&#xff09;基于 KubeSphere 4.0 LuBan 架构开发的游戏服运维控制台 OKG Dashboard 正式发布&#xff01;现已上架 KubeSphere Marketplace 云原生…

微信小程序(十)表单组件(入门)

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.type 属性指定表单类型 2.placeholder 属性指定输入框为空时的占位文字 源码&#xff1a; form.wxml <!-- 提前准备好的布局结构代码 --> <view class"register"><view class"…

数据结构之线性表(一般的线性表)

前言 接下来就开始正式进入数据结构环节了&#xff0c;我们先从线性表开始。 线性表 线性表&#xff08;linear list&#xff09;也叫线性存储结构&#xff0c;即数据元素的逻辑结构为线性的数据表&#xff0c;它是数据结构中最简单和最常用的一种存储结构&#xff0c;专门存…

上位机图像处理和嵌入式模块部署(自定义算法)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 我们在使用opencv的时候&#xff0c;虽然大部分算法都不需要我们自己重头开始编写&#xff0c;但是总有一些关于我们自己产品的know-how&#xff0…

智能养老设备行业研究:到2026年市场规模有望突破1760亿元

随着老年消费需求的持续增长&#xff0c;国家越来越重视智慧健康养老产业的发展&#xff0c;发布了一系列政策扶持老年用品和适老化产品&#xff0c;利好智能养老设备行业的发展。在需求和政策的推动下&#xff0c;我国智能养老设备行业的市场规模快速增长。 智能养老设备是紧密…

《WebKit 技术内幕》学习之五(4): HTML解释器和DOM 模型

4 影子&#xff08;Shadow&#xff09;DOM 影子 DOM 是一个新东西&#xff0c;主要解决了一个文档中可能需要大量交互的多个 DOM 树建立和维护各自的功能边界的问题。 4.1 什么是影子 DOM 当开发这样一个用户界面的控件——这个控件可能由一些 HTML 的标签元素…

数控六面钻技术成熟-为家具产业注入新活力

随着科技的不断发展&#xff0c;数控六面钻技术作为一项先进的加工技术&#xff0c;在家具制造领域得到了广泛应用。然而&#xff0c;对于许多企业而言&#xff0c;这一技术的成熟稳定程度仍然是他们关注的焦点。本文将围绕数控六面钻技术的成熟稳定性展开探讨&#xff0c;以期…

WordPress怎么去除jquery和CSS静态文件链接中的版本号?附2种方法

我们很多WordPress网站默认情况下所加载的jquery和CSS静态文件链接中都会带有相应的版本号&#xff0c;比如boke112百科使用的YIA主题&#xff0c;加载CSS文件时就会在链接地址后面加上?ver2.7&#xff0c;即是style.css?ver2.7 除了CSS文件会加上版本号外&#xff0c;加载主…

Cesium for Unity包无法加载

太上老君急急如律⚡令⚡ &#x1f959;关闭UnityHub&#x1f9c0;启动梯子&#x1f96a;cmd 启动UnityHub &#x1f959;关闭UnityHub &#x1f9c0;启动梯子 &#x1f96a;cmd 启动UnityHub 把批处理启动文件&#x1f448;中的exe的路径换成自己的安装目录&#xff01;保存…

frida https抓包

web端导入证书、https代理即可解决大部分需求&#xff0c;但是&#xff0c;有些app需要处理ssl pinning验证。 废话不多说。frida处理ssl pin的步骤大体如下。 安装python3.x,并在python环境中安装frida&#xff1a; pip install frida pip install frida-tools下载frida-se…

【Java程序员面试专栏 专业技能篇】MySQL核心面试指引(一):基础知识考察

关于MySQL部分的核心知识进行一网打尽,包括三部分:基础知识考察、核心机制策略、性能优化策略,通过一篇文章串联面试重点,并且帮助加强日常基础知识的理解,全局思维导图如下所示 本篇Blog为第一部分:基础知识考察,子节点表示追问或同级提问 基本概念 包括一些核心问…

vertica10.0.0单点安装_ubuntu18.04

ubuntu的软件包格式为deb&#xff0c;而rpm格式的包归属于红帽子Red Hat。 由于项目一直用的vertica-9.3.1-4.x86_64.RHEL6.rpm&#xff0c;未进行其他版本适配&#xff0c;而官网又下载不到vertica-9.3.1-4.x86_64.deb&#xff0c;尝试通过alian命令将rpm转成deb&#xff0c;但…