websocket消息推送
参考
http://www.cnblogs.com/best/p/5695570.html
https://blog.csdn.net/a78270528/article/details/78180449
需求说明:
1、实现一个用户在多个地方连接服务,例如,当一个用户a在手机上和pc上都连接了socket,则给两个地方都推送消息。
2、使用「openId → 连接列表」的映射来保存连接:
ConcurrentHashMap<String, List<PushSocketService>>,key 为用户的 openId,value 为该用户当前所有的连接,用来解决一个用户在多端登录的情况
3、socket服务类无法通过注入的方式获得其他服务(因为每建立一个连接,websocket容器都会自己创建一个新的socket服务类实例,这些实例相当于由容器独立管理,而通过注入的方式注入的类是spring在管理,所以会出现注入为null的情况),需要使用SpringFactory.getObject()这类工具方法从spring容器中手动获取需要的对象(下文代码中使用的是SpringContextUtils.getSpringBean())
1、引入支持包
<!-- Spring's WebSocket support module -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>4.0.6.RELEASE</version>
</dependency>
<!-- Java EE 7 API, which supplies the javax.websocket annotations used by the endpoint class.
     Scope "provided": needed at compile time only; the runtime container is expected to supply it. -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
<scope>provided</scope>
</dependency>
2、写socket服务
package im.qingtui.meeting.socket;
import im.qingtui.meeting.constants.ErrorCodeConstants;
import im.qingtui.meeting.constants.exception.SocketException;
import im.qingtui.meeting.dao.UserInfoMapper;
import im.qingtui.meeting.model.Token;
import im.qingtui.meeting.service.CommonService;
import im.qingtui.meeting.utils.StringUtil;
import im.qingtui.meeting.utils.StringUtils;
import im.qingtui.platform.common.SpringFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.websocket.ClientEndpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* webSocket服务
*
* @author qiaofeng
*/
@ServerEndpoint(value = "/socket/meeting/{openId}/{accessToken}")
@Component
@Slf4j
public class PushSocketService {
@Autowired
private UserInfoMapper userInfoMapper = (UserInfoMapper) SpringContextUtils.getSpringBean("userInfoMapper");
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onLineCount = 0;
//与某个客户端连接会话,通过此会话给客户端发送数据
private Session session;
//用线程安全的map存放每个用户连接对应的webSocket对象
private static ConcurrentHashMap<String, List<PushSocketService>> writeArrayMap = new ConcurrentHashMap<>();
private String openId;
/**
* 建立socket连接
* @param openId 建立连接的用户openId
*/
@OnOpen
public synchronized void onOpen(@PathParam("openId") String openId, @PathParam("accessToken") String token, Session session, EndpointConfig config){
log.info("建立长连接的openId:{}, accessToken{}", openId, token);
if (StringUtils.isEmpty(token) || StringUtil.isEmpty(openId) || userInfoMapper.getUserInfoByOpenId(openId)==null) {
throw new SocketException(ErrorCodeConstants.PARAM_ERROR);
}
this.session = session;
this.openId = openId;
if (writeArrayMap.containsKey(openId)){
List<PushSocketService> pushSocketServiceList = writeArrayMap.get(openId);
pushSocketServiceList.add(this);
}else {
List<PushSocketService> pushSocketServiceList = new ArrayList<>(1);
pushSocketServiceList.add(this);
writeArrayMap.put(openId, pushSocketServiceList);
}
addOnlineCount();
log.info("有新连接加入,现在连接人数{}", getOnlineCount());
}
@OnClose
public void onClose(){
List<PushSocketService> pushSocketServiceList = writeArrayMap.get(openId);
pushSocketServiceList.remove(this);//删除长连接
subOnlineCount();
log.info("有连接关闭,现在连接人数{}", getOnlineCount());
}
/**
* 给部分人发送消息
* @param openIdList 用户openId集合
* @param msg 消息体信息 {
* "cmd": 1,
* “msg”:{
* “meetingId”:””,
* “openId”: ””, //改变状态人的openId
* “attendStatus”:””, //参加状态(0待,1确认参加、2不参加 即请假
* “leaveReason”:“请假理由”
* }
* }
*/
public void sendMsgToUser(List<String> openIdList, String msg){
if(StringUtil.isNotEmptyList(openIdList)){
for (String toOpenId : openIdList){
if (writeArrayMap.containsKey(toOpenId)) {
List<PushSocketService> pushSocketServiceList = writeArrayMap.get(toOpenId);
if (StringUtil.isNotEmptyList(pushSocketServiceList)){
for (PushSocketService pushSocketService : pushSocketServiceList) {
try {
pushSocketService.getSession().getBasicRemote().sendText(msg);
} catch (IOException e) {
log.error("socket推送消息失败");
throw new SocketException(ErrorCodeConstants.SOCKET_ERR);
}
}
}
}
}
}
}
public static synchronized int getOnlineCount() {
return onLineCount;
}
private static synchronized void addOnlineCount() {
PushSocketService.onLineCount++;
}
private static synchronized void subOnlineCount() {
PushSocketService.onLineCount--;
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
public String getOpenId() {
return openId;
}
public void setOpenId(String openId) {
this.openId = openId;
}
}
3、前端html调用
<!DOCTYPE HTML>
<!-- Minimal manual test page for the meeting WebSocket endpoint: opens a connection, prints
     every lifecycle event and received message, and lets the user send text or close the socket. -->
<html>
<head>
    <title>My WebSocket</title>
</head>
<body>
    Welcome<br/>
    <input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button>
    <div id="message">
    </div>
    <script type="text/javascript">
        var websocket = null;

        // Open the connection only when the browser supports WebSocket. The handlers are
        // attached inside this branch so an unsupported browser never dereferences null.
        if ('WebSocket' in window) {
            websocket = new WebSocket("ws://localhost:8181/socket/meeting/6737d8c65bd24215a42d39675fef5f78/sss");

            // Called when a connection error occurs
            websocket.onerror = function () {
                setMessageInnerHTML("error");
            };

            // Called once the connection has been established
            websocket.onopen = function (event) {
                setMessageInnerHTML("open");
            };

            // Called for every message received from the server
            websocket.onmessage = function (event) {
                setMessageInnerHTML(event.data);
            };

            // Called when the connection is closed
            websocket.onclose = function () {
                setMessageInnerHTML("close");
            };
        } else {
            alert('Not support websocket');
        }

        // Close the socket when the window is closing; otherwise the server side throws an
        // exception because the page disappears while the connection is still open.
        window.onbeforeunload = function () {
            closeWebSocket();
        };

        // Append a line to the message area. The text is inserted as a text node (not parsed
        // as HTML) so markup contained in a server message cannot be injected into the page.
        function setMessageInnerHTML(innerHTML) {
            var container = document.getElementById('message');
            container.appendChild(document.createTextNode(innerHTML));
            container.appendChild(document.createElement('br'));
        }

        // Close the connection, if one was created
        function closeWebSocket() {
            if (websocket !== null) {
                websocket.close();
            }
        }

        // Send the content of the text box; only possible while the connection is open
        function send() {
            if (websocket === null || websocket.readyState !== WebSocket.OPEN) {
                setMessageInnerHTML("not connected");
                return;
            }
            var message = document.getElementById('text').value;
            websocket.send(message);
        }
    </script>
</body>
</html>
还没有评论,来说两句吧...