一、Eclipse Paho介紹
Eclipse Paho 是一個開源專案,由 Eclipse Foundation 主持,提供可靠的開源實現來處理 MQTT(Message Queuing Telemetry Transport)協議以及其他與物聯網 (IoT) 相關的協議。MQTT 是一種輕量級的釋出/訂閱訊息傳輸協議,專為具有低頻寬和不可靠網路連線的裝置設計。Paho 提供了多種語言的客戶端庫,使得開發者可以在各種平臺上開發基於 MQTT 協議的應用程式。
1.1 主要特點
跨平臺支援:Paho 支援多種作業系統和硬件平臺,包括Windows、Linux、macOS 以及嵌入式系統。
多語言實現:Paho 客戶端庫提供了多種程式語言的選擇,如 C、C++、Java 和 Python 等。
可靠性:Paho 能夠在各種網路條件下可靠地工作,包括高延遲、丟包和間歇性連線等。
安全性:Paho 支援 TLS/SSL 加密通訊,以保證資料的安全傳輸。
靈活性:除了基本的 MQTT 協議實現之外,Paho 還允許擴充套件和定製以適應特定的需求。
1.2 Eclipse Paho MQTT C客戶端庫特點
Eclipse Paho MQTT支援多種語言,其中的C客戶端庫是一個用於實現MQTT協議客戶端的開源C語言庫。
跨平臺支援:該庫設計為可移植的,支援多種作業系統和硬件平臺,包括Linux、Windows、MacOS以及嵌入式系統。
易於整合:庫的設計使得它易於整合到現有的C或C++專案中,為開發者提供了簡單而強大的API來構建MQTT客戶端。
靈活的連線選項:支援TLS/SSL加密的MQTT連線,提供安全的通訊通道。同時,支援QoS(服務質量)級別0(最多一次)、1(至少一次)和2(僅一次)的訊息傳遞,確保訊息傳遞的可靠性。
非同步操作:大多數庫操作都是非同步的,允許應用程式在等待網路響應時繼續執行其他任務,提高了應用程式的響應性和效率。
客戶端和伺服器訊息處理:庫支援客戶端到伺服器的訊息釋出(PUBLISH)以及從伺服器到客戶端的訊息訂閱(SUBSCRIBE)和接收(RECEIVE)。
持久會話和遺囑訊息:支援持久會話,即使客戶端斷開連線也能保持訂閱和QoS狀態。同時,可以設定遺囑訊息,在客戶端異常斷開時傳送特定訊息。
回撥函式:透過提供回撥函式來處理連線、斷開連線、訊息接收等事件,使得事件處理邏輯更加靈活。
二、Eclipse Paho原始碼下載
官網地址:projects.eclipse.org/projects/io…
庫的下載地址:github.com/eclipse/pah…
在頁面上可以看到原始碼下載和編譯好的庫檔案下載。
提供了Linux下、Windows下編譯好的庫檔案,可以直接使用。 如果你現在嵌入式平臺上、其他平臺上使用,那需要自己下載原始碼進行編譯,使用。
三、編譯Eclipse Paho庫檔案
3.1 下載編譯openSSL
如果要支援ssl加密的支援,那就需要先編譯安裝 openSSL。
下載地址:codeload.github.com/openssl/ope…
編譯步驟:
1、解壓縮
tar zxf openssl-OpenSSL_1_1_1g.tar.gz
2、進入目錄,並配置輸出目錄和編譯器
cd openssl-OpenSSL_1_1_1g/ ./config no-asm shared no-async --prefix=`pwd`/ssl_result
如果是交叉編譯器,可以指定 --cross-compile-prefix=arm-linux-
。
3、執行下面命令,刪除Makefile檔案的 -m64
(如果指定了交叉編譯器)
sed -i 's/-m64//' Makefile
執行後,可以避免出現這個編譯錯誤:arm-linux-: error: unrecognized command line option '-m64'
4、編譯、安裝
make && make install
成功編譯後,在openssl-OpenSSL_1_1_1g/目錄會生成一個ssl_result目錄,可以看到裡面生成的庫.
編譯過程中。
編譯完成。
3.2 paho.mqtt.c 編譯
當前下載的是 paho.mqtt.c-1.3.13.tar.gz
wget https://github.com/eclipse/paho.mqtt.c/archive/refs/tags/v1.3.13.tar.gz
編譯步驟:
1、解壓縮,建立要安裝目錄paho.mqtt.c_result
tar zxf paho.mqtt.c-1.3.13.tar.gz mkdir paho.mqtt.c_result/bin -p mkdir paho.mqtt.c_result/include -p mkdir paho.mqtt.c_result/lib -p mkdir paho.mqtt.c_result/share/man/man1 -p
2、進入目錄,交叉編譯
cd paho.mqtt.c-1.3.13/ make CC=gcc CFLAGS:="-I `pwd`/../ssl_result/include" LDFLAGS:="-L `pwd`/../ssl_result/lib"
編譯過程中:
編譯完成:
如果是交叉編譯需要指定交叉編譯器完整名字:
cd paho.mqtt.c-1.3.13/ make CC=arm-linux-gcc CFLAGS:="-I `pwd`/../ssl_result/include" LDFLAGS:="-L `pwd`/../ssl_result/lib"
引數介紹:
CFLAGS:=“-I `pwd`/…/ssl_result/include”:指定前面編譯的 openssl 的標頭檔案; LDFLAGS:=“-L `pwd`/…/ssl_result/lib”:指定前面編譯的 openssl 的庫檔案路徑;
3、make install,安裝編譯結果
make install prefix=`pwd`/../paho.mqtt.c_result prefix=`pwd`/…/paho.mqtt.c_result :指定安裝目錄路徑;
編譯完成後,會生成目錄。
3.3 paho.mqtt.cpp 編譯
當前下載的是paho.mqtt.cpp-1.3.2.tar.gz
如果你下載的版本跟我的一樣,可以使用下面的指令碼進行編譯。
編譯之前,先建立一個指令碼檔案,名字為paho.mqtt.cpp_install
,將下面程式碼貼上進去儲存。
#! /bin/sh RESULT_DIR=$(pwd)/result_dir RESULT_SSL=${RESULT_DIR}/ssl_result RESULT_MQTT_C=${RESULT_DIR}/paho.mqtt.c_result RESULT_MQTT_CPP=${RESULT_DIR}/paho.mqtt.cpp_result CROSSS_COMPILE_TOOL=arm-linux- #如果你是需要交叉編譯
編譯步驟:
1、解壓縮
tar zxf paho.mqtt.cpp-1.3.2.tar.gz
2、進入目錄
cd paho.mqtt.cpp-1.3.2/
3、執行 cmake
mkdir build_arm cd build_arm
編譯不需要 openssl 的庫
cmake .. -DCMAKE_CXX_COMPILER=${CROSSS_COMPILE_TOOL}g++ \ -DCMAKE_INSTALL_PREFIX=${RESULT_MQTT_CPP} \ -DPAHO_MQTT_C_LIBRARIES=${RESULT_MQTT_C}/lib/libpaho-mqtt3a.so \ -DPAHO_MQTT_C_INCLUDE_DIRS=${RESULT_MQTT_C}/include \ -DPAHO_WITH_SSL=OFF \ -DCMAKE_CXX_FLAGS="-std=gnu++11 -mcpu=cortex-a53"
說明:如果你在PC機編譯目標是PC機本身使用,-mcpu=cortex-a53
就不用寫。 如果交叉編譯的目標是嵌入式晶片,就如實寫構架。
編譯帶有 openssl 的庫
cmake .. -DCMAKE_CXX_COMPILER=${CROSSS_COMPILE_TOOL}g++ \ -DCMAKE_INSTALL_PREFIX=${RESULT_MQTT_CPP} \ -DPAHO_MQTT_C_LIBRARIES=${RESULT_MQTT_C}/lib/libpaho-mqtt3a.so \ -DPAHO_MQTT_C_INCLUDE_DIRS=${RESULT_MQTT_C}/include \ -DOPENSSL_SSL_LIBRARY=${RESULT_SSL}/lib/libssl.so \ -DOPENSSL_INCLUDE_DIR=${RESULT_SSL}/include \ -DOPENSSL_CRYPTO_LIBRARY=${RESULT_SSL}/lib/libcrypto.so \ -DCMAKE_CXX_FLAGS="-std=gnu++11 -mcpu=cortex-a53"
說明:如果你在PC機編譯目標是PC機本身使用,-mcpu=cortex-a53
就不用寫。 如果交叉編譯的目標是嵌入式晶片,就如實寫構架。
4、編譯
make && make install
5、將下載的原始碼包 paho.mqtt.cpp-1.3.2.tar.gz
和 上面儲存的指令碼paho.mqtt.cpp_install
放到同一目錄,並且將前面編譯好的openssl庫、paho.mqtt.c庫放在指令碼指定的結果目錄,當前是放到 result_dir 目錄的。
6、執行./paho.mqtt.cpp_install.sh
編譯,編譯完成後,在result_dir目錄下會生成一個名為paho.mqtt.cpp_result
的目錄。
四、程式碼案例
4.1 訂閱—async_subscribe.cpp
這是使用了 libpaho-mqttpp3.so
進行訂閱訊息的原始碼,原始碼路徑在原始碼的這個路徑:paho.mqtt.cpp-1.3.2/src/samples/async_subscribe.cpp
,只更改了伺服器地址。
完整程式碼如下:
#include <iostream> #include <cstdlib> #include <string> #include <cstring> #include <cctype> #include <thread> #include <chrono> #include "mqtt/async_client.h" const std::string SERVER_ADDRESS("117.78.5.125:1883"); const std::string CLIENT_ID("paho_cpp_async_subcribe"); const std::string TOPIC("hello"); const int QOS = 1; const int N_RETRY_ATTEMPTS = 5; / // Callbacks for the success or failures of requested actions. // This could be used to initiate further action, but here we just log the // results to the console. class action_listener : public virtual mqtt::iaction_listener { std::string name_; void on_failure(const mqtt::token& tok) override { std::cout << name_ << " failure"; if (tok.get_message_id() != 0) std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl; std::cout << std::endl; } void on_success(const mqtt::token& tok) override { std::cout << name_ << " success"; if (tok.get_message_id() != 0) std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl; auto top = tok.get_topics(); if (top && !top->empty()) std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl; std::cout << std::endl; } public: action_listener(const std::string& name) : name_(name) {} }; / /** * Local callback & listener class for use with the client connection. * This is primarily intended to receive messages, but it will also monitor * the connection to the broker. If the connection is lost, it will attempt * to restore the connection and re-subscribe to the topic. */ class callback : public virtual mqtt::callback, public virtual mqtt::iaction_listener { // Counter for the number of connection retries int nretry_; // The MQTT client mqtt::async_client& cli_; // Options to use if we need to reconnect mqtt::connect_options& connOpts_; // An action listener to display the result of actions. action_listener subListener_; // This deomonstrates manually reconnecting to the broker by calling // connect() again. This is a possibility for an application that keeps // a copy of it's original connect_options, or if the app wants to // reconnect with different options. // Another way this can be done manually, if using the same options, is // to just call the async_client::reconnect() method. void reconnect() { std::this_thread::sleep_for(std::chrono::milliseconds(2500)); try { cli_.connect(connOpts_, nullptr, *this); } catch (const mqtt::exception& exc) { std::cerr << "Error: " << exc.what() << std::endl; exit(1); } } // Re-connection failure void on_failure(const mqtt::token& tok) override { std::cout << "Connection attempt failed" << std::endl; if (++nretry_ > N_RETRY_ATTEMPTS) exit(1); reconnect(); } // (Re)connection success // Either this or connected() can be used for callbacks. void on_success(const mqtt::token& tok) override {} // (Re)connection success void connected(const std::string& cause) override { std::cout << "\nConnection success" << std::endl; std::cout << "\nSubscribing to topic '" << TOPIC << "'\n" << "\tfor client " << CLIENT_ID << " using QoS" << QOS << "\n" << "\nPress Q<Enter> to quit\n" << std::endl; cli_.subscribe(TOPIC, QOS, nullptr, subListener_); } // Callback for when the connection is lost. // This will initiate the attempt to manually reconnect. void connection_lost(const std::string& cause) override { std::cout << "\nConnection lost" << std::endl; if (!cause.empty()) std::cout << "\tcause: " << cause << std::endl; std::cout << "Reconnecting..." << std::endl; nretry_ = 0; reconnect(); } // Callback for when a message arrives. void message_arrived(mqtt::const_message_ptr msg) override { std::cout << "Message arrived" << std::endl; std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl; std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl; } void delivery_complete(mqtt::delivery_token_ptr token) override {} public: callback(mqtt::async_client& cli, mqtt::connect_options& connOpts) : nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription") {} }; / int main(int argc, char* argv[]) { // A subscriber often wants the server to remember its messages when its // disconnected. In that case, it needs a unique ClientID and a // non-clean session. mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID); mqtt::connect_options connOpts; connOpts.set_clean_session(false); // Install the callback(s) before connecting. callback cb(cli, connOpts); cli.set_callback(cb); // Start the connection. // When completed, the callback will subscribe to topic. try { std::cout << "Connecting to the MQTT server..." << std::flush; cli.connect(connOpts, nullptr, cb); } catch (const mqtt::exception& exc) { std::cerr << "\nERROR: Unable to connect to MQTT server: '" << SERVER_ADDRESS << "'" << exc << std::endl; return 1; } // Just block till user tells us to quit. while (std::tolower(std::cin.get()) != 'q') ; // Disconnect try { std::cout << "\nDisconnecting from the MQTT server..." << std::flush; cli.disconnect()->wait(); std::cout << "OK" << std::endl; } catch (const mqtt::exception& exc) { std::cerr << exc << std::endl; return 1; } return 0; }
編譯指令:
g++ async_subscribe.cpp -I result_dir/paho.mqtt.cpp_result/include/ -I result_dir/paho.mqtt.c_result/include/ -L result_dir/paho.mqtt.cpp_result/lib/ -L result_dir/paho.mqtt.c_result/lib/ -l paho-mqttpp3 -l paho-mqtt3a
4.2 釋出——async_publish.cpp
這是使用了 libpaho-mqttpp3.so 進行釋出訊息的原始碼,原始碼路徑在原始碼的這個路徑:paho.mqtt.cpp-1.3.2/src/samples/async_publish.cpp
,只更改了伺服器地址。
完整程式碼如下:
#include <iostream> #include <cstdlib> #include <string> #include <thread> #include <atomic> #include <chrono> #include <cstring> #include "mqtt/async_client.h" using namespace std; const string DFLT_SERVER_ADDRESS { "117.78.5.125:1883" }; const string CLIENT_ID { "paho_cpp_async_publish" }; const string PERSIST_DIR { "./persist" }; const string TOPIC { "hello" }; const char* PAYLOAD1 = "Hello World!"; const char* PAYLOAD2 = "Hi there!"; const char* PAYLOAD3 = "Is anyone listening?"; const char* PAYLOAD4 = "Someone is always listening."; const char* LWT_PAYLOAD = "Last will and testament."; const int QOS = 1; const auto TIMEOUT = std::chrono::seconds(10); / /** * A callback class for use with the main MQTT client. */ class callback : public virtual mqtt::callback { public: void connection_lost(const string& cause) override { cout << "\nConnection lost" << endl; if (!cause.empty()) cout << "\tcause: " << cause << endl; } void delivery_complete(mqtt::delivery_token_ptr tok) override { cout << "\tDelivery complete for token: " << (tok ? tok->get_message_id() : -1) << endl; } }; / /** * A base action listener. */ class action_listener : public virtual mqtt::iaction_listener { protected: void on_failure(const mqtt::token& tok) override { cout << "\tListener failure for token: " << tok.get_message_id() << endl; } void on_success(const mqtt::token& tok) override { cout << "\tListener success for token: " << tok.get_message_id() << endl; } }; / /** * A derived action listener for publish events. */ class delivery_action_listener : public action_listener { atomic<bool> done_; void on_failure(const mqtt::token& tok) override { action_listener::on_failure(tok); done_ = true; } void on_success(const mqtt::token& tok) override { action_listener::on_success(tok); done_ = true; } public: delivery_action_listener() : done_(false) {} bool is_done() const { return done_; } }; / int main(int argc, char* argv[]) { // A client that just publishes normally doesn't need a persistent // session or Client ID unless it's using persistence, then the local // library requires an ID to identify the persistence files. string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS, clientID = (argc > 2) ? string(argv[2]) : CLIENT_ID; cout << "Initializing for server '" << address << "'..." << endl; mqtt::async_client client(address, clientID, PERSIST_DIR); callback cb; client.set_callback(cb); auto connOpts = mqtt::connect_options_builder() .clean_session() .will(mqtt::message(TOPIC, LWT_PAYLOAD, strlen(LWT_PAYLOAD), QOS, false)) .finalize(); cout << " ...OK" << endl; try { cout << "\nConnecting..." << endl; mqtt::token_ptr conntok = client.connect(connOpts); cout << "Waiting for the connection..." << endl; conntok->wait(); cout << " ...OK" << endl; // First use a message pointer. cout << "\nSending message..." << endl; mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, PAYLOAD1); pubmsg->set_qos(QOS); client.publish(pubmsg)->wait_for(TIMEOUT); cout << " ...OK" << endl; // Now try with itemized publish. cout << "\nSending next message..." << endl; mqtt::delivery_token_ptr pubtok; pubtok = client.publish(TOPIC, PAYLOAD2, strlen(PAYLOAD2), QOS, false); cout << " ...with token: " << pubtok->get_message_id() << endl; cout << " ...for message with " << pubtok->get_message()->get_payload().size() << " bytes" << endl; pubtok->wait_for(TIMEOUT); cout << " ...OK" << endl; // Now try with a listener cout << "\nSending next message..." << endl; action_listener listener; pubmsg = mqtt::make_message(TOPIC, PAYLOAD3); pubtok = client.publish(pubmsg, nullptr, listener); pubtok->wait(); cout << " ...OK" << endl; // Finally try with a listener, but no token cout << "\nSending final message..." << endl; delivery_action_listener deliveryListener; pubmsg = mqtt::make_message(TOPIC, PAYLOAD4); client.publish(pubmsg, nullptr, deliveryListener); while (!deliveryListener.is_done()) { this_thread::sleep_for(std::chrono::milliseconds(100)); } cout << "OK" << endl; // Double check that there are no pending tokens auto toks = client.get_pending_delivery_tokens(); if (!toks.empty()) cout << "Error: There are pending delivery tokens!" << endl; // Disconnect cout << "\nDisconnecting..." << endl; client.disconnect()->wait(); cout << " ...OK" << endl; } catch (const mqtt::exception& exc) { cerr << exc.what() << endl; return 1; } return 0; }
編譯指令:
g++ async_publish.cpp -I result_dir/paho.mqtt.cpp_result/include/ -I result_dir/paho.mqtt.c_result/include/ -L result_dir/paho.mqtt.cpp_result/lib/ -L result_dir/paho.mqtt.c_result/lib/ -l paho-mqttpp3 -l paho-mqtt3a -o async_publish