切換語言為:簡體
Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

  • 爱糖宝
  • 2024-08-30
  • 2066
  • 0
  • 0

一、Eclipse Paho介紹

Eclipse Paho 是一個開源專案,由 Eclipse Foundation 主持,提供可靠的開源實現來處理 MQTT(Message Queuing Telemetry Transport)協議以及其他與物聯網 (IoT) 相關的協議。MQTT 是一種輕量級的釋出/訂閱訊息傳輸協議,專為具有低頻寬和不可靠網路連線的裝置設計。Paho 提供了多種語言的客戶端庫,使得開發者可以在各種平臺上開發基於 MQTT 協議的應用程式。

1.1 主要特點

  1. 跨平臺支援:Paho 支援多種作業系統和硬件平臺,包括Windows、Linux、macOS 以及嵌入式系統。

  2. 多語言實現:Paho 客戶端庫提供了多種程式語言的選擇,如 C、C++、Java 和 Python 等。

  3. 可靠性:Paho 能夠在各種網路條件下可靠地工作,包括高延遲、丟包和間歇性連線等。

  4. 安全性:Paho 支援 TLS/SSL 加密通訊,以保證資料的安全傳輸。

  5. 靈活性:除了基本的 MQTT 協議實現之外,Paho 還允許擴充套件和定製以適應特定的需求。

1.2 Eclipse Paho MQTT C客戶端庫特點

Eclipse Paho MQTT支援多種語言,其中的C客戶端庫是一個用於實現MQTT協議客戶端的開源C語言庫。

  1. 跨平臺支援:該庫設計為可移植的,支援多種作業系統和硬件平臺,包括Linux、Windows、MacOS以及嵌入式系統。

  2. 易於整合:庫的設計使得它易於整合到現有的C或C++專案中,為開發者提供了簡單而強大的API來構建MQTT客戶端。

  3. 靈活的連線選項:支援TLS/SSL加密的MQTT連線,提供安全的通訊通道。同時,支援QoS(服務質量)級別0(最多一次)、1(至少一次)和2(僅一次)的訊息傳遞,確保訊息傳遞的可靠性。

  4. 非同步操作:大多數庫操作都是非同步的,允許應用程式在等待網路響應時繼續執行其他任務,提高了應用程式的響應性和效率。

  5. 客戶端和伺服器訊息處理:庫支援客戶端到伺服器的訊息釋出(PUBLISH)以及從伺服器到客戶端的訊息訂閱(SUBSCRIBE)和接收(RECEIVE)。

  6. 持久會話和遺囑訊息:支援持久會話,即使客戶端斷開連線也能保持訂閱和QoS狀態。同時,可以設定遺囑訊息,在客戶端異常斷開時傳送特定訊息。

  7. 回撥函式:透過提供回撥函式來處理連線、斷開連線、訊息接收等事件,使得事件處理邏輯更加靈活。

二、Eclipse Paho原始碼下載

官網地址:projects.eclipse.org/projects/io…

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

庫的下載地址:github.com/eclipse/pah…

在頁面上可以看到原始碼下載和編譯好的庫檔案下載。

提供了Linux下、Windows下編譯好的庫檔案,可以直接使用。  如果你現在嵌入式平臺上、其他平臺上使用,那需要自己下載原始碼進行編譯,使用。

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

三、編譯Eclipse Paho庫檔案

3.1 下載編譯openSSL

如果要支援ssl加密的支援,那就需要先編譯安裝 openSSL。

下載地址:codeload.github.com/openssl/ope…

編譯步驟:

1、解壓縮

tar zxf openssl-OpenSSL_1_1_1g.tar.gz

2、進入目錄,並配置輸出目錄和編譯器

cd openssl-OpenSSL_1_1_1g/
./config no-asm shared no-async --prefix=`pwd`/ssl_result

如果是交叉編譯器,可以指定 --cross-compile-prefix=arm-linux-

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

3、執行下面命令,刪除Makefile檔案的 -m64

(如果指定了交叉編譯器)

sed -i 's/-m64//' Makefile

執行後,可以避免出現這個編譯錯誤:arm-linux-: error: unrecognized command line option '-m64'

4、編譯、安裝

make && make install

