引言
实际上,在服务端程序开发和爬虫程序开发时,我们的大多数业务都是IO密集型业务,什么是IO密集型业务,通俗地说就是CPU运行时间只占整个业务执行时间的一小部分,而剩余的大部分时间都在等待IO操作。
IO操作包括http请求、数据库查询、文件读取、摄像设备录音设备的输入等等。这些IO操作会引起中断,使业务线程暂时放弃cpu,暂时停止运行,等待IO操作完成后在重新获得cpu、继续运行下去。
比如下面这段代码,我执行了一个很简单的IO操作,就是请求百度的主页面。在IO操作之后我又执行了一个一千万数量级的循环,用来模拟cpu计算业务。
func main() {t := time.Now().UnixMilli()// 发起http请求病接收响应res, _ := http.Get("https://www.baidu.com")fmt.Printf("https请求结束,耗时%dms\n", time.Now().UnixMilli()-t)// 进行1e7次计算操作,用来模拟业务处理时cpu计算内容body, _ := io.ReadAll(res.Body)res.Body.Close()for i := 1; i <= 1e7; i++ {_ = i * i_ = i + i}fmt.Printf("程序运行结束,总耗时%dms, 数据长度为%d", time.Now().UnixMilli()-t, len(body))
}
这段程序在我的电脑上的输出结果是:
https请求结束,耗时250ms
程序运行结束,总耗时256ms, 数据长度为227
不难看出,向百度发起请求耗费了250ms,而一千万次cpu计算仅耗费6ms。(实际情况中,大多数业务的cpu计算量甚至远远达不到1e7级别)
我们为什么要引入并发呢,正如操作系统课程上讲的那样,目的就是为了提高cpu的利用率,如果某个线程正处在等待IO的状态,此时cpu是空闲的,那么我们就应该用cpu去执行别的线程,而不是傻傻的等待这个进程结束再去进行别的操作。
并发样例
假设现在有一个爬虫项目,它的业务流程如下图所示,我大致描述一下流程:
- 首先要爬取A和B的页面,并解析页面结果
- 然后根据B页面的解析结果,设定好相关参数,对C、D、F页面进行爬取;同样的根据A页面的解析结果,设定好相关参数,对E页面进行爬取
- 接下来根据C、D页面的解析结果,设定好相关参数,对G页面进行爬取
- 最后,对E、F、G三个页面进行解析,得到我们需要的最终数据
虽然只是假设,但类似的业务流程在爬虫项目中是很常见的。
除此之外,类似的业务流程在后端程序开发中也很常见,对于前端的一个请求,后端很可能需要多次访问数据库、向下游服务器发送请求、访问内存外的缓存数据等等。
每个页面爬取耗费的时间如图中所示,由于cpu计算耗费时间很短,所以在这里就忽略不计。我们用sleep函数来模拟发起请求耗费的时间,代码如下图所示,每个task函数代表爬取一个页面:
func taskA() string {time.Sleep(time.Millisecond * 40)return "AAA"
}
func taskB() string {time.Sleep(time.Millisecond * 30)return "BBB"
}
func taskC(resultOfB string) string {time.Sleep(time.Millisecond * 30)return "CCC"
}
func taskD(resultOfB string) string {time.Sleep(time.Millisecond * 30)return "DDD"
}
func taskF(resultOfB string) string {time.Sleep(time.Millisecond * 30)return "FFF"
}
func taskE(resultOfA string) string {time.Sleep(time.Millisecond * 30)return "EEE"
}
func taskG(resultOfC, resultOfD string) string {time.Sleep(time.Millisecond * 30)return "GGG"
}
串行
串行,也就是不使用并发的代码如下所示:
串行的代码非常好写,从前往后把task函数顺序执行就行了,之所以在这里把串行代码和运行结果列出了,主要是为了和下面的并发做对比。
func main() {t := time.Now().UnixMilli()// 按照任务执行的先后要求,顺序执行所有任务resultOfA := taskA()fmt.Printf(" %dms: A over\n", time.Now().UnixMilli()-t)resultOfB := taskB()fmt.Printf(" %dms: B over\n", time.Now().UnixMilli()-t)resultOfC := taskC(resultOfB)fmt.Printf(" %dms: C over\n", time.Now().UnixMilli()-t)resultOfD := taskD(resultOfB)fmt.Printf(" %dms: D over\n", time.Now().UnixMilli()-t)resultOfE := taskE(resultOfA)fmt.Printf(" %dms: E over\n", time.Now().UnixMilli()-t)resultOfF := taskF(resultOfB)fmt.Printf(" %dms: F over\n", time.Now().UnixMilli()-t)resultOfG := taskG(resultOfC, resultOfD)fmt.Printf(" %dms: G over\n", time.Now().UnixMilli()-t)// 打印E、F、G的运行结果,至此程序就运行结束了,打印程序运行所耗费的时间fmt.Printf(" %dms: all over, %s, %s, %s\n", time.Now().UnixMilli()-t, resultOfE, resultOfF, resultOfG)
}
程序的执行结果如下所示,可以看到效率很慢很慢,总执行时间等于所有任务执行时间之和。
41ms: A over
73ms: B over
118ms: C over
164ms: D over
194ms: E over
240ms: F over
285ms: G over
286ms: all over, EEE, FFF, GGG
并发
本文的重点来了,如何并发地执行上面假设的爬虫程序?怎样才能让cpu的利用效率最高?
go为我们提供了非常好用的goroutine和chan,前者叫做协程也可以简单地认为是小型线程,能够以极小的开销和极快的速度启动一个并发任务;后者叫做通道,也可以叫管道,是协程和协程间通信的工具,不仅能够传递数据,还能够阻塞和唤醒协程从而实现协程间的同步。
在下面的代码中,我们使用通道来进行任务与任务之间的通信,我们为每一个任务都开一个协程,如果该任务没有前置依赖,那么就之间执行,然后把执行结果放到对应的通道中;如果该任务有前置依赖任务,那么先从通道中读取自己所需要的数据,然后再执行相应任务。
代码如下所示,逻辑很简单,主要是体现了一种并发的思想和并发执行任务的思路,现实情况中并发业务的代码肯定要复杂得多。
func main() {t := time.Now().UnixMilli()// AtoE代表A把自己的运行结果交给E的所经过的通道,下面同理AtoE := make(chan string, 1)BtoC := make(chan string, 1)BtoF := make(chan string, 1)BtoD := make(chan string, 1)CtoG := make(chan string, 1)DtoG := make(chan string, 1)GtoEnd := make(chan string, 1)FtoEnd := make(chan string, 1)EtoEnd := make(chan string, 1)// 每个go func代表着给某个任务开一个协程// 为A任务开个协程go func() {resultOfA := taskA()AtoE <- resultOfAfmt.Printf(" %dms: A over\n", time.Now().UnixMilli()-t)}()// 为B任务开个协程go func() {resultOfB := taskB()BtoF <- resultOfBBtoC <- resultOfBBtoD <- resultOfBfmt.Printf(" %dms: B over\n", time.Now().UnixMilli()-t)}()// 为C任务开个协程go func() {resultOfB := <-BtoCresultOfC := taskC(resultOfB)CtoG <- resultOfCfmt.Printf(" %dms: C over\n", time.Now().UnixMilli()-t)}()// 为D任务开个协程go func() {resultOfB := <-BtoDresultOfD := taskC(resultOfB)DtoG <- resultOfDdefer fmt.Printf(" %dms: D over\n", time.Now().UnixMilli()-t)}()// 为F任务开个协程go func() {resultOfB := <-BtoFresultOfF := taskF(resultOfB)FtoEnd <- resultOfFfmt.Printf(" %dms: F over\n", time.Now().UnixMilli()-t)}()// 为E任务开个协程go func() {resultOfA := <-AtoEresultOfE := taskC(resultOfA)EtoEnd <- resultOfEfmt.Printf(" %dms: E over\n", time.Now().UnixMilli()-t)}()// 为G任务开个协程go func() {resultOfC := <-CtoGresultOfD := <-DtoGresultOfG := taskG(resultOfC, resultOfD)GtoEnd <- resultOfGfmt.Printf(" %dms: G over\n", time.Now().UnixMilli()-t)}()// 接收E、F、G的运行结果,至此程序就运行结束了,打印程序运行所耗费的时间resultOfE, resultOfF, resultOfG := <-EtoEnd, <-FtoEnd, <-GtoEndfmt.Printf(" %dms: all over, %s, %s, %s\n", time.Now().UnixMilli()-t, resultOfE, resultOfF, resultOfG)
}
程序的执行结果如下所示,可以看到程序运行中花费了105ms,很接近关键路径的长度90ms,说明我们这个程序的并发性很好,在最大程度上实现了cpu的高效利用。(PS:关键路径begin → B→ C → G → end)
43ms: A over
44ms: B over
74ms: E over
74ms: C over
74ms: D over
74ms: F over
105ms: G over
105ms: all over, CCC, FFF, GGG