增加并行度后,发现Flink窗口不会计算的问题。

文章目录

  • 前言
  • 一、现象
  • 二、结论
  • 三、解决


前言

窗口没有关闭计算的问题,一直困扰了很久,经过多次验证,确定了问题的根源。


一、现象

Flink使用了window,同时使用了watermark ,并且还设置了较高的并行度。生产是设置了300的并行度,并且接入了几十个topic ,这个地方划重点,后面会提到。结果就是,窗口没有关闭进行计算。于是我查阅的相关文档,得到的答案是因为配置的源并行度大于topic的分区数而导致。这个答案只能说很接近,而且我最开始也觉得很有道理。
解释一下watermark + window的原理

在这里插入图片描述
可以看到前面三个窗口里面都有数据,窗口触发计算的其中一个必要条件是最新的数据没过最低的水位线,就进行计算,认为不会再有乱序的数据进来了。但是从图中我们可以看到其中一个窗口一个数据都没有,就会导致拿不到所有窗口的最低水位线。因此也就无法触发计算。
为了验证这一法则
我在测试环境配置了一个并行度为10的程序,topic只有一个分区,启动任务的时候,我信誓旦旦地保证这不可能关闭窗口进行计算,然而,现实狠狠打了我一巴掌,窗口结果算出来了。虽然只是三言两语,实际上我做了很多尝试,只是其他的实验不重要,都是证明我是错的

于是通过比较的方法,想到和生产的情况不同就在于,生产消费了几十个topic,而我的测试只有一个topic,于是我再次坚信,问题一定就在这了。

我直接在idea进行测试
在这里插入图片描述

我配置了两个topic,并且在一开始只往第一个topic中写数据,而第二个topic不写数据

很好,跑了一整个中午,一次窗口聚合计算都没有。

此时进行最后一步验证,就是往第二个topic写数据。

我在这个时间往第二个topic发了数据

collectTime":1697693856606

在这里插入图片描述
为了让大家看清楚现象,我把日志和截图都给出来

