一.udp创建服务端
/*** udp 服务器 */
@Slf4j
@Component
public class UdpServer {/*** 创建服务端*/@Asyncpublic void bind(int port) {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535)).handler(new ChannelInitializer<DatagramChannel>() {@Overridepublic void initChannel(DatagramChannel ch) {ChannelPipeline cp = ch.pipeline();cp.addLast(new ServerHandler(port));}});Channel serverChannel = b.bind(port).sync().channel();log.info("UdpServer start success...");serverChannel.closeFuture().await();} catch (Exception e) {log.error("UdpServer start fall!");} finally {group.shutdownGracefully();}}private class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {private int port;// 当前 端口public ServerHandler(int port) {this.port = port;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error(cause.getMessage());cause.printStackTrace();}/*** 接收消息*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {// 1.获取数据内容,它是一个ByteBufByteBuf content = packet.content();String request = content.toString(CharsetUtil.UTF_8);// 2.你可以使用ByteBuf的API来读取数据//byte[] bytes = new byte[content.readableBytes()];//content.readBytes(bytes);//String request = new String(bytes, StandardCharsets.UTF_8);InetSocketAddress senderAddress = packet.sender();log.info("{} ---> {}:{}", senderAddress.getAddress().getHostAddress(), this.port, request);}}}
二.udp创建客户端
下面展示一些 有些地方赖得改了,当是记录
。
@Slf4j
@Component
public class UdpClient {/*** 发送udp,等待对方回复** @param ip* @param port* @param format str,hex* @param msg* @return*/public String sendData(String ip, int port, String format, Object msg) {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);EventLoopGroup group = new NioEventLoopGroup();Channel channel = null;Response r = new Response();try {InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getByName(ip), port);Bootstrap b = new Bootstrap();b.group(group).channel(NioDatagramChannel.class).handler(new SimpleChannelInboundHandler() {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("exceptionCaught ->" + cause.getMessage());cause.printStackTrace();ctx.close();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {// 处理接收到的数据ByteBuf byteBuf = ((DatagramPacket)msg).content();String response = null;if (format.equals("str")) {response = byteBuf.toString(StandardCharsets.UTF_8);} else {response = ByteBufUtil.hexDump(byteBuf);}log.info("response msg: " + response);r.setMsg(response);ctx.close();}});ChannelFuture future = b.bind(0).sync(); // 绑定端口0以获取随机可用端口channel = future.channel();log.info("{} <- {}", ip, msg.toString());Channel finalChannel = channel;Future<?> executorServiceFuture = executorService.schedule(() -> {// 检查Channel是否仍然是活动状态if (finalChannel.isActive()) {finalChannel.close();}}, 35, TimeUnit.SECONDS);ByteBuf byteBuf = null;if (format.equals("str")) {byteBuf = Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8); // 将消息内容转换为ByteBuf} else {byte[] bytes = hexString2Bytes(msg.toString());// 将16进制字符串转换为字节数组byteBuf = Unpooled.wrappedBuffer(bytes); // 使用字节数组创建ByteBuf}DatagramPacket requestPacket = new DatagramPacket(byteBuf, inetSocketAddress);channel.writeAndFlush(requestPacket);// 发送channel.closeFuture().await();//异步等待,通道关闭后会往下执行executorServiceFuture.cancel(true); // 立刻中断return r.getMsg();} catch (Exception e) {e.printStackTrace();return null;} finally {executorService.shutdown(); // 清理资源if (channel != null) channel.close();group.shutdownGracefully();}}/*** 发送udp,不等待回复* @param ip* @param port* @param msg*/public void sendDataNoReply(String ip, int port, Object msg) {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(new SimpleChannelInboundHandler() {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {// 处理接收到的数据}});Channel channel = bootstrap.bind(0).sync().channel();// 发送数据到指定的地址和端口InetSocketAddress address = new InetSocketAddress(ip, port);ByteBuf buffer = Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8);DatagramPacket packet = new DatagramPacket(buffer, address);channel.writeAndFlush(packet);// 等待一段时间以确保数据发送完成//Thread.sleep(1000);} catch (InterruptedException e) {log.error("发送udp数据失败:", e);throw new RuntimeException("发送数据失败或连接不上");} finally {group.shutdownGracefully();}}class Response {private String msg;public Response() {}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg = msg;}}public static byte[] hexString2Bytes(String src) {byte[] bytes = new byte[src.length() / 2];for (int i = 0; i < bytes.length; i++) {int index = i * 2;int j = Integer.parseInt(src.substring(index, index + 2), 16);bytes[i] = (byte) j;}return bytes;}}