一、Eclipse Paho介绍
Eclipse Paho 是一个开源项目,由 Eclipse Foundation 主持,提供可靠的开源实现来处理 MQTT(Message Queuing Telemetry Transport)协议以及其他与物联网 (IoT) 相关的协议。MQTT 是一种轻量级的发布/订阅消息传输协议,专为具有低带宽和不可靠网络连接的设备设计。Paho 提供了多种语言的客户端库,使得开发者可以在各种平台上开发基于 MQTT 协议的应用程序。
1.1 主要特点
跨平台支持:Paho 支持多种操作系统和硬件平台,包括Windows、Linux、macOS 以及嵌入式系统。
多语言实现:Paho 客户端库提供了多种编程语言的选择,如 C、C++、Java 和 Python 等。
可靠性:Paho 能够在各种网络条件下可靠地工作,包括高延迟、丢包和间歇性连接等。
安全性:Paho 支持 TLS/SSL 加密通信,以保证数据的安全传输。
灵活性:除了基本的 MQTT 协议实现之外,Paho 还允许扩展和定制以适应特定的需求。
1.2 Eclipse Paho MQTT C客户端库特点
Eclipse Paho MQTT支持多种语言,其中的C客户端库是一个用于实现MQTT协议客户端的开源C语言库。
跨平台支持:该库设计为可移植的,支持多种操作系统和硬件平台,包括Linux、Windows、MacOS以及嵌入式系统。
易于集成:库的设计使得它易于集成到现有的C或C++项目中,为开发者提供了简单而强大的API来构建MQTT客户端。
灵活的连接选项:支持TLS/SSL加密的MQTT连接,提供安全的通信通道。同时,支持QoS(服务质量)级别0(最多一次)、1(至少一次)和2(仅一次)的消息传递,确保消息传递的可靠性。
异步操作:大多数库操作都是异步的,允许应用程序在等待网络响应时继续执行其他任务,提高了应用程序的响应性和效率。
客户端和服务器消息处理:库支持客户端到服务器的消息发布(PUBLISH)以及从服务器到客户端的消息订阅(SUBSCRIBE)和接收(RECEIVE)。
持久会话和遗嘱消息:支持持久会话,即使客户端断开连接也能保持订阅和QoS状态。同时,可以设置遗嘱消息,在客户端异常断开时发送特定消息。
回调函数:通过提供回调函数来处理连接、断开连接、消息接收等事件,使得事件处理逻辑更加灵活。
二、Eclipse Paho源码下载
官网地址:projects.eclipse.org/projects/io…
库的下载地址:github.com/eclipse/pah…
在页面上可以看到源码下载和编译好的库文件下载。
提供了Linux下、Windows下编译好的库文件,可以直接使用。 如果你现在嵌入式平台上、其他平台上使用,那需要自己下载源码进行编译,使用。
三、编译Eclipse Paho库文件
3.1 下载编译openSSL
如果要支持ssl加密的支持,那就需要先编译安装 openSSL。
下载地址:codeload.github.com/openssl/ope…
编译步骤:
1、解压缩
tar zxf openssl-OpenSSL_1_1_1g.tar.gz
2、进入目录,并配置输出目录和编译器
cd openssl-OpenSSL_1_1_1g/ ./config no-asm shared no-async --prefix=`pwd`/ssl_result
如果是交叉编译器,可以指定 --cross-compile-prefix=arm-linux-
。
3、执行下面命令,删除Makefile文件的 -m64
(如果指定了交叉编译器)
sed -i 's/-m64//' Makefile
执行后,可以避免出现这个编译错误:arm-linux-: error: unrecognized command line option '-m64'
4、编译、安装
make && make install
成功编译后,在openssl-OpenSSL_1_1_1g/目录会生成一个ssl_result目录,可以看到里面生成的库.
编译过程中。
编译完成。
3.2 paho.mqtt.c 编译
当前下载的是 paho.mqtt.c-1.3.13.tar.gz
wget https://github.com/eclipse/paho.mqtt.c/archive/refs/tags/v1.3.13.tar.gz
编译步骤:
1、解压缩,创建要安装目录paho.mqtt.c_result
tar zxf paho.mqtt.c-1.3.13.tar.gz mkdir paho.mqtt.c_result/bin -p mkdir paho.mqtt.c_result/include -p mkdir paho.mqtt.c_result/lib -p mkdir paho.mqtt.c_result/share/man/man1 -p
2、进入目录,交叉编译
cd paho.mqtt.c-1.3.13/ make CC=gcc CFLAGS:="-I `pwd`/../ssl_result/include" LDFLAGS:="-L `pwd`/../ssl_result/lib"
编译过程中:
编译完成:
如果是交叉编译需要指定交叉编译器完整名字:
cd paho.mqtt.c-1.3.13/ make CC=arm-linux-gcc CFLAGS:="-I `pwd`/../ssl_result/include" LDFLAGS:="-L `pwd`/../ssl_result/lib"
参数介绍:
CFLAGS:=“-I `pwd`/…/ssl_result/include”:指定前面编译的 openssl 的头文件; LDFLAGS:=“-L `pwd`/…/ssl_result/lib”:指定前面编译的 openssl 的库文件路径;
3、make install,安装编译结果
make install prefix=`pwd`/../paho.mqtt.c_result prefix=`pwd`/…/paho.mqtt.c_result :指定安装目录路径;
编译完成后,会生成目录。
3.3 paho.mqtt.cpp 编译
当前下载的是paho.mqtt.cpp-1.3.2.tar.gz
如果你下载的版本跟我的一样,可以使用下面的脚本进行编译。
编译之前,先创建一个脚本文件,名字为paho.mqtt.cpp_install
,将下面代码粘贴进去保存。
#! /bin/sh RESULT_DIR=$(pwd)/result_dir RESULT_SSL=${RESULT_DIR}/ssl_result RESULT_MQTT_C=${RESULT_DIR}/paho.mqtt.c_result RESULT_MQTT_CPP=${RESULT_DIR}/paho.mqtt.cpp_result CROSSS_COMPILE_TOOL=arm-linux- #如果你是需要交叉编译
编译步骤:
1、解压缩
tar zxf paho.mqtt.cpp-1.3.2.tar.gz
2、进入目录
cd paho.mqtt.cpp-1.3.2/
3、执行 cmake
mkdir build_arm cd build_arm
编译不需要 openssl 的库
cmake .. -DCMAKE_CXX_COMPILER=${CROSSS_COMPILE_TOOL}g++ \ -DCMAKE_INSTALL_PREFIX=${RESULT_MQTT_CPP} \ -DPAHO_MQTT_C_LIBRARIES=${RESULT_MQTT_C}/lib/libpaho-mqtt3a.so \ -DPAHO_MQTT_C_INCLUDE_DIRS=${RESULT_MQTT_C}/include \ -DPAHO_WITH_SSL=OFF \ -DCMAKE_CXX_FLAGS="-std=gnu++11 -mcpu=cortex-a53"
说明:如果你在PC机编译目标是PC机本身使用,-mcpu=cortex-a53
就不用写。 如果交叉编译的目标是嵌入式芯片,就如实写构架。
编译带有 openssl 的库
cmake .. -DCMAKE_CXX_COMPILER=${CROSSS_COMPILE_TOOL}g++ \ -DCMAKE_INSTALL_PREFIX=${RESULT_MQTT_CPP} \ -DPAHO_MQTT_C_LIBRARIES=${RESULT_MQTT_C}/lib/libpaho-mqtt3a.so \ -DPAHO_MQTT_C_INCLUDE_DIRS=${RESULT_MQTT_C}/include \ -DOPENSSL_SSL_LIBRARY=${RESULT_SSL}/lib/libssl.so \ -DOPENSSL_INCLUDE_DIR=${RESULT_SSL}/include \ -DOPENSSL_CRYPTO_LIBRARY=${RESULT_SSL}/lib/libcrypto.so \ -DCMAKE_CXX_FLAGS="-std=gnu++11 -mcpu=cortex-a53"
说明:如果你在PC机编译目标是PC机本身使用,-mcpu=cortex-a53
就不用写。 如果交叉编译的目标是嵌入式芯片,就如实写构架。
4、编译
make && make install
5、将下载的源码包 paho.mqtt.cpp-1.3.2.tar.gz
和 上面保存的脚本paho.mqtt.cpp_install
放到同一目录,并且将前面编译好的openssl库、paho.mqtt.c库放在脚本指定的结果目录,当前是放到 result_dir 目录的。
6、执行./paho.mqtt.cpp_install.sh
编译,编译完成后,在result_dir目录下会生成一个名为paho.mqtt.cpp_result
的目录。
四、代码案例
4.1 订阅—async_subscribe.cpp
这是使用了 libpaho-mqttpp3.so
进行订阅消息的源码,源码路径在源码的这个路径:paho.mqtt.cpp-1.3.2/src/samples/async_subscribe.cpp
,只更改了服务器地址。
完整代码如下:
#include <iostream> #include <cstdlib> #include <string> #include <cstring> #include <cctype> #include <thread> #include <chrono> #include "mqtt/async_client.h" const std::string SERVER_ADDRESS("117.78.5.125:1883"); const std::string CLIENT_ID("paho_cpp_async_subcribe"); const std::string TOPIC("hello"); const int QOS = 1; const int N_RETRY_ATTEMPTS = 5; / // Callbacks for the success or failures of requested actions. // This could be used to initiate further action, but here we just log the // results to the console. class action_listener : public virtual mqtt::iaction_listener { std::string name_; void on_failure(const mqtt::token& tok) override { std::cout << name_ << " failure"; if (tok.get_message_id() != 0) std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl; std::cout << std::endl; } void on_success(const mqtt::token& tok) override { std::cout << name_ << " success"; if (tok.get_message_id() != 0) std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl; auto top = tok.get_topics(); if (top && !top->empty()) std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl; std::cout << std::endl; } public: action_listener(const std::string& name) : name_(name) {} }; / /** * Local callback & listener class for use with the client connection. * This is primarily intended to receive messages, but it will also monitor * the connection to the broker. If the connection is lost, it will attempt * to restore the connection and re-subscribe to the topic. */ class callback : public virtual mqtt::callback, public virtual mqtt::iaction_listener { // Counter for the number of connection retries int nretry_; // The MQTT client mqtt::async_client& cli_; // Options to use if we need to reconnect mqtt::connect_options& connOpts_; // An action listener to display the result of actions. action_listener subListener_; // This deomonstrates manually reconnecting to the broker by calling // connect() again. This is a possibility for an application that keeps // a copy of it's original connect_options, or if the app wants to // reconnect with different options. // Another way this can be done manually, if using the same options, is // to just call the async_client::reconnect() method. void reconnect() { std::this_thread::sleep_for(std::chrono::milliseconds(2500)); try { cli_.connect(connOpts_, nullptr, *this); } catch (const mqtt::exception& exc) { std::cerr << "Error: " << exc.what() << std::endl; exit(1); } } // Re-connection failure void on_failure(const mqtt::token& tok) override { std::cout << "Connection attempt failed" << std::endl; if (++nretry_ > N_RETRY_ATTEMPTS) exit(1); reconnect(); } // (Re)connection success // Either this or connected() can be used for callbacks. void on_success(const mqtt::token& tok) override {} // (Re)connection success void connected(const std::string& cause) override { std::cout << "\nConnection success" << std::endl; std::cout << "\nSubscribing to topic '" << TOPIC << "'\n" << "\tfor client " << CLIENT_ID << " using QoS" << QOS << "\n" << "\nPress Q<Enter> to quit\n" << std::endl; cli_.subscribe(TOPIC, QOS, nullptr, subListener_); } // Callback for when the connection is lost. // This will initiate the attempt to manually reconnect. void connection_lost(const std::string& cause) override { std::cout << "\nConnection lost" << std::endl; if (!cause.empty()) std::cout << "\tcause: " << cause << std::endl; std::cout << "Reconnecting..." << std::endl; nretry_ = 0; reconnect(); } // Callback for when a message arrives. void message_arrived(mqtt::const_message_ptr msg) override { std::cout << "Message arrived" << std::endl; std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl; std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl; } void delivery_complete(mqtt::delivery_token_ptr token) override {} public: callback(mqtt::async_client& cli, mqtt::connect_options& connOpts) : nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription") {} }; / int main(int argc, char* argv[]) { // A subscriber often wants the server to remember its messages when its // disconnected. In that case, it needs a unique ClientID and a // non-clean session. mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID); mqtt::connect_options connOpts; connOpts.set_clean_session(false); // Install the callback(s) before connecting. callback cb(cli, connOpts); cli.set_callback(cb); // Start the connection. // When completed, the callback will subscribe to topic. try { std::cout << "Connecting to the MQTT server..." << std::flush; cli.connect(connOpts, nullptr, cb); } catch (const mqtt::exception& exc) { std::cerr << "\nERROR: Unable to connect to MQTT server: '" << SERVER_ADDRESS << "'" << exc << std::endl; return 1; } // Just block till user tells us to quit. while (std::tolower(std::cin.get()) != 'q') ; // Disconnect try { std::cout << "\nDisconnecting from the MQTT server..." << std::flush; cli.disconnect()->wait(); std::cout << "OK" << std::endl; } catch (const mqtt::exception& exc) { std::cerr << exc << std::endl; return 1; } return 0; }
编译指令:
g++ async_subscribe.cpp -I result_dir/paho.mqtt.cpp_result/include/ -I result_dir/paho.mqtt.c_result/include/ -L result_dir/paho.mqtt.cpp_result/lib/ -L result_dir/paho.mqtt.c_result/lib/ -l paho-mqttpp3 -l paho-mqtt3a
4.2 发布——async_publish.cpp
这是使用了 libpaho-mqttpp3.so 进行发布消息的源码,源码路径在源码的这个路径:paho.mqtt.cpp-1.3.2/src/samples/async_publish.cpp
,只更改了服务器地址。
完整代码如下:
#include <iostream> #include <cstdlib> #include <string> #include <thread> #include <atomic> #include <chrono> #include <cstring> #include "mqtt/async_client.h" using namespace std; const string DFLT_SERVER_ADDRESS { "117.78.5.125:1883" }; const string CLIENT_ID { "paho_cpp_async_publish" }; const string PERSIST_DIR { "./persist" }; const string TOPIC { "hello" }; const char* PAYLOAD1 = "Hello World!"; const char* PAYLOAD2 = "Hi there!"; const char* PAYLOAD3 = "Is anyone listening?"; const char* PAYLOAD4 = "Someone is always listening."; const char* LWT_PAYLOAD = "Last will and testament."; const int QOS = 1; const auto TIMEOUT = std::chrono::seconds(10); / /** * A callback class for use with the main MQTT client. */ class callback : public virtual mqtt::callback { public: void connection_lost(const string& cause) override { cout << "\nConnection lost" << endl; if (!cause.empty()) cout << "\tcause: " << cause << endl; } void delivery_complete(mqtt::delivery_token_ptr tok) override { cout << "\tDelivery complete for token: " << (tok ? tok->get_message_id() : -1) << endl; } }; / /** * A base action listener. */ class action_listener : public virtual mqtt::iaction_listener { protected: void on_failure(const mqtt::token& tok) override { cout << "\tListener failure for token: " << tok.get_message_id() << endl; } void on_success(const mqtt::token& tok) override { cout << "\tListener success for token: " << tok.get_message_id() << endl; } }; / /** * A derived action listener for publish events. */ class delivery_action_listener : public action_listener { atomic<bool> done_; void on_failure(const mqtt::token& tok) override { action_listener::on_failure(tok); done_ = true; } void on_success(const mqtt::token& tok) override { action_listener::on_success(tok); done_ = true; } public: delivery_action_listener() : done_(false) {} bool is_done() const { return done_; } }; / int main(int argc, char* argv[]) { // A client that just publishes normally doesn't need a persistent // session or Client ID unless it's using persistence, then the local // library requires an ID to identify the persistence files. string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS, clientID = (argc > 2) ? string(argv[2]) : CLIENT_ID; cout << "Initializing for server '" << address << "'..." << endl; mqtt::async_client client(address, clientID, PERSIST_DIR); callback cb; client.set_callback(cb); auto connOpts = mqtt::connect_options_builder() .clean_session() .will(mqtt::message(TOPIC, LWT_PAYLOAD, strlen(LWT_PAYLOAD), QOS, false)) .finalize(); cout << " ...OK" << endl; try { cout << "\nConnecting..." << endl; mqtt::token_ptr conntok = client.connect(connOpts); cout << "Waiting for the connection..." << endl; conntok->wait(); cout << " ...OK" << endl; // First use a message pointer. cout << "\nSending message..." << endl; mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, PAYLOAD1); pubmsg->set_qos(QOS); client.publish(pubmsg)->wait_for(TIMEOUT); cout << " ...OK" << endl; // Now try with itemized publish. cout << "\nSending next message..." << endl; mqtt::delivery_token_ptr pubtok; pubtok = client.publish(TOPIC, PAYLOAD2, strlen(PAYLOAD2), QOS, false); cout << " ...with token: " << pubtok->get_message_id() << endl; cout << " ...for message with " << pubtok->get_message()->get_payload().size() << " bytes" << endl; pubtok->wait_for(TIMEOUT); cout << " ...OK" << endl; // Now try with a listener cout << "\nSending next message..." << endl; action_listener listener; pubmsg = mqtt::make_message(TOPIC, PAYLOAD3); pubtok = client.publish(pubmsg, nullptr, listener); pubtok->wait(); cout << " ...OK" << endl; // Finally try with a listener, but no token cout << "\nSending final message..." << endl; delivery_action_listener deliveryListener; pubmsg = mqtt::make_message(TOPIC, PAYLOAD4); client.publish(pubmsg, nullptr, deliveryListener); while (!deliveryListener.is_done()) { this_thread::sleep_for(std::chrono::milliseconds(100)); } cout << "OK" << endl; // Double check that there are no pending tokens auto toks = client.get_pending_delivery_tokens(); if (!toks.empty()) cout << "Error: There are pending delivery tokens!" << endl; // Disconnect cout << "\nDisconnecting..." << endl; client.disconnect()->wait(); cout << " ...OK" << endl; } catch (const mqtt::exception& exc) { cerr << exc.what() << endl; return 1; } return 0; }
编译指令:
g++ async_publish.cpp -I result_dir/paho.mqtt.cpp_result/include/ -I result_dir/paho.mqtt.c_result/include/ -L result_dir/paho.mqtt.cpp_result/lib/ -L result_dir/paho.mqtt.c_result/lib/ -l paho-mqttpp3 -l paho-mqtt3a -o async_publish