1、WebSocket簡述
WebSocket 是 HTML5 新增的 API,是一種基於 TCP 連線的持久化雙向通訊協議。
WebSocket 預設連線埠是80,執行埠是443。
WebSocket 連線地址示例(以 ws 或者 wss 開頭):ws://text.com:8080 或 wss://text.com:8080(加密)。
Springboot專案匯入WebSocket依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
註冊WebSocket節點
@Slf4j @Configuration public class WebSocketConfig { /** * ServerEndpointExporter 作用 * 這個Bean會自動註冊使用@ServerEndpoint註解宣告的websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { log.info("serverEndpointExporter被注入了"); return new ServerEndpointExporter(); } }
2、RabbitMQ簡述
RabbitMQ是一款開源的訊息中介軟體,實現了高階訊息佇列協議(AMQP),提供了可靠的訊息傳遞機制,用於在分散式系統中傳遞和儲存訊息。
Springboot專案匯入RabbitMQ依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在application.yml檔案中配置資訊:
spring: rabbitmq: host: 172.16.10.XXX port: 5672 virtual-host: / username: XXX password: XXXXXX
生產者生產訊息,並向訊息佇列傳送訊息簡單示例,這裏以介面的形式呈現。
若要生產進度訊息,還需要再任務模型裡進行設計,或按分頁計算進度,或預估計算時間
@Slf4j @RestController @RequestMapping("/inform") public class MQProducerController { @Autowired private RabbitTemplate rabbitTemplate; ObjectMapper mapper = new ObjectMapper(); @RequestMapping("/test") public Result<String> index(String msgType, String userId, String sessionId, Float result) throws JsonProcessingException { AnalysisTaskProgressMessage<Float> progressMsg = new AnalysisTaskProgressMessage<>(msgType, userId, sessionId, result); String progressMsgJson = mapper.writeValueAsString(progressMsg); rabbitTemplate.convertAndSend("simple.queue", progressMsgJson); log.info(progressMsgJson + "訊息傳送成功!"); return Result.OK("訊息傳送成功!"); } }
3、註冊WebSocket端點
@Slf4j @ServerEndpoint(value = "/webSocket/analysisTask/{userId}", encoders = {BaseModelEncoder.class}) @Component public class AnalysisTaskWSSvr { /** * concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。 */ private static ConcurrentHashMap<AnalysisTaskCompositeId, AnalysisTaskWSSvr> webSocketMap = new ConcurrentHashMap<>(); /** * 記錄當前線上使用者的ID */ private static Set<String> onlineUserSet = new CopyOnWriteArraySet<>(); /** * 與某個客戶端的連線會話,需要透過它來給客戶端傳送資料 */ private Session session; /** * 接收 AnalysisTaskCompositeId */ private AnalysisTaskCompositeId analysisTaskCompositeId; /** * 連線建立成功呼叫的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId, @PathParam("requestId") String requestId) { // 設定長連線時間 // session.setMaxIdleTimeout(3600000); this.session = session; AnalysisTaskCompositeId analysisTaskId = new AnalysisTaskCompositeId(userId, this.session.getId()); this.analysisTaskCompositeId = analysisTaskId; if (webSocketMap.containsKey(analysisTaskId)) { webSocketMap.remove(analysisTaskId); webSocketMap.put(analysisTaskId, this); } else { // 加入set中 webSocketMap.put(analysisTaskId, this); addOnlineCount(analysisTaskId.getUserId()); } try { AnalysisTaskWSSvr.sendSuccessMessage(analysisTaskId); } catch (IOException e) { throw new RuntimeException(e); } log.info("使用者連線:" + userId + ",當前線上人數為:" + getOnlineCount()); } /** * 連線關閉呼叫的方法 */ @OnClose public void onClose() { if (webSocketMap.containsKey(analysisTaskCompositeId)) { webSocketMap.remove(analysisTaskCompositeId); //從set中刪除 subOnlineCount(analysisTaskCompositeId.getUserId()); } log.info("使用者退出:" + analysisTaskCompositeId.getUserId() + ",當前線上人數為:" + getOnlineCount()); } /** * 收到客戶端訊息後呼叫的方法 * * @param message 客戶端傳送過來的訊息 */ @OnMessage public void onMessage(String message, Session session) { log.info("使用者訊息:" + analysisTaskCompositeId.getUserId() + ", 報文:" + message); if (StringUtils.isEmpty(message)) return; // TODO: 接收資訊,呼叫相關服務 } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.warn("使用者錯誤:" + this.analysisTaskCompositeId.getUserId() + ",原因:" + error.getMessage()); error.printStackTrace(); } public static void sendSuccessMessage(AnalysisTaskCompositeId compositeId) throws IOException { AnalysisTaskWSSvr webSocketServer = webSocketMap.get(compositeId); webSocketServer.session.getBasicRemote().sendText(String.valueOf(compositeId)); } public static int getOnlineCount() { return onlineUserSet.size(); } public static void addOnlineCount(String userId) { if (!onlineUserSet.contains(userId)) { onlineUserSet.add(userId); } } public static void subOnlineCount(String userId) { if (onlineUserSet.contains(userId)) { onlineUserSet.remove(userId); } } }
4、監聽佇列,消費訊息
構建訊息監聽器,同時呼叫相關功能處理訊息,這裏有多種方式可以選擇:
1、採用Direct訂閱模型,不同的訊息被不同的佇列進行消費;
2、採用簡單佇列模式,生產者生產的訊息採用同一個方式進行封裝,並標註好訊息型別,監聽器接收到訊息後會根據訊息型別分發給不同的websocket連線。
@Slf4j @Configuration @RequiredArgsConstructor public class AnalysisTaskProgressListener { /** * 監聽任務進度狀態 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "analysis.queue1", durable = "true"), exchange = @Exchange(name = "analysisTask", type = ExchangeTypes.DIRECT), key = "task1" )) public void listenTaskProgress(String msg) { executeTask(msg); } @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { executeTask(msg); } ``` private static void executeTask(String msg) { log.info("監聽到訊息" + msg); JSONObject jsonMsg = JSON.parseObject(msg); // 對訊息型別做判斷 AnalysisTaskWSSvr.sendProgressMessage( jsonMsg.getString("userId"), jsonMsg.getString("sessionId"), jsonMsg.getString("taskId"), jsonMsg.getString("taskType"), jsonMsg.getFloatValue("progress") ); } }
5、使用vue3構建WebSocket連線
這裏使用vue3構建前端介面,並建立WebSocket連線
WebSocket連線:ws://localhost:8889/webSocket/analysisTask/#{userId}
export default class SocketService { static instance = null; static get Instance() { if (!this.instance) { this.instance = new SocketService(); } return this.instance; } // 和服務端連線的socket物件 ws = null; // 儲存回撥函式 callBackMapping = {}; // 標識是否連線成功 connected = false; // 記錄重試的次數 sendRetryCount = 0; // 重新連線嘗試的次數 connectRetryCount = 0; // 定義連線伺服器的方法 connect() { // 連線伺服器 if (!window.WebSocket) { return console.log('您的瀏覽器不支援WebSocket'); } // let token = $.cookie('123'); // let token = '4E6EF539AAF119D82AC4C2BC84FBA21F'; let url = 'ws://localhost:8889/webSocket/analysisTask/001'; this.ws = new WebSocket(url); // 連線成功的事件 this.ws.onopen = () => { console.log('連線服務端成功了'); this.connected = true; // 重置重新連線的次數 this.connectRetryCount = 0; }; // 1.連線服務端失敗 // 2.當連線成功之後, 伺服器關閉的情況 this.ws.onclose = () => { console.log('連線服務端失敗'); this.connected = false; this.connectRetryCount++; setTimeout(() => { this.connect(); }, 500 * this.connectRetryCount); }; // 得到服務端傳送過來的資料 this.ws.onmessage = msg => { console.log(msg.data, '從服務端獲取到了資料'); }; } // 回撥函式的註冊 registerCallBack(socketType, callBack) { this.callBackMapping[socketType] = callBack; } // 取消某一個回撥函式 unRegisterCallBack(socketType) { this.callBackMapping[socketType] = null; } // 傳送資料的方法 send(data) { // 判斷此時此刻有沒有連線成功 if (this.connected) { this.sendRetryCount = 0; try { this.ws.send(JSON.stringify(data)); } catch (e) { this.ws.send(data); } } else { this.sendRetryCount++; setTimeout(() => { this.send(data); }, this.sendRetryCount * 500); } } }
<script setup> import SocketService from "../websocket/SocketService.js" import { reactive } from "vue"; const data = reactive({ socketServe: SocketService.Instance, }); const sendData = () => { data.socketServe.send({ "type": "SoilFertilityAnalysisTask", "taskId": "002" }); console.log('傳送資料'); }; const buildWS = () => { SocketService.Instance.connect(); data.socketServe = SocketService.Instance; data.socketServe.registerCallBack('callback1', data.socketServe); setTimeout(() => { sendData() }, 1000) } </script> <template> <div> <button @click="buildWS">執行</button> </div> </template> <style scoped> .read-the-docs { color: #888; } </style>
效果展示如下: