前言
最近主要用到的技術比較有意思的是 mqtt
+ webRTC
進行 音視訊通話,由於之前對這塊接觸的比較少,所以經過這段時間的學習和使用,也算是對這塊有個大概的瞭解,這裏記錄下學習使用過程中出現的一些問題和實現思路,希望能對你有幫助。
音視訊通話
這裏採用的方案是透過webRTC
進行實現,主要實現方法是先各自拿到雙方的 MediaStream
,然後透過信令伺服器交換SDP
和內網穿透,最後建立通訊實現音影片實時通話;其中webRTC
還可用來獲取本機IP
地址資訊等。信令伺服器一般使用的是webSocket
,mqtt.js
等實時方案,由於我們專案使用場景偏loT
,所以這裏採用mqtt.js
來實現信令伺服器進行訊息傳送和接收。
WebRTC(Web 實時通訊)是一種使 Web 應用程式和站點能夠捕獲和選擇性地流式傳輸音訊或影片媒體,以及在瀏覽器之間交換任意資料的而無需中介軟體的技術。WebRTC 的一系列標準使得在不需要使用者安裝外掛或任何其他第三方軟體的情況下,可以實現點對點資料共享和電話會議。
mqtt封裝整合
mqtt基本介紹
MQTT(Message Queuing Telemetry Transport)是一種輕量級、基於釋出-訂閱模式的訊息傳輸協議,適用於資源受限的裝置和低頻寬、高延遲或不穩定的網路環境。它在物聯網應用中廣受歡迎,能夠實現感測器、執行器和其它裝置之間的高效通訊。
mqtt
常用於物聯網,車聯網,智慧xx等領域,基於釋出訂閱模式,比較輕量簡單。
特點
開放訊息協議,簡單易實現
釋出訂閱模式,一對多訊息釋出
基於TCP/IP網路連線,提供有序,無損,雙向連線。
1位元組固定報頭,2位元組心跳報文,最小化傳輸開銷和協議交換,有效減少網路流量。
訊息QoS支援,可靠傳輸保證
常用API
mqtt.connect([url], options) 連線到
mqtt
,其中mqtt
options
預設引數如下:
{ keepalive: 60, reschedulePings: true, protocolId: "MQTT", protocolVersion: 4, reconnectPeriod: 1e3, connectTimeout: 30 * 1e3, clean: true, resubscribe: true, writeCache: true, timerVariant: "auto" }
這裏介紹下options
常用屬性:
clientId
客戶端id,預設mqttjs_${Math.random().toString(16).substr(2, 8)}
reschedulePings
心跳檢測,預設true
keepalive
心跳檢測時長預設60秒
,設定為0
則不開啟檢測clean
斷開是否清除回話,預設true
reconnectPeriod
斷開重連時長,預設1000ms
,設定為0
則不自動重連connectTimeout
連線超時時長,預設30秒
timerVariant
設定使用定時器型別,預設auto
,自動檢測適合的定時器型別,可選worker
、native
,設定worker
將使用web workers
來執行定時器,keepalive
,reconnectPeriod
等需要定時器的地方,都是透過這個地方設定使用對應的定時器。
connect
方法會返回一個client
客戶端例項,後續監聽訊息等都是透過客戶端例項實現;下面列舉一些client
常見的監聽和方法。
監聽:
connect
當連線到mqtt時觸發
client.on("connect", () => { console.log(`mqtt Connection succeeded!`); });
disconnect
當客戶端主動呼叫end
方法或斷網等導致連線斷開的時候觸發
client.on("disconnect", () => { console.log(`mqtt disconenct succeeded!`); });
close
當客戶端主動呼叫end(true)
方法、斷網、連線超時等導致連線斷開的時候觸發,發生在disconnect
後,釋放資源時觸發
client.on("close", () => { console.log(`mqtt closed!`); });
reconnect
當客戶端重連的時候觸發
client.on("reconnect", () => { console.log(`mqtt reconnection succeeded!`); });
error
當客戶端無法連線、連線超時、解析錯誤等觸發
client.on("error", (error) => { console.log(`mqtt Connection failed!`,error); });
message
當收到釋出的訊息觸發,有三個引數 topic
,message
, packet
;topic
為釋出的主題,message
為訊息內容,注意mqtt協議
中message
是透過位元組流的形式傳遞的,所以拿到值之後需要透過toString
或者JSON.parse
進行轉換,packet
為mqtt
報文資訊,主要包含qos
、topic
、payload
等。
client.on("connect", (topic,message,packet) => { console.log(topic); console.log(message.toString()); });
方法:
subscribe
訂閱某個主題/多個主題,有三個引數topic
,options
,callback
client.subscribe("/test/topic",{qos:0},(err, granted)=>{ //err為訂閱失敗的錯誤資訊 //granted為訂閱的資訊,topic,qos等 })
其中qos
引數需要注意下,MQTT
定義了三個 QoS
等級,分別為:
0,最多交付一次,傳送完不用等待對方是否確認收到,也不會儲存當前傳送的資訊和重發,訊息可能會丟失。
1,至少交付一次,傳送完需要等對方確認收到後纔會停止傳送,可能會重複。
2,只交付一次,保證只會收到一次,不會重複。
unsubscribe
取消訂閱某個主題/多個主題,有三個引數topic
,options
,callback
client.unsubscribe("/test/topic",(err)=>{ //err為取消訂閱失敗的錯誤資訊 })
publish
publish
釋出訊息,有四個引數topic
、payload
、options
、callback
,其中topic
為釋出的主題,payload
為傳送的引數,必須是String
或者Buffer
,options
可選配置例如qos
等引數。
client.publish("/test/topic",JSON.stringify({text:"hello"}),{qos:0},(err)=>{ if(err){ console.log(err) } })
end
關閉mqtt
連線,有三個引數force
、options
、callback
,其中force
若傳參為true
,則立即關閉客戶端。
client.end(true)
mqtt基礎封裝
爲了保證全域性只有唯一一個mqtt例項,所以這裏定義一個簡單的class MQTTServer
。
import mqtt from "mqtt"; class MQTTServer{ constructor(url, options = {}) { this.url = url; this.mqttClient = null; this.clientId = null; this.options = { clean: true, // true: 清除會話, false: 保留會話 connectTimeout: 2000, // 超時時間 毫秒 ...options, }; } isConnected() { return this.mqttClient?.connected??false; } connect(clientId) { this.clientId =`${clientId || new Date().getTime()}_${Math.round(Math.random() * 100000)}` // 連線到mqtt this.mqttClient = mqtt.connect(this.url, {...this.options,clientId:this.clientId}); // 監聽連線成功 this.mqttClient.on("connect", () => { console.log(`mqtt ${this.url} Connection succeeded!`); }); // 監聽錯誤 this.mqttClient.on("error", (error) => { console.log("mqtt Connection failed", error); }); // 監聽重連 this.mqttClient.on("reconnect", () => { console.log("mqtt reconnect"); }); // 監聽斷開連線 this.mqttClient.on("disconnect",()=>{ console.log("mqtt disconnect"); }); // 監聽關閉 this.mqttClient.on("close",()=>{ console.log("mqtt close",); }); } disconnect() { this.mqttClient.end(true, () => { this.mqttClient = null; console.log(`mqtt Successfully disconnected at ${this.url}`); }); } subscribe(topics) { this.mqttClient.subscribe(topics, { qos: 0 }, (error, granted) => { if (error) { console.error("mqtt Subscribe to topic error", error); } }); } unsubscribe(topics) { this.mqttClient.unsubscribe(topics, (error) => { if (error) { console.log("mqtt Unsubscribe to topic error", error); } }); } publish(topic, payload = {}, qos = 0) { this.mqttClient.publish(topic, JSON.stringify(payload), qos, (error) => { if (error) { console.log(`mqtt Publish topic ${topic} error`, error); } }); } }
對外暴露:
const mqttServer = new MQTTServer(`ws://localhost:9001`); function connectMqtt(clientId){ mqttServer.connect(clientId); } export {mqttServer,mqttConnect}
使用:
import {mqttServer,mqttConnect} from "@/utils/mqttServer" //登入完成後呼叫 mqttConnect function login(){ ...登入邏輯 mqttConnect(userInfo.userId); //訂閱訊息 mqttServer.subscribe("/test/aaa") mqttServer.mqttClient.on("message",(topic, message)=>{ console.log(topic) console.log(message.toString()) }) }
存在的問題
到這裏基礎封裝已經完成了,可以進行基礎的使用,但是,我個人使用中發現一些問題:
mqtt
自動重連機制沒有限制次數,導致無限重連沒有對斷網等特殊情況做一個容錯處理
頁面切換到後臺時,可能會導致
mqtt
斷開訂閱,傳送,監聽訊息不夠優雅
可選:針對訊息釋出主題和資訊,可以考慮儲存起來,每次釋出成功後才清除,若遇到斷網等情況,重連後將未釋出的訊息重新發布
最佳化調整
針對上面存在的一些問題,重新對mqtt
的封裝進行一些調整。
mqtt重連機制最佳化
由於mqtt
重連觸發後會一直自動重連,且沒有次數限制引數,而是會一直根據設定的時長自動重連;但在理想情況下,斷開連線後,進行自動重連n
次,如果還是連線不上,就不繼續連線。
1.修改配置
設定mqtt
的reconnectPeriod
為 0,去掉mqtt
的內部重連機制。
2.自定義重連邏輯
假設定義最大重連次數 maxReconnectCount
= 20,當前重連次數 reconnectCount
= 0。每次斷開連線後(需要過濾掉主動關閉導致的斷開連線)失敗後當前連線次數 +1,如果小於最大連線次數,就重新呼叫連線方法,連線成功後重置當前連線次數
修改監聽close
的方法,為什麼這裏選擇close
裡面去進行重連的操作,不選擇disconnect
,因為連線超時,連線錯誤等情況下,disconnect
不會觸發,但close
會觸發。
// 監聽連線成功 this.mqttClient.on("connect", this.handleConnectEvent.bind(this)); // 監聽關閉 this.mqttClient.on("close", this.handleCloseEvent.bind(this)); // 監聽關閉處理函式 handleCloseEvent() { if (this.disconnectReason === "disconnect") return; // 判斷當前連線次數是否超過最大次數 if (this.reconnectCount >= this.maxReconnectCount) { console.error("mqtt超過最大重連次數"); clearTimeout(this.reconnectTimer); // 重置重連計數器 this.reconnectCount = 0; this.mqttClient && this.mqttClient.end(); return; } console.log("正在重連,剩餘重連次數:", this.maxReconnectCount - this.reconnectCount); this.reconnectCount++; this.reconnectTimer && clearTimeout(this.reconnectTimer); this.reconnectTimer = setTimeout(() => { this.connect(); }, 3000); //這裏的重連間隔時長可以加一個配置項去替代 } // 監聽連線成功處理函式 handleConnectEvent(){ // 重置重連計數器 this.reconnectCount = 0; if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } }
異常斷線處理
針對斷網,可以對online
和offline
進行監聽,當斷線或重新線上後可以進行一些處理。
// 監聽網路狀態的變化 window.addEventListener("online", this.handleOnlineChange); window.addEventListener("offline", this.handleOnlineChange); // 監聽視窗顯示是否可見 window.addEventListener("visibilitychange", this.handleVisibilityChange); handleOnlineChange(val){ const online= typeof val === "boolean" ? val : val.target.online; if(online){ if (!mqttServer.isConnected()) { this.connect() } } } handleVisibilityChange(){ if (!document.hidden) { // 視窗重新可見了,判斷是否需要重連 if (!mqttServer.isConnected()) { this.connect() } } }
合理管理訂閱傳送
如果按照我們之前封裝的效果,使用mqtt
是這樣的:
import {mqttServer,mqttConnect} from "@/utils/mqttServer" mqttServer.subscribe("/test/aaa") mqttServer.subscribe("/test/bbb") mqttServer.subscribe("/test/ccc") mqttServer.mqttClient.on("message",(topic, message)=>{ console.log(topic) console.log(message.toString()) }) mqttServer.publish("/test/aaa",{content:"hello"}) mqttServer.publish("/test/bbb",{content:"test123"}) onBeforeUnmount(()=>{ mqttServer.unSubscribe("/test/aaa") mqttServer.unSubscribe("/test/bbb") mqttServer.unSubscribe("/test/ccc") })
乍一看也能用,但是如果基本上每個頁面都需要使用呢?或者臨時調整了某個mqtt
訂閱地址,再或者我現在想傳送publish
時,攜帶當前登入人資訊,這樣管理起來就比較麻煩了,所以這裏我們可以建立一個集中管理方法,對所有傳送,監聽都進行一個管理。
爲了方便使用,這裏定義一個useMqtt
檔案,他需要包含以下功能:
進入頁面自動訂閱和分發監聽觸發的函式
離開頁面時自動取消訂閱
暴露
publish
方法,傳送可攜帶公用引數可選:多個頁面共享同一狀態
//useMqtt.js import {mqttServer} from "@/utils/mqttServer" export const useMqtt = (mqttHooks = {}) => { // 主題Map const topicMap = new Map(); onMounted(subscribeTopic) onBeforeUnmount(unsubscribeTopic) function subscribeTopic(){ Object.keys(mqttHooks).forEach((topic) => { if (mqttServer.isConnected()) { subscribe(topic, mqttHooks[topic]); } }); mqttServer.mqttClient.on("message", handleMqttMessage); } function unsubscribeTopic(){ Array.from(topicMap).forEach((item) => { if (mqttServer.isConnected()) { unsubscribe(item[0]); } }); mqttServer.mqttClient?.off?.("message", handleMqttMessage); } // 訊息監聽處理 function handleMqttMessage(messageTopic, message) { if (topicMap.has(messageTopic)) { const objectData = JSON.parse(message); const callback = topicMap.get(messageTopic); if (typeof callback === "function") { callback(objectData); } } } // 傳送訊息 function publish(topic, message) { mqttServer.publish(topic, { ...message, // ...一些公用的引數 }); } //訂閱某個主題 function subscribe(topic, callback) { mqttServer.subscribe(topic); topicMap.set(topic, callback); } //取消訂閱某個主題 function unsubscribe(topic) { mqttServer.unsubscribe(topic); topicMap.delete(topic); } return{ subscribe, unsubscribe, publish } })
同時,爲了對topic
主題進行統一管理,定義topic
常量,方便使用,如果使用ts
,可透過enum
來進行實現。
export const TOPIC_TYPE={ A:"/test/aaa", B:"/test/bbb" C:"/test/ccc" }
頁面使用:
import {useMqtt} from "./useMqtt" import {TOPIC_TYPE} from "@/constant/topic" const {subscribe,unsubscribe,publish } = useMqtt({ [TOPIC_TYPE.A]: handleMqttTestAMessage, [TOPIC_TYPE.B]: handleMqttTestBMessage, [TOPIC_TYPE.C]: handleMqttTestCMessage, }); function handleMqttTestAMessage(){} function handleMqttTestBMessage(){} function handleMqttTestCMessage(){} // 如果有些需要後期訂閱的主題,可以使用 subscribe後續訂閱 subscribe(TOPIC_TYPE.D,handleMqttTestDMessage) //傳送訊息 publish(TOPIC_TYPE.A,{content:"hello"})
重連繼續傳送(可選)
考慮到有這樣的場景,本來準備傳送10條訊息,但是由於這個時候網路波動了一下,只發送出3條,剩餘7條未傳送,正常情況下肯定是沒有了。有兩個辦法:
設定
qos
為2
,保證只收到一次新增一個
佇列-消費
的機制,儲存每次傳送的請求,消費後就刪除
這裏我選擇了新增一個佇列,是因為qos
雖然設定為2
,可以保證該請求會被收到,但是他會一直向對方確認你是否收到了我的請求,我現在可能需要的場景是重連後再進行該操作,所以這裏採用了新增佇列機制。
定義publishList
欄位,用於儲存待消費的資料。修改連線成功的事件監聽。
// 監聽連線成功 this.mqttClient.on("connect", this.handleConnectEvent.bind(this)); // 監聽連線成功處理函式 handleConnectEvent(){ //...重置重連計數器 //傳送未消費的資料 if (this.publishList.length > 0) { const queue = this.publishList.slice(); this.publishList = []; for (let i = 0; i < queue.length; i++) { const { topic, payload, qos } = queue[i]; this.publish(topic, payload, qos); } } }
修改publish
傳送方法:
publish(topic, payload = {}, qos = 0) { if (this.isConnected()) { this.mqttClient.publish(topic, JSON.stringify(payload), qos); } else if (this.reconnectTimer) { // 如果當前處於重連狀態,還在呼叫publish // 將未傳送出去的訊息存起來,連線成功後在傳送 // 可選 限制10條 this.publishList = this.publishList.slice(-9); this.publishList.push({ topic, payload, qos, }); } }
最後
到這裏mqtt
的封裝整合已經完成了,由於篇幅有限,這裏就不貼原始碼了,有需要的後續可以開放原始碼,希望這個文章能對你有幫助。