源码时代JAVA干货分享|带你用Netty框架实现WebSocket通信

功能介绍

  • Netty开发服务端
  • HTML实现客户端
  • 实现服务端与客户端时实时交互

开发步骤

1.导包

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty‐all</artifactId>
  4. <version>5.0.0.Alpha2</version>
  5. </dependency>

2.工程配置文件:NettyConfig

  1. /**
  2. * 这里放的是工程中相应的配置
  3. */
  4. public class NettyConfig{
  5. /**
  6. * 用于存储每一个客户端接入进来时的channel对象
  7. */
  8. public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  9. }

3.MyWebSocketHandler

  1. public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> { private WebSocketServerHandshaker handshaker;
  2. //请求路径
  3. private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
  4. //工程出现异常的时候调用@Override
  5. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  6. cause.printStackTrace(); ctx.close(); //关闭当前连接
  7. }
  8. //客户端与服务器创建连接的时候调用@Override
  9. public void channelActive(ChannelHandlerContext ctx) throws Exception { NettyConfig.group.add(ctx.channel());
  10. System.out.println("客户端与服务器连接开启");
  11. }
  12. //客户端与服务器断开连接的时候调用@Override
  13. public void channelInactive(ChannelHandlerContext ctx) throws Exception { NettyConfig.group.remove(ctx.channel());
  14. System.out.println("客户端与服务器断开连接");
  15. }
  16. //服务端接收客户端发送过来的数据结束之后调用@Override
  17. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{ ctx.flush(); //进行数据清除
  18. }
  19. //服务端处理客户端调用的核心方法
  20. protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
  21. //处理客户端向服务端发起http握手请求业务if(msg instanceof FullHttpRequest){
  22. handHttpRequest(context,(FullHttpRequest) msg);
  23. }else if(msg instanceof WebSocketFrame){
  24. //处理websocket连接业务handWebSocketFrme(context,(WebSocketFrame)msg);
  25. }
  26. }
  27. /**
  28. 处理客户端与服务端之前的websocket业务
  29. @param ctx
  30. @param frame
  31. */
  32. private void handWebSocketFrme(ChannelHandlerContext ctx,WebSocketFrame frame){
  33. //判断是否是关闭websocket的指令
  34. if(frame instanceof CloseWebSocketFrame){ handshaker.close(ctx.channel(),((CloseWebSocketFrame) frame).retain());
  35. }
  36. //判断是否是Ping消息
  37. if(frame instanceof PingWebSocketFrame){
  38. //返回一个胖信息
  39. ctx.channel().write(newPongWebSocketFrame(frame.content().retain())); return;
  40. }
  41. //判断是否是二进制消息,如果是二进制消息,抛出异常if(!(frame instanceof TextWebSocketFrame)){
  42. System.out.println("目前我们不支持二进制消息");
  43. throw new RuntimeException(this.getClass().getName()+"不支持的消息");
  44. }
  45. //返回应答消息
  46. //获取客户端向服务端发送的消息
  47. String request = ((TextWebSocketFrame)frame).text(); System.out.println("服务端收到客户端的消息===>"+request); TextWebSocketFrame tws = new TextWebSocketFrame(new
  48. Date().toString()+ctx.channel().id()+"===>"+request);
  49. //群发,服务端向每个上来的客户端群发消息NettyConfig.group.writeAndFlush(tws);
  50. }
  51. /**
  52. 处理客户端向服务器发起http握手请求的业务
  53. @param ctx
  54. @param req
  55. */
  56. private void handHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){
  57. // req.headers().get()
  58. // req.decoderResult().isSuccess()
  59. //不是websocket,就不是客户端发给服务器的Http请求if(!req.decoderResult().isSuccess()|| !
  60. ("websocket").equals(req.headers().get("Upgrade"))){
  61. sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
  62. return;
  63. }
  64. WebSocketServerHandshakerFactory wsFactory =
  65. newWebSocketServerHandshakerFactory(WEB_SOCKET_URL,null,false); handshaker = wsFactory.newHandshaker(req);
  66. if(handshaker==null){
  67. WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
  68. }else{
  69. handshaker.handshake(ctx.channel(),req);
  70. }
  71. }
  72. /**
  73. 服务端向客户端响应消息
  74. @param ctx
  75. @param req
  76. @param res
  77. */
  78. private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res){
  79. if(res.status().code()!=200){
  80. ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
  81. res.content().writeBytes(buf); buf.release();
  82. }
  83. //服务器向客户端发送数据
  84. ChannelFuture f = ctx.channel().writeAndFlush(res); if(res.status().code()!=200){
  85. f.addListener(ChannelFutureListener.CLOSE);
  86. }
  87. }
  88. }

