一、简单版本
package com.ptc.ai.box.biz.relay.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;import java.net.URI;
import java.util.concurrent.CountDownLatch;/*** https://blog.csdn.net/a1053765496/article/details/130701218* 基于Netty快速实现WebSocket客户端,不手动处理握手*/
@Slf4j
public class SimpleWsClient {final CountDownLatch latch = new CountDownLatch(1);public static void main(String[] args) throws Exception {SimpleWsClient client = new SimpleWsClient();client.test();}public void test() throws Exception {Channel dest = dest();latch.await();dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));}public Channel dest() throws Exception {final URI webSocketURL = new URI("wss://api.openai.com:443/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01");DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();httpHeaders.add("Authorization", "");httpHeaders.add("OpenAI-Beta", "realtime=v1");/*httpHeaders.add("Sec-WebSocket-Version", "13");httpHeaders.add("Upgrade", "websocket");httpHeaders.add("Connection", "Upgrade");*/EventLoopGroup group = new NioEventLoopGroup();Bootstrap boot = new Bootstrap();boot.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).group(group).handler(new LoggingHandler(LogLevel.INFO)).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {ChannelPipeline pipeline = sc.pipeline();pipeline.addLast(new HttpClientCodec());pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(64 * 1024));pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, httpHeaders)));pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)throws Exception {System.err.println(" 客户端收到消息======== " + msg.text());}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE.equals(evt)) {log.info(ctx.channel().id().asShortText() + " 握手完成!");latch.countDown();send(ctx.channel());}super.userEventTriggered(ctx, evt);}});}});ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();return cf.channel();}public static void send(Channel channel) {final String textMsg = "握手完成后直接发送的消息";if (channel != null && channel.isActive()) {TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {if (channelFuture.isDone() && channelFuture.isSuccess()) {log.info(" ================= 发送成功.");} else {channelFuture.channel().close();log.info(" ================= 发送失败. cause = " + channelFuture.cause());channelFuture.cause().printStackTrace();}});} else {log.error("消息发送失败! textMsg = " + textMsg);}}}
二、结构化版本
package com.ptc.ai.box.biz.relay.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;/*** https://gitcode.com/ddean2009/learn-netty4/tree/master/src/main/java/com/flydean25/socketclient* https://www.flydean.com/25-netty-websocket-client* https://blog.csdn.net/superfjj/article/details/120648434* https://blog.csdn.net/twypx/article/details/84543518*/
public final class NettyWsClient {static final String URL = System.getProperty("url", "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01");public static void main(String[] args) throws Exception {URI uri = new URI(URL);final int port = 443;EventLoopGroup group = new NioEventLoopGroup();try {DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();httpHeaders.add("Authorization", "");httpHeaders.add("OpenAI-Beta", "realtime=v1");WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, httpHeaders));Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);}});Channel ch = b.connect(uri.getHost(), port).sync().channel();handler.handshakeFuture().sync();BufferedReader console = new BufferedReader(new InputStreamReader(System.in));while (true) {String msg = console.readLine();if (msg == null) {break;} else if ("再见".equalsIgnoreCase(msg)) {ch.writeAndFlush(new CloseWebSocketFrame());ch.closeFuture().sync();break;} else if ("ping".equalsIgnoreCase(msg)) {WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));ch.writeAndFlush(frame);} else {WebSocketFrame frame = new TextWebSocketFrame(msg);ch.writeAndFlush(frame);}}} finally {group.shutdownGracefully();}}
}
package com.ptc.ai.box.biz.relay.client;import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {private final WebSocketClientHandshaker handshaker;private ChannelPromise handshakeFuture;public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {this.handshaker = handshaker;}public ChannelFuture handshakeFuture() {return handshakeFuture;}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {handshakeFuture = ctx.newPromise();}@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("channelActive, 进行handshake");handshaker.handshake(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.info("channelInactive!");}@Overridepublic void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {Channel ch = ctx.channel();if (!handshaker.isHandshakeComplete()) {try {handshaker.finishHandshake(ch, (FullHttpResponse) msg);log.info("websocket Handshake 完成!");handshakeFuture.setSuccess();} catch (WebSocketHandshakeException e) {log.info("websocket连接失败!");handshakeFuture.setFailure(e);}return;}log.info("channelRead0.......");if (msg instanceof FullHttpResponse) {FullHttpResponse response = (FullHttpResponse) msg;throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="+ response.content().toString(CharsetUtil.UTF_8) + ')');}WebSocketFrame frame = (WebSocketFrame) msg;if (frame instanceof TextWebSocketFrame) {TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;log.info("接收到TXT消息: " + textFrame.text());} else if (frame instanceof PongWebSocketFrame) {log.info("接收到pong消息");} else if (frame instanceof CloseWebSocketFrame) {log.info("接收到closing消息");ch.close();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 异常处理log.error("出现异常", cause);if (!handshakeFuture.isDone()) {handshakeFuture.setFailure(cause);}ctx.close();}
}
三、ChannelInitializer解耦合版本
package com.ptc.ai.box.biz.relay.client;import com.ptc.ai.box.biz.relay.client.handler.TargetServerHandler;
import com.ptc.ai.box.biz.relay.client.handler.WebSocketClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;import java.net.URI;/*** Created by IntelliJ IDEA.** @Author :* @create 4/11/24 17:52*/public class NettyClient {private final String url;private final int port;private Channel channel;public NettyClient(String url, int port) {this.url = url;this.port = port;}public void start() throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new WebSocketClientInitializer());ChannelFuture future = bootstrap.connect(url, port).sync();this.channel = future.channel();future.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}public void sendMessage(String message) {if (channel != null && channel.isActive()) {channel.writeAndFlush(new TextWebSocketFrame(message));}}public static void main(String[] args) throws Exception {NettyClient client = new NettyClient("api.openai.com", 443);client.start();// 发送测试消息client.sendMessage("Hello WebSocket!");}
}
package com.ptc.ai.box.biz.relay.client.handler;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.net.URI;/*** Created by IntelliJ IDEA.** @Author : * @create 4/11/24 18:01*/@Slf4j
@Component
public class WebSocketClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {log.info("target initChannel......");String url = "api.openai.com";// 10秒未读发送心跳,5秒未写关闭连接//ch.pipeline().addLast(new IdleStateHandler(180, 60, 0, TimeUnit.SECONDS));ch.pipeline().addLast(new HttpClientCodec());ch.pipeline().addLast(new HttpObjectAggregator(6555360));ch.pipeline().addLast(new ChunkedWriteHandler());DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();httpHeaders.add("Authorization", "");httpHeaders.add("OpenAI-Beta", "realtime=v1");WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(URI.create("wss://" + url +"/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"),WebSocketVersion.V13,null,false,httpHeaders, Integer.MAX_VALUE);ch.pipeline().addLast(new WebSocketClientProtocolHandler(handshaker));ch.pipeline().addLast(new TargetServerHandler());}
}
package com.ptc.ai.box.biz.relay.client.handler;import com.ptc.ai.box.biz.relay.server.handler.DataChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;/*** Created by IntelliJ IDEA.** @Author :* @create 4/11/24 18:02*/
@Slf4j
@Component
public class TargetServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {log.info("target channel read complete");ctx.flush();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("target channel inactive: channelId={}", ctx.channel().id());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.warn("target channel exception:", cause);//ctx.close();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("target channelActive......");InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();String ip = inetSocket.getAddress().getHostAddress();log.info(ip);/*JSONObject jsonObject = new JSONObject();jsonObject.put("msg", "register");jsonObject.put("data", System.currentTimeMillis()+"");TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(jsonObject.toJSONString());ctx.writeAndFlush(textWebSocketFrame);*//*** register router at userEventTriggered(HandshakeComplete)*/}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {log.info("Received from target server: " + textWebSocketFrame.text());ctx.channel().writeAndFlush(textWebSocketFrame);}
}