flink判断两个事件之间有没有超时(不使用CEP)

1.为啥不使用cep呢,cep的超时时间设置不好配置化,无法满足扩展要求

2.超时怎么界定。A事件发生后,过了N时间,还没有收到B事件,算超时。

代码如下:


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;@Slf4j
public class AsyncModelTimeoutHandler extends KeyedProcessFunction<String, JSONObject, JSONObject> {private static final long serialVersionUID = -61608451659272532L;private transient ValueState<Long> firstDataTime;private transient ValueState<Long> secondDataTime;private transient ValueState<String> eventType;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Long> firstDataDescriptor = new ValueStateDescriptor<>("firstDataTime", Long.class);firstDataTime = getRuntimeContext().getState(firstDataDescriptor);ValueStateDescriptor<Long> secondDataDescriptor = new ValueStateDescriptor<>("secondDataTime", Long.class);secondDataTime = getRuntimeContext().getState(secondDataDescriptor);ValueStateDescriptor<String> eventTypeDescriptor = new ValueStateDescriptor<>("eventType", String.class);eventType = getRuntimeContext().getState(eventTypeDescriptor);}@Overridepublic void processElement(JSONObject value, KeyedProcessFunction<String, JSONObject, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {Long currentTimestamp = value.getLong("ts");if (value.containsKey("timeout")) {//异步请求消息long timeout = value.getLong("timeout");firstDataTime.update(currentTimestamp + timeout);eventType.update(value.getString("event"));ctx.timerService().registerProcessingTimeTimer(currentTimestamp + timeout);} else {secondDataTime.update(currentTimestamp);}}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<String, JSONObject, JSONObject>.OnTimerContext ctx, Collector<JSONObject> out) throws Exception {Long firstTime = firstDataTime.value();Long lastTime = secondDataTime.value();if (lastTime == null || (firstTime != null && lastTime >= firstTime)) {//超时了log.info("AsyncModelTimeoutHandler onTimer handle triggerTime={}, firstTime={}, secondTime={},key={}", timestamp, firstTime, lastTime, ctx.getCurrentKey());JSONObject r = new JSONObject();r.put("id", ctx.getCurrentKey());r.put("judgeTime", timestamp);r.put("event", eventType.value());out.collect(r);}firstDataTime.clear();secondDataTime.clear();eventType.clear();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/14593.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

38、【OS】【Nuttx】OSTest分析(3):参数传递

背景 接之前 blog 36、【OS】【Nuttx】OSTest分析&#xff08;2&#xff09;&#xff1a;环境变量测试 37、【OS】【Nuttx】OSTest分析&#xff08;2&#xff09;&#xff1a;任务创建 分析完环境变量测试&#xff0c;和任务创建的一些关键要素&#xff0c;OSTest 进入下一阶段…

储能系统-系统架构

已更新系列文章包括104、61850、modbus 、单片机等&#xff0c;欢迎关注 IEC61850实现方案和测试-1-CSDN博客 快速了解104协议-CSDN博客 104调试工具2_104协议调试工具-CSDN博客 1 电池储能系统&#xff08;BESS&#xff09; 架构 电池储能系统主要包括、电池、pcs、本地控制…

Listener监听器和Filter过滤器

一.监听器 1.是javaweb的三大组件之一,分别是Servlet程序,Listener监听器,Filter过滤器 2.Listener是JvaEE的规范,就是接口,监听器的作用就是监听某种变化(一般是对象创建/销毁,属性变化),触发对应方法完成相应的任务 3.ServletContextListener:/*当一个类实现了ServletContex…

Go 中的 7 个常见接口错误

Go 仍然是一门新语言,如果你正在使用它,它很可能不是你的第一门编程语言。 不同的语言,既为你带来了经验,也带来了偏见。你用以前的任何语言做的事情,在 Go 中用相同的方法可能不是一个好主意。 学习 Go 不仅仅是学习一种新的语法。这也是学习一种新的思维方式来思考你的…

【AI实践】Cursor上手-跑通Hello World和时间管理功能

背景 学习目的&#xff1a;熟悉Cursor使用环境&#xff0c;跑通基本开发链路。 本人背景&#xff1a;安卓开发不熟悉&#xff0c;了解科技软硬件常识 实践 基础操作 1&#xff0c;下载安装安卓Android Studio 创建一个empty project 工程&#xff0c;名称为helloworld 2&am…

存储系统、网盘系统的访问留痕

一、适用场景 1、需要了解是否存在非法访问存储系统或网盘系统&#xff1a;各企业或单位为方便远程办公或远程管理&#xff0c;若自建&#xff08;保护隐私数据或敏感资料&#xff09;了存储系统或网盘系统&#xff0c;那么到底有哪些ip地址或用户从远程公网访问存储系统或网盘…

求助DeepSeek帮我开发一个直线审批流程设计页面Vue2.0

之前使用文心一言协助开发过类似的页面&#xff0c;需求方认为某些业务表单需要添加审批流程&#xff0c;可以人为设置审批步骤&#xff0c;由于需求很模糊而且人/天有限&#xff0c;当时的提问很混乱&#xff0c;内容如下&#xff1a; 我的vue2.0系统中需要审批流程设计页面&a…

初级数据结构:栈和队列

目录 一、栈 (一)、栈的定义 (二)、栈的功能 (三)、栈的实现 1.栈的初始化 2.动态扩容 3.压栈操作 4.出栈操作 5.获取栈顶元素 6.获取栈顶元素的有效个数 7.检查栈是否为空 8.栈的销毁 9.完整代码 二、队列 (一)、队列的定义 (二)、队列的功能 (三&#xff09…

LLM:DeepSeek 系列(二)

原文链接 3、DeepSeek-V2 DeepSeek-V2 发布于 2024 年 5 月&#xff0c;为多领域专家&#xff08;MoE&#xff09;语言模型&#xff0c;包含总共 2360 亿个参数&#xff0c;其中每个词元激活 210 亿个参数&#xff0c;并支持 12.8 万个词元的上下文长度。DeepSeek-V2 采用包括…

【学术投稿】第五届计算机网络安全与软件工程(CNSSE 2025)

重要信息 官网&#xff1a;www.cnsse.org 时间&#xff1a;2025年2月21-23日 地点&#xff1a;中国-青岛 简介 第五届计算机网络安全与软件工程&#xff08;CNSSE 2025&#xff09;将于2025年2月21-23日在中国-青岛举行。CNSSE 2025专注于计算机网络安全、软件工程、信号处…

开源项目介绍-词云生成

开源词云项目是一个利用开源技术生成和展示词云的工具或框架&#xff0c;广泛应用于文本分析、数据可视化等领域。以下是几个与开源词云相关的项目及其特点&#xff1a; Stylecloud Stylecloud 是一个由 Maximilianinir 创建和维护的开源项目&#xff0c;旨在通过扩展 wordclou…

Docker基础以及单体实战

Docker 一、Docker1.1 Docker组成1.2 Dcoker运行图1.3 名称空间Namepace 1.4 docker、Docker compose、kubermetes 二、Docker安装2.1 在线Docker安装2.2 使用官方通用安装脚本2.3 二进制安装Docker三、Docker基础命令3.1 启动类3.2 镜像类3.3 容器类3.4 网络类3.5 Docker comp…

2025年日祭

本文将同步发表于洛谷&#xff08;暂无法访问&#xff09;、CSDN 与 Github 个人博客&#xff08;暂未发布&#xff09; 本蒟自2025.2.8开始半停课。 任务计划&#xff08;站外题与专题&#xff09; 数了一下&#xff0c;通过人数比较高的题&#xff0c;也就是我准备补的题&a…

UIAbility 生命周期方法

生命周期流程图 UIAbility的生命周期官方文档地址https://developer.huawei.com/consumer/cn/doc/harmonyos-guides-V13/uiability-lifecycle-V13 1. onCreate(want: Want, launchParam: LaunchParam) 触发时机&#xff1a;Ability首次创建时 作用&#xff1a;初始化核心资源…

模型 冗余系统(系统科学)

系列文章分享模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。为防故障、保运行的备份机制。 1 冗余系统的应用 1.1 冗余系统在企业管理中的应用-金融行业信息安全的二倍冗余技术 在金融行业&#xff0c;信息安全是保障业务连续性和客户资产安全的关键。随着数字化…

【大数据技术】Spark分布式实现词频统计(hadoop+python+spark)

Spark分布式实现词频统计&#xff08;hadooppythonspark&#xff09; 搭建完全分布式高可用大数据集群&#xff08;VMwareCentOSFinalShell&#xff09; 搭建完全分布式高可用大数据集群&#xff08;HadoopMapReduceYarn&#xff09; 本机PyCharm远程连接CentOS虚拟机&#x…

28.<Spring博客系统⑤(部署的整个过程(CentOS))>

引入依赖 Spring-boot-maven-plugin 用maven进行打包的时候必须用到这个插件。看看自己pom.xml中有没有这个插件 并且看看配置正确不正常。 注&#xff1a;我们这个项目打的jar包在30MB左右。 <plugin><groupId>org.springframework.boot</groupId><artif…

C++Primer学习(2.2)

2.2 变量 变量提供一个具名的、可供程序操作的存储空间。C中的每个变量都有其数据类型,数据类型决定着变量所占内存空间的大小和布局方式、该空间能存储的值的范围&#xff0c;以及变量能参与的运算。对C程序员来说,“变量(variable)”和“对象(object)”一般可以互换使用。 术…

电脑开机提示按f1原因分析及终极解决方法来了

经常有网友问到一个问题&#xff0c;我电脑开机后提示按f1怎么解决&#xff1f;不管理是台式电脑&#xff0c;还是笔记本&#xff0c;都有可能会遇到开机需要按F1&#xff0c;才能进入系统的问题&#xff0c;引起这个问题的原因比较多&#xff0c;今天小编在这里给大家列举了比…

Linux系统命令无法使用(glib库相关问题)

1.背景描述 Yum强制安装了一些软件&#xff0c;安装软件成功无报错&#xff0c;完成后不久突然发现系统出问题了&#xff0c;所有的命令无法使用了&#xff0c;如ls、mv、cat等基本命令报错。 relocation error&#xff1a; /lib64/libpthread.so.0: symbol_libc_dl_error_tsd …