Springboot 集成 SSE 向前端推送消息

旧城等待, 2023-09-29 13:27 214阅读 0赞

Sse推送

    • Sse介绍
    • 特点分析
    • 应用场景
    • Spring Boot 集成
    • 测试
    • 前端代码

Sse介绍

sse(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件

我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式

特点分析

SSE 最大的特点,可以简单规划为两个

  • 长连接
  • 服务端可以向客户端推送信息

了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别

sse 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道
那么为什么有了 webscoket 还要搞出一个 sse 呢?既然存在,必然有着它的优越之处






























sse websocket
http 协议 独立的 websocket 协议
轻量,使用简单 相对复杂
默认支持断线重连 需要自己实现断线重连
文本传输 二进制传输
支持自定义发送的消息类型 -

应用场景

从 sse 的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的 case 下,多半是可以用它的

比如显示当前网站在线的实时人数,法币汇率显示当前实时汇率,电商大促的实时成交额等等…

Spring Boot 集成

项目结构
在这里插入图片描述

使用到的依赖

  1. <dependency>
  2. <groupId>org.projectlombok</groupId>
  3. <artifactId>lombok</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>cn.hutool</groupId>
  11. <artifactId>hutool-all</artifactId>
  12. <version>5.7.16</version>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-test</artifactId>
  17. </dependency>

消息实体

  1. package com.ddz.sse.entity;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. /**
  6. * 消息体
  7. *
  8. * @author Lenovo
  9. * @date 2022/5/6
  10. */
  11. @Data
  12. @AllArgsConstructor
  13. @NoArgsConstructor
  14. public class MessageVo {
  15. /**
  16. * 客户端id
  17. */
  18. private String clientId;
  19. /**
  20. * 传输数据体(json)
  21. */
  22. private String data;
  23. }

接口

  1. package com.ddz.sse.service;
  2. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  3. /**
  4. * @author Lenovo
  5. * @date 2022/5/6
  6. */
  7. public interface SseEmitterService {
  8. /**
  9. * 创建连接
  10. *
  11. * @param clientId 客户端ID
  12. */
  13. SseEmitter createConnect(String clientId);
  14. /**
  15. * 根据客户端id获取SseEmitter对象
  16. *
  17. * @param clientId 客户端ID
  18. */
  19. SseEmitter getSseEmitterByClientId(String clientId);
  20. /**
  21. * 发送消息给所有客户端
  22. *
  23. * @param msg 消息内容
  24. */
  25. void sendMessageToAllClient(String msg);
  26. /**
  27. * 给指定客户端发送消息
  28. *
  29. * @param clientId 客户端ID
  30. * @param msg 消息内容
  31. */
  32. void sendMessageToOneClient(String clientId, String msg);
  33. /**
  34. * 关闭连接
  35. *
  36. * @param clientId 客户端ID
  37. */
  38. void closeConnect(String clientId);
  39. }

实现类

  1. package com.ddz.sse.service.impl;
  2. import cn.hutool.core.map.MapUtil;
  3. import cn.hutool.core.util.IdUtil;
  4. import cn.hutool.core.util.StrUtil;
  5. import cn.hutool.http.HttpStatus;
  6. import com.ddz.sse.entity.MessageVo;
  7. import com.ddz.sse.service.SseEmitterService;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.springframework.http.MediaType;
  10. import org.springframework.stereotype.Service;
  11. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  12. import java.io.IOException;
  13. import java.util.Map;
  14. import java.util.concurrent.ConcurrentHashMap;
  15. import java.util.function.Consumer;
  16. @Slf4j
  17. @Service
  18. public class SseEmitterServiceImpl implements SseEmitterService {
  19. /**
  20. * 容器,保存连接,用于输出返回 ;可使用其他方法实现
  21. */
  22. private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
  23. /**
  24. * 根据客户端id获取SseEmitter对象
  25. *
  26. * @param clientId 客户端ID
  27. */
  28. @Override
  29. public SseEmitter getSseEmitterByClientId(String clientId) {
  30. return sseCache.get(clientId);
  31. }
  32. /**
  33. * 创建连接
  34. *
  35. * @param clientId 客户端ID
  36. */
  37. @Override
  38. public SseEmitter createConnect(String clientId) {
  39. // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
  40. SseEmitter sseEmitter = new SseEmitter(0L);
  41. // 是否需要给客户端推送ID
  42. if (StrUtil.isBlank(clientId)) {
  43. clientId = IdUtil.simpleUUID();
  44. }
  45. // 注册回调
  46. sseEmitter.onCompletion(completionCallBack(clientId)); // 长链接完成后回调接口(即关闭连接时调用)
  47. sseEmitter.onTimeout(timeoutCallBack(clientId)); // 连接超时回调
  48. sseEmitter.onError(errorCallBack(clientId)); // 推送消息异常时,回调方法
  49. sseCache.put(clientId, sseEmitter);
  50. log.info("创建新的sse连接,当前用户:{} 累计用户:{}", clientId, sseCache.size());
  51. try {
  52. // 注册成功返回用户信息
  53. sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON));
  54. } catch (IOException e) {
  55. log.error("创建长链接异常,客户端ID:{} 异常信息:{}", clientId, e.getMessage());
  56. }
  57. return sseEmitter;
  58. }
  59. /**
  60. * 发送消息给所有客户端
  61. *
  62. * @param msg 消息内容
  63. */
  64. @Override
  65. public void sendMessageToAllClient(String msg) {
  66. if (MapUtil.isEmpty(sseCache)) {
  67. return;
  68. }
  69. // 判断发送的消息是否为空
  70. for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
  71. MessageVo messageVo = new MessageVo();
  72. messageVo.setClientId(entry.getKey());
  73. messageVo.setData(msg);
  74. sendMsgToClientByClientId(entry.getKey(), messageVo, entry.getValue());
  75. }
  76. }
  77. /**
  78. * 给指定客户端发送消息
  79. *
  80. * @param clientId 客户端ID
  81. * @param msg 消息内容
  82. */
  83. @Override
  84. public void sendMessageToOneClient(String clientId, String msg) {
  85. MessageVo messageVo = new MessageVo(clientId, msg);
  86. sendMsgToClientByClientId(clientId, messageVo, sseCache.get(clientId));
  87. }
  88. /**
  89. * 关闭连接
  90. *
  91. * @param clientId 客户端ID
  92. */
  93. @Override
  94. public void closeConnect(String clientId) {
  95. SseEmitter sseEmitter = sseCache.get(clientId);
  96. if (sseEmitter != null) {
  97. sseEmitter.complete();
  98. removeUser(clientId);
  99. }
  100. }
  101. /**
  102. * 推送消息到客户端
  103. * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
  104. *
  105. * @param clientId 客户端ID
  106. * @param messageVo 推送信息,此处结合具体业务,定义自己的返回值即可
  107. **/
  108. private void sendMsgToClientByClientId(String clientId, MessageVo messageVo, SseEmitter sseEmitter) {
  109. if (sseEmitter == null) {
  110. log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}",
  111. clientId, messageVo.toString());
  112. return;
  113. }
  114. SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK))
  115. .data(messageVo, MediaType.APPLICATION_JSON);
  116. try {
  117. sseEmitter.send(sendData);
  118. } catch (IOException e) {
  119. // 推送消息失败,记录错误日志,进行重推
  120. log.error("推送消息失败:{},尝试进行重推", messageVo.toString());
  121. boolean isSuccess = true;
  122. // 推送消息失败后,每隔10s推送一次,推送5次
  123. for (int i = 0; i < 5; i++) {
  124. try {
  125. Thread.sleep(10000);
  126. sseEmitter = sseCache.get(clientId);
  127. if (sseEmitter == null) {
  128. log.error("{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
  129. continue;
  130. }
  131. sseEmitter.send(sendData);
  132. } catch (Exception ex) {
  133. log.error("{}的第{}次消息重推失败", clientId, i + 1, ex);
  134. continue;
  135. }
  136. log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, messageVo.toString());
  137. return;
  138. }
  139. }
  140. }
  141. /**
  142. * 长链接完成后回调接口(即关闭连接时调用)
  143. *
  144. * @param clientId 客户端ID
  145. **/
  146. private Runnable completionCallBack(String clientId) {
  147. return () -> {
  148. log.info("结束连接:{}", clientId);
  149. removeUser(clientId);
  150. };
  151. }
  152. /**
  153. * 连接超时时调用
  154. *
  155. * @param clientId 客户端ID
  156. **/
  157. private Runnable timeoutCallBack(String clientId) {
  158. return () -> {
  159. log.info("连接超时:{}", clientId);
  160. removeUser(clientId);
  161. };
  162. }
  163. /**
  164. * 推送消息异常时,回调方法
  165. *
  166. * @param clientId 客户端ID
  167. **/
  168. private Consumer<Throwable> errorCallBack(String clientId) {
  169. return throwable -> {
  170. log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
  171. // 推送消息失败后,每隔10s推送一次,推送5次
  172. for (int i = 0; i < 5; i++) {
  173. try {
  174. Thread.sleep(10000);
  175. SseEmitter sseEmitter = sseCache.get(clientId);
  176. if (sseEmitter == null) {
  177. log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
  178. continue;
  179. }
  180. sseEmitter.send("失败后重新推送");
  181. } catch (Exception e) {
  182. e.printStackTrace();
  183. }
  184. }
  185. };
  186. }
  187. /**
  188. * 移除用户连接
  189. *
  190. * @param clientId 客户端ID
  191. **/
  192. private void removeUser(String clientId) {
  193. sseCache.remove(clientId);
  194. log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
  195. }
  196. }

