火山引擎 ByteHouse:ClickHouse 如何保证海量数据一致性

背景

ClickHouse是一个开源的OLAP引擎,不仅被全球开发者广泛使用,在字节各个应用场景中也可以看到它的身影。基于高性能、分布式特点,ClickHouse可以满足大规模数据的分析和查询需求,因此字节研发团队以开源ClickHouse为基础,推出火山引擎云原生数据仓库ByteHouse。

在日常工作中,研发人员经常会遇到业务链路过长,导致流程稳定性和数据一致性难保障的问题,这在分布式、跨服务的场景中更为明显。本篇文章提出针对这一问题的解决思路:在火山引擎ByteHouse中构建轻量级流程引擎,来解决数据一致性问题。

使用轻量级流程引擎可以帮我们使用统一的标准来解决复杂业务链路的编排问题,不仅提高业务代码的可读性和复用性,还能更专注业务核心逻辑的开发,让整体流程更加标准化、规范化。

总结来说,使用流程引擎有以下优势:

  • 轻量级,接入方便,内存操作,性能有保障

  • 易维护,流程配置与业务分离,支持热更新

  • 易扩展,丰富的执行策略及算子支持

大体思路

8239a84c9f65c08769b3712e63e7e732.jpeg

上图为ByteHouse企业版管理平台功能架构图。从该功能架构图可以看出,ByteHouse核心能力都是依赖ClickHouse集群,对于集群节点多、数据计算量大的业务场景,容易出现节点状态不一致的问题,因此保证ClickHouse集群间的状态一致性是我们的核心诉求。

2a12dd1533910355d5d72c795f55e077.png

为了保证数据一致性,ByteHouse提供了以下能力:

  1. event engine: 事件处理中心

  2. workflow engine:轻量级流程引擎

  3. 对账系统

保障数据一致性最简单的方式是通过状态机来监听流程执行过程:

  • 首先,将所有的任务请求下发到event engine,由event engine将任务分发对应的handler执行,统一管理所有下发任务的生命周期,并提供异步重试、回滚补偿等功能。流量汇总到event engine以后,会让服务后续的业务扩展更加便捷。

  • 其次,对于比较复杂的任务请求,我们可以下发到workflow engine执行,由workflow生成实例,并编排任务队列,管理流程执行实例的生命周期,统一失败回滚,失败重试。

  • 最后,对于服务不可用等特殊场景产生的脏数据,由对账服务兜底。

a9545ed459ab02347bcc763c6706d474.png

架构设计

在流程监控的架构设计中,主要包含以下:

  • 流程管理层:主要负责流程配置的解析初始化,并完成编排策略的工作

  • 策略behavior层:编排执行节点,并下发执行任务到执行器

  • 执行器:管理执行节点执行

  • 执行节点:负责业务具体实现

9bd0d275b3798d11010380ca0522d35c.png

实现方案

执行节点

906e89e2b700dbddea015a094e395ddd.png

流程引擎的核心为“责任链”,按照责任链上的节点顺序依次执行所有任务,所以我们需要的三个基本单元分别为:

  • request:入参

  • processlist:流程执行节点list

  • response:出参

在研发工作中,我们时常会遇到以下问题:

  • 如果同时出现了一个问题,node1、node2、node3之间的数据交互如何实现?

  • 如果node1入参、出参与node2,node3不一样该如何处理?

  • 参数类型不同的node又该如何统一调度?

最简单的处理办法,是让node使用相同的上下文信息,将整个执行node模版化。我们让所有的执行节点node实现相同的接口Delegation,统一使用相同的上下文executionContext作为执行方法的入参。

对于流程中的request和response,我们可以放入executionContext中,让每个执行节点都可以通过上下文操作response。

// Delegation -
type Delegation interface {Execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorTryExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorConfirmExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorCancelExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorCode() stringType() value.DelegationType
}

执行策略

