Go 异步任务
异步任务在开发中很常见,用来做解耦。本文介绍一下异步队列的实现的几个问题,并且结合三方库实现来分析。
有下面的几个关键点:
- 用户代码(任务)如何封装
- 数据的存放(数据存放在哪里?就是一个读取队列)
- worker的管理(worker的数量,worker执行是否支持超时时间,worker的异常恢复)
带着上面的问题,对比https://github.com/golang-queue/queue的实现,说明一下。
用户代码如何封装
对于任务来说,最重要的是 函数操作,也就是对应的代码逻辑。go中是可以将方法作为参数传递的,方法也是一种类型。所以我们定义下面的方法,方法签名如下:
type TaskFunc func(ctx context.Context) error
还可以配置方法的callback逻辑,比如重试次数,重试间隔,重试error的判断等等
抽象出一个结构体来表示
https://github.com/golang-queue/queue/blob/master/job/job.go#L15
数据的存放
这是很好拓展的地方,可以支持多种存储媒介和中间件,比如基于内存实现的循环队列,redis,rocketmq。
在实现上就是接口抽象功能,依赖倒转。接口有下面的两个功能
- 存数据
- 取数据
https://github.com/golang-queue/queue/blob/master/core/worker.go
解释一下QueuedMessage
接口和Worker中的Run
方法
-
QueuedMessage
用来做数据转换的。
-
Run
用来执行函数,表示执行的任务。
worker的管理
worker管理涉及到下面几个方面
- worker的数量限制
- worker执行时候的超时时间
- worker执行时候的异常panic
- workder从队列中获取需要处理的处理,并且支持请求超时操作
- 服务关闭之后worker也需要操作
我们来看golang-queue/queue
中的实现是什么?
通过metric
来记录queue在运行期间具体的情况
https://github.com/golang-queue/queue/blob/master/metric.go#L20
并且通过 channel 来做限制。
每次在goroutine启动和停止的时候通过metric
来计数。并且会调用schedule
来发信号,给ready发送信号。
goroutine在启动的时候会select ready。
work的异常情况,在调用task的处理函数的时候,肯定要用到defer来做error恢复,并且通过channel来通信,context来实现超时控制。
具体的原理,我们从下面的代码开始来分析。
https://github.com/golang-queue/queue/blob/master/queue.go#L285
// 对于start来说,是一个死循环,会启动一个goroutine从work中获取数据,当前goroutine等待结果,并且启动goroutine来执行,此Goroutine叫做worker。
func (q *Queue) start() {// QueuedMessage 表示messagetasks := make(chan core.QueuedMessage, 1)// 启动一个goroutine来处理任务// 从work中获取任务,并且启动一个goroutine来处理任务for {// check worker number// 做调度的,就是检查work的数量q.schedule()// 数量不够,需要堵塞select {// wait worker readycase <-q.ready:case <-q.quit:return}// 启动一个goRoutine从 work中获取数据q.routineGroup.Run(func() {for {// 从队列中获取一个请求t, err := q.worker.Request()// 没有消息,或者有错误if t == nil || err != nil {// 有错误if err != nil {select {// 队列退出,关闭掉task,case <-q.quit:if !errors.Is(err, ErrNoTaskInQueue) {close(tasks)return}// 等待一秒再次从work中抓取新数据case <-time.After(time.Second):// sleep 1 second to fetch new task}}}if t != nil { // 说明取到了消息tasks <- treturn}// 说明t为nil但是没有错误select {case <-q.quit:if !errors.Is(err, ErrNoTaskInQueue) {close(tasks)return}default:}}})// 这就是从queue中获取一个task,之后将此task提交给work来实现task, ok := <-tasksif !ok {return}// 所以,这里并没有维护所谓的goroutine池,因为go的编程是不需要这些玩意的。goroutine已经很轻量级的了,直接提交运行就好了// start new taskq.metric.IncBusyWorker()q.routineGroup.Run(func() {q.work(task)})}
}func (q *Queue) work(task core.QueuedMessage) {var err error// 来处理一些内部的错误,在这里会减去worker的数量,并且重新scheduledefer func() {q.metric.DecBusyWorker()e := recover()if e != nil {q.logger.Errorf("panic error: %v", e)}q.schedule()// increase success or failure numberif err == nil && e == nil {q.metric.IncSuccessTask()} else {q.metric.IncFailureTask()}}()// 运行任务,可以看到这里的代码就是为了包装一下if err = q.run(task); err != nil {q.logger.Errorf("runtime error: %s", err.Error())}
}func (q *Queue) run(task core.QueuedMessage) error {data := task.(*job.Message)if data.Task == nil {data = job.Decode(task.Bytes())data.Data = data.Payload}return q.handle(data)
}func (q *Queue) handle(m *job.Message) error {// create channel with buffer size 1 to avoid goroutine leak// 这是go中很创建的做法,一个channel中有数据,但并没有被其他的任何的goroutine操作的话,也是会被gc掉的done := make(chan error, 1) // 完成的信号channelpanicChan := make(chan interface{}, 1) // panic的channelstartTime := time.Now() ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)defer func() {cancel()}()// run the job 启动goroutine来运行一个jobgo func() {// handle panic issuedefer func() {if p := recover(); p != nil {panicChan <- p}}()// run custom process functionvar err error// 做重试逻辑,这里的重试逻辑还可以指定重试的错误,比如as,基于那种类型的错误来做重试操作等。b := &backoff.Backoff{Min: m.RetryMin,Max: m.RetryMax,Factor: m.RetryFactor,Jitter: m.Jitter,}delay := m.RetryDelay// backoff都是通过for循环来做的loop:for {// 两种形式,一种是直接function,一直是通过messageif m.Task != nil {err = m.Task(ctx)} else {err = q.worker.Run(ctx, m)}// 不需要重试就直接返回,如果有错误就开始重试,并且利用time来做重试时间的控制if err == nil || m.RetryCount == 0 {break}m.RetryCount--if m.RetryDelay == 0 {delay = b.Duration()}// 这里用select来做操作select {case <-time.After(delay): // retry delayq.logger.Infof("retry remaining times: %d, delay time: %s", m.RetryCount, delay)case <-ctx.Done(): // timeout reached // ctx完成就直接返回err = ctx.Err()break loop}}done <- err}()// 当前的goroutine在等待结果,select {case p := <-panicChan:panic(p)case <-ctx.Done(): // timeout reachedreturn ctx.Err()case <-q.quit: // shutdown service// cancel jobcancel()leftTime := m.Timeout - time.Since(startTime)// wait jobselect {case <-time.After(leftTime):return context.DeadlineExceededcase err := <-done: // job finishreturn errcase p := <-panicChan:panic(p)}case err := <-done: // job finishreturn err}
}
有个问题,如何保证程序退出的时候这些work可以执行结束呢?利用waitGroup实现。
https://github.com/golang-queue/queue/blob/master/thread.go