Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP?

Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。

在这里插入图片描述
上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。

2、代码实现

2.1 引入maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.roy</groupId><artifactId>FlinkDemo</artifactId><version>1.0</version><properties><flink.version>1.12.5</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- CEP主要是下面这个依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.14</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

2.2 基本流程

//1、获取原始事件流
DataStream<Event> input = ......; 
//2、定义匹配器
Pattern<Event,?> pattern = .......; 
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(new PatternProcessFunction<Event, Result>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Result> out) throws Exception {}
});

2.3 完整代码

注意:代码运行前,先启动2.4 nlk socket服务

package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket测试env.setParallelism(1);// 1、获取原始事件流(10.86.97.206改为实际地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定义匹配器// 2.1:10秒内出现3次登录失败的记录(不一定连续)// Flink CEP定义消息匹配器。
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));// 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、获取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、将匹配流中的数据处理成结果数据流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 针对2.1 连续3次登录失败
//            final List<UserLoginRecord> records = match.get("start");
//            for(UserLoginRecord record : records){
//                out.collect(record);
//            }// 针对2.2 非连续3次登录失败final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}

UserLoginRecord对象,如下:


public class UserLoginRecord {private String userId;private int loginRes; // 0-成功, 1-失败private long loginTime;public UserLoginRecord() {}public UserLoginRecord(String userId, int loginRes, long loginTime) {this.userId = userId;this.loginRes = loginRes;this.loginTime = loginTime;}@Overridepublic String toString() {return "UserLoginRecord{" +"userId='" + userId + '\'' +", loginRes=" + loginRes +", loginTime=" + loginTime +'}';}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public int getLoginRes() {return loginRes;}public void setLoginRes(int loginRes) {this.loginRes = loginRes;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}
}

2.4 nlk模拟socket服务端

在这里插入图片描述

2.5 IDEA控制台打印

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/247873.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络防御安全:2-6天笔记

第二章&#xff1a;防火墙 一、什么是防火墙 防火墙的主要职责在于&#xff1a;控制和防护。 防火墙可以根据安全策略来抓取流量之后做出对应的动作。 二、防火墙的发展 区域&#xff1a; Trust 区域&#xff0c;该区域内网络的受信任程度高&#xff0c;通常用来定义内部…

单片机介绍

本文为博主 日月同辉&#xff0c;与我共生&#xff0c;csdn原创首发。希望看完后能对你有所帮助&#xff0c;不足之处请指正&#xff01;一起交流学习&#xff0c;共同进步&#xff01; > 发布人&#xff1a;日月同辉,与我共生_单片机-CSDN博客 > 欢迎你为独创博主日月同…

electron-builder vue 打包后element-ui字体图标不显示问题

当使用electron打包完成的时候&#xff0c;启动项目发现使用的element-ui字体图标没显示都变成了小方块&#xff0c;并出现报错&#xff0c;请看下图&#xff1a; 解决方法&#xff1a; 在vue.config.js中设置 customFileProtocol字段&#xff1a;pluginOptions: {electronBui…

两个近期的计算机领域国际学术会议(软件工程、计算机安全):欢迎投稿

近期&#xff0c;受邀担任两个国际学术会议的Special session共同主席及程序委员会成员&#xff08;TPC member&#xff09;&#xff0c;欢迎广大学界同行踊跃投稿&#xff0c;分享最新研究成果。期待这个夏天能够在夏威夷檀香山或者加利福尼亚圣荷西与各位学者深入交流。 SERA…

防御保护常用知识

防火墙的主要职责在于&#xff1a;控制和防护 --- 安全策略 --- 防火墙可以根据安全策略来抓取流量之 后做出对应的动作 防火墙分类主要有四类&#xff1a; 防火墙吞吐量 --- 防火墙同一时间能处理的数据量多少 防火墙的发展主要经过以下阶段&#xff1b; 传统防火墙&#xf…

格子表单GRID-FORM | 嵌套子表单与自定义脚本交互

格子表单/GRID-FORM已在Github 开源&#xff0c;如能帮到您麻烦给个星&#x1f91d; GRID-FORM 系列文章 基于 VUE3 可视化低代码表单设计器嵌套表单与自定义脚本交互 新版本功能 &#x1f389; 不觉间&#xff0c;GRID-FORM 已经开源一年&#xff08;2023年1月29日首次提交…

大数据StarRocks(八):资源隔离实战

前言 自 2.2 版本起&#xff0c;StarRocks 支持资源组管理&#xff0c;集群可以通过设置资源组&#xff08;Resource Group&#xff09;的方式限制查询对资源的消耗&#xff0c;实现多租户之间的资源隔离与合理利用。在 2.3 版本中&#xff0c;StarRocks 支持限制大查询&#…

HAL STM32+EC11编码器实现增减调节及单击、双击、长按功能

HAL STM32EC11编码器实现增减调节及单击、双击、长按功能 &#x1f4fa;实现效果演示&#xff1a; &#x1f4d8;内容提要 &#x1f4dd;本文主要实现&#xff0c;通过STM32 HAL库开发&#xff0c;实现的EC11编码器功能&#xff0c;按键结合状态机思想实现的拓展单击、双击、…

