Golang基于DTM的分布式事务SAGA实战

SAGA介绍

SAGA是“长时间事务”运作效率的方法,大致思路是把一个大事务分解为可以交错运行的一系列子事务的集合。原本提出 SAGA 的目的,是为了避免大事务长时间锁定数据库的资源,后来才逐渐发展成将一个分布式环境中的大事务,分解为一系列本地事务的设计模式

SAGA事务典型的时序图

SAGA失败的时序图

如图TM事务管理器,DTM是开源的分布式事务管理中间件

DTM的SAGA支持

dtm根据http的不同状态码来代表当前事务的处理结果

dtm事务默认无回滚时间支持,尽最大能力交付

失败重试默认为指数回避算法。需要固定时间重试需要在saga属性配置

dtm默认事务执行顺序为并发执行也是顺序执行,可以设置属性为并行执行

http状态码当前版本不能完全代表业务成功需要结合 返回msg具体看业务代码

实战

代码在宿主机运行 docker network:bridge

docker安装,安装成功后可以访问http://localhost:36789/ 打开dtm事务web-ui

代码github GitHub - Ssummer520/dtm-gin

docker run -itd  --name dtm -p 36789:36789 -p 36790:36790  yedf/dtm:latest
创建tm事务管理器提交全局事务
package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/gin-gonic/gin""github.com/lithammer/shortuuid/v3""log"
)func main() {app := gin.Default()app.GET("/test", func(c *gin.Context) {QsFireRequest()log.Printf("TransOut")c.JSON(200, "sss")})app.Run(":1111")}const qsBusiAPI = "/api/busi_start"
const qsBusiPortIN = 8881
const qsBusiPortOUT = 8880
const dtmServer = "http://localhost:36789/api/dtmsvr"var qsBusiIN = fmt.Sprintf("http://host.docker.internal:%d%s", qsBusiPortIN, qsBusiAPI)
var qsBusiOUT = fmt.Sprintf("http://host.docker.internal:%d%s", qsBusiPortOUT, qsBusiAPI)func QsFireRequest() string {req := &ReqHTTP{Amount: 30} // load of micro-service// DtmServer is the url of dtmsaga := dtmcli.NewSaga(dtmServer, shortuuid.New()).// add a TransOut sub-transaction,forward operation with url: qsBusi+"/TransOut", reverse compensation operation with url: qsBusi+"/TransOutCompensate"Add(qsBusiOUT+"/TransOut", qsBusiOUT+"/TransOutCompensate", req).// add a TransIn sub-transaction, forward operation with url: qsBusi+"/TransIn", reverse compensation operation with url: qsBusi+"/TransInCompensate"Add(qsBusiIN+"/TransIn", qsBusiIN+"/TransInCompensate", req)// submit the created saga transaction,dtm ensures all sub-transactions either complete or get revokedsaga.RetryInterval = 1//saga.RequestTimeout = 10err := saga.Submit()if err != nil {panic(err)}return saga.Gid
}type ReqHTTP struct {Amount int `json:"amount"`
}

saga全局事务属性设置

