简介
设计模式在软件开发中起着至关重要的作用,它们是解决常见问题的经过验证的解决方案。而Netty
作为一个优秀的网络应用程序框架,同样也采用了许多设计模式来提供高性能和可扩展性。在本文中,我们将探讨Netty
中使用的一些关键设计模式,以及它们在构建强大网络应用程序中的应用。
源码分析
单例模式
Netty中MqttEncoder
这个编码器就用到了单例模式,它将构造函数私有化,并基于饿汉式的方式全局创建单例一个MqttEncoder
的单例 。
@ChannelHandler.Sharable
public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {public static final MqttEncoder INSTANCE = new MqttEncoder();private MqttEncoder() { }//略}
这使得我们后续需要使用这个编码器的话,只能使用这个全局维护的示例对象INSTANCE
,避免了重复创建的开销。
nioSocketChannel.pipeline().addLast(new StringDecoder()).addLast(MqttEncoder.INSTANCE)
对此我们将其梳理为类图,对应的如下图所示:
同样的Netty
对于异常的管理也处理的很精细,例如ReadTimeoutException
,就是基于饿汉式的方式创建一个单例INSTANCE
:
public final class ReadTimeoutException extends TimeoutException {private static final long serialVersionUID = 169287984113283421L;public static final ReadTimeoutException INSTANCE = new ReadTimeoutException();private ReadTimeoutException() { }
}
对于ReadTimeoutException
这个异常类的使用,也是全局仅用这个示例进行传播:
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {if (!closed) {ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);ctx.close();closed = true;}}
对应的类图如下所示:
简单工厂模式
策略模式最典型的特点就是:
- 客户端既不需要关心创建细节,甚至连类名都不需要记住,只需要知道类型所对应的参数。
- 基于简单工厂封装对象创建细节,由于涉及创建的对象类型较少,所以所有的创建细节都在一个方法内实现。
在Netty
中最典型的实现就是DefaultEventExecutorChooserFactory
,这个工厂会工具用户传入executors
动态创建EventExecutorChooser
:
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();private DefaultEventExecutorChooserFactory() { }//根据用户传入的executors动态返回一个executors轮询选择器@SuppressWarnings("unchecked")@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {//如果executors的长度是2的次幂则返回PowerOfTowEventExecutorChooser,反之返回GenericEventExecutorChooserif (isPowerOfTwo(executors.length)) {return new PowerOfTowEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTowEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}}
}
对应的类图如下所示:
装饰者模式
基于装饰者模式,实现基于拓展的方式对类进行增强,做到尽可能避免没必要的修改,在Netty
中WrappedByteBuf
就是最经典的装饰着模式,其实现思路为:
WrappedByteBuf
继承ByteBuf
获取ByteBuf
的所有行为方法。- 对外开放一个构造方法,传入
ByteBuf
。 - 基于第一步的继承的
ByteBuf
的所有行为方法,通过传入的ByteBuf
进行实现,并返回WrappedByteBuf
而不是传入的buf
,从而做对外屏蔽内部实现。
class WrappedByteBuf extends ByteBuf {protected final ByteBuf buf;protected WrappedByteBuf(ByteBuf buf) {if (buf == null) {throw new NullPointerException("buf");}this.buf = buf;}@Overridepublic final boolean hasMemoryAddress() {return buf.hasMemoryAddress();}@Overridepublic final long memoryAddress() {return buf.memoryAddress();}@Overridepublic final int capacity() {return buf.capacity();}@Overridepublic ByteBuf capacity(int newCapacity) {buf.capacity(newCapacity);return this;}@Overridepublic final int maxCapacity() {return buf.maxCapacity();}@Overridepublic final ByteBufAllocator alloc() {return buf.alloc();}//使用buf实现,返回当前对象,从而对外屏蔽内部实现细节@Overridepublic ByteBuf retain() {buf.retain();return this;}//略
}
对应的类图如下所示:
与之同理的还有UnreleasableByteBuf
,这里就不多做赘述了:
final class UnreleasableByteBuf extends WrappedByteBuf {private SwappedByteBuf swappedBuf;UnreleasableByteBuf(ByteBuf buf) {super(buf);}@Overridepublic ByteBuf order(ByteOrder endianness) {if (endianness == null) {throw new NullPointerException("endianness");}if (endianness == order()) {return this;}SwappedByteBuf swappedBuf = this.swappedBuf;if (swappedBuf == null) {this.swappedBuf = swappedBuf = new SwappedByteBuf(this);}return swappedBuf;}@Overridepublic ByteBuf asReadOnly() {return new UnreleasableByteBuf(buf.asReadOnly());}@Overridepublic ByteBuf readSlice(int length) {return new UnreleasableByteBuf(buf.readSlice(length));}@Overridepublic ByteBuf readRetainedSlice(int length) {return new UnreleasableByteBuf(buf.readRetainedSlice(length));}
//略}
观察者模式
我们在使用Netty
时,可能经常会进行这样一段的代码编写,通过addListener
方法注册监听器,确保端口绑定成功后,回调通知我们的监听器:
serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("端口[" + port + "]绑定成功!");} else {System.err.println("端口[" + port + "]绑定失败!");bind(serverBootstrap, port + 1);}});
步入DefaultPromise
的addListener
源码之后,可以看到它会通过addListener0
完成监听器注册:
@Overridepublic Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {checkNotNull(listener, "listener");synchronized (this) {addListener0(listener);}if (isDone()) {notifyListeners();}return this;}
最终就会通过异步的方式通知我们的监听器。
private void notifyListeners() {EventExecutor executor = executor();if (executor.inEventLoop()) {final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();final int stackDepth = threadLocals.futureListenerStackDepth();if (stackDepth < MAX_LISTENER_STACK_DEPTH) {threadLocals.setFutureListenerStackDepth(stackDepth + 1);try {//通知所有的监听器notifyListenersNow();} finally {threadLocals.setFutureListenerStackDepth(stackDepth);}return;}}//通知所有的监听器safeExecute(executor, new Runnable() {@Overridepublic void run() {notifyListenersNow();}});}
对应的类图如下图所示:
迭代器模式
迭代器模式不是很常见,我们还是以Netty
中的一段示例代码来了解这种涉及模式,代码执行步骤如下:
- 创建两个
ByteBuf
,分别是header
和body
。 - 将这两个
ByteBuf
添加到CompositeByteBuf
。 - 基于
CompositeByteBuf
迭代这两个ByteBuf
数组。
public static void main(String[] args) {//创建两个ByteBuf ,分别是header和bodyByteBuf header = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});ByteBuf body = Unpooled.wrappedBuffer(new byte[]{4, 5, 6});CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(2);//这两个ByteBuf 添加到CompositeByteBuf`byteBuf.addComponent(true, header);byteBuf.addComponent(true, body);//基于CompositeByteBuf 迭代这两个ByteBuf数组Iterator<ByteBuf> iterator = byteBuf.iterator();while (iterator.hasNext()){ByteBuf next = iterator.next();int len = next.readableBytes();byte[] bytes=new byte[len];next.readBytes(bytes);for (byte b : bytes) {System.out.println(b);}}}
这种模式在java
中非常常见,其特点为:
- 将集合与迭代职责分离,让集合类专门处理集合添加,聚合一个迭代器负责迭代。
- 集合类继承集合接口实现添加删除操作。
- 集合类继承迭代器接口获取迭代器获取能力。
- 迭代器继承迭代器接口,并以聚合的方式聚合到类中。
首先看看集合类:
- 继承
Iterable
获取迭代器的能力。 - 基于
addComponent
实现集合添加能力。
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();@Overridepublic Iterator<ByteBuf> iterator() {ensureAccessible();if (components.isEmpty()) {return EMPTY_ITERATOR;}return new CompositeByteBufIterator();}public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {checkNotNull(buffer, "buffer");addComponent0(increaseWriterIndex, components.size(), buffer);consolidateIfNeeded();return this;}}
查看迭代器:
- 继承
Iterator
实现迭代能力。 - 以内部类的方式实现迭代器。
//以内部类的形式继承Iterator实现迭代能力
private final class CompositeByteBufIterator implements Iterator<ByteBuf> {private final int size = components.size();private int index;@Overridepublic boolean hasNext() {return size > index;}@Overridepublic ByteBuf next() {if (size != components.size()) {throw new ConcurrentModificationException();}if (!hasNext()) {throw new NoSuchElementException();}try {return components.get(index++).buf;} catch (IndexOutOfBoundsException e) {throw new ConcurrentModificationException();}}@Overridepublic void remove() {throw new UnsupportedOperationException("Read-Only");}}
责任链模式
责任链模式在Netty
中最常见了,例如我们希望读处理器在一条链上不断传播,我们就会通过这样一段代码:
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InBoundHandlerC: " + msg);ctx.fireChannelRead(msg);}
}
而fireChannelRead
就会通过findContextInbound
找到下一个处理器继续调用channelRead
像一条链条一样传播下去。
@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(), msg);return this;}
查看findContextInbound
细节可知,这些ChannelInboundHandlerAdapter
都是通过AbstractChannelHandlerContext
封装再进行串联,确保每个消息都会通过这条链传播到所有ChannelInboundHandlerAdapter
:
private AbstractChannelHandlerContext findContextInbound() {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.next;} while (!ctx.inbound);return ctx;}
对应的类图如下所示:
参考文献
netty的设计模式:https://blog.csdn.net/fst438060684/article/details/81155203?csdn_share_tail={"type"%3A"blog"%2C"rType"%3A"article"%2C"rId"%3A"81155203"%2C"source"%3A"shark_chili3007"}&fromshare=blogdetail
netty设计模式-单例模式:https://blog.csdn.net/fst438060684/article/details/81156825
迭代器模式:https://www.runoob.com/design-pattern/iterator-pattern.html
设计模型之责任链模式含UML完整实例):https://blog.csdn.net/atu1111/article/details/105558539
一次性搞懂设计模式–迭代器模式:https://zhuanlan.zhihu.com/p/537080924
行为型模式:https://design-patterns.readthedocs.io/zh_CN/latest/behavioral_patterns/behavioral.html