如果确定好了最小的执行节点,我们需要考虑到,业务场景并不会永远顺序执行node,再返回结果,流程执行过程中跳转、循环、并发执行都是比较常见的操作。考虑不同业务场景复用性,我们在执行节点之上加了一层执行策略,用策略behaivor来重新编排触发执行节点的任务。

  • 下图将流程分成了behavior1和behavior2,分别对应不同的策略。

  • 简单的策略举例:按顺序执行、并发执行、循环执行、条件跳转执行等。

  • 我们可以根据自身业务实际需要定制,后续会有实例介绍。

6e88ba463f800f06de5705821e4faaaf.png
// ActivityBehavior -
type ActivityBehavior interface {Enter(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppErrorExecute(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppErrorLeave(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppErrorCode() value.ActivityBehaviorCode
}

策略behavior提供有Enter,Execute,Leave三个接口,Enter负责生成执行节点任务instance,Execute负责编排并触发执行任务instance操作,Leave负责跳转到下一个behavior。

可以看出来策略behaivor的跳转方式类似于链表,不断执行next方法,所以编码过程中需要注意不要出现死循环,小心stackoverflow。

Executor

执行器Executor的主要作用是串联执行策略和执行节点,策略behavior将执行的命令下发给Executor,由Executor对执行节点的触发操作。这里会根据执行节点的type,映射到三种执行节点的执行方式,包含tcc,执行一次,重试多次。

// DelegationExecutor -
type DelegationExecutor interface {execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorpostExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
}func (de *DefaultDelegationExecutor) execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError {delegationCode := executionContext.GetExecutionInstance().GetDelegationCode()if len(delegationCode) == 0 || de.DelegationMap[delegationCode] == nil {logger.Info(ctx, "DefaultDelegationExecutor delegation code not found,use default delegation", zap.String("delegationCode", delegationCode))delegationCode = string(value.DefaultDelegation)executionContext.GetExecutionInstance().SetDelegationCode(delegationCode)}return de.dumpExecute(ctx, executionContext, delegationCode)
}func (de *DefaultDelegationExecutor) dumpExecute(ctx context.Context, executionContext ExecutionContextInterface, delegationCode string) apperror.AppError {FireEvent(ctx, executionContext, value.ExecutionStart)var err apperror.AppErrordelegation := de.DelegationMap[delegationCode]switch delegation.Type() {case value.TccDelegation:err = tccExecute(ctx, executionContext, delegation)case value.SingleDelegation:err = singleExecute(ctx, executionContext, delegation)case value.RetryDelegation:err = retryExecute(ctx, executionContext, delegation)}if err != nil {logger.Error(ctx, "delegation.Execute_err", zap.Error(err))return apperror.Trace(err)}FireEvent(ctx, executionContext, value.ExecutionEnd)return nil
}

ExecutionContext

ExecutionContext上下文是用来记录了流程执行的所有细节,包含以下:

  • ProcessEngineConfigurationInterface: 流程定义信息

  • ExecutionInstanceInterface: 执行节点实例

  • ActivityInstanceInterface: 执行策略实例

  • ProcessInstanceInterface: 流程实例

  • request:入参

  • response:返回值

为了保证整个流程执行的稳定性,这里除了response之外,所以其他的实例参数都不建议开放写接口,response可以用来存储流程实例执行过程中会产生的变量信息。

对于整个流程的定义ProcessEngineConfiguration,我们可以选择最简单的方式,即在数据库里,将配置信息映射成json字符串。当然也可以选择读取配置文件,只要能满足读取方便,数据不丢即可。

// ExecutionContextInterface -
type ExecutionContextInterface interface {GetProcessEngineConfiguration() ProcessEngineConfigurationInterfaceSetProcessEngineConfiguration(processEngineConfiguration ProcessEngineConfigurationInterface)GetExecutionInstance() instance.ExecutionInstanceInterfaceSetExecutionInstance(executionInstance instance.ExecutionInstanceInterface)GetActivityInstance() instance.ActivityInstanceInterfaceSetActivityInstance(activityInstance instance.ActivityInstanceInterface)GetProcessInstance() instance.ProcessInstanceInterfaceSetProcessInstance(processInstance instance.ProcessInstanceInterface)SetNeedPause(needPause bool)IsNeedPause() boolSetActivityIndex(activityIndex int)GetActivityIndex() intSetActivityBehaviorCode(activityBehaviorCode value.ActivityBehaviorCode)GetActivityBehaviorCode() value.ActivityBehaviorCodeSetBizUniqueKey(bizUniqueKey string)GetBizUniqueKey() stringGetRequest() map[string]interface{}SetRequest(request map[string]interface{})GetResponse() map[string]stringSetResponse(response map[string]string)AtomicAddResponse(key string, value string)
}

Listener

监听器的主要作用是用来监听流程执行中的重要参数信息。从上述executor接口可以看到fireEvent,它的作用是发送消息event,让listener监听到对应的event类型,完成一些定制化的行为。

类似于面向切面编程,我们可以在执行节点的前后增加定制化的逻辑,如打日志、监听节点执行时间,持久化流程中产生的response信息、增加链路追踪等。

API

ac5943dc65c2d11784eeedd7c8e8d7cd.png

最后,我们将上述的内容拼接串联起来,主要提供三个接口:

  • Start: 启动流程

  • Signal: 暂停或是异常退出后,继续执行流程

  • Abort: 强制中断流程

process start(){//1.get and create ProcessEngineConfigurationInterface 解析流程定义//2.create processInstance 创建流程实例//3.create ExecutionContext 创建执行上下文//4. lockstrategy trylock //5. invoke process start processinstance.start()//6. persist processInstance and return//7. lockstrategy unlock 
}processinstance start(){// get behavior// behavior enterbehavior.Enter(ctx, executionContext)//behavior executebehavior.Execute(ctx, executionContext)//behavior leavebehavior.Leave(ctx, executionContext)
}

相比于start,signal需要读取执行的细节信息,找到之前失败的执行节点位置,并加载到上下文中,再继续执行。

对于失败节点信息的持久化有两种方式:第一,可以选择在流程执行结束持久化;第二,可以通过listener在每个执行节点结束持久化。具体根据实际业务场景对于性能、数据一致性的要求做出抉择。

并发场景考虑

  1. behavior策略中肯定会出现定制、并发、处理多个执行节点到场景的问题,如果同时修改必定会造成数据错乱。简单的方法推荐使用带锁的容器存储,可以被修改的信息(response),此处使用的是github.com/bytedance/gopkg包里面封装的skipmap。

  2. lockstrategy可以自己定义最适配业务场景的,最简单的方案是redis锁,同时也考虑到系统异常退出后的恢复问题。可以参考redis官网解决特殊情况下的锁异常解决方案:https://redis.io/commands/setnx/

后续的工作

轻量级流程引擎的基本功能到此已经实现,后续的扩展优化可以围绕以下方向进行:

  1. 界面化展示,可以将链路执行情况展示出来

  2. 策略behavior维度扩展,适配各种业务场景

  3. 增加子流程的维度,可以复用原先的执行逻辑

Demo示例

以下为简单的processconfiguration的配置信息,此处使用DefaultBehavior,即同步顺序执行策略。

{"ProcessContentList":[{"Behavior":"DefaultBehavior","DelegationList":[{"Code":"sample1"},{"Code":"sample2"},{"Code":"sample3"}]},{"Behavior":"DefaultBehavior","DelegationList":[{"Code":"sample4"},{"Code":"sample5"}]}]
}
d20944d7dad46418584d4c190f459dd4.png

在listener里面加入日志,这样可以追溯出整个流程的执行流程,以便更好的监控整个流程的运行状态。

实际使用

以ClickHouse集群缩容为例:

c0162f657ec4f7f4b8b97a337361eae9.png
{"ProcessContentList":[// 查询所有需要重分布的table{"Behavior":"DefaultBehavior",// 顺序执行"DelegationList":[{"Code":"hor_reshard_table_loop" }]},// 遍历所有table进行数据的重分布 {"LoopKey":"reshard_table_loop_key","Behavior":"NonBlockLoopBehavior",// 非阻塞循环处理"DelegationList":[{"Code":"hor_reshard_table"}]},// 进行删除节点操作{"Behavior":"DefaultBehavior","DelegationList":[{"Code":"hor_start_remove_node"},{"Code":"hor_prepare_node_vcloud","PostCode":"hor_rollback_remove_node_vcloud"// 统一失败回滚处理},{"Code":"hor_update_config_vcloud","PostCode":"hor_rollback_remove_node_vcloud"},{"Code":"hor_set_cluster_running","PostCode":"hor_rollback_remove_node_vcloud"},{"Code":"hor_release_node"},{"Code":"hor_callback_bill"}]}]
}

总结

一个流程引擎适配所有的业务场景几乎是不可能,除非接受复杂的方案设计,而第三方流程引擎对于日常的业务开发显得太笨重。轻量级流程引擎则会简化接入方式,减少了过多http请求带来的性能损耗,更加灵活多变,追述问题也变得简单。

在ByteHouse中加入流程引擎的能力,能以较小的代价给业务更多重试的可能性,而不需要反复回滚,特别对于耗时很长的任务,能带来更好用户使用体验。除此之外,流程引擎还能将业务流程模版化,增加接口服务的复用性,使得业务代码的可读性、扩展性得到提升,方便后期维护。

火山引擎云原生数据仓库ByteHouse是火山引擎旗下的一款云原生数据仓库,为用户提供极速分析体验,能够支撑实时数据分析和海量数据离线分析,同时还具备便捷的弹性扩缩容能力,极致分析性能和丰富的企业级特性,助力客户数字化转型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/134413.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker 已经配置了国内镜像源,但是拉取镜像速度还是很慢(gcr.io、quay.io、ghcr.io)

前言 国内用户在使用 docker 时,想必都遇到过镜像拉取慢的问题,那是因为 docker 默认指向的镜像下载地址是 https://hub.docker.com,服务器在国外。 网上有关配置 docker 国内镜像源的教程很多,像 腾讯、阿里、网易 等等都会提供…

前后端跨域请求问题解决方法

如图: 1.在config配置包中创建一个CorsConfig配置类 2.将下面代码复制到这个类中即可 import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.cors.CorsConfigurati…

华为云云耀云服务器L实例评测|服务器反挖矿防护指南

前言 本文为华为云云耀云服务器L实例测评文章,测评内容是 云耀云服务器L实例 反挖矿防护指南 系统配置:2核2G 3M CentOS7.9 之前的文章中『一文教你如何防御数据库恶意攻击』,我们讲到黑客如何通过攻击数据库来获取权限,以及我们…

Java面试常用函数

1. charAt() 方法用于返回字符串指定索引处的字符。索引范围为从 0 到 length() - 1。 map.getOrDefault(num, 0) :如果map存在num这个key,则返回num对应的value,否则返回0. Arrays.sort(nums); 数组排序 Arrays.asList("a","b",&q…

详细解释HiveSQL执行计划

一、前言 Hive SQL的执行计划描述SQL实际执行的整体轮廓,通过执行计划能了解SQL程序在转换成相应计算引擎的执行逻辑,掌握了执行逻辑也就能更好地把握程序出现的瓶颈点,从而能够实现更有针对性的优化。此外还能帮助开发者识别看似等价的SQL其…

java 封装一个将String类型转Long类型的函数

Long是一种超大类型的数字变量类型 但java无法直接生成这种数据 但我们可以封装一个函数 public Long getuniid(String number) {Long longNumber Long.parseLong(number);return longNumber; }这样 我们就可以传入一个字符串 然后将其转换为 long 然后我们调用这个函数 Sys…

向日葵无法连接服务器(无法登录)

最近在使用向日葵过程中,突然就不能登录向日葵了,网上查了各种解决方案,比如说防火墙是不是把向日葵给拦截了?更换不同的版本等等,都无法解决,最后突然想到是不是电脑对向日葵原安装目录限制了?…

虚拟化技术:深入浅出

🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…

PY32F003F18之输入捕获

输入捕获是定时器的功能之一,配合外部引脚,捕获脉宽时间或采集周期。 CPU中的定时器最基本的功能就是计数功能,其次是输入捕获(IC),再次就是比较输出(OC),还有就是使用引脚对外部时钟进行计数,触发信号捕捉…

关于阿里云服务器Ubuntu编译jdk8中遇到的坑及解决方案

关于阿里云服务器Ubuntu系统安装jdk8中遇到的坑及解决方案 记录一下困扰了很多天、到处查资料最后终于成功安装的过程 关于阿里云服务器无法登录的问题 基本反馈是这样的: 如果你添加了ip之后仍然登不进去,有一种方法是直接从第三个选项进去登录之后修…

JWT安全

文章目录 JWT是什么?为什么要使用JWT?JWT的数据结构JWT的工作过程 JWT是什么? JSON Web Token (JWT)是一个开放标准(RFC 7519),它定义了一种紧凑的、自包含的方式,用于作为JSON对象在各方之间安全地传输信息。 JWT全称…

《Python趣味工具》——自制emoji2(2)

今天,我们将会完成以下2个内容: 绘制静态emoji总结turtle中常用的绘图函数 文章目录 一、绘制静态emoji::sparkles: 画脸::sparkles:绘制嘴巴::sparkles:绘制眼白:绘制眼白-Part1:绘制眼白—pa…

积木报表 JimuReport v1.6.2-GA5版本发布—高危SQL漏洞安全加固版本

项目介绍 一款免费的数据可视化报表,含报表和大屏设计,像搭建积木一样在线设计报表!功能涵盖,数据报表、打印设计、图表报表、大屏设计等! Web 版报表设计器,类似于excel操作风格,通过拖拽完成报…

Apache Spark 在爱奇艺的应用实践

01 Apache Spark 在爱奇艺的现状 Apache Spark 是爱奇艺大数据平台主要使用的离线计算框架,并支持部分流计算任务,用于数据处理、数据同步、数据查询分析等场景: 数据处理:在数据开发平台中支持开发者提交 Spark Jar 包任务或Spar…

构建无限画布,协作数字绘图 | 开源日报 0915

tldraw/tldraw Stars: 16.4k License: Apache-2.0 tldraw 是一个协作数字白板项目,可在 tldraw.com 上使用。它的编辑器、用户界面和其他底层库都是开源的,并且可以通过 npm 进行分发。您可以使用 tldraw 为产品创建一个即插即用的白板,或者…

Python Opencv实践 - 视频文件写入(格式和分辨率修改)

参考资料: python opencv写视频——cv2.VideoWriter()_cv2.cv.videowriter(_翟羽嚄的博客-CSDN博客 import cv2 as cv import numpy as np#1. 打开原始视频 video_in cv.VideoCapture("../SampleVideos/Unity2D.mp4") video_width int(video_in.get(c…

道路空间功率谱密度与时间功率谱密度(笔记)

1.先上代码其中之一 clc clear close all %% SimTime200; dt0.01;%仿真步长 time0:dt:SimTime; sim_step length(time); Ntlength(time); % 采样点(可能要修改) u10; % m/s df1/(Nt*dt); % 采样频率间隔 f0:df:1/(2*dt); % 采用频率一…

网络安全深入学习第四课——热门框架漏洞(RCE— Log4j2远程代码执行)

文章目录 一、log4j2二、背景三、影响版本四、漏洞原理五、LDAP和JNDI是什么六、漏洞手工复现1、利用DNSlog来测试漏洞是否存在2、加载恶意文件Exploit.java,将其编译成class文件3、开启web服务4、在恶意文件Exploit.class所在的目录开启LDAP服务5、监听反弹shell的…

浅谈C++|文件篇

引子&#xff1a; 程序运行时产生的数据都属于临时数据&#xff0c;程序一旦运行结束都会被释放通过文件可以将数据持久化。C中对文件操作需要包含头文件< fstream > 。 C提供了丰富的文件操作功能&#xff0c;你可以使用标准库中的fstream库来进行文件的读取、写入和定位…

什么是16S rRNA,rDNA, 菌群研究为什么用16S测序,细菌如何命名分类?

谷禾健康 当谈到肠道菌群研究时&#xff0c;16S测序是一种常用的方法&#xff0c;它在了解微生物组成和多样性方面非常重要且实用。 16S rRNA是细菌和古细菌中的一个高度保守的基因片段&#xff0c;同时具有一定的变异性。通过对16S rRNA基因进行测序&#xff0c;可以确定微生物…