SAGA介绍
SAGA是“长时间事务”运作效率的方法,大致思路是把一个大事务分解为可以交错运行的一系列子事务的集合。原本提出 SAGA 的目的,是为了避免大事务长时间锁定数据库的资源,后来才逐渐发展成将一个分布式环境中的大事务,分解为一系列本地事务的设计模式。
SAGA事务典型的时序图
SAGA失败的时序图
如图TM事务管理器,DTM是开源的分布式事务管理中间件
DTM的SAGA支持
dtm根据http的不同状态码来代表当前事务的处理结果
dtm事务默认无回滚时间支持,尽最大能力交付
失败重试默认为指数回避算法。需要固定时间重试需要在saga属性配置
dtm默认事务执行顺序为并发执行也是顺序执行,可以设置属性为并行执行
http状态码当前版本不能完全代表业务成功需要结合 返回msg具体看业务代码
实战
代码在宿主机运行 docker network:bridge
docker安装,安装成功后可以访问http://localhost:36789/ 打开dtm事务web-ui
代码github GitHub - Ssummer520/dtm-gin
docker run -itd --name dtm -p 36789:36789 -p 36790:36790 yedf/dtm:latest
创建tm事务管理器提交全局事务
package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/gin-gonic/gin""github.com/lithammer/shortuuid/v3""log"
)func main() {app := gin.Default()app.GET("/test", func(c *gin.Context) {QsFireRequest()log.Printf("TransOut")c.JSON(200, "sss")})app.Run(":1111")}const qsBusiAPI = "/api/busi_start"
const qsBusiPortIN = 8881
const qsBusiPortOUT = 8880
const dtmServer = "http://localhost:36789/api/dtmsvr"var qsBusiIN = fmt.Sprintf("http://host.docker.internal:%d%s", qsBusiPortIN, qsBusiAPI)
var qsBusiOUT = fmt.Sprintf("http://host.docker.internal:%d%s", qsBusiPortOUT, qsBusiAPI)func QsFireRequest() string {req := &ReqHTTP{Amount: 30} // load of micro-service// DtmServer is the url of dtmsaga := dtmcli.NewSaga(dtmServer, shortuuid.New()).// add a TransOut sub-transaction,forward operation with url: qsBusi+"/TransOut", reverse compensation operation with url: qsBusi+"/TransOutCompensate"Add(qsBusiOUT+"/TransOut", qsBusiOUT+"/TransOutCompensate", req).// add a TransIn sub-transaction, forward operation with url: qsBusi+"/TransIn", reverse compensation operation with url: qsBusi+"/TransInCompensate"Add(qsBusiIN+"/TransIn", qsBusiIN+"/TransInCompensate", req)// submit the created saga transaction,dtm ensures all sub-transactions either complete or get revokedsaga.RetryInterval = 1//saga.RequestTimeout = 10err := saga.Submit()if err != nil {panic(err)}return saga.Gid
}type ReqHTTP struct {Amount int `json:"amount"`
}
saga全局事务属性设置
saga属性事务设置
type TransOptions struct {WaitResult bool `json:"wait_result,omitempty" gorm:"-"` // 是否等待结果,默认为falseTimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // 事务失败的超时时间,单位:秒RequestTimeout int64 `json:"request_timeout,omitempty" gorm:"-"` // 全局事务的请求超时时间,单位:秒RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // 重试间隔时间,单位:秒PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` // 需要传递的HTTP头部字段BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // 自定义的分支头部字段,DTM服务器到服务APIConcurrent bool `json:"concurrent" gorm:"-"` // 是否并发执行,适用于saga和消息事务类型
}
rm1表示第一个微服务业务
package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/dtm-labs/dtmcli/dtmimp""github.com/dtm-labs/dtmcli/logger""github.com/gin-gonic/gin""log""net/http"
)func main() {QsStartSvr()}// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8881// QsStartSvr quick start: start server
func QsStartSvr() {app := gin.Default()qsAddRoute(app)log.Printf("quick start examples listening at %d", qsBusiPort)app.Run(fmt.Sprintf(":%d", qsBusiPort))}func qsAddRoute(app *gin.Engine) {app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {info := infoFromContext(c)var req ReqHTTPc.ShouldBindJSON(&req)log.Printf("TransIn:%v,gid:%v", req.Amount, info.Gid)c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess)) // Status 409 for Failure. Won't be retried})app.POST(qsBusiAPI+"/TransInCompensate", func(c *gin.Context) {info := infoFromContext(c)var req ReqHTTPc.ShouldBindJSON(&req)log.Printf("TransInCompensate:%v,gid:%v", req.Amount, info.Gid)c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))})}
func string2DtmError(str string) error {return map[string]error{dtmcli.ResultFailure: dtmcli.ErrFailure,dtmcli.ResultOngoing: dtmcli.ErrOngoing,dtmcli.ResultSuccess: nil,"": nil,}[str]
}type mainSwitchType struct {TransInResult AutoEmptyStringTransOutResult AutoEmptyStringTransInConfirmResult AutoEmptyStringTransOutConfirmResult AutoEmptyStringTransInRevertResult AutoEmptyStringTransOutRevertResult AutoEmptyStringQueryPreparedResult AutoEmptyStringNextResult AutoEmptyStringJrpcResult AutoEmptyStringFailureReason AutoEmptyString
}// AutoEmptyString auto reset to empty when used once
type AutoEmptyString struct {value string
}// SetOnce set a value once
func (s *AutoEmptyString) SetOnce(v string) {s.value = v
}// Fetch fetch the stored value, then reset the value to empty
func (s *AutoEmptyString) Fetch() string {v := s.values.value = ""if v != "" {logger.Debugf("fetch obtain not empty value: %s", v)}return v
}// MainSwitch controls busi success or fail
var MainSwitch mainSwitchTypefunc infoFromContext(c *gin.Context) *dtmcli.BranchBarrier {info := dtmcli.BranchBarrier{TransType: c.Query("trans_type"),Gid: c.Query("gid"),BranchID: c.Query("branch_id"),Op: c.Query("op"),}return &info
}type ReqHTTP struct {Amount int `json:"amount"`
}
rm2表示第二个微服务业务
package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/dtm-labs/dtmcli/dtmimp""github.com/dtm-labs/dtmcli/logger""github.com/gin-gonic/gin""log""net/http"
)func main() {app := gin.Default()app.POST(qsBusiAPI+"/TransOut", func(c *gin.Context) {info := infoFromContext(c)var req ReqHTTPc.ShouldBindJSON(&req)log.Printf("TransOut:%v,gid:%v", req.Amount, info.Gid)c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))})app.POST(qsBusiAPI+"/TransOutCompensate", func(c *gin.Context) {info := infoFromContext(c)var req ReqHTTPc.ShouldBindJSON(&req)log.Printf("TransOutCompensate:%vgid:%v", req.Amount, info.Gid)c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))})log.Printf("quick start examples listening at %d", qsBusiPort)app.Run(fmt.Sprintf(":%d", qsBusiPort))
}// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8880// QsStartSvr quick start: start server
func QsStartSvr() {}type mainSwitchType struct {TransInResult AutoEmptyStringTransOutResult AutoEmptyStringTransInConfirmResult AutoEmptyStringTransOutConfirmResult AutoEmptyStringTransInRevertResult AutoEmptyStringTransOutRevertResult AutoEmptyStringQueryPreparedResult AutoEmptyStringNextResult AutoEmptyStringJrpcResult AutoEmptyStringFailureReason AutoEmptyString
}// AutoEmptyString auto reset to empty when used once
type AutoEmptyString struct {value string
}// SetOnce set a value once
func (s *AutoEmptyString) SetOnce(v string) {s.value = v
}// Fetch fetch the stored value, then reset the value to empty
func (s *AutoEmptyString) Fetch() string {v := s.values.value = ""if v != "" {logger.Debugf("fetch obtain not empty value: %s", v)}return v
}// MainSwitch controls busi success or fail
var MainSwitch mainSwitchTypetype ReqHTTP struct {Amount int `json:"amount"`
}func infoFromContext(c *gin.Context) *dtmcli.BranchBarrier {info := dtmcli.BranchBarrier{TransType: c.Query("trans_type"),Gid: c.Query("gid"),BranchID: c.Query("branch_id"),Op: c.Query("op"),}return &info
}
结果
运行tm提交一个全局事务
rm1返回
rm2返回
dtm webui管理页面
当前业务已经消费成功
我们把这块修改为rm1 提交失败,看到rm2事务回滚
const (// StatusPrepared 表示全局/分支事务的状态。// 第一步,事务准备阶段StatusPrepared = "prepared"// StatusSubmitted 表示全局事务的状态。StatusSubmitted = "submitted"// StatusSucceed 表示全局/分支事务的状态。StatusSucceed = "succeed"// StatusFailed 表示全局/分支事务的状态。// 注意:将全局状态更改为失败可以停止触发(在生产环境中不推荐)StatusFailed = "failed"// StatusAborting 表示全局事务的状态。StatusAborting = "aborting"// ResultSuccess 事务/事务分支的结果成功ResultSuccess = dtmimp.ResultSuccess// ResultFailure 事务/事务分支的结果失败ResultFailure = dtmimp.ResultFailure// ResultOngoing 事务/事务分支的结果进行中ResultOngoing = dtmimp.ResultOngoing// DBTypeMysql 数据库驱动类型:MySQLDBTypeMysql = dtmimp.DBTypeMysql// DBTypePostgres 数据库驱动类型:PostgreSQLDBTypePostgres = dtmimp.DBTypePostgres
)
参考资料SAGA事务模式 | DTM开源项目文档
https://zhuanlan.zhihu.com/p/688088173