分布式异步任务处理组件底层网络通信模型的设计--如图:
- 使用Java原生NIO来实现TCP通信模型
- 普通节点维护一个网络IO线程,负责和主节点的网络数据通信连接--这里的网络数据是指组件通信协议之下的直接面对字节流的数据读写,上层会有另一个线程负责网络通信协议的实现;---也就是说维护一个selector线程,负责处理socketchannel的IO事件;
- Leader节点网络通信层有多个线程--一个selector线程负责接受其他节点的连接请求,然后为每个连接建立一个线程并分配单独的selector来处理各自连接上的IO事件--如此设计的原因是各节点的状态严格依赖与主节点的心跳和其他通信,防止主节点线程阻塞导致心跳失败;从而引发节点下线带来的大量同步工作--后续会聊到;
- 各节点网络通信线程之上会有一个线程专门负责组件的网络通信协议,就是将网络传输的字节流解码成组件的通信协议包,因为NIO的buffer是数据块,所以首先通过读写队列将字节转化为字节流,通过协议转化为网络通信命令包,同时解决粘包半包等问题;
- 网络通信线程和协议实现线程之间通过读写两个队列来实现(网络IO线程的读队列就是协议线程的写队列,反过来一样,所以这里读写队列是相对的;),为了保证性能,避免重复创建对象和对象回收,设计了ByteBuffer缓存机制和异步读写队列数据结构--详细结构如图--
- 说一下三个队列--读写队列和缓存队列,用来实现IO通信线程和协议通信线程之间的数据通信--两个线程基本上会轮训处理网络IO事件,和上层协议事件,基本过程如下--
- 从网络IO线程角度出发--
- 当产生可读事件时,网络IO线程会从缓存队列中获取一个空的ByteBuffer,这里设计为当没有可用的缓存Buffer对象时会新建一个--具体在队列实现里讲,可能会产生写扩张现象,后期性能优化时考虑加入回收机制;
- 将socket缓冲区中的网络数据read进Buffer中,然后将Buffer对象入队到IO写队列中;
- 然后检查IO读队列不为空时,对IO读队列出队,获取要发送的数据Buffer对象,发送到其他节点中;
- 从网络IO线程角度出发--
- 异步多线程队列,支持两个线程同时出队入队操作;原理和代码贴下来,基本实现:
package org.example.web.buffer;import org.example.web.api.SocketBufferQueue;import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {//异步读写队列实现原理;/** 当队列中的元素个数>1时,读线程和写线程可以同时进行,因为这时候不涉及操作共享变量*当队列中的元素个数<=1时,读写队列中只能有一个线程操作读或者写,因为此时会涉及队列头尾指针的操作;* 实现原理,写线程在获取写锁时可以正常做写操作:此时有两种情况--* 1,获取写锁之后队列为空,此时不会有读线程做读操作,只有获得写锁的该线程可以put,put完成之后将头尾指针同时指向改为以元素即可;此时队列元素个数为1;* 2,获取写锁之后队列中只有一个元素,这时也可以保证只有该线程在做写入,因为只有一个元素的情况下,读线程要读取该元素必须同时获得读锁和写锁;此时队列元素个数为2;* 3,读线程获取读锁之后有三种情况;size>1;size=1;size=0;* 4, 重点是保证不能多个线程同时进入队列元素为零的状态;就是读线程消费了最后一个元素,正好此时写线程在队列为空的时候写入,读写线程会同时操作头尾指针,造成错乱,所以在元素数量为1* 的时候就要进行同步操作;原理:* 1.读线程获取读锁之后如果size=1,此时不会先消费,而是试图获取写锁,防止此时有写线程同时操作,获取写锁之后再判断size是否为1,如果为1则做出队操作,然后释放写锁,如果为2则直接释放写锁--再进行出队操作;* 2,这里读线程获取读锁之后判断size=1,再获取读锁成功之后有两种情况--* 1,有写线程在读线程之前获取到了写锁,则读线程获取到写锁的时候size>=2了(可能不止一个),* 2,判断size=1之后直接获取到了写锁,此时就应该阻塞其他写线程做入队操作,等待自己完成出队操作之后再释放写锁;* 5,再说一下size怎么保证同步,* 1,在size<=1的时候严格保证线程同步操作,保证size;* 2,在size>1的时候,此时可以理解为队列同时在出队和入队,size在两个线程操作的时候先出队-1还是先入队+1其实是没有关系的,因为原子操作保证了最后结果是没有问题的就行;* */private AtomicInteger size;protected T head;protected T tail;private Object readLock;private Object writeLock;//这里考虑使用cas还是SynchronizedAsynchronousQueue(){this.writeLock=new Object();this.readLock=new Object();}AsynchronousQueue(int initSize){this();this.size=new AtomicInteger(initSize);}//空队列初始化要创建一个nodeAsynchronousQueue(T node){this(1);this.head=node;this.tail=this.head;}public boolean offerFirstOne(T node){synchronized (this.writeLock){if(this.size.get()>0){return false;}this.head=this.tail=node;return this.size.compareAndSet(0,1);}}public boolean offer(T node){preOfferElement(node);synchronized (this.writeLock){if(this.size.get()==0){return this.offerFirstOne(node);}else{T temp=this.head;node.next=temp;temp.pre=node;this.head=node;}return this.size.incrementAndGet() > 1;}}private void preOfferElement(T bufferNode){bufferNode.next=null;bufferNode.pre=null;}public T pollLastOne(){return this.size.compareAndSet(1,0)?this.tail:null;}public T poll(){synchronized (this.readLock){if(this.size.get()==0){return null;}if(this.size.get()==1){synchronized (this.writeLock){if(this.size()>1){return this.getTailElement();}if(this.size()==1){this.pollLastOne();}}}return this.getTailElement();}}private T getTailElement(){if(this.size()>1){this.tail= (T) this.tail.pre;this.size.decrementAndGet();return (T) this.tail.next;}return null;}public int size(){return this.size.get();}public int increamentSize(){return this.size.incrementAndGet();}public int decrementSize(){return this.size.decrementAndGet();}private class BufferNode{private ByteBuffer buffer;private BufferNode pre;private BufferNode next;BufferNode(ByteBuffer byteBuffer){this.buffer=byteBuffer;}BufferNode(){}}
}