控制器类

  1. package com.ddz.sse.controller;
  2. import com.ddz.sse.entity.MessageVo;
  3. import com.ddz.sse.service.SseEmitterService;
  4. import org.springframework.web.bind.annotation.*;
  5. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  6. import javax.annotation.Resource;
  7. /**
  8. * SSE长链接
  9. */
  10. @RestController
  11. @RequestMapping("/sse")
  12. public class SseEmitterController {
  13. @Resource
  14. private SseEmitterService sseEmitterService;
  15. @CrossOrigin
  16. @GetMapping("/createConnect")
  17. public SseEmitter createConnect(String clientId) {
  18. return sseEmitterService.createConnect(clientId);
  19. }
  20. @CrossOrigin
  21. @PostMapping("/broadcast")
  22. public void sendMessageToAllClient(@RequestBody(required = false) String msg) {
  23. sseEmitterService.sendMessageToAllClient(msg);
  24. }
  25. @CrossOrigin
  26. @PostMapping("/sendMessage")
  27. public void sendMessageToOneClient(@RequestBody(required = false) MessageVo messageVo) {
  28. if (messageVo.getClientId().isEmpty()) {
  29. return;
  30. }
  31. sseEmitterService.sendMessageToOneClient(messageVo.getClientId(), messageVo.getData());
  32. }
  33. @CrossOrigin
  34. @GetMapping("/closeConnect")
  35. public void closeConnect(@RequestParam(required = true) String clientId) {
  36. sseEmitterService.closeConnect(clientId);
  37. }
  38. }

测试

自己创建clientid
在这里插入图片描述
使用系统创建clientid
在这里插入图片描述
这里浏览器一直在转圈说明是连接成功.
我们使用APIPost接口测试工具测试
在这里插入图片描述
这里ddz123的是能收到我们推送的消息的.

前端代码

  1. <!doctype html>
  2. <html lang="en">
  3. <head>
  4. <title>Sse测试文档</title>
  5. </head>
  6. <body>
  7. <div>sse测试</div>
  8. <div id="data"></div>
  9. </body>
  10. </html>
  11. <script>
  12. var source = new EventSource('http://192.168.1.128:8889/sse/createConnect?clientId=ddz');
  13. source.onmessage = function (event) {
  14. text = document.getElementById('data').innerText;
  15. text += '\n' + event.data;
  16. document.getElementById('data').innerText = text;
  17. };
  18. <!-- 添加一个开启回调 -->
  19. source.onopen = function (event) {
  20. text = document.getElementById('data').innerText;
  21. text += '\n 开启: ';
  22. console.log(event);
  23. document.getElementById('data').innerText = text;
  24. };
  25. </script>

如果需要携带token请求,那就需要引入一个插件

  1. npm install event-source-polyfill

我这里的是vue的代码;代码不完善只供参考一下

  1. <template>
  2. <div>sse测试</div>
  3. <div id="data"></div>
  4. </template>
  5. <script>
  6. import {
  7. EventSourcePolyfill
  8. } from 'event-source-polyfill';
  9. export default {
  10. name: 'sse',
  11. data() {
  12. return {
  13. token:'获取到的令牌'
  14. }
  15. },
  16. mounted() {
  17. },
  18. created() {
  19. this.createContent('http://192.168.1.128:8080/syscon/sse/createConnect','?clientId=ddz',this.token)
  20. },
  21. methods: {
  22. //创建连接
  23. createContent(url, params, token) {
  24. return new EventSourcePolyfill(url, params, {
  25. headers: {
  26. Authorization: token
  27. }
  28. })
  29. },
  30. },
  31. }
  32. </script>

文章参考于https://www.cnblogs.com/yihuihui/p/12622729.html

发表评论

表情:
评论列表 (有 0 条评论,214人围观)

还没有评论,来说两句吧...

相关阅读