深入浅出Flink CEP丨如何通过Flink SQL作业动态更新Flink CEP作业

复杂事件处理(CEP)是一种对事件流进行分析的技术,它能够识别出数据流中的事件序列是否符合特定的模式,并允许用户对这些模式进行处理。Flink CEP 是 CEP 在 Apache Flink 中的具体实现,是 Apache Flink 的一个库,使用户可以在 Flink 的流处理引擎上进行复杂事件的处理。

该技术广泛应用于金融风控、网络安全、物流跟踪和电商推荐系统等领域。比如在风险用户识别中,CEP可以读取并分析客户行为日志,将 5 分钟内转账次数超过 10 次且金额大于 10000 的客户识别为异常用户。

而在袋鼠云实时开发平台上,有更加高效的功能支持,允许通过Flink SQL作业动态更新Flink CEP作业。本文将通过产品Demo演示,展示如何在实时开发平台中构建一个灵活加载最新规则的 Flink CEP 作业,处理来自上游 Kafka 的数据流,让大家更加直观地了解这一整套流程及其优势所在。

一、FLink CEP简介

1、CEP 的核心概念

模式(Pattern):这是定义复杂事件的规则,它由一系列简单的事件组成,这些事件之间通过一定的逻辑关系相连。

事件流(Event Stream):这是实时流入系统的数据流,CEP 会在这些流上寻找匹配的模式。

匹配(Match):当事件流中的事件序列符合定义的模式时,就会产生一个匹配。

2、Flink CEP 的使用流程

定义模式:使用 Flink CEP 的 API 定义感兴趣的事件模式。

应用模式:将定义的模式应用到事件流上,Flink CEP 会在流上寻找匹配该模式的事件序列。

处理匹配事件:对找到的匹配事件进行处理,执行相应的业务逻辑。

Flink CEP 的优势在于能够处理高吞吐量、低延迟的实时数据流,并且提供了丰富的 API 来定义复杂的事件模式。同时,Flink CEP 也支持基于事件时间的处理,允许用户处理乱序到达的数据。

3、动态FLink CEP 背景介绍

在风险控制或者模式匹配等场景下,用户经常会想要在模式仍然能够提供服务的情况下,改变事件需要匹配的模式。在目前的 Flink CEP 中,一个 CEP 算子有一个固定的 CEP 模式,不支持改变。因此,为了达到上述目的,用户必须重启 Flink 作业,并等待相对较长的更新时间。

另一种常见情况是,一个事件流需要与多个模式匹配。虽然当前的 Flink CEP 不支持在一个 CEP 运算符中匹配多个模式,但用户必须为每个模式设置一个 Flink 作业或一个运算符。这可能会浪费内存和计算资源。为此我们支持了动态加载规则,支持任务不重启Flink 作用动态的匹配规则。

4、FLink CEP 的应用场景

Flink CEP 的应用场景包括:

风险控制:例如,检测用户行为模式,如果发现异常行为(如短时间内多次登录失败),则可以触发相应的风险控制措施。

用户画像:通过分析用户的行为事件流,可以构建用户画像,进而实现精准营销。

运维监控:在企业服务的运维管理中,CEP 可以用来配置复杂的监控规则,以实现对服务状态的实时监控。

二、ELink 动态CEP方案

file

PatternProcessorDiscoverer 在 Jobmanager上定期抓取规则更新信息并将规则更新信息推送给 taskmanager 上的 CEP 算子。CEP 算子获取到更新的规则后更新 NFA(非确定有限自动机)。

1、PatternProcessorDiscoverer

PatternProcessorDiscoverer内部维护一个定时线程,通过定时线程定期轮询 database 获取规则信息。如果感知到规则变更则会将规则变更信息通知给 PatternProcessorManager。

2、DynamicCepOperatorCoordinator

DynamicCepOperatorCoordinator实现了 PatternProcessorManager和 OperatorCoordinator接口。 DynamicCepOperatorCoordinator接受到来自 PatternProcessorManager的变更信息后会将变更信息通过UpdatePatternProcessorEvent发送给每一个 DynamicCep 算子。

3、DynamicCepOperator

DynamicCepOperator实现了OperatorEventHandler接口,接收来自DynamicCepOperatorCoordinator 的UpdatePatternProcessorEvent。对于每个UpdatePatternProcessorEvent,DynamicCepOperator会利用规则处理器将 JSON 或者 SQL 表示的规则转为Pattern,再由Pattern获得 NFA。多个Pattern 的情况下会生成多个 NFA。NFA 会与PatternProcessFunction一起被保存到 Map 中。

