【Netty】四、Netty服务端推送消息到客户端实现消息个性化推送

你的名字 2023-10-09 15:02 134阅读 0赞

Netty服务端推送消息到客户端

  • 一、 Netty服务端推送消息到客户端
    • 需求:
    • 应用场景
  • 二、服务端代码
    • PushServer
    • PushServerHandler
    • PushAckHandler
  • 三、服务端代码
    • PushClient
    • PushClientHandler
  • 四、测试
    • 客户端发送消息
    • 服务端接受消息

一、 Netty服务端推送消息到客户端

需求:

1、服务端从redis或数据库等存储层获取到要推送的消息;
2、服务端把获取到的消息主动推送给客户端(只要查询到了数据就推送给客户端);
3、如果有多个客户端连接到了服务端,要区分客户端,不同客户端发送不同的消息;
4、客户端接收到消息后给服务端一个应答;
5、服务端接收到应答之后,对消息状态进行修改,表示该消息已经处理;

应用场景

1、个性化推送,可以千人千面,女性用户可以推送化妆品优惠活动消息,男性用户可以推送电子科技类产品消息;
2、每个客户端(比如手机App),他们首先会登录,登录后有用户的id等唯一业务参数值,用户登录成功后,可以建立一个与netty服务端的长连接并向服务端写出一条信息,信息里面带上用户id等唯一业务参数值;
3、服务端收到客户端的用户id后,可以用一个全局的Map存放起来:
Map
Key: channelId, value: (key: userId, value: Channel)
4、然后服务端启动netty的定时任务,每隔多久从redis或数据库等存储层查询出业务数据,并把数据推送给对应的客户端;
5、如果连接channel断开了,把全局Map中的channel移除;

二、服务端代码

PushServer

  1. import com.mytest.push.handler.PushAckHandler;
  2. import com.mytest.push.handler.PushServerHandler;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelFutureListener;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.serialization.ClassResolvers;
  11. import io.netty.handler.codec.serialization.ObjectDecoder;
  12. import io.netty.handler.codec.serialization.ObjectEncoder;
  13. import io.netty.handler.timeout.IdleStateHandler;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * 服务端向客户端推送消息
  17. *
  18. * 需求:
  19. * 1、服务端从redis或数据库等存储层获取到要推送的消息;
  20. * 2、服务端把获取到的消息主动推送给客户端(只要查询到了数据就推送给客户端);
  21. * 3、如果有多个客户端连接到了服务端,要区分客户端,不同客户端发送不同的消息;
  22. * 4、客户端接收到消息后给服务端一个应答;
  23. * 5、服务端接收到应答之后,对消息状态进行修改,表示该消息已经处理;
  24. */
  25. public class PushServer {
  26. private static final int PORT = 6868;
  27. public static void main(String[] args) {
  28. PushServer pushServer = new PushServer();
  29. pushServer.start();
  30. }
  31. private void start() {
  32. ServerBootstrap serverBootstrap = new ServerBootstrap();
  33. NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
  34. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  35. serverBootstrap.group(boosGroup, workGroup);
  36. serverBootstrap.channel(NioServerSocketChannel.class);
  37. serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
  38. protected void initChannel(NioSocketChannel ch) {
  39. //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
  40. ch.pipeline().addLast(new ObjectEncoder());
  41. ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
  42. ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
  43. ch.pipeline().addLast(PushServerHandler.INSTANCE);
  44. ch.pipeline().addLast(PushAckHandler.INSTANCE);
  45. }
  46. });
  47. ChannelFuture channelFuture = serverBootstrap.bind(PORT).addListener((ChannelFutureListener) future -> {
  48. if (future.isSuccess()) {
  49. System.out.println("Netty server start success!");
  50. } else {
  51. System.out.println("Netty server start fail!");
  52. }
  53. });
  54. try {
  55. channelFuture.channel().closeFuture().sync();
  56. } catch (InterruptedException e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. }

