websocket消息推送

傷城~ 2022-03-17 06:14 469阅读 0赞

参考

http://www.cnblogs.com/best/p/5695570.html

https://blog.csdn.net/a78270528/article/details/78180449

需求说明:

1、实现一个用户在多个地方连接服务,例如,当一个用户a在手机上和pc上都连接了socket,则给两个地方都推送消息。

2、使用 id—List 的集合形式:

  1. ConcurrentHashMap<String, List<PushSocketService>> 解决一个用户在多端登陆的情况

3、socket服务类是无法通过注入的方式注入其他服务(因为socket相当与新开了一个独立的线程,各种bean是自己管理的,而通过注入的方式注入的类是spring在管理,所以会出现注入为null的情况),使用SpringFactory.getObject()获取需要的对象

1、引入支持包

  1. <dependency>
  2. <groupId>org.springframework</groupId>
  3. <artifactId>spring-websocket</artifactId>
  4. <version>4.0.6.RELEASE</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>javax</groupId>
  8. <artifactId>javaee-api</artifactId>
  9. <version>7.0</version>
  10. <scope>provided</scope>
  11. </dependency>

2、写socket服务

  1. package im.qingtui.meeting.socket;
  2. import im.qingtui.meeting.constants.ErrorCodeConstants;
  3. import im.qingtui.meeting.constants.exception.SocketException;
  4. import im.qingtui.meeting.dao.UserInfoMapper;
  5. import im.qingtui.meeting.model.Token;
  6. import im.qingtui.meeting.service.CommonService;
  7. import im.qingtui.meeting.utils.StringUtil;
  8. import im.qingtui.meeting.utils.StringUtils;
  9. import im.qingtui.platform.common.SpringFactory;
  10. import java.io.IOException;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. import javax.websocket.ClientEndpoint;
  15. import javax.websocket.EndpointConfig;
  16. import javax.websocket.OnClose;
  17. import javax.websocket.OnOpen;
  18. import javax.websocket.Session;
  19. import javax.websocket.server.PathParam;
  20. import javax.websocket.server.ServerEndpoint;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.stereotype.Component;
  25. /**
  26. * webSocket服务
  27. *
  28. * @author qiaofeng
  29. */
  30. @ServerEndpoint(value = "/socket/meeting/{openId}/{accessToken}")
  31. @Component
  32. @Slf4j
  33. public class PushSocketService {
  34. @Autowired
  35. private UserInfoMapper userInfoMapper = (UserInfoMapper) SpringContextUtils.getSpringBean("userInfoMapper");
  36. //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  37. private static int onLineCount = 0;
  38. //与某个客户端连接会话,通过此会话给客户端发送数据
  39. private Session session;
  40. //用线程安全的map存放每个用户连接对应的webSocket对象
  41. private static ConcurrentHashMap<String, List<PushSocketService>> writeArrayMap = new ConcurrentHashMap<>();
  42. private String openId;
  43. /**
  44. * 建立socket连接
  45. * @param openId 建立连接的用户openId
  46. */
  47. @OnOpen
  48. public synchronized void onOpen(@PathParam("openId") String openId, @PathParam("accessToken") String token, Session session, EndpointConfig config){
  49. log.info("建立长连接的openId:{}, accessToken{}", openId, token);
  50. if (StringUtils.isEmpty(token) || StringUtil.isEmpty(openId) || userInfoMapper.getUserInfoByOpenId(openId)==null) {
  51. throw new SocketException(ErrorCodeConstants.PARAM_ERROR);
  52. }
  53. this.session = session;
  54. this.openId = openId;
  55. if (writeArrayMap.containsKey(openId)){
  56. List<PushSocketService> pushSocketServiceList = writeArrayMap.get(openId);
  57. pushSocketServiceList.add(this);
  58. }else {
  59. List<PushSocketService> pushSocketServiceList = new ArrayList<>(1);
  60. pushSocketServiceList.add(this);
  61. writeArrayMap.put(openId, pushSocketServiceList);
  62. }
  63. addOnlineCount();
  64. log.info("有新连接加入,现在连接人数{}", getOnlineCount());
  65. }
  66. @OnClose
  67. public void onClose(){
  68. List<PushSocketService> pushSocketServiceList = writeArrayMap.get(openId);
  69. pushSocketServiceList.remove(this);//删除长连接
  70. subOnlineCount();
  71. log.info("有连接关闭,现在连接人数{}", getOnlineCount());
  72. }
  73. /**
  74. * 给部分人发送消息
  75. * @param openIdList 用户openId集合
  76. * @param msg 消息体信息 {
  77. * "cmd": 1,
  78. * “msg”:{
  79. * “meetingId”:””,
  80. * “openId”: ””, //改变状态人的openId
  81. * “attendStatus”:””, //参加状态(0待,1确认参加、2不参加 即请假
  82. * “leaveReason”:“请假理由”
  83. * }
  84. * }
  85. */
  86. public void sendMsgToUser(List<String> openIdList, String msg){
  87. if(StringUtil.isNotEmptyList(openIdList)){
  88. for (String toOpenId : openIdList){
  89. if (writeArrayMap.containsKey(toOpenId)) {
  90. List<PushSocketService> pushSocketServiceList = writeArrayMap.get(toOpenId);
  91. if (StringUtil.isNotEmptyList(pushSocketServiceList)){
  92. for (PushSocketService pushSocketService : pushSocketServiceList) {
  93. try {
  94. pushSocketService.getSession().getBasicRemote().sendText(msg);
  95. } catch (IOException e) {
  96. log.error("socket推送消息失败");
  97. throw new SocketException(ErrorCodeConstants.SOCKET_ERR);
  98. }
  99. }
  100. }
  101. }
  102. }
  103. }
  104. }
  105. public static synchronized int getOnlineCount() {
  106. return onLineCount;
  107. }
  108. private static synchronized void addOnlineCount() {
  109. PushSocketService.onLineCount++;
  110. }
  111. private static synchronized void subOnlineCount() {
  112. PushSocketService.onLineCount--;
  113. }
  114. public Session getSession() {
  115. return session;
  116. }
  117. public void setSession(Session session) {
  118. this.session = session;
  119. }
  120. public String getOpenId() {
  121. return openId;
  122. }
  123. public void setOpenId(String openId) {
  124. this.openId = openId;
  125. }
  126. }

