WebSocket 实现服务器消息推送客户端

一、背景

项目需要做一个消息能够实时获取的功能,系统日活跃量达到10000,产生的消息是活跃量的数倍,如果采用 Http 的方式轮询后端服务,会使得后端服务压力过大而奔溃,因此需要一种新的技术方式来改变 “拉” 的方式。

二、解决方案

经过各种 Google、百度 后发现可以使用 html5 的新技术 WebSocket ,将现有 “拉”消息的方式改变成 “推” 的模式,大大的减少服务器压力。

在这里插入图片描述

三、具体实现

实例采用 Spring Boot 框架,

  1. 引入 pom 依赖


    org.springframework.boot
    spring-boot-starter-websocket


    org.springframework.boot
    spring-boot-starter-undertow


    org.springframework.boot
    spring-boot-starter-web


    org.springframework.boot
    spring-boot-starter-tomcat


  2. WebSocket 服务可采用 websocket-apispring-websocket 开发,我们采用 websocket-api 的注解开发方式:

    package com.gridsum.techpub.systemhistory.api.server;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;

    import javax.websocket.*;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.Objects;
    import java.util.Set;
    import java.util.concurrent.CopyOnWriteArraySet;

    /**

    • @author ouyangrongtao
    • @version 1.0
    • @description WebSocketServer
    • @date 2019/12/23 10:16
      **/
      @ServerEndpoint(“/websocket/{sid}”)
      @Service
      public class WebSocketServer {

      private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

      private ClientInfo clientInfo;
      /**

      • 存放每个客户端对应的 ClientInfo 对象。
        */
        private static final Set WEB_SOCKET_SET = new CopyOnWriteArraySet<>();

        /**

      • 连接建立成功调用的方法
        *
      • @param session 会话
      • @param sid 客户端
        */
        @OnOpen
        public void onOpen(Session session, @PathParam(“sid”) String sid) {
        //加入set中
        this.clientInfo = new ClientInfo(sid, session);
        WEB_SOCKET_SET.add(clientInfo);
        logger.info(“有新窗口开始监听:[{}],当前在线人数为[{}]”, sid, WEB_SOCKET_SET.size());
        try {

        1. this.sendMessage(session, "连接成功");

        } catch (IOException e) {

        1. logger.error("websocket IO异常");

        }
        }

        /**

      • 连接关闭调用的方法
        */
        @OnClose
        public void onClose() {
        //从set中删除
        WEB_SOCKET_SET.remove(this.clientInfo);
        logger.info(“有一连接关闭!当前在线人数为:[{}]”, WEB_SOCKET_SET.size());
        }

        /**

      • 群发
      • @param message 客户端发送过来的消息
        */
        @OnMessage
        public void onMessage(String message) {
        logger.info(“收到来自窗口[{}]的信息:[{}]”, this.clientInfo.getSid(), message);
        //群发消息
        for (ClientInfo item : WEB_SOCKET_SET) {

        1. try {
        2. this.sendMessage(item.getSession(), message);
        3. } catch (IOException ignored) {
        4. }

        }
        }

        /**

      • 错误时调用
      • @param session 会话
      • @param error 错误信息
        */
        @OnError
        public void onError(Session session, Throwable error) {
        logger.error(“发生错误”, error);
        }

        /**

      • 给 sid 发送消息
      • @param message 消息
      • @param sid sid
        */
        public void sendMessage(String message, String sid) {
        logger.info(“推送消息到窗口[{}],推送内容:[{}]”, sid, message);

        ClientInfo client = WEB_SOCKET_SET.parallelStream()

        1. .filter(item -> item.getSid().equals(sid)).findFirst().orElse(null);

        if (client != null) {

        1. try {
        2. this.sendMessage(client.getSession(), message);
        3. } catch (IOException ignored) {
        4. }

        }
        }

        /**

      • 实现服务器主动推送
      • @param session session
      • @param message message
      • @throws IOException IOException
        */
        private void sendMessage(Session session, String message) throws IOException {
        session.getBasicRemote().sendText(message);
        }
  1. class ClientInfo {
  2. /**
  3. * 接收sid
  4. */
  5. private String sid = "";
  6. /**
  7. * 客户端
  8. */
  9. private Session session;
  10. public ClientInfo() { }
  11. private ClientInfo(String sid, Session session) {
  12. this.sid = sid;
  13. this.session = session;
  14. }
  15. private String getSid() {
  16. return sid;
  17. }
  18. private Session getSession() {
  19. return session;
  20. }
  21. @Override
  22. public boolean equals(Object o) {
  23. if (this == o) {
  24. return true;
  25. }
  26. if (o == null || getClass() != o.getClass()) {
  27. return false;
  28. }
  29. ClientInfo that = (ClientInfo) o;
  30. return Objects.equals(sid, that.sid);
  31. }
  32. @Override
  33. public int hashCode() {
  34. return Objects.hash(sid);
  35. }
  36. }
  37. }
  1. 前端代码

    <!DOCTYPE HTML>




    WebSocketClient1000001









  2. 来一个发消息的接口

    /**

    • 发送消息给客户端
    • @author ouyangrongtao
      */
      @RestController
      public class WebSocketController {

      private WebSocketServer webSocketServer;

      @Autowired
      public WebSocketController(WebSocketServer webSocketServer) {

      1. this.webSocketServer = webSocketServer;

      }

      @PostMapping(“/socket/push”)
      public boolean pushToWeb(@RequestBody Map content) {

      1. webSocketServer.sendMessage(content.get("message"), content.get("cid"));
      2. return true;

      }
      }

到此已经基本写完。使用 Postman 调用发消息的接口,发现客户端可以收到发送的消息。

四、问题记录

在做的时候,因为项目用的 Tomcat 容器,导致 Tomcat 相关包与 WebSocket 依赖有冲突,最终项目不能启动,解决方式只需要将 Tomcat 容器改为 Undertow 。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-undertow</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. <exclusions>
  9. <exclusion>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-tomcat</artifactId>
  12. </exclusion>
  13. </exclusions>
  14. </dependency>

异常信息:

  1. Caused by: java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available
  2. at org.springframework.util.Assert.state(Assert.java:73)
  3. at org.springframework.web.socket.server.standard.ServerEndpointExporter.afterPropertiesSet(ServerEndpointExporter.java:106)
  4. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1753)
  5. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1690)
  6. ... 16 common frames omitted

发表评论

表情:
评论列表 (有 0 条评论,89人围观)

还没有评论,来说两句吧...

相关阅读