2023-10-19 13:37:32.699 [Legacy Source Thread - Source: Custom Source -> Flat Map -> (Flat Map -> Flat Map -> Sink: Unnamed, Timestamps/Watermarks -> (Flat Map, Flat Map, Flat Map)) (10/16)#0] INFO  c.a.c.d.risk.domain.function.IndicatrixMapFunction - 【通过】滑动窗口前置数据处理
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:8
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:27
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:28
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:17
2023-10-19 13:37:32.805 [Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CountAverageFunction, LogResultWindowFunction) (13/16)#0] INFO  com.ai.cass.dc.risk.re.idxSend.IdxSend - 聚合时:存储指标结果,calcTypeCode:FrequencyOccurStttc key:ff83d41c-335f-405d-88e7-f5285aecdcf5a1123 Value:20

在这里插入图片描述

证明就是在这个时间节点上,窗口计算处理结果

二、结论

因此我就可以大胆地推断,是因为多个topic进行了数据消费,其中有个topic数据会进入窗口进行计算,但有的窗口又永远不会有数据进入计算,这就造成对应的窗口永远没有最低的watermark以致于窗口无法关闭并计算。

三、解决

既然问题找到了,那解决办法就随之而生

  • 1、如果可以不使用水印,直接关闭水印即可,只要消费的数据不会积压,并且要求没那么高的话,这个方法最简单
  • 2、减小并行度到能够使得每个窗口都有数据,减小并行度会让不同topic用同一个窗口,至于这个数量,那还得研究研究了
  • 3、把需要到窗口和不到窗口计算的数据进行分流
  • 4、也可以把源与后面算子之间采用rebalance的方式传递,这样就能够轮询的方式往下传递,使得每个window都会有数据,这里有一点一定要注意,rebalance必须放在watermark之前才可以。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/164458.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从入门到进阶 之 ElasticSearch 节点配置 集群篇

🌹 以上分享 ElasticSearch 安装部署,如有问题请指教写。🌹🌹 如你对技术也感兴趣,欢迎交流。🌹🌹🌹 如有需要,请👍点赞💖收藏🐱‍&a…

PHP 变量

变量 变量的声明、使用、释放 变量定义 形式 $ 变量名;严格区分大小写 $name; $Name; $NAME //三个变量不是同一个变量字母、数字、下划线组成,不能以数字开头,不能包含其他字符(空白字符、特殊字符) 驼峰式命名法、下划线式命名法 $first_name; $fi…

央国企、金融信创改造必备的Windows AD域控国产替代方案

自国资委下发79号文并明确规定了2027年底前信息系统全面替换的目标后,金融机构、大型央国企均规划起信创改造方案,其中金融机构更是走在8大行业信创前列,成为央国企、医疗、能源等行业国产化改造的参考样板。 在参与并负责某大型金融机构与某…

Redis内存回收机制-内存淘汰策略和过期策略

Redis是基于内存操作的非关系型数据库,在内存空间不足的时候,为了保证程序的运行和命中率,就会淘汰一部分数据。如何淘汰数据?这就是Redis的内存回收策略。 Redis中的内存回收策略主要有两个方面: Redis过期策略&#…

使用poco出现Cannot find any visible node by query UIObjectProxy of “xxx“怎么办

在编写脚本的时候,使用poco的控件识别已经是大家非常喜欢的一种方式,准确度很高,而且也很容上手。 但是有时候会出现下面这种报错,提示 Cannot find any visible node by query UIObjectProxy of “xxx“这个时候是不是开始着急…

STM32标准外设库下载(下载地址与步骤详解)

文章目录 1. 概述2. 官方下载地址3. 步骤详解3.1 打开官网3.2 工具与软件 ➡ 嵌入式软件 ➡ MEMS软件3.3 微控制器软件 ➡ STM32微控制器软件 ➡ STM32标准外设软件库 ➡ 选择产品系列3.4 选择版本 ➡ 点击下载3.5 点击“接受” ➡ 填写邮箱信息 ➡ 点击“下载”3.6 点击接收到…

京东商品详情API接口(标题|主图|SKU|价格|库存..)

京东商品详情接口的应用场景有很多,以下为您推荐几种: 电商平台集成:如果想要实现商品查询、购买、支付等功能,提高自身平台的电商能力,可以将京东API接口集成到自己的电商网站或应用程序中。第三方开发者插件&#x…

Clin Cancer Res|“乳酸化+巨噬细胞”国自然强强联合

前列腺癌(PC)是全球第二大最常见的男性癌症,每年估计有375,304人死亡。虽然雄激素剥夺疗法(ADT)仍然是晚期前列腺癌的当前标准治疗方法,但大多数患者最终进展并发展为致命的转移性去势抵抗性前列腺癌(mCRPC)。 PTEN(一种抑癌基因&#xff09…

Godot 官方2D C#重构(1):雪花碰撞

前言 Godot 官方 教程 Godot 2d 官方案例C#重构 专栏 Godot 2d 重构 github地址 实现效果 难点介绍 Godot GDScript和C# 对应关系大部分靠猜 文件导入 资源地址:默认为res://开头2D贴图导入类型:Texture2D public Texture2D Bullet_Image new Textu…

无人机航拍图像拼接与目标识别

一、简介 无人机用来做图像侦察是常见功能,现有技术基本是无人机对某片区域进行飞行,人工实时监控飞行图像,将图像录制成视频供事后回放。此方法对人员业务要求比较高、反应速度足够快、不利于信息收集、录制视频丢失空间信息、对于后期开展区…

书单|1024程序员狂欢节充能书单!

作者简介: 辭七七,目前大二,正在学习C/C,Java,Python等 作者主页: 七七的个人主页 文章收录专栏: 七七的闲谈 欢迎大家点赞 👍 收藏 ⭐ 加关注哦!💖&#x1f…

【JavaEE】 阻塞式队列详解

文章目录 🌲阻塞队列是什么🌳生产者消费者模型🚩耦合📌紧耦合(强耦合)📌松耦合(解耦合) 🎄Java标准库中的阻塞队列的使用🚩标准库实现消费者生产者…

大规模语言LLaVA:多模态GPT-4智能助手,融合语言与视觉,满足用户复杂需求

大规模语言LLaVA:多模态GPT-4智能助手,融合语言与视觉,满足用户复杂需求 一个面向多模式GPT-4级别能力构建的助手。它结合了自然语言处理和计算机视觉,为用户提供了强大的多模式交互和理解。LLaVA旨在更深入地理解和处理语言和视…

使用MFC创建一个SaleSystem

目录 1、项目的创建: 2、项目的配置: 3、设置窗口属性: (1)、设置图标 1)、添加导入资源 2)、代码初始化图标 (2)、设置标题 (3)、设置窗口…

如何解决香港服务器使用的常见问题

​  站长们在选择香港服务器租用时会考虑到它的各种性能以及稳定性,这是必须的。但是使用过程中还有些问题也不容忽视,比如:带宽资源是否短缺,是否存在安全漏洞,连接是否正常等这些问题也要考虑到。 香港服务器使用中…

整理uvc驱动相关函数的调用流程

目录 1、uvc_video.c初始化函数的调用关系 2、uvc_queue.c3、uvc_v4l2.c4、v4l2-core5、数据传输1、分配一个gadget请求2、请求一个queue 1、uvc_video.c // uvc_video.c uvc_video_encode_header uvc_video_encode_data uvc_video_encode_bulk uvc_video_encode_isoc uvcg_vi…

关闭mysql,关闭redis服务

1. 关闭redis服务: 查询redis安装目录: whereis redis which redis find / -name redis 关闭redis服务: redis-cli -h 127.0.0.1 -p 6379 auth 输入密码 shutdown 关闭redis服务 2. 关闭mysql服务: 查询mysql安装目录&…

Docker逃逸---SYS_PTRACE浅析

一、产生原因 用户授予了容器SYS_PTRACE权限,并且与宿主机共享一个进程命名空间(--pidhost),使得容器内可以查看到宿主机的进程,攻击者可以利用进程注入,反弹shell,从而实现逃逸 二、利用条件 1、容器有SYS_PTRACE权…

(H5轮播)vue一个轮播里显示多个内容/一屏展示两个半内容

效果图 : html: <div class"content"><van-swipeclass"my-swipe com-long-swipe-indicator":autoplay"2500"indicator-color"#00C4FF"><van-swipe-itemclass"flex-row-wrap"v-for"(items, index) in M…