Flink多流处理之Broadcast(广播变量)

写过Spark批处理的应该都知道,有一个广播变量broadcast这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有broadcast,简单来说和Spark中的类似,但是有所区别,首先Spark中的broadcast是静态的数据,而Flink中的broadcast是动态的,也就是源源不断的数据流.在Flink中会将广播的数据存到state中.
在这里插入图片描述
在Flink中主流数据可以获取state中的所有状态数据,使用过window的应该都清楚,当两个streamData中的数据到达窗口的时间刚好错过时就会发生关联不上的情况,如window2S,sreamData1到达窗口的时间刚好卡在这个2S窗口的尾端,而streamData到达窗口时,这个窗口已经结束了,这种情况就算这两条数据有相同id也无法进行关联了.
但是broadcast会将到达的数据都存储在state中,这样主流到达的每一条数据都可以和state中的广播流数据进行关联比较.
在这里插入图片描述
流程图内容可能不够准确,只是为了看起来方便理解.

  • 数据源
    # 主流数据
    ➜  ~ nc -lk 1234
    101,浏览商品,2023-08-02
    102,浏览商品,2023-08-02
    103,查看商品价格,2023-08-04
    101,商品加入购物车,2023-08-03
    101,从购物车删除商品,2023-08-03
    102,下单,2023-08-02
    102,申请延期发货,2023-08-03
    103,点击商品详情页,2023-08-04
    104,点击收藏,2023-08-05
    104,下单,2023-08-05
    104,付款,2023-08-06
    105,浏览商品,2023-08-07
    106,浏览商品,2023-08-07
    106,加入购物车,2023-08-08
    107,浏览商品,2023-08-10
    
    # 广播流数据
    ➜  ~ nc -lk 5678
    101,小明
    102,张丽
    103,公孙飞天
    104,王二虎
    106,李四
    108,赵屋面
    
  • 代码
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/11* @Description: 多流操作-广播流**/
    public class FlinkBroadcast {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据集源1作为主流数据(用户行为日志[id,behavior,date])DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);// 将字符串切割处理SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {});// 数据源2作为广播流数据(用户信息(id,name))DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);// 将字符串切割处理SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {});// 将广播流数据源进行广播/***参数说明* 这里需要我们传入一个MapStateDescriptor,其实就是一个Map结构的数据<k,v>* <String, Tuple2<String, String>>,第一个String类型就是广播流和主流连接的字段,在这个代码中就是id,由实际业务决定* <String, Tuple2<String, String>>,第二个Tuple2<String, String>就是实际广播数据流的数据,由实际业务决定* "userInfo"就是给一个名字,这个自定义无强制要求**/// 先构建一个状态,后面也会使用MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);// 将主流数据和广播流数据使用connect连接/*** 我们将数据转变成广播流之后,在Flink中也不知哪个数据流需要使用这个广播流(userInfoBroadStream),* 这个时候就需要我们自己将主流数据和该广播流数据进行连接**/BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);/*** 在process()中有两类函数供我们选择,KeyedBroadcastProcessFunction和BroadcastProcessFunction,* 这里要注意当"connectedStream"是KeyedStream时选择KeyedBroadcastProcessFunction* 当"connectedStream"不是KeyedStream时选择BroadcastProcessFunction就可以.* 使用keyBy算子返回的就是KeyedStream**/SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {// 这个方法写主流数据处理逻辑@Overridepublic void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {/*** 要注意,这里我们最好从ReadOnlyContext来获取广播状态数据,因为获取只读的状态数据可以保证数据的安全性,* 如果是通过成员变量的方式获取可修改的状态数据,就会存在数据不安全的问题,如在代码逻辑中出现了对状态数据* 修改的代码,那么共享此状态的并行算子可能看到的状态数据不一致,就会导致数据错误或者代码报错.* 而使用ReadOnlyContext就可以保证processElement这个方法中我们只对状态数据进行读取.**/ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);if (broadcastState != null) {// 通过主流中的ID作为key获取广播变量中的用户信息Tuple2<String, String> userInfo = broadcastState.get(value.f0);// 输出数据的形式(id,behavior,date,name)if (userInfo == null) {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);}} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");}}// 这个方法写广播流数据处理逻辑@Overridepublic void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {// 使用Context获取状态BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);// 将数据存入到状态中broadcastState.put(value.f0, value);}});// 打印结果resultStream.print();env.execute("Flink broadcast");}
    }
    
  • 结果
    3> 101,浏览商品,2023-08-02,小明
    3> 101,商品加入购物车,2023-08-03,小明
    3> 102,申请延期发货,2023-08-03,张丽
    3> 104,下单,2023-08-05,王二虎
    3> 106,浏览商品,2023-08-07,李四
    1> 102,浏览商品,2023-08-02,张丽
    1> 101,从购物车删除商品,2023-08-03,小明
    1> 103,点击商品详情页,2023-08-04,公孙飞天
    1> 104,付款,2023-08-06,王二虎
    1> 106,加入购物车,2023-08-08,李四
    2> 103,查看商品价格,2023-08-04,公孙飞天
    2> 102,下单,2023-08-02,张丽
    2> 104,点击收藏,2023-08-05,王二虎
    2> 105,浏览商品,2023-08-07,NULL
    2> 107,浏览商品,2023-08-10,NULL
    
    代码内容就不进行详细解释了,注释基本都写清楚了,如有疑问可评论提问,共同探讨.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/86616.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker 安装和架构说明

