Flink之流的转换

ProcessFuncion处理函数

  • 功能
    • 拥有富函数功能
      - 生命周期方法
      - 状态编程
    • 对元素的处理功能processElement, 在不同的处理函数中,该方法的名字略有区别
    • 定时器编程
      • TimeService:定时服务,可以用于注册定时器,删除定时器
      • ontimer():定时器触发后会自动调用该方法,我们将需要完成的工作写到该方法中
    • 侧输出流
  • 分类
    • processFunction: 普通流DataStream调用
    • keyedProcessFunction: KeyedStream, 经过Keyby的数据流
    • ProcessWindowFunction:按键分区经过window操作的数据流,WindowedStream,全窗口函数
    • ProcessAllWindowFunction:非按键分区的window数据流调用
    • CoProcessFunction:ConnectedStreams, 由两个数据流经过connect后得到的,没有经过keyby时调用
    • ProcessJoinFunction:IntervalJoined , 两个流经过IntervalJoin后得到的流
    • BroadcastProcessFunction:一个普通流connect广播流后得到,之后调用process需要传入该Function
    • KeyedBroadcastProcessFunction: 一个keyby流connect广播流后得到

常用流之间的转换关系转换关系

  • SingleOutputDataSteam是继承自DataStream的。
  • window操作必须是基于keyby流
  • 特殊流经过reduce, aggreagate,process, apply等聚合操作后就变为SingleOutputDataStream

processFunction的定时器方法

定时服务 和定时器

  • TimerService:定时服务,用于注册定时器,删除定时
    • long currentProcessingTime():获取当前的处理时间
    • registerProcessingTimer(): 注册处理时间定时器
    • registerEventTimeTimer():注册事件时间定时器
    • currentWatermark():获取当前水位线
    • deleteEventTimeTimer():注册事件时间定时器
  • Timer:定时器,在未来的某个时间注册一个事件,定时器触发,会执行定义的事件
  • ontimer(): 定时器触发以后,会自动调用该方法
  • 注意:
    • 要定时先keyby, 设置定时器必须基于一个keyby流
    • 同一个key在注册多个相同时间的定时器,未来之后触发一次,不同的key注册相同时间的定时器,每个key都会触发一次。

TopN 实现

