flink-1.16 table sql 消费 kafka 数据,指定时间戳位置消费数据报错:Invalid negative offset 问题解决

1 背景

1.使用 flink-1.16 的 table sql 消费 kafka数据,并使用 sql 计算指标,然后写入 doris;

2.指标计算时,需要统计当日数据条数,考虑到作业异常退出被重新拉起时,需要从零点开始消费,所以指定 'scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '当日零点时间戳' 方式创建 kafka table:

s"""|CREATE TABLE qysfxjKafkaTable (|xlid STRING,|available_status STRING,|sendtime STRING,|`ts` TIMESTAMP(3) METADATA FROM 'timestamp'|) WITH (|'connector' = 'kafka',|'topic' = '${param.getProperty("qysfxjTopic")}',|'scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '当日零点时间戳'|'properties.group.id' = '${param.getProperty("qysfxjTopicGroupId")}',|'properties.bootstrap.servers' = '${param.getProperty("brokers")}',|'properties.auto.offset.reset' = 'earliest',|'json.ignore-parse-errors' = 'true',|'json.fail-on-missing-field' = 'false',|'format' = 'json')|""".stripMargin

3.启动时报 kakfa 的错误,Invalid negative offset,即 flink 使用了一个不正确的 offset 到 kafka 消费数据,经过排查 topic 中最新一条数据的时间,在今日零点之前,也就是说,kafka table sql 中指定今日零点的时间戳,落后于 kafka 最新数据的时间;

2 解决方案

1.两种解决方案,① 从检查点启动作业;② 根据 kafka 数据时间,调整消费的时间;考虑到第一次启动可能 topic 也没有数据,且如果检查点失败会导致作业无法从检查点恢复的情况,决定采用 ② 方案解决;

2.方案步骤

1.使用 kafka java api,获取 topic 中最后一条数据,根据数据的时间戳初始化创建 kafka table sql 的启动时间;

2.获取到 kafka 最后一条数据的场景有两种:① kafka 中最新一条数据时间早于零点(报错的场景);② kafka 中最新一条数据时间晚于零点;

3.根据以上步骤,实现代码,代码会返回一个时间戳,0或者最后一条数据时间戳:

def getTopicLatestRecordTimeStamp(brokers: String,topic: String): Long ={var retMillis = 0Lval props = new Properties();props.put("bootstrap.servers", brokers);props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");val consumer = new KafkaConsumer[String,String](props);try {// 订阅 topicconsumer.subscribe(Collections.singletonList(topic))// 获取并订阅全部分区var assigment = consumer.assignment()while (assigment.size() == 0) {consumer.poll(Duration.ofMillis(1000L))assigment = consumer.assignment()}println(assigment)val earliestOffset = consumer.beginningOffsets(assigment)val lastOffset = consumer.endOffsets(assigment)println("assigment: " + assigment + ",topic earliestOffset:" + earliestOffset + ",topic endOffsets:" + lastOffset)// 遍历所有分区,获取最新的 offsetval lastOffsetIter = lastOffset.entrySet().iterator()while (lastOffsetIter.hasNext){val ele = lastOffsetIter.next()if(ele.getValue != 0L){// 分区有数据consumer.seek(ele.getKey, ele.getValue - 1)val records = consumer.poll(1000).iterator()while (records.hasNext){val next = records.next()if(next.timestamp() > retMillis){retMillis = next.timestamp()}System.out.println("Timestamp of last record: " + next.timestamp())}}}retMillis} finally {consumer.close();}}

4.根据返回的时间戳,于当日零点判断,如果返回的时间戳早于零点,使用 latest-offset,返回的时间戳晚于当日零点,使用零点启动即可,以下代码返回使用的是时间戳启动,还是 latest-offset 启动:

if(parameterTool.get("qysfxjTopicStartFrom","latest").equals("latest")){val toAssignmentTime = TimeHandler.getMidNightMillions()val latestTime = KfkUtil.getTopicLatestRecordTimeStamp(pro.get("brokers").toString,pro.get("qysfxjTopic").toString)// 如果最后一条数据在 toAssignmentTime 之前,则使用 latest-offset 启动消费if(toAssignmentTime > latestTime){pro.put("qysfxjTopicStartFrom","latest-offset")}else {pro.put("qysfxjTopicStartFrom",(toAssignmentTime).toString)}}else {pro.put("qysfxjTopicStartFrom",parameterTool.get("qysfxjTopicStartFrom"))}

5.根据时间戳,还是 latest-offset,生成 sql 中的 scan 片段:

/*** 根据 startFrom,判断是从什么位置消费。** @param startFrom:earliest-offset,latest-offset,group-offsets,timestamp* @return*/def getKafkaSQLScanStr(startFrom: String): String = {var scanStartup = ""if(Character.isDigit(startFrom.trim()(0))){scanStartup ="'scan.startup.mode' = 'timestamp'," +s"'scan.startup.timestamp-millis' = '${startFrom.trim()}',"}else {scanStartup =s"'scan.startup.mode' = '${startFrom}',"}scanStartup}

6.完整table sql 拼接:

val qysfxjKafkaSource =s"""|CREATE TABLE qysfxjKafkaTable (|xlid STRING,|available_status STRING,|sendtime STRING,|`ts` TIMESTAMP(3) METADATA FROM 'timestamp'|) WITH (|'connector' = 'kafka',|'topic' = '${param.getProperty("qysfxjTopic")}',|${TXGJUtils.getKafkaSQLScanStr(qysfxjTopicStartFrom)}|'properties.group.id' = '${param.getProperty("qysfxjTopicGroupId")}',|'properties.bootstrap.servers' = '${param.getProperty("brokers")}',|'properties.auto.offset.reset' = 'earliest',|'json.ignore-parse-errors' = 'true',|'json.fail-on-missing-field' = 'false',|'format' = 'json')|""".stripMargin

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/495962.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实战分享:开发设计文档模版及编写要点

总框架 一、需求类开发设计文档模版 1、PRD链接 PRD文档链接 2、后端设计 1)流程图/代码逻辑描述 描述代码逻辑,要求清晰准确,尽量用图表描述 超过3人天工作量的需求必须有流程图 2)库表设计 涉及数据库的改动&#xff0c…

