ProcessFuncion处理函数
- 功能
- 拥有富函数功能
- 生命周期方法
- 状态编程 - 对元素的处理功能processElement, 在不同的处理函数中,该方法的名字略有区别
- 定时器编程
- TimeService:定时服务,可以用于注册定时器,删除定时器
- ontimer():定时器触发后会自动调用该方法,我们将需要完成的工作写到该方法中
- 侧输出流
- 拥有富函数功能
- 分类
- processFunction: 普通流DataStream调用
- keyedProcessFunction: KeyedStream, 经过Keyby的数据流
- ProcessWindowFunction:按键分区经过window操作的数据流,WindowedStream,全窗口函数
- ProcessAllWindowFunction:非按键分区的window数据流调用
- CoProcessFunction:ConnectedStreams, 由两个数据流经过connect后得到的,没有经过keyby时调用
- ProcessJoinFunction:IntervalJoined , 两个流经过IntervalJoin后得到的流
- BroadcastProcessFunction:一个普通流connect广播流后得到,之后调用process需要传入该Function
- KeyedBroadcastProcessFunction: 一个keyby流connect广播流后得到
常用流之间的转换关系
- SingleOutputDataSteam是继承自DataStream的。
- window操作必须是基于keyby流
- 特殊流经过reduce, aggreagate,process, apply等聚合操作后就变为SingleOutputDataStream
processFunction的定时器方法
定时服务 和定时器
- TimerService:定时服务,用于注册定时器,删除定时
- long currentProcessingTime():获取当前的处理时间
- registerProcessingTimer(): 注册处理时间定时器
- registerEventTimeTimer():注册事件时间定时器
- currentWatermark():获取当前水位线
- deleteEventTimeTimer():注册事件时间定时器
- Timer:定时器,在未来的某个时间注册一个事件,定时器触发,会执行定义的事件
- ontimer(): 定时器触发以后,会自动调用该方法
- 注意:
- 要定时先keyby, 设置定时器必须基于一个keyby流
- 同一个key在注册多个相同时间的定时器,未来之后触发一次,不同的key注册相同时间的定时器,每个key都会触发一次。
TopN 实现
/*** title:** @Author 浪拍岸* @Create 11/12/2023 下午3:19* @Version 1.0** 实时统计一段时间的热门URL* 例如:10秒内最热门的两个URL链接,并且每5秒更新一次** 方案1:不进行keyby操作,将所有URL数据统一往一个窗口* 中收集,并且使用全量聚合,等到窗口触发计算时,在处理函数中* 对窗口内所有数据进行汇总处理**/
public class Flink03_TopN {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);SingleOutputStreamOperator<Event> ds = Flink06_EventSource.getEventSource(env).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((event, ts) -> event.getTs()));ds.print("input");ds.windowAll(
// TumblingEventTimeWindows.of(Time.seconds(10))SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessAllWindowFunction<Event, String, TimeWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Event, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<String> out) throws Exception {//统计每个URL的点击次数long start = context.window().getStart();long end = context.window().getEnd();HashMap<String, Long> urlCountMap = new HashMap<>();for (Event element : elements) {Long count = urlCountMap.getOrDefault(element.getUrl(), 0L);urlCountMap.put(element.getUrl(), count+1);}//将map结构转换为list
// ArrayList<UrlUserCount> urlCountList = new ArrayList<>(urlCountMap.size());List<UrlUserCount> urlUserCountList = urlCountMap.entrySet().stream().map(entry -> new UrlUserCount(entry.getKey(), entry.getValue(), start, end)).collect(Collectors.toList());urlUserCountList.sort(new Comparator<UrlUserCount>() {@Overridepublic int compare(UrlUserCount o1, UrlUserCount o2) {return Long.compare(o2.getCount(), o1.getCount());}});//取topNStringBuilder result = new StringBuilder("*******************************\n");for (int i = 0; i < Math.min(2,urlUserCountList.size()); i++) {UrlUserCount urlUserCount = urlUserCountList.get(i);result.append("TOP."+(i+1)+" "+urlUserCount+"\n");}result.append("*****************************************\n");//输出out.collect(result.toString());}}).print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}