不依赖 Spring 的 WebSocket:服务器端作为客户端的代码实现

素颜马尾好姑娘i 2022-09-29 11:58 344阅读 0赞

maven配置


<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.3.0</version>
</dependency>


案例

WebSocket 的生命周期为:NOT_YET_CONNECTED → CONNECTING → OPEN → CLOSING → CLOSED

onClose 方法并不会关闭 client,需要手动调用 close() 方法。

package server;

import java.net.URI;

import java.net.URISyntaxException;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import org.java_websocket.WebSocket.READYSTATE;

import org.java_websocket.client.WebSocketClient;

import org.java_websocket.handshake.ServerHandshake;

import net.sf.json.JSONObject;

import RedisClient;

/**

* 基本业务

*/

public class BasicInfoConnector {

private static WebSocketClient wsc = null;

private static final BlockingQueue messageReceiveQueue = new ArrayBlockingQueue(5000);

private static final BlockingQueue messageSendQueue = new ArrayBlockingQueue(2000);

private static Object lock = new Object();

private static BasicInfoConnector wsClientCore = getInstance();

private static ReceiveProcessor messageFromProcessor = null;

private boolean sendThreadSwitch = true;

private boolean receiveThreadSwitch = true;

private boolean heartBeatThreadSwitch = true;

private Thread sendThread = null;

private Thread receiveThread = null;

private Thread heartBeatThread = null;

private static long lastCommunicateTime = System.currentTimeMillis();

private static ExecutorService sendChatInfoProcessor = null;

private RedisClient redisClient = null;

private static Long channelId;

private static final String keepalive = JSONObject.fromObject(new KeepAlivePO()).toString();

public static void stop(boolean sendThreadSwitch,boolean receiveThreadSwitch,boolean heartBeatThreadSwitch){

wsClientCore.heartBeatThreadSwitch = heartBeatThreadSwitch;

wsClientCore.sendThreadSwitch = sendThreadSwitch;

wsClientCore.receiveThreadSwitch = receiveThreadSwitch;

}

public static void setProcessor(ReceiveProcessor messageFromProcessor,RedisClient redisClient,long channelId){

BasicInfoConnector.messageFromProcessor = messageFromProcessor;

wsClientCore.redisClient = redisClient;

BasicInfoConnector.channelId = channelId;

}

public final static void sendMessage(String sendMsg) throws Exception{

boolean result = messageSendQueue.offer(sendMsg);

if(!result){

throw new Exception(“the message sending queue if full, cant send the message:”+sendMsg);

}

}

public final static String getMessage() throws InterruptedException{

return messageReceiveQueue.take();

}

private BasicInfoConnector(){

}

public static final BasicInfoConnector getInstance(){

if(wsClientCore == null){

synchronized (lock) {

if(wsClientCore == null){

wsClientCore = new BasicInfoConnector();

}

}

}

return wsClientCore;

}

/**

* 初始化

*/

static {

try {

final String serverAddr = “ws://localhost:9001/ws”;

wsc = initClient(serverAddr);

wsc.connect();

} catch (URISyntaxException e) {

e.printStackTrace();

}

}

private static WebSocketClient initClient(final String serverAddr) throws URISyntaxException {

return new WebSocketClient(new URI(serverAddr)) {

@Override

public void onOpen(ServerHandshake handshakedata) {//连接建立

lastCommunicateTime = System.currentTimeMillis();

wsClientCore.componentThreadInit();//发送、接收、心跳等组件的线程服务初始化

System.out.println(“client has connected to the server:”+serverAddr);

}

@Override

public void onMessage(String message) {//接收到消息

lastCommunicateTime = System.currentTimeMillis();

boolean result = messageReceiveQueue.offer(message);

if(!result){

System.out.println(“the message receiving queue is full, message will be discarded:”+message);

}

}

@Override

public void onError(Exception ex) {//连接发生错误

System.out.println(“error during connecting server!:”+ex.getMessage());

}

@SuppressWarnings(“static-access”)

@Override

public void onClose(int code, String reason, boolean remote) {//连接关闭

System.out.println(“Client has been disconnected to the server. to reconnect!”);

System.out.println(“The current stat of the websocket client:” + wsc.getReadyState());

if(wsc.getReadyState() == READYSTATE.OPEN){

wsc.close();

System.out.println(“Close the current client, ready to reconnect!”);

}

if(wsc.getReadyState() == READYSTATE.NOT_YET_CONNECTED || wsc.getReadyState() == READYSTATE.CLOSED || wsc.getReadyState() == READYSTATE.CLOSING){

try {

Thread.currentThread().sleep(500);

} catch (InterruptedException e1) {

e1.printStackTrace();

}

try {

wsc = initClient(serverAddr);

} catch (URISyntaxException e) {

e.printStackTrace();

}

wsc.connect();

}

}

};

}

/**

* 队列中获取线程并websocket发送

*/

private class SendThread implements Runnable{

@Override

public void run() {

while(sendThreadSwitch){

try {

String msg = messageSendQueue.take();

wsc.send(msg);

lastCommunicateTime = System.currentTimeMillis();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

/**

* 从websocket中获取第三方消息,并传递给处理器

*/

private class ReceiveThread implements Runnable{

@Override

public void run() {

try {

while(receiveThreadSwitch){

String msg = getMessage();

System.out.println(“获取信息:”+msg);

try {

messageFromProcessor.processBusiness(msg, BasicInfoConnector.channelId);

} catch (Exception e) {

System.out.println(“信息处理错误:”+msg);

e.printStackTrace();

}

}

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

/**

* 心跳检测

*/

private class Heartbeat implements Runnable{

@Override

public void run() {

try {

Thread.sleep(500);

while(heartBeatThreadSwitch){

if(((System.currentTimeMillis() - lastCommunicateTime) / 1000) > 30){

System.out.println(“ready to heart beat,the current client stat:”+wsc.getReadyState());

if(wsc.getReadyState() == READYSTATE.OPEN){

System.out.println(“send heart beat to server……”);

wsc.send(keepalive);

lastCommunicateTime = System.currentTimeMillis();

}else{

System.out.println(“is not connecting, reset heart beat time”);

lastCommunicateTime = System.currentTimeMillis();

}

}

}

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

/**

* 组件初始化

*/

private void componentThreadInit(){

if(wsClientCore.receiveThread == null || !wsClientCore.receiveThread.isAlive()){//从队列中接收,并交给业务程序去处理

wsClientCore.receiveThread = new Thread(wsClientCore.new ReceiveThread());

wsClientCore.receiveThread.start();

System.out.println(“基本信息接收线程启动…”);

}

if(BasicInfoConnector.sendChatInfoProcessor == null){//来自信息到业务结果队列

BasicInfoConnector.sendChatInfoProcessor = Executors.newFixedThreadPool(Constants.SEND_PROCESSOR);

for(int i = 0 ; i < Constants.SEND_PROCESSOR ; i ++){

BasicInfoConnector.sendChatInfoProcessor.execute(new SendBasicThread(redisClient, BasicInfoConnector.channelId));

}

System.out.println(“业务线程启动…”);

}

if(wsClientCore.sendThread == null || !wsClientCore.sendThread.isAlive()){//只负责发送,从业务结果或请求队列发送到XXX

wsClientCore.sendThread = new Thread(wsClientCore.new SendThread());

wsClientCore.sendThread.start();

System.out.println(“发送线程启动…”);

}

if(wsClientCore.heartBeatThread == null || !wsClientCore.heartBeatThread.isAlive()){//心跳线程

wsClientCore.heartBeatThread= new Thread(wsClientCore.new Heartbeat());

wsClientCore.heartBeatThread.start();

System.out.println(“心跳检测线程启动…”);

}

System.out.println(“启动线程初始化完毕…….”);

}

}

我的技术交流群425783133

发表评论

表情:
评论列表 (有 0 条评论,344人围观)

还没有评论,来说两句吧...

相关阅读