切換語言為:簡體

偏loT場景 mqtt + webRTC 實現音視訊通話

  • 爱糖宝
  • 2024-09-19
  • 2061
  • 0
  • 0

前言

最近主要用到的技術比較有意思的是 mqtt + webRTC 進行 音視訊通話,由於之前對這塊接觸的比較少,所以經過這段時間的學習和使用,也算是對這塊有個大概的瞭解,這裏記錄下學習使用過程中出現的一些問題和實現思路,希望能對你有幫助。

音視訊通話

這裏採用的方案是透過webRTC進行實現,主要實現方法是先各自拿到雙方的 MediaStream,然後透過信令伺服器交換SDP和內網穿透,最後建立通訊實現音影片實時通話;其中webRTC還可用來獲取本機IP地址資訊等。信令伺服器一般使用的是webSocketmqtt.js  等實時方案,由於我們專案使用場景偏loT,所以這裏採用mqtt.js來實現信令伺服器進行訊息傳送和接收。

WebRTC(Web 實時通訊)是一種使 Web 應用程式和站點能夠捕獲和選擇性地流式傳輸音訊或影片媒體,以及在瀏覽器之間交換任意資料的而無需中介軟體的技術。WebRTC 的一系列標準使得在不需要使用者安裝外掛或任何其他第三方軟體的情況下,可以實現點對點資料共享和電話會議。

mqtt封裝整合

mqtt基本介紹

MQTT(Message Queuing Telemetry Transport)是一種輕量級、基於釋出-訂閱模式的訊息傳輸協議,適用於資源受限的裝置和低頻寬、高延遲或不穩定的網路環境。它在物聯網應用中廣受歡迎,能夠實現感測器、執行器和其它裝置之間的高效通訊。

mqtt常用於物聯網,車聯網,智慧xx等領域,基於釋出訂閱模式,比較輕量簡單。

特點

  • 開放訊息協議,簡單易實現

  • 釋出訂閱模式,一對多訊息釋出

  • 基於TCP/IP網路連線,提供有序,無損,雙向連線。

  • 1位元組固定報頭,2位元組心跳報文,最小化傳輸開銷和協議交換,有效減少網路流量。

  • 訊息QoS支援,可靠傳輸保證

常用API

  • mqtt.connect([url], options)  連線到mqtt,其中mqtt options預設引數如下:

{
    keepalive: 60,
    reschedulePings: true,
    protocolId: "MQTT",
    protocolVersion: 4,
    reconnectPeriod: 1e3,
    connectTimeout: 30 * 1e3,
    clean: true,
    resubscribe: true,
    writeCache: true,
    timerVariant: "auto"
}

這裏介紹下options常用屬性:

  • clientId         客戶端id,預設mqttjs_${Math.random().toString(16).substr(2, 8)}

  • reschedulePings  心跳檢測,預設true

  • keepalive        心跳檢測時長預設60秒,設定為0則不開啟檢測

  • clean            斷開是否清除回話,預設true

  • reconnectPeriod  斷開重連時長,預設1000ms,設定為0則不自動重連

  • connectTimeout   連線超時時長,預設30秒

  • timerVariant     設定使用定時器型別,預設auto,自動檢測適合的定時器型別,可選workernative,設定worker將使用web workers來執行定時器,keepalive,reconnectPeriod等需要定時器的地方,都是透過這個地方設定使用對應的定時器。

connect方法會返回一個client客戶端例項,後續監聽訊息等都是透過客戶端例項實現;下面列舉一些client常見的監聽和方法。

監聽:

  • connect 當連線到mqtt時觸發

client.on("connect", () => {
  console.log(`mqtt Connection succeeded!`);
});

  • disconnect 當客戶端主動呼叫end方法或斷網等導致連線斷開的時候觸發

client.on("disconnect", () => {
  console.log(`mqtt disconenct succeeded!`);
});

  • close 當客戶端主動呼叫end(true)方法、斷網、連線超時等導致連線斷開的時候觸發,發生在disconnect後,釋放資源時觸發

client.on("close", () => {
  console.log(`mqtt closed!`);
});

  • reconnect 當客戶端重連的時候觸發

client.on("reconnect", () => {
  console.log(`mqtt reconnection succeeded!`);
});

  • error 當客戶端無法連線、連線超時、解析錯誤等觸發

client.on("error", (error) => {
  console.log(`mqtt Connection failed!`,error);
});

  • message

當收到釋出的訊息觸發,有三個引數 topic,message, packettopic為釋出的主題,message為訊息內容,注意mqtt協議message是透過位元組流的形式傳遞的,所以拿到值之後需要透過toString或者JSON.parse進行轉換,packetmqtt報文資訊,主要包含qostopicpayload等。