成功編譯後,在openssl-OpenSSL_1_1_1g/目錄會生成一個ssl_result目錄,可以看到裡面生成的庫.

編譯過程中。

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

編譯完成。

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

3.2 paho.mqtt.c 編譯

當前下載的是 paho.mqtt.c-1.3.13.tar.gz

下載地址:github.com/eclipse/pah…

wget https://github.com/eclipse/paho.mqtt.c/archive/refs/tags/v1.3.13.tar.gz

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

編譯步驟:

1、解壓縮,建立要安裝目錄paho.mqtt.c_result

tar zxf paho.mqtt.c-1.3.13.tar.gz

mkdir paho.mqtt.c_result/bin -p
mkdir paho.mqtt.c_result/include -p
mkdir paho.mqtt.c_result/lib -p
mkdir paho.mqtt.c_result/share/man/man1 -p

2、進入目錄,交叉編譯

cd paho.mqtt.c-1.3.13/
make  CC=gcc CFLAGS:="-I `pwd`/../ssl_result/include" LDFLAGS:="-L `pwd`/../ssl_result/lib"

編譯過程中:

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

編譯完成:

Linux下編譯Eclipse Paho庫採用MQTT協議連線MQTT伺服器

如果是交叉編譯需要指定交叉編譯器完整名字:

cd paho.mqtt.c-1.3.13/
make  CC=arm-linux-gcc CFLAGS:="-I `pwd`/../ssl_result/include" LDFLAGS:="-L `pwd`/../ssl_result/lib"

引數介紹:

CFLAGS:=“-I `pwd`/…/ssl_result/include”:指定前面編譯的 openssl 的標頭檔案;
LDFLAGS:=“-L `pwd`/…/ssl_result/lib”:指定前面編譯的 openssl 的庫檔案路徑;

3、make install,安裝編譯結果

make install prefix=`pwd`/../paho.mqtt.c_result 
prefix=`pwd`/…/paho.mqtt.c_result :指定安裝目錄路徑;

編譯完成後,會生成目錄。

3.3 paho.mqtt.cpp 編譯

當前下載的是paho.mqtt.cpp-1.3.2.tar.gz

下載地址:github.com/eclipse/pah…

如果你下載的版本跟我的一樣,可以使用下面的指令碼進行編譯。

編譯之前,先建立一個指令碼檔案,名字為paho.mqtt.cpp_install,將下面程式碼貼上進去儲存。

#! /bin/sh
RESULT_DIR=$(pwd)/result_dir
RESULT_SSL=${RESULT_DIR}/ssl_result
RESULT_MQTT_C=${RESULT_DIR}/paho.mqtt.c_result
RESULT_MQTT_CPP=${RESULT_DIR}/paho.mqtt.cpp_result
CROSSS_COMPILE_TOOL=arm-linux-   #如果你是需要交叉編譯

編譯步驟:

1、解壓縮

tar zxf paho.mqtt.cpp-1.3.2.tar.gz

2、進入目錄

cd paho.mqtt.cpp-1.3.2/

3、執行 cmake

mkdir build_arm  
cd build_arm

編譯不需要 openssl 的庫

cmake ..  -DCMAKE_CXX_COMPILER=${CROSSS_COMPILE_TOOL}g++ \
-DCMAKE_INSTALL_PREFIX=${RESULT_MQTT_CPP} \
-DPAHO_MQTT_C_LIBRARIES=${RESULT_MQTT_C}/lib/libpaho-mqtt3a.so \
-DPAHO_MQTT_C_INCLUDE_DIRS=${RESULT_MQTT_C}/include \
-DPAHO_WITH_SSL=OFF \
-DCMAKE_CXX_FLAGS="-std=gnu++11 -mcpu=cortex-a53"

說明:如果你在PC機編譯目標是PC機本身使用,-mcpu=cortex-a53 就不用寫。 如果交叉編譯的目標是嵌入式晶片,就如實寫構架。

編譯帶有 openssl 的庫

