一、恶意登录
对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。
因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。
二、数据源格式
937166,1715,beijing,beijing,1511661606
937166,1715,beijing,beijing,1511661607
937166,1715,beijing,beijing,1511661608
161501,36156,jiangsu,nanjing,1511661608
937166,1715,beijing,beijing,1511661609
937166,1715,beijing,beijing,1511661610
937166,1715,beijing,beijing,1511661611
937166,1715,beijing,beijing,1511661612
三、封装数据
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginEvent {private Long userId;private String ip;private String eventType;private Long eventTime;
}
四、代码实现逻辑
实现逻辑:
统计连续失败的次数:
- 把失败的时间戳放入到List中,
- 当List中的长度到达2的时候, 判断这个两个时间戳的差是否小于等于2s
- 如果是, 则这个用户在恶意登录
- 否则不是, 然后删除List的第一个元素
- 用于保持List的长度为2
- 如果出现成功, 则需要清空List集合
五、代码实现
package com.lyh.flink11;import com.lyh.bean.LoginEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Login_ey {public static void main(String[] args) throws Exception {//创建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//创建水印策略WatermarkStrategy<LoginEvent> wms = WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {@Overridepublic long extractTimestamp(LoginEvent element, long recordTimestamp) {return element.getEventTime();}});//读入数据KeyedStream<LoginEvent, Long> watersencerStream = env.readTextFile("input/LoginLog.csv").map(line -> {String[] datas = line.split(",");return new LoginEvent(Long.valueOf(datas[0]),datas[1],datas[2],Long.valueOf(datas[3]));// 指定水印和时间戳}).assignTimestampsAndWatermarks(wms)// 按照用户ID分组.keyBy(LoginEvent::getUserId);// Flink CEP 也叫做Flink复杂事件处理,// 可以在无穷无界的事件流中检测事件规则,通过模式规则匹配的方式对重要信息进行跟踪和分析,从而在实时数据中发掘出有价值的信息//定义模式Pattern<LoginEvent, LoginEvent> fail = Pattern.<LoginEvent>begin("fail").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return "fail".equals(value.getEventType());}}).timesOrMore(2).consecutive().until(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return "success".equals(value.getEventType());}}).within(Time.seconds(2));// 把模式用在流上PatternStream<LoginEvent> ps = CEP.pattern(watersencerStream, fail);//获取匹配到的结果ps.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> pattern) throws Exception {return pattern.get("fail").toString();}}).print();env.execute();}
}