/*** title:** @Author 浪拍岸* @Create 11/12/2023 下午3:19* @Version 1.0** 实时统计一段时间的热门URL* 例如:10秒内最热门的两个URL链接,并且每5秒更新一次** 方案1:不进行keyby操作,将所有URL数据统一往一个窗口* 中收集,并且使用全量聚合,等到窗口触发计算时,在处理函数中* 对窗口内所有数据进行汇总处理**/
public class Flink03_TopN {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);SingleOutputStreamOperator<Event> ds = Flink06_EventSource.getEventSource(env).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((event, ts) -> event.getTs()));ds.print("input");ds.windowAll(
//                TumblingEventTimeWindows.of(Time.seconds(10))SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessAllWindowFunction<Event, String, TimeWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Event, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<String> out) throws Exception {//统计每个URL的点击次数long start = context.window().getStart();long end = context.window().getEnd();HashMap<String, Long> urlCountMap = new HashMap<>();for (Event element : elements) {Long count = urlCountMap.getOrDefault(element.getUrl(), 0L);urlCountMap.put(element.getUrl(), count+1);}//将map结构转换为list
//                        ArrayList<UrlUserCount> urlCountList = new ArrayList<>(urlCountMap.size());List<UrlUserCount> urlUserCountList = urlCountMap.entrySet().stream().map(entry -> new UrlUserCount(entry.getKey(), entry.getValue(), start, end)).collect(Collectors.toList());urlUserCountList.sort(new Comparator<UrlUserCount>() {@Overridepublic int compare(UrlUserCount o1, UrlUserCount o2) {return Long.compare(o2.getCount(), o1.getCount());}});//取topNStringBuilder result = new StringBuilder("*******************************\n");for (int i = 0; i < Math.min(2,urlUserCountList.size()); i++) {UrlUserCount urlUserCount = urlUserCountList.get(i);result.append("TOP."+(i+1)+" "+urlUserCount+"\n");}result.append("*****************************************\n");//输出out.collect(result.toString());}}).print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/214955.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++中字符串详解

在C语言中只能通过字符串数组来模拟字符串&#xff0c;没有字符串类型。在C引入了string类来表示字符串类型。从而用它定义字符串。 在C语言中&#xff1a; char str[] "abc"; char str[] {a&#xff0c;b,c,\0}; char* str "abc"; //这三种形式是C语言…

Java TCP(一对一)聊天简易版

客户端 import java.io.*; import java.net.Socket; import java.util.Date; import javax.swing.*;public class MyClient {private JFrame jf;private JButton jBsend;private JTextArea jTAcontent;private JTextField jText;private JLabel JLcontent;private Date data;p…

推荐一个FL Studio最适配的midi键盘?

Hello大家好&#xff01;好消息&#xff01;好消息&#xff01;特大好消息&#xff01; 水果党们&#xff01;终于有属于自己的专用MIDI键盘啦&#xff01; 万众期待的Novation FLKEY系列 正式出炉&#xff01; 做编曲和音乐制作的朋友们&#xff0c;对水果软件FLSTUDIO应该…

使用xshell连接虚拟机(服务器)

作者&#xff1a;余小小 Xshell Xshell [1] 是一个强大的安全终端模拟软件&#xff0c;它支持SSH1, SSH2, 以及Microsoft Windows 平台的TELNET 协议。Xshell 通过互联网到远程主机的安全连接以及它创新性的设计和特色帮助用户在复杂的网络环境中享受他们的工作。 Xshell可以…

2023年度盘点:智能汽车、自动驾驶、车联网必读书单

【文末送书】今天推荐几本自动驾驶领域优质书籍 前言 2023年&#xff0c;智能驾驶和新能源汽车行业仍然有着肉眼可见的新进展。自动驾驶技术继续尝试从辅助驾驶向自动驾驶的过渡&#xff0c;更重要的是相关技术成本的下降。根据《全球电动汽车展望2023》等行业报告&#xff0c…

vue-baidu-map实现在地图上选择范围并解决相关问题

vue-baidu-map实现在地图上选择范围并解决相关问题 实现地图上选择不规则范围实现功能遇到的问题1、覆盖物多边形怎么才能盖住覆盖物点2、遇到其他问题 实现地图上选择不规则范围 这个功能比较简单&#xff0c;只需要使用vue-baidu-map插件的覆盖物多边形功能就行了。直接看文…

dToF直方图之美_激光雷达多目标检测

直方图提供了一种简单有效的方法来分析信号分布并识别与目标存在相对应的峰值,并且能够可视化大量数据,让测距数形结合。在车载激光雷达中,对于多目标检测,多峰算法统计等,有着区别于摄像头以及其他雷达方案的天然优势。 如下图,当中有着清晰可见的三个峰值,我们可以非…

Java智慧校园-中小学校园管理系统源码

智慧校园系统是通过信息化手段&#xff0c;实现对校园内各类资源的有效集成 整合和优化&#xff0c;实现资源的有效配置和充分利用&#xff0c;将校务管理过程的优化协调。为校园提供数字化教学、数字化学习、数字化科研和数字化管理。 致力于为家长和教师提供一个全方位、多层…

消费增值:一种改变消费观念的新模式

据统计&#xff0c;全球电子商务市场在过去的五年内以每年20%的速度增长&#xff0c;预计到2025年将达到5.5万亿美元。然而&#xff0c;在这个庞大的市场中&#xff0c;消费者在购物后往往只获得了商品或服务本身&#xff0c;而没有获得更多的附加价值。为了改变这种消费观念&a…

内网穿透的应用-如何结合Cpolar内网穿透工具实现在IDEA中远程访问家里或者公司的数据库

文章目录 1. 本地连接测试2. Windows安装Cpolar3. 配置Mysql公网地址4. IDEA远程连接Mysql小结 5. 固定连接公网地址6. 固定地址连接测试 IDEA作为Java开发最主力的工具&#xff0c;在开发过程中需要经常用到数据库&#xff0c;如Mysql数据库&#xff0c;但是在IDEA中只能连接本…

SpringMVC项目出现404

目录 问题讲解&#xff1a; 解决方案&#xff1a; 1、处理器映射器和处理器适配器以及视图解析器没有配置好 2、Controller的包扫描没有加或者包扫描的配置是错误的 3、当然也有说jar包没有 4、请求地址是错误的 5、还有一种解决办法说web.xml配置DispatcherServlet的时…

Android Studio的笔记--String和byte[]

String和byte[]的相互转换&#xff0c;字节数组转换 String转换byte[]文本16进制字节数组 byte[]转换String文本16进制 其它 String转换byte[] 文本 将字符串&#xff08;String&#xff09;转换为字节&#xff08;byte&#xff09;的方法。默认使用的是UTF-8编码 StandardCh…

二叉树题目:在受污染的二叉树中查找元素

文章目录 题目标题和出处难度题目描述要求示例数据范围 前言解法一思路和算法代码复杂度分析 解法二思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;在受污染的二叉树中查找元素 出处&#xff1a;1261. 在受污染的二叉树中查找元素 难度 5 级 题目描述 要求…

脱碱软化树脂Tulsimer CXO-5 MP 高盐水除钙镁树脂

一、产品介绍 Tulsimer CXO-5 MP 是一款大孔弱酸性丙烯酸系阳离子交换树脂&#xff0c;能除去水中的碱度和硬度&#xff0c;特别是除去水中的碳酸氢盐、碳酸盐及其它碱性盐类&#xff0c;适合运用于纯水 ,脱碱软化及选择性的去除重金属。适合在宽广的 pH 及温度范围情况下操作…

深入理解软件测试中的Web请求流程!

在软件开发的过程中&#xff0c;软件测试是不可或缺的一环&#xff0c;它有助于确保软件系统的稳定性、可靠性和安全性。而在众多测试中&#xff0c;Web请求流程的测试显得尤为重要&#xff0c;因为几乎所有的现代应用都离不开网络交互。接下来我们将深入探讨软件测试中完整的W…

【带头学C++】----- 九、类和对象 ---- 9.10 C++设计模式之单例模式设计

❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️麻烦您点个关注&#xff0c;不迷路❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️ 目 录 9.10 C设计模式之单例模式设计 举例说明&#xff1a; 9.10 C设计模式之单例模式设计 看过我之前的文章的&#xff0c;简单讲解过C/Q…

<Linux> 进程

目录 一、进程概念 什么是task_struck task_struct包含内容 二、查看进程 1. ps 查看&#xff1a; 2. /proc/目录查看 3. top 指令 三、系统调用获取进程标示符 获取自己、父进程ID 四、创建进程 1. 初识fork 2. 理解fork创建子进程 3. fork后的数据修改 4.for…

直流电和交流电

直流电&#xff08;Direct Current&#xff0c;简称DC&#xff09;和交流电&#xff08;Alternating Current&#xff0c;简称AC&#xff09;是电流的两种基本形式。 1. 直流电 直流电是指电流方向始终保持不变的电流。在直流电中&#xff0c;电子只能沿着一个方向移动。直流电…

网络攻击(二)--情报搜集阶段

4.1. 概述 在情报收集阶段&#xff0c;你需要采用各种可能的方法来收集将要攻击的客户组织的所有信息&#xff0c;包括使用社交网络、Google Hacking技术、目标系统踩点等等。 而作为渗透测试者&#xff0c;你最为重要的一项技能就是对目标系统的探查能力&#xff0c;包括获知…

windows MYSQL下载和自定路径安装,以及解决中文乱码问题。

文章讲的很详细&#xff0c;请耐心往下看。 一、mysql下载 下载网址&#xff1a;https://www.mysql.com/downloads/ 表示不登录&#xff0c;直接下载。 以上就把安装包下载完了。下载是8.0.35版本。 二、接下来看怎么安装 1.双击安装包&#xff0c;进行安装。 注意&#x…