cmake ..  -DCMAKE_CXX_COMPILER=${CROSSS_COMPILE_TOOL}g++ \
-DCMAKE_INSTALL_PREFIX=${RESULT_MQTT_CPP} \
-DPAHO_MQTT_C_LIBRARIES=${RESULT_MQTT_C}/lib/libpaho-mqtt3a.so \
-DPAHO_MQTT_C_INCLUDE_DIRS=${RESULT_MQTT_C}/include \
-DOPENSSL_SSL_LIBRARY=${RESULT_SSL}/lib/libssl.so  \
-DOPENSSL_INCLUDE_DIR=${RESULT_SSL}/include  \
-DOPENSSL_CRYPTO_LIBRARY=${RESULT_SSL}/lib/libcrypto.so \
-DCMAKE_CXX_FLAGS="-std=gnu++11 -mcpu=cortex-a53"

說明:如果你在PC機編譯目標是PC機本身使用,-mcpu=cortex-a53 就不用寫。 如果交叉編譯的目標是嵌入式晶片,就如實寫構架。

4、編譯

make && make install

5、將下載的原始碼包 paho.mqtt.cpp-1.3.2.tar.gz 和 上面儲存的指令碼paho.mqtt.cpp_install 放到同一目錄,並且將前面編譯好的openssl庫、paho.mqtt.c庫放在指令碼指定的結果目錄,當前是放到 result_dir 目錄的。

6、執行./paho.mqtt.cpp_install.sh 編譯,編譯完成後,在result_dir目錄下會生成一個名為paho.mqtt.cpp_result的目錄。

四、程式碼案例

4.1 訂閱—async_subscribe.cpp

這是使用了 libpaho-mqttpp3.so 進行訂閱訊息的原始碼,原始碼路徑在原始碼的這個路徑:paho.mqtt.cpp-1.3.2/src/samples/async_subscribe.cpp,只更改了伺服器地址。

完整程式碼如下:

#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include "mqtt/async_client.h"

const std::string SERVER_ADDRESS("117.78.5.125:1883");
const std::string CLIENT_ID("paho_cpp_async_subcribe");
const std::string TOPIC("hello");

const int	QOS = 1;
const int	N_RETRY_ATTEMPTS = 5;

/

// Callbacks for the success or failures of requested actions.
// This could be used to initiate further action, but here we just log the
// results to the console.

class action_listener : public virtual mqtt::iaction_listener
{
	std::string name_;

	void on_failure(const mqtt::token& tok) override {
		std::cout << name_ << " failure";
		if (tok.get_message_id() != 0)
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
		std::cout << std::endl;
	}

	void on_success(const mqtt::token& tok) override {
		std::cout << name_ << " success";
		if (tok.get_message_id() != 0)
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
		auto top = tok.get_topics();
		if (top && !top->empty())
			std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
		std::cout << std::endl;
	}

public:
	action_listener(const std::string& name) : name_(name) {}
};

/

/**
 * Local callback & listener class for use with the client connection.
 * This is primarily intended to receive messages, but it will also monitor
 * the connection to the broker. If the connection is lost, it will attempt
 * to restore the connection and re-subscribe to the topic.
 */
class callback : public virtual mqtt::callback,
					public virtual mqtt::iaction_listener

{
	// Counter for the number of connection retries
	int nretry_;
	// The MQTT client
	mqtt::async_client& cli_;
	// Options to use if we need to reconnect
	mqtt::connect_options& connOpts_;
	// An action listener to display the result of actions.
	action_listener subListener_;

	// This deomonstrates manually reconnecting to the broker by calling
	// connect() again. This is a possibility for an application that keeps
	// a copy of it's original connect_options, or if the app wants to
	// reconnect with different options.
	// Another way this can be done manually, if using the same options, is
	// to just call the async_client::reconnect() method.
	void reconnect() {
		std::this_thread::sleep_for(std::chrono::milliseconds(2500));
		try {
			cli_.connect(connOpts_, nullptr, *this);
		}
		catch (const mqtt::exception& exc) {
			std::cerr << "Error: " << exc.what() << std::endl;
			exit(1);
		}
	}

	// Re-connection failure
	void on_failure(const mqtt::token& tok) override {
		std::cout << "Connection attempt failed" << std::endl;
		if (++nretry_ > N_RETRY_ATTEMPTS)
			exit(1);
		reconnect();
	}

	// (Re)connection success
	// Either this or connected() can be used for callbacks.
	void on_success(const mqtt::token& tok) override {}

	// (Re)connection success
	void connected(const std::string& cause) override {
		std::cout << "\nConnection success" << std::endl;
		std::cout << "\nSubscribing to topic '" << TOPIC << "'\n"
			<< "\tfor client " << CLIENT_ID
			<< " using QoS" << QOS << "\n"
			<< "\nPress Q<Enter> to quit\n" << std::endl;

		cli_.subscribe(TOPIC, QOS, nullptr, subListener_);
	}

	// Callback for when the connection is lost.
	// This will initiate the attempt to manually reconnect.
	void connection_lost(const std::string& cause) override {
		std::cout << "\nConnection lost" << std::endl;
		if (!cause.empty())
			std::cout << "\tcause: " << cause << std::endl;

		std::cout << "Reconnecting..." << std::endl;
		nretry_ = 0;
		reconnect();
	}

	// Callback for when a message arrives.
	void message_arrived(mqtt::const_message_ptr msg) override {
		std::cout << "Message arrived" << std::endl;
		std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
		std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
	}

	void delivery_complete(mqtt::delivery_token_ptr token) override {}

public:
	callback(mqtt::async_client& cli, mqtt::connect_options& connOpts)
				: nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription") {}
};