当上游算子的数据发送到DynamicCepOperator时,同一条数据会被多个 NFA 处理,当某一个NFA match到数据之后,会将数据发送给对应的PatternProcessFunction处理,被PatternProcessFunction处理完的数据则会被发送给下游算子。这里需要注意的是:当有多个规则与PatternProcessFunction 时,不同的PatternProcessFunction处理完的数据结构需要保持一致,因为数据都会统一发送到下游算子,下游算子能接受的数据结构是固定的。

4、PatternProcessFunction

PatternProcessFunction的作用是处理匹配到的查询。PatternProcessFunction会在jobmanager中初始化,然后通过序列化后发送到taskmanager中,因此需要注意下jobmanager和中taskmanager加载到的PatternProcessFunction来自同一个依赖包,否则会有序列化问题。目前不支持对包含了PatternProcessFunction的jar进行热加载,所以如果需要更换新jar来增加PatternProcessFunction,只能通过重启任务实现。

对于使用 Flink Dynamic CEP on SQL 模式:还需要提供一种手段将包含了 PatternProcessFunction的 jar 提交到jm和tm上,数栈上可以通过ADD JAR语法实现。

三、FLink 动态CEP DEMO演示

1、规则说明

此规则定义了一个简单的流程,它有三个节点,每个节点都基于 action 的值来决定是否消费事件。如果 action 的值依次为 0、1、2,那么事件将依次通过 start、middle、end 节点。如果 action 的值不满足当前节点的条件,流程将跳至下一个节点。这个规则没有定义时间窗口,并且匹配到事件模式后不会跳过任何事件。

2、定义匹配规则

在Mysql 中进行SQL 编辑页面创建一张数据表,命名为 cep_demo,四个字段 id、version、pattern、function、type、is_deleted。id 和 version 用于标名唯一的版本和 id 信息,pattern 代表了序列化后的 Pattern Stream,function 用于指代规则 match 后执行的 function。规则 match 到数据后,数据会由该 function 处理然后发送到下游算子。type 用于代表规则表示类型。如果以 SQL 表示规则,则填 SQL;如果以 JSON 表示规则,则填写 JSON。然后编写 FlinkSQL 作业并提交到任务调度中运行。

{

"afterMatchStrategy": {

"type": "NO_SKIP"

},

"edges": [

{"source": "middle","target": "end","type": "STRICT"},{"source": "start","target": "middle","type": "SKIP_TILL_NEXT"}

],

"name": "end",

"nodes": [

{"condition": {"expression": "action == 2","type": "AVIATOR"},"name": "end","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"]},"type": "ATOMIC"},{"condition": {"expression": "action == 1","type": "AVIATOR"},"name": "middle","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"]},"type": "ATOMIC"},{"condition": {"expression": "action == 0","type": "AVIATOR"},"name": "start","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"]},"type": "ATOMIC"}

],

"quantifier": {

"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"]

},

"type": "COMPOSITE",

"version": 1,

"window": null

}

3、数据开发编写FlinkSQL

FlinkSQL:

ADD JAR WITH /data/sftp/11_dynamic-cep-jar-1_dynamic-cep-jar-1.0-SNAPSHOT.jar AS functions.jar;

CREATE TABLE source (

id  INT,name      VARCHAR,productionId INT,action INT,eventTime BIGINT,procTime AS PROCTIME()

) WITH (

'connector' = 'kafka-x','topic'     = 'stream','properties.group.id'     = 'stream','scan.startup.mode'     = 'latest-offset','properties.bootstrap.servers' = 'localhost:9092','format'    = 'json'

);

CREATE TABLE sink

(

id          int

) WITH (

  'connector' = 'mysql-x',  'url' = 'jdbc:mysql://localhost:3306/stream',  'schema-name' = 'stream',  'table-name' = 'stream_cep_001',  'username' = 'drpeco',  'password' = '******',  'sink.buffer-flush.max-rows' = '1024', 'sink.buffer-flush.interval' = '10000', 'sink.all-replace' = 'true','sink.parallelism' = '1');

INSERT INTO sink

SELECT id_total as id

FROM source