saga属性事务设置
type TransOptions struct {WaitResult         bool              `json:"wait_result,omitempty" gorm:"-"`     // 是否等待结果,默认为falseTimeoutToFail      int64             `json:"timeout_to_fail,omitempty" gorm:"-"` // 事务失败的超时时间,单位:秒RequestTimeout     int64             `json:"request_timeout,omitempty" gorm:"-"` // 全局事务的请求超时时间,单位:秒RetryInterval      int64             `json:"retry_interval,omitempty" gorm:"-"`  // 重试间隔时间,单位:秒PassthroughHeaders []string          `json:"passthrough_headers,omitempty" gorm:"-"` // 需要传递的HTTP头部字段BranchHeaders      map[string]string `json:"branch_headers,omitempty" gorm:"-"`  // 自定义的分支头部字段,DTM服务器到服务APIConcurrent         bool              `json:"concurrent" gorm:"-"`                // 是否并发执行,适用于saga和消息事务类型
}
rm1表示第一个微服务业务
package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/dtm-labs/dtmcli/dtmimp""github.com/dtm-labs/dtmcli/logger""github.com/gin-gonic/gin""log""net/http"
)func main() {QsStartSvr()}// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8881// QsStartSvr quick start: start server
func QsStartSvr() {app := gin.Default()qsAddRoute(app)log.Printf("quick start examples listening at %d", qsBusiPort)app.Run(fmt.Sprintf(":%d", qsBusiPort))}func qsAddRoute(app *gin.Engine) {app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {info := infoFromContext(c)var req ReqHTTPc.ShouldBindJSON(&req)log.Printf("TransIn:%v,gid:%v", req.Amount, info.Gid)c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess)) // Status 409 for Failure. Won't be retried})app.POST(qsBusiAPI+"/TransInCompensate", func(c *gin.Context) {info := infoFromContext(c)var req ReqHTTPc.ShouldBindJSON(&req)log.Printf("TransInCompensate:%v,gid:%v", req.Amount, info.Gid)c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))})}
func string2DtmError(str string) error {return map[string]error{dtmcli.ResultFailure: dtmcli.ErrFailure,dtmcli.ResultOngoing: dtmcli.ErrOngoing,dtmcli.ResultSuccess: nil,"":                   nil,}[str]
}type mainSwitchType struct {TransInResult         AutoEmptyStringTransOutResult        AutoEmptyStringTransInConfirmResult  AutoEmptyStringTransOutConfirmResult AutoEmptyStringTransInRevertResult   AutoEmptyStringTransOutRevertResult  AutoEmptyStringQueryPreparedResult   AutoEmptyStringNextResult            AutoEmptyStringJrpcResult            AutoEmptyStringFailureReason         AutoEmptyString
}// AutoEmptyString auto reset to empty when used once
type AutoEmptyString struct {value string
}// SetOnce set a value once
func (s *AutoEmptyString) SetOnce(v string) {s.value = v
}// Fetch fetch the stored value, then reset the value to empty
func (s *AutoEmptyString) Fetch() string {v := s.values.value = ""if v != "" {logger.Debugf("fetch obtain not empty value: %s", v)}return v
}// MainSwitch controls busi success or fail
var MainSwitch mainSwitchTypefunc infoFromContext(c *gin.Context) *dtmcli.BranchBarrier {info := dtmcli.BranchBarrier{TransType: c.Query("trans_type"),Gid:       c.Query("gid"),BranchID:  c.Query("branch_id"),Op:        c.Query("op"),}return &info
}type ReqHTTP struct {Amount int `json:"amount"`
}
rm2表示第二个微服务业务
package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/dtm-labs/dtmcli/dtmimp""github.com/dtm-labs/dtmcli/logger""github.com/gin-gonic/gin""log""net/http"
)func main() {app := gin.Default()app.POST(qsBusiAPI+"/TransOut", func(c *gin.Context) {info := infoFromContext(c)var req ReqHTTPc.ShouldBindJSON(&req)log.Printf("TransOut:%v,gid:%v", req.Amount, info.Gid)c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))})app.POST(qsBusiAPI+"/TransOutCompensate", func(c *gin.Context) {info := infoFromContext(c)var req ReqHTTPc.ShouldBindJSON(&req)log.Printf("TransOutCompensate:%vgid:%v", req.Amount, info.Gid)c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))})log.Printf("quick start examples listening at %d", qsBusiPort)app.Run(fmt.Sprintf(":%d", qsBusiPort))
}// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8880// QsStartSvr quick start: start server
func QsStartSvr() {}type mainSwitchType struct {TransInResult         AutoEmptyStringTransOutResult        AutoEmptyStringTransInConfirmResult  AutoEmptyStringTransOutConfirmResult AutoEmptyStringTransInRevertResult   AutoEmptyStringTransOutRevertResult  AutoEmptyStringQueryPreparedResult   AutoEmptyStringNextResult            AutoEmptyStringJrpcResult            AutoEmptyStringFailureReason         AutoEmptyString
}// AutoEmptyString auto reset to empty when used once
type AutoEmptyString struct {value string
}// SetOnce set a value once
func (s *AutoEmptyString) SetOnce(v string) {s.value = v
}// Fetch fetch the stored value, then reset the value to empty
func (s *AutoEmptyString) Fetch() string {v := s.values.value = ""if v != "" {logger.Debugf("fetch obtain not empty value: %s", v)}return v
}// MainSwitch controls busi success or fail
var MainSwitch mainSwitchTypetype ReqHTTP struct {Amount int `json:"amount"`
}func infoFromContext(c *gin.Context) *dtmcli.BranchBarrier {info := dtmcli.BranchBarrier{TransType: c.Query("trans_type"),Gid:       c.Query("gid"),BranchID:  c.Query("branch_id"),Op:        c.Query("op"),}return &info
}
结果