/

int main(int argc, char* argv[])
{
	// A subscriber often wants the server to remember its messages when its
	// disconnected. In that case, it needs a unique ClientID and a
	// non-clean session.

	mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);

	mqtt::connect_options connOpts;
	connOpts.set_clean_session(false);

	// Install the callback(s) before connecting.
	callback cb(cli, connOpts);
	cli.set_callback(cb);

	// Start the connection.
	// When completed, the callback will subscribe to topic.

	try {
		std::cout << "Connecting to the MQTT server..." << std::flush;
		cli.connect(connOpts, nullptr, cb);
	}
	catch (const mqtt::exception& exc) {
		std::cerr << "\nERROR: Unable to connect to MQTT server: '"
			<< SERVER_ADDRESS << "'" << exc << std::endl;
		return 1;
	}

	// Just block till user tells us to quit.

	while (std::tolower(std::cin.get()) != 'q')
		;

	// Disconnect

	try {
		std::cout << "\nDisconnecting from the MQTT server..." << std::flush;
		cli.disconnect()->wait();
		std::cout << "OK" << std::endl;
	}
	catch (const mqtt::exception& exc) {
		std::cerr << exc << std::endl;
		return 1;
	}

 	return 0;
}

編譯指令:

g++ async_subscribe.cpp -I result_dir/paho.mqtt.cpp_result/include/ -I result_dir/paho.mqtt.c_result/include/ -L result_dir/paho.mqtt.cpp_result/lib/ -L result_dir/paho.mqtt.c_result/lib/ -l paho-mqttpp3 -l paho-mqtt3a

4.2 釋出——async_publish.cpp

這是使用了 libpaho-mqttpp3.so 進行釋出訊息的原始碼,原始碼路徑在原始碼的這個路徑:paho.mqtt.cpp-1.3.2/src/samples/async_publish.cpp,只更改了伺服器地址。

完整程式碼如下:

#include <iostream>
#include <cstdlib>
#include <string>
#include <thread>
#include <atomic>
#include <chrono>
#include <cstring>
#include "mqtt/async_client.h"

using namespace std;

const string DFLT_SERVER_ADDRESS	{ "117.78.5.125:1883" };
const string CLIENT_ID				{ "paho_cpp_async_publish" };
const string PERSIST_DIR			{ "./persist" };

const string TOPIC { "hello" };

const char* PAYLOAD1 = "Hello World!";
const char* PAYLOAD2 = "Hi there!";
const char* PAYLOAD3 = "Is anyone listening?";
const char* PAYLOAD4 = "Someone is always listening.";

const char* LWT_PAYLOAD = "Last will and testament.";

const int  QOS = 1;

const auto TIMEOUT = std::chrono::seconds(10);

/

/**
 * A callback class for use with the main MQTT client.
 */
class callback : public virtual mqtt::callback
{
public:
	void connection_lost(const string& cause) override {
		cout << "\nConnection lost" << endl;
		if (!cause.empty())
			cout << "\tcause: " << cause << endl;
	}

	void delivery_complete(mqtt::delivery_token_ptr tok) override {
		cout << "\tDelivery complete for token: "
			<< (tok ? tok->get_message_id() : -1) << endl;
	}
};

