大数据-玩转数据-Flink CEP编程

一、Flink CEP

FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库。它可以让你在无界流中检测出特定的数据,有机会掌握数据中重要的那部分。
是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。

  1. 目标:从有序的简单事件流中发现一些高阶特征
  2. 输入:一个或多个由简单事件构成的事件流
  3. 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
  4. 输出:满足规则的复杂事件

二、Flink CEP应用场景

风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。

三、CEP开发基本步骤

导入CEP相关依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>

代码案例

package com.lyh.flink11;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Flink_CEP_S {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> stream = env.readTextFile("input/sensor.txt").map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, timeStamp) -> element.getTs()));Pattern<WaterSensor, WaterSensor> sensor_1 = Pattern.<WaterSensor>begin("sensor_1").where(new SimpleCondition<WaterSensor>() {@Overridepublic boolean filter(WaterSensor value) throws Exception {return "sensor_1".equals(value.getId());}});PatternStream<WaterSensor> pattern = CEP.pattern(stream, sensor_1);pattern.select(new PatternSelectFunction<WaterSensor, String>() {@Overridepublic String select(Map<String, List<WaterSensor>> map) throws Exception {return map.toString();}}).print();
env.execute();}
}

四、运行结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/135405.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JDK8源码阅读环境配置

说明 环境 jdk 版本&#xff1a;1.8.0_381 系统&#xff1a;macos 13.5.1 Intel 目的 学习 jdk8 源码&#xff0c;并能自定注释。 新建 java 工程 在 idea 中新建 java 工程&#xff0c;注意并非 maven 工程。如下图&#xff1a;完成后&#xff0c;如下图&#xff1a; 配置…

kali搭建vulhub漏洞靶场

安装kali 下载kali作为虚拟环境&#xff0c; Get Kali | Kali Linux 通过vmvare打开&#xff0c;默认账号密码kali/kali 修改root密码 su passwd root 如果一些配置普通用户做不了就切换kali&#xff0c;或sudo 命令 kali配置 apt换源 echo > /etc/apt/sources.list v…

JS中BigInt的使用

JS中BigInt的使用 BigInt是一种内置对象&#xff0c;它提供了一种方法来表示大于2^53 - 1的整数&#xff0c;通俗来讲就是提供了一种可以表示任意大整数的方法&#xff0c;当我们使用Number来表示一个超过了2 ^53 - 1的整数的时候&#xff0c;会出错。所以此时我们需要使用Big…

LinkedList 源码分析

LinkedList 是一个基于双向链表实现的集合类。 LinkedList 插入和删除元素的时间复杂度 头部插入/删除&#xff1a;只需要修改头结点的指针即可完成插入/删除操作&#xff0c;因此时间复杂度为 O(1)。尾部插入/删除&#xff1a;只需要修改尾结点的指针即可完成插入/删除操作…

STM32的HAL库SPI操作(master 模式)-根据时序图配置SPI

SPI相关基础知识 SPI基本概念请自行百度&#xff0c;参考&#xff1a;百度百科SPI简介.我们讲重点和要注意的地方。 master模式下要关注的地方 接线一一对应 也就是说主控的MISO,MOSI,SCLK,[CSn]分别和设备的MISO,MOSI,SCLK,[CSn]一一对应相连&#xff0c;不交叉&#xff0…

Linux 命令大全(下)

Linux 命令大全&#xff08;上&#xff09; 本文目录 6. 网络通讯 常用命令6.1 ssh 命令 – 安全的远程连接服务器6.1.1 含义6.1.2 语法格式6.1.3 常用参数6.1.4 参考示例 6.2 netstat 命令 – 显示网络状态6.2.1 含义6.2.2 语法格式6.2.3 常用参数6.2.4 参考示例 6.3 dhclient…

爬虫逆向实战(32)-某号店登录(RSA、补环境、混淆)

一、数据接口分析 主页地址&#xff1a;某号店 1、抓包 通过抓包可以发现登录接口是/publicPassport/login.do 2、判断是否有加密参数 请求参数是否加密&#xff1f; 通过查看“载荷”模块可以发现&#xff0c;有三个加密参数&#xff1a;username、password、captchaTok…

swift 问答app

结构体 mvc模式 不变性 试一试

航天航空及国防制造领军企业「同步电子」携手企企通,推进电子制造供应链管理数字化升级

从地球表面到广阔的星空&#xff0c;从近地轨道到深远的太空&#xff0c;中国的航天事业正在以前所未有的速度向前发展。每一次成功的发射&#xff0c;每一次精确的降落&#xff0c;都展现了国人无比的毅力和精湛的技术。而在北斗导航、长征火箭、嫦娥月球探测器等多个航天设备…

vscode 代码高亮显示

很多情况下vscode显示代码无法完全高亮显示&#xff0c;就很不舒服 除了语言设置为pylance之外&#xff0c;vscode本身的主题也是很重要的一个因素 改现代神色即可

Hadoop sqoop

0目录 1.安装sqoop 2.补充sqoop流程 1.安装sqoop 解压、改名 [rootkb129 install]# tar -xvf ./sqoop-1.4.7.tar.gz -C /opt/soft/ [rootkb129 soft]# mv sqoop-1.4.7/ sqoop147 拷贝配置文件 [rootkb129 conf]# pwd /opt/soft/sqoop147/conf [rootkb129 conf]# cp sqoop-en…

Java环境搭建安装IDE

Java环境搭建、安装IDE 文章目录 Java环境搭建、安装IDE1. 下载Java JDK &#xff0c;配置环境变量&#xff0c;在命令行环境下完成hello world程序&#xff1b;简介安装Step 0 安装包准备工作Step 1 下载 Java JDKStep 2 配置环境变量配置 JAVA_HOME配置 Path配置 CLASSPATH S…

巨人互动|Facebook海外户Facebook客户反馈分数

Facebook客户反馈分数是一项用于衡量用户对Facebook产品和服务满意度的指标。该指标被广泛应用于各种调研和评估活动&#xff0c;帮助Facebook了解用户对其平台和功能的意见和建议&#xff0c;并从中识别出改进的机会。 巨人互动|Facebook海外户&Facebook新闻提要的算法&am…

django添加数据库字段进行数据迁移

1.修改view.py里面的变量 2.在model.py新增字段 3.打开terminal并将环境切到项目所在环境&#xff0c;切换方式为 4.执行命令 python manage.py makemigrations backend python manage.py migrate

社区版MyApps低代码平台,免费即刻拥有!

编者按&#xff1a;本文主要介绍了MyApps推出的免费社区版的优势&#xff0c;为企业数字化转型提供了解决方案。立即登录MyApps低代码平台&#xff0c;就能获取永久免费的低代码平台。 1.MyApps社区版的优势 1.1不受限制&#xff0c;畅享自由 无用户限制、无安装限制、全面应用…

使用Cpolar 内网穿透工具,实现公网访问SeaFile搭建的私有云盘

文章目录 1. 前言2. SeaFile云盘设置2.1 Owncould的安装环境设置2.2 SeaFile下载安装2.3 SeaFile的配置 3. cpolar内网穿透3.1 Cpolar下载安装3.2 Cpolar的注册3.3 Cpolar云端设置3.4 Cpolar本地设置 4.公网访问测试5.结语 1. 前言 现在我们身边的只能设备越来越多&#xff0c…

K8S入门前奏之VMware虚拟机网络配置

为了能在本地搭建 K8S 的运行服务器&#xff0c;在个人电脑上安装了虚拟机VMware16版本&#xff0c;并且在阿里巴巴开源镜像站下载了CentOS-7操作系统&#xff1a;阿里巴巴开源镜像站 做完一些列准备工作后&#xff0c;在虚拟机安装完CentOS-7操作系统后&#xff0c;需要对VMw…

chales 重写/断点/映射/手机代理/其他主机代理

1 chales 安装和代理配置/手机代理配置/电脑代理配置 chales 安装和代理配置/手机代理配置/电脑代理配置 2 转载:Charles Rewrite重写(详解&#xff01;必懂系列) Charles Rewrite重写(详解&#xff01;必懂系列) 1.打开charles&#xff0c;点击菜单栏的Tools选中Rewrite2.…

虹科分享 | 谷歌Vertex AI平台使用Redis搭建大语言模型

文章来源&#xff1a;虹科云科技 点此阅读原文 基础模型和高性能数据层这两个基本组件始终是创建高效、可扩展语言模型应用的关键&#xff0c;利用Redis搭建大语言模型&#xff0c;能够实现高效可扩展的语义搜索、检索增强生成、LLM 缓存机制、LLM记忆和持久化。有Redis加持的大…

wordpress使用category order and taxonomy terms order插件实现分类目录的拖拽排序

文章目录 引入实现效果安装插件使用插件 引入 使用docker快速搭建wordpress服务&#xff0c;并指定域名访问 上一节我们使用docker快速搭建了wordpress服务&#xff0c;可以看到基础的wordpress服务已经集成基础的用户管理、文章发布、页面编辑、文章分类等功能&#xff0c;但…