Netty(四)NIO-优化与源码

Netty优化与源码

1. 优化

1.1 扩展序列化算法

序列化,反序列化主要用于消息正文的转换。
序列化:将java对象转为要传输对象(byte[]或json,最终都是byte[])
反序列化:将正文还原成java对象。

//java自带的序列化
// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);
// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个 Serializer 接口,提供两个实现,将实现加入了枚举类 Serializer.Algorithm 中:

enum SerializerAlgorithm implements Serializer {// Java 实现Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));Object object = in.readObject();return (T) object;} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream out = new ByteArrayOutputStream();new ObjectOutputStream(out).writeObject(object);return out.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);}}}, // Json 实现(引入了 Gson 依赖)Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}@Overridepublic <T> byte[] serialize(T object) {return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);}};// 需要从协议的字节中得到是哪种序列化算法public static SerializerAlgorithm getByInt(int type) {SerializerAlgorithm[] array = SerializerAlgorithm.values();if (type < 0 || type > array.length - 1) {throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");}return array[type];}
}

增加配置类和配置文件:

public abstract class Config {static Properties properties;static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}public static int getServerPort() {String value = properties.getProperty("server.port");if(value == null) {return 8080;} else {return Integer.parseInt(value);}}public static Serializer.Algorithm getSerializerAlgorithm() {String value = properties.getProperty("serializer.algorithm");if(value == null) {return Serializer.Algorithm.Java;} else {return Serializer.Algorithm.valueOf(value);}}
}

配置文件

serializer.algorithm=Json

修改编解码器

/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(Config.getSerializerAlgorithm().ordinal());byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {byte serializerAlgorithm = in.readByte(); // 0 或 1// 找到反序列化算法Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];// 确定具体消息类型Class<? extends Message> messageClass = Message.getMessageClass(messageType);Message message = algorithm.deserialize(messageClass, bytes);out.add(message);}
}

1.2 参数调优

CONNECT_TIMEOUT_MILLIS
  • 属于SocketChannel参数,用在客户端建立连接时,如超时则抛出timeout异常
  • SO_TIMEOUT主要用在阻塞IO,阻塞IO中accept,read等都是无限等待的
Bootstrap bootstrap = new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).channel(NioServerSocketChannel.class).handler(new LoggingHandler());

附源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

@Override
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {// ...// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() {                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}// ...
}
SO_BACKLOG
  • 属于ServerSocketChannel参数
    三次握手过程
    sync queue - 半连接队列
    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
      accept queue - 全连接队列
    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
      netty中可通过option(ChannelOption.SO_BACKLOG,值)来设置大小
public class DefaultServerSocketChannelConfig extends DefaultChannelConfigimplements ServerSocketChannelConfig {private volatile int backlog = NetUtil.SOMAXCONN;// ...默认大小
}
ulimit -n
  • 限制一个进程打开最大文件描述符的数目,属于操作系统参数
TCP_NODELAY
  • nagle算法的延迟,一般设为true不延迟,数据赞属于 SocketChannal 参数
SO_SNDBUF & SO_RECVBUF

滑动接口的参数,现在的操作系统会根据实际情况自动调整。

  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
ALLOCATOR

ByteBuf分配器,属于 SocketChannal 参数,用来分配 ByteBuf, ctx.alloc()。源码详解P128

RCVBUF_ALLOCATOR
  • 属于 SocketChannal 参数,控制 netty 接收缓冲区大小。源码详解:P129
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

1.3 RPC 框架

通过反射获取配置

