Flink问题解决及性能调优-【Flink不同并行度引起sink2es报错问题】

最近需求,仅想提高sink2es的qps,所以仅调节了sink2es的并行度,但在调节不同算子并行度时遇到一些问题,找出问题的根本原因解决问题,并分析整理。

实例代码

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 msCREATE TABLE kafka_table (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])--event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)--WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't1','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','properties.group.id' = 'g1','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset--  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交'format' = 'json'
);CREATE TABLE es_sink(send_type      STRING,account_id     STRING,publish_time   STRING,grouping_id       INTEGER,init           INTEGER,init_cancel    INTEGER,push          INTEGER,succ           INTEGER,fail           INTEGER,init_delete    INTEGER,update_time    STRING,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with ('connector' = 'elasticsearch-6','index' = 'es_sink','document-type' = 'es_sink','hosts' = 'http://xxx:9200','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb'
);CREATE view  tmp as
selectsend_type,account_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,event_time,opt,ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6CREATE view  tmp_groupby as
selectCOALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1when send_type is not null and account_id is null and publish_time is null then 2when send_type is not null and account_id is not null and publish_time is null then 3when send_type is not null and account_id is not null and publish_time is not null then 4end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上INSERT INTO es_sink
selectsend_type,account_id,publish_time,grouping_id,init,init_cancel,push,succ,fail,init_delete,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

发现问题

由于groupby或join聚合等算子操作的并行度与sink2es算子操作的并行度不同,上游算子同一个key的数据可能会下发到下游多个不同算子中。
导致sink2es出现多个subtask同时操作同一个key(这里key作为主键id),报错如下:

...Caused by: [test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)... 1 more[CIRCULAR REFERENCE:[test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]]

问题原因

Flink中存在八种分区策略,常用Operator Chain链接方式有三种分区器:

  • forward:上下游并行度相同,且不发生shuffle,直连的分区器
  • hash:将数据按照key的Hash值下发到下游的算子中
  • rebalance:数据会被循环或者随机下发到下游算子中,改变并行度若无keyby,默认使用RebalancePartitioner分区策略

rebalance分区器,可能会将上游算子的同一个key随机下发到下游不同算子中,因而引起报错,如下图:
在这里插入图片描述在这里插入图片描述模型如下:

在这里插入图片描述

解决方案

  • 分组聚合算子与sink2es算子配置成相同的并行度,即使用forward分区器,如下图:
    在这里插入图片描述在这里插入图片描述
    另外sink2es forward分区器上游operator chain已经通过hash分区器保证了同一个key只能下发到下游一个subtask实例中

模型如下:
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/215b7ba208b142da803a78d85b0f474e.png

  • sink2es算子的并行度配置为1,如下图:

operator chain为forward分区器模型如下:
在这里插入图片描述

总结

归根结底就是需要保证:上游subtask中同一个key只能下发到下游一个subtask中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/247382.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centos 7安装MySQl

本文参考借鉴&#xff1a;https://cloud.tencent.com/developer/article/2353312&#xff0c;非常赞&#xff01; 为了避免权限不足的问题&#xff0c;建议切换至root用户进行安装 1.MySQL的清理与安装 查看是否存在MySQL服务 安装mysql之前&#xff0c;需要先看看要安装系…

基于springboot宠物领养系统

摘要 随着社会的不断发展和人们生活水平的提高&#xff0c;宠物在家庭中的地位逐渐上升&#xff0c;宠物领养成为一种流行的社会现象。为了更好地管理和促进宠物领养的过程&#xff0c;本文基于Spring Boot框架设计和实现了一套宠物领养系统。该系统以用户友好的界面为特点&…

选择合适的CRM管理系统,需要满足以下条件

随着数据时代的发展和企业业务的不断扩大&#xff0c;数据的比例开始增加&#xff0c;传统的数据计算方法不再适合现代企业。客户管理已成为企业最重要的组成部分之一&#xff0c;越来越多的企业开始关注客户管理。在crm管理系统上&#xff0c;企业希望通过crm管理系统&#xf…

第一节课,用户管理--后端初始化,项目调通。二次翻工

一、代码下载 网址&#xff1a; 用户管理第一节课&#xff0c;阿里生成代码包-CSDN博客 二、项目步骤&#xff0c;参考从 网址&#xff1a; 一、第一节课&#xff0c;用户管理--后端初始化&#xff0c;项目调通-CSDN博客 从这里开始跟随 &#xff08;一&#xff09;、跟随…

爬虫基础-前端基础

Html是骨骼、css是皮肤、js是肌肉&#xff0c;三者之间的关系可以简单理解为m(html)-v(css)-c(js) 浏览器的加载过程 构建dom树 子资源加载-加载外部的css、图片、js等外部资源 样式渲染-css执行 DOM树 ajax、json、xml AJAX 是一种在无需重新加载整个网页的情况下&#xf…

简述云原生基础定义及关键技术

云原生是什么 云原生是面向“云”而设计的应用,因此技术部分依赖于传统云计算的 3 层概念,基础设施即服务(IaaS)、平台即服务(PaaS)和软件即服务(SaaS)。 例如,敏捷的不可变基础设施交付类似于 IaaS,用来提供计算网络存储等基础资源,这些资源是可编程且不可变的,直…

【Java与网络6】实现一个自己的HTTP浏览器

前面我们讨论了HTTP协议的基本结构和Socket编程的基本原理&#xff0c;本文我们来整个大活&#xff1a;自己实现一个简单的浏览器。 目录 1.主线程循环体 2.readHostAndPort()方法的实现 3.readHttpRequest()方法的实现 4.sendHttpRequest()方法的实现 5.readHttpRespons…

前端性能优化——图片压缩和懒加载

图片压缩 使用第三方工具手动压缩图片使用Webpack工具在打包时自动压缩图片 这里主要介绍第二种方法。 &#xff08;1&#xff09;将小于某个大小的图片转化成 data URI 形式&#xff08;Base64 格式&#xff09;&#xff0c;减少请求数量&#xff0c;但是体积变得更大 modu…

每日一换,美好随心——发现那些让屏幕焕发新彩的壁纸!

1、方小童在线工具集 网址&#xff1a; 方小童 该网站是一款在线工具集合的网站&#xff0c;目前包含PDF文件在线转换、随机生成美女图片、精美壁纸等功能&#xff0c;喜欢的可以赶紧去试试&#xff01;

阿里云SSL证书DV型

我们都在SSL证书厂家&#xff08;CA&#xff09;并不多&#xff0c;全球可以兼容性99%机构仅有3-4家&#xff0c;少的可怜&#xff0c;全球绝大部分都要去厂家授权提供商申请&#xff0c;这意味着很多也喜欢去阿里云买SSL证书也是这样的原因。 SSL证书无论在哪里购买肯定是DV类…

ElasticSearch 学习笔记

基本概念 术语 文档&#xff08;document&#xff09;&#xff1a;每条记录就是一个文档&#xff0c;会以 JSON 格式进行存储 映射&#xff08;mapping&#xff09;&#xff1a;索引中文档字段的约束信息&#xff0c;类似 RDBMS 中的表结构约束&#xff08;schema&#xff09…

【LIBS】交叉编译TCPDUMP

目录 1. 安装编译工具2. 设置环境变量3. 编译libpcap3.1 安装依赖3.2 交叉编译 4. 编译TCPDUMP4.1 克隆仓库与生成构建环境4.2 静态链接LIBPCAP4.3 动态链接LIBPCAP4.4 构建与安装 5. 查看交叉编译结果5.1 文件布局 1. 安装编译工具 sudo apt-get install -y autoconf automak…

[MRCTF2020]Ez_bypass1

代码审计&#xff0c;要求gg和id的MD5值相等而gg和id的值不等或类型不等 相同MD5值的不同字符串_md5相同的不同字符串-CSDN博客 不过这道题好像只能用数组 下一步是passwd不能是纯数字&#xff0c;但是下一个判断又要passwd等于1234567 这里通过passwd1234567a实现绕过 原…

Differentiated Key-Value Storage Management for Balanced I/O Performance——论文泛读

ATC 2021 Paper 论文阅读笔记整理 问题 现代键值&#xff08;KV&#xff09;存储采用LSM树作为管理KV对的核心数据结构&#xff0c;但存在较高的写放大和读放大问题。现有的LSM树优化通常需要做出设计权衡&#xff0c;并无法同时在写入、读取和扫描方面实现高性能。 现有方法…

QT发生弹出警告窗口

QTC开发程序弹出警告窗口&#xff0c;如上图 实施代码&#xff1a; #include <QMessageBox> int main() {// 在发生错误的地方QMessageBox::critical(nullptr, "错误", "发生了一个错误&#xff0c;请检查您的操作。");}上面的文字可以更改&#x…

线性代数--------学习总结

高斯消去法&#xff1a;对于任意的矩阵&#xff0c;总是能够利用倍加和行变换的方法变化成为阶梯形矩阵&#xff08;每一行第一个非零元叫做主元&#xff0c;他所在的列就叫做主列------每一行的主列都在他上方任意一行主列的右边&#xff09;和行简化阶梯矩阵&#xff08;主元…

【Web前端实操17】导航栏效果——滑动门

滑动门 定义: 类似于这种: 滑到导航栏的某一项就会出现相应的画面,里面有对应的画面出现。 箭头图标操作和引用: 像一些图标,如果需要的话,可以找字体图标,比如阿里巴巴矢量图标库:iconfont-阿里巴巴矢量图标库 选择一个——>添加至购物车——>下载代码 因…

精通Python第13篇—数据之光:Pyecharts旭日图的魔法舞台

文章目录 引言准备工作绘制基本旭日图调整颜色和样式添加交互功能定制标签和标签格式嵌套层级数据高级样式与自定义进阶主题&#xff1a;动态旭日图数据源扩展&#xff1a;外部JSON文件总结 引言 数据可视化在现代编程中扮演着重要的角色&#xff0c;而Pyecharts是Python中一个…

【学网攻】 第(13)节 -- 动态路由(OSPF)

系列文章目录 目录 系列文章目录 文章目录 前言 一、动态路由是什么&#xff1f; 二、实验 1.引入 实验拓扑图 实验配置 实验验证 总结 文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学…

Python爬虫请求库安装

请求库的安装 爬虫可以简单分为几步&#xff1a;抓取页面、分析页面和存储数据。 在抓取页面的过程中&#xff0c;我们需要模拟浏览器向服务器发出请求&#xff0c;所以需要用到一些 Python 库来实现 HTTP 请求操作。在本教程中&#xff0c;我们用到的第三方库有 requests、S…