PushServerHandler

  1. import com.mytest.codec.LoginMessage;
  2. import com.mytest.codec.PushAckMessage;
  3. import com.mytest.codec.PushMessage;
  4. import io.netty.channel.Channel;
  5. import io.netty.channel.ChannelHandler;
  6. import io.netty.channel.ChannelHandlerContext;
  7. import io.netty.channel.ChannelInboundHandlerAdapter;
  8. import io.netty.handler.timeout.IdleState;
  9. import io.netty.handler.timeout.IdleStateEvent;
  10. import java.util.Date;
  11. import java.util.Map;
  12. import java.util.UUID;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. import java.util.concurrent.TimeUnit;
  15. @ChannelHandler.Sharable
  16. public class PushServerHandler extends ChannelInboundHandlerAdapter {
  17. public static final PushServerHandler INSTANCE = new PushServerHandler();
  18. //key = channelID, value =(key:uid, value:channel)
  19. private static final Map<String, Map<String, Channel>> channelMap = new ConcurrentHashMap<String, Map<String, Channel>>();
  20. @Override
  21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  22. Channel channel = ctx.channel();
  23. String channelId = channel.id().asLongText();
  24. Map<String, Channel> uidMap = new ConcurrentHashMap<String, Channel>();
  25. //如果是 LoginMessage
  26. if (msg instanceof LoginMessage) {
  27. LoginMessage loginMessage = (LoginMessage) msg;
  28. //当前客户端登录的 用户id --> 当前的channel
  29. uidMap.put(loginMessage.getUid(), channel);
  30. channelMap.put(channelId, uidMap);
  31. // 每一条新连接,都是5秒之后发消息
  32. ctx.executor().scheduleAtFixedRate(() -> {
  33. String uid = loginMessage.getUid();
  34. //这样就是每个客户端发送每个客户端的消息
  35. Channel ch = channelMap.get(channelId).get(uid);
  36. if (ch != null) {
  37. //TODO 可以根据uid查询业务数据,然后把业务数据封装成消息推送给客户端 (业务查询省略)
  38. PushMessage pushMessage = new PushMessage();
  39. pushMessage.setMessageId(UUID.randomUUID().toString());
  40. pushMessage.setContent("尊敬的"+channelId+"童鞋,鉴于你2020年第二季度的卓越表现,公司奖励你1万元,于下月发放,期望再接再厉");
  41. pushMessage.setTimestamp(System.currentTimeMillis());
  42. pushMessage.setExt(String.valueOf(uid));
  43. ch.writeAndFlush(pushMessage);
  44. }
  45. System.out.println(new Date() + " 服务端推送消息ok,currentUid=" + uid + ", channelMap" + channelMap);
  46. }, 5, 5, TimeUnit.SECONDS);
  47. }
  48. //如果是 LoginMessage
  49. if (msg instanceof PushAckMessage) {
  50. PushAckMessage pushAck = (PushAckMessage) msg;
  51. System.out.println("服务端接收到客户端的确认消息:" + pushAck.getMessageId());
  52. }
  53. }
  54. @Override
  55. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  56. //空闲状态的事件
  57. if (evt instanceof IdleStateEvent) {
  58. IdleStateEvent event = (IdleStateEvent) evt;
  59. if (event.state().equals(IdleState.READER_IDLE)) {
  60. String channelId = ctx.channel().id().asLongText();
  61. channelMap.remove(channelId);
  62. // 心跳包丢失,10秒没有收到客户端心跳 (断开连接)
  63. ctx.channel().close().sync();
  64. System.out.println("已与 "+ctx.channel().remoteAddress()+" 断开连接");
  65. }
  66. }
  67. }
  68. @Override
  69. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  70. String channelId = ctx.channel().id().asLongText();
  71. channelMap.remove(channelId);
  72. }
  73. @Override
  74. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  75. System.err.println(cause.getMessage());
  76. }
  77. }

PushAckHandler

  1. import com.mytest.codec.PushAckMessage;
  2. import io.netty.channel.ChannelHandler;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. @ChannelHandler.Sharable
  6. public class PushAckHandler extends SimpleChannelInboundHandler<PushAckMessage> {
  7. public static final PushAckHandler INSTANCE = new PushAckHandler();
  8. @Override
  9. protected void channelRead0(ChannelHandlerContext ctx, PushAckMessage pushAck) {
  10. System.out.println("服务端接收到客户端的确认消息:" + pushAck.getMessageId());
  11. //TODO 更新消息状态 (业务处理,暂时省略)
  12. }
  13. }

三、服务端代码

PushClient

  1. import com.mytest.codec.LoginMessage;
  2. import com.mytest.push.handler.PushClientHandler;
  3. import io.netty.bootstrap.Bootstrap;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import io.netty.handler.codec.serialization.ClassResolvers;
  9. import io.netty.handler.codec.serialization.ObjectDecoder;
  10. import io.netty.handler.codec.serialization.ObjectEncoder;
  11. import io.netty.handler.timeout.IdleStateHandler;
  12. import io.netty.util.internal.StringUtil;
  13. import java.util.Scanner;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * 客户端接收服务端推送的消息
  17. * 需求:
  18. * 1、服务端从redis或数据库等存储层获取到要推送的消息;
  19. * 2、服务端把获取到的消息主动推送给客户端(只要查询到了数据就推送给客户端);
  20. * 3、如果有多个客户端连接到了服务端,要区分客户端,不同客户端发送不同的消息;
  21. * 4、客户端接收到消息后给服务端一个应答;
  22. * 5、服务端接收到应答之后,对消息状态进行修改,表示该消息已经处理;
  23. */
  24. public class PushClient {
  25. private static final String HOST = "127.0.0.1";
  26. private static final int PORT = 6868;
  27. public static void main(String[] args) throws Exception {
  28. //假设用户已经登录,登录消息
  29. LoginMessage loginMessage = new LoginMessage();
  30. loginMessage.setUid("1000");
  31. PushClient pushClient = new PushClient();
  32. pushClient.start(loginMessage);
  33. }
  34. private void start(LoginMessage loginMessage) throws Exception {
  35. Bootstrap bootstrap = new Bootstrap();
  36. NioEventLoopGroup group = new NioEventLoopGroup();
  37. bootstrap.group(group);
  38. bootstrap.channel(NioSocketChannel.class);
  39. bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
  40. @Override
  41. protected void initChannel(NioSocketChannel ch) {
  42. //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
  43. ch.pipeline().addLast(new ObjectEncoder());
  44. ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
  45. ch.pipeline().addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS));
  46. ch.pipeline().addLast(PushClientHandler.INSTANCE);
  47. }
  48. });
  49. //连netty服务端
  50. ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
  51. if (channelFuture.isSuccess()) {
  52. System.out.println("Connect netty server 成功, 请输入用户ID:");
  53. //写出登录信息
  54. Scanner scanner = new Scanner(System.in);
  55. for (;;) {
  56. String uid = scanner.nextLine();
  57. if (!StringUtil.isNullOrEmpty(uid)) {
  58. loginMessage.setUid(uid); //业务参数
  59. //向服务端写出loginMessage(或者理解成向服务端注册当前app客户端)
  60. channelFuture.channel().writeAndFlush(loginMessage);;
  61. break;
  62. }
  63. }
  64. }
  65. channelFuture.channel().closeFuture().sync();
  66. }
  67. }

PushClientHandler

  1. import com.mytest.codec.PushAckMessage;
  2. import com.mytest.codec.PushMessage;
  3. import io.netty.channel.ChannelHandler;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.handler.timeout.IdleState;
  7. import io.netty.handler.timeout.IdleStateEvent;
  8. import io.netty.util.AttributeKey;
  9. import java.util.Date;
  10. @ChannelHandler.Sharable
  11. public class PushClientHandler extends SimpleChannelInboundHandler<PushMessage> {
  12. public static final PushClientHandler INSTANCE = new PushClientHandler();
  13. @Override
  14. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  15. ctx.channel().attr(AttributeKey.newInstance("uid")).set(100);
  16. //TODO 重连
  17. }
  18. @Override
  19. protected void channelRead0(ChannelHandlerContext ctx, PushMessage msg) {
  20. PushMessage message = (PushMessage) msg;
  21. System.out.println(new Date() + " 接收到的消息:" + message);
  22. //发送消息确认
  23. PushAckMessage pushAck = new PushAckMessage();
  24. pushAck.setMessageId(msg.getMessageId());
  25. pushAck.setTimestamp(System.currentTimeMillis());
  26. ctx.channel().writeAndFlush(pushAck);
  27. }
  28. @Override
  29. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  30. if (evt instanceof IdleStateEvent) {
  31. if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
  32. ctx.writeAndFlush("ping");
  33. }
  34. }
  35. }
  36. }

四、测试

客户端发送消息

在这里插入图片描述

服务端接受消息

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,134人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring Boot:服务实时消息

    在 HTTP/1.1 及以前的版本里,客户端、服务端之间的通讯模式只支持一种请求-响应模式,这是一种半双工通讯模式。 而且,在这个模型中,服务端是“被动方”,只能响应用户的请求

    相关 [Java] 服务消息汇总

    > 前言:当构建实时消息推送功能时,选择适合的方案对于开发高效的实时应用至关重要。消息的推送无非就推、拉两种数据模型。本文将介绍四种常见的消息实时推送方案:短轮询(拉)、长轮训