3.前段html调用

  1. <!DOCTYPE HTML>
  2. <html>
  3. <head>
  4. <title>My WebSocket</title>
  5. </head>
  6. <body>
  7. Welcome<br/>
  8. <input id="text" type="text" /><button οnclick="send()">Send</button> <button οnclick="closeWebSocket()">Close</button>
  9. <div id="message">
  10. </div>
  11. </body>
  12. <script type="text/javascript">
  13. var websocket = null;
  14. //判断当前浏览器是否支持WebSocket
  15. if('WebSocket' in window){
  16. websocket = new WebSocket("ws://localhost:8181/socket/meeting/6737d8c65bd24215a42d39675fef5f78/sss");
  17. }
  18. else{
  19. alert('Not support websocket')
  20. }
  21. //连接发生错误的回调方法
  22. websocket.onerror = function(){
  23. setMessageInnerHTML("error");
  24. };
  25. //连接成功建立的回调方法
  26. websocket.onopen = function(event){
  27. setMessageInnerHTML("open");
  28. }
  29. //接收到消息的回调方法
  30. websocket.onmessage = function(event){
  31. setMessageInnerHTML(event.data);
  32. }
  33. //连接关闭的回调方法
  34. websocket.onclose = function(){
  35. setMessageInnerHTML("close");
  36. }
  37. //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
  38. window.onbeforeunload = function(){
  39. websocket.close();
  40. }
  41. //将消息显示在网页上
  42. function setMessageInnerHTML(innerHTML){
  43. document.getElementById('message').innerHTML += innerHTML + '<br/>';
  44. }
  45. //关闭连接
  46. function closeWebSocket(){
  47. websocket.close();
  48. }
  49. //发送消息
  50. function send(){
  51. var message = document.getElementById('text').value;
  52. websocket.send(message);
  53. }
  54. </script>
  55. </html>

发表评论

表情:
评论列表 (有 0 条评论,469人围观)

还没有评论,来说两句吧...

相关阅读

    相关 抓取WebSocket消息

    介绍 很多直播或对数据及时性要求比较高的网站,使用了WebSocket。这种数据要怎么抓呢? 我们这里以socket.io为例,我们可以查看网站网页源代码看使用的H5的