大数据-玩转数据-Flink窗口函数

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/108980.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql 命令行 执行sql文件

方法1 source source file.sql; file.sql : 绝对路径或 相对路径。 方法2 mysql -u xxx -p < file.sql 方法3 MySQLImport 工具 mysqlimport [options] database file_name 其中&#xff0c;database为要导入数据的数据库名&#xff0c;file_name为要导入的SQL文件名。还可以…

算法通过村第四关-栈白银笔记|括号问题

文章目录 前言1. 括号匹配问题2. 最小栈问题3. 最大栈 总结 前言 提示&#xff1a;如果让我送给年轻人四个字&#xff0c;就是&#xff1a;量力而行。 量力而行不会失眠&#xff0c;不会啃老&#xff0c;不会为各种考试焦虑。顺其自然活得轻松。其实&#xff0c;量力而行最易大…

Docker学习笔记

Docker学习笔记 docker的作用docker的基本组成安装docker阿里云镜像加速run的流程和docker原理 docker的思想来自于集装箱。 核心思想&#xff1a; 隔离 docker可以通过隔离机制将服务器利用到极致。 虚拟机&#xff1a;在windows中装一个Vmware&#xff0c;通过这个软件可以虚…

使用EF Core更新与修改生产数据库

使用EF Core的Code First&#xff0c;在设计阶段&#xff0c;直接使用Database.EnsureCreated()和EnsureDeleted()可以快速删除、更新最新的数据结构。由于没有什么数据&#xff0c;删除的风险非常低。但是对于已经投入生产的数据库&#xff0c;这个方法就绝对不可行了。 考虑…

粒子群算法的基本原理和Matlab实现

1.案例背景 1.1 PSO算法介绍 粒子群优化算法(Particle Swarm Optimization,PSO)是计算智能领域,除了蚁群算法,鱼群算法之外的一种群体智能的优化算法,该算法最早是由Kennedy和 Eberhart 在1995年提出的。PSO算法源于对鸟类捕食行为的研究,鸟类捕食时,每只鸟找到食物最简单有效…

mac清理磁盘空间软件有哪些 mac清理磁盘空间怎么清理

随着时间的推移&#xff0c;Mac电脑上的文件会越来越多&#xff0c;很快就会占满磁盘空间。这时候&#xff0c;我们需要一个好的Mac清理磁盘空间软件来释放空间&#xff0c;保持电脑的良好性能。那么&#xff0c;mac清理磁盘空间软件有哪些呢&#xff1f;接下来&#xff0c;我将…

用 PHP 和 JavaScript 显示地球卫星照片

向日葵 8 号气象卫星是日本宇宙航空研究开发机构设计制造的向日葵系列卫星之一&#xff0c;重约 3500 公斤&#xff0c;设计寿命 15 年以上。该卫星于 2014 年 10 月 7 日由 H2A 火箭搭载发射成功&#xff0c;主要用于监测暴雨云团、台风动向以及持续喷发活动的火山等防灾领域。…

几个nlp的小任务(生成式任务——语言模型(CLM与MLM))

@TOC 本章节需要用到的类库 微调任意Transformers模型(CLM因果语言模型、MLM遮蔽语言模型) CLM MLM 准备数据集 展示几个数据的结构

【Spring Boot】数据库持久层框架MyBatis — MyBatis简介

MyBatis简介 本节首先会介绍什么是ORM、什么是MyBatis、MyBatis的特点以及核心概念&#xff0c;最后介绍MyBatis是如何启动、如何加载配置文件的&#xff1f; 1.什么是ORM ORM&#xff08;Object Relational Mapping&#xff0c;对象关系映射&#xff09;是为了解决面向对象…

下一代存储解决方案:湖仓一体

文章首发地址 湖仓一体是将数据湖和数据仓库相结合的一种数据架构&#xff0c;它可以同时满足大数据存储和传统数据仓库的需求。具体来说&#xff0c;湖仓一体可以实现以下几个方面的功能&#xff1a; 数据集成&#xff1a; 湖仓一体可以集成多个数据源&#xff0c;包括结构…

