DataStream编程模型之数据源、数据转换、数据输出

Flink之DataStream数据源、数据转换、数据输出(scala)

0.前言–数据源

在进行数据转换之前,需要进行数据读取。
数据读取分为4大部分:

(1)内置数据源;

又分为文件数据源;在这里插入图片描述
socket数据源;
在这里插入图片描述

集合数据源三类
在这里插入图片描述

(2)Kafka数据源

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
第二个参数用到的SimpleStringSchema对象是一个内置的DeserializationSchema对象,可以把字节数据反序列化程一个String对象。
另外,FlinkKafkaConsumer开始读取Kafka消息时,可以配置他的 读 起始位置,有如下四种。
在这里插入图片描述

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
object KafkaWordCount {def main(args: Array[String]): Unit = {val kafkaProps = new Properties()//Kafka的一些属性kafkaProps.setProperty("bootstrap.servers", "localhost:9092")//所在的消费组kafkaProps.setProperty("group.id", "group1")//获取当前的执行环境val evn = StreamExecutionEnvironment.getExecutionEnvironment
//创建Kafka的消费者,wordsendertest是要消费的Topicval kafkaSource = new FlinkKafkaConsumer[String]("wordsendertest",new SimpleStringSchema,kafkaProps)//设置从最新的offset开始消费kafkaSource.setStartFromLatest()//自动提交offset
kafkaSource.setCommitOffsetsOnCheckpoints(true)//绑定数据源val stream = evn.addSource(kafkaSource)//设置转换操作逻辑val text = stream.flatMap{ _.toLowerCase().split("\\W+")filter{ _.nonEmpty} }.map{(_,1)}.keyBy(0).timeWindow(Time.seconds(5)).sum(1)//打印输出text.print()//程序触发执行evn.execute("Kafka Word Count")}
}

(3)HDFS数据源

在这里插入图片描述

(4)自定义数据源

在这里插入图片描述
一个例子:

import java.util.Calendar
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.util.Randomcase class StockPrice(stockId:String,timeStamp:Long,price:Double)
object StockPriceStreaming {def main(args: Array[String]) { //设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度    
env.setParallelism(1)    
//股票价格数据流val stockPriceStream: DataStream[StockPrice] = env//该数据流由StockPriceSource类随机生成.addSource(new StockPriceSource)//打印结果stockPriceStream.print()//程序触发执行env.execute("stock price streaming")}class StockPriceSource extends RichSourceFunction[StockPrice]{ var isRunning: Boolean = trueval rand = new Random()//初始化股票价格var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)var stockId = 0var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {while (isRunning) {//每次从列表中随机选择一只股票stockId = rand.nextInt(priceList.size)val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05priceList = priceList.updated(stockId, curPrice)val curTime = Calendar.getInstance.getTimeInMillis//将数据源收集写入SourceContextsrcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))Thread.sleep(rand.nextInt(10))}
} override def cancel(): Unit = {isRunning = false}}
}

1.数据转换之map操作

1.数据转换算子的四种类型
基于单条记录:fliter、map
基于窗口:window
合并多条数据流:union,join,connect
拆分多条数据流:split

2.map(func)操作将一个DataStream中的每个元素传递到函数func中,并将结果返回为一个新的DataStream。输出的数据流DataStream[OUT]类型可能和输入的数据流DataStream[IN]不同
理解:一 一对应的关系,一个x得到一个y

val dataStream = env.fromElements(1,2,3,4,5)
val mapStream = dataStream.map(x=>x+10)

在这里插入图片描述
3.演示代码

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentcase class StockPrice(stockId:String,timeStamp:Long,price:Double) 
object MapFunctionTest {def main(args: Array[String]): Unit = {//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设定程序并行度env.setParallelism(1)//创建数据源val dataStream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7)//设置转换操作逻辑val richFunctionDataStream = dataStream.map {new MyMapFunction()}//打印输出richFunctionDataStream.print()//程序触发执行env.execute("MapFunctionTest")}//自定义函数,继承RichMapFunctionclass MyMapFunction extends RichMapFunction[Int, String] {override def map(input: Int): String =("Input : " + input.toString + ", Output : " + (input * 3).toString)}
}

2.数据转换之flatMap操作

1.flatMap和map相似,每个输入元素都可以映射到0或多个输出结果。

val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val flatMapStream = dataStream.flatMap(line => line.split(" "))

在这里插入图片描述
可以理解为flatMap比map多了flat操作。如图。map是将输入数据映射成数组,flat是将数据拍扁,成为一个个元素。把元素映射成了多个。

2.代码演示

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double) 
object FlatMapFunctionTest {def main(args: Array[String]): Unit = {//设定执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment//设定程序并行度
env.setParallelism(1)
//设置数据源
val dataStream: DataStream[String] = env.fromElements("Hello Spark", "Flink is excellent“) //针对数据集的转换操作逻辑
val result = dataStream.flatMap(new WordSplitFlatMap(15)) //打印输出
result.print() 
//程序触发执行env.execute("FlatMapFunctionTest")} //使用FlatMapFunction实现过滤逻辑,只对字符串长度大于threshold的内容进行切词class WordSplitFlatMap(threshold: Int) extends FlatMapFunction[String, String] {override def flatMap(value: String, out: Collector[String]): Unit = {if (value.size > threshold) {value.split(" ").foreach(out.collect)}}}
}

预计输出:

Flink
is
excellent

这里只对字符长度超过15的做切割。threshold是阈值,少于15的不做切割。

3.数据转换之filter和keyBy操作

1.filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集
2.代码举例

val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val filterStream = dataStream.filter(line => line.contains("Flink"))

如图所示
在这里插入图片描述

3.keyBy(注意方法里k小写B大写):将相同Key的数据放置在相同的分区中。
keyBy算子根据元素的形状对数据进行分组,相同形状的元素被分到了一起,可被后续算子统一处理

比如在词频统计时:

				hello flink hello hadoophello zhangsan

这里 词频(hello,1),(hello,1),(hello,1)统计出来之后,通过keyBy,就可以聚合,放在了相同的分区里进行统一计算。

在这里插入图片描述
通过聚合函数后又可以吧KeyedStream转换成DataStream。

4.在使用keyBy算子时,需要向keyBy算子传递一个参数, 可使用数字位置来指定Key
比如刚才词频统计时,keyBy(0)就是hello这个单词。

val dataStream: DataStream[(Int, Double)] =env.fromElements((1, 2.0), (2, 1.7), (1, 4.9), (3, 8.5), (3, 11.2))
//使用数字位置定义Key 按照第一个字段进行分组
val keyedStream = dataStream.keyBy(0)

这里keyby 是第一个字段1或者2或者3分组(分类)。

5.keyBy代码举例:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object KeyByTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)
//创建数据源val stockList = List(StockPrice("stock_4",1602031562148L,43.4D),StockPrice("stock_1",1602031562148L,22.9D),StockPrice("stock_0",1602031562153L,8.2D),StockPrice("stock_3",1602031562153L,42.1D),StockPrice("stock_2",1602031562153L,29.2D),StockPrice("stock_0",1602031562159L,8.1D),StockPrice("stock_4",1602031562159L,43.7D),StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList) //设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId“) //打印输出keyedStream.print() //程序触发执行env.execute("KeyByTest")}
}

在这里插入图片描述
这里看起来没什么变换 ,因为没进行聚合操作,所以什么变化都没有,原样输出。
我加上聚合函数,看起来就有变化了。

//简写上面的代码 加上聚合函数val keyedStream = dataStream.keyBy("stockId")val aggre = keyedStream.sum(2) //这里相加的是价格price(第三个字段)// keyedStream.print()aggre.print()//聚合后打印

结果
在这里插入图片描述
对比上面哪里变化了呢?
stcok_id顺序,4-1-0-3-2-0(这里之前也有0,就会加上之前的0,变为16.299,后面的4也在累加前面的price了

4.数据转换之reduce操作和聚合操作

1.reduce:reduce算子将输入的KeyedStream通过传入的用户自定义函数滚动地进行数据聚合处理,处理以后得到一个新的DataStream,如下实例

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object ReduceTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//创建数据源val stockList = List(StockPrice("stock_4",1602031562148L,43.4D),StockPrice("stock_1",1602031562148L,22.9D),StockPrice("stock_0",1602031562153L,8.2D),StockPrice("stock_3",1602031562153L,42.1D),StockPrice("stock_2",1602031562153L,29.2D),StockPrice("stock_0",1602031562159L,8.1D),StockPrice("stock_4",1602031562159L,43.7D),StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList)//设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val reduceStream = keyedStream.reduce((t1,t2)=>StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price))//打印输出reduceStream.print()//程序触发执行env.execute("ReduceTest")}
}

reduce结果和上面的一样,就是累加
在这里插入图片描述

2.flink也支持自定义的reduce函数

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//声明一个样例类,包含三个字段:股票ID,交易时间,交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object MyReduceFunctionTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//创建数据源val stockList = List(StockPrice("stock_4",1602031562148L,43.4D),StockPrice("stock_1",1602031562148L,22.9D),StockPrice("stock_0",1602031562153L,8.2D),StockPrice("stock_3",1602031562153L,42.1D),StockPrice("stock_2",1602031562153L,29.2D),StockPrice("stock_0",1602031562159L,8.1D),StockPrice("stock_4",1602031562159L,43.7D),StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList) //设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val reduceStream = keyedStream.reduce(new MyReduceFunction)//打印输出reduceStream.print()//程序触发执行env.execute("MyReduceFunctionTest")}class MyReduceFunction extends ReduceFunction[StockPrice] {override def reduce(t1: StockPrice,t2:StockPrice):StockPrice = {StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price)}}
}