Linux之快速入门(CentOS 7)

文章目录 一、Linux目录结构二、常用命令2.1 切换用户2.2查看ip地址2.3 cd2.4 目录查看2.5 查看文件内容2.6 创建目录及文件2.7 复制和移动2.8 其他2.9 tar3.0 which3.1 whereis3.2 find&#xff08;这个命令尽量在少量用户使用此软件时运行&#xff0c;因为此命令是真的读磁盘…

计数排序(六)——计数排序及排序总结

目录 一.前言 二.归并小补充 三.计数排序 操作步骤&#xff1a; 代码部分&#xff1a; 四.稳定性的概念&#xff1a; 五.排序大总结&#xff1a; ​六.结语 一.前言 我们已经进入排序的尾篇了&#xff0c;本篇主要讲述计数排序以及汇总各类排序的特点。码字不易&#x…

【JavaScript 漫游】【002】JS 的数据类型总览

文章简介 本文为【JavaScript 漫游】专栏的第 002 篇文章&#xff0c;主要记录了笔者学习 JS 数据类型中所了解的基本知识点。 ES5 的数据类型有哪些如何区分 ES5 的数据类型null 和 undefined 的相同点和不同点布尔值的转换规则parseInt 和 parseFloat 的基本用法 作为 JS …

使用plotly dash 画3d圆柱(Python)

plotly3D &#xff08;3d charts in Python&#xff09;可以画3维图形 在做圆柱的3D装箱项目&#xff0c;需要装箱的可视化&#xff0c;但是Mesh &#xff08;3d mesh plots in Python&#xff09;只能画三角形&#xff0c;所以需要用多个三角形拼成一个圆柱&#xff08;想做立…

网站小程序分类目录网源码系统+会员注册登录功能 附带完整的搭建教程

随着互联网的发展&#xff0c;小程序分类目录网站已经成为了人们获取各类信息的重要渠道。而在这个领域中&#xff0c;罗峰给大家分享一款网站小程序分类目录网源码系统以其强大的功能和易用性&#xff0c;脱颖而出。本系统集成了会员注册登录功能&#xff0c;让用户能够更加便…

【git】git update-index --assume-unchanged(不改动.gitignore实现忽略文件)

文章目录 原因分析&#xff1a;添加忽略文件(取消跟踪)的命令&#xff1a;取消忽略文件(恢复跟踪)的命令&#xff1a;查看已经添加了忽略文件(取消跟踪)的命令&#xff1a; 原因分析&#xff1a; 已经维护的项目&#xff0c;文件已经被追踪&#xff0c;gitignore文件不方便修…

Layui + Echarts 5.0

Layui 怎么整合最新版本的 Echarts 5.0&#xff0c;Echarts 4 升级到 5后&#xff0c;有了很大改变&#xff0c;新的配置项4是无法兼容的&#xff0c;所以想要使用新的功能&#xff0c;都需要升级&#xff01; 新建一个echarts.js文件 layui.define(function (exports) {// 这…

Optional lab: Linear Regression using Scikit-LearnⅠ

scikit-learn是一个开源的、可用于商业的机器学习工具包&#xff0c;此工具包包含本课程中需要使用的许多算法的实现 Goals In this lab you will utilize scikit-learn to implement linear regression using Gradient Descent Tools You will utilize functions from sci…

微服务技术总结

微服务&#xff01; SrpingClound 微服务主要解决项目拆分后所产生的一系列问题。SpringClound主要解决服务的治理问题 单体VS分布式 单体&#xff1a;部署简单、成本低 缺点&#xff1a;服务耦合度高 2兼容1 服务拆分注意事项 远程调用分析 提供者&#xff1a;服务的提供方…

QT 使用XML保存操作记录

文章目录 1 实现程序保存操作记录的思路2 XML文档基本结构3 QDomDocument实现XML读写3.1 QDomDocument实现生成XML文件3.2 QDomDocument实现读取XML文件 4 QXmlStreamWriter实现读写4.1 QXmlStreamWriter实现生成XML4.2 QXmlStreamWriter实现读取XML 1 实现程序保存操作记录的思…

【大数据】Flink 架构(三):事件时间处理

《Flink 架构》系列&#xff08;已完结&#xff09;&#xff0c;共包含以下 6 篇文章&#xff1a; Flink 架构&#xff08;一&#xff09;&#xff1a;系统架构Flink 架构&#xff08;二&#xff09;&#xff1a;数据传输Flink 架构&#xff08;三&#xff09;&#xff1a;事件…

04.对象树

一、引入 1.QT实现输出"hello world" 使用QT编写"hello world"程序&#xff0c;有两种实现方式&#xff1a; &#xff08;1&#xff09;直接在生成的ui文件中&#xff0c;拖入一个label控件&#xff0c;双击控件编辑内容即可实现 &#xff08;2&#xff0…