由浅入深Netty聊天室案例

曾经终败给现在 2024-03-22 19:46 120阅读 0赞

目录

  • 1 聊天室业务介绍
  • 2 聊天室业务-登录
  • 3 聊天室业务-单聊
  • 4 聊天室业务-群聊
  • 5 聊天室业务-退出
  • 6 聊天室业务-空闲检测
    • 6.1 连接假死

1 聊天室业务介绍

在这里插入图片描述

  1. /**
  2. * 用户管理接口
  3. */
  4. public interface UserService {
  5. /**
  6. * 登录
  7. * @param username 用户名
  8. * @param password 密码
  9. * @return 登录成功返回 true, 否则返回 false
  10. */
  11. boolean login(String username, String password);
  12. }
  13. /**
  14. * 会话管理接口
  15. */
  16. public interface Session {
  17. /**
  18. * 绑定会话
  19. * @param channel 哪个 channel 要绑定会话
  20. * @param username 会话绑定用户
  21. */
  22. void bind(Channel channel, String username);
  23. /**
  24. * 解绑会话
  25. * @param channel 哪个 channel 要解绑会话
  26. */
  27. void unbind(Channel channel);
  28. /**
  29. * 获取属性
  30. * @param channel 哪个 channel
  31. * @param name 属性名
  32. * @return 属性值
  33. */
  34. Object getAttribute(Channel channel, String name);
  35. /**
  36. * 设置属性
  37. * @param channel 哪个 channel
  38. * @param name 属性名
  39. * @param value 属性值
  40. */
  41. void setAttribute(Channel channel, String name, Object value);
  42. /**
  43. * 根据用户名获取 channel
  44. * @param username 用户名
  45. * @return channel
  46. */
  47. Channel getChannel(String username);
  48. }
  49. /**
  50. * 聊天组会话管理接口
  51. */
  52. public interface GroupSession {
  53. /**
  54. * 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
  55. * @param name 组名
  56. * @param members 成员
  57. * @return 成功时返回组对象, 失败返回 null
  58. */
  59. Group createGroup(String name, Set<String> members);
  60. /**
  61. * 加入聊天组
  62. * @param name 组名
  63. * @param member 成员名
  64. * @return 如果组不存在返回 null, 否则返回组对象
  65. */
  66. Group joinMember(String name, String member);
  67. /**
  68. * 移除组成员
  69. * @param name 组名
  70. * @param member 成员名
  71. * @return 如果组不存在返回 null, 否则返回组对象
  72. */
  73. Group removeMember(String name, String member);
  74. /**
  75. * 移除聊天组
  76. * @param name 组名
  77. * @return 如果组不存在返回 null, 否则返回组对象
  78. */
  79. Group removeGroup(String name);
  80. /**
  81. * 获取组成员
  82. * @param name 组名
  83. * @return 成员集合, 没有成员会返回 empty set
  84. */
  85. Set<String> getMembers(String name);
  86. /**
  87. * 获取组成员的 channel 集合, 只有在线的 channel 才会返回
  88. * @param name 组名
  89. * @return 成员 channel 集合
  90. */
  91. List<Channel> getMembersChannel(String name);
  92. }