Docker 并非是一个通用的容器工具&#xff0c;它依赖于已存在并运行的Linux内核环境。 Docker实质上是在已经运行的Liunx下制造了一个隔离的文件环境&#xff0c;因此他的执行效率几乎等同于所部署的linux主机。因此Docker必须部署在Linux内核系统上。如果其他系统想部署Docke…

阿里云服务器部署Drupal网站教程基于CentOS系统

阿里云百科分享如何在CentOS 7操作系统的ECS实例上搭建Drupal电子商务网站。Drupal是使用PHP语言编写的开源内容管理框架&#xff08;CMF&#xff09;&#xff0c;它由内容管理系统&#xff08;CMS&#xff09;和PHP开发框架&#xff08;Framework&#xff09;共同构成。它用于…

LeetCode 206.反转链表

文章目录 &#x1f4a1;题目分析&#x1f4a1;解题思路&#x1f6a9;方法1: 反转指针指向&#x1f514;接口源码&#xff1a;&#x1f6a9;方法2:取节点头插&#x1f514;接口源码&#xff1a; 题目链接&#x1f449;LeetCode 206.反转链表&#x1f448; &#x1f4a1;题目分析…

2023网络安全常用工具汇总(附学习资料+工具安装包)

几十年来&#xff0c;攻击方、白帽和安全从业者的工具不断演进&#xff0c;成为网络安全长河中最具技术特色的灯塔&#xff0c;并在一定程度上左右着网络安全产业发展和演进的方向&#xff0c;成为不可或缺的关键要素之一。 话不多说&#xff0c;网络安全10款常用工具如下 1、…

【PostgreSQL内核学习(十一)—— OpenGauss源码学习(CopyTo)】

可优化语句执行 概述什么是列存储&#xff1f;列存的优势 相关函数CopyToCStoreCopyToCopyStatetupleDescCStoreScanDesc CStoreBeginScanRelationSnapshotProjectionInfo GetCStoreNextBatchRunScanFillVecBatchCStoreIsEndScan CStoreEndScan 声明&#xff1a;本文的部分内容…

Centos 从0搭建grafana和Prometheus 服务以及问题解决

下载 虚拟机下载 https://customerconnect.vmware.com/en/downloads/info/slug/desktop_end_user_computing/vmware_workstation_player/17_0 cenos 镜像下载 https://www.centos.org/download/ grafana 服务下载 https://grafana.com/grafana/download/7.4.0?platformlinux …

【C语言】结构体详解

现实生活中一个事物&#xff0c;会有许多属性连接起来。而C语言引入一种构造数据类型——结构体 将属于一个事物的多个数据组织起来以体现其内部联系。 一、结构体类型的定义 结构体类型 是一种 构造类型&#xff0c;它是由若干成员组成的&#xff0c;每个成员可以是一个基本…

springBoot集成caffeine,自定义缓存配置 CacheManager

目录 springboot集成caffeine Maven依赖 配置信息&#xff1a;properties文件 config配置 使用案例 Caffeine定制化配置多个cachemanager springboot集成redis并且定制化配置cachemanager springboot集成caffeine Caffeine是一种基于服务器内存的缓存库。它将数据存储在…

零拷贝详解

1、在没有DMA技术之前的I/O过程是这样的&#xff1a; CPU发出对应的指令给磁盘控制器&#xff0c;然后返回磁盘控制器收到指令后&#xff0c;于是就开始准备数据&#xff0c;会把数据放入到磁盘控制器的内部缓冲区&#xff0c;然后产生中断CPU收到中断信号后&#xff0c;停下手…

springBoot的日志文件

