从Flink的Kafka消费者看算子联合列表状态的使用

背景

算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态

算子联合列表状态

首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
在这里插入图片描述
算子联合列表状态主要由这两个方法处理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}

2.开始通知检查点开始的方法:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/161891.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE5 Python脚本自动化Sequence Key帧

前言 码上1024了&#xff0c;给大家分享一个UE5的脚本小功能&#xff0c;UE5中Sequence动态Key功能&#xff0c;这样我们就可以根据我们的数据动态更新了&#xff0c;非常实用&#xff0c;适合刚入门或者小白&#xff0c;接下来我就把整个过程分享给大家。 过程 新建一个工程…

【微信小程序开发】自定义组件以及页面布局设计

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于小程序的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.自定义组件的使用步骤&#xff08;附实…

Sandboxie+Buster Sandbox Analyzer打造个人沙箱

一、运行环境和需要安装的软件 实验环境&#xff1a;win7_x32或win7_x64 用到的软件&#xff1a;WinPcap_4_1_3.exe、Sandboxie-3-70.exe、Buster Sandbox Analyzer 重点是Sandboxie必须是3.70版本。下载地址&#xff1a;https://github.com/sandboxie-plus/sandboxie-old/blo…

制作linux系统内部yum源仓库

需求说明 制作内网linux系统yum源仓库&#xff0c;比较简单的方式就是添加系统镜像&#xff0c;此种yum配置方式可参考文章 https://blog.csdn.net/d1240673769/article/details/108477661 如果无法提供系统镜像&#xff0c;那该如何创建内网的yum源仓库呢&#xff1f;本文提…

HarmonyOS 音视频开发概述

在音视频开发指导中&#xff0c;将介绍各种涉及音频、视频播放或录制功能场景的开发方式&#xff0c;指导开发者如何使用系统提供的音视频 API 实现对应功能。比如使用 TonePlayer 实现简单的提示音&#xff0c;当设备接收到新消息时&#xff0c;会发出短促的“滴滴”声&#x…

JMeter 随机数生成器简介:使用 Random 和 UUID 算法

在压力测试中&#xff0c;经常需要生成随机值来模拟用户行为。JMeter 提供了多种方式来生成随机值&#xff0c;本文来具体介绍一下。 随机数函数 JMeter 提供了多个用于生成随机数的函数&#xff0c;其中最常用的是 __Random 函数。该函数可以生成一个指定范围内的随机整数或…

JAVA转GO

GO 环境配置 go环境 下载go并安装(win下),环境变量他自己要配置上 https://dl.google.com/go/go1.21.3.windows-amd64.msi 验证是否安装成功: //打开cmd go versionVSCODE环境 下载VSCODE…略 配置VSCODE的环境 下载插件 go开发工具包 打开cmd,或者VSCODE自带的终端,…

IDEA使用http client无法识别http-client.env.json的环境配置

http-client.env.json的配置 {"dev": {"baseUrl": "http://192.168.60.176:9160","accessToken": "eyJhbPNOQ"} }选择不到环境 问题原因&#xff1a; 安装了Alibaba Cloud Toolkit插件后&#xff0c;被Alibaba Cloud ROS …

[初始java]——java为什么这么火,java如何实现跨平台、什么是JDK/JRE/JVM

java的名言&#xff1a; ”一次编译、到处运行“ 一、编译语言与解释语言 编译&#xff1a; 是将整份源代码转换成机器码再进行下面的操作&#xff0c;最终形成可执行文件 解释&#xff1a; 是将源代码逐行转换成机器码并直接执行的过程&#xff0c;不需要生成目标文件 jav…

springboot 导出word模板

一、安装依赖 <dependency><groupId>com.deepoove</groupId><artifactId>poi-tl</artifactId><version>1.12.1</version></dependency>二、定义工具类 package com.example.springbootmp.utils;import com.deepoove.poi.XWP…

如何加入开源项目维护并提交代码?本地搭建源码阅读开发构建环境示例: kafka