Edge Scdn是用来干什么的?

酷盾安全Edge Scdn,即边缘式高防御内容分发网络,主要是通过分布在不同地理位置的多个节点,使用户能够更快地访问网站内容。同时,Edge Scdn通过先进的技术手段,提高了网上内容传输的安全性,防止各种网络攻击…

牛客周赛73B:JAVA

链接:登录—专业IT笔试面试备考平台_牛客网 来源:牛客网 题目描述 \hspace{15pt}小红拿到了正整数 xxx ,她希望你找到一个长度为 kkk 的区间,满足区间内恰好有 nnn 个数是 xxx 的倍数。你能帮帮她吗? 输入描述: …

微信小程序中遇到过的问题

记录微信小程序中遇到的问题(持续更新ing) 问题描述:1. WXML中无法直接调用JavaScript方法。2. css中无法直接引用背景图片。3. 关于右上角胶囊按钮。4. 数据绑定问题。5. 事件处理问题。 问题描述: 1. WXML中无法直接调用JavaSc…

Docker 安装mysql ,redis,nacos

一、Mysql 一、Docker安装Mysql 1、启动Docker 启动:sudo systemctl start dockerservice docker start 停止:systemctl stop docker 重启:systemctl restart docker 2、查询mysql docker search mysql 3、安装mysql 3.1.默认拉取最新版…

Leecode刷题C语言之字符串及其反转中是否存在同一子字符串

执行结果:通过 执行用时和内存消耗如下&#xff1a; bool isSubstringPresent(char* s) {int i,lenstrlen(s),end;for(i0;i<len-1;i){if(s[i]s[i1]) return true;for(endlen-1;end>1;end--){if(s[i]s[end]&&s[i1]s[end-1]) return true;}}return false; }解…

uniapp登录

第一步整登录 先整个appid APPID和APPSecret https://developers.weixin.qq.com/community/develop/article/doc/000ca4601b8f70e379febac985b413 一个账号只能整一个小程序 正确流程 调用uni.login https://juejin.cn/post/7126553599445827621 https://www.jb51.net/a…

【开源免费】基于SpringBoot+Vue.JS安康旅游网站(JAVA毕业设计)

本文项目编号 T 098 &#xff0c;文末自助获取源码 \color{red}{T098&#xff0c;文末自助获取源码} T098&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

AIGC:生成图像动力学

文章目录 前言一、介绍二、方法2.1、运动预测模块运动纹理 2.2、图像渲染模块 三、数据集实验总结 前言 让静态的风景图能够动起来真的很有意思&#xff0c;不得不说CVPR2024 best paper实质名归&#xff0c;创意十足的一篇文章&#xff01;&#xff01;&#xff01; paper&a…

