flink 最后一个窗口一直没有新数据,窗口不关闭问题

flink 最后一个窗口一直没有新数据,窗口不关闭问题

  • 自定义实现 WatermarkStrategy接口

自定义实现 WatermarkStrategy接口

代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);@Overridepublic WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<JSONObject>() {private long maxWatermark;@Overridepublic void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));state.f0 = System.currentTimeMillis();System.out.println("maxWatermark is " + maxWatermark);state.f1 = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {//乱序时间long outOfTime = 3000L;if (maxWatermark - outOfTime <=0){} else {// 10s内没有数据则关闭当前窗口System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));System.out.println("state.f1:" + state.f1);if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));state.f1 = true;System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));} else {System.out.println("正常发送水印");watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));}}}};}}

代码部分逻辑说明
在这里插入图片描述若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
在这里插入图片描述
参考:https://blog.csdn.net/lr131425/article/details/127422833

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/240582.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Chondrex:Glycosaminoglycans Assay Kit(糖胺聚糖检测试剂盒)

糖胺聚糖&#xff08;glycosaminoglycans&#xff0c;GAGs&#xff09;是一种携带负电荷的多糖链&#xff0c;位于大多数结缔组织和许多不同类型细胞的细胞外基质&#xff08;extracellular matrices, ECM&#xff09;中以及细胞表面上。由重复双糖单位复合构成的糖胺聚糖可分为…

Docker之安装Nginx

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是君易--鑨&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的博客专栏《Docker之Dockerfile构建镜像》。&#x1f3af;&…

[C#]利用paddleocr进行表格识别

【官方框架地址】 https://github.com/PaddlePaddle/PaddleOCR.git 【算法介绍】 PaddleOCR表格识别是PaddlePaddle开源项目中的一个强大功能&#xff0c;它利用深度学习技术实现了对各类表格的高精度识别。PaddleOCR表格识别能够处理各种复杂的表格&#xff0c;包括但不限于…

Docker篇之修改docker默认磁盘占用目录

一、前言 通常情况下&#xff0c;当我们默认安装docker服务时&#xff0c;在不指定默认存储路径时&#xff0c;docker会自动创建目录&#xff0c;经常会出现打满根目录的情况。 默认存储路径为&#xff1a;/var/lib/docker 下 可通过如下进行查询&#xff1a; docker info输出…

消息中间件面试题

目录 一.为什么使用消息队列 二.RabbitMQ 1.RabbitMQ如何保证消息不丢失 2.RabbitMQ消息的重复消费问题 3.RabbitMQ延迟队列 4.RabbitMQ消息堆积 5.RabbitMQ高可用机制 三.Kafka 1.Kafka如何保证消息不丢失 2.Kafka如何保证消费消息的顺序性 3.Kafka高可用机制 4.Ka…

AVL树 -- C++实现

AVL树 – C实现 1. AVL树的概念 二叉搜索树虽可以缩短查找的效率&#xff0c;但如果数据有序或接近有序二叉搜索树将退化为单支树&#xff0c;查找元素相当于在顺序表中搜索元素&#xff0c;效率低下。因此&#xff0c;两位俄罗斯的数学家G.M.Adelson-Velskii和E.M.Landis在1…

【NI国产替代】USB‑7846 Kintex-7 160T FPGA,500 kS/s多功能可重配置I/O设备

Kintex-7 160T FPGA&#xff0c;500 kS/s多功能可重配置I/O设备 USB‑7846具有用户可编程FPGA&#xff0c;可用于高性能板载处理和对I/O信号进行直接控制&#xff0c;以确保系统定时和同步的完全灵活性。 您可以使用LabVIEW FPGA模块自定义这些设备&#xff0c;开发需要精确定时…

Android 系统启动过程纪要(基于Android 10)

前言 看过源码的都知道&#xff0c;Launcher系统启动都会经过这三个进程 init ->zygote -> system_server。今天我们就来讲解一下这三个进程以及Launcher系统启动。 init进程 准备Android虚拟机环境&#xff1a;创建和挂载系统文件目录&#xff1b;初始化属性服务&…

RTSP协议实现发送ACC音频数据

一.AAC音频格式介绍 AAC音频格式&#xff1a;Advanced Audio Coding&#xff08;高级音频解码&#xff09;&#xff0c;是一种由MPEG—4标准定义的有损音频压缩格式。音频压缩编码的输出码流&#xff0c;以音频帧的形式存在。每个音频帧包含若干个音频采样的压缩数据&#xff0…

代码随想录算法训练营29期|day 23 任务以及具体安排

669. 修剪二叉搜索树 class Solution {public TreeNode trimBST(TreeNode root, int low, int high) {if (root null) {return null;}if (root.val < low) {return trimBST(root.right, low, high);}if (root.val > high) {return trimBST(root.left, low, high);}// ro…

【数学建模】2024年华数杯国际赛B题-光伏发电Photovoltaic Power 思路、代码、参考论文

1 问题背景 中国电力构成包括传统能源(如煤炭、石油、天然气)、可再生能源(如水电、风能、太阳能、核能)和其他形式的电力。这些发电模式在满足中国巨大的电力需求方面发挥着至关重要的作用。据最新数据显示&#xff0c;中国总发电量超过20万亿千瓦时&#xff0c;居世界第一。…

企业怎么传输大容量视频?

在企业中&#xff0c;视频的应用越来越广泛&#xff0c;不论是在内部沟通、培训、宣传&#xff0c;还是在外部合作、推广、展示方面&#xff0c;视频都扮演着不可或缺的角色。然而&#xff0c;由于视频文件通常较大&#xff0c;传输时往往会面临网速慢、容量限制、安全风险等问…

【开源之美】:hello-algo

图文并茂的方式讲解常用算法&#xff0c;适合算法知识点学习和回顾总结。 一、算法知识点 二、前往地址 https://www.hello-algo.com/

云边协同的 RTC 如何助力即构全球实时互动业务实践

作者&#xff1a;即构科技 由 51 CTO 主办的“WOT 全球技术创新大会 2023深圳站”于 11 月 24 日 - 25 日召开&#xff0c;即构科技后台技术总监肖潇以“边缘容器在全球音视频场景的探索与实践”为主题进行分享。 边缘计算作为中心云计算的补充&#xff0c;通过边缘容器架构和…

【性能调优】local模式下flink处理离线任务能力分析

文章目录 一. flink的内存管理1.Jobmanager的内存模型2.TaskManager的内存模型2.1. 模型说明2.2. 通讯、数据传输方面2.3. 框架、任务堆外内存2.4. 托管内存 3.任务分析 二. 单个节点的带宽瓶颈1. 带宽相关理论2. 使用speedtest-cli 测试带宽3. 任务分析3. 其他工具使用介绍 本…

【欢迎您的到来】这里是开源库get_local_info作者的付费专栏

您好&#xff0c; 我是带剑书生&#xff0c;开源库get_local_info的作者&#xff0c;欢迎您的到来&#xff0c;这里是我的付费专栏&#xff0c;会用更简洁的语言&#xff0c;更通俗的话语&#xff0c;来帮助您更好的学习rust&#xff0c;这里不仅仅讲解Rust在某些应用功能实现上…

Kafka的安装、管理和配置

Kafka的安装、管理和配置 1.Kafka安装 官网: https://kafka.apache.org/downloads 下载安装包,我这里下载的是https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz Kafka是Java生态圈下的一员&#xff0c;用Scala编写&#xff0c;运行在Java虚拟机上&#xf…

【Internet Protocol】ip介绍,如何组局域网实现远程桌面和文件共享

文章目录 1.何为“上网”1.1 定义1.2 为什么连了WiFi就能上网了&#xff1f; 2.ip2.1 什么是ip2.2 为什么区分广域网和局域网&#xff0c;ip的唯一性2.3 如何查看设备的ip2.4 什么叫"ping"2.5 区分是否两个ip是否在同一局域网2.5.1 最稳妥的方式&#xff1a;ip&m…

Flutter 综述

Flutter 综述 1 介绍1.1 概述1.2 重要节点1.3 移动开发中三种跨平台框架技术对比1.4 flutter 技术栈1.5 IDE1.6 Dart 语言1.7 应用1.8 框架 2 Flutter的主要组成部分3 资料书籍 《Flutter实战第二版》Dart 语言官网Flutter中文开发者社区flutter 官网 4 搭建Flutter开发环境参考…

【印象深刻的实战经历】两次全国大学生数学建模经历分享

目录 &#x1f33c;初次接触 初次参加培训 分享培训所得 比赛开始 &#x1f525;再次接触 参加校赛 机缘巧合 再次培训 比赛开始 &#x1f4d5;技巧总结 从问题的实际意义分析大体上可分为 从问题的解决方法上分析 做国赛题目的步骤 赛前准备 选题 寻找思路…