如何加入开源项目维护并提交代码?本地搭建源码阅读开发构建环境示例: kafka。 大家对开源项目有兴趣、想成为committer,或者工作需要,会从github上获取最新的开源项目源码。本文做一个示例,怎样搭建本地的源码阅读、开发、构建环境。 首先,在github上找到项目的链接,…

Web架构安全分析/http/URL/Cookie攻击

Web 架构安全分析 Web 工作机制及基本概念 传统 Web 架构 LAMP 网页 概念 网页就是我们可以通过浏览器上网看到的精美页面&#xff0c;一般都是经过浏览器渲染过的 .html 页面&#xff0c;html 语言在浏览器中渲染。其中包含了CSS、JavaScript 等前端技术。通过浏览器访问…

ELK + Filebeat 分布式日志管理平台部署

ELK Filebeat 分布式日志管理平台部署 1、前言1.1日志分析的作用1.2需要收集的日志1.3完整日志系统的基本特征 2、ELK概述2.1ELK简介2.2为什么要用ELK?2.3ELK的组件 3、ELK组件详解3.1Logstash3.1.1简介3.1.2Logstash命令常用选项3.1.3Logstash 的输入和输出流3.1.4Logstash配…

ISR实现RDN图像增强

ISR实现RDN图像增强 图像增强作用ISR项目安装1.从PyPI安装ISR&#xff08;推荐&#xff09;&#xff1a;2.从GitHub源代码安装ISR&#xff1a; 用法预测大图像推理 训练创建模型 RDN算法介绍Residual Dense Network的结构残差稠密块&#xff08;Residual Dense Block&#xff0…

三款经典的轮式/轮足机器人讲解,以及学习EG2133产生A/B/C驱动电机。个人机器人学习和开发路线(推荐)

1&#xff0c;灯哥开源&#xff08;有使用指南&#xff0c;适合刚入门新手&#xff09; 机械部分&#xff1a;2个foc无刷电机 硬件和软件部分&#xff1a;没有驱动板子。只有驱动器&#xff0c;主控板esp32和驱动器通过pwm直接通讯。驱动器板子上有蓝色电机接口&#xff0c;直…

抓了几千万条热门股数据,用Python量化验证后发现结果竟然...... | 邢不行

在体育领域一直流传着大热必死的说法&#xff0c;历史上也不乏夺冠大热门爆冷出局的故事。 在金融领域也有大名鼎鼎的金融第三定律&#xff1a;热门的东西不要碰。 01 大热必死 针对这一定律&#xff0c;我们之前也写过相关的文章。 https://mp.csdn.net/mp_blog/creation/ed…

Linux性能优化--性能工具:特定进程内存

5.0 概述 本章介绍的工具使你能诊断应用程序与内存子系统之间的交互&#xff0c;该子系统由Linux内核和CPU管理。由于内存子系统的不同层次在性能上有数量级的差异&#xff0c;因此&#xff0c;修复应用程序使其有效地使用内存子系统会对程序性能产生巨大的影响。 阅读本章后&…

JS 通过年份获取月,季度,半年度,年度

​功能描述&#xff1a; 实例化一个函数&#xff0c;给函数内传递不同的参数&#xff0c;获取从起始年份到现在年度所有的月份&#xff0c;季度&#xff0c;半年度&#xff0c;年度 动态演示 ---------正文代码开始-------- 1. 封装函数 createMonth 注&#xff1a;此代码可…

流程图如何制作?好用的11款流程图软件盘点!

流程图是一种强大的可视化工具&#xff0c;用于清晰地展示各种过程和步骤&#xff0c;应用非常广泛&#xff0c;在各个行业中随处可见&#xff0c;凡是涉及流程步骤的场景&#xff0c;都可以用到流程图&#xff0c;那么问题来了&#xff1a;流程图如何制作&#xff1f; 这篇文…

第0章:怎么入手tensorflow

近年来人工智能的火爆吸引了很多人&#xff0c;网上相关的热门课程报名的人很多&#xff0c;但是坚持下去的人却少。那些晦涩的原理没有一定知识的积累很难能理解。 如果你对人工智能感兴趣&#xff0c;且想利用人工智能去实现某项功能&#xff0c;而不是对人工智能本身感兴趣&…