大数据-玩转数据-Flink-Transform

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_map {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);s_num.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer values) throws Exception {return values*values;}}).print();}
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_filter {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();elements.filter(value -> value % 2 != 0).print();}
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;import java.util.concurrent.ExecutionException;public class flatMap {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {@Overridepublic void flatMap(Integer integer, Collector<Integer> collector) throws Exception {collector.collect(integer*integer);collector.collect(integer*integer*integer);}}).print();}
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1).print();env.execute("execute");}
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.sum(1).print();env.execute("execute");}
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyByReduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L)).keyBy(0).reduce(new ReduceFunction<Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {return new Tuple2<>(values1.f0,values1.f1+values2.f1);}}).print();env.execute();}
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Process_s {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 3 == 0) {//测流数据ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);}if (value % 3 == 1) {//测流数据ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);}//主流 ,数据out.collect(value);}});DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));output1.print();env.execute();}
}

四、合流算子

4.1、connect(连接)

在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.protocol.types.Field;public class connect_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5);DataStreamSource<String> data2 = env.fromElements("a", "b", "c");ConnectedStreams<Integer, String> data3 = data1.connect(data2);data3.getFirstInput().print("data1");data3.getSecondInput().print("data2");env.execute();}
}

4.2、union(合并)

package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class union_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> data2 = env.fromElements(555,666);DataStreamSource<Integer> data3 = env.fromElements(999);DataStream<Integer> data = data1.union(data2).union(data3);data.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/85608.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iOS开发-WebRTC本地直播高分辨率不显示画面问题

iOS开发-WebRTC本地直播高分辨率不显示画面问题 在之前使用WebRTC结合ossrs进行推流时候&#xff0c;ossrs的播放端无法看到高分辨率画面问题。根据这个问题&#xff0c;找到了解决方案。 一、WebRTC是什么 WebRTC是什么呢&#xff1f; WebRTC (Web Real-Time Communicatio…

【学习FreeRTOS】第4章——FreeRTOS任务创建与删除

1.任务创建和删除的API函数 任务的创建和删除本质就是调用FreeRTOS的API函数 动态方式创建任务——xTaskCreate()静态方式创建任务——xTaskCreateStatic()删除任务——vTaskDelete() 动态创建任务&#xff1a;任务的任务控制块以及任务的栈空间所需的内存&#xff0c;均由 F…

[Kubernetes]Kubeflow Pipelines - 基本介绍与安装方法

1. 背景 近些年来&#xff0c;人工智能技术在自然语言处理、视觉图像和自动驾驶方面都取得不小的成就&#xff0c;无论是工业界还是学术界大家都在惊叹一个又一个的模型设计。但是对于真正做过算法工程落地的同学&#xff0c;在惊叹这些模型的同时&#xff0c;更多的是在忧虑如…

【论文阅读】Deep Instance Segmentation With Automotive Radar Detection Points

基于汽车雷达检测点的深度实例分割 一个区别&#xff1a; automotive radar 汽车雷达 &#xff1a; 分辨率低&#xff0c;点云稀疏&#xff0c;语义上模糊&#xff0c;不适合直接使用用于密集LiDAR点开发的方法 &#xff1b; 返回的物体图像不如LIDAR精确&#xff0c;可以…

Redis追本溯源(四)集群:主从模式、哨兵模式、cluster模式

文章目录 一、主从模式1.主从复制——全量复制2.主从复制——增量复制 二、哨兵模式1.实时监控与故障转移2.Sentinel选举领导者 三、cluster模式1.三种分片方案2.cluster模式 Redis 有多种集群搭建方式&#xff0c;比如&#xff0c;主从模式、哨兵模式、Cluster 模式。 一、主…

15.4 【Linux】可唤醒停机期间的工作任务

15.4.1 什么是 anacron anacron 并不是用来取代 crontab 的&#xff0c;anacron 存在的目的就在于我们上头提到的&#xff0c;在处理非24 小时一直启动的 Linux 系统的 crontab 的执行&#xff01; 以及因为某些原因导致的超过时间而没有被执行的调度工作。 其实 anacron 也是…

DERT:End-to-End Object Detection with Transformers

文章目录 摘要1、简介2、相关工作2.1、集合预测2.2、Transformer与并行解码2.3、目标检测 3、DETR模型3.1、目标检测集合预测损失3.2、DETR架构 4、实验4.1、与Faster R-CNN的对比4.2、消融4.3、分析4.4、用于全景分割的DETR 5、结论附录 AA.1、初步:多头注意层A.2、损失A.3、详…

Attacks in NLP

一、 Introduction NLP对抗攻击是人工智能对抗攻击的一个重要的组成部分&#xff0c;但是最近几年才逐渐开始兴起&#xff0c;究其原因在于NLP对抗攻击与传统computer vision或者audio对抗攻击有很大的不同&#xff0c;主要在于值空间的连续性&#xff08;CV、audio&#xff0…

SpringCloud整体架构概览

