Flink(java版)

watermark

时间语义和 watermark

注意:数据进入flink的时间:如果用这个作为时间语义就不存在问题,但是开发中往往会用处理时间
作为时间语义这里就需要考虑延时的问题。
如上图,数据从kafka中获取出来,从多个分区中获取,这时候时间肯定有乱序,这时候就需要使用事
件时间。

场景:游戏连续过五关,给予奖励
地铁里面玩游戏,连过三关断网了,二分钟过了八关。这时候是用处理时间还是事件时间呢?
处理时间的优势:牺牲一定的数据准确性,没有延迟

package com.atguigu.apitest.window;/**import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认为当前机器的cpu的最大核数//env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.getConfig().setAutoWatermarkInterval(100);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型,分配时间戳和watermarkDataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));})// 乱序数据设置时间戳和watermark.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};// 基于事件时间的开窗聚合,统计15秒内温度的最小值SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1)).sideOutputLateData(outputTag).minBy("temperature");minTempStream.print("minTemp");minTempStream.getSideOutput(outputTag).print("late");env.execute();}
}
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718211,32.8
sensor_1,1547718212,37.1注意:第一个窗口是[1547718195,1547718210);

sensor_1,1547718213,33
sensor_1,1547718224,32.1
sensor_1,1547718225,31.6
sensor_1,1547718226,21.2
sensor_1,1547718227,33.6第二个窗口大小:第一个窗口是[1547718210,1547718225);

1.理想状态:来一条数据处理一条,每条数据代表对时间推进;如图到5之后就将【0,5)的窗口关闭并输出;2.乱序状态:原因:网络延迟、分布式、分区导致乱序数据产生;网络延迟和分布式处理造成的乱序都是几十毫秒和几百毫秒的范围的差距;这将回造成大多数延迟数据集中在几十毫秒和几百毫秒的范围内;3.解决方案:将时间事件放慢

flink的三重保证:1.设置watermaker将几百毫秒的数据全部输出;2.先输出一个近似的结果,但是不要关闭窗口后面延迟的时间还需要更新;3.当延时时间到了,窗口就关闭了;兜底方案使用侧输出流保证数据不丢失;注意:数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据都已经到
达了,因此,window 的执行也是由 Watermark 触发的。
6 3 2 5 4 1 
比如设置3秒的watermaker:
到达5:说明2秒之前的数据都到齐了,后面2,3都可以输出
到达6:说明3秒之前的数据都到齐了,大于等于3秒的数据才能输出意义:watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太
小数据就不准确,需要通过具体的业务场景去平衡这个值;

watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太小,乱序数据
没有搞定,数据就不准确,需要通过具体的业务场景去平衡这个值;如何找到watermaker:首先要了解乱序程度;
解决方案:通过机器学习构建一个模型,构建当前业务模型中的延迟状态的分布情况;

如图:大部分的延时数据都20ms和80ms之间的范围中,这时候设置80ms就搞定大部分乱序数据;
这时候还有很少的数据,如果对数据准确性要求比较高,这时候就需要设置窗口迟到机制去保证
数据的准备性;最后还有网络延迟的数据还是没有输出这时候就需要添加侧输出流作为兜底方案。

 watermark 生成问题

默认:来一条生产一条watermaker,如果短时间数据量比较大,会造成watermaker都一样造成资
源浪费;周期性添加watermaker:每隔一段时间更新一下watermaker 
周期性时间缺点:实时性不好;数据过于分散会造成资源浪费;如何选择:看数据的分布,过于集中使用周期性生成模式,数据稀疏,使用默认的模型;

状态编程 

需求:我们可以利用 Keyed state,实现这样一个需求: 检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警

package com.atguigu.apitest.state;/*** Copyright (c) 2018-2028 尚硅谷 All Rights Reserved* <p>* Project: FlinkTutorial* Package: com.atguigu.apitest.state* Version: 1.0* <p>* Created by wushengran on 2020/11/10 16:33*/import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @ClassName: StateTest3_KeyedStateApplicationCase* @Description:* @Author: wushengran on 2020/11/10 16:33* @Version: 1.0*/
public class StateTest3_KeyedStateApplicationCase {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 定义一个flatmap操作,检测温度跳变,输出报警SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));resultStream.print();env.execute();}// 实现自定义函数类public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>>{// 私有属性,温度跳变阈值private Double threshold;public TempChangeWarning(Double threshold) {this.threshold = threshold;}// 定义状态,保存上一次的温度值private ValueState<Double> lastTempState;@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));}@Overridepublic void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {// 获取状态Double lastTemp = lastTempState.value();// 如果状态不为null,那么就判断两次温度差值if( lastTemp != null ){Double diff = Math.abs( value.getTemperature() - lastTemp );if( diff >= threshold )out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));}// 更新状态lastTempState.update(value.getTemperature());}@Overridepublic void close() throws Exception {lastTempState.clear();}}
}
sensor_1,1547718206,36.3
sensor_1,1547718206,37.9
sensor_1,1547718206,48
sensor_6,1547718201,15.4
sensor_6,1547718201,35
sensor_1,1547718226,36

 状态后端

