切换语言为:繁体

偏loT场景 mqtt + webRTC 实现音视频通话

  • 爱糖宝
  • 2024-09-19
  • 2060
  • 0
  • 0

前言

最近主要用到的技术比较有意思的是 mqtt + webRTC 进行 音视频通话,由于之前对这块接触的比较少,所以经过这段时间的学习和使用,也算是对这块有个大概的了解,这里记录下学习使用过程中出现的一些问题和实现思路,希望能对你有帮助。

音视频通话

这里采用的方案是通过webRTC进行实现,主要实现方法是先各自拿到双方的 MediaStream,然后通过信令服务器交换SDP和内网穿透,最后建立通信实现音视频实时通话;其中webRTC还可用来获取本机IP地址信息等。信令服务器一般使用的是webSocketmqtt.js  等实时方案,由于我们项目使用场景偏loT,所以这里采用mqtt.js来实现信令服务器进行消息发送和接收。

WebRTC(Web 实时通信)是一种使 Web 应用程序和站点能够捕获和选择性地流式传输音频或视频媒体,以及在浏览器之间交换任意数据的而无需中间件的技术。WebRTC 的一系列标准使得在不需要用户安装插件或任何其他第三方软件的情况下,可以实现点对点数据共享和电话会议。

mqtt封装集成

mqtt基本介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。

mqtt常用于物联网,车联网,智慧xx等领域,基于发布订阅模式,比较轻量简单。

特点

  • 开放消息协议,简单易实现

  • 发布订阅模式,一对多消息发布

  • 基于TCP/IP网络连接,提供有序,无损,双向连接。

  • 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。

  • 消息QoS支持,可靠传输保证

常用API

  • mqtt.connect([url], options)  连接到mqtt,其中mqtt options默认参数如下:

{
    keepalive: 60,
    reschedulePings: true,
    protocolId: "MQTT",
    protocolVersion: 4,
    reconnectPeriod: 1e3,
    connectTimeout: 30 * 1e3,
    clean: true,
    resubscribe: true,
    writeCache: true,
    timerVariant: "auto"
}

这里介绍下options常用属性:

  • clientId         客户端id,默认mqttjs_${Math.random().toString(16).substr(2, 8)}

  • reschedulePings  心跳检测,默认true

  • keepalive        心跳检测时长默认60秒,设置为0则不开启检测

  • clean            断开是否清除回话,默认true

  • reconnectPeriod  断开重连时长,默认1000ms,设置为0则不自动重连

  • connectTimeout   连接超时时长,默认30秒

  • timerVariant     设置使用定时器类型,默认auto,自动检测适合的定时器类型,可选workernative,设置worker将使用web workers来执行定时器,keepalive,reconnectPeriod等需要定时器的地方,都是通过这个地方设置使用对应的定时器。

connect方法会返回一个client客户端实例,后续监听消息等都是通过客户端实例实现;下面列举一些client常见的监听和方法。

监听:

  • connect 当连接到mqtt时触发

client.on("connect", () => {
  console.log(`mqtt Connection succeeded!`);
});

  • disconnect 当客户端主动调用end方法或断网等导致连接断开的时候触发

client.on("disconnect", () => {
  console.log(`mqtt disconenct succeeded!`);
});

  • close 当客户端主动调用end(true)方法、断网、连接超时等导致连接断开的时候触发,发生在disconnect后,释放资源时触发

client.on("close", () => {
  console.log(`mqtt closed!`);
});

  • reconnect 当客户端重连的时候触发

client.on("reconnect", () => {
  console.log(`mqtt reconnection succeeded!`);
});

  • error 当客户端无法连接、连接超时、解析错误等触发

client.on("error", (error) => {
  console.log(`mqtt Connection failed!`,error);
});

  • message

当收到发布的消息触发,有三个参数 topic,message, packettopic为发布的主题,message为消息内容,注意mqtt协议message是通过字节流的形式传递的,所以拿到值之后需要通过toString或者JSON.parse进行转换,packetmqtt报文信息,主要包含qostopicpayload等。

client.on("connect", (topic,message,packet) => {
  console.log(topic);
  console.log(message.toString());
});

方法:

  • subscribe 订阅某个主题/多个主题,有三个参数topic,options,callback

client.subscribe("/test/topic",{qos:0},(err, granted)=>{
  //err为订阅失败的错误信息
  //granted为订阅的信息,topic,qos等
})