C++ 网络编程项目fastDFS分布式文件系统(九)总结

1. Location语法 1. 语法规则 location [ |~|~ * |^~ ] /uri/ { … } 正则表达式中的特殊字符 : - . () {} [] * ? 2. Location 优先级说明 在 nginx 的 location 和配置中 location 的顺序没有太大关系。 与 location 表达式的类型有关。 相同类型的表达式&a…

K8S最新版本集群部署(v1.28) + 容器引擎Docker部署(上)

温故知新 &#x1f4da;第一章 前言&#x1f4d7;背景&#x1f4d7;目的&#x1f4d7;总体方向 &#x1f4da;第二章 基本环境信息&#x1f4d7;机器信息&#x1f4d7;软件信息&#x1f4d7;部署用户kubernetes &#x1f4da;第三章 Kubernetes各组件部署&#x1f4d7;安装kube…

BlazorServer中C#与JavaScript的相互调用

BlazorServer中C#与JavaScript的相互调用 前言&#xff1a; ​ 虽然BlazorServer中推荐使用C#在razor页面中的替代JavaScript来完成逻辑的编写&#xff0c;但当需要使用第三方的javascript文件/组件里的内容时&#xff0c;则难免要在C#中调用其方法或对象。反之当你的(用到第…

Java:HashMap、LinkedHashMap、TreeMap集合的底层原理和集合的嵌套

HashMap的底层原理 LinkedHashMap的底层原理 TreeMap集合的底层原理 集合的嵌套

Elasticsearch 8.X reindex 源码剖析及提速指南

1、reindex 源码在线地址 为方便大家验证&#xff0c;这里给出 reindex github 源码地址。 https://github.com/elastic/elasticsearch/blob/001fcfb931454d760dbccff9f4d1b8d113f8708c/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java reindex 常见…

IDEA对Web和Tomcat的一些配置

这里只是做了自己学习中的一点记录&#xff0c;仅供参考哈&#xff01; 配置Tomcat Modules新增Web 新增module后新增Artifacts 新增Artifacts后Tomcat新增布署 将指定的module由普通java项目变成web项目 直接创建布署到Tomcat时所需要的Aritifacts包 配置Servlet的依赖包 配置…

Vue2学习笔记の使用Vue脚手架

目录 使用Vue脚手架脚手架文件结构关于不同版本的Vuevue.config.js配置文件ref属性props配置项mixin(混入)插件scoped样式总结TodoList案例webStorage组件的自定义事件全局事件总线&#xff08;GlobalEventBus&#xff09;消息订阅与发布&#xff08;pubsub&#xff09;nextTic…

redis windows 版本安装

1. 下载windows安装包并解压 如果是Linux版本可以直接到官网下载&#xff0c;自3.x起官网和微软网站就没有redis安装包更新了&#xff0c;好在github有开发者在编译发布更新&#xff08;目前最新有5.0.9版本可下&#xff09;&#xff0c;地址&#xff1a;redis windows 5版本下…

Maven 基础之安装和命令行使用

Maven 的安装和命令行使用 1. 下载安装 下载解压 maven 压缩包&#xff08;http://maven.apache.org/&#xff09; 配置环境变量 前提&#xff1a;需要安装 java 。 在命令行执行如下命令&#xff1a; mvn --version如出现类似如下结果&#xff0c;则证明 maven 安装正确…

MyBatisPlus实现多租户功能

前言&#xff1a;多租户是一种软件架构技术&#xff0c;在多用户的环境下&#xff0c;共有同一套系统&#xff0c;并且要注意数据之间的隔离性。 一、SaaS多租户简介 1.1、SaaS多租户 SaaS&#xff0c;是Software-as-a-Service的缩写名称&#xff0c;意思为软件即服务&#x…