Netty实现聊天室

约定不等于承诺〃 2023-07-05 07:58 84阅读 0赞

文章目录

注:更多netty相关文章请访问博主专栏: netty专栏

本文内容基于上一篇博客 netty实现WebSocket协议,一些基本使用请参考该博客。

本例实现的功能:

  • 有新成员加入时,群广播消息,欢迎加入
  • 有成员退出时,群广播消息,退出
  • 每个成员都可以发送消息,消息广播给群内的每个人

完整的服务器代码如下:

  1. package com.example;
  2. import com.alibaba.fastjson.JSONObject;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.buffer.ByteBuf;
  5. import io.netty.buffer.Unpooled;
  6. import io.netty.channel.*;
  7. import io.netty.channel.group.ChannelGroup;
  8. import io.netty.channel.group.DefaultChannelGroup;
  9. import io.netty.channel.nio.NioEventLoopGroup;
  10. import io.netty.channel.socket.SocketChannel;
  11. import io.netty.channel.socket.nio.NioServerSocketChannel;
  12. import io.netty.handler.codec.http.*;
  13. import io.netty.handler.codec.http.websocketx.*;
  14. import io.netty.handler.logging.LogLevel;
  15. import io.netty.handler.logging.LoggingHandler;
  16. import io.netty.handler.stream.ChunkedFile;
  17. import io.netty.handler.stream.ChunkedWriteHandler;
  18. import io.netty.util.CharsetUtil;
  19. import io.netty.util.concurrent.GlobalEventExecutor;
  20. import javax.activation.MimetypesFileTypeMap;
  21. import java.io.File;
  22. import java.io.FileNotFoundException;
  23. import java.io.RandomAccessFile;
  24. import java.io.UnsupportedEncodingException;
  25. import java.net.InetSocketAddress;
  26. import java.net.URLDecoder;
  27. import java.text.SimpleDateFormat;
  28. import java.util.Date;
  29. import java.util.HashMap;
  30. import java.util.Map;
  31. import java.util.concurrent.*;
  32. import java.util.regex.Pattern;
  33. /** * netty实现聊天室 */
  34. public class MyChatRoomServer {
  35. int port;
  36. public MyChatRoomServer(int port) {
  37. this.port = port;
  38. }
  39. public void start() {
  40. ServerBootstrap bootstrap = new ServerBootstrap();
  41. EventLoopGroup boss = new NioEventLoopGroup();
  42. EventLoopGroup work = new NioEventLoopGroup();
  43. try {
  44. bootstrap.group(boss, work)
  45. .handler(new LoggingHandler(LogLevel.DEBUG))
  46. .channel(NioServerSocketChannel.class)
  47. .childHandler(new ChatRoomServerInitializer());
  48. ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync();
  49. System.out.println("http server started . port : " + port);
  50. f.channel().closeFuture().sync();
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. } finally {
  54. boss.shutdownGracefully();
  55. work.shutdownGracefully();
  56. }
  57. }
  58. public static void main(String[] args) {
  59. MyChatRoomServer server = new MyChatRoomServer(8080);// 8081为启动端口
  60. server.start();
  61. }
  62. }
  63. class ChatRoomServerInitializer extends ChannelInitializer<SocketChannel> {
  64. @Override
  65. protected void initChannel(SocketChannel channel) {
  66. ChannelPipeline pipeline = channel.pipeline();
  67. pipeline.addLast(new HttpServerCodec())// http 编解器
  68. // http 消息聚合器 512*1024为接收的最大contentlength
  69. .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
  70. // 支持异步发送大的码流(大的文件传输),但不占用过多的内存,防止java内存溢出
  71. .addLast("http-chunked", new ChunkedWriteHandler())
  72. .addLast(new ChatRoomRequestHandler());// 请求处理器
  73. }
  74. }
  75. class ChatRoomRequestHandler extends SimpleChannelInboundHandler<Object> {
  76. private WebSocketServerHandshaker handshaker;
  77. @Override
  78. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  79. System.out.println("收到消息:" + msg);
  80. if (msg instanceof FullHttpRequest) {
  81. //以http请求形式接入,但是走的是websocket
  82. handleHttpRequest(ctx, (FullHttpRequest) msg);
  83. } else if (msg instanceof WebSocketFrame) {
  84. //处理websocket客户端的消息
  85. handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
  86. }
  87. }
  88. @Override
  89. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  90. //添加连接
  91. System.out.println("客户端加入连接:" + ctx.channel());
  92. ChannelSupervise.addChannel(ctx.channel());
  93. TextWebSocketFrame tws = new TextWebSocketFrame(
  94. "欢迎 " + ctx.channel().id().asShortText() + "; 当前在线总人数:" + ChannelSupervise.count());
  95. ChannelSupervise.send2All(tws);
  96. }
  97. @Override
  98. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  99. //断开连接
  100. System.out.println("客户端断开连接:" + ctx.channel());
  101. ChannelSupervise.removeChannel(ctx.channel());
  102. TextWebSocketFrame tws = new TextWebSocketFrame(
  103. "再见 " + ctx.channel().id().asShortText() + "; 当前在线总人数:" + ChannelSupervise.count());
  104. ChannelSupervise.send2All(tws);
  105. }
  106. @Override
  107. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  108. ctx.flush();
  109. }
  110. /* 对WebSocket请求进行处理 */
  111. private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
  112. // 判断是否关闭链路的指令
  113. if (frame instanceof CloseWebSocketFrame) {
  114. handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
  115. return;
  116. }
  117. // 判断是否ping消息,如果是,则构造pong消息返回。用于心跳检测
  118. if (frame instanceof PingWebSocketFrame) {
  119. ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
  120. return;
  121. }
  122. // 本例程仅支持文本消息,不支持二进制消息
  123. if (!(frame instanceof TextWebSocketFrame)) {
  124. System.out.println("本例程仅支持文本消息,不支持二进制消息");
  125. throw new UnsupportedOperationException(
  126. String.format("%s frame types not supported", frame.getClass().getName()));
  127. }
  128. //处理客户端请求并返回应答消息
  129. String request = ((TextWebSocketFrame) frame).text();
  130. System.out.println(request);
  131. TextWebSocketFrame tws = new TextWebSocketFrame(ctx.channel().id().asShortText() + ":" + request);
  132. // 群发
  133. ChannelSupervise.send2All(tws);
  134. }
  135. /** * 唯一的一次http请求。 * 该方法用于处理websocket握手请求 */
  136. private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
  137. //如果HTTP解码失败,返回异常。要求Upgrade为websocket,过滤掉get/Post
  138. if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
  139. //若不是websocket方式,则创建BAD_REQUEST(400)的req,返回给客户端
  140. sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
  141. HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
  142. return;
  143. }
  144. // 构造握手响应返回,本机测试
  145. WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
  146. "ws://localhost:8080/websocket", null, false);
  147. //通过工厂来创建WebSocketServerHandshaker实例
  148. handshaker = wsFactory.newHandshaker(req);
  149. if (handshaker == null) {
  150. WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
  151. } else {
  152. /* 通过WebSocketServerHandshaker来构建握手响应消息返回给客户端。 同时将WebSocket相关编解码类添加到ChannelPipeline中,该功能需要阅读handshake的源码。 */
  153. handshaker.handshake(ctx.channel(), req);
  154. }
  155. }
  156. /** * 拒绝不合法的请求,并返回错误信息 */
  157. private static void sendHttpResponse(ChannelHandlerContext ctx,
  158. FullHttpRequest req, DefaultFullHttpResponse res) {
  159. // 返回应答给客户端
  160. if (res.status().code() != 200) {
  161. ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
  162. res.content().writeBytes(buf);
  163. buf.release();
  164. HttpUtil.setContentLength(res, res.content().readableBytes());
  165. }
  166. ChannelFuture f = ctx.channel().writeAndFlush(res);
  167. // 如果是非Keep-Alive,关闭连接
  168. if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
  169. f.addListener(ChannelFutureListener.CLOSE);
  170. }
  171. }
  172. }
  173. public class ChannelSupervise {
  174. /** * ChannelGroup是netty提供用于管理web于服务器建立的通道channel的, * 其本质是一个高度封装的set集合,在服务器广播消息时, * 可以直接通过它的writeAndFlush将消息发送给集合中的所有通道中去 */
  175. private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  176. /** * ChannelMap维护的是channelID和channel的对应关系,用于向指定channel发送消息 */
  177. private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap<>();
  178. public static void addChannel(Channel channel) {
  179. GlobalGroup.add(channel);
  180. ChannelMap.put(channel.id().asShortText(), channel.id());
  181. }
  182. public static void removeChannel(Channel channel) {
  183. GlobalGroup.remove(channel);
  184. ChannelMap.remove(channel.id().asShortText());
  185. }
  186. //找到某个channel来发送消息
  187. public static Channel findChannel(String id) {
  188. return GlobalGroup.find(ChannelMap.get(id));
  189. }
  190. public static void send2All(TextWebSocketFrame tws) {
  191. GlobalGroup.writeAndFlush(tws);
  192. }
  193. public static int count() {
  194. return GlobalGroup.size();
  195. }
  196. }

客户端页面代码:

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>my websocket client</title>
  6. </head>
  7. <body>
  8. <textarea id="msgBoxs"></textarea><br>
  9. 待发送消息`:<input type="text" id="msg">
  10. <input type="button" id="sendBtn" onclick="send()" value="发送">
  11. <script type="application/javascript"> var socket ; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ var msgBoxs = document.getElementById("msgBoxs") var msgBox = document.getElementById("msg") socket = new WebSocket("ws://localhost:8080/websocket") socket.onopen = function (evt) { console.log("Connection open ..."); socket.send("Hello WebSocket!"); } socket.onmessage = function (evt) { console.log("Received Message: ", evt.data) msgBoxs.value = msgBoxs.value + "\n" + evt.data } socket.onclose = function (evt) { console.log("Connect closed."); } }else{ alert("ERROR:您的浏览器不支持WebSocket!!"); } function send() { var msg = msgBox.value socket.send(msg) //msgBox.value = "" } </script>
  12. </body>
  13. </html>

依次打开三个浏览器,作为三个客户端,以channelID作为用户ID。
启动以后可以看到当有用户加入时,前面新加入的用户会受到欢迎的消息。每个用户都可以发送消息,消息会被广播到群里的所有人:
在这里插入图片描述

当关闭一个窗口时,其他窗口会收到再见消息。
注意这里最好在客户端检测退出关闭socket的请求,页面关闭前主动释放与服务器的链接,释放资源,并且其他用户可以第一时间感知到下线,否则需要等待TCP超时后才可以受到下线消息。

简易聊天室完成

注:更多netty相关文章请访问博主专栏: netty专栏

发表评论

表情:
评论列表 (有 0 条评论,84人围观)

还没有评论,来说两句吧...

相关阅读