/

/**
 * A base action listener.
 */
class action_listener : public virtual mqtt::iaction_listener
{
protected:
	void on_failure(const mqtt::token& tok) override {
		cout << "\tListener failure for token: "
			<< tok.get_message_id() << endl;
	}

	void on_success(const mqtt::token& tok) override {
		cout << "\tListener success for token: "
			<< tok.get_message_id() << endl;
	}
};

/

/**
 * A derived action listener for publish events.
 */
class delivery_action_listener : public action_listener
{
	atomic<bool> done_;

	void on_failure(const mqtt::token& tok) override {
		action_listener::on_failure(tok);
		done_ = true;
	}

	void on_success(const mqtt::token& tok) override {
		action_listener::on_success(tok);
		done_ = true;
	}

public:
	delivery_action_listener() : done_(false) {}
	bool is_done() const { return done_; }
};

/

int main(int argc, char* argv[])
{
	// A client that just publishes normally doesn't need a persistent
	// session or Client ID unless it's using persistence, then the local
	// library requires an ID to identify the persistence files.

	string	address  = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS,
			clientID = (argc > 2) ? string(argv[2]) : CLIENT_ID;

	cout << "Initializing for server '" << address << "'..." << endl;
	mqtt::async_client client(address, clientID, PERSIST_DIR);

	callback cb;
	client.set_callback(cb);

	auto connOpts = mqtt::connect_options_builder()
		.clean_session()
		.will(mqtt::message(TOPIC, LWT_PAYLOAD, strlen(LWT_PAYLOAD), QOS, false))
		.finalize();

	cout << "  ...OK" << endl;

	try {
		cout << "\nConnecting..." << endl;
		mqtt::token_ptr conntok = client.connect(connOpts);
		cout << "Waiting for the connection..." << endl;
		conntok->wait();
		cout << "  ...OK" << endl;

		// First use a message pointer.

		cout << "\nSending message..." << endl;
		mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, PAYLOAD1);
		pubmsg->set_qos(QOS);
		client.publish(pubmsg)->wait_for(TIMEOUT);
		cout << "  ...OK" << endl;

		// Now try with itemized publish.

		cout << "\nSending next message..." << endl;
		mqtt::delivery_token_ptr pubtok;
		pubtok = client.publish(TOPIC, PAYLOAD2, strlen(PAYLOAD2), QOS, false);
		cout << "  ...with token: " << pubtok->get_message_id() << endl;
		cout << "  ...for message with " << pubtok->get_message()->get_payload().size()
			<< " bytes" << endl;
		pubtok->wait_for(TIMEOUT);
		cout << "  ...OK" << endl;

		// Now try with a listener

		cout << "\nSending next message..." << endl;
		action_listener listener;
		pubmsg = mqtt::make_message(TOPIC, PAYLOAD3);
		pubtok = client.publish(pubmsg, nullptr, listener);
		pubtok->wait();
		cout << "  ...OK" << endl;

		// Finally try with a listener, but no token

		cout << "\nSending final message..." << endl;
		delivery_action_listener deliveryListener;
		pubmsg = mqtt::make_message(TOPIC, PAYLOAD4);
		client.publish(pubmsg, nullptr, deliveryListener);

		while (!deliveryListener.is_done()) {
			this_thread::sleep_for(std::chrono::milliseconds(100));
		}
		cout << "OK" << endl;

		// Double check that there are no pending tokens

		auto toks = client.get_pending_delivery_tokens();
		if (!toks.empty())
			cout << "Error: There are pending delivery tokens!" << endl;

		// Disconnect
		cout << "\nDisconnecting..." << endl;
		client.disconnect()->wait();
		cout << "  ...OK" << endl;
	}
	catch (const mqtt::exception& exc) {
		cerr << exc.what() << endl;
		return 1;
	}

 	return 0;
}

編譯指令:

g++ async_publish.cpp -I result_dir/paho.mqtt.cpp_result/include/ -I result_dir/paho.mqtt.c_result/include/ -L result_dir/paho.mqtt.cpp_result/lib/ -L result_dir/paho.mqtt.c_result/lib/ -l paho-mqttpp3 -l paho-mqtt3a -o async_publish

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.