前言
最近在学习Netty框架,使用的学习教材是李林锋著的《Netty权威指南》。国内关于netty的书籍几乎没有,这本书算是比较好的入门资源了。
我始终觉得,学习一个新的框架,除了研究框架的源代码之外,还应该使用该框架自己开发一个小项目。为此,我选择Netty作为通信框架,开发一个模仿QQ的聊天室。
基本框架是这样设计的,使用Netty作为通信网关,使用JavaFX开发客户端界面,使用Spring作为IOC容器,使用MyBatics支持持久化。本文将着重介绍Netty网关的私有协议栈开发。
Netty服务端程序示例
启动Reactor线程组监听客户端链路的连接与IO网络读写。
public class ChatServer {private Logger logger = LoggerFactory.getLogger(ChatServer.class);//避免使用默认线程数参数private EventLoopGroup bossGroup = new NioEventLoopGroup(1);private EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());public void bind(int port) throws Exception {logger.info("服务端已启动,正在监听用户的请求......");try{ServerBootstrap b = new ServerBootstrap();b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());ChannelFuture f = b.bind(new InetSocketAddress(port)).sync();f.channel().closeFuture().sync();}catch(Exception e){logger.error("", e);throw e;}finally{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public void close() {try{if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}}catch(Exception e){logger.error("", e);}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{@Overrideprotected void initChannel(SocketChannel arg0) throws Exception {ChannelPipeline pipeline = arg0.pipeline();pipeline.addLast(new PacketDecoder(1024*4,0,4,0,4));pipeline.addLast(new LengthFieldPrepender(4));pipeline.addLast(new PacketEncoder());//客户端300秒没收发包,便会触发UserEventTriggered事件到MessageTransportHandlerpipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 300));pipeline.addLast(new IoHandler());}}}
通信私有协议栈的设计
私有协议栈主要用于跨进程的数据通信,只能用于企业内部,但协议设计比较灵巧方便。
在这里,消息定义将消息头和消息体融为一体。将消息的第一个short数据视为消息的类型,服务端将根据消息类型处理不同的业务逻辑。
定义Packet抽象类,抽象方法readFromBuff(ByteBuf buf) 和 writePacketMsg(ByteBuf buf) 作为读写数据的抽象行为,负责处理数据包的序列化与反序列化。
而具体的读写方式由相应的子类去实现。代码如下:
public abstract class Packet {public void writeToBuff(ByteBuf buf){buf.writeShort(getPacketType().getType());writePacketMsg(buf);}abstract public void writePacketMsg(ByteBuf buf);abstract public void readFromBuff(ByteBuf buf);abstract public PacketType getPacketType();abstract public void execPacket();protected String readUTF8(ByteBuf buf){int strSize = buf.readInt();byte[] content = new byte[strSize];buf.readBytes(content);try {return new String(content,"UTF-8");} catch (UnsupportedEncodingException e) {e.printStackTrace();return "";}}protected void writeUTF8(ByteBuf buf,String msg){byte[] content ;try {content = msg.getBytes("UTF-8");buf.writeInt(content.length);buf.writeBytes(content);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}
需要注意的是,由于Netty通信本质上传送的是byte数据,无法直接传送String字段串,需要先经过简单的编解码成字节数组才能传送。
消息编解码
数据发送方发送载体为ByteBuf,因此在发包时,需要将POJO对象进行编码。本项目使用Netty自带的编码器MessageToByteEncoder,实现自定义的编码方式。代码如下:
package com.kingston.net;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class PacketEncoder extends MessageToByteEncoder<Packet> {@Overrideprotected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out)throws Exception {msg.writeToBuff(out);}}
接收方实际接收ByteBuf数据,需要将其解码成对应的POJO对象,才能处理对应的逻辑。本项目使用Netty自带的解码器ByteToMessageDecoder(LengthFieldBasedFrameDecoder继承自ByteToMessageDecoder,其作用见下文),实现自定义的解码方式。代码如下:
package com.kingston.net.codec;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import com.kingston.net.Packet;
import com.kingston.net.PacketManager;public class PacketDecoder extends LengthFieldBasedFrameDecoder{public PacketDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip) {super(maxFrameLength, lengthFieldOffset, lengthFieldLength,lengthAdjustment, initialBytesToStrip);}@Overridepublic Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {ByteBuf frame = (ByteBuf)(super.decode(ctx, in));if(frame.readableBytes() <= 0) return null ;short packetType = frame.readShort();Packet packet = PacketManager.createNewPacket(packetType);packet.readFromBuff(frame);return packet;}}
通信协议将包头的第一个short数据视为包类型,根据包类型反射拿到对应的包class定义,调用抽象读取方法完成消息体的读取。
数据包的序列化与反序列化
对于一个给定的消息类型。由于网络发送的数据是字节流,客户端发送的时候,需要把消息序列化为byte数组,而服务端读取到字节流后,需要反序列化成对应的消息对象。
消息体序列化方案的选择,也是一个值得花大篇幅介绍的话题。下面对比目前常用的几种方式。
优点 | 缺点 | |
jdk | 官方血统 | 通信双方都是Java平台; 需要实现Serializable接口; 速度慢,体积大; |
json | 自说明,可读性高 | 数据量大,不利于传输 |
protobuf | 压缩率高 | 手动编写.proto文件; 数据肉眼无法识别 |
本文采取的是,自己手写消息的编解码。比如客户端writeInt(),服务端就对应readInt()。只要客户端,服务端以统一的读写顺序即可。(这种方式写起来很啰嗦,但灵活性很高!)
消息协议的解析与执行
消息使用第一个short数据作为消息的类型。为了区分每一个消息协议包,需要有一个数据结构缓存各种协议的类型与对应的消息包定义。为此,使用枚举类定义所有的协议包。代码如下:
public enum PacketType {//业务上行数据包//链接心跳包ReqHeartBeat((short)0x0001, ReqHeartBeatPacket.class),//新用户注册ReqUserRegister((short)0x0100, ReqUserRegisterPacket.class),//用户登陆ReqUserLogin((short)0x0101, ReqUserLoginPacket.class),//聊天ReqChat((short)0x0102, ReqChatPacket.class),//业务下行数据包RespHeartBeat((short)0x2001, RespHeartBeatPacket.class),//新用户注册ResUserRegister((short)0x2100, ResUserRegisterPacket.class),RespLogin((short)0x2102, RespUserLoginPacket.class),RespChat((short)0x2103, RespChatPacket.class),;private short type;private Class<? extends AbstractPacket> packetClass;private static Map<Short,Class<? extends AbstractPacket>> PACKET_CLASS_MAP = new HashMap<Short,Class<? extends AbstractPacket>>();public static void initPackets() {Set<Short> typeSet = new HashSet<Short>();Set<Class<?>> packets = new HashSet<>();for(PacketType p:PacketType.values()){Short type = p.getType();if(typeSet.contains(type)){throw new IllegalStateException("packet type 协议类型重复"+type);}Class<?> packet = p.getPacketClass();if (packets.contains(packet)) {throw new IllegalStateException("packet定义重复"+p);}PACKET_CLASS_MAP.put(type,p.getPacketClass());typeSet.add(type);packets.add(packet);}}PacketType(short type,Class<? extends AbstractPacket> packetClass){this.setType(type);this.packetClass = packetClass;}public short getType() {return type;}public void setType(short type) {this.type = type;}public Class<? extends AbstractPacket> getPacketClass() {return packetClass;}public void setPacketClass(Class<? extends AbstractPacket> packetClass) {this.packetClass = packetClass;}public static Class<? extends AbstractPacket> getPacketClassBy(short packetType){return PACKET_CLASS_MAP.get(packetType);}}
PacketType枚举类中有一个初始化方法initPackets(),用于缓存所有包类型与对应的实体类的映射关系。这样,就可以根据包类型,直接拿到对应的Packet子类。
经过解码反射得到完整的消息包定义后,就可以通过反射机制,调用相应的业务方法。该步骤由包执行器完成,代码如下:
package com.kingston.net;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;public class PacketExecutor {public static void execPacket(Packet pact){if(pact == null) return;try {Method m = pact.getClass().getMethod("execPacket");m.invoke(pact, null);} catch (NoSuchMethodException | SecurityException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (IllegalArgumentException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}
包执行器其实是根据反射,调用对应子类消息包的业务处理方法。
到这里,读者应该可以感受抽象包Packet的定义是该通信机制的精华部分。正是有了abstract public void readFromBuff(ByteBuf buf); abstract public void writePacketMsg(ByteBuf buf); abstract public void execPacket()三个抽象方法,才能将各种消息包的读写、业务逻辑相互隔离。
写到这里,我不禁回想起大学期间做过的一个聊天室课程设计。当初,我采用Java作为服务器,flash作为客户端,基于socket进行通信。通信消息体只有一个长字符串,通信双方根据不同消息类型将字符串作多次分隔。如果当初协议类型再多几个的话,估计想死的心都有了。
Netty的半包读写解决之道
MessageToByteEncoder 和 ByteToMessageDecoder两个类只是解决POJO的编解码,并没有处理粘包,拆包的异常情况。在本例中,使用LengthFieldBasedFrameDecoder和LengthFieldPrepender两个工具类,就可以轻松解决半包读写异常。
服务端与客户端数据通信方式
客户端tcp链路建立后,服务端必须缓存对应的ChannelHandlerContext对象。这样,服务端就可以向所有连接的用户发送数据了。发送数据基础服务类代码如下:
package com.kingston.base;import io.netty.channel.ChannelHandlerContext;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;import com.kingston.net.Packet;
import com.kingston.util.StringUtil;public class ServerManager {//缓存所有登录用户对应的通信上下文环境(主要用于业务数据处理)private static Map<Integer,ChannelHandlerContext> USER_CHANNEL_MAP = new ConcurrentHashMap<>();//缓存通信上下文环境对应的登录用户(主要用于服务)private static Map<ChannelHandlerContext,Integer> CHANNEL_USER_MAP = new ConcurrentHashMap<>();public static void sendPacketTo(Packet pact,String userId){if(pact == null || StringUtil.isEmpty(userId)) return;Map<Integer,ChannelHandlerContext> contextMap = USER_CHANNEL_MAP;if(StringUtil.isEmpty(contextMap)) return;ChannelHandlerContext targetContext = contextMap.get(userId);if(targetContext == null) return;targetContext.writeAndFlush(pact);}/*** 向所有在线用户发送数据包*/public static void sendPacketToAllUsers(Packet pact){if(pact == null ) return;Map<Integer,ChannelHandlerContext> contextMap = USER_CHANNEL_MAP;if(StringUtil.isEmpty(contextMap)) return;contextMap.values().forEach( (ctx) -> ctx.writeAndFlush(pact));}/*** 向单一在线用户发送数据包*/public static void sendPacketTo(Packet pact,ChannelHandlerContext targetContext ){if(pact == null || targetContext == null) return;targetContext.writeAndFlush(pact);}public static ChannelHandlerContext getOnlineContextBy(String userId){return USER_CHANNEL_MAP.get(userId);}public static void addOnlineContext(Integer userId,ChannelHandlerContext context){if(context == null){throw new NullPointerException();}USER_CHANNEL_MAP.put(userId,context);CHANNEL_USER_MAP.put(context, userId);}/*** 注销用户通信渠道*/public static void ungisterUserContext(ChannelHandlerContext context ){if(context != null){int userId = CHANNEL_USER_MAP.getOrDefault(context,0);CHANNEL_USER_MAP.remove(context);USER_CHANNEL_MAP.remove(userId);context.close();}}}
模拟用户登录的服务端demo
1. demo流程为客户端发送一个以Req开头命名的上行包到服务端,服务端接受数据后,直接发送一个以Resp开头命名的响应包到客户端。
上行包ReqUserLogin代码如下:
public class ReqUserLoginPacket extends Packet{private long userId;private String userPwd; @Overridepublic void writePacketBody(ByteBuf buf) {buf.writeLong(userId);writeUTF8(buf, userPwd);}@Overridepublic void readPacketBody(ByteBuf buf) {this.userId = buf.readLong();this.userPwd =readUTF8(buf);System.err.println("id="+userId+",pwd="+userPwd);}@Overridepublic PacketType getPacketType() {return PacketType.ReqUserLogin;}@Overridepublic void execPacket() {}public String getUserPwd() {return userPwd;}public void setUserPwd(String userPwd) {this.userPwd = userPwd;}public long getUserId() {return userId;}public void setUserId(long userId) {this.userId = userId;}}
2. 业务逻辑服务,收到登录包后,调用对应的业务处理方法进行处理
@Component
public class LoginService {@Autowiredprivate UserDao userDao;public void validateLogin(Channel channel, long userId, String password) {User user = validate(userId, password);IoSession session = ChannelUtils.getSessionBy(channel);RespUserLoginPacket resp = new RespUserLoginPacket();if(user != null) {resp.setIsValid((byte)1);resp.setAlertMsg("登录成功");ServerManager.INSTANCE.registerSession(user, session);}else{resp.setAlertMsg("帐号或密码错误");}ServerManager.INSTANCE.sendPacketTo(session, resp);}/*** 验证帐号密码是否一致*/private User validate(long userId, String password){if (userId <= 0 || StringUtils.isEmpty(password)) {return null;}User user = userDao.findById(userId);if (user != null &&user.getPassword().equals(password)) {return user;}return null;}}
3. 业务处理后,下发一个响应包。下行包RespUserLogin代码如下:
public class RespUserLoginPacket extends AbstractPacket{private String alertMsg;private byte isValid;@Overridepublic void writePacketBody(ByteBuf buf) {writeUTF8(buf, alertMsg);buf.writeByte(isValid);}@Overridepublic void readPacketBody(ByteBuf buf) {this.alertMsg = readUTF8(buf);this.isValid = buf.readByte();}@Overridepublic PacketType getPacketType() {return PacketType.RespUserLogin;}@Overridepublic void execPacket() {System.err.println("receive login "+ alertMsg);LoginManager.getInstance().handleLoginResponse(this);}public String getAlertMsg() {return alertMsg;}public void setAlertMsg(String alertMsg) {this.alertMsg = alertMsg;}public byte getIsValid() {return isValid;}public void setIsValid(byte isValid) {this.isValid = isValid;}}
至此,服务端主要通信逻辑基本完成。
模拟用户登录的客户端demo
客户端私有协议跟编解码方式跟服务端完全一致。客户端主要关注数据界面的展示。下面只给出启动应用程序的代码,以及测试通信的示例代码。
1.启动Reactor线程组建立与服务端的的连接,以及处理IO网络读写。
public class SocketClient { /** 当前重接次数*/private int reconnectTimes = 0;public void start() {try{connect(ClientConfigs.REMOTE_SERVER_IP,ClientConfigs.REMOTE_SERVER_PORT);}catch(Exception e){}}public void connect(String host,int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(1); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast(new PacketDecoder(1024*1, 0,4,0,4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new PacketEncoder()); pipeline.addLast(new ClientTransportHandler()); } }); ChannelFuture f = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(ClientConfigs.LOCAL_SERVER_IP, ClientConfigs.LOCAL_SERVER_PORT)) .sync(); f.channel().closeFuture().sync(); }catch(Exception e){ e.printStackTrace(); }finally{ // group.shutdownGracefully(); //这里不再是优雅关闭了 //设置最大重连次数,防止服务端正常关闭导致的空循环if (reconnectTimes < ClientConfigs.MAX_RECONNECT_TIMES) {reConnectServer(); }} }
}
2.处理业务逻辑的ClientTransportHandler代码如下:
public class ClientTransportHandler extends ChannelHandlerAdapter{public ClientTransportHandler(){}@Overridepublic void channelActive(ChannelHandlerContext ctx){//注册sessionClientBaseService.INSTANCE.registerSession(ctx.channel());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception{AbstractPacket packet = (AbstractPacket)msg;PacketManager.INSTANCE.execPacket(packet);}@Overridepublic void close(ChannelHandlerContext ctx,ChannelPromise promise){System.err.println("TCP closed...");ctx.close(promise);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.err.println("客户端关闭1");}@Overridepublic void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.disconnect(promise);System.err.println("客户端关闭2");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.err.println("客户端关闭3");Channel channel = ctx.channel();cause.printStackTrace();if(channel.isActive()){System.err.println("simpleclient"+channel.remoteAddress()+"异常");}}
}
3. 先启动服务器,再启动JavaFX客户端(ClientStartup),即可看到登录界面
至此,聊天室的登录流程基本完成。限于篇幅,此demo例子并没有出现spring,mybatic相关代码,但是私有协议通信方式代码已全部给出。有了一个用户登录的例子,相信构建其他得业务逻辑也不会太困难。
最后,说下写代码的历程。这个demo是我春节宅家期间,利用零碎时间做的,平均一天一个小时。很多开发人员应该有这样的经历,看书的时候往往觉得都能理解,但实际上自己动手就会遇到各种卡思路。在做这个demo时,我更多时间是花在查资料上。
我也会继续往这个项目添加功能,让它看起来越来越“炫”。(^-^)
全部代码已在github上托管(代码经过多次重构,与博客上的代码略有不同)
完整服务端代码请移步 --> netty聊天室服务器
完整客户端代码请移步 --> netty聊天室客户端