状态后端: 1.本地的状态管理(如何存,上下文配置,怎么存,怎么写)  2.做快照容错,如何恢复数据

1. 测试环境:MemoryStateBackend
2. 生产环境:FsStateBackend 
3. 数据非常大时候:RocksDBStateBackend
state.backend: filesystem //默认使用FsStateBackend 
tate.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints 
//配置一个checkpoint的hdfs的存储路径jobmanager.execution.failover-strategy: region //区域化重启state.backend.incremental: false  //增量添加checkpoint

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/120126.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

信息系统安全运维和管理指南

声明 本文是学习 信息系统安全运维管理指南. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 安全运维支撑系统 信息系统安全服务台 目的 对信息系统安全事件进行统一监控与处理。 要求 建立一个集中的信息系统运行状态收集、处理、显示及报警的系…

EMERSON A6500-CC 机架接口模块 AMS参数

EMERSON A6500-CC 机架接口模块 AMS参数 ModBus和机架接口模块设计用于工厂的高可靠性 最关键的旋转机械。它从所有AMS A6500 ATG模块读取参数 并通过ModBus TCP/IP和/或ModBus RTU&#xff08;串行&#xff09;输出这些参数。 此外&#xff0c;OPC UA可用于向第三方系统传输数…

搭建单机版FastDFS分布式文件存储系统

一、准备工作 1、下载FastDFS安装包和依赖包 https://codeload.github.com/happyfish100/libfastcommon/tar.gz/V1.0.43 https://codeload.github.com/happyfish100/fastdfs/tar.gz/V6.06 https://codeload.github.com/happyfish100/fastdfs-nginx-module/tar.gz/V1.22 注&…

Mac下安装Jmeter及其配置

一、安装JDK环境 安装方式&#xff1a;mac下配置JDK环境_只看不学的博客-CSDN博客 如果已安装JDK环境即可忽略该步骤&#xff0c;检查方式&#xff0c;在终端输入java -version,如果出现了java版本&#xff0c;即代表已经配置过JDK环境了&#xff0c;如下图所示&#xff1a; …

uni-app 之 vue语法

uni-app 之 vue语法 image.png --- v-html 字符 --- image.png <template><view><view>{{title}}</view>--- v-html 字符 ---<view>{{title2}}</view><view v-html"title2"></view><view>{{arr}}</view&g…

3D数据导出工具HOOPS Publish:3D数据查看、生成标准PDF或HTML文档!

HOOPS中文网http://techsoft3d.evget.com/ 一、3D导出SDK HOOPS Publish是一款功能强大的SDK&#xff0c;可以创作丰富的工程数据并将模型文件导出为各种行业标准格式&#xff0c;包括PDF、STEP、JT和3MF。HOOPS Publish核心的3D数据模型是经过ISO认证的PRC格式(ISO 14739-1:…

龙迅LT6911UXE HDMI转2PORT MIPIDSI/CSI,支持单PORT 4K60HZ,内置MCU,加音频

龙迅LT6911UXE 1.描述&#xff1a; LT6911UXE是一款高性能HDMI2.0到MIPI DSI/CSI转换器&#xff0c;可用于VR、智能手机和显示器应用。 HDMI2.0输入支持高达6Gbps的数据速率&#xff0c;这为60Hz的视频提供了足够的带宽。同时&#xff0c;还支持 HDCP2.3进行数据解密。对于M…

Linux中的多线程剖析

目录 1、前言 2、多线程理解 2.1 线程 2.2 通俗了解进程和线程 2.2.1 进程是资源分配的基本单位 2.2.2 Linux中的线程是一种轻量化进程 2.3 进程和线程详解 2.3.1 创建一个线程 (pthread_create) 2.3.2 线程自己的一部分数据 2.3.3 线程组 2.3.4 关于进程的其他操作…

【聚类】DBCAN聚类