client.on("connect", (topic,message,packet) => {
  console.log(topic);
  console.log(message.toString());
});

方法:

  • subscribe 訂閱某個主題/多個主題,有三個引數topic,options,callback

client.subscribe("/test/topic",{qos:0},(err, granted)=>{
  //err為訂閱失敗的錯誤資訊
  //granted為訂閱的資訊,topic,qos等
})

其中qos引數需要注意下,MQTT 定義了三個 QoS等級,分別為:

  • 0,最多交付一次,傳送完不用等待對方是否確認收到,也不會儲存當前傳送的資訊和重發,訊息可能會丟失。

  • 1,至少交付一次,傳送完需要等對方確認收到後纔會停止傳送,可能會重複。

  • 2,只交付一次,保證只會收到一次,不會重複。

  • unsubscribe 取消訂閱某個主題/多個主題,有三個引數topic,options,callback

client.unsubscribe("/test/topic",(err)=>{
  //err為取消訂閱失敗的錯誤資訊
})

  • publish

publish 釋出訊息,有四個引數topicpayloadoptionscallback,其中topic為釋出的主題,payload為傳送的引數,必須是String或者Buffer,options可選配置例如qos等引數。

client.publish("/test/topic",JSON.stringify({text:"hello"}),{qos:0},(err)=>{
   if(err){
      console.log(err)
   }
})

  • end 關閉mqtt連線,有三個引數forceoptionscallback,其中force若傳參為true,則立即關閉客戶端。

client.end(true)

mqtt基礎封裝

爲了保證全域性只有唯一一個mqtt例項,所以這裏定義一個簡單的class MQTTServer

import mqtt from "mqtt";

class MQTTServer{
   constructor(url, options = {}) {
    this.url = url;
    this.mqttClient = null;
    this.clientId = null;
    this.options = {
      clean: true, // true: 清除會話, false: 保留會話
      connectTimeout: 2000, // 超時時間 毫秒
      ...options,
    };
  }
  
  isConnected() {
    return this.mqttClient?.connected??false;
  }
  
  connect(clientId) {
    this.clientId =`${clientId || new Date().getTime()}_${Math.round(Math.random() * 100000)}`
    // 連線到mqtt
    this.mqttClient = mqtt.connect(this.url, {...this.options,clientId:this.clientId});

    // 監聽連線成功
    this.mqttClient.on("connect", () => {
      console.log(`mqtt ${this.url} Connection succeeded!`);
    });
    
    // 監聽錯誤
    this.mqttClient.on("error", (error) => {
      console.log("mqtt Connection failed", error);
    });
    
    // 監聽重連
    this.mqttClient.on("reconnect", () => {
      console.log("mqtt reconnect");
    });
    
    // 監聽斷開連線
    this.mqttClient.on("disconnect",()=>{
      console.log("mqtt disconnect");
    });

    // 監聽關閉
    this.mqttClient.on("close",()=>{
      console.log("mqtt close",);
    });
  }
  
  disconnect() {
      this.mqttClient.end(true, () => {
        this.mqttClient = null;
        console.log(`mqtt Successfully disconnected at ${this.url}`);
      });
  }
  
  subscribe(topics) {
      this.mqttClient.subscribe(topics, { qos: 0 }, (error, granted) => {
        if (error) {
          console.error("mqtt Subscribe to topic error", error);
        }
      });
  }
  
  unsubscribe(topics) {
      this.mqttClient.unsubscribe(topics, (error) => {
        if (error) {
          console.log("mqtt Unsubscribe to topic error", error);
        } 
      });
  }
  
  publish(topic, payload = {}, qos = 0) {
      this.mqttClient.publish(topic, JSON.stringify(payload), qos, (error) => {
        if (error) {
          console.log(`mqtt Publish topic ${topic} error`, error);
        }
      });
  }
}

對外暴露:

const mqttServer = new MQTTServer(`ws://localhost:9001`);

function connectMqtt(clientId){
   mqttServer.connect(clientId);
}

export {mqttServer,mqttConnect}

使用:

import {mqttServer,mqttConnect} from "@/utils/mqttServer"

//登入完成後呼叫 mqttConnect

function login(){
  ...登入邏輯
  mqttConnect(userInfo.userId);
  
  //訂閱訊息
  mqttServer.subscribe("/test/aaa")
  
  mqttServer.mqttClient.on("message",(topic, message)=>{
      console.log(topic)
      console.log(message.toString())
  })
}

存在的問題

到這裏基礎封裝已經完成了,可以進行基礎的使用,但是,我個人使用中發現一些問題:

  1. mqtt自動重連機制沒有限制次數,導致無限重連

  2. 沒有對斷網等特殊情況做一個容錯處理

  3. 頁面切換到後臺時,可能會導致mqtt斷開

  4. 訂閱,傳送,監聽訊息不夠優雅

  5. 可選:針對訊息釋出主題和資訊,可以考慮儲存起來,每次釋出成功後才清除,若遇到斷網等情況,重連後將未釋出的訊息重新發布

最佳化調整

針對上面存在的一些問題,重新對mqtt的封裝進行一些調整。

mqtt重連機制最佳化

由於mqtt重連觸發後會一直自動重連,且沒有次數限制引數,而是會一直根據設定的時長自動重連;但在理想情況下,斷開連線後,進行自動重連n次,如果還是連線不上,就不繼續連線。

1.修改配置

設定mqttreconnectPeriod0,去掉mqtt的內部重連機制。

2.自定義重連邏輯

假設定義最大重連次數 maxReconnectCount= 20,當前重連次數 reconnectCount = 0。每次斷開連線後(需要過濾掉主動關閉導致的斷開連線)失敗後當前連線次數 +1,如果小於最大連線次數,就重新呼叫連線方法,連線成功後重置當前連線次數

修改監聽close的方法,為什麼這裏選擇close裡面去進行重連的操作,不選擇disconnect,因為連線超時,連線錯誤等情況下,disconnect不會觸發,但close會觸發。

// 監聽連線成功
this.mqttClient.on("connect", this.handleConnectEvent.bind(this));
// 監聽關閉
this.mqttClient.on("close", this.handleCloseEvent.bind(this));

// 監聽關閉處理函式
handleCloseEvent() {
    if (this.disconnectReason === "disconnect") return;
    // 判斷當前連線次數是否超過最大次數
    if (this.reconnectCount >= this.maxReconnectCount) {
      console.error("mqtt超過最大重連次數");
      clearTimeout(this.reconnectTimer);
      // 重置重連計數器
      this.reconnectCount = 0;
      this.mqttClient && this.mqttClient.end();
      return;
    }

    console.log("正在重連,剩餘重連次數:", this.maxReconnectCount - this.reconnectCount);
    this.reconnectCount++;
    this.reconnectTimer && clearTimeout(this.reconnectTimer);
    this.reconnectTimer = setTimeout(() => {
      this.connect();
    }, 3000);  //這裏的重連間隔時長可以加一個配置項去替代
 }
 
 // 監聽連線成功處理函式
 handleConnectEvent(){
    // 重置重連計數器
    this.reconnectCount = 0;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
 }

異常斷線處理

針對斷網,可以對onlineoffline進行監聽,當斷線或重新線上後可以進行一些處理。

// 監聽網路狀態的變化
window.addEventListener("online", this.handleOnlineChange);
window.addEventListener("offline", this.handleOnlineChange);

// 監聽視窗顯示是否可見
window.addEventListener("visibilitychange", this.handleVisibilityChange);

handleOnlineChange(val){
    const online= typeof val === "boolean" ? val : val.target.online;
    if(online){
       if (!mqttServer.isConnected()) {
           this.connect()
        }
    }
}

handleVisibilityChange(){
  if (!document.hidden) {
    // 視窗重新可見了,判斷是否需要重連
     if (!mqttServer.isConnected()) {
       this.connect()
    }
  }
}

合理管理訂閱傳送

如果按照我們之前封裝的效果,使用mqtt是這樣的:

import {mqttServer,mqttConnect} from "@/utils/mqttServer"

mqttServer.subscribe("/test/aaa")
mqttServer.subscribe("/test/bbb")
mqttServer.subscribe("/test/ccc")

mqttServer.mqttClient.on("message",(topic, message)=>{
  console.log(topic)
  console.log(message.toString())
})

mqttServer.publish("/test/aaa",{content:"hello"})
mqttServer.publish("/test/bbb",{content:"test123"})

onBeforeUnmount(()=>{
  mqttServer.unSubscribe("/test/aaa")
  mqttServer.unSubscribe("/test/bbb")
  mqttServer.unSubscribe("/test/ccc")
})

乍一看也能用,但是如果基本上每個頁面都需要使用呢?或者臨時調整了某個mqtt訂閱地址,再或者我現在想傳送publish時,攜帶當前登入人資訊,這樣管理起來就比較麻煩了,所以這裏我們可以建立一個集中管理方法,對所有傳送,監聽都進行一個管理。

爲了方便使用,這裏定義一個useMqtt檔案,他需要包含以下功能:

  • 進入頁面自動訂閱和分發監聽觸發的函式

  • 離開頁面時自動取消訂閱

  • 暴露publish方法,傳送可攜帶公用引數

  • 可選:多個頁面共享同一狀態