其中qos参数需要注意下,MQTT 定义了三个 QoS等级,分别为:

  • 0,最多交付一次,发送完不用等待对方是否确认收到,也不会存储当前发送的信息和重发,消息可能会丢失。

  • 1,至少交付一次,发送完需要等对方确认收到后才会停止发送,可能会重复。

  • 2,只交付一次,保证只会收到一次,不会重复。

  • unsubscribe 取消订阅某个主题/多个主题,有三个参数topic,options,callback

client.unsubscribe("/test/topic",(err)=>{
  //err为取消订阅失败的错误信息
})

  • publish

publish 发布消息,有四个参数topicpayloadoptionscallback,其中topic为发布的主题,payload为发送的参数,必须是String或者Buffer,options可选配置例如qos等参数。

client.publish("/test/topic",JSON.stringify({text:"hello"}),{qos:0},(err)=>{
   if(err){
      console.log(err)
   }
})

  • end 关闭mqtt连接,有三个参数forceoptionscallback,其中force若传参为true,则立即关闭客户端。

client.end(true)

mqtt基础封装

为了保证全局只有唯一一个mqtt实例,所以这里定义一个简单的class MQTTServer

import mqtt from "mqtt";

class MQTTServer{
   constructor(url, options = {}) {
    this.url = url;
    this.mqttClient = null;
    this.clientId = null;
    this.options = {
      clean: true, // true: 清除会话, false: 保留会话
      connectTimeout: 2000, // 超时时间 毫秒
      ...options,
    };
  }
  
  isConnected() {
    return this.mqttClient?.connected??false;
  }
  
  connect(clientId) {
    this.clientId =`${clientId || new Date().getTime()}_${Math.round(Math.random() * 100000)}`
    // 连接到mqtt
    this.mqttClient = mqtt.connect(this.url, {...this.options,clientId:this.clientId});

    // 监听连接成功
    this.mqttClient.on("connect", () => {
      console.log(`mqtt ${this.url} Connection succeeded!`);
    });
    
    // 监听错误
    this.mqttClient.on("error", (error) => {
      console.log("mqtt Connection failed", error);
    });
    
    // 监听重连
    this.mqttClient.on("reconnect", () => {
      console.log("mqtt reconnect");
    });
    
    // 监听断开连接
    this.mqttClient.on("disconnect",()=>{
      console.log("mqtt disconnect");
    });

    // 监听关闭
    this.mqttClient.on("close",()=>{
      console.log("mqtt close",);
    });
  }
  
  disconnect() {
      this.mqttClient.end(true, () => {
        this.mqttClient = null;
        console.log(`mqtt Successfully disconnected at ${this.url}`);
      });
  }
  
  subscribe(topics) {
      this.mqttClient.subscribe(topics, { qos: 0 }, (error, granted) => {
        if (error) {
          console.error("mqtt Subscribe to topic error", error);
        }
      });
  }
  
  unsubscribe(topics) {
      this.mqttClient.unsubscribe(topics, (error) => {
        if (error) {
          console.log("mqtt Unsubscribe to topic error", error);
        } 
      });
  }
  
  publish(topic, payload = {}, qos = 0) {
      this.mqttClient.publish(topic, JSON.stringify(payload), qos, (error) => {
        if (error) {
          console.log(`mqtt Publish topic ${topic} error`, error);
        }
      });
  }
}

对外暴露:

const mqttServer = new MQTTServer(`ws://localhost:9001`);

function connectMqtt(clientId){
   mqttServer.connect(clientId);
}

export {mqttServer,mqttConnect}

使用:

import {mqttServer,mqttConnect} from "@/utils/mqttServer"

//登录完成后调用 mqttConnect

function login(){
  ...登录逻辑
  mqttConnect(userInfo.userId);
  
  //订阅消息
  mqttServer.subscribe("/test/aaa")
  
  mqttServer.mqttClient.on("message",(topic, message)=>{
      console.log(topic)
      console.log(message.toString())
  })
}

存在的问题

到这里基础封装已经完成了,可以进行基础的使用,但是,我个人使用中发现一些问题:

  1. mqtt自动重连机制没有限制次数,导致无限重连

  2. 没有对断网等特殊情况做一个容错处理

  3. 页面切换到后台时,可能会导致mqtt断开

  4. 订阅,发送,监听消息不够优雅

  5. 可选:针对消息发布主题和信息,可以考虑存储起来,每次发布成功后才清除,若遇到断网等情况,重连后将未发布的消息重新发布