OPTICS是基于DBSCAN改进的一种密度聚类算法&#xff0c;对参数不敏感。当需要用到基于密度的聚类算法时&#xff0c;可以作为DBSCAN的一种替代的优化方案&#xff0c;以实现更优的效果。 原理 基于密度的聚类算法&#xff08;1&#xff09;——DBSCAN详解_dbscan聚类_root-ca…

python安装wind10

一、下载&#xff1a; 官网:Python Releases for Windows | Python.org 二、安装 双击下载的安装程序文件。这将打开安装向导。安装界面图下方两个框的" Use admin privileges wheninstalling py. exe和” Add python. exe to PATH"都要勾选,一定要勾选!一定要勾选…

5年测试在职经验之谈:2年功能测试、3年自动化测试,从入门到不可自拔...

毕业3年了&#xff0c;学的是环境工程专业&#xff0c;毕业后零基础转行做软件测试。 已近从事测试行业8年了&#xff0c;自己也从事过2年的手工测试&#xff0c;从事期间越来越觉得如果一直在手工测试的道路上前进&#xff0c;并不会有很大的发展&#xff0c;所以通过自己的努…

Linux之基于HTTPS的静态网站

目录 Linux之基于HTTPS的静态网站 定义 SSL协议 使用Apachemod_ssl组件的加密认证网站 mod_ssl模组 安装 配置文件 ssl配置文件的主要参数 案例 案例1 --- 搭建HTTPSSL的加密认证的web服务器 案例2 --- 组建多个子目录的网站www.joker.com&#xff0c;该网站下有2个子…

【docker】Mac M1 构建 x64 linux镜像

亲测教程 文章目录 首先构建环境 首先 首先你需要有一个 Dockerfile 比如&#xff1a;这里以一个 python 项目举例 FROM python:3.10-slimWORKDIR /appCOPY requirements.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txtCOPY . .CMD [ "pyth…

Spring MVC:域对象共享数据

Spring MVC 前言域对象共享数据使用 ModelAndView 向 request 域对象中共享数据使用 Map 、Model 或 ModelMap 向 request 域对象中共享数据使用 SesionAttributes 注解向 session 域对象中共享数据使用 Servlet API 向 application 域对象中共享数据 附 前言 在上一章中&…

Redis的数据类型到底有什么奥秘

这里我们先只介绍五种常用的数据类型~ 目录 1、string 2、hash 3、list 4、set 5、zset 6、示例 1、string 数据类型&#xff1a;string内部编码&#xff1a;raw、int、embstr 说明&#xff1a; raw是最基本的字符串--底层是一个char数组&#xff08;此处的char是一个字…

【计算机网络】 静态库与动态库

文章目录 静态库实践使用方法总结 动态库实践使用方法总结 静态库与动态库的优缺点静态库优点缺点 动态库缺点优点 库有两种&#xff1a;静态库&#xff08;.a、.lib&#xff09;和动态库&#xff08;.so、.dll&#xff09;。所谓静态、动态是指链接。静态库是将整个库文件都拷…

学习网络编程No.5【TCP套接字通信】

引言&#xff1a; 北京时间&#xff1a;2023/8/25/15:52&#xff0c;昨天刚把耗时3天左右的文章更新&#xff0c;充分说明我们这几天并不是在摆烂中度过&#xff0c;而是在为了更文不懈奋斗&#xff0c;历时这么多天主要是因为该部分知识比较陌生&#xff0c;所以需要我们花费…

京东搜索EE链路演进 | 京东云技术团队

导读 搜索系统中容易存在头部效应&#xff0c;中长尾的优质商品较难获得充分的展示机会&#xff0c;如何破除系统的马太效应&#xff0c;提升展示结果的丰富性与多样性&#xff0c;助力中长尾商品成长是电商平台搜索系统的一个重要课题。其中&#xff0c;搜索EE系统在保持排序…

C#-SQLite-使用教程笔记

微软官网资料链接&#xff08;可下载文档&#xff09; 教程参考链接&#xff1a;SQLite 教程 - SQLite中文手册 项目中对应的system.dat文件可以用SQLiteStudio打开查看 参考文档&#xff1a;https://d7ehk.jb51.net/202008/books/SQLite_jb51.rar 总结介绍 1、下载SQLiteS…

RK3399平台开发系列讲解(内核调试篇)IO 数据工具:iostat和iotop

🚀返回专栏总目录 文章目录 一、iostat 命令二、/proc/diskstats 文件三、iotop 命令沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 在 Linux 系统上,iostat 和 iotop 这两个 IO 数据工具非常常用。它们都是性能分析领域中不可缺少的工具性软件。 一、iostat 命令…