DYNAMIC MATCH_RECOGNIZE (PARTITION BY productionId ORDER BY procTime OUTPUT (id_total int)  WITH_PATTERN ('tableName' = 'dynamic_cep', 'user' = 'drpeco', 'password' = '******', 'driver' = 'com.mysql.cj.jdbc.Driver', 'jdbcUrl' = 'jdbc:mysql://localhost:3306/cep','jdbcIntervalMillis' = '1000'))  AS T;

file

4、提交调度运行

FlinkSQL任务语法检查成功后,提交调度任务运行中状态时,kafka 源表Topic 中依次打入Demo数据。

{

"id": 1,"name" : "middle","productionId" : 11,"action" : 0,"eventTime" : 1

};

{

"id": 2,"name" : "middle","productionId" : 11,"action" : 1,"eventTime" : 1

};

{

"id": 3,"name" : "middle","productionId" : 11,"action" : 2,"eventTime" : 1

}

file

接下来我们检查匹配的结果,action 的值依次为 0、1、2。Jar包中的function为将匹配到的数据,id字段sum相加得出最后id_totle值作为输出条件。最后输出的结果 id_total 值为 6。

file

此时在任务不停止的状态下,直接操作Mysql数据库修改CEP Json 将规则匹配条件改为 action 的值依次为 0、1、3 为匹配条件。再次打入与上次同样的 kafka数据,此时匹配失败则不输出id_total值。

四、总结

本文和大家分享了什么是CEP,Flink CEP的原理与使用方法,并介绍了如何通过Flink SQL作业动态更新Flink CEP作业,可以说是干货满满。

《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=szsm

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=szsm

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=szsm

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szsm

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/492542.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为数通最新题库 H12-821 HCIP稳定过人中

以下是成绩单和考试人员 HCIP H12-831 HCIP H12-725 安全中级

Facebook 与数字社交的未来走向

随着数字技术的飞速发展,社交平台的角色和形式也在不断演变。作为全球最大社交平台之一,Facebook(现Meta)在推动数字社交的进程中扮演了至关重要的角色。然而,随着互联网的去中心化趋势和新技术的崛起,Face…

STM32中ADC模数转换器

一、ADC简介 ADC模拟-数字转换器 ADC可以将引脚连续变化的模拟电压转换为内存中存储的数字变量,建立模拟电路到数字电路的桥梁 12位逐次逼近型ADC,1us转换时间 输入电压范围: 0~3.3V,转换结果范围:0~4095 18个输入…

fpga系列 HDL:Quartus II PLL (Phase-Locked Loop) IP核 (Quartus II 18.0)

在 Quartus II 中使用 PLL (Phase-Locked Loop) 模块来将输入时钟分频或倍频,并生成多个相位偏移或频率不同的时钟信号: 1. 生成 PLL 模块 在 Quartus II 中: 打开 IP Components。 file:///C:/intelFPGA_lite/18.0/quartus/common/help/w…

Springboot3.x配置类(Configuration)和单元测试

配置类在Spring Boot框架中扮演着关键角色,它使开发者能够利用Java代码定义Bean、设定属性及调整其他Spring相关设置,取代了早期版本中依赖的XML配置文件。 集中化管理:借助Configuration注解,Spring Boot让用户能在一个或几个配…

【游戏中orika完成一个Entity的复制及其Entity异步落地的实现】 1.ctrl+shift+a是飞书下的截图 2.落地实现