public class ServicesFactory {static Properties properties;static Map<Class<?>, Object> map = new ConcurrentHashMap<>();static {try {InputStream in = Config.class.getResourceAsStream("/application.properties");properties = new Properties();properties.load(in);Set<String> names = properties.stringPropertyNames();for (String name : names) {if (name.endsWith("Services")) {Class<?> interfaceClass = Class.forName(name);Class<?> instanceClass = Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static <T> T getService(Class<T> interfaceClass) {return (T) map.get(interfaceClass);}
}

RPC消息处理器

@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();try {HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();response.setExceptionValue(e);}ctx.writeAndFlush(response);}//本地调试public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {RpcRequestMessage message = new RpcRequestMessage(1,"com.aric.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"aric"});HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());System.out.println(invoke);}
}

客户端优化,抽取使用代理对象发送消息

/*** 使用代理对象替换,主线程发送* NioEventLoop线程接收结果,需要线程间通信,使用promise对象接收结果* @author* @created by xuyu on 2023/9/23-23:10*/
@Slf4j
public class RpcClientManager {public static void main(String[] args) {//后期创建代理类优化发送结构getChannel().writeAndFlush(new RpcRequestMessage(1,"com.aric.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"test"}));//使用代理发送HelloService service = getProxyService(HelloService.class);service.sayHello("test");}//创建代理类public static <T> T getProxyService(Class<T> serviceClass) {ClassLoader loader = serviceClass.getClassLoader();  //当前类加载器Class[] interfaces = new Class[]{serviceClass};//代理类要实现的接口//jdk自带的代理Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, arg) -> {//proxy代理对象,method:代理方法,arg:代理参数//1.将方法调用转换为消息对象RpcRequestMessage message = new RpcRequestMessage(SequenceIdGenerator.nextId(),serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),arg);//2.将消息对象发送出去getChannel().writeAndFlush(message);//3.TODO:待优化异步等待返回结果return null;});return (T)o;}private static Channel channel = null;private static final Object LOCK = new Object();//单例构造获取唯一channel对象public static Channel getChannel() {if (channel != null) {return channel;}synchronized (LOCK) {if (channel != null) {return channel;}initChannel();return channel;}}//初始化channel方法private static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();//改为异步channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (InterruptedException e) {log.debug("client error", e);}}
}

优化:线程间通信:异步获取返回结果
通过promise异步等待信息返回

//创建代理类public static <T> T getProxyService(Class<T> serviceClass) {ClassLoader loader = serviceClass.getClassLoader();  //当前类加载器Class[] interfaces = new Class[]{serviceClass};//代理类要实现的接口//jdk自带的代理Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, arg) -> {//proxy代理对象,method:代理方法,arg:代理参数//1.将方法调用转换为消息对象int sequenceId = SequenceIdGenerator.nextId();RpcRequestMessage message = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),arg);//2.将消息对象发送出去getChannel().writeAndFlush(message);//3.返回//准备好空的promise对象来接收结果,参数为指定promise对象异步接收结果的线程DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());RpcResponseMessageHandler.PROMISE.put(sequenceId, promise);
//            promise.addListener(future -> {
//                //创建线程处理任务
//            });//原线程等待promise的结果promise.await();if (promise.isSuccess()) {return promise.getNow();} else {throw new RuntimeException(promise.cause());}});return (T) o;}
/*** rpc响应消息处理器*/
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {//序号-promise<结果类型>,多个线程访问,用于异步接收rpc调用的返回结果public static final Map<Integer, Promise<Object>> PROMISE = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {//拿到空的promisePromise<Object> promise = PROMISE.remove(msg.getSequenceId());  //返回并移除if (promise != null) {Object returnValue = msg.getReturnValue();Exception exceptionValue = msg.getExceptionValue();if (exceptionValue != null) {promise.setFailure(exceptionValue);} else {promise.setSuccess(returnValue);}}System.out.println(msg);}

代码:https://gitee.com/xuyu294636185/netty-demo.git

2. 源码

2.1 netty启动剖析

        //1. netty中使用EventLoopGroup(Nio boss线程),来封装线程和selectorSelector selector = Selector.open();//创建NioServerSocketChannel,同时初始化它关联的handler,以及为原生ssc存储configNioServerSocketChannel attachment = new NioServerSocketChannel();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//2.启动nio boss线程执行//建立selector和channel的注册,sscKey是事件的句柄,是将来事件发生后,通过它可以知道事件和哪个channel的事件SelectionKey sscKey = ssc.register(selector, 0, attachment);ssc.bind(new InetSocketAddress(8080));//表示sscKey只关注accept事件sscKey.interestOps(SelectionKey.OP_ACCEPT);
启动流程

启动流程

EventLoop

EventLoop重要组成:selector,线程,任务队列
EventLoop既会处理io事件,也会处理普通任务和定时任务

  1. selector何时创建?
    在构造方法创建时通过SelectorProvider.openSelector();
  2. eventloop为什么会有两个selector成员?
    为了在遍历selectedKey时提高性能。
    一个是原始的unwrappedselector(底层是hashset实现),一个是包装后的selector(底层是数组实现)
  3. eventLoop的nio线程在何时启动?
    在首次调用exectue方法时executor中将当前线程赋给nio线程,并通过state状态控制位只会启动一次
  4. 提交普通任务会不会结束select阻塞?

    int selectedKeys = selector.select(timeoutMillis);
    protected void wakeup(boolean inEventLoop) {
    if(!inEventLoop && wakeUp.compareAndSet(false,true)) {
    selector.wakeup();
    }
    }
  5. wakeup方法理解
    inEventLoop:用于判断当前wakeup线程是否和nio线程是否相同,不同才能进入。
    wakeUp:原子Boolean变量,如果有多个线程来提交任务,为了避免wakeup被频繁调用。只有一个成功。
  6. 每次循环时,什么时候会进入SelectStrategy.SELECT分支?
    public void run(){
    for(;😉 {
    switch(selectStrategy.calculateStrategy(selectNowSupplier, hasTask())) {
    case SelectStrategy.CONTINUE:
    continue;
    case SelectStrategy.BUSY_WAIT:
    case SelectStrategy.SELECT:
    select(wakeUp.getAndSet(false));
    if(wakeUp.get()) {…}
    default:
    }
    }
    }
    public int calculateStrategy(IntSupplier supplier,boolean hasTasks) {
    return hasTasks ? suppplier.get() : SelectStrategy.SELECT;
    }
    没有任务时,才会进入SELECT。
    当有任务时,会调用SelectNow方法,顺便拿到io事件。
  7. 何时会select阻塞,阻塞多久?
    long currentTimeNanos = System.nanoTime();
    long selectDeadLineNanos = currentTimeNanos + delayNanos(cuurrentTimeNanos);
    for(;😉{
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    int selectedKeys = selector.select(timeoutMillis);
    }
    没有定时任务的情况
    selectDeadLineNanos:截至时间 = 当前时间 + 1s
    timeoutMillis:超时时间 = 1s + 0.5ms
  8. nio空轮询bug在哪体现,如何解决?
    for(;😉{
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    int selectedKeys = selector.select(timeoutMillis);
    }
    在select中没有阻塞,一直在死循环
    解决:引入selectCnt,每循环一次++,当超过设置的阈值(默认512),selectRebuildSelector(selectCnt)重新创建一个selector,替换旧的。
  9. ioRatio控制什么,设置为100有何作用?
    if(ioRatio == 100) {
    processSelectedKeys(); //处理所有ioio事件
    runAllTasks();
    } else {
    long ioStartTime = System.nanoTime();
    processSelectedKeys();
    long ioTime = System.nanoTime() - ioStartTime;
    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //避免普通事件执行时间太长
    }
    ioRatio控制处理io事件所占用的事件比例50%,ioTime代表执行io事件处理耗时。
  10. selectedKeys优化,在哪区分不同事件类型。
    selectedKeys由hashset集合替换为数组实现。
    private void processSelectedKeys() [
    if(selectedKeys != null) {
    processSelectedKeysOptimized(); //优化后的
    } else {
    processSelectedKeysPlain(selector.selectedKeys()); //原始的
    }
    }
    private void processSelectedKeysOptimized() {
    for(int i = 0;i < selectedKeys.size; ++i) {
    SelectionKey k = selectedKeys.keys[i];
    selectedKeys.keys[i] = null;
    Objected a = k.attachment();
    if(a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a); //处理具体的事件类型
    }
    }
    }
accept流程
  1. selector.select()阻塞直到事件发生
  2. 遍历处理selectedKeys
  3. 拿到一个key,判断事件类型是否为accpet
  4. 创建socketChannel,设置非阻塞
  5. 将socketChannel注册到selector
  6. 关注selectionKey的read事件。
read流程
  1. selector.select()阻塞直到事件发生
  2. 遍历处理selectedKeys
  3. 拿到一个key,判断事件类型是否为read
  4. 读取操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/153378.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java】微服务——Feign远程调用

目录 1.Feign替代RestTemplate1&#xff09;引入依赖2&#xff09;添加注解3&#xff09;编写Feign的客户端4&#xff09;测试5&#xff09;总结 2.自定义配置2.1.配置文件方式2.2.Java代码方式 3.Feign使用优化4.最佳实践4.1.继承方式4.2.抽取方式4.3.实现基于抽取的最佳实践1…

麻省理工学院与Meta AI共同开发StreamingLLM框架,实现语言模型无限处理长度

&#x1f989; AI新闻 &#x1f680; 麻省理工学院与Meta AI共同开发StreamingLLM框架&#xff0c;实现语言模型无限处理长度 摘要&#xff1a;麻省理工学院与Meta AI的研究人员联合研发了一款名为StreamingLLM的框架&#xff0c;解决了大语言模型在RAM与泛化问题上的挑战&am…

微信小程序 获取当前屏幕的可见高宽度

很多时候我们做一下逻辑 需要用整个窗口的高度或宽度参与计算 而且很多时候我们js中拿到的单位都是px像素点 没办法和rpx同流合污 官方提供了wx.getSystemInfoSync() 可以获取到部分窗口信息 其中就包括了整个窗口的宽度和高度 wx.getSystemInfoSync().windowHeight 返回值为像…

微店商品链接获取微店商品详情数据(用 Python实现微店商品信息抓取)

在网页抓取方面&#xff0c;可以使用 Python、Java 等编程语言编写程序&#xff0c;通过模拟 HTTP 请求&#xff0c;获取微店网站上的商品页面。在数据提取方面&#xff0c;可以使用正则表达式、XPath 等方式从 HTML 代码中提取出有用的信息。值得注意的是&#xff0c;微店网站…

[stm32]外中断控制灯光

在STM32CubeMX中配置外部中断功能和参数 1、将上拉输入的引脚设置为&#xff1a;GPIO_EXTI功能 2、GPIO模式设为下降沿触发外部中断&#xff0c;使能上拉电阻&#xff0c;用户标签 3、要将NVIC的相关中断勾选 只有将中断源进行勾选&#xff0c;相关的中断请求才能得到内核的…

xshell安装完成在windows不能打开

文章目录 问题描述问题排查解决第一步第二步 问题描述 安装打开xshell的时候总是点击没有任何的反应&#xff0c;重启电脑后再次点击xshell也没有任何的响应。只有在重装软件后才能正常打开。 问题排查 点击打开xshell7的时候总是报如下错 在这里能看到具体的描述&#xff…

高频时序数据仓库

天软课堂将在本周四添加新主题--天软超高频行情数据。针对市场上高频行情数据处理业务的相关痛点&#xff0c;直观的在线演示如何通过天软高频数仓及高性能计算能力&#xff0c;将其逐个击破&#xff0c;期待各位老师的参会。

Android攻城狮学鸿蒙-配置

1、config.json配置 鸿蒙中的config.json应该类似于Android开发中Manifest.xml&#xff0c;可以进行页面的配置。根据顺序&#xff0c;会识别启动应用的时候&#xff0c;要打开哪个界面。 2、 Ability详解&#xff0c;以及与Android的Activity对比。 他人的学习文章连接&…

奖品定制经营商城小程序的作用是什么

奖品是激励人员团体很好的方式&#xff0c;也是荣誉象征&#xff0c;奖牌、奖杯、高端礼盒等&#xff0c;同时市场中团体非常多&#xff0c;其需求也是很多&#xff0c;尤其定制方面&#xff0c;就更是不用说。 对奖品定制企业来说&#xff0c;除了线下门店获客经营外&#xf…

从零学算法(LCR 180)

文件组合.待传输文件被切分成多个部分&#xff0c;按照原排列顺序&#xff0c;每部分文件编号均为一个 正整数&#xff08;至少含有两个文件&#xff09;。传输要求为&#xff1a;连续文件编号总和为接收方指定数字 target 的所有文件。请返回所有符合该要求的文件传输组合列表…

Idea JavaWeb项目,继承自HttpFilter的过滤器,启动Tomcat时部署工件出错

JDK版本&#xff1a;1.8 Tomcat版本&#xff1a;8.5 10-Oct-2023 13:55:17.586 严重 [RMI TCP Connection(3)-127.0.0.1] org.apache.catalina.core.StandardContext.startInternal One or more Filters failed to start. Full details will be found in the appropriate conta…

【数据分类】基于麻雀搜索算法优化支持向量机的数据分类方法 SSA-SVM分类算法【Matlab代码#61】

文章目录 【可更换其他群智能算法&#xff0c;获取资源请见文章第6节&#xff1a;资源获取】1. 麻雀搜索算法&#xff08;SSA&#xff09;2. 支持向量机&#xff08;SVM&#xff09;3. SSA-SVM分类模型4. 部分代码展示5. 仿真结果展示6. 资源获取 【可更换其他群智能算法&#…

【ftp篇】 vsftp(ftp) 每天生成一个动态密码

这里写目录标题 前言为什么需要动态每日生成一个密码&#xff1f;编写脚本定时任务java对应的代码 前言 社长最近接到一个需求&#xff0c;需要ftp每天动态生成一个密码 为什么需要动态每日生成一个密码&#xff1f; 在软硬件通讯过程中&#xff0c;就以共享单车为例&#xff0…

6.Docker搭建RabbitMQ

1、端口开放 如果在云服务上部署需在安全组开通一下端口&#xff1a;15672、5672、25672、61613、1883。 15672(UI页面通信口)、5672(client端通信口)、25672(server间内部通信口)、61613(stomp 消息传输)、1883(MQTT消息队列遥测传输)。 2、安装镜像 docker pull rabbitmq 3、…

ARM:使用汇编完成三个灯流水亮灭

1.汇编源代码 .text .global _start _start: 设置GPIOF寄存器的时钟使能LDR R0,0X50000A28LDR R1,[R0]ORR R1,R1,#(0x1<<5)STR R1,[R0]设置GPIOE寄存器的时钟使能LDR R0,0X50000A28LDR R1,[R0] 从r0为起始地址的4字节数据取出放在R1ORR R1,R1,#(0x1<<4) 第4位设…

腾讯云2核4G服务器一年和三年价格性能测评

腾讯云轻量2核4G5M服务器&#xff1a;CPU内存流量带宽系统盘性能测评&#xff1a;轻量应用服务器2核4G5M带宽&#xff0c;免费500GB月流量&#xff0c;60GB系统盘SSD盘&#xff0c;5M带宽下载速度可达640KB/秒&#xff0c;流量超额按照0.8元每GB的价格支付流量费&#xff0c;轻…

使用docker搭建kafka集群、可视化操作台

单机搭建 1 拉取zookeeper镜像 docker pull wurstmeister/zookeeper 2 启动zookeeper容器 docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper 3 拉取kafka镜像 docker pull wurstmeister/kafka 4 启动kafka镜像 docker…

存储数据迁移需求和迁移工具分析

随着互联网的发展&#xff0c;数据量的爆炸式增长已经成为一种常态&#xff0c;为了更好地使用和管理这些数据&#xff0c;通常需要升级存储系统来满足客户对数据存储的要求。升级新的存储系统能够提供高可靠、高可扩展、高性能和高安全的存储服务&#xff0c;但在升级过程中也…

SSM整合RabbitMQ,Spring4.x整合RabbitMQ

SSM整合RabbitMQ目录 前言版本实现目录参考pom.xml依赖rabbitmq.properties配置文件spring-rabbitmq.xmlspring-mvc.xml或applicationContext.xmlrabbitmq目录下MessageConsumer.javaMessageConsumer2.javaMessageProducer.javaMessageConstant.java 测试调用 扩展消息重发 前言…

硬件测试(一):温循

一、定义&#xff1a; 温度循环试验&#xff0c;也称为热循环试验或高低温循环试验&#xff0c;是将试验样品暴露于预设的高低温交替的试验环境中所进行的可靠性试验。 温循作为自然环境的模拟&#xff0c;可以考核产品在不同环境条件下的适应能力&#xff0c;常用于产…