优化调整

针对上面存在的一些问题,重新对mqtt的封装进行一些调整。

mqtt重连机制优化

由于mqtt重连触发后会一直自动重连,且没有次数限制参数,而是会一直根据设定的时长自动重连;但在理想情况下,断开连接后,进行自动重连n次,如果还是连接不上,就不继续连接。

1.修改配置

设置mqttreconnectPeriod0,去掉mqtt的内部重连机制。

2.自定义重连逻辑

假设定义最大重连次数 maxReconnectCount= 20,当前重连次数 reconnectCount = 0。每次断开连接后(需要过滤掉主动关闭导致的断开连接)失败后当前连接次数 +1,如果小于最大连接次数,就重新调用连接方法,连接成功后重置当前连接次数

修改监听close的方法,为什么这里选择close里面去进行重连的操作,不选择disconnect,因为连接超时,连接错误等情况下,disconnect不会触发,但close会触发。

// 监听连接成功
this.mqttClient.on("connect", this.handleConnectEvent.bind(this));
// 监听关闭
this.mqttClient.on("close", this.handleCloseEvent.bind(this));

// 监听关闭处理函数
handleCloseEvent() {
    if (this.disconnectReason === "disconnect") return;
    // 判断当前连接次数是否超过最大次数
    if (this.reconnectCount >= this.maxReconnectCount) {
      console.error("mqtt超过最大重连次数");
      clearTimeout(this.reconnectTimer);
      // 重置重连计数器
      this.reconnectCount = 0;
      this.mqttClient && this.mqttClient.end();
      return;
    }

    console.log("正在重连,剩余重连次数:", this.maxReconnectCount - this.reconnectCount);
    this.reconnectCount++;
    this.reconnectTimer && clearTimeout(this.reconnectTimer);
    this.reconnectTimer = setTimeout(() => {
      this.connect();
    }, 3000);  //这里的重连间隔时长可以加一个配置项去替代
 }
 
 // 监听连接成功处理函数
 handleConnectEvent(){
    // 重置重连计数器
    this.reconnectCount = 0;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
 }

异常断线处理

针对断网,可以对onlineoffline进行监听,当断线或重新在线后可以进行一些处理。

// 监听网络状态的变化
window.addEventListener("online", this.handleOnlineChange);
window.addEventListener("offline", this.handleOnlineChange);

// 监听窗口显示是否可见
window.addEventListener("visibilitychange", this.handleVisibilityChange);

handleOnlineChange(val){
    const online= typeof val === "boolean" ? val : val.target.online;
    if(online){
       if (!mqttServer.isConnected()) {
           this.connect()
        }
    }
}

handleVisibilityChange(){
  if (!document.hidden) {
    // 窗口重新可见了,判断是否需要重连
     if (!mqttServer.isConnected()) {
       this.connect()
    }
  }
}

合理管理订阅发送

如果按照我们之前封装的效果,使用mqtt是这样的:

import {mqttServer,mqttConnect} from "@/utils/mqttServer"

mqttServer.subscribe("/test/aaa")
mqttServer.subscribe("/test/bbb")
mqttServer.subscribe("/test/ccc")

mqttServer.mqttClient.on("message",(topic, message)=>{
  console.log(topic)
  console.log(message.toString())
})

mqttServer.publish("/test/aaa",{content:"hello"})
mqttServer.publish("/test/bbb",{content:"test123"})

onBeforeUnmount(()=>{
  mqttServer.unSubscribe("/test/aaa")
  mqttServer.unSubscribe("/test/bbb")
  mqttServer.unSubscribe("/test/ccc")
})

乍一看也能用,但是如果基本上每个页面都需要使用呢?或者临时调整了某个mqtt订阅地址,再或者我现在想发送publish时,携带当前登录人信息,这样管理起来就比较麻烦了,所以这里我们可以建立一个集中管理方法,对所有发送,监听都进行一个管理。

为了方便使用,这里定义一个useMqtt文件,他需要包含以下功能:

  • 进入页面自动订阅和分发监听触发的函数

  • 离开页面时自动取消订阅

  • 暴露publish方法,发送可携带公用参数

  • 可选:多个页面共享同一状态

//useMqtt.js
import {mqttServer} from "@/utils/mqttServer"