一、orika工具使用 1)工具类 package com.xinyue.game.utils;import ma.glasnost.orika.MapperFactory; import ma.glasnost.orika.impl.DefaultMapperFactory;/*** author 王广帅* since 2022/2/8 22:37*/ public class XyBeanCopyUtil {private static MapperFactory mappe…

Unity 组件学习记录:Aspect Ratio Fitter

概述 Aspect Ratio Fitter是 Unity 中的一个组件,用于控制 UI 元素(如Image、RawImage等)的宽高比。它在处理不同屏幕分辨率和尺寸时非常有用,可以确保 UI 元素按照预期的比例进行显示。当添加到一个 UI 对象上时,Aspe…

uni-app开发AI康复锻炼小程序,帮助肢体受伤患者康复!

**提要:**近段时间我们收到多个康复机构用户,咨询AI运动识别插件是否可以应用于肢力运动受限患者的康复锻炼中来,插件是可以应用到AI康复锻炼中的,今天小编就为您介绍一下AI运动识别插件在康腹锻炼中的应用场景。 一、康复机构的应…

Elasticsearch:什么是信息检索?

信息检索定义 信息检索 (IR) 是一种有助于从大量非结构化或半结构化数据中有效、高效地检索相关信息的过程。信息(IR)检索系统有助于搜索、定位和呈现与用户的搜索查询或信息需求相匹配的信息。 作为信息访问的主要形式,信息检索是每天使用…

Pytest-Bdd vs Behave:选择最适合的 Python BDD 框架

Pytest-Bdd vs Behave:选择最适合的 Python BDD 框架 Pytest BDD vs Behave:选择最适合的 Python BDD 框架BDD 介绍Python BDD 框架列表Python BehavePytest BDDPytest BDD vs Behave:关键区别Pytest BDD vs Behave:最佳应用场景结…

【数据集】5种常见人类行为检测数据集3379张YOLO+VOC格式

数据集格式:VOC格式YOLO格式 压缩包内含:3个文件夹,分别存储图片、xml、txt文件 JPEGImages文件夹中jpg图片总计:3379 Annotations文件夹中xml文件总计:3379 labels文件夹中txt文件总计:3379 标签种类数&am…

唯品会Android面试题及参考答案

HTTP 和 HTTPS 的区别是什么?你的项目使用的是 HTTP 还是 HTTPS? HTTP 和 HTTPS 主要有以下区别。 首先是安全性。HTTP 是超文本传输协议,数据传输是明文的,这意味着在数据传输过程中,信息很容易被窃取或者篡改。比如,在一个不安全的网络环境下,黑客可以通过网络嗅探工具…

基于Python+Vue开发的商城管理系统,大四期末作业,实习作品

项目简介 该项目是基于PythonVue开发的商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Python编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Python的网上商城管…

在 Solana 上实现 SOL 转账及构建支付分配器

与以太坊不同,在以太坊中,钱包通过 msg.value 指定交易的一部分并“推送” ETH 到合约,而 Solana 程序则是从钱包“拉取” Solana。 因此,没有“可支付”函数或“msg.value”这样的概念。 下面我们创建了一个新的 anchor 项目&a…

WebRTC搭建与应用(一)-ICE服务搭建

WebRTC搭建与应用(一) 近期由于项目需要在研究前端WebGL渲染转为云渲染,借此机会对WebRTC、ICE信令协议等有了初步了解,在此记录一下,以防遗忘。 第一章 ICE服务搭建 文章目录 WebRTC搭建与应用(一)前言一、ICE是什么?二、什么…

Linux高性能服务器编程 | 读书笔记 | 12. 多线程编程

12. 多线程编程 注:博客中有书中没有的内容,均是来自 黑马06-线程概念_哔哩哔哩_bilibili 早期Linux不支持线程,直到1996年,Xavier Leroy等人开发出第一个基本符合POSIX标准的线程库LinuxThreads,但LinuxThreads效率…

查看Mysql数据库引擎以及修改引擎为innoDB

目录 打开Mysql命令行 打开Mysql命令行 SHOW ENGINES;innoDB在事务型数据库中应用最多,其主要支持事务安全表(ACID),行锁定和外键。 介绍下InnoDB的主要特性: 1、InnoDB给MySQL提供了具有提交、回滚和崩溃恢复能力的事…

Moretl安全日志采集工具

永久免费: 至Gitee下载 使用教程: Moretl使用说明 使用咨询: 用途 定时全量或增量采集工控机,电脑文件或日志. 优势 开箱即用: 解压直接运行.不需额外下载.管理设备: 后台统一管理客户端.无人值守: 客户端自启动,自更新.稳定安全: 架构简单,兼容性好,通过授权控制访问. 架…

密码学——密码学概述、分类、加密技术(山东省大数据职称考试)

大数据分析应用-初级 第一部分 基础知识 一、大数据法律法规、政策文件、相关标准 二、计算机基础知识 三、信息化基础知识 四、密码学 五、大数据安全 六、数据库系统 七、数据仓库. 第二部分 专业知识 一、大数据技术与应用 二、大数据分析模型 三、数据科学 密码学 大数据…

nodejs搭配express网站开发后端接口设计需要注意事项

nodejs搭配express网站开发后端接口设计需要注意事项!为了回避一些常见的误区,今天和大家汇总一下,最近我遇到的一些错误信息,虽然都是小问题,但是还是需要分享一下,以免大家再次犯错。 1:第一个…