flink的带状态的RichFlatMapFunction函数使用

背景

使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法

RichFlatMapFunction使用示例

下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:

package wikiedits.func.state;import java.util.Objects;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型*/
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {// 键值分区状态,对应每个name一个值ValueState<StateEntity> nameState;@Overridepublic void open(Configuration parameters) throws Exception {// 创建一个键值分区状态ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);nameState = getRuntimeContext().getState(state);}@Overridepublic void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {// 判断状态值是否为空(状态默认值是空)if (Objects.isNull(nameState.value())) {StateEntity sFalg = new StateEntity(input.f0, input.f1);nameState.update(sFalg);return;}// 和上一次的状态值比较StateEntity value = nameState.value();if (Math.abs(value.count - input.f1) > 100) {collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));}value.setName(input.f0);value.setCount(input.f1);// 更新状态值nameState.update(value);}}package wikiedits.func.state;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class RichFlatMapFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX,YYY,ZZZ三种nameString name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");int count = RandomUtils.nextInt(0, 1000);// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);// 每发射一次就延时1秒Thread.sleep(5000);}}@Overridepublic void cancel() {}});dataStream.keyBy((f) -> {return f.f0;}).flatMap(new MyRichFlatMapFunction()).print();env.execute();}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/185409.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OceanBase 如何通过日志观测冻结转储流程?

本文旨在通过日志解析 OceanBase 的冻结转储流程&#xff0c;以其冻结检查线程为切入点&#xff0c;以租户&#xff08;1002&#xff09;的线程名为例。 作者&#xff1a;陈慧明&#xff0c;爱可生测试工程师&#xff0c;主要参与 DMP 和 DBLE 自动化测试项目。 爱可生开源社区…

技术分享|基于 Cluster API 的 Kubernetes 集群生命周期管理

作者&#xff1a;SmartX SKS 产品研发工程师 杨海剑 背景 容器的发展催生了容器编排技术&#xff0c;而容器编排技术反过来又推动了容器的发展。容器编排领域则一度出现了 Swarm、Mesos 和 Kubernetes 等百家争鸣的局面。但随着 Kubernetes 脱颖而出&#xff0c;Kubernetes 成为…

早安心语微语早读,好好善待自己,珍惜今天,期待明天

1、保持阳光心态&#xff0c;积极面对人生&#xff0c;每个人&#xff0c;都沿着不同的轨道在活着&#xff0c;人生是一趟单程车&#xff0c;我们最应该做的&#xff0c;就是好好善待自己&#xff0c;珍惜今天&#xff0c;期待明天&#xff0c;那些走过的&#xff0c;错过的&am…

零代码编程:用ChatGPT批量将Mp4视频转为Mp3音频

文件夹中有很多mp4视频文件&#xff0c;如何利用ChatGPT来全部转换为mp3音频呢&#xff1f; 在ChatGPT中输入提示词&#xff1a; 你是一个Python编程专家&#xff0c;要完成一个批量将Mp4视频转为Mp3音频的任务&#xff0c;具体步骤如下&#xff1a; 打开文件夹&#xff1a;…

2023最新版JavaSE教程——第4天:流程控制语句之循环语句

目录 一、循环语句二、for循环2.1 基本语法2.2 应用举例2.3 练习 三、while循环3.1 基本语法3.2 应用举例3.3 练习 四、do-while循环4.1 基本语法4.2 应用举例4.3 练习4.4 对比三种循环结构4.5 "无限"循环4.5.1 基本语法4.5.2 应用举例 4.6 嵌套循环(或多重循环)4.6.…

网络流量分类概述

1. 什么是网络流量&#xff1f; 一条网络流量是指在一段特定的时间间隔之内&#xff0c;通过网络中某一个观测点的所有具有相同五元组(源IP地址、目的IP地址、传输层协议、源端口和目的端口)的分组的集合。 比如(10.134.113.77&#xff0c;47.98.43.47&#xff0c;TLSv1.2&…

【K8s集群离线安装-kubeadm】

1、kubeadm概述 kubeadm是官方社区推出的一个用于快速部署kubernetes集群的工具。这个工具能通过两条指令快速完成一个kubernetes集群的部署。 2、环境准备 2.1 软件环境 软件版本操作系统CentOS 7Docker19.03.13K8s1.23 2.2 服务器 最小硬件配置&#xff1a;2核CPU、2G内存…

【深入浅出Spring原理及实战】「夯实基础系列」360全方位渗透和探究Spring配置开发实战详解

360全方位渗透和探究Spring配置开发实战详解 Spring对于配置的转折点Xml配置 vs Java配置Xml配置模式的优点Xml配置模式的缺点Java配置模式的优点Java配置模式的缺点 Java编程配置流程配置代码案例 组件注入Bean注解配置自动扫描包路径和规则Filter常用的拦截类型FilterType.AS…

Hadoop 视频分析系统

视频分析系统 业务流程 原始数据 vedio.json {"rank":1,"title":"《逃出大英博物馆》第二集","dzl":"77.8","bfl":"523.9","zfl":"39000","type":"影视",&quo…

为什么冰酒会被视为珍品?

在某些年份&#xff0c;珍贵稀有的葡萄酒让酿酒师有了冒险的意愿&#xff0c;葡萄比平时在藤上停留更长时间&#xff0c;需要等待至少-7℃的温度&#xff0c;酿酒师需要与自然玩游戏&#xff0c;可以持续到1月&#xff0c;在罕见的情况下可以持续到2月。对于酿酒师来说&#xf…

基于鱼鹰算法的无人机航迹规划-附代码

基于鱼鹰算法的无人机航迹规划 文章目录 基于鱼鹰算法的无人机航迹规划1.鱼鹰搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用鱼鹰算法来优化无人机航迹规划。 1.鱼鹰搜索算法 …

Django初窥门径-自定义用户模型

前言 自定义用户模型在Django应用中是一个重要的话题&#xff0c;它涉及到如何根据您的项目需求以及特定的用户身份验证和授权需求来调整用户模型。在以下前言中&#xff0c;我将讲述为什么自定义用户模型是如此重要以及其潜在的优势&#xff1a; 随着Web应用的不断发展&…

JS 处理文档选择和范围创建【createRange | getSelection】

介绍 1、const selection window.getSelection(); 说明&#xff1a; 1、用于获取用户当前文档选择的对象&#xff1b; 2、它返回一个 Selection 对象&#xff0c;该对象代表了用户选择的文本范围&#xff08;可以包含一个或多个范围&#xff0c;因为用户可以同时选择多个不相…

OpenAI 工程师平均薪酬 92.5 万美元;SpaceX 明年将每两天发射一次丨 RTE 开发者日报 Vol.81

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE &#xff08;Real Time Engagement&#xff09; 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…

Spring Boot 中自动装配机制的原理

&#xff08;摘自mic老师面试题&#xff09; 最近一个粉丝说&#xff0c;他面试了 4 个公司&#xff0c;有三个公司问他&#xff1a;“Spring Boot 中自动装配 机制的原理” 他回答了&#xff0c;感觉没回答错误&#xff0c;但是怎么就没给 offer 呢&#xff1f; 对于这个问题…

CRM系统中的客户保留是什么意思?有多少客户可以留下来?

一家企业&#xff0c;在销售过程中有多少客户是有效的&#xff1f;又有多少客户可以留下来&#xff1f;如果企业只顾着开发新客户&#xff0c;而忽略了客户保留&#xff0c;那么将会造成资源的浪费。那么CRM系统中的客户保留是什么意思&#xff1f; 什么是客户保留&#xff1f…

Web自动化测试入门篇详解

一、目的 web自动化测试作为软件自动化测试领域中绕不过去的一个“香饽饽”&#xff0c;通常都会作为广大测试从业者的首选学习对象&#xff0c;相较于C/S架构的自动化来说&#xff0c;B/S有着其无法忽视的诸多优势&#xff0c;从行业发展趋、研发模式特点、测试工具支持&…

【广州华锐互动】影视制作VR在线学习:身临其境,提高学习效率

随着科技的不断发展&#xff0c;影视后期制作技术也在日新月异。然而&#xff0c;传统的教学方式往往难以满足学员的学习需求&#xff0c;无法充分展现影视后期制作的魅力和潜力。近年来&#xff0c;虚拟现实(VR)技术的崛起为教学领域带来了新的机遇。通过VR教学课件&#xff0…

【日积月累】SpringBoot 通过注解@CacheConfig @Cacheable @CacheEvict @CachePut @Caching使用缓存

目录 1.前言2.引入依赖3.启动类加入注解EnableCaching4.常用注解4.1CacheConfig4.2Cacheable4.3CacheEvict4.4CachePut4.5Caching 5.总结6.参考 文章所属专区 日积月累 1.前言 Spring在3.1版本&#xff0c;就提供了一条基于注解的缓存策略&#xff0c;实际使用起来还是很丝滑…

数据仓库工具箱-零售业务

文章目录 一、维度模型设计的4步过程1.1 第一步&#xff1a;选择业务过程1.2 第二步&#xff1a;声明粒度1.3 第三步&#xff1a;确定维度1.4 第四步&#xff1a;确定事实 二、零售业务案例研究2.1 第一步&#xff1a;选择业务过程2.2 第二步&#xff1a;声明粒度2.3 第三步&am…