什么是SpringCloud 目标 协调任何服务&#xff0c;简化分布式系统开发。 简介 构建分布式系统不应该是复杂的&#xff0c;SpringCloud对常见的分布式系统模式提供了简单易用的编程模型&#xff0c;帮助开发者构建弹性、可靠、协调的应用程序。SpringCloud是在SpringBoot的基…

【Wamp】安装 | 局域网内设备访问

安装教程&#xff1a; https://wampserver.site/article/1.html 下载 https://www.wampserver.com/en/ 安装路径上不能有中文 安装好之后图标呈绿色 放入网页文件 将网页文件放置于wamp文件夹的www子文件夹 例如&#xff1a;\Wamp\program\www 修改http端口 WAMP服务器…

C# 使用FFmpeg.Autogen对byte[]进行编解码

C# 使用FFmpeg.Autogen对byte[]进行编解码&#xff0c;参考&#xff1a;https://github.com/vanjoge/CSharpVideoDemo 入口调用类&#xff1a; using System; using System.IO; using System.Drawing; using System.Runtime.InteropServices; using FFmpeg.AutoGen;namespace F…

虚拟世界探索:科技之下的未来可能性

随着科技的飞速发展&#xff0c;人们对于虚拟世界的憧憬和探索也日益加深。虚拟世界&#xff0c;那是一个超越现实的概念&#xff0c;一个充满想象力和创造力的领域。然而&#xff0c;虚拟世界究竟有可能实现吗&#xff1f;这是一个引人深思的问题。 虚拟世界&#xff0c;首先让…

激光切割机的操作中蛙跳技术是什么意思

其实&#xff0c;蛙跳技术就是指在激光切割机运行的过程中&#xff0c;机器换位置的方式。打个比方&#xff0c;你刚刚在这儿把孔1切好了&#xff0c;接下来就得跑到那儿把孔2切了。 在这个过程中&#xff0c;激光切割机就像是一只青蛙&#xff0c;要从一个位置跳到另一个位置。…

机器学习笔记值优化算法(十四)梯度下降法在凸函数上的收敛性

机器学习笔记之优化算法——梯度下降法在凸函数上的收敛性 引言回顾&#xff1a;收敛速度&#xff1a;次线性收敛二次上界引理 梯度下降法在凸函数上的收敛性收敛性定理介绍证明过程 引言 本节将介绍梯度下降法在凸函数上的收敛性。 回顾&#xff1a; 收敛速度&#xff1a;次…

数据结构 | 二叉树的应用

目录 一、解析树 二、树的遍历 一、解析树 我们可以用解析树来表示现实世界中像句子或数学表达式这样的构造。 我们可以将((73)*(5-2))这样的数学表达式表示成解析树。这是完全括号表达式&#xff0c;乘法的优先级高于加法和减法&#xff0c;但因为有括号&#xff0c;所以在…

【Linux进阶之路】进程(上)

文章目录 前言一、操作系统加载过程二、进程1.基本概念2.基本信息①运行并观察进程②创建子进程③僵尸与孤儿进程&#xff08;父子进程衍生出来的问题&#xff09;1. 僵尸进程&#xff08;Zombie状态&#xff09;2. 孤儿进程 3.基本状态①操作系统的状态&#xff08;统一&#…

5.利用matlab完成 符号矩阵的转置和 符号方阵的幂运算(matlab程序)

1.简述 Matlab符号运算中的矩阵转置 转置向量或矩阵 B A. B transpose(A) 说明 B A. 返回 A 的非共轭转置&#xff0c;即每个元素的行和列索引都会互换。如果 A 包含复数元素&#xff0c;则 A. 不会影响虚部符号。例如&#xff0c;如果 A(3,2) 是 12i 且 B A.&#xff0…

【C++】红黑树模拟实现插入功能(包含旋转和变色)

红黑树模拟实现并封装为map和set 前言正式开始红黑树概念红黑树基本要求大致框架树节点树 调整红黑树使其平衡第一种&#xff1a;cur红&#xff0c;p红&#xff0c;g黑&#xff0c;u存在且为红第二种&#xff1a;cur红&#xff0c;p红&#xff0c;g黑&#xff0c;u不存在或为黑…

CentOS7安装Maven详细教程

&#x1f60a; 作者&#xff1a; Eric &#x1f496; 主页&#xff1a; https://blog.csdn.net/weixin_47316183?typeblog &#x1f389; 主题&#xff1a;CentOS7安装Maven详细教程 ⏱️ 创作时间&#xff1a; 2023年08月06日 第一步&#xff1a;上传或下载安装包&#x…

2021年12月 C/C++(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

第1题&#xff1a;输出整数部分 输入一个双精度浮点数f&#xff0c; 输出其整数部分。 时间限制&#xff1a;1000 内存限制&#xff1a;65536 输入 一个双精度浮点数f(0 < f < 100000000)。 输出 一个整数&#xff0c;表示浮点数的整数部分。 样例输入 3.8889 样例输出 3…