1、WebSocket简述
WebSocket 是 HTML5 新增的 API,是一种基于 TCP 连接的持久化双向通信协议。
WebSocket 默认连接端口是80,运行端口是443。
WebSocket 连接地址示例(以 ws 或者 wss 开头):ws://text.com:8080 或 wss://text.com:8080(加密)。
Springboot项目导入WebSocket依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
注册WebSocket节点
@Slf4j @Configuration public class WebSocketConfig { /** * ServerEndpointExporter 作用 * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { log.info("serverEndpointExporter被注入了"); return new ServerEndpointExporter(); } }
2、RabbitMQ简述
RabbitMQ是一款开源的消息中间件,实现了高级消息队列协议(AMQP),提供了可靠的消息传递机制,用于在分布式系统中传递和存储消息。
Springboot项目导入RabbitMQ依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在application.yml文件中配置信息:
spring: rabbitmq: host: 172.16.10.XXX port: 5672 virtual-host: / username: XXX password: XXXXXX
生产者生产消息,并向消息队列发送消息简单示例,这里以接口的形式呈现。
若要生产进度消息,还需要再任务模型里进行设计,或按分页计算进度,或预估计算时间
@Slf4j @RestController @RequestMapping("/inform") public class MQProducerController { @Autowired private RabbitTemplate rabbitTemplate; ObjectMapper mapper = new ObjectMapper(); @RequestMapping("/test") public Result<String> index(String msgType, String userId, String sessionId, Float result) throws JsonProcessingException { AnalysisTaskProgressMessage<Float> progressMsg = new AnalysisTaskProgressMessage<>(msgType, userId, sessionId, result); String progressMsgJson = mapper.writeValueAsString(progressMsg); rabbitTemplate.convertAndSend("simple.queue", progressMsgJson); log.info(progressMsgJson + "消息发送成功!"); return Result.OK("消息发送成功!"); } }
3、注册WebSocket端点
@Slf4j @ServerEndpoint(value = "/webSocket/analysisTask/{userId}", encoders = {BaseModelEncoder.class}) @Component public class AnalysisTaskWSSvr { /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 */ private static ConcurrentHashMap<AnalysisTaskCompositeId, AnalysisTaskWSSvr> webSocketMap = new ConcurrentHashMap<>(); /** * 记录当前在线用户的ID */ private static Set<String> onlineUserSet = new CopyOnWriteArraySet<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收 AnalysisTaskCompositeId */ private AnalysisTaskCompositeId analysisTaskCompositeId; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId, @PathParam("requestId") String requestId) { // 设置长连接时间 // session.setMaxIdleTimeout(3600000); this.session = session; AnalysisTaskCompositeId analysisTaskId = new AnalysisTaskCompositeId(userId, this.session.getId()); this.analysisTaskCompositeId = analysisTaskId; if (webSocketMap.containsKey(analysisTaskId)) { webSocketMap.remove(analysisTaskId); webSocketMap.put(analysisTaskId, this); } else { // 加入set中 webSocketMap.put(analysisTaskId, this); addOnlineCount(analysisTaskId.getUserId()); } try { AnalysisTaskWSSvr.sendSuccessMessage(analysisTaskId); } catch (IOException e) { throw new RuntimeException(e); } log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount()); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if (webSocketMap.containsKey(analysisTaskCompositeId)) { webSocketMap.remove(analysisTaskCompositeId); //从set中删除 subOnlineCount(analysisTaskCompositeId.getUserId()); } log.info("用户退出:" + analysisTaskCompositeId.getUserId() + ",当前在线人数为:" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("用户消息:" + analysisTaskCompositeId.getUserId() + ", 报文:" + message); if (StringUtils.isEmpty(message)) return; // TODO: 接收信息,调用相关服务 } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.warn("用户错误:" + this.analysisTaskCompositeId.getUserId() + ",原因:" + error.getMessage()); error.printStackTrace(); } public static void sendSuccessMessage(AnalysisTaskCompositeId compositeId) throws IOException { AnalysisTaskWSSvr webSocketServer = webSocketMap.get(compositeId); webSocketServer.session.getBasicRemote().sendText(String.valueOf(compositeId)); } public static int getOnlineCount() { return onlineUserSet.size(); } public static void addOnlineCount(String userId) { if (!onlineUserSet.contains(userId)) { onlineUserSet.add(userId); } } public static void subOnlineCount(String userId) { if (onlineUserSet.contains(userId)) { onlineUserSet.remove(userId); } } }
4、监听队列,消费消息
构建消息监听器,同时调用相关功能处理消息,这里有多种方式可以选择:
1、采用Direct订阅模型,不同的消息被不同的队列进行消费;
2、采用简单队列模式,生产者生产的消息采用同一个方式进行封装,并标注好消息类型,监听器接收到消息后会根据消息类型分发给不同的websocket连接。
@Slf4j @Configuration @RequiredArgsConstructor public class AnalysisTaskProgressListener { /** * 监听任务进度状态 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "analysis.queue1", durable = "true"), exchange = @Exchange(name = "analysisTask", type = ExchangeTypes.DIRECT), key = "task1" )) public void listenTaskProgress(String msg) { executeTask(msg); } @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { executeTask(msg); } ``` private static void executeTask(String msg) { log.info("监听到消息" + msg); JSONObject jsonMsg = JSON.parseObject(msg); // 对消息类型做判断 AnalysisTaskWSSvr.sendProgressMessage( jsonMsg.getString("userId"), jsonMsg.getString("sessionId"), jsonMsg.getString("taskId"), jsonMsg.getString("taskType"), jsonMsg.getFloatValue("progress") ); } }
5、使用vue3构建WebSocket连接
这里使用vue3构建前端界面,并建立WebSocket连接
WebSocket连接:ws://localhost:8889/webSocket/analysisTask/#{userId}
export default class SocketService { static instance = null; static get Instance() { if (!this.instance) { this.instance = new SocketService(); } return this.instance; } // 和服务端连接的socket对象 ws = null; // 存储回调函数 callBackMapping = {}; // 标识是否连接成功 connected = false; // 记录重试的次数 sendRetryCount = 0; // 重新连接尝试的次数 connectRetryCount = 0; // 定义连接服务器的方法 connect() { // 连接服务器 if (!window.WebSocket) { return console.log('您的浏览器不支持WebSocket'); } // let token = $.cookie('123'); // let token = '4E6EF539AAF119D82AC4C2BC84FBA21F'; let url = 'ws://localhost:8889/webSocket/analysisTask/001'; this.ws = new WebSocket(url); // 连接成功的事件 this.ws.onopen = () => { console.log('连接服务端成功了'); this.connected = true; // 重置重新连接的次数 this.connectRetryCount = 0; }; // 1.连接服务端失败 // 2.当连接成功之后, 服务器关闭的情况 this.ws.onclose = () => { console.log('连接服务端失败'); this.connected = false; this.connectRetryCount++; setTimeout(() => { this.connect(); }, 500 * this.connectRetryCount); }; // 得到服务端发送过来的数据 this.ws.onmessage = msg => { console.log(msg.data, '从服务端获取到了数据'); }; } // 回调函数的注册 registerCallBack(socketType, callBack) { this.callBackMapping[socketType] = callBack; } // 取消某一个回调函数 unRegisterCallBack(socketType) { this.callBackMapping[socketType] = null; } // 发送数据的方法 send(data) { // 判断此时此刻有没有连接成功 if (this.connected) { this.sendRetryCount = 0; try { this.ws.send(JSON.stringify(data)); } catch (e) { this.ws.send(data); } } else { this.sendRetryCount++; setTimeout(() => { this.send(data); }, this.sendRetryCount * 500); } } }
<script setup> import SocketService from "../websocket/SocketService.js" import { reactive } from "vue"; const data = reactive({ socketServe: SocketService.Instance, }); const sendData = () => { data.socketServe.send({ "type": "SoilFertilityAnalysisTask", "taskId": "002" }); console.log('发送数据'); }; const buildWS = () => { SocketService.Instance.connect(); data.socketServe = SocketService.Instance; data.socketServe.registerCallBack('callback1', data.socketServe); setTimeout(() => { sendData() }, 1000) } </script> <template> <div> <button @click="buildWS">运行</button> </div> </template> <style scoped> .read-the-docs { color: #888; } </style>
效果展示如下: