Flink CEP(三)pattern动态更新

        线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

1.实现分析

  • 外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
  • 动态更新:需要提供定时去检测规则是否变更
  • 历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更,需要清理历史状态
  • API:需要对外提供易用的API

2.代码实现

       首先实现一个用户API。

package cep.functions;import java.io.Serializable;import org.apache.flink.api.common.functions.Function;import cep.pattern.Pattern;/*** @author StephenYou* Created on 2023-07-23* Description: 动态Pattern接口(用户调用API)不区分key*/
public interface DynamicPatternFunction<T> extends Function, Serializable {/**** 初始化* @throws Exception*/public void init() throws Exception;/*** 注入新的pattern* @return*/public Pattern<T,T> inject() throws Exception;/*** 一个扫描周期:ms* @return*/public long getPeriod() throws Exception;/*** 规则是否发生变更* @return*/public boolean isChanged() throws Exception;
}

        希望上述API的调用方式如下。

//正常调用CEP.pattern(dataStream,pattern);//动态PatternCEP.injectionPattern(dataStream, new UserDynamicPatternFunction())

        所以需要修改CEP-Lib源码

        b.增加injectionPattern函数。

public class CEP {/**** Dynamic injection pattern function * @param input* @param dynamicPatternFunction* @return* @param <T>*/public static <T> PatternStream<T> injectionPattern throws Exception (DataStream<T> input,DynamicPatternFunction<T> dynamicPatternFunction){return new PatternStream<>(input, dynamicPatternFunction); }
}

        增加PatternStream构造函数,因为需要动态更新,所以有必要传进去整个函数。

public class PatternStream<T> {PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));}
}

        修改PatternStreamBuilder.build, 增加调用函数的过程。

        final CepOperator<IN, K, OUT> operator = null;if (patternFunction == null ) {operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);} else {operator = new CepOperator<>(inputSerializer,isProcessingTime,patternFunction,comparator,null,processFunction,lateDataOutputTag);}

        增加对应的CepOperator构造函数。

    public CepOperator(final TypeSerializer<IN> inputSerializer,final boolean isProcessingTime,final DynamicPatternFunction patternFunction,@Nullable final EventComparator<IN> comparator,@Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,final PatternProcessFunction<IN, OUT> function,@Nullable final OutputTag<IN> lateDataOutputTag) {super(function);this.inputSerializer = Preconditions.checkNotNull(inputSerializer);this.patternFunction = patternFunction;this.isProcessingTime = isProcessingTime;this.comparator = comparator;this.lateDataOutputTag = lateDataOutputTag;if (afterMatchSkipStrategy == null) {this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();} else {this.afterMatchSkipStrategy = afterMatchSkipStrategy;}this.nfaFactory = null;}

        加载Pattern,构造NFA

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);//初始化if (patternFunction != null) {patternFunction.init();Pattern pattern = patternFunction.inject();afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);long period = patternFunction.getPeriod();// 注册定时器检测规则是否变更if (period > 0) {getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);}}nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

        状态清理一共分为两块: 匹配状态数据清理、定时器清理;

        进行状态清理:

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (patternFunction != null) {// 规则版本更新if (needRefresh.value() < refreshVersion.get()) {//清除状态computationStates.clear();elementQueueState.clear();partialMatches.releaseCacheStatisticsTimer();//清除定时器Iterable<Long> registerTime = registerTimeState.get();if (registerTime != null) {Iterator<Long> iterator = registerTime.iterator();while (iterator.hasNext()) {Long l = iterator.next();//删除定时器timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);//状态清理iterator.remove();}}//更新当前的版本needRefresh.update(refreshVersion.get());}}
}

        上面是在处理每条数据时,清除状态和版本。接下来要进行状态和版本的初始化。

    @Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//初始化状态if (patternFunction != null) {/*** 两个标识位状态*/refreshFlagState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));if (context.isRestored()) {if (refreshFlagState.get().iterator().hasNext()) {refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());}} else {refreshVersion = new AtomicInteger(0);}needRefresh = context.getKeyedStateStore().getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));}
}

3.测试验证

        设置每10s变更一次Pattern。

 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {@Overridepublic Map select(Map map) throws Exception {map.put("processingTime", System.currentTimeMillis());return map;}}).print();env.execute("SyCep");}public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {public TestDynamicPatternFunction() {this.flag = true;}boolean flag;int time = 0;@Overridepublic void init() throws Exception {flag = true;}@Overridepublic Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()throws Exception {// 2种patternif (flag) {Pattern pattern = Pattern.<Tuple3<String, Long, String>>begin("start").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("success");}}).times(1).followedBy("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("fail");}}).times(1).next("end");return pattern;} else {Pattern pattern = Pattern.<Tuple3<String, Long, String>>begin("start2").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("success2");}}).times(2).next("middle2").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("fail2");}}).times(2).next("end2");return pattern;}}@Overridepublic long getPeriod() throws Exception {return 10000;}@Overridepublic boolean isChanged() throws Exception {flag = !flag ;time += getPeriod();System.out.println("change pattern : " + time);return true;}}

打印结果:符合预期

4.源码地址

感觉有用的话,帮忙点个小星星。^_^

 GitHub - StephenYou520/SyCep: CEP 动态Pattern

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/77993.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谈谈新能源技术

目录 1.什么是新能源 2.新能源的发展历程 3.新能源的优势 1.什么是新能源 新能源是指利用自然界可再生资源、无害的高效能资源或者核能等来替代传统能源的能源形式。与传统化石燃料相比&#xff0c;新能源具有更高的可持续性、更低的环境影响和更长久的供应。以下是一些常见的…

C++多线程环境下的单例类对象创建

使用C无锁编程实现多线程下的单例模式 贺志国 2023.8.1 在多线程环境下创建一个类的单例对象&#xff0c;要比单线程环境下要复杂很多。下面介绍在多线程环境下实现单例模式的几种方法。 一、尺寸较小的类单例对象创建 如果待创建的单例类SingletonForMultithread内包含的成…

openwr折腾记7-Frpc使用自主域名解析透传本地服务

FFrpc使用自主域名解析透传本地服务 综述frp透传服务结构流程-http服务域名透传 第一部分openwrt-frpc客户端配置和使用指定服务器指定规则在自己的域名运营商处添加域名解析 第二部分gpt3.5辅助shell编码实现frp自由切换服务器并更新dns解析获取切换服务器参数脚本实现切换 更…

Kubernetes(K8s)从入门到精通系列之十:使用 kubeadm 创建一个高可用 etcd 集群

Kubernetes K8s从入门到精通系列之十&#xff1a;使用 kubeadm 创建一个高可用 etcd 集群 一、etcd高可用拓扑选项1.堆叠&#xff08;Stacked&#xff09;etcd 拓扑2.外部 etcd 拓扑 二、准备工作三、建立集群1.将 kubelet 配置为 etcd 的服务管理器。2.为 kubeadm 创建配置文件…

postgresql表膨胀处理之pgcompacttable部署及使用

环境&#xff1a; 1&#xff09;redhat-release&#xff1a;CentOS Linux release 7.6.1810 (Core) 2&#xff09;database version&#xff1a;postgresql 14.6 一、添加pgstattuple pgcompacttable工具使用过程中需要依赖pgstattuple&#xff0c;因此需先添加pgstattuple…

服务器排查并封禁ip访问

前言 购买的服务器难免会遇到被攻击的情况&#xff0c;当我们发现服务器状态异常时&#xff0c;可以通过连接当前服务器的ip排查一下&#xff0c;并对可疑ip进行封锁禁止。我们可以通过路由跟踪来查看可疑ip。以下是两种解决方案。 解决方案 iptables netstat是一个用于监视…

spring总结

目录 什么是Spring? Spring的优缺点&#xff1f; 优点&#xff1a; 缺点&#xff1a; Spring IOC的理解 Spring AOP的理解 事务的边界为什么放在service层&#xff1f; Spring Bean的生命周期 什么是单例池&#xff1f;作用是什么&#xff1f; 单例Bean的优势 Bean…

企业微信小程序在调用wx.qy.login时返回错误信息qy.login:fail

原因是大概是绑定了多个企业但是在开发者工具中没有选择正确的企业 解决方法&#xff1a; 重新选择企业后即可成功获取code

《Zookeeper》从零开始学Zookeeper源码(二)之数据序列化与通信协议

目录 序列化与反序列化通信协议请求头的数据结构响应头的数据结构 序列化与反序列化 zookeeper的客户端与服务端、服务端与服务端之间会进行一系列的网络通信&#xff0c;在进行数据的传输过程中就涉及到序列化与反序列化&#xff0c;zookeeper使用Jute作为它的序列化组件&…

文字转语音

键盘获取文字&#xff0c;转化为语音后保存本地 from win32com.client import Dispatch from comtypes.client import CreateObject from comtypes.gen import SpeechLib speakerDispatch(SAPI.SpVoice) speaker.Speak(请输入你想转化的文字) datainput(请输入&#xff1a;)#s…

拥抱创新:用Kotlin开发高效Android应用

拥抱创新&#xff1a;用Kotlin开发高效Android应用 引言 在当今数字时代&#xff0c;移动应用已经成为人们生活中不可或缺的一部分。无论是社交媒体、电子商务还是健康管理&#xff0c;移动应用已经深刻地影响了我们的生活方式。随着移动设备的普及和功能的增强&#xff0c;A…

深入了解速卖通详情API:提升商品数据获取与处理效率

1. 速卖通详情API概述 速卖通详情API是通过接口方式获取商品详细信息的一种方法。它可以让开发者按需获取商品的标题、描述、价格、库存等详细数据&#xff0c;为后续的数据分析和处理提供基础。通过API&#xff0c;开发者可以实现自动化的商品数据管理&#xff0c;提高工作效…

jenkins准备

回到目录 jenkins是一个开源的、提供友好操作界面的持续集成(CI)工具&#xff0c;主要用于持续、自动的构建/测试软件项目、监控外部任务的运行。Jenkins用Java语言编写&#xff0c;可在Tomcat等流行的servlet容器中运行&#xff0c;也可独立运行。通常与版本管理工具(SCM)、构…

django bootstrap html实现左右布局,带折叠按钮,左侧可折叠隐藏

一、实现的效果 在django项目中,需要使用bootstrap 实现一个左右分布的布局,左侧区域可以折叠隐藏起来,使得右侧的显示区域变大。(为了区分区域,左右加了配色,不好看的修改颜色即可) 点击折叠按钮,左侧区域隐藏,右侧区域铺满: 二、实现思路 1、使用col-md属性,让左…

unity行为决策树实战详解

一、行为决策树的概念 行为决策树是一种用于游戏AI的决策模型&#xff0c;它将游戏AI的行为分解为一系列的决策节点&#xff0c;并通过节点之间的连接关系来描述游戏AI的行为逻辑。在行为决策树中&#xff0c;每个节点都代表一个行为或决策&#xff0c;例如移动、攻击、逃跑等…

助力618-Y的混沌实践之路 | 京东云技术团队

一、写在前面 1、混沌是什么&#xff1f; 混沌工程&#xff08;Chaos Engineering&#xff09;的概念由 Netflix 在 2010 年提出&#xff0c;通过主动向系统中引入异常状态&#xff0c;并根据系统在各种压力下的行为表现确定优化策略&#xff0c;是保障系统稳定性的新型手段。…

K8s中的ConfigMap

ConfigMap作用&#xff1a;存储不加密数据到etcd&#xff0c;让Pod以变量或者Volume挂载到容器中 场景&#xff1a;配置文件 3.以Vlolume挂载到Pod容器中

前端实习day20

今天解决了不少bug&#xff0c;成就感满满&#xff0c;有几个问题困扰了我很久&#xff0c;我查阅了很多博客&#xff0c;终于找到解决思路&#xff0c;顺利解决&#xff0c;这里记录一下解决思路。 1、在通过this.$refs.layoutSide.style设置<a-layout-sider>的宽度时&…

计算机网络-三种交换方式

计算机网络-三种交换方式 电路交换(Circuit Switching) 电话交换机接通电话线的方式称为电路交换从通信资源分配的角度来看&#xff0c;交换(Switching)就是按照某种方式动态的分配传输线路的资源 电话交换机 为了解决电话之间通信两两之间连线过多&#xff0c;所以产生了电话…

TSINGSEE青犀视频汇聚平台EasyCVR多种视频流播放协议介绍

众所周知&#xff0c;TSINGSEE青犀视频汇聚平台EasyCVR可支持多协议方式接入&#xff0c;包括主流标准协议GB28181、RTSP/Onvif、RTMP等&#xff0c;以及厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。今天我们来说一说&#xff0c;EasyCVR平台支持分…