运行tm提交一个全局事务

rm1返回

rm2返回

dtm webui管理页面

当前业务已经消费成功

我们把这块修改为rm1 提交失败,看到rm2事务回滚

const (// StatusPrepared 表示全局/分支事务的状态。// 第一步,事务准备阶段StatusPrepared = "prepared"// StatusSubmitted 表示全局事务的状态。StatusSubmitted = "submitted"// StatusSucceed 表示全局/分支事务的状态。StatusSucceed = "succeed"// StatusFailed 表示全局/分支事务的状态。// 注意:将全局状态更改为失败可以停止触发(在生产环境中不推荐)StatusFailed = "failed"// StatusAborting 表示全局事务的状态。StatusAborting = "aborting"// ResultSuccess 事务/事务分支的结果成功ResultSuccess = dtmimp.ResultSuccess// ResultFailure 事务/事务分支的结果失败ResultFailure = dtmimp.ResultFailure// ResultOngoing 事务/事务分支的结果进行中ResultOngoing = dtmimp.ResultOngoing// DBTypeMysql 数据库驱动类型:MySQLDBTypeMysql = dtmimp.DBTypeMysql// DBTypePostgres 数据库驱动类型:PostgreSQLDBTypePostgres = dtmimp.DBTypePostgres
)

参考资料SAGA事务模式 | DTM开源项目文档

https://zhuanlan.zhihu.com/p/688088173

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/404378.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

The Sandbox 新提案: 2024 年亚洲和拉丁美洲区块链活动预算

理事会建议: 积极 🙂 内容 此提案请求为2024年第四季度,The Sandbox 在东南亚和拉丁美洲的主要区块链活动中的激活分配 94,500 美元的 SAND 倡议预算。(具体活动列表见下方活动描述) 原因 区域团队希望在这些现场活…

一文学会本地部署可视化应用JSONCrack并配置公网地址实现远程协作

文章目录 前言1. Docker安装JSONCrack2. 安装Cpolar内网穿透工具3. 配置JSON Crack界面公网地址4. 远程访问 JSONCrack 界面5. 固定 JSONCrack公网地址 前言 本文主要介绍如何在Linux环境使用Docker安装数据可视化工具JSONCrack,并结合cpolar内网穿透工具实现团队在…

【二分查找】--- 进阶题目赏析

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏: 算法Journey 本篇博客我们继续来了解一些有关二分查找算法的进阶题目。 🏠 寻找峰值 📌 题目内容 162. 寻找峰值 - 力扣&#…

【python爬虫】邮政包裹物流查询2瑞数6加密

大家好呀,我是你们的好兄弟【星云牛马】,今天给大家带来的是瑞数6的补环境的总结,补环境肯定是需要一些基础补环境知识的,所以建议没有基础的小伙伴可以加入学习群进行学习,有基础的伙伴加入交流起来。 QQ群&#xff…

用C#写一个随机音乐播放器