日志是程序的重要组成部分&#xff0c;主要可以用来定位和排查问题。除此之外&#xff0c;还可以用来&#xff1a; 1. 记录用户的登录日志&#xff0c;方便分析用户是正常登录还是恶意破解&#xff1b; 2. 记录系统的操作日志&#xff0c;方便数据恢复和定位操作人&#xff1b;…

过滤器,监听器与拦截器的区别

过滤器&#xff0c;监听器与拦截器的区别 ​ 过滤器和监听器不是Spring MVC中的组件&#xff0c;而是Servlet的组件&#xff0c;由Servlet容器来管理。拦截器是Spring MVC中的组件&#xff0c;由Spring容器来管理 ​ Servlet过滤器与Spring MVC 拦截器在Web应用中所处的层次如…

javaWeb项目--二级评论完整思路

先来看前端需要什么吧&#xff1a; 通过博客id&#xff0c;首先需要显示所有一级评论&#xff0c;包括评论者的头像&#xff0c;昵称&#xff0c;评论时间&#xff0c;评论内容 然后要显示每个一级评论下面的二级评论&#xff0c;包括&#xff0c;评论者的头像&#xff0c;昵称…

【Spring】-Spring中Bean对象的存取

作者&#xff1a;学Java的冬瓜 博客主页&#xff1a;☀冬瓜的主页&#x1f319; 专栏&#xff1a;【Framework】 主要内容&#xff1a;往spring中存储Bean对象的三大方式&#xff1a;XML方式(Bean标签)&#xff1b;五大类注解&#xff1b;方法注解。从spring中取对象的两种方式…

查看单元测试用例覆盖率新姿势:IDEA 集成 JaCoCo

1、什么是 IDEA IDEA 全称 IntelliJ IDEA&#xff0c;是 Java 编程语言开发的集成环境。IntelliJ 在业界被公认为最好的 Java 开发工具&#xff0c;尤其在智能代码助手、代码自动提示、重构、JavaEE 支持、各类版本工具(git、SVN 等)、JUnit、CVS 整合、代码分析、 创新的 GUI…

DNS:使用 bind9 配置主从权威DNS服务器

写在前面 分享一些 使用 bind9 配置主从权威名称服务器的笔记理解不足小伙伴帮忙指正 对每个人而言&#xff0c;真正的职责只有一个&#xff1a;找到自我。然后在心中坚守其一生&#xff0c;全心全意&#xff0c;永不停息。所有其它的路都是不完整的&#xff0c;是人的逃避方式…

ML类CFAR检测器在不同环境中检测性能的分析

摘要&#xff1a;该文是楼主翻阅书籍以及一些论文总结出来的关于ML(均值)类CFAR检测器在不同环境中的性能对比&#xff0c;以及优缺点的总结&#xff0c;可以帮助大家面对不同情形如何选择CFAR问题。由于楼主见识短浅&#xff0c;文中难免出现不足之处&#xff0c;望各位指出。…

怎么做Tik Tok海外娱乐公会呢?新加坡市场怎么样?

一、为什么选择TikTok直播 1. 海外市场潜力巨大 • 自2016年始&#xff0c;多家直播平台陆续拓展至东南亚、中东、俄罗斯、日韩、欧美、拉美等地区。 • 海外市场作为直播发展新蓝海&#xff0c;2021年直播行业整申请cmxyci体规模达百亿美元&#xff0c;并维持高速增长。 &a…

geeemap学习总结(1)——Anaconda-VSCode-geemap环境安装与配置

配置conda geemap 环境 通过Anaconda配置geemap环境较为方便&#xff0c;首先需在系统中完成 Anaconda安装。创建名为geemap的环境conda create -n geemap切换到新建的环境conda activate geemap安装geemap依赖包conda install -c conda-forge geemap 安装mambaconda install …

网络编程 tcp udp http编程流程 网络基础知识

讲解 网络基础知识网络编程tcp编程流程图示理解bind和accept函数理解监视套接字和链接套接字理解linux和window下的编程实现tcp特点 udp编程流程图示理解udp特点 http编程流程图示理解编程实现-网站服务器 网络基础知识 OSI分层&#xff1a;应用层 表示层 会话层 传输层 网络层…

上传图片视频

分布式文件系统MinIo MinIO提供多个语言版本SDK的支持&#xff0c;下边找到java版本的文档&#xff1a; 地址&#xff1a;https://docs.min.io/docs/java-client-quickstart-guide.html MinIO测试&#xff08;上传、删除、下载&#xff09; public class MinioTest {MinioC…