一、Flink CEP
FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库。它可以让你在无界流中检测出特定的数据,有机会掌握数据中重要的那部分。
是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。
- 目标:从有序的简单事件流中发现一些高阶特征
- 输入:一个或多个由简单事件构成的事件流
- 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
- 输出:满足规则的复杂事件
二、Flink CEP应用场景
风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。
三、CEP开发基本步骤
导入CEP相关依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
代码案例
package com.lyh.flink11;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Flink_CEP_S {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> stream = env.readTextFile("input/sensor.txt").map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, timeStamp) -> element.getTs()));Pattern<WaterSensor, WaterSensor> sensor_1 = Pattern.<WaterSensor>begin("sensor_1").where(new SimpleCondition<WaterSensor>() {@Overridepublic boolean filter(WaterSensor value) throws Exception {return "sensor_1".equals(value.getId());}});PatternStream<WaterSensor> pattern = CEP.pattern(stream, sensor_1);pattern.select(new PatternSelectFunction<WaterSensor, String>() {@Overridepublic String select(Map<String, List<WaterSensor>> map) throws Exception {return map.toString();}}).print();
env.execute();}
}