//useMqtt.js
import {mqttServer} from "@/utils/mqttServer"

export const useMqtt = (mqttHooks = {}) => {
   // 主題Map
  const topicMap = new Map();
  
  onMounted(subscribeTopic)
  onBeforeUnmount(unsubscribeTopic)
  
  function subscribeTopic(){
    Object.keys(mqttHooks).forEach((topic) => {
      if (mqttServer.isConnected()) {
        subscribe(topic, mqttHooks[topic]);
      }
    });
     mqttServer.mqttClient.on("message", handleMqttMessage);
  }
  
  function unsubscribeTopic(){
    Array.from(topicMap).forEach((item) => {
      if (mqttServer.isConnected()) {
        unsubscribe(item[0]);
      }
    });
     mqttServer.mqttClient?.off?.("message", handleMqttMessage);
  }
  
 // 訊息監聽處理
 function handleMqttMessage(messageTopic, message) {
    if (topicMap.has(messageTopic)) {
      const objectData = JSON.parse(message);
      const callback = topicMap.get(messageTopic);
      if (typeof callback === "function") {
        callback(objectData);
      }
    }
  }
  
  // 傳送訊息
  function publish(topic, message) {
    mqttServer.publish(topic, {
      ...message,
      // ...一些公用的引數
    });
  }
  
  //訂閱某個主題
  function subscribe(topic, callback) {
    mqttServer.subscribe(topic);
    topicMap.set(topic, callback);
  }

  //取消訂閱某個主題
  function unsubscribe(topic) {
    mqttServer.unsubscribe(topic);
    topicMap.delete(topic);
  }
  
  return{
    subscribe,
    unsubscribe,
    publish
  }
})

同時,爲了對topic主題進行統一管理,定義topic常量,方便使用,如果使用ts,可透過enum來進行實現。

export const TOPIC_TYPE={
  A:"/test/aaa",
  B:"/test/bbb"
  C:"/test/ccc"
}

頁面使用:

import {useMqtt} from "./useMqtt"
import {TOPIC_TYPE} from "@/constant/topic"

const {subscribe,unsubscribe,publish } = useMqtt({
  [TOPIC_TYPE.A]: handleMqttTestAMessage,
  [TOPIC_TYPE.B]: handleMqttTestBMessage,
  [TOPIC_TYPE.C]: handleMqttTestCMessage,
});

function handleMqttTestAMessage(){}

function handleMqttTestBMessage(){}

function handleMqttTestCMessage(){}

// 如果有些需要後期訂閱的主題,可以使用 subscribe後續訂閱
subscribe(TOPIC_TYPE.D,handleMqttTestDMessage)

//傳送訊息
publish(TOPIC_TYPE.A,{content:"hello"})

重連繼續傳送(可選)

考慮到有這樣的場景,本來準備傳送10條訊息,但是由於這個時候網路波動了一下,只發送出3條,剩餘7條未傳送,正常情況下肯定是沒有了。有兩個辦法:

  1. 設定qos2,保證只收到一次

  2. 新增一個佇列-消費的機制,儲存每次傳送的請求,消費後就刪除

這裏我選擇了新增一個佇列,是因為qos雖然設定為2,可以保證該請求會被收到,但是他會一直向對方確認你是否收到了我的請求,我現在可能需要的場景是重連後再進行該操作,所以這裏採用了新增佇列機制。

定義publishList欄位,用於儲存待消費的資料。修改連線成功的事件監聽。

// 監聽連線成功
this.mqttClient.on("connect", this.handleConnectEvent.bind(this));

 // 監聽連線成功處理函式
 handleConnectEvent(){
    //...重置重連計數器
    
    //傳送未消費的資料
    if (this.publishList.length > 0) {
      const queue = this.publishList.slice();
      this.publishList = [];
      for (let i = 0; i < queue.length; i++) {
        const { topic, payload, qos } = queue[i];
        this.publish(topic, payload, qos);
      }
    }
 }

修改publish傳送方法:

  publish(topic, payload = {}, qos = 0) {
    if (this.isConnected()) {
      this.mqttClient.publish(topic, JSON.stringify(payload), qos);
    } else if (this.reconnectTimer) {
      // 如果當前處於重連狀態,還在呼叫publish
      // 將未傳送出去的訊息存起來,連線成功後在傳送
      // 可選 限制10條
      this.publishList = this.publishList.slice(-9);
      this.publishList.push({
        topic,
        payload,
        qos,
      });
    }
  }

最後

到這裏mqtt的封裝整合已經完成了,由於篇幅有限,這裏就不貼原始碼了,有需要的後續可以開放原始碼,希望這個文章能對你有幫助。

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.