如何从零开始手写一个消息中间件(从宏观角度理解消息中间件的技术原理)
- 什么是消息中间件
- 消息中间件的作用
- 逐一拆解消息中间件的核心技术
- 消息中间件核心技术总览
- IO
- BIO
- NIO
- IO多路复用
- AIO
- IO多路复用详细分析
- select
- poll
- epoll
- Java中的IO多路复用
- 协议
- 序列化
- 消息的存储
- 消息的读写
- 随机写、顺序写
- 内存映射、零拷贝
- 普通读写函数
- 内存映射mmap()
- sendfile()
- 服务注册与发现
- 分区并行与消费者组rebalance机制
- 主从复制
- 消息定时清理
什么是消息中间件
消息中间件,也就是俗称的MQ。消息中间件是一个在分布式环境下提供消息收发能力的服务,消息生产者把消息发送到消息中间件,然后消息中间件存储生产者发送的消息,消息消费者请求消息中间件拉取消息,拉取到后进行消费。通过消息中间件,两个服务间可以异步的传递消息,满足了服务间消息传递的需求的同时,又避免了服务间的强依赖,达到了异步和解耦的效果。
消息中间件的作用
首先消息中间件的第一个功能就是可以做到异步。如果我们一个接口的处理逻辑比较复杂,调用链路比较长,耗时比较久,但是客户端又不需要马上得到响应结果,那么我们可以先把请求相关的信息存到MQ,然后再由后面的消费者去消费这个消息,处理这个请求,这样就能达到快速响应客户端的效果。把实时性要求不高的请求通过MQ来滞后请求的处理,可以在一定程度上提升接口的性能。
消息中间件的第二个作用就是解耦合。一个服务接收到请求,需要通知其他服务来进行相应的处理,如果采用同步通知的方式的话,就只能通过调用接口的方式,假如后面又增加了几个服务,也需要收到通知,那么就只能通过改代码的方式增加服务调用,这种方式是很不利于代码的维护的,要频繁的修改代码然后重新打包部署,非常的麻烦。但是如果我们通过MQ来实现异步的通知,那么生产者只需要发送一次消息到MQ,其他服务需要收到通知的服务,则作为消费者去监听MQ对应的消息,收到消息时进行相应的处理,后续如果有其他服务也需要接收到通知,不需要找生产者加代码,而是通过监听MQ的,就能收到同样的消息,这样就不需要频繁的修改代码,一定程度上提升了可维护性。
消息中间件还有一个作用就是削峰。假如我们的服务某一时刻接收大量的请求,达到了一个非常高的峰值,如果使用同步的方式来处理,服务器很可能扛不住压力而挂掉。如果我们先把请求存到MQ,快速响应客户端,这样就能减轻服务器的压力。然后存到MQ的消息由消费者异步慢慢的处理,这样就可以快速地削掉并发请求的峰值,有效的缓解服务器的压力。
逐一拆解消息中间件的核心技术
消息中间件被应用在分布式环境中提供高性能、高可靠的消息收发服务。它通常不是一个单一的简单服务,如果它是一个单一的服务,首先就不满足高可靠,一个服务挂了,生产者和消费者间的通信就断了;其次它也不是一个简单服务,里面涉及到许多技术,如果它是一个简单服务的话,比如消息的收发都是简单的http请求,然后数据存到数据库,虽然这样也可以实现消息收发的功能,但是这种消息中间件性能并不高,只能用来玩一玩,没用什么实际的作用,因此正规的消息中间件它的内部组成都是非常复杂的。
消息中间件核心技术总览
首先消息要发送到MQ,就要经过网络,因此就会有网络IO,所以IO是消息中间间需要考虑的一个技术点。
在网络间传递消息,通常都会有固定的协议,比如http协议,dns协议,不同协议有不同的报文格式,使用http协议的话性能太低,所以消息中间件还应该考虑实现自己的协议,定义自己的报文格式。
然后我们的消息在内存中通常是一个对象,如果要网络发送出去,必须要进行序列化。
当消息中间件接收到消息后,需要进行存储,因此消息中间件要设计自己的消息存储格式。
消息中间件接收生产者发送来的消息,然后消费者请求消息中间件拉取消息,因此消息中间件要设计自己的消息读写策略,一个高性能的MQ它的读写必须是高性能的。
消息中间件通常应该在分布式环境下,它本身应该也是分布式的,所以消息中间件的地址应该要支持动态的发现,不能写死在生产者和消费者的配置文件里面,因此消息中间件还要设计自己的服务注册与发现的机制。
如果消息中间件要提高自己的吞吐量的化,必须支持消息分区并行消费,消息分区通常伴随着消费者组一起出现,因此消息中间件还要考虑如何实现分区并行以及消费者如果给消费者分配自己负载的分区,也就是rebalance。
消息中间件还要考虑如何实现消息的可靠性,存到消息中间件的消息,应该尽量保证不丢失,因此消息中间件还要考虑主从复制。
最后,存到消息中间件中的消息不能无限堆积,要定时清理,所以消息中间件还要设计自己的消息定时清理的机制。
因此,消息中间件涉及到的技术点就有以下几项:
- IO
- 协议
- 序列化
- 消息的存储
- 消息的读写
- 服务注册与发现
- 分区并行与消费者组rebalance机制
- 主从复制
- 消息定时清理
下面我们对它们进行详细的分析。
IO
消息中间件是在生产者和消费者之间提供消息收发能力的服务,消息的传递涉及到网络IO。一款高性能的消息中间件,它的底层使用的必然是高性能的IO模型。
IO模型有BIO、NIO、IO多路复用、AIO。
BIO
BIO就是传统的阻塞式IO,在BIO的模式下,如果服务器端调用Socket的accept()方法监听连接,如果此时没有客户端连接,当前线程会一直阻塞,等待连接的到来;当调用Socket的read()函数读取消息时,如果此时没有消息,当前线程会一直阻塞等待,直到有消息到达,拷贝到用户空间,当前线程才会返回。
可以看到无论是等待连接建立、还是等待数据的到来,都是阻塞式的等待,性能是非常低的。一旦没有客户端连接,或者客户端不发送数据,服务器的当前线程就会一直阻塞住,不干别的事情。
NIO
NIO相对于BIO做的优化就是,当此时没有数据时,当前线程不会阻塞,而是马上返回,当前有数据达到,则会阻塞进行数据拷贝,数据拷贝到用户空间后才会返回。因此当前线程需要不断的轮询,检查是否有数据到达,如果没有数据到达,当前线程可以干点别的事情,然后再次轮询,如果有数据到达,当前线程才会被阻塞,因此性能比BIO要高。建立连接和读取数据一样,也是类似的机制。
由于没有连接需要建立或者没有数据到达时,当前线程不会阻塞,因此性能较BIO是有所提高的。但是当前线程需要不停的轮询,这样显得有点傻傻的,不太灵活。而且这种轮询的机制,完全可以由操作系统帮我们实现,无需用户自己去编写代码,因此就有了后面的IO多路复用。
IO多路复用
IO多路复用对NIO进行了优化,当前线程不需要轮询,而是向操作系统注册一个事件。事件有不同的类型,比如连接建立事件,数据读取事件等,我们可以同时注册不同类型的事件。比如注册了数据读取事件,当有数据到达时,会触发这个事件,通知当前线程去读取数据,当前线程就可以调用socket.read()方法去读取数据,此时当前线程会阻塞直到数据读取到用户线程。相当于是把NIO中用户线程轮询的操作移到了操作系统内核,由操作系统代替用户线程去轮询。收到操作系统的通知,一定是有数据达到可以读取,因此在IO多路复用下,线程的每一次读取都是有效的读取。
AIO
AIO是异步(前面三种都是同步IO),同步IO就是数据的读取需要当前线程完成,当有数据到来时,当前线程会主动的去搬运数据到用户空间。而异步IO则是数据的搬运工作不需要当前线程完成,而是操作系统去完成,数据搬运完成后再通知当前线程去处理,性能是最高的。但是Linux的AIO实际上是基于IO多路复用做封装的,性能没有比IO多路复用高多少,而Windows系统则在真正意义上实现了AIO。
BIO性能太低,一般不会使用。Linux对AIO没有很好的实现,因此一般也不会使用AIO。在Linux操作系统上,高性能的消息中间件的网络通信应该使用IO多路复用这种高性能的IO模型。因此我们下面对IO多路复用进行详细分析。
IO多路复用详细分析
IO多路复用,就是一个线程同时监听多个socket文件描述符,只要有任何一个socket有数据到达,当前线程就会解阻塞,然后可以有效的去读取数据。传统的IO是一个Socket对应一个线程,而IO多路复用是多个Socket复用一个线程,因此叫做IO多路复用。
在Linux操作系统下,有三种类型的IO多路复用,select、poll和epoll。
select
select的大概原理就是由操作系统内核去监听一个1024长度的文件描述符数组,当某个文件描述符对应的socket有数据可读时会通知用户线程,此时用户线程解阻塞,但是用户线程并不知道者1024个文件描述符里哪些文件描述符对应的socket是有数据可读的,因此用户线程还需要去遍历一下。
很明显这种类型的IO多路复用有两个缺点:
- 数组长度有限,只能监听1024个socket,多了就不行了
- 每次都要用户线程遍历,性能不高
poll
poll相比与select做的优化就是使用了链表去存储需要监听的文件描述符,而链表是长度不受限制的,因此可以监听超过1024个socket,但是当有文件描述符就绪时,还是需要用户线程去遍历。
epoll
前面两种IO多路复用性能都不高,原因在于每次都要用户线程去遍历一遍,如果用户线程不需要遍历,而是由操作系统直接返回就绪的文件描述符,这样性能就会高很多,因此epoll就对此做了优化。
使用epoll就不需要用户线程遍历,而是直接通过socket读取数据。
epoll有三个系统调用函数,分别是epoll_create、epoll_ctl、epoll_wait。epoll底层使用红黑树存储需要监听的socket的文件描述符,epoll_craete函数的作用就是创建存储文件描述符的红黑树,返回一个epoll实例。调用epoll_ctl函数可以将需要监听的socket对应的文件描述存储到红黑树中。当某个socket有数据到达时,操作系统会将红黑树中对应的文件描述符复制到一个链表中,调用epoll_wait就是阻塞等待某些socket有数据到达,然后获取到操作系统返回的文件描述符链表,该链表中所有的文件描述符对应的socket都是有数据达到的。
epoll是Linux系统里面性能最高的IO多路复用,高性能的消息中间件底层一般使用epoll这种类型的IO多路复用进行网络通信。
Java中的IO多路复用
我们用Java代码如何实现IO多路复用呢?其实Java中的NIO底层就是IO多路复用,虽然也叫NIO,但不是上面说的那个需要用户线程轮询的NIO,而是底层使用了epoll的IO多路复用实现的IO机制。
Java NIO的三个核心对象:Buffer、Channel、Selector。Channel封装了客户端与服务端之间的连接,有ServerSocketChannel和SocketChannel两种类型,分别与BIO的ServerSocket和Socket对应。
Selector就是IO多路复用器,我们可以把Channel注册到Selector上,并设置关注的事件类型,可以注册多个Channel到Selector,就相当于同时监听多个Socket。调用Selector的select方法,当前线程会阻塞,等待Selector监听的Channel有事件发生。当Selector监听的Channel有事件发生时,当前线程会被唤醒,然后就可以处理就绪的Channel,用户线程需要把Channel中的数据copy到Buffer中,然后再从Buffer中读取数据。
但是原生的Java NIO API使用过于复杂,因此我们一般不会使用元素的Java NIO API,而是使用Netty,Netty对Java NIO API有良好的封装,使用非常方便,性能非常高。
协议
既然涉及到消息的传递,那我们传递的消息用什么格式呢(也就是我们的协议报文)?我们可以直接使用http协议报文去传递消息,但是http协议的头部存在许多冗余数据,这些头部信息其实我们根本用不到,最重要的是http它不是二进制协议,是一个明文字符串协议,因此http协议报文的体积会相对较大,会消耗过多的网络带宽,性能并不高。
如果我们自己自定义二进制形式的报文的话,效果就会不一样。我们报文中只定义有用的字段,每一个二进制位都是有用的,不存储无用的信息,这样我们的报文格式就足够紧凑。并且我们的报文是二进制形式的,所以不需要经过字符串转二进制这一步,直接就可以在网络上进行传输,因此占用的带宽就相对较小,性能就比较高。
比如报文头部我们就存储序列化类型、报文体长度,那么整个报文就只有头部两个字段和存储消息内容的报文体,体积大大的缩小。
但是消息中间件除了要处理生产者发送消息的请求外,还要处理消费者拉取消息的请求,以及消费者发送ACK的请求等等,所以光这两个头部字段还不够,可能还要添加一个请求类型的头部,那么头部就只需要存储三个字段。
因此,我们可以自定义自己的协议报文,使用二进制形式定义报文的格式,报文的格式我们设计的足够紧凑,只存我们需要的信息,这样就能进一步提高消息中间件的性能。
序列化
定义好了协议之后,协议体(协议包含协议头和协议体)存放的就是我们的消息内容,但是我们的报文是二进制形式的,而我们的消息内容通常是一个对象,比如是一个Message对象,那么我们就要把这个Message对象序列化成二进制的形式,而不同的序列化机制它的性能有所差异,此时我们就需要选用高性能的序列化机制。
常用的序列化机制有Java原生的序列化机制,但是性能较低;还有hessian、Protobuf等性能较高的序列化机制。除此以外,我们可以把我们的消息对象转成Json格式字符串,然后再将Json字符串通过UTF8编码成二进制,也可以达到序列化的效果。
JDK自身提供的序列化机制存在两个问题:1、序列化的数据比较大,传输效率低;2、其他语言无法识别。因此我们一般不使用JDK自带的序列化机制,或者会把它作为一个备用的方案供使用者去选择。
使用Json进行序列化,Json格式字符串编码后的二进制序列体积较大,占用空间较大,传输性能较低,所以我们一般也不会把Json作为默认的序列化方案。但是使用Json序列化有它的好处,那就是对编程语言没有要求,并且不同编程语言的服务也能通过消息中间件进行通信,用Java去发送的数据,C或者C++也能接收和解析,并且它的可读性较高,方便调试。
Hessian是一个支持跨语言传输的二进制序列化协议,相对于Java默认的序列化机制来说,Hessian支持多种不同的语言,而且具有更好的性能和易用性。把Hessian作为默认的序列化机制是一个不错的选择。
Protobuf使用比较广泛,性能比Hessian还要高。但是使用Protobuf相对来说比较麻烦,没有Hessian易用。Protobuf规定每一个类都要编写对应的proto文件,proto文件有自己的语法规则,我们要按照Protobuf规定的语法规则编写proto文件,然后使用Protobuf的编译器去编译proto文件。再把编译出来的类引入到我们的工程中使用。当需要改变类的内部结构比如有字段要增删时,需要修改proto文件重新编译,因此使用Protobuf会相对麻烦一些,并且有一定的学习成本。
有了序列化机制,我们的消息就能被序列化成二进制字节流,在网络上进行传输。
消息的存储
消息发送到消息中间件,消息中间接收到消息后,就要对消息进行持久化。我们要设计出良好的消息存储格式,因为消息存储格式的设计的好与坏,直接决定了消息读写性能的高低,如果只是一股脑的往后追加,没有建立任何索引,那么每次消息的读取都只能从头开始遍历,性能肯定是不高的。
首先,我们可以把发送到消息服务的消息数据,按顺序的追加写入到一个log文件,我们定义log文件的指定大小,当一个log文件写满时,就新开一个。
但如果仅仅设计成这样,那我们读取消息的时候只能从头到尾逐条遍历。根据消费者消费偏移量(offset)与当前遍历到的消息的起始位置比较,看是否是要读取的消息,如果是要读取的消息,则根据消息头部中的消息大小字段,读取一定长度,如果不是要读取的消息,则根据消息头部中的消息大小字段跳过指定大小的空间,读取下一条。这样性能是比较低的。
因此,我们可以像数据库那样,建立一个索引文件,用于检索log文件中的数据。
这样,当我们读取一条消息的时候,我们可以通过二分查找在index文件中读到消息的偏移量,再到log文件进行检索,时间复杂度就降到了O(logN),性能就得到提升。
当然这只是其中一种方案,还不是最优的方案。
消息的读写
定义好消息存储的格式之后,就可以设计如何读写消息。消息读取的性能高低,取决于读写数据使用的函数以及消息存储格式的好坏,如果像数据库那样考虑了索引文件的建立,那么消息的读取就可以做到O(logN)的时间复杂度,否则每次读取都只能从头开始遍历。而消息的写入是写入到磁盘的,磁盘的读写又分随机写和顺序写,随机写的性能较低,顺序写的性能较高。
随机写、顺序写
为什么随机写的性能较低而顺序写的性能较高呢?
这是从网上随便扒来的一张描述磁盘组成的图。可以看到有一个磁臂带动磁头,然后磁头滑到对应的磁道后,磁头落下,随着磁盘的转动读取磁道上数据。而这个磁臂带到磁头滑到对应的磁道这个动作叫做磁盘寻址,这个动作是比较耗时间的。
随机写就是数据的写入是在磁盘上随机的几个位置上,不是连续的,因此一次随机写通常会有多次的磁盘寻址,所以随机写的性能就相对较低。
而顺序写的意思就是顺序的往磁盘写入一段数据,是在磁盘上的一段连续的空间写入,因此只有一次的磁盘寻址开销,因此性能较高。
在Java中,使用NIO类库里面的MappedByteBuffer就可以实现顺序写。通过FileChannel.open(Path, OpenOption…) 方法得到对应文件的一个FileChannel,然后调用FileChannel的map(MapMode mode, long position, long size)方法,把文件中从指定位置position开始的大小为size的一段数据映射到内存中,创建一个MappedByteBuffer对象与之对应,最后调用MappedByteBuffer的put(byte[]),把字节数组写入到MappedByteBuffer对应的内存映射区域,写入到MappedByteBuffer对应的内存映射区域中的内容,会被追加写入到对应文件的position位置后面。
内存映射、零拷贝
除此以外,读写性能的高低还跟我们使用的函数有关,比如我们使用的是普通的read()和write()函数,那么性能是很低的,它会经历多次无意义的拷贝。而如果我们使用内存映射mmap()或者零拷贝sendfile()之类的函数,那么性能就相对较高。
普通读写函数
比如我们要从磁盘读取一段数据,发送到网络。如果是普通的read()和write()函数,先调用read()函数从用户态转到内核态,然后从磁盘中读取数据到内核空间,然后将内核空间的数据拷贝到用户空间,然后从内核态切换为用户态,然后在调用write()函数再次从用户态切换到内核态,然后将用户空间的数据拷贝到内核空间的socket缓冲区,然后再将socket缓冲区的数据拷贝到网卡,然后从内核态切换回用户态,完成整个数据拷贝的操作。可以看到这个操作是非常繁琐的,伴随了四次用户态和内核态间的转换,四次的数据拷贝。
内存映射mmap()
其实在数据没有修改的情况下,完全没有必要把数据拷贝到用户空间,因此read()函数那一次拷贝到用户空间的操作其实是多余的,而通过内存映射mmap()函数,就可以节省掉这一次内存数据拷贝的动作。
在调用mmap()函数时,用户态切换到内核态,从磁盘读取数据到内核空间,然后建立内核空间与用户空间的映射,无需拷贝数据到用户空间,这两步操作做完后当前线程回到用户态,然后再调用write()函数,再次进入内核态,把内存映射区域的数据拷贝到Socket缓存区,然后把Socket缓存区中的数据拷贝到网卡。可以看到通过mmap()函数可以减少一次内存拷贝的操作,而用户态与内核态间的切换次数没有减少。
sendfile()
如果使用sendfile()函数,就可以进一步提升性能。调用sendfile()函数,从用户到切换到内核态,然后从磁盘读取数据到内核空间,然后将数据在内核空间中的地址和长度发送到socket缓冲区,然后网卡读取socket缓冲区中的信息,得知数据所在的地址和长度,直接从内核空间把数据拷走,然后当前线程从内核态切换回用户态。可以看到用户态到与内核态间的切换节省为只有两次,数据拷贝也是只有两次,性能是非常高的。
因此,作为高性能的消息中间件,一定是使用mmap()或者sendfile()函数进行数据的读写。当然这些是C语言的函数,使用Java语言的话,也是有对应的API实现的。
服务注册与发现
这样,似乎我们就可以使用这个消息中间件进行消息的传递了。但是目前这个消息中间件还是单体的,我们可以直接在生产者和消费者上写死消息中间件的地址。
但是在分布式环境下,单体服务是不具备高可用性的,因此消息中间件往往会以集群的形式部署,并且我们可能还允许消息服务动态上下线,也就是按需加减机器,此时我们就不能写死消息中间件的地址在生产者和消费者上了,我们还需要一种类似于服务注册与发现的机制,消息服务注册到注册中心,然后生产者和消费者通过注册中心发现消息服务。这个注册中心可以使用Zookeeper,也可以我们自己实现。
比如我们可以使用Zookeeper作为注册中心,然后在Zookeeper上创建一个路径是/Brokers的节点,然后每一台服务器上线,就在/Brokers节点下创建一个临时节点,临时节点的名称使用服务器的ip端口来命名。这样生成者和消费者监听/Brokers下的子节点即可。当有服务上线或下线时,/Brokers节点下的临时子节点会发生变动,利用Zookeeper的监听机制,生产者和消费者都可以收到通知。
有了服务注册与发现的机制后,我们的消息中间件就更加的灵活,可以动态的上下线消息服务器,也可以动态的发现上线的消息服务器的地址。
分区并行与消费者组rebalance机制
因为消息中间件是集群部署,那么我们可以设计消息分区并行以提高吞吐量。每一个消息都属于一个Topic(消息主题),然后一个Topic会有多个Partition(分区),不同的Partition在不同的Broker节点(消息服务)上。我们的消息可以轮询发往多个Partition,达到分区并行的效果。然后消费者以消费者组的形式部署,多个消费者组成一个消费者组,它们共同消费指定的Topic,组内不同消费者消费不同Partition的消息,这样就提高了消息消费的并行度,提升了消息中间件的吞吐量。
但是这里有一个难题就是如果决定组内的不同消费者负责哪些Partition的消费,也就是消费者组的rebalance机制,我们要通过rebalance机制,为一个消费者组内的消费者分配对应的分区。
假如我们设计的是组内的某个消费者执行rebalance:
- 那么首先第一步就是要选出执行rebalance的消费者。
- 然后该消费者从注册中心读取消息主题分区以及消费者组的信息(因此消费者也要注册到注册中心)
- 然后消费者使用一定的算法(比如轮询)为组内不同消费者分配分区
- 然后rebalance结果写入注册中心,通知其他消费者读取并执行。
当然这只是其中一种rebalance机制的实现思路,也可以考虑其他的实现方式来实现rebalance机制。
主从复制
目前我们一条消息只会发往一个消息服务,所以消息不具备可靠性,如果一台消息服务器上的消息丢失了,那么该消息就真的丢失了。因此我们还要做消息的主从复制,进行数据备份,这样可以保证消息的可靠存储,不会轻易发生消息丢失的情况。
主从复制又分为同步复制和异步复制。同步复制就是当收到生产者发送的消息时,保证消息同步到从节点,才响应生产者消息发送成功;而异步复制只要消息在主节点上落盘成功,就响应生产者消息发送成功,同步消息到从节点这个动作则异步去做,因此异步复制的消息可靠性就没有那么高,但是性能相对同步复制要高。
消息定时清理
此时我们的消息中间件已经足够完备了,但是我们还有最后一块工作要处理。我们的消息不能只做写入而不做清理,磁盘空间是有限的,我们不断的往磁盘写入数据而不去清理,磁盘迟早会被写满。因此我们还要设计消息中间件的定时自动清理机制,定时的清理过期的消息。
过期消息定时清理,看起来很简单。但是我们存储在磁盘中的数据,不仅仅有消息数据本身,还有索引数据,队列数据等,对应的就是不同的文件,在清理过期消息数据时,要考虑把索引数据和队列数据一并清除,保证它们的一致性。比如我们设计消息日志文件和消息索引文件是一一对应的,那么在做过期消息日志文件清理的时候,就可以很方便的把索引文件也一并清除。因此消息存储格式设计的好坏,还决定了过期数据清理是简单还是复杂。
有了消息定期清理机制之后,我们的磁盘空间就不会被占满,消息中间件也可以持续的运行。