Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)

Flink实现异常登陆监控(两秒内多次登陆失败进行异常行为标记)

在大数据处理领域,Apache Flink 是一个流行的开源流处理框架,能够高效处理实时数据流。在这篇博客中,我们将展示如何使用 Apache Flink 从 MySQL 中读取数据并进行实时异常监控处理,最终将结果写回到 MySQL 数据库中的err_login表中。

项目概述

我们的示例程序将会执行以下任务:

从 MySQL 数据库读取用户登录数据。
过滤出特定状态的登录记录。
对这些记录进行时间窗口处理。
将处理结果写回 MySQL 数据库。

依赖环境

在开始之前,请确保你已经安装了以下环境:
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>EastMoney</artifactId><version>1.0-SNAPSHOT</version><repositories><repository><id>central</id><name>Maven Central Repository</name><url>https://repo.maven.apache.org/maven2</url></repository></repositories><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.14.6</version></dependency><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.6</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency></dependencies></project>

MySQL 数据库

CREATE TABLE `login_detail` (`id` int NOT NULL AUTO_INCREMENT,`username` varchar(255) DEFAULT NULL,`password` varchar(255) DEFAULT NULL,`time` varchar(255) DEFAULT NULL,`status` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=127 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci CREATE TABLE `err_login` (`id` int NOT NULL AUTO_INCREMENT,`username` varchar(255) DEFAULT NULL,`status` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=74 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 

1. 数据模型定义

首先,我们定义了一个简单的 User case class,用于表示从 MySQL 中读取的用户数据。

case class User(id: Int, username: String, password: String, time: String, status: Int)

2.自定义 MySQL 数据源

我们实现了一个自定义的 RichSourceFunction,从 MySQL 数据库中读取数据。该函数会不断地查询数据库,并将新数据发送到 Flink 流中。

class MySQLInsertSource(jdbcUrl: String, username: String, password: String, tableName: String) extends RichSourceFunction[User] {@volatile private var isRunning = trueprivate var connection: Connection = _private var lastMaxTime: String = _override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {super.open(parameters)connection = DriverManager.getConnection(jdbcUrl, username, password)// Initial loadval statement = connection.createStatement()val resultSet = statement.executeQuery(s"SELECT * FROM $tableName")while (resultSet.next()) {val user = User(resultSet.getInt("id"),resultSet.getString("username"),resultSet.getString("password"),resultSet.getString("time"),resultSet.getInt("status"))// Update lastMaxTimeif (lastMaxTime == null || user.time > lastMaxTime) {lastMaxTime = user.time}}}override def run(ctx: SourceFunction.SourceContext[User]): Unit = {val statement = connection.createStatement()while (isRunning) {val query = s"SELECT * FROM $tableName WHERE time > '$lastMaxTime'"val resultSet = statement.executeQuery(query)while (resultSet.next()) {val user = User(resultSet.getInt("id"),resultSet.getString("username"),resultSet.getString("password"),resultSet.getString("time"),resultSet.getInt("status"))ctx.collect(user)// Update lastMaxTimeif (user.time > lastMaxTime) {lastMaxTime = user.time}}Thread.sleep(2000) // sleep for 2 seconds}}override def cancel(): Unit = {isRunning = falseif (connection != null) {connection.close()}}
}

变量声明:
isRunning: 用于控制数据源是否继续运行。
connection: 用于连接 MySQL 数据库的 Connection 对象。
lastMaxTime: 记录上次读取数据的最大时间戳,用于增量查询。
open 方法:在数据源启动时初始化数据库连接并进行初始加载,读取全部数据,更新 lastMaxTime。
run 方法:在数据源运行时不断查询数据库,获取新数据并发送到 Flink 流中。每隔2秒执行一次查询,并更新 lastMaxTime。
cancel 方法:在数据源取消时关闭数据库连接。

3. 时间戳分配器和水位线

为了确保事件按时间顺序处理,我们为数据流分配时间戳并生成水位线。

val userStreamWithTimestamps = userStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[User](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[User] {override def extractTimestamp(element: User, recordTimestamp: Long): Long = {val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val date = format.parse(element.time)date.getTime}}))

WatermarkStrategy:定义了水位线生成策略。forBoundedOutOfOrderness 表示允许事件在1秒的乱序范围内到达。

SerializableTimestampAssigner:定义了时间戳提取器,从 User 对象的 time 字段提取时间戳。

4. 数据过滤和窗口处理

我们过滤出 status 为 0 的记录,并对这些记录进行2秒的窗口处理。

val filteredStream = userStreamWithTimestamps.filter(_.status == 0)val windowedStream = filteredStream.keyBy(_.username).timeWindow(Time.seconds(2)).process(new WriteToDatabaseFunction(jdbcUrl, username, password))

过滤:filter 操作保留 status 为 0 的记录。(0为登陆失败)
窗口处理:对每个 username 进行2秒的时间窗口处理,并使用自定义的 WriteToDatabaseFunction 进行处理。

5. 窗口处理函数

我们实现了一个 ProcessWindowFunction,在窗口结束时将获取到的异常登陆用户写入 MySQL 数据库。

class WriteToDatabaseFunction(url: String, username: String, password: String) extends ProcessWindowFunction[User, String, String, TimeWindow] {val insertSql = "INSERT INTO err_login (username, status) VALUES (?, ?)"override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {val allStatusOne = elements.forall(_.status == 0)if (allStatusOne) {out.collect(s"Username: $key had status 1 for 2 seconds")val connection = DriverManager.getConnection(url, username, password)val preparedStatement = connection.prepareStatement(insertSql)try {for (user <- elements) {preparedStatement.setString(1, user.username)preparedStatement.setInt(2, user.status)preparedStatement.addBatch()}preparedStatement.executeBatch()} finally {preparedStatement.close()connection.close()}}}
}

变量声明:insertSql 为插入错误登录记录的 SQL 语句。
process 方法:
检查窗口内的所有记录 status 是否都为 0。
如果是,打印日志并将记录写入 err_login 表中。
使用批量插入提高效率。

6. 主函数

最后,我们将所有部分组装在一起,并执行 Flink 作业。

object FlinkMySQLExample {val jdbcUrl = "jdbc:mysql://localhost:3306/big_data"val username = "root"val password = "12345678"val tableName = "login_detail"def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval mySQLSource = new MySQLInsertSource(jdbcUrl, username, password, tableName)val userStream = env.addSource(mySQLSource)userStream.print()val userStreamWithTimestamps = userStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[User](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[User] {override def extractTimestamp(element: User, recordTimestamp: Long): Long = {val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val date = format.parse(element.time)date.getTime}}))val filteredStream = userStreamWithTimestamps.filter(_.status == 0)val windowedStream = filteredStream.keyBy(_.username).timeWindow(Time.seconds(2)).process(new WriteToDatabaseFunction(jdbcUrl, username, password))windowedStream.print()env.execute("Flink MySQL Example")}
}

