flink1.17 eventWindow不要配置processTrigger

理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出.

flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃

bug复现操作

idea运行代码后 往source kafka发送一条数据  

a,1,1690304400000

可以看到无限输出:

理论上时间语义不建议混用,但是在rich函数中的确可以做到混用且正常使用

问题复现代码

package com.yy.flinkWindowAndTriggerimport com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Secondsobject flinkEventWindowAndProcessTriggerBUGLearn {def main(args: Array[String]): Unit = {// flink 启动本地webuival conf = new Configurationconf.setInteger(RestOptions.PORT, 28080)//    val env = StreamExecutionEnvironment.getExecutionEnvironmentval env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)//    val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.configure(conf)/*kafka输入:a,1,1690304400000        //对应 2023-07-26 01:00:00 (无限输出)       //如果传入 a,1,1693037756000 对应:2023-08-26 16:15:56 (1条/s)a,1,7200000               // 1970-01-1 10:00:00*/val brokers = "172.18.105.147:9092"val source = KafkaSource.builder[String].setBootstrapServers(brokers).setTopics("t1").setGroupId("my-group-23asdf46").setStartingOffsets(OffsetsInitializer.latest())// .setDeserializer() // 参数: KafkaRecordDeserializationSchema.setDeserializer(new M1()).build()val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")val s1 = ds1.map(_.split(",")).map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 时间戳.assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0))).keyBy(_.f1).window(TumblingEventTimeWindows.of(seconds(10))).trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L))).reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))s1.print()env.execute("KafkaNewSourceAPi")}// 乱序流class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {override def extractTimestamp(element: C1): Long = {element.f3}}// key num timestampcase class C1(f1: String, f2: Int, f3: Long)
}

-

-

maven pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkLocalDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>FlinkLocalDemo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.8</scala.version></properties><dependencies><!-- https://mvnrepository.com/artifact/joda-time/joda-time --><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.33</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
<!--            <version>1.2.17</version>--></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- 引入flink1.13.0 scala2.12.12   --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- Either... --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- or... --><!--        下面几个是代码中写sql需要的包 四个中一个都不能少 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version>
<!--            <scope>test</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--  https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--        注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存--><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>--><!--            <version>${flink.version}</version>--><!--            <scope>provided</scope>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version><scope>provided</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.11</version></dependency></dependencies><build><plugins><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution></executions><configuration><scalaVersion>${scala.version}</scalaVersion></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.5</version><configuration><!--这部分可有可无,加上的话则直接生成可运行jar包--><!--<archive>--><!--<manifest>--><!--<mainClass>${exec.mainClass}</mainClass>--><!--</manifest>--><!--</archive>--><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/78298.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

迭代器模式(Iterator)

迭代器模式是一种行为设计模式&#xff0c;可以在不暴露底层实现(列表、栈或树等)的情况下&#xff0c;遍历一个聚合对象中所有的元素。 Iterator is a behavior design pattern that can traverse all elements of an aggregate object without exposing the internal imple…

排序八卦炉之归并、计数

文章目录 1.归并排序1.1初识代码1.2代码分析1.3复杂度1.4非递归版本1.01.初识代码2.代码分析 1.5非递归版本2.01.初识代码2.代码分析 2.计数排序2.1初始代码2.2代码分析 1.归并排序 1.1初识代码 //归并排序 时间复杂度&#xff1a;O(N*logN) 空间复杂度&#xff1a;O(N) vo…

java中io流、属性集Properties、缓冲流、转换流、序列化和反序列化、打印流、网络编程(TCP通信程序、文件复制案例、文件上传案例、B/S服务案例)

IO流&#xff1a; io流中i表示input输入&#xff0c;o表示output输出&#xff0c;流表示数据&#xff08;字符&#xff0c;字节&#xff0c;1个字符2个字节8个位&#xff09;&#xff1b;这里的输入输出是以内存为基础&#xff0c;将数据从内存中输出到硬盘的过程称为输出&…

复旦大学计算机考研分析

关注我们的微信公众号 姚哥计算机考研 更多详情欢迎咨询 24计算机考研|上岸指南 复旦大学 复旦大学计算机考研招生学院是计算机科学与技术学院&#xff0c;软件学院&#xff0c;工程与应用技术研究院。目前均已出拟录取名单。 复旦大学计算机学科创建于中国计算机事业的起…

C语言数组笔试题(详解)

目录 插入知识&#xff1a; 一.指向函数指针数组的指针 二.回调函数 什么是回调函数&#xff1f; 三.数组笔试题 个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名乐于分享在学习道路上收获的大二在校生&#x1f43b;‍❄个人主页&#xff1a;GOTXX &#x1f4…

试用AI生成代码工具Fauxpilot,详细安装过程

设置服务 预先说明 需要预先安装支持NVIDIA的docker,docker compose > 1.28不能再容器里运行&#xff0c;否则出现以下报错&#xff1a; rootc536ca0dbd64:/test/fauxpilot-main# ./setup.sh Checking for curl ... /usr/bin/curl Checking for zstd ... /opt/conda/bin…

计算机网络-性能指标

计算机网络-性能指标 文章目录 计算机网络-性能指标简介速率比特速率 带宽吞吐量时延时延计算 时延带宽积往返时间网络利用率丢包率总结 简介 性能指标可以从不同的方面来度量计算机网络的性能 常用的计算机网络的性能指标有以下8个 速率带宽吞吐量时延时延带宽积往返时间利…

Windows server上用nginx部署vue3项目

Windows server上用nginx部署vue3项目 一、Node中node_modules文件夹及package.json文件的作用说明二、VUE3项目打包三、Windows Server上的Nginx部署 一、Node中node_modules文件夹及package.json文件的作用说明 node_modules是安装node后用来存放用包管理工具下载安装的包的…

【Python | 进阶】提高你的Python技能,99个让代码更简洁、更快的秘密技巧, 确定不来看看?

&#x1f935;‍♂️ 个人主页: AI_magician &#x1f4e1;主页地址&#xff1a; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 &#x1f468;‍&#x1f4bb;景愿&#xff1a;旨在于能和更多的热爱计算机的伙伴一起成长&#xff01;&#xff01;&…

突破传统监测模式:业务状态监控HM的新思路 | 京东云技术团队

一、传统监控系统的盲区&#xff0c;如何打造业务状态监控。 在系统架构设计中非常重要的一环是要做数据监控和数据最终一致性&#xff0c;关于一致性的补偿&#xff0c;已经由算法部的大佬总结过就不再赘述。这里主要讲如何去补偿&#xff1f;补偿的方案哪些&#xff1f;这就…

Vue + ElementUI 实现可编辑表格及校验

效果 完整代码见文末 实现思路 使用两个表单分别用于实现修改和新增处理。 通过一个editIndex变量判断是否是编辑状态来决定是否展示输入框&#xff0c;当点击指定行的修改后进行设置即可&#xff1a; <el-table-columnv-for"(column, index) in columns":key&qu…

Android 开发代码规范

一. AndroidStudio开发工具规范 使用最新的稳定版本.统一文件的编码格式为utf-8. 清除每个类里面的无效的import导包.代码样式统一,比如&#xff0c;tab缩进4个空格&#xff0c;或者 tab size等如果没有特殊情况使用默认的配置即可。每行字数每行字符数不得超过 160 字符&…

SpringMVC -- REST风格开发,RESTful快速开发、RESTful注解开发

&#x1f40c;个人主页&#xff1a; &#x1f40c; 叶落闲庭 &#x1f4a8;我的专栏&#xff1a;&#x1f4a8; c语言 数据结构 javaweb 石可破也&#xff0c;而不可夺坚&#xff1b;丹可磨也&#xff0c;而不可夺赤。 REST 一、REST简介1.1REST风格简介 二、RESTful入门案例2.…

MySQL日期常见的函数

-- 获取当天日期 -- 2023-06-20 select curdate();-- 获取当天年月日时分秒 select now();-- 日期运算 -- 2024-06-20 17:04:17 select date_add(now(),interval 1 year);-- 日期比较 -- 0 select datediff(now(),now());-- 日期MySQL对于日期类型数据如何查询 -- 获取指定日期…

IDEA SpringBoot Maven profiles 配置

IDEA SpringBoot Maven profiles 配置 IDEA版本&#xff1a; IntelliJ IDEA 2022.2.3 注意&#xff1a;切换环境之后务必点击一下刷新&#xff0c;推荐点击耗时更短。 application.yaml spring:profiles:active: env多环境文件名&#xff1a; application-dev.yaml、 applicat…

2023年 Java 面试八股文(20w字)

目录 第一章-Java基础篇 1、你是怎样理解OOP面向对象 难度系数&#xff1a;⭐ 2、重载与重写区别 难度系数&#xff1a;⭐ 3、接口与抽象类的区别 难度系数&#xff1a;⭐ 4、深拷贝与浅拷贝的理解 难度系数&#xff1a;⭐ 5、sleep和wait区别 难度系数&a…

CI/CD—Docker初入门学习

1 docker 了解 1 Docker 简介 Docker 是基于 Go 语言的开源应用容器虚拟化技术。Docker的主要目标是build、ship and run any app&#xff0c;anywhere&#xff0c;即通过对应用组件的封装、分发、部署、运行等生命周期的管理&#xff0c;达到应用组件级别的一次封装、到处运…

MySQL索引3——Explain关键字和索引使用规则(SQL提示、索引失效、最左前缀法则)

目录 Explain关键字 索引性能分析 Id ——select的查询序列号 Select_type——select查询的类型 Table——表名称 Type——select的连接类型 Possible_key ——显示可能应用在这张表的索引 Key——实际用到的索引 Key_len——实际索引使用到的字节数 Ref ——索引命…

vs导出和导入动态库和静态库

1. 动态库和导出和导入 1.1 动态库的导出 1. 创建新项目 新建新项目&#xff0c;选择动态链接库&#xff08;DLL&#xff09;。 填写项目名称&#xff0c;并选择项目保存的路径&#xff0c;然后点击创建。 创建完成后&#xff0c;会自动生成如下所示文件&#xff0c;可以根据…

PostgreSQL-数据库命令

PostgreSQL-数据库命令 介绍 一个数据库是一个或多个模式的集合&#xff0c;而模式包含表、函数等。因此&#xff0c;完整的逻辑组织结构层次是服务器实例&#xff08;PostgreSQL Server&#xff09;、数据库&#xff08;Database&#xff09;、模式&#xff08;Schema&#…