完整的TCP服务端解析代码
1.maven依赖(不需要的依赖请自行删除,这里懒得删了)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the TCP server. Attribute separators on the root element have been
     restored so the document is well-formed XML; coordinates and versions are unchanged. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.13.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.site</groupId>
    <artifactId>TcpServer</artifactId>
    <version>2.0</version>
    <name>TcpServer</name>
    <description>Tcp系统服务端</description>

    <properties>
        <java.version>1.8</java.version>
        <protobuf.version>3.3.0</protobuf.version>
        <hutool.version>5.7.16</hutool.version>
        <fastjson.version>1.2.78</fastjson.version>
        <plumelog.version>3.3</plumelog.version>
        <druid.version>1.2.8</druid.version>
        <redisson.version>3.7.3</redisson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <!-- Alibaba JSON parser -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Redis cache access -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- Object pool -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.plumelog</groupId>
            <artifactId>plumelog-logback</artifactId>
            <version>${plumelog.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <!-- Alibaba Druid database connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.32</version>
        </dependency>
        <!-- WebSocket support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!-- Common utility classes -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Spring Boot AOP (interceptors) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!-- Redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>${redisson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.activation</groupId>
            <artifactId>javax.activation-api</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <!-- <version>2.12.6</version> -->
        </dependency>
        <dependency>
            <groupId>com.mybatis-flex</groupId>
            <artifactId>mybatis-flex-spring-boot-starter</artifactId>
            <version>1.9.3</version>
        </dependency>
        <dependency>
            <groupId>com.mybatis-flex</groupId>
            <artifactId>mybatis-flex-processor</artifactId>
            <version>1.9.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork> <!-- devtools does not take effect without this setting -->
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <executions>
                    <execution>
                        <id>default-resources</id>
                        <phase>validate</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>target/classes</outputDirectory>
                            <useDefaultDelimiters>false</useDefaultDelimiters>
                            <delimiters>
                                <delimiter>${*}</delimiter>
                            </delimiters>
                            <resources>
                                <resource>
                                    <directory>src/main/resources/</directory>
                                    <filtering>true</filtering>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <finalName>${project.artifactId}</finalName>
    </build>

    <profiles>
        <profile>
            <id>dev</id>
            <properties>
                <profileActive>dev</profileActive>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <id>pro</id>
            <properties>
                <profileActive>pro</profileActive>
            </properties>
        </profile>
    </profiles>
</project>
2.CMD2实时数据接收
package com.site.tcp.command;import com.site.tcp.server.TcpReceiveStructure;
import com.site.tcp.utils.*;
import lombok.extern.slf4j.Slf4j;/*** @author 程涛* @Title: CMD2* @Description: 实时信息上报* @date 2021年2月23日*/
@Slf4j
public class CMD2 {public static void cmd2(TcpReceiveStructure cmdReceiveStructure) {
// 车辆VIN(车架号)String VIN = cmdReceiveStructure.VIN;byte[] date = cmdReceiveStructure.date;byte[] timeByte = new byte[6];System.arraycopy(date, 0, timeByte, 0, 6);
// 数据采集时间String time = JacTools.hexToTime(timeByte);int index = 6;while (index < date.length) {int type = JacTools.subBytesToInt(date, index, 1);
// 0x01 整车数据 详见7.2.3.1
// 0x02 驱动电机数据 详见7.2.3.2,且停车充电过程无需传输该数据
// 0x03 燃料电池数据 详见7.2.3.3
// 0x04 发动机数据 详见7.2.3.4,停车充电过程无需传输该数据
// 0x05 车辆位置数据 详见7.2.3.5
// 0x06 极值数据 详见7.2.3.6
// 0x07 报警数据 详见7.2.3.7
// 0x08~0x09 终端数据预留
// 0x0A~0x2F 平台交换协议自定义数据switch (type) {case 1:byte[] carinfoByte = new byte[20];System.arraycopy(date, index + 1, carinfoByte, 0, 20);
// 解析整车数据AnalyzeVehicleData.getCarInfo(carinfoByte, VIN, time, 2);index = index + 21;break;case 2:
// 驱动电机个数index = DriveMotorUtils.getDriveMotor(date, VIN, time, index);break;case 3:
// 燃料电池数据index = FuelCellUtils.getFuelCell(date, VIN, time, index);break;case 4:
// 发动机数据index = EngineDataUtils.getEngineData(date, VIN, time, index);break;case 5:byte[] positioningInfoByte = new byte[9];System.arraycopy(date, index + 1, positioningInfoByte, 0, 9);
// 解析定位数据AnalyzePositioningInfo.getPositioningInfo(positioningInfoByte, VIN, time);index = index + 10;break;case 6:
// 极值数据byte[] ExtremeInfoByte = new byte[14];System.arraycopy(date, index + 1, ExtremeInfoByte, 0, 14);ExtremeInfoUtils.getExtremeInfo(ExtremeInfoByte, VIN, time);index = index + 15;break;case 7:index = AlarmInfoUtils.getAlarmInfo(date, VIN, time, index, 2);default:index = date.length;break;}}}
}
3.CMD3延迟数据补发
package com.site.tcp.command;import com.site.tcp.server.TcpReceiveStructure;
import com.site.tcp.utils.*;
import lombok.extern.slf4j.Slf4j;

/**
 * @author 程涛
 * @Title: CMD3
 * @Description: Supplementary (delayed) data re-upload
 * @date 2021-02-23
 */
@Slf4j
public class CMD3 {public static void cmd3(TcpReceiveStructure cmdReceiveStructure) {
// 车辆VIN(车架号)String VIN = cmdReceiveStructure.VIN;byte[] date = cmdReceiveStructure.date;byte[] timeByte = new byte[6];System.arraycopy(date, 0, timeByte, 0, 6);
// 数据采集时间String time = JacTools.hexToTime(timeByte);int index = 6;while (index < date.length) {int type = JacTools.subBytesToInt(date, index, 1);
// 0x01 整车数据 详见7.2.3.1
// 0x02 驱动电机数据 详见7.2.3.2,且停车充电过程无需传输该数据
// 0x03 燃料电池数据 详见7.2.3.3
// 0x04 发动机数据 详见7.2.3.4,停车充电过程无需传输该数据
// 0x05 车辆位置数据 详见7.2.3.5
// 0x06 极值数据 详见7.2.3.6
// 0x07 报警数据 详见7.2.3.7
// 0x08~0x09 终端数据预留
// 0x0A~0x2F 平台交换协议自定义数据 switch (type) {case 1:byte[] carinfoByte = new byte[20];System.arraycopy(date, index + 1, carinfoByte, 0, 20);
// 解析整车数据AnalyzeVehicleData.getCarInfo(carinfoByte, VIN, time, 3);index = index + 21;break;case 2:
// 驱动电机数据index = DriveMotorUtils.getDriveMotor(date, VIN, time, index);break;case 3:
// 燃料电池数据index = FuelCellUtils.getFuelCell(date, VIN, time, index);break;case 4:
// 发动机数据index = EngineDataUtils.getEngineData(date, VIN, time, index);break;case 5:byte[] positioningInfoByte = new byte[9];System.arraycopy(date, index + 1, positioningInfoByte, 0, 9);
// 解析定位数据AnalyzePositioningInfo.getPositioningInfo(positioningInfoByte, VIN, time);index = index + 10;break;case 6:
// 极值数据byte[] ExtremeInfoByte = new byte[14];System.arraycopy(date, index + 1, ExtremeInfoByte, 0, 14);ExtremeInfoUtils.getExtremeInfo(ExtremeInfoByte, VIN, time);index = index + 15;break;case 7:index = AlarmInfoUtils.getAlarmInfo(date, VIN, time, index, 3);default:index = date.length;break;}}}}
4.handler模块
package com.site.tcp.handler;import com.site.tcp.server.TcpReceiveStructure;
import com.site.tcp.utils.JacTools;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.util.List;@Slf4j
public class RequestDecoder extends ByteToMessageDecoder {// 记录上次未读完的字节private ByteBuf tempMessage = Unpooled.buffer();// 申请一块内存保存分包数据,根据最后一次0d出现的位置选择保留哪部分数据private int CurrentSize = 0; // 本次新传输的字节数private int TempMessageSize = 0; // 上次剩余的字节数private ByteBuf WorkByteBuf = Unpooled.buffer();// 最终操作的ByteBufprivate byte[] WorkPackBuf = null;// 最终操作的字节数组/*** 用来进行解包操作,得到完整的合法数据包** @param channelHandlerContext* @param byteBuf* @param list* @throws Exception*/@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)throws Exception {// log.info("=========开始分析数据========");WorkByteBuf.clear();CurrentSize = byteBuf.readableBytes();// 当前流可读取字节数TempMessageSize = tempMessage.readableBytes();// 上次保存的可读字节数// 进行合包操作if (TempMessageSize == 0) {WorkPackBuf = new byte[CurrentSize];// ByteBuf的转换的字节数组WorkByteBuf.writeBytes(byteBuf);WorkByteBuf.readBytes(WorkPackBuf);} else if (TempMessageSize > 0) {WorkByteBuf.writeBytes(tempMessage);WorkByteBuf.writeBytes(byteBuf);WorkPackBuf = new byte[CurrentSize + TempMessageSize];WorkByteBuf.readBytes(WorkPackBuf);} else {tempMessage.clear();}if (WorkPackBuf != null) {if (JacTools.subBytesToInt(WorkPackBuf, 0, 1) == 0x23 & JacTools.subBytesToInt(WorkPackBuf, 1, 1) == 0x23) {
// 车辆VIN码String VIN = JacTools.subBytesToString(WorkPackBuf, 4, 17);
// 命令标识int cmd = JacTools.subBytesToInt(WorkPackBuf, 2, 1);
// 消息长度int length = JacTools.subBytesToInt(WorkPackBuf, 22, 2);
// 数据域byte[] data = new byte[length];System.arraycopy(WorkPackBuf, 24, data, 0, length);
// 数据校验if (checksum(WorkPackBuf, length)) {TcpReceiveStructure cmdReceiveStructure = new TcpReceiveStructure(VIN, cmd, data);list.add(cmdReceiveStructure);log.info("车辆数据上行》" + JacTools.bytesToHexString(WorkPackBuf));}}}}/*** 安全校验** @param WorkPackBuf* @param length* @return*/private static Boolean checksum(byte[] WorkPackBuf, int length) {
// 数据和域字节数组byte[] checksumByte = new byte[1];System.arraycopy(WorkPackBuf, length + 24, checksumByte, 0, 1);
// 数据和域字符String checksumStr = JacTools.bytesToHexString(checksumByte);// 数据域byte[] data = new byte[length + 22];System.arraycopy(WorkPackBuf, 2, data, 0, length + 22);String bcc = JacTools.getBCC(data);return checksumStr.toUpperCase().equals(bcc);}}
package com.site.tcp.handler;import com.site.tcp.utils.JacTools;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;/*** @program: ResponseEncoder* @description: 发送数据* @author: 程涛* @create: 2020-12-10**/@Slf4j
public class ResponseEncoder extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {String str = ((String) msg).toUpperCase();byte[] mess = JacTools.hexStringToByte(str);ByteBuf encoded = ctx.alloc().buffer(mess.length);encoded.writeBytes(mess);ctx.write(encoded);ctx.flush();log.info("车辆数据下行》 " + str);}
}
package com.site.tcp.handler;import java.util.concurrent.TimeUnit;import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;/*** 空闲检测** @author pjmike* @create 2018-10-25 16:21*/
@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {/*** 设置空闲检测时间为 60s*/private static final int READER_IDLE_TIME = 60;public ServerIdleStateHandler() {super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);}@Overrideprotected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {ctx.close();log.error("车辆数据服务空闲检测超时断开连接");}
}
package com.site.tcp.handler;import com.site.tcp.command.CMD2;
import com.site.tcp.command.CMD3;
import com.site.tcp.server.TcpReceiveStructure;
import com.site.tcp.utils.JacTools;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;@Component
@ChannelHandler.Sharable
@Slf4j
public class TcpHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {TcpReceiveStructure cmdReceiveStructure = (TcpReceiveStructure) msg;switch (cmdReceiveStructure.cmd) {
// 实时信息上报case 2:CMD2.cmd2(cmdReceiveStructure);break;
// 信息补发case 3:CMD3.cmd3(cmdReceiveStructure);break;default:break;}String start = "2323";String back = JacTools.intToHex(cmdReceiveStructure.cmd, 1) + "01"+ JacTools.strToHex(cmdReceiveStructure.VIN, 17) + "010006" + JacTools.getHexTime();String bcc = JacTools.getBCC(JacTools.hexToByteArr(back));ctx.writeAndFlush(start + back + bcc);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// cause.printStackTrace();log.error("-----客户端关闭:" + ctx.channel().remoteAddress(), cause);
// Channel channel = ctx.channel();
// if (!channel.isActive()) {
// ctx.close();
// }}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.info("-----客户端断开" + ctx.channel().remoteAddress());ctx.close();}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) {log.info("-----客户端注册" + ctx.channel().remoteAddress());}}
5.server模块
package com.site.tcp.server;import java.nio.ByteOrder;import com.site.tcp.handler.RequestDecoder;
import com.site.tcp.handler.ResponseEncoder;import com.site.tcp.handler.TcpHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline()
// 空闲检测
// .addLast(new ServerIdleStateHandler())
// 分包,大端.addFirst("decoder",new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE, 22, 2, 1, 0, false)).addLast(new RequestDecoder()).addLast(new ResponseEncoder()).addLast(new TcpHandler());}
}
package com.site.tcp.server;import java.net.InetSocketAddress;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author pjmike* @create 2018-10-24 15:13*/
@Component
public class TcpNettyServer {protected final Logger logger = LoggerFactory.getLogger(TcpNettyServer.class);/*** boss 线程组用于处理连接工作*/private EventLoopGroup boss = new NioEventLoopGroup();/*** work 线程组用于数据处理*/private EventLoopGroup work = new NioEventLoopGroup();@Value("${netty.tcpPort}")private Integer port;/*** 启动Netty Server** @throws InterruptedException*/@PostConstructpublic void start() throws InterruptedException {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, work)// 指定Channel.channel(NioServerSocketChannel.class)// 使用指定的端口设置套接字地址.localAddress(new InetSocketAddress(port))// 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 1024)// 设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文.childOption(ChannelOption.SO_KEEPALIVE, true)// 将小的数据包包装成更大的帧进行传送,提高网络的负载.childOption(ChannelOption.TCP_NODELAY, true).childHandler(new NettyServerHandlerInitializer());ChannelFuture future = bootstrap.bind().sync();if (future.isSuccess()) {logger.info("启动车俩数据服务端");}}@PreDestroypublic void destory() throws InterruptedException {boss.shutdownGracefully().sync();work.shutdownGracefully().sync();logger.info("关闭Netty");}
}