form1中namespce里的代码如下 public partial class Form1 : Form {public Form1(){InitializeComponent();}private void button1_Click(object sender, EventArgs e){string folder textBox1.Text;string folderPath folder; // 指定音频文件所在的文件夹路径OpenRandomFi…

thinkphp5漏洞分析之文件包含

目录 一、环境 二、开始研究 三、漏洞分析 四、漏洞修复 五、攻击总结 一、环境 thinkphp官网下载 创建 application/index/view/index/index.html 文件,内容随意(没有这个模板文件的话,在渲染时程序会报错) 二、开始研究 创…

后端开发刷题 | 链表内指定区间反转【链表篇】

描述 将一个节点数为 size 链表 m 位置到 n 位置之间的区间反转,要求时间复杂度 O(n)O(n),空间复杂度 O(1)O(1)。 例如: 给出的链表为 1→2→3→4→5→NULL1→2→3→4→5→NULL, m2,n4 返回 1→4→3→2→5→NULL 数据范围: 链表…

java使用itext 直接生成pdf

itext 使用 需求背景itext 的使用依赖简单示例基础设置(页面大小、边距、字体等)段落内部,特殊设置关键字 字体或颜色生成动态表格页脚展示页数其他设置密码添加水印(背景图)目录Header, Footer分割 PDF合并 PDF 需求背…

Oracle+ASM+High冗余详解及空间计算

Oracle ASM(Automatic Storage Management)的High冗余模式是一种提供高度数据保护的策略,它通过创建多个数据副本来确保数据的可用性和安全性。 以下是关于Oracle ASM High冗余的详细解释: 一、High冗余的特点 1.数据冗余度 在Hi…

ThreadLocal 详解

文章目录 1.什么是Thradlocal2.Thradlocal常见的API3.什么是内存溢出与内存泄漏内存溢出 (Memory Overflow)内存泄漏 (Memory Leak) 4.强 软 弱 虚引用实现区别5.Threadlocal原理分析set方法get方法 6.Threadlocal产生内存泄漏问题断点查看变化 1.什么是Thradlocal ThreadLoca…

Golang基于DTM的分布式事务TCC实战

Golang基于DTM的分布式事务SAGA实战-CSDN博客 源代码:https://github.com/Ssummer520/dtm-gin 我们可以通过canal来监听转账表的binlog来看数据库变更docker-compose 安装canal-CSDN博客 代码在宿主机运行 docker network:bridge docker安装,安装成功后可以访问h…

python提取b站视频的音频(提供源码

如果我想开一家咖啡厅,那么咖啡厅的音乐可得精挑细选!又假设我非常喜欢o叔,而o叔只在b站弹钢琴,那这时候我就得想方设法把b站的视频转为音频咯! 一、首先打开网页版bilibili,按F12: 二、刷新页面…

力扣爆刷第174天之TOP200五连刷136=140(最小k数、字典序、跳跃游戏)

力扣爆刷第174天之TOP200五连刷136140(最小k数、字典序、跳跃游戏) 文章目录 力扣爆刷第174天之TOP200五连刷136140(最小k数、字典序、跳跃游戏)一、LCR 159. 库存管理 III二、450. 删除二叉搜索树中的节点三、440. 字典序的第K小…

【Spark集群部署系列一】Spark local模式介绍和搭建以及使用(内含Linux安装Anaconda)

简介 注意: 在部署spark集群前,请部署好Hadoop集群,jdk8【当然Hadoop集群需要运行在jdk上】,需要注意hadoop,spark的版本,考虑兼容问题。比如hadoop3.0以上的才兼容spark3.0以上的。 下面是Hadoop集群部署…

【Oracle篇】统计信息和动态采样的深度剖析(第一篇,总共六篇)

💫《博主介绍》:✨又是一天没白过,我是奈斯,DBA一名✨ 💫《擅长领域》:✌️擅长Oracle、MySQL、SQLserver、阿里云AnalyticDB for MySQL(分布式数据仓库)、Linux,也在扩展大数据方向的知识面✌️…

网络协议 十一 ARP,RARP,icmp,websocket,webservice,HTTPDNS,FTP,邮件相关的协议, SMTP,POP,IMAP

ARP 已知IP 求 MAC 的过程 RARP 已知MAC 求 IP 的过程,已被DHCP取代 ICMP websocket 协议,html5中提出的前端使用协议 webservice 技术,已过时 HTTPDNS 之前我们要获得 某一个域名的 IP ,要通过DNS协议 去 运营商的ISP 查询&…

计算机毕业设计 饮食营养管理信息系统 平衡膳食管理系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点…

一文入门re 正则表达式

一、常用方法 (一)匹配 一般使用方法 第一个参数:正则模式 第二个参数:需要处理的字符串 第三个参数:附加处理方法result从任意位置开始匹配,返回match,没有匹配到返回None result re.searc…

Linux:CentOS配置

一,安装VMware 这个可以通过官网获取 vmware下载 也可以联系我,我发给你 二,安装CentOS Centos官网找要下载的版本: https://vault.centos.org/ 阿里云镜像:https://mirrors.aliyun.com/centos-vault/?spma2c6h.13…

window搭建代理ip池:详细的搭建指南分享

在Windows上搭建代理IP池的指南 在进行网络爬虫或其他需要频繁请求的任务时,建立一个代理IP池可以有效提高抓取效率和隐私保护。本文将详细介绍如何在Windows环境下搭建一个简单的代理IP池。 1. 准备工作 在开始之前,请确保你具备以下条件&#xff1a…