4 初始化连接各个组件:MyWebSocketChannelHandler

  1. /**
  2. * 初始化连接各个组件
  3. */
  4. public class MyWebSocketChannelHandlerextends ChannelInitializer<SocketChannel>{
  5. protected void initChannel(SocketChannel e) throws Exception { e.pipeline().addLast("http‐codec",new HttpServerCodec()); e.pipeline().addLast("aggregator",new HttpObjectAggregator(66636)); e.pipeline().addLast("http‐chunked",new ChunkedWriteHandler()); e.pipeline().addLast("handler",new MyWebSocketHandler());
  6. }
  7. }
  8. 5 程序入口
  9. /**
  10. * 程序的入口,负责启动应用
  11. */
  12. public class Main {
  13. public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup();try{
  14. ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new MyWebSocketChannelHandler());
  15. System.out.println("服务端开启等待客户端连接");
  16. Channel ch = b.bind(8888).sync().channel(); ch.closeFuture().sync();
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. } finally {
  20. //优雅的退出程序bossGroup.shutdownGracefully(); workGroup.shutdownGracefully();
  21. }
  22. }
  23. }
  24. 6 web端测试代码
  25. <!DOCTYPE html>
  26. <htmllang="en">
  27. <head>
  28. <meta charset="UTF‐8">
  29. <title>WebSocket客户端</title>
  30. </head>
  31. <script>
  32. var socket; if(!window.WebSocket){
  33. window.WebSocket = window.MozWebSocket;
  34. }
  35. if(window.WebSocket){
  36. socket = new WebSocket("ws://localhost:8888/websocket"); socket.onmessage = function (event) {
  37. var ta = document.getElementById("responseContext"); ta.value += event.data +"\r\n";
  38. }
  39. socket.onopen = function (event) {
  40. var ta = document.getElementById("responseContext"); ta.value = "您浏览器支持webSoceket,请进行后续 操作\r\n";
  41. }
  42. socket.onclose = function (event) {
  43. var ta = document.getElementById("responseContext"); ta.value = "";
  44. ta.value = "webSoceket连接已经关闭";
  45. }
  46. }else{
  47. console.debug("浏览器不支持webSocket")
  48. }
  49. function send() {
  50. var message = document.getElementById("message").value; if(!window.WebSocket){
  51. return;
  52. }
  53. if(socket.readyState == WebSocket.OPEN){ socket.send(message)
  54. }else{
  55. alert("连接没有建立成功!");
  56. }
  57. }
  58. </script>
  59. <body>
  60. <form action=""onsubmit="return false;">
  61. <input type="text" id="message"name="message" value=""/>
  62. <inputtype="button"value="发送消息" onclick="send()" />
  63. <hr >
  64. <h1>客户端接收到服务端返回的消息</h1>
  65. <textarea name=""id="responseContext" cols="30"rows="10"></textarea>
  66. </form>
  67. </body>
  68. </html>

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ZpbGxhaW55MTM1Nzk_size_16_color_FFFFFF_t_70

发表评论

表情:
评论列表 (有 0 条评论,41人围观)

还没有评论,来说两句吧...

相关阅读