主要不同的就是创建了MyReduceFunction ().
3.聚合算子
在这里插入图片描述
和excel一样。
代码举例:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double) 
object AggregationTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)    
//创建数据源val stockList = List(StockPrice("stock_4",1602031562148L,43.4D),StockPrice("stock_1",1602031562148L,22.9D),StockPrice("stock_0",1602031562153L,8.2D),StockPrice("stock_3",1602031562153L,42.1D),StockPrice("stock_2",1602031562153L,29.2D),StockPrice("stock_0",1602031562159L,8.1D),StockPrice("stock_4",1602031562159L,43.7D),StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList)//设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val aggregationStream = keyedStream.sum(2)  //区别在这里   sum聚合 2表示第三个字段//打印输出aggregationStream.print()//执行操作env.execute(" AggregationTest")}
}

运行结果
在这里插入图片描述

5.数据输出

1.基本数据输出包括:文件输出,客户端输出,socket网络端口输出。
文件输出具体代码

val dataStream = env.fromElements("hadoop","spark","flink")
//文件输出
dataStream.writeAsText("file:///home/hadoop/output.txt")
//hdfs输出//把数据写入HDFS
dataStream.writeAsText("hdfs://localhost:9000/output.txt“) //通过writeToSocket方法将DataStream数据集输出到指定socket端口
dataStream.writeToSocket(outputHost,outputPort,new SimpleStringSchema())

2.输出到kafka
代码举例:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerobject SinkKafkaTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//加载或创建数据源val dataStream = env.fromElements("hadoop","spark","flink")//把数据输出到Kafka
dataStream.addSink(new FlinkKafkaProducer [String]("localhost:9092", "sinkKafka", new SimpleStringSchema()))//程序触发执行env.execute()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/474088.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Django5 2024全栈开发指南(三):数据库模型与ORM操作

目录 一、模型的定义二、数据迁移三、数据表关系四、数据表操作4.1 Shell工具4.2 数据新增4.3 数据修改4.4 数据删除4.5 数据查询4.6 多表查询4.7 执行SQL语句4.8 数据库事务 Django 对各种数据库提供了很好的支持,包括 PostgreSQL、MySQL、SQLite 和 Oracle&#x…

ASP.NET Core Webapi 返回数据的三种方式

ASP.NET Core为Web API控制器方法返回类型提供了如下几个选择&#xff1a; Specific type IActionResult ActionResult<T> 1. 返回指定类型&#xff08;Specific type&#xff09; 最简单的API会返回原生的或者复杂的数据类型&#xff08;比如&#xff0c;string 或者…

网关在能源物联网中扮演了什么角色?

随着通信、物联网、云平台等技术的飞速发展&#xff0c;越来越多能源用户希望借助先进的管理手段&#xff0c;对能源进行分布式监测、集中管理&#xff0c;构建能源物联网。准确的分布式监测和集中管理有助于制定更科学合理的节能减排计划。企业或能源使用单位可以依据能源物联…

【快速入门】前端御三家:HTML、CSS和JS

HTML HTML&#xff0c;超文本标记语言&#xff0c;可以理解成骨架&#xff0c;是一个基础的东西。 一.基础结构 如图所示&#xff1a; 二.常见标签 1.标题标签 在页面上定义标题性的内容 <h1>一级标题</h1> <h2>二级标题</h2> <h3>三级标…

WebSocket实战,后台修改订单状态,前台实现数据变更,提供前端和后端多种语言

案例场景&#xff1a; 在实际的后台中需要变更某个订单的状态&#xff0c;在官网中不刷新页面&#xff0c;可以自动更新状态 在前端页面实现订单状态的实时更新&#xff08;不刷新页面&#xff09;&#xff0c;可以通过 WebSocket 的方式与后台保持通信&#xff0c;监听订单状态…

Django5 2024全栈开发指南(二):Django项目配置详解

目录 一、基本配置信息二、资源文件配置2.1 资源路由——STATIC_URL2.2 资源集合——STATICFILES_DIRS2.3 资源部署——STATIC_ROOT2.2.4 媒体资源——MEDIA 三、模板配置四、数据库配置4.1 mysqlclient连接MySQL4.2 pymysql连接MySQL4.3 多个数据库的连接方式4.4 使用配置文件…

近几年新笔记本重装系统方法及一些注意事项

新笔记本怎么重装系统&#xff1f; 近几年的新笔记本默认开启了raid on模式或vmd选项&#xff0c;安装过程中会遇到问题&#xff0c;新笔记本电脑重装自带的系统建议采用u盘方式安装&#xff0c;默认新笔记本有bitlocker加密机制&#xff0c;如果采用一键重装系统或硬盘方式安装…

黑马智数Day10

项目背景说明 后台管理部分使用的技术栈是Vue2&#xff0c;前台可视化部分使用的技术栈是Vue3 前台可视化项目不是独立存在&#xff0c;而是和后台管理项目共享同一个登录页面 微前端的好处 微前端是一种前端架构模式&#xff0c;它将大型单体应用程序分解为小的、松散耦合的…

Visual Studio 圈复杂度评估

VisualStudio自带的有工具 之后就可以看到分析结果

prop校验,prop和data区别

prop:组件上注册的一些自定义属性 prop作用&#xff1a;向子组件传递数据 特点&#xff1a; 可以传递任意数量&#xff0c;任意类型的prop 父组件 &#xff08;一个个地传递比较麻烦&#xff0c;可以直接打包成一个对象传过去&#xff0c;然后通过点属性接收&#xff09; <t…

ubuntu显示管理器_显示导航栏

ubuntu文件管理器_显示导航栏 一、原始状态&#xff1a; 二、显示导航栏状态&#xff1a; 三、原始状态--->导航栏状态: 1、打开dconf编辑器&#xff0c;直接在搜索栏搜索 dconf-editor ------如果没有安装&#xff0c;直接按流程安装即可。 2、进入目录&#xff1a;org …

跨平台WPF框架Avalonia教程 一

安装 安装 Avalonia UI 模板​ 开始使用 Avalonia 的最佳方式是使用模板创建一个应用程序。 要安装 Avalonia 模板&#xff0c;请运行以下命令&#xff1a; dotnet new install Avalonia.Templates 备注 对于 .NET 6.0 及更早版本&#xff0c;请将 install 替换为 --inst…

UE5 材质里面画圆锯齿严重的问题

直接这么画圆会带来锯齿&#xff0c;我们对锯齿位置进行模糊 可以用smoothstep&#xff0c;做值的平滑过渡&#xff08;虽然不是模糊&#xff0c;但是类似&#xff09;

【MySql】实验十六 综合练习:图书管理系统数据库结构

文章目录 创建图书管理系统数据库结构一、创建数据表1.1 book表1.2 reader表1.3 borrow表 二、插入示例数据2.1 向book表插入数据2.2 向reader表插入数据2.3 向borrow表插入数据 三、查询操作3.1 根据语义为借书表borrow的bno列和 rno列建立外键3.2 查询张小海编写的“数据库原…

QT QLabel双击事件

新建类&#xff1a; DoubleClickLabel .h #pragma once#include <QLabel>class DoubleClickLabel : public QLabel {Q_OBJECTpublic:DoubleClickLabel(QWidget *parent);~DoubleClickLabel(); signals:void doubleClicked();protected: //这里重写双击事件virtual v…

Vue3中实现插槽使用

目录 一、前言 二、插槽类型 三、示例 四、插槽的分类实现 1. 基本插槽 2. 命名插槽 3. 默认插槽内容 4. 作用域插槽&#xff08;Scoped Slots&#xff09; 5. 多插槽与具名插槽组合 一、前言 在 Vue 3 中&#xff0c;插槽&#xff08;Slot&#xff09;用于实现组件的内…

【学习笔记】科学计算

[pytorch 加速] CPU传输 & GPU计算的并行&#xff08;pin_memory&#xff0c;non_blocking&#xff09; https://www.bilibili.com/video/BV15Xxve1EtZ from IPython.display import Image import os os.environ[http_proxy] http://127.0.0.1:7890 os.environ[https_pr…

2、计算机网络七层封包和解包的过程

计算机网络osi七层模型 1、网络模型总体预览2、数据链路层4、传输层5.应用层 1、网络模型总体预览 图片均来源B站&#xff1a;网络安全收藏家&#xff0c;没有本人作图 2、数据链路层 案例描述&#xff1a;主机A发出一条信息&#xff0c;到路由器A&#xff0c;这里封装目标MAC…

在云服务器搭建 Docker

操作场景 本文档介绍如何在腾讯云云服务器上搭建和使用 Docker。本文适用于熟悉 Linux 操作系统&#xff0c;刚开始使用腾讯云云服务器的开发者。如需了解更多关于 Docker 相关信息&#xff0c;请参见 Docker 官方。 说明&#xff1a; Windows Subsystem for Linux&#xff…

Isaac Sim+SKRL机器人并行强化学习

目录 Isaac Sim介绍 OmniIssacGymEnvs安装 SKRL安装与测试 基于UR5的机械臂Reach强化学习测评 机器人控制 OMNI GYM环境编写 SKRL运行文件 训练结果与速度对比 结果分析 运行体验与建议 Isaac Sim介绍 Isaac Sim是英伟达出的一款机器人仿真平台&#xff0c;适用于做机…