export const useMqtt = (mqttHooks = {}) => {
   // 主题Map
  const topicMap = new Map();
  
  onMounted(subscribeTopic)
  onBeforeUnmount(unsubscribeTopic)
  
  function subscribeTopic(){
    Object.keys(mqttHooks).forEach((topic) => {
      if (mqttServer.isConnected()) {
        subscribe(topic, mqttHooks[topic]);
      }
    });
     mqttServer.mqttClient.on("message", handleMqttMessage);
  }
  
  function unsubscribeTopic(){
    Array.from(topicMap).forEach((item) => {
      if (mqttServer.isConnected()) {
        unsubscribe(item[0]);
      }
    });
     mqttServer.mqttClient?.off?.("message", handleMqttMessage);
  }
  
 // 消息监听处理
 function handleMqttMessage(messageTopic, message) {
    if (topicMap.has(messageTopic)) {
      const objectData = JSON.parse(message);
      const callback = topicMap.get(messageTopic);
      if (typeof callback === "function") {
        callback(objectData);
      }
    }
  }
  
  // 发送消息
  function publish(topic, message) {
    mqttServer.publish(topic, {
      ...message,
      // ...一些公用的参数
    });
  }
  
  //订阅某个主题
  function subscribe(topic, callback) {
    mqttServer.subscribe(topic);
    topicMap.set(topic, callback);
  }

  //取消订阅某个主题
  function unsubscribe(topic) {
    mqttServer.unsubscribe(topic);
    topicMap.delete(topic);
  }
  
  return{
    subscribe,
    unsubscribe,
    publish
  }
})

同时,为了对topic主题进行统一管理,定义topic常量,方便使用,如果使用ts,可通过enum来进行实现。

export const TOPIC_TYPE={
  A:"/test/aaa",
  B:"/test/bbb"
  C:"/test/ccc"
}

页面使用:

import {useMqtt} from "./useMqtt"
import {TOPIC_TYPE} from "@/constant/topic"

const {subscribe,unsubscribe,publish } = useMqtt({
  [TOPIC_TYPE.A]: handleMqttTestAMessage,
  [TOPIC_TYPE.B]: handleMqttTestBMessage,
  [TOPIC_TYPE.C]: handleMqttTestCMessage,
});

function handleMqttTestAMessage(){}

function handleMqttTestBMessage(){}

function handleMqttTestCMessage(){}

// 如果有些需要后期订阅的主题,可以使用 subscribe后续订阅
subscribe(TOPIC_TYPE.D,handleMqttTestDMessage)

//发送消息
publish(TOPIC_TYPE.A,{content:"hello"})

重连继续发送(可选)

考虑到有这样的场景,本来准备发送10条消息,但是由于这个时候网络波动了一下,只发送出3条,剩余7条未发送,正常情况下肯定是没有了。有两个办法:

  1. 设置qos2,保证只收到一次

  2. 添加一个队列-消费的机制,存储每次发送的请求,消费后就删除

这里我选择了添加一个队列,是因为qos虽然设置为2,可以保证该请求会被收到,但是他会一直向对方确认你是否收到了我的请求,我现在可能需要的场景是重连后再进行该操作,所以这里采用了添加队列机制。

定义publishList字段,用于存储待消费的数据。修改连接成功的事件监听。

// 监听连接成功
this.mqttClient.on("connect", this.handleConnectEvent.bind(this));

 // 监听连接成功处理函数
 handleConnectEvent(){
    //...重置重连计数器
    
    //发送未消费的数据
    if (this.publishList.length > 0) {
      const queue = this.publishList.slice();
      this.publishList = [];
      for (let i = 0; i < queue.length; i++) {
        const { topic, payload, qos } = queue[i];
        this.publish(topic, payload, qos);
      }
    }
 }

修改publish发送方法:

  publish(topic, payload = {}, qos = 0) {
    if (this.isConnected()) {
      this.mqttClient.publish(topic, JSON.stringify(payload), qos);
    } else if (this.reconnectTimer) {
      // 如果当前处于重连状态,还在调用publish
      // 将未发送出去的消息存起来,连接成功后在发送
      // 可选 限制10条
      this.publishList = this.publishList.slice(-9);
      this.publishList.push({
        topic,
        payload,
        qos,
      });
    }
  }

最后

到这里mqtt的封装集成已经完成了,由于篇幅有限,这里就不贴源码了,有需要的后续可以开放源码,希望这个文章能对你有帮助。

0条评论

您的电子邮件等信息不会被公开,以下所有项均必填

OK! You can skip this field.