分布式异步任务处理组件(七)

分布式异步任务处理组件底层网络通信模型的设计--如图:

  1. 使用Java原生NIO来实现TCP通信模型
  2. 普通节点维护一个网络IO线程,负责和主节点的网络数据通信连接--这里的网络数据是指组件通信协议之下的直接面对字节流的数据读写,上层会有另一个线程负责网络通信协议的实现;---也就是说维护一个selector线程,负责处理socketchannel的IO事件;
  3. Leader节点网络通信层有多个线程--一个selector线程负责接受其他节点的连接请求,然后为每个连接建立一个线程并分配单独的selector来处理各自连接上的IO事件--如此设计的原因是各节点的状态严格依赖与主节点的心跳和其他通信,防止主节点线程阻塞导致心跳失败;从而引发节点下线带来的大量同步工作--后续会聊到;
  4. 各节点网络通信线程之上会有一个线程专门负责组件的网络通信协议,就是将网络传输的字节流解码成组件的通信协议包,因为NIO的buffer是数据块,所以首先通过读写队列将字节转化为字节流,通过协议转化为网络通信命令包,同时解决粘包半包等问题;
  5. 网络通信线程和协议实现线程之间通过读写两个队列来实现(网络IO线程的读队列就是协议线程的写队列,反过来一样,所以这里读写队列是相对的;),为了保证性能,避免重复创建对象和对象回收,设计了ByteBuffer缓存机制和异步读写队列数据结构--详细结构如图--
  6. 说一下三个队列--读写队列和缓存队列,用来实现IO通信线程和协议通信线程之间的数据通信--两个线程基本上会轮训处理网络IO事件,和上层协议事件,基本过程如下--
    1. 从网络IO线程角度出发--
      1. 当产生可读事件时,网络IO线程会从缓存队列中获取一个空的ByteBuffer,这里设计为当没有可用的缓存Buffer对象时会新建一个--具体在队列实现里讲,可能会产生写扩张现象,后期性能优化时考虑加入回收机制;
      2. 将socket缓冲区中的网络数据read进Buffer中,然后将Buffer对象入队到IO写队列中;
      3. 然后检查IO读队列不为空时,对IO读队列出队,获取要发送的数据Buffer对象,发送到其他节点中;
  7. 异步多线程队列,支持两个线程同时出队入队操作;原理和代码贴下来,基本实现:
