Netty+http+websocket聊天室案例
- 一、实现流程
- 二、实现效果
- 三、实现代码
- ChatServer
- HttpHandler
- WebSocktHandler
- ChatMessageProcessor
一、实现流程
通过本案例可以掌握netty对http协议的处理,以及netty对websocket协议的处理。
1、浏览器地址栏输入netty服务器的ip和端口,通过http短连接访问到了Netty服务器;
2、Netty服务器处理http请求并返回一个页面的html源码给浏览器,浏览器解析html源码并渲染出页面效果;
3、紧接着页面上的js被执行,js发起一个websocket连接到netty服务器,netty服务器接收到连接,建立起一个浏览器到netty服务器的websocket的长连接;
4、页面上通过websocket协议发送聊天信息到netty服务器;
5、netty服务器接收到页面的聊天信息后,将信息写回浏览器;如果有多个浏览器在访问netty服务器,会把信息像广播一样写给每一个浏览器;
6、当netty服务端写出数据到浏览器,实际上是触发页面上js的websocket.onmessage()方法,接收到数据后,通过js在页面渲染数据即可;
二、实现效果
三、实现代码
ChatServer
import com.mytest.server.handler.HttpHandler;
import com.mytest.server.handler.WebSocktHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.log4j.Logger;import java.io.IOException;/*** 聊天室服务端* Netty+http+websocket聊天室案例*/
public class ChatServer {private static Logger LOG = Logger.getLogger(ChatServer.class);private static final int PORT = 8088;public static void main(String[] args) throws IOException {new ChatServer().start();}public void start() {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//Http请求编码和解码pipeline.addLast(new HttpServerCodec());//将同一个http请求或响应的多个消息对象变成一个fullHttpRequest完整的消息对象pipeline.addLast(new HttpObjectAggregator(64 * 1024));//用于处理大数据流,比如1G大小的文件传输时会切成小块处理,加上该handler就不用考虑大文件传输的问题pipeline.addLast(new ChunkedWriteHandler());//我们自己编写的http请求逻辑处理Handlerpipeline.addLast(new HttpHandler());//WebSocket请求处理(是netty内置的handler,直接使用即可,websocket的请求路径是 ws://ip:port/im)pipeline.addLast(new WebSocketServerProtocolHandler("/im"));//我们自己编写的websocket请求逻辑处理Handlerpipeline.addLast(new WebSocktHandler());}});ChannelFuture future = b.bind(PORT).sync();LOG.info("服务已启动, 监听端口" + PORT);future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}
HttpHandler
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import org.apache.log4j.Logger;import java.io.File;
import java.io.RandomAccessFile;
import java.net.URL;/*** 对Http请求的处理Handler*/
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static Logger LOG = Logger.getLogger(HttpHandler.class);//web页面文件所在目录private final String webroot = "web";//获取class路径private URL baseURL = HttpHandler.class.getProtectionDomain().getCodeSource().getLocation();private File getResource(String fileName) throws Exception {String path = baseURL.toURI() + webroot + "/" + fileName;path = !path.contains("file:") ? path : path.substring(5);path = path.replaceAll("//", "/");return new File(path);}@Overridepublic void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {String uri = request.uri();RandomAccessFile file = null;try {String page = uri.equals("/") ? "chat.html" : uri;file = new RandomAccessFile(getResource(page), "r");} catch (Exception e) {ctx.fireChannelRead(request.retain());return;}HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);String contextType = "text/html;";if (uri.endsWith(".css")) {contextType = "text/css;";} else if (uri.endsWith(".js")) {contextType = "text/javascript;";} else if (uri.toLowerCase().matches("(jpg|png|gif)$")) {String ext = uri.substring(uri.lastIndexOf("."));contextType = "image/" + ext;}response.headers().set(HttpHeaderNames.CONTENT_TYPE, contextType + "charset=utf-8;");boolean keepAlive = HttpUtil.isKeepAlive(request);if (keepAlive) {response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);}ctx.write(response);ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));//ctx.write(new ChunkedNioFile(file.getChannel()));ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);if (!keepAlive) {future.addListener(ChannelFutureListener.CLOSE);}file.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {LOG.info("Client:" + ctx.channel().remoteAddress() + 
"异常");cause.printStackTrace();ctx.close();}
}
WebSocktHandler
/*** WebSocket的请求处理**/
public class WebSocktHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static Logger LOG = Logger.getLogger(WebSocktHandler.class);//消息的处理private ChatMessageProcessor processor = new ChatMessageProcessor();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {processor.sendMsg(ctx.channel(), msg.text());}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {Channel client = ctx.channel();String addr = processor.getAddress(client);LOG.info("WebSocket Client:" + addr + "加入");}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {Channel client = ctx.channel();processor.logout(client);LOG.info("WebSocket Client:" + processor.getNickName(client) + "离开");}@Overridepublic void channelActive(ChannelHandlerContext ctx) {Channel client = ctx.channel();String addr = processor.getAddress(client);LOG.info("WebSocket Client:" + addr + "上线");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {Channel client = ctx.channel();String addr = processor.getAddress(client);LOG.info("WebSocket Client:" + addr + "掉线");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {Channel client = ctx.channel();String addr = processor.getAddress(client);LOG.info("WebSocket Client:" + addr + "异常");cause.printStackTrace();ctx.close();}
}
ChatMessageProcessor
import com.alibaba.fastjson.JSONObject;
import com.mytest.protocol.ChatDecoder;
import com.mytest.protocol.ChatEncoder;
import com.mytest.protocol.ChatMessage;
import com.mytest.protocol.ChatType;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;public class ChatMessageProcessor {//记录在线用户private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);//定义一些扩展属性private final AttributeKey<String> NICK_NAME = AttributeKey.valueOf("nickName");private final AttributeKey<String> IP_ADDR = AttributeKey.valueOf("ipAddr");private final AttributeKey<JSONObject> ATTRS = AttributeKey.valueOf("attrs");//解码器private ChatDecoder decoder = new ChatDecoder();//编码器private ChatEncoder encoder = new ChatEncoder();/*** 获取用户昵称** @param client* @return*/public String getNickName(Channel client) {return client.attr(NICK_NAME).get();}/*** 获取用户远程IP地址** @param client* @return*/public String getAddress(Channel client) {return client.remoteAddress().toString().replaceFirst("/", "");}/*** 获取扩展属性** @param client* @return*/public JSONObject getAttrs(Channel client) {try {return client.attr(ATTRS).get();} catch (Exception e) {return null;}}/*** 设置扩展属性** @param client* @return*/private void setAttrs(Channel client, String key, Object value) {try {JSONObject json = client.attr(ATTRS).get();json.put(key, value);client.attr(ATTRS).set(json);} catch (Exception e) {JSONObject json = new JSONObject();json.put(key, value);client.attr(ATTRS).set(json);}}/*** 登录退出通知** @param client*/public void logout(Channel client) {if (getNickName(client) == null) {return;}for (Channel channel : onlineUsers) {ChatMessage request = new ChatMessage(ChatType.SYSTEM.getName(), sysTime(), onlineUsers.size(), getNickName(client) + "离开");String content = encoder.encode(request);channel.writeAndFlush(new TextWebSocketFrame(content));}onlineUsers.remove(client);}/*** 发送消息** @param client* @param msg*/public void sendMsg(Channel client, ChatMessage msg) {sendMsg(client, encoder.encode(msg));}/*** 发送消息** @param client* @param msg*/public void sendMsg(Channel client, String msg) {ChatMessage request = decoder.decode(msg);if (null == request) {return;}String addr = getAddress(client);//登录消息if 
(request.getCmd().equals(ChatType.LOGIN.getName())) {client.attr(NICK_NAME).getAndSet(request.getSender());client.attr(IP_ADDR).getAndSet(addr);onlineUsers.add(client);for (Channel channel : onlineUsers) {if (channel != client) {request = new ChatMessage(ChatType.SYSTEM.getName(), sysTime(), onlineUsers.size(), getNickName(client) + "加入");} else {request = new ChatMessage(ChatType.SYSTEM.getName(), sysTime(), onlineUsers.size(), "已与服务器建立连接!");}String content = encoder.encode(request);//把数据通过websocket协议写到浏览器channel.writeAndFlush(new TextWebSocketFrame(content));}//聊天消息} else if (request.getCmd().equals(ChatType.CHAT.getName())) {for (Channel channel : onlineUsers) {if (channel == client) {request.setSender("你");} else {request.setSender(getNickName(client));}request.setTime(sysTime());String content = encoder.encode(request);channel.writeAndFlush(new TextWebSocketFrame(content));}//送鲜花消息} else if (request.getCmd().equals(ChatType.FLOWER.getName())) {JSONObject attrs = getAttrs(client);long currTime = sysTime();if (null != attrs) {long lastTime = attrs.getLongValue("lastFlowerTime");//10秒之内不允许重复刷鲜花int secends = 10;long sub = currTime - lastTime;if (sub < 2000 * secends) {request.setSender("你");request.setCmd(ChatType.SYSTEM.getName());request.setContent("送鲜花太频繁了, " + (secends - Math.round(sub / 1000)) + "秒后再试.");String content = encoder.encode(request);client.writeAndFlush(new TextWebSocketFrame(content));return;}}//正常送花for (Channel channel : onlineUsers) {if (channel == client) {request.setSender("你");request.setContent("你给大家送了一波鲜花雨");setAttrs(client, "lastFlowerTime", currTime);} else {request.setSender(getNickName(client));request.setContent(getNickName(client) + "送来一波鲜花雨");}request.setTime(sysTime());String content = encoder.encode(request);channel.writeAndFlush(new TextWebSocketFrame(content));}}}/*** 获取系统时间** @return*/private Long sysTime() {return System.currentTimeMillis();}
}