2 聊天室业务-登录

  1. @Slf4j
  2. public class ChatServer {
  3. public static void main(String[] args) {
  4. NioEventLoopGroup boss = new NioEventLoopGroup();
  5. NioEventLoopGroup worker = new NioEventLoopGroup();
  6. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  7. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  8. try {
  9. ServerBootstrap serverBootstrap = new ServerBootstrap();
  10. serverBootstrap.channel(NioServerSocketChannel.class);
  11. serverBootstrap.group(boss, worker);
  12. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel ch) throws Exception {
  15. ch.pipeline().addLast(new ProcotolFrameDecoder());
  16. ch.pipeline().addLast(LOGGING_HANDLER);
  17. ch.pipeline().addLast(MESSAGE_CODEC);
  18. ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {
  19. @Override
  20. protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
  21. String username = msg.getUsername();
  22. String password = msg.getPassword();
  23. boolean login = UserServiceFactory.getUserService().login(username, password);
  24. LoginResponseMessage message;
  25. if(login) {
  26. message = new LoginResponseMessage(true, "登录成功");
  27. } else {
  28. message = new LoginResponseMessage(false, "用户名或密码不正确");
  29. }
  30. ctx.writeAndFlush(message);
  31. }
  32. });
  33. }
  34. });
  35. Channel channel = serverBootstrap.bind(8080).sync().channel();
  36. channel.closeFuture().sync();
  37. } catch (InterruptedException e) {
  38. log.error("server error", e);
  39. } finally {
  40. boss.shutdownGracefully();
  41. worker.shutdownGracefully();
  42. }
  43. }
  44. }
  45. @Slf4j
  46. public class ChatClient {
  47. public static void main(String[] args) {
  48. NioEventLoopGroup group = new NioEventLoopGroup();
  49. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  50. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  51. CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
  52. AtomicBoolean LOGIN = new AtomicBoolean(false);
  53. try {
  54. Bootstrap bootstrap = new Bootstrap();
  55. bootstrap.channel(NioSocketChannel.class);
  56. bootstrap.group(group);
  57. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  58. @Override
  59. protected void initChannel(SocketChannel ch) throws Exception {
  60. ch.pipeline().addLast(new ProcotolFrameDecoder());
  61. // ch.pipeline().addLast(LOGGING_HANDLER);
  62. ch.pipeline().addLast(MESSAGE_CODEC);
  63. ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
  64. // 接收响应消息
  65. @Override
  66. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  67. log.debug("msg: {}", msg);
  68. if ((msg instanceof LoginResponseMessage)) {
  69. LoginResponseMessage response = (LoginResponseMessage) msg;
  70. if (response.isSuccess()) {
  71. // 如果登录成功
  72. LOGIN.set(true);
  73. }
  74. // 唤醒 system in 线程
  75. WAIT_FOR_LOGIN.countDown();
  76. }
  77. }
  78. // 在连接建立后触发 active 事件
  79. @Override
  80. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  81. // 负责接收用户在控制台的输入,负责向服务器发送各种消息
  82. new Thread(() -> {
  83. Scanner scanner = new Scanner(System.in);
  84. System.out.println("请输入用户名:");
  85. String username = scanner.nextLine();
  86. System.out.println("请输入密码:");
  87. String password = scanner.nextLine();
  88. // 构造消息对象
  89. LoginRequestMessage message = new LoginRequestMessage(username, password);
  90. // 发送消息
  91. ctx.writeAndFlush(message);
  92. System.out.println("等待后续操作...");
  93. try {
  94. WAIT_FOR_LOGIN.await();
  95. } catch (InterruptedException e) {
  96. e.printStackTrace();
  97. }
  98. // 如果登录失败
  99. if (!LOGIN.get()) {
  100. ctx.channel().close();
  101. return;
  102. }
  103. while (true) {
  104. System.out.println("==================================");
  105. System.out.println("send [username] [content]");
  106. System.out.println("gsend [group name] [content]");
  107. System.out.println("gcreate [group name] [m1,m2,m3...]");
  108. System.out.println("gmembers [group name]");
  109. System.out.println("gjoin [group name]");
  110. System.out.println("gquit [group name]");
  111. System.out.println("quit");
  112. System.out.println("==================================");
  113. String command = scanner.nextLine();
  114. String[] s = command.split(" ");
  115. switch (s[0]){
  116. case "send":
  117. ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
  118. break;
  119. case "gsend":
  120. ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
  121. break;
  122. case "gcreate":
  123. Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
  124. set.add(username); // 加入自己
  125. ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
  126. break;
  127. case "gmembers":
  128. ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
  129. break;
  130. case "gjoin":
  131. ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
  132. break;
  133. case "gquit":
  134. ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
  135. break;
  136. case "quit":
  137. ctx.channel().close();
  138. return;
  139. }
  140. }
  141. }, "system in").start();
  142. }
  143. });
  144. }
  145. });
  146. Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
  147. channel.closeFuture().sync();
  148. } catch (Exception e) {
  149. log.error("client error", e);
  150. } finally {
  151. group.shutdownGracefully();
  152. }
  153. }
  154. }

3 聊天室业务-单聊

服务器端将 handler 独立出来

登录 handler

  1. @ChannelHandler.Sharable
  2. public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
  5. String username = msg.getUsername();
  6. String password = msg.getPassword();
  7. boolean login = UserServiceFactory.getUserService().login(username, password);
  8. LoginResponseMessage message;
  9. if(login) {
  10. SessionFactory.getSession().bind(ctx.channel(), username);
  11. message = new LoginResponseMessage(true, "登录成功");
  12. } else {
  13. message = new LoginResponseMessage(false, "用户名或密码不正确");
  14. }
  15. ctx.writeAndFlush(message);
  16. }
  17. }

单聊 handler

  1. @ChannelHandler.Sharable
  2. public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
  5. String to = msg.getTo();
  6. Channel channel = SessionFactory.getSession().getChannel(to);
  7. // 在线
  8. if(channel != null) {
  9. channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
  10. }
  11. // 不在线
  12. else {
  13. ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或者不在线"));
  14. }
  15. }
  16. }

4 聊天室业务-群聊

创建群聊

  1. @ChannelHandler.Sharable
  2. public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
  5. String groupName = msg.getGroupName();
  6. Set<String> members = msg.getMembers();
  7. // 群管理器
  8. GroupSession groupSession = GroupSessionFactory.getGroupSession();
  9. Group group = groupSession.createGroup(groupName, members);
  10. if (group == null) {
  11. // 发生成功消息
  12. ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));
  13. // 发送拉群消息
  14. List<Channel> channels = groupSession.getMembersChannel(groupName);
  15. for (Channel channel : channels) {
  16. channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
  17. }
  18. } else {
  19. ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "已经存在"));
  20. }
  21. }
  22. }

群聊

  1. @ChannelHandler.Sharable
  2. public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
  5. List<Channel> channels = GroupSessionFactory.getGroupSession()
  6. .getMembersChannel(msg.getGroupName());
  7. for (Channel channel : channels) {
  8. channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
  9. }
  10. }
  11. }

加入群聊

  1. @ChannelHandler.Sharable
  2. public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
  5. Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername());
  6. if (group != null) {
  7. ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群加入成功"));
  8. } else {
  9. ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
  10. }
  11. }
  12. }

退出群聊

  1. @ChannelHandler.Sharable
  2. public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
  5. Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername());
  6. if (group != null) {
  7. ctx.writeAndFlush(new GroupJoinResponseMessage(true, "已退出群" + msg.getGroupName()));
  8. } else {
  9. ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
  10. }
  11. }
  12. }

查看成员

  1. @ChannelHandler.Sharable
  2. public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
  5. Set<String> members = GroupSessionFactory.getGroupSession()
  6. .getMembers(msg.getGroupName());
  7. ctx.writeAndFlush(new GroupMembersResponseMessage(members));
  8. }
  9. }

5 聊天室业务-退出

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class QuitHandler extends ChannelInboundHandlerAdapter {
  4. // 当连接断开时触发 inactive 事件
  5. @Override
  6. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  7. SessionFactory.getSession().unbind(ctx.channel());
  8. log.debug("{} 已经断开", ctx.channel());
  9. }
  10. // 当出现异常时触发
  11. @Override
  12. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  13. SessionFactory.getSession().unbind(ctx.channel());
  14. log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
  15. }
  16. }

6 聊天室业务-空闲检测

6.1 连接假死

原因

  • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
  • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
  • 应用程序线程阻塞,无法进行数据读写

问题

  • 假死的连接占用的资源不能自动释放
  • 向假死的连接发送数据,得到的反馈是发送超时

服务器端解决

  • 怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死

    // 用来判断是不是 读空闲时间过长,或 写空闲时间过长
    // 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
    ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    // ChannelDuplexHandler 可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {

    1. // 用来触发特殊事件
    2. @Override
    3. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
    4. IdleStateEvent event = (IdleStateEvent) evt;
    5. // 触发了读空闲事件
    6. if (event.state() == IdleState.READER_IDLE) {
    7. log.debug("已经 5s 没有读到数据了");
    8. ctx.channel().close();
    9. }
    10. }

    });

客户端定时心跳

  • 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器

    // 用来判断是不是 读空闲时间过长,或 写空闲时间过长
    // 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
    ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
    // ChannelDuplexHandler 可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {

    1. // 用来触发特殊事件
    2. @Override
    3. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
    4. IdleStateEvent event = (IdleStateEvent) evt;
    5. // 触发了写空闲事件
    6. if (event.state() == IdleState.WRITER_IDLE) {
    7. // log.debug("3s 没有写数据了,发送一个心跳包");
    8. ctx.writeAndFlush(new PingMessage());
    9. }
    10. }

    });

发表评论

表情:
评论列表 (有 0 条评论,120人围观)

还没有评论,来说两句吧...

相关阅读