cesium入门学习二

之前学习了cesium的一些基本操作&#xff0c;现在学习cesium怎么加载模型&#xff0c;以及一些其他操作。 1.学习汇总目录 第一篇&#xff1a;cesium入门学习一-CSDN博客 2.cesium效果显示以及代码 2.1 加载模型并显示 效果&#xff1a; js代码&#xff1a; // 创建 Ces…

路由策略

控制层流量 --- 路由协议传递路由信息时产生的流量 数据层流量 --- 设备访问目标地址时产生的流量 所谓的路由策略----在控制层面转发流量的过程中&#xff0c;截取流量&#xff0c;之后修改流量再转发或不转发的技术&#xff0c;最终达到影响路由器路由表的生成&#xff0c…

网络安全 - Cross-site scripting

1.1.1 摘要 在本系列的第一篇博文中&#xff0c;我向大家介绍了SQL Injection常用的攻击和防范的技术。这个漏洞可以导致一些非常严重的后果&#xff0c;但幸运的是我们可以通过限制用户数据库的权限、使用参数化的SQL语句或使用ORM等技术来防范SQL Injection的发生&#xff0c…

一、Hadoop概述

文章目录 一、Hadoop是什么二、Hadoop发展历史三、Hadoop三大发行版本1. Apache Hadoop2. Cloudera Hadoop3. Hortonworks Hadoop四、Hadoop优势1. 高可靠性2. 高扩展性3. 高效性4. 高容错性五、Hadoop 组成1. Hadoop1.x、2.x、3.x区别2. HDFS 架构概述3. YARN 架构概述4. MapR…

信息安全管理与评估赛题第9套

全国职业院校技能大赛 高等职业教育组 信息安全管理与评估 赛题九 模块一 网络平台搭建与设备安全防护 1 赛项时间 共计180分钟。 2 赛项信息 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 第一阶段 网络平台搭建与设备安全防护 任务1 网络平台搭建 XX:XX- XX:XX 50 任务2…

低代码开发中 DDD 领域驱动的页面权限控制

在低代码开发的领域中&#xff0c;应用安全与灵活性是两大关键考量因素。领域驱动设计&#xff08;DDD&#xff09;作为一种在软件设计领域广泛应用且颇具影响力的方法论&#xff0c;正逐渐在低代码开发的页面权限控制方面展现出其独特的价值与潜力。本文旨在客观地探讨如何借助…

目录jangow-01-1.0.1靶机

靶机 ip&#xff1a;192.168.152.155 把靶机的网络模式调成和攻击机kali一样的网络模式&#xff0c;我的kali是NAT模式, 在系统启动时(长按shift键)直到显示以下界面 ,我们选第二个&#xff0c;按回车。 继续选择第二个&#xff0c;这次按 e 进入编辑页面 接下来&#xff0c;…

微信小程序 不同角色进入不同页面、呈现不同底部导航栏

遇到这个需求之前一直使用的小程序默认底部导航栏&#xff0c;且小程序默认入口页面为pages/index/index&#xff0c;要使不同角色呈现不同底部导航栏&#xff0c;必须要在不同页面引用不同的自定义导航栏。本篇将结合分包&#xff08;subPackages&#xff09;展开以下三步叙述…

【GeekBand】C++设计模式笔记15_Proxy_代理模式

1. “接口隔离” 模式 在组件构建过程中&#xff0c;某些接口之间直接的依赖常常会带来很多问题&#xff0c;甚至根本无法实现。采用添加一层间接&#xff08;稳定&#xff09;接口&#xff0c;来隔离本来互相紧密关联的接口是一种常见的解决方案。典型模式 FacadeProxyAdapte…

网络安全之接入控制

身份鉴别 ​ 定义:验证主题真实身份与其所声称的身份是否符合的过程&#xff0c;主体可以是用户、进程、主机。同时也可实现防重放&#xff0c;防假冒。 ​ 分类:单向鉴别、双向鉴别、三向鉴别。 ​ 主题身份标识信息:密钥、用户名和口令、证书和私钥 Internet接入控制过程 …

UE5 崩溃问题汇总!!!

Using bundled DotNet SDK version: 6.0.302 ERROR: UnrealBuildTool.dll not found in "..\..\Engine\Binaries\DotNET\UnrealBuildTool\UnrealBuildTool.dll" 在你遇到这种极奇崩溃的BUG &#xff0c;难以解决的时候。 尝试了N种方法&#xff0c;都不行的解决方法。…