package org.example.web.buffer;import org.example.web.api.SocketBufferQueue;import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {//异步读写队列实现原理;/** 当队列中的元素个数>1时,读线程和写线程可以同时进行,因为这时候不涉及操作共享变量*当队列中的元素个数<=1时,读写队列中只能有一个线程操作读或者写,因为此时会涉及队列头尾指针的操作;* 实现原理,写线程在获取写锁时可以正常做写操作:此时有两种情况--*     1,获取写锁之后队列为空,此时不会有读线程做读操作,只有获得写锁的该线程可以put,put完成之后将头尾指针同时指向改为以元素即可;此时队列元素个数为1;*     2,获取写锁之后队列中只有一个元素,这时也可以保证只有该线程在做写入,因为只有一个元素的情况下,读线程要读取该元素必须同时获得读锁和写锁;此时队列元素个数为2;*     3,读线程获取读锁之后有三种情况;size>1;size=1;size=0;*     4, 重点是保证不能多个线程同时进入队列元素为零的状态;就是读线程消费了最后一个元素,正好此时写线程在队列为空的时候写入,读写线程会同时操作头尾指针,造成错乱,所以在元素数量为1* 的时候就要进行同步操作;原理:*           1.读线程获取读锁之后如果size=1,此时不会先消费,而是试图获取写锁,防止此时有写线程同时操作,获取写锁之后再判断size是否为1,如果为1则做出队操作,然后释放写锁,如果为2则直接释放写锁--再进行出队操作;*           2,这里读线程获取读锁之后判断size=1,再获取读锁成功之后有两种情况--*                   1,有写线程在读线程之前获取到了写锁,则读线程获取到写锁的时候size>=2了(可能不止一个),*                   2,判断size=1之后直接获取到了写锁,此时就应该阻塞其他写线程做入队操作,等待自己完成出队操作之后再释放写锁;*     5,再说一下size怎么保证同步,*           1,在size<=1的时候严格保证线程同步操作,保证size;*           2,在size>1的时候,此时可以理解为队列同时在出队和入队,size在两个线程操作的时候先出队-1还是先入队+1其实是没有关系的,因为原子操作保证了最后结果是没有问题的就行;* */private AtomicInteger size;protected T head;protected T tail;private Object readLock;private Object writeLock;//这里考虑使用cas还是SynchronizedAsynchronousQueue(){this.writeLock=new Object();this.readLock=new Object();}AsynchronousQueue(int initSize){this();this.size=new AtomicInteger(initSize);}//空队列初始化要创建一个nodeAsynchronousQueue(T node){this(1);this.head=node;this.tail=this.head;}public boolean offerFirstOne(T node){synchronized (this.writeLock){if(this.size.get()>0){return false;}this.head=this.tail=node;return this.size.compareAndSet(0,1);}}public boolean offer(T node){preOfferElement(node);synchronized (this.writeLock){if(this.size.get()==0){return this.offerFirstOne(node);}else{T temp=this.head;node.next=temp;temp.pre=node;this.head=node;}return this.size.incrementAndGet() > 1;}}private void preOfferElement(T bufferNode){bufferNode.next=null;bufferNode.pre=null;}public T pollLastOne(){return this.size.compareAndSet(1,0)?this.tail:null;}public T poll(){synchronized (this.readLock){if(this.size.get()==0){return null;}if(this.size.get()==1){synchronized (this.writeLock){if(this.size()>1){return this.getTailElement();}if(this.size()==1){this.pollLastOne();}}}return this.getTailElement();}}private T getTailElement(){if(this.size()>1){this.tail= (T) this.tail.pre;this.size.decrementAndGet();return (T) this.tail.next;}return null;}public int size(){return this.size.get();}public int increamentSize(){return this.size.incrementAndGet();}public int decrementSize(){return this.size.decrementAndGet();}private class BufferNode{private ByteBuffer buffer;private BufferNode pre;private BufferNode next;BufferNode(ByteBuffer byteBuffer){this.buffer=byteBuffer;}BufferNode(){}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/76756.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux下安装VMware虚拟机

目录 1. 简介 2. 工具/原料 2.1. 下载VMware 2.2. 安装 1. 简介 ​ VMware Workstation&#xff08;中文名“威睿工作站”&#xff09;是一款功能强大的桌面虚拟计算机软件&#xff0c;提供用户可在单一的桌面上同时运行不同的操作系统&#xff0c;和进行开发、测试 …

为什么list.sort()比Stream().sorted()更快?

真的更好吗&#xff1f; 先简单写个demo List<Integer> userList new ArrayList<>();Random rand new Random();for (int i 0; i < 10000 ; i) {userList.add(rand.nextInt(1000));}List<Integer> userList2 new ArrayList<>();userList2.add…

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

&#x1f52d; 嗨&#xff0c;您好 &#x1f44b; 我是 vnjohn&#xff0c;在互联网企业担任 Java 开发&#xff0c;CSDN 优质创作者 &#x1f4d6; 推荐专栏&#xff1a;Spring、MySQL、Nacos、Java&#xff0c;后续其他专栏会持续优化更新迭代 &#x1f332;文章所在专栏&…

240. 搜索二维矩阵 II

240. 搜索二维矩阵 II 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a; 原题链接&#xff1a; 240. 搜索二维矩阵 II https://leetcode.cn/problems/search-a-2d-matrix-ii/description/ 完成情况&#xff1a; 解题思路&#xff1a; 从…

配置root账户ssh免密登录并使用docker-machine构建docker服务

简介 Docker Machine是一种可以在多种平台上快速安装和维护docker运行环境&#xff0c;并支持多种平台&#xff0c;让用户可以在很短时间内在本地或云环境中搭建一套docker主机集群的工具。 使用docker-machine命令&#xff0c;可以启动、审查、停止、重启托管的docker 也可以…

《向量数据库指南》——腾讯云向量数据库Tencent Cloud Vector DB正式上线公测!提供10亿级向量检索能力

8月1日,腾讯云向量数据库(Tencent Cloud Vector DB)已正式上线公测。在腾讯云官网上搜索“向量数据库”,就可以正式体验该产品。 腾讯云向量数据库不仅能为大模型提供外部知识库,提高大模型回答的准确性,还可广泛应用于推荐系统、文本图像检索、自然语言处理等 AI 领域。…

ChatGPT已打破图灵测试,新的测试方法在路上

生信麻瓜的 ChatGPT 4.0 初体验 偷个懒&#xff0c;用ChatGPT 帮我写段生物信息代码 代码看不懂&#xff1f;ChatGPT 帮你解释&#xff0c;详细到爆&#xff01; 如果 ChatGPT 给出的的代码不太完善&#xff0c;如何请他一步步改好&#xff1f; 全球最佳的人工智能系统可以通过…

K8S kubeadm搭建

kubeadm搭建整体步骤 1&#xff09;所有节点进行初始化&#xff0c;安装docker引擎和kubeadm kubelet kubectl 2&#xff09;生成集群初始化配置文件并进行修改 3&#xff09;使用kubeadm init根据初始化配置文件生成K8S的master控制管理节点 4&#xff09;安装CNI网络插件&am…

【Ubuntu 18.04 搭建 DHCP 服务】

参考Ubuntu官方文档&#xff1a;https://ubuntu.com/server/docs/how-to-install-and-configure-isc-dhcp-server dhcpd.conf 手册页 配置&#xff1a;https://maas.io/docs/about-dhcp 实验环境规划 Ubuntu 18.04&#xff08;172.16.65.128/24&#xff09;dhcp服务端Ubuntu…

GD32F103VE点灯

GD32F103VE点灯主要用来学习端口引脚的输出配置。它由LED.c&#xff0c;LED.h&#xff0c;SoftDelay.c和main.c组成。 #include "gd32f10x.h" //使能uint8_t,uint16_t,uint32_t,uint64_t,int8_t,int16_t,int32_t,int64_t #include "SoftDelay.h"#include …

后端整理(集合框架、IO流、多线程)

1. 集合框架 Java集合类主要有两个根接口Collection和Map派生出来 Collection派生两个子接口 List List代表了有序可重复集合&#xff0c;可以直接根据元素的索引进行访问Set Set代表无序不可重复集合&#xff0c;只能根据元素本身进行访问 Map接口派生 Map代表的是存储key…

数据结构 二叉树(C语言实现)

绪论 雄关漫道真如铁&#xff0c;而今迈步从头越。 本章将开始学习二叉树&#xff08;全文共一万两千字&#xff09;&#xff0c;二叉树相较于前面的数据结构来说难度会有许多的攀升&#xff0c;但只要跟着本篇博客深入的学习也可以基本的掌握基础二叉树。 话不多说安全带系好&…

在windows下安装ruby使用gem

在windows下安装ruby使用gem 1.下载安装ruby环境2.使用gem3.gem换源 1.下载安装ruby环境 ruby下载地址 选择合适的版本进行下载和安装&#xff1a; 在安装的时候&#xff0c;请勾选Add Ruby executables to your PATH这个选项&#xff0c;添加环境变量&#xff1a; 安装Ruby成…

二进制安装K8S(单Master集群架构)

目录 一&#xff1a;操作系统初始化配置 1、项目拓扑图 2、服务器 3、初始化操作 二&#xff1a; 部署 etcd 集群 1、etcd 介绍 2、准备签发证书环境 3、master01 节点上操作 &#xff08;1&#xff09;生成Etcd证书 &#xff08;2&#xff09;创建用于存放 etcd 配置文…

.Net6 Web Core API --- Autofac -- AOP

目录 一、AOP 封装 二、类拦截 案例 三、接口拦截器 案例 AOP拦截器 可开启 类拦截器 和 接口拦截器 类拦截器 --- 只有方法标注 virtual 标识才会启动 接口拦截器 --- 所有实现接口的方法都会启动 一、AOP 封装 // 在 Program.cs 配置 builder.AddAOPExt();//自定义 A…

企业电子招标采购系统源码Spring Boot + Mybatis + Redis + Layui + 前后端分离 构建企业电子招采平台之立项流程图 tbms

&#xfeff; 项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&am…

Spring IOC

◆ 传统Javaweb开发的困惑 ◆ IoC、DI和AOP思想提出 ◆ Spring框架的诞生 Spring | Home IOC控制反转&#xff1a;BeanFactory 快速入门 package com.xiaolin.service.Impl;import com.xiaolin.dao.UserDao; import com.xiaolin.service.UserService;public class UserServic…

论文浅尝 | 预训练Transformer用于跨领域知识图谱补全

笔记整理&#xff1a;汪俊杰&#xff0c;浙江大学硕士&#xff0c;研究方向为知识图谱 链接&#xff1a;https://arxiv.org/pdf/2303.15682.pdf 动机 传统的直推式(tranductive)或者归纳式(inductive)的知识图谱补全(KGC)模型都关注于域内(in-domain)数据&#xff0c;而比较少关…

Java版本spring cloud + spring boot企业电子招投标系统源代码 tbms

​ 功能模块&#xff1a; 待办消息&#xff0c;招标公告&#xff0c;中标公告&#xff0c;信息发布 描述&#xff1a; 全过程数字化采购管理&#xff0c;打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力&#xff0c;为…

【机器学习】Overfitting and Regularization

Overfitting and Regularization 1. 过拟合添加正则化2. 具有正则化的损失函数2.1 正则化线性回归的损失函数2.2 正则化逻辑回归的损失函数 3. 具有正则化的梯度下降3.1 使用正则化计算梯度&#xff08;线性回归 / 逻辑回归&#xff09;3.2 正则化线性回归的梯度函数3.3 正则化…