主函数:
获取 Flink 的执行环境。
添加自定义数据源 MySQLInsertSource,从 MySQL 数据库中读取数据。
将数据流赋予时间戳和水位线。
过滤出 status 为 0 的记录。
对过滤后的记录进行2秒的窗口处理,并将结果写入 MySQL 数据库。
执行 Flink 作业。
在这里插入图片描述
在这里插入图片描述

7.总结

这段代码展示了如何使用 Apache Flink 处理实时数据流,并与 MySQL 数据库进行交互。通过自定义数据源、时间戳和水位线分配、窗口处理和自定义窗口函数,我们可以构建强大的流处理应用程序。

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于MapReduce, MySQL, python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/337614.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot + Vue前后端项目(第十四记)

项目实战第十三记 写在前面1. 建立字典表2. 后端DictController3. Menu.vue4. 建立sys_role_menu中间表5.分配菜单接口6. 前端Role.vue改动总结写在最后 写在前面 本篇主要讲解动态分配菜单第二章节 菜单页面优化 引入图标 角色界面优化 角色自主分配菜单&#xff0c;并保存至…

诸神混战:2023年中国头部物流集成商财报公开:几家欢喜几家愁~

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》人俱乐部 在全球供应链经历前所未有的考验之时&#xff0c;物流装备行业的领军企业们在2023年展现了他们的韧性和创新能力。 从智能仓储到自动化搬…

钣金件设计规范

(一&#xff09; 钣金 1、钣金的概念 钣金&#xff08;sheet metal&#xff09;是针对金属薄板&#xff08;厚度通常在6mm以下&#xff09;的 一种综合冷加工工艺&#xff0c;包括冲裁、折弯、拉深、成形、锻压、铆合等&#xff0c; 其显著的特征是同一零件厚度一致。 2、钣…

善听提醒遵循易经原则。世界大同只此一路。

如果说前路是一个大深坑&#xff0c;那必然是你之前做的事情做的不太好&#xff0c;当坏的时候&#xff0c;坏的结果来的时候&#xff0c;是因为你之前的行为&#xff0c;你也就不会再纠结了&#xff0c;会如何走出这个困境&#xff0c;是好的来了&#xff0c;不骄不躁&#xf…

解决Windows 10通过SSH连接Ubuntu 20.04时的“Permission Denied”错误

在使用SSH连接远程服务器时&#xff0c;我们经常可能遇到各种连接错误&#xff0c;其中“Permission denied, please try again”是较为常见的一种。本文将分享一次实际案例的解决过程&#xff0c;帮助你理解如何排查并解决这类问题。 问题描述 在尝试从Windows 10系统通过SS…

分享一个在linux中运行通义千问的方法

分享一个在linux中和通义千问交互的方法 效果展示: 整体步骤 分享一个在linux中和通义千问交互的方法效果展示:一、在阿里云appflow控制台创建连接流1、通过以下地址,在灵积平台创建个API-KEY,用于通义千问的连接凭证2、点击连接流-创建连接流3、第一步选择webhook4.第二步…

618大促有哪些好物是最值得入手的的?请收下这份618必买好物清单!

最近聊的最多的话题就是618&#xff0c;年中购物大狂欢马上来了&#xff01;&#xff01;今天整理了一下之前购买的好物&#xff0c;发现相比之前的价格真的是太划算了&#xff0c;赶紧分享出来给大家&#xff0c;趁着这个大促赶紧多存入手~ 推荐1、南卡Neo 2——不伤耳黑科技…

Facebook开户|Facebook广告投放指南

家人们中午好~今天的文章由我们帅气逼人的大帅哥Zoey为大家分享&#xff08;狗头&#xff09;~有想要通过Facebook广告掘金的家人们&#xff01;今天就跟大家分享一下Facebook广告投放的底层逻辑和实用技巧&#xff0c;帮助大家少走弯路&#xff0c;快速入门~ 基础知识&#x…

反射获取构造方法

目录 利用反射获取构造方法 代码实现 获取class对象 ​编辑获取权限修饰符 获取参数 创建对象 利用反射获取构造方法 代码实现 Student类&#xff1a; 获取class对象 获取权限修饰符 获取参数 创建对象 因为con4的构造方法的权限修饰符是private&#xff0c;不能直接在测…

图解支付系统的渠道路由设计

大家好&#xff0c;我是隐墨星辰&#xff0c;今天和大家聊聊渠道路由设计。 这篇文章主要讲清楚&#xff1a;渠道路由是什么&#xff0c;为什么需要渠道路由&#xff0c;渠道路由的几种形态&#xff0c;一个简洁而实用的基于规则的渠道路由设计。 注&#xff1a;有些公司称渠…

基于标准库的STM32的外部中断EXTI

毕设已经告一段落了&#xff0c;接下来准备开始整理一下毕设中用到的知识与技术细节&#xff0c;今天整理的是STM32从编码器获取数据的方式-----外部中断&#xff08;EXTI&#xff09;&#xff1a; 外部中断分为四个硬件相关外设&#xff0c;GPIO/AFIO/EXTI/NVIC&#xff08;E…

11.3 指针和函数

11.3 指针和函数 本节必须掌握的知识点&#xff1a; 指针作为函数的参数 数组作为函数的参数 指针作为函数的返回值 在C语言中&#xff0c;指针的一个重要作用就是作为函数参数使用&#xff0c;本节将介绍这一重要作用。 11.3.1 指针作为函数的参数 实验一百一十三&#xff…

USART串口数据包

USART串口数据包 先来看两张图&#xff0c;本次程序是串口收发HEX数据包&#xff0c;第二种是串口收发文本数据包&#xff0c;之后两个图&#xff0c;展示的就是接收数据包的思路。 在PB1这里接了一个按键&#xff0c;用于控制。在串口助手&#xff0c;在发送模式和接收模式都…

【错题集-编程题】小红的子串(前置和 + 双指针)

牛客对应题目链接&#xff1a;小红的子串 (nowcoder.com) 一、分析题目 利用前缀和的思想&#xff0c;求种类个数在 [l, r] 区间内子串的个数&#xff0c;等于求 [1, r] 区间内个数 - [1, l - 1] 区间内个数。 求种类个数在 [1, count] 区间内子串的个数&#xff0c;可以用滑动…

(深度学习记录)第TR3周:Transformer 算法详解

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 文本的输入处理中&#xff0c;transformer会将输入文本序列的每个词转化为一个词向量&#xff0c;我们通常会选择一个合适的长度作为输入…

Linux系统监控

文章目录 一、系统监控基本介绍二、内存监控2.1、内存监控字段解析2.2、windows下查看内存2.2.1、通过cmd中命令查看内存条信息&#xff1a;2.2.2、通过cmd中命令查看物理内存信息&#xff1a;2.2.3、使用任务管理器查看内存2.2.4、使用资源监视器查看内存2.2.5、使用系统信息工…

Stable Diffusion Webui--安装与使用

最近进行的课程汇报&#xff0c;学习了2023年的CVPR文章《DreamBooth: Fine Tuning Text-to-Image Diffusion Models for Subject-Driven Generation》&#xff0c;因此尝试使用了几种方法对这篇文章的工作进行了一定的复现。本文主要介绍Stable Diffusion Web UI(webui)的安装…

锅炉智能制造工厂工业物联数字孪生平台,推进制造业数字化转型

在制造业快速发展的今天&#xff0c;数字化转型已经成为企业提升竞争力的关键途径。锅炉智能制造工厂工业物联数字孪生平台&#xff0c;作为一种创新的技术解决方案&#xff0c;正以其独特的优势&#xff0c;为制造业的数字化转型提供强大动力。锅炉智能制造工厂工业物联数字孪…

Git基本配置,使用Gitee(一)

1、设置Giter的user name和email 设置提交用户的信息 git config --global user.name "username" git config --global user.email "Your e-mail"查看配置 git config --list2、生成 SSH 公钥 通过命令 ssh-keygen 生成 SSH Key -t key 类型 -C 注释 ssh-…

Stable Diffusion生成图片的参数查看与抹除方法

前几天分享了几张Stable Diffusion生成的艺术二维码&#xff0c;有同学反映不知道怎么查看图片的参数信息&#xff0c;还有的同学问怎么保护自己的图片生成参数不会泄露&#xff0c;这篇文章就来专门分享如何查看和抹除图片的参数。 查看图片的生成参数 1、打开Stable Diffus…