一、前言
隨著物聯網技術的快速發(fā)展,越來越多的設備和系統需要通過網絡進行連接和數據交換,以實現智能化管理和控制。華為云物聯網平臺作為業(yè)界領先的物聯網解決方案提供商,提供了穩(wěn)定可靠的MQTT服務器,使得設備能夠輕松接入云端,實現數據的實時上傳和下發(fā)。
當前決定開發(fā)一個基于Python的MQTT客戶端項目,利用paho-mqtt庫與華為云物聯網MQTT服務器進行通信。該項目實現設備數據的定時上傳功能,確保設備狀態(tài)和數據能夠及時被云端系統獲取并處理。同時,項目還具備接收服務器下發(fā)消息的能力,使得設備能夠根據云端指令進行相應的操作或響應。
通過本項目,能夠構建一個高效、穩(wěn)定的物聯網通信框架,為設備的遠程監(jiān)控、控制和管理提供有力支持。通過定時上傳數據,可以實時了解設備的運行狀態(tài)、環(huán)境參數等關鍵信息,從而及時發(fā)現并解決潛在問題。同時,接收服務器下發(fā)消息的功能也使得設備能夠靈活響應云端指令,實現更加智能化的控制和管理。
本項目通過Python的paho-mqtt庫與華為云物聯網MQTT服務器進行通信,實現設備數據的定時上傳和服務器消息的接收處理,為物聯網設備的遠程監(jiān)控、控制和管理提供可靠的技術支持。
二、Python代碼編寫
2.1 安裝Paho MQTT庫
打開自己電腦的Python安裝目錄。
如果不知道自己的Python安裝目錄在哪里。
可以使用 which
或 where
命令
- 在Linux或macOS上,打開終端并輸入:
which python3
- 在Windows上,打開命令提示符并輸入:
where python
這些命令會顯示Python可執(zhí)行文件的路徑。
例如:
C:Users11266>where python
C:Users11266AppDataLocalProgramsPythonPython311python.exe
C:Users11266AppDataLocalMicrosoftWindowsAppspython.exe
找到pip.exe文件的路徑。
在文件路徑里輸入cmd
按下回車鍵,打開命令行。
在命令行輸入pip install paho-mqtt
安裝 paho-mqtt
庫。
例如:
C:Users11266AppDataLocalProgramsPythonPython311Scripts>pip install paho-mqtt
Collecting paho-mqtt
Downloading paho_mqtt-2.1.0-py3-none-any.whl (67 kB)
---------------------------------------- 67.2/67.2 kB 1.2 MB/s eta 0:00:00
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-2.1.0
[notice] A new release of pip is available: 23.1.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip
C:Users11266AppDataLocalProgramsPythonPython311Scripts>
2.2 paho-mqtt庫介紹
paho-mqtt
是一個Python客戶端庫,用于與MQTT代理服務器進行通信。MQTT(Message Queuing Telemetry Transport)是一種輕量級的、發(fā)布-訂閱模式的消息傳輸協議,常用于物聯網(IoT)應用和實時數據傳輸。paho-mqtt
庫提供了在Python中實現MQTT客戶端的功能,使你可以連接到MQTT代理服務器、訂閱主題、發(fā)布消息等。
特性
- 支持MQTT 3.1和3.1.1版本的協議規(guī)范。
- 提供同步和異步的消息發(fā)布和訂閱功能。
- 支持TLS/SSL加密連接,以確保安全的通信。
- 具有遺囑消息和保持活動功能,以增強連接的穩(wěn)定性。
- 可以設置用戶名和密碼進行連接認證。
- 提供靈活的回調函數機制,用于處理連接、訂閱和接收消息等事件。
安裝
可以使用pip來安裝 paho-mqtt
庫:
pip install paho-mqtt
總結
paho-mqtt
庫提供了一個方便的方式來在Python應用程序中實現MQTT客戶端功能。它支持多種MQTT特性,并且易于使用。通過這個庫,你可以輕松地構建與MQTT代理服務器進行通信的應用程序,從而實現實時數據傳輸、遠程控制和監(jiān)控等功能。
2.3 我的華為云MQTT服務器信息
MQTT服務器IP地址: 117.78.5.125
MQTT端口號: 1883
客戶端ID:65ec636771d845632aff9496_dev1_0_0_2024052901
用戶名:65ec636771d845632aff9496_dev1
登錄密碼:a6e312275a031e7629e3133fefeac555dbce6dc06b56c039dd8a224084ee5b44
訂閱主題:$oc/devices/65ec636771d845632aff9496_dev1/sys/messages/down
發(fā)布主題:$oc/devices/65ec636771d845632aff9496_dev1/sys/properties/report
發(fā)布的消息:{"services": [{"service_id": "stm32","properties":{"MQ135":60,"DHT11_T":24,"DHT11_H":60,"SOIL_H":50,"motor":1,"FLAME":0,"GPS":{"lon":120.21,"lat":30.19}}}]}
2.4 實現代碼
import paho.mqtt.client as mqtt
import json
import time
from threading import Timer
# MQTT 配置信息
MQTT_SERVER = "117.78.5.125"
MQTT_PORT = 1883
CLIENT_ID = "65ec636771d845632aff9496_dev1_0_0_2024052901"
USERNAME = "65ec636771d845632aff9496_dev1"
PASSWORD = "a6e312275a031e7629e3133fefeac555dbce6dc06b56c039dd8a224084ee5b44"
SUBSCRIBE_TOPIC = "$oc/devices/65ec636771d845632aff9496_dev1/sys/messages/down"
PUBLISH_TOPIC = "$oc/devices/65ec636771d845632aff9496_dev1/sys/properties/report"
PUBLISH_INTERVAL = 10 # 定時器間隔,單位:秒
# 要發(fā)布的消息內容
message_payload = {
"services": [
{
"service_id": "stm32",
"properties": {
"MQ135": 60,
"DHT11_T": 24,
"DHT11_H": 60,
"SOIL_H": 50,
"motor": 1,
"FLAME": 0,
"GPS": {
"lon": 120.21,
"lat": 30.19
}
}
}
]
}
# 回調函數 - 當連接到服務器時被調用
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe(SUBSCRIBE_TOPIC)
# 回調函數 - 當收到服務器下發(fā)的消息時被調用
def on_message(client, userdata, msg):
print(f"Received message from topic '{msg.topic}': {msg.payload.decode()}")
# 定時發(fā)布消息
def publish_message():
client.publish(PUBLISH_TOPIC, json.dumps(message_payload))
print(f"Published message: {json.dumps(message_payload)}")
# 設置定時器,以繼續(xù)定期發(fā)布消息
Timer(PUBLISH_INTERVAL, publish_message).start()
# 創(chuàng)建MQTT客戶端并設置回調函數
client = mqtt.Client(CLIENT_ID, protocol=mqtt.MQTTv311)
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
# 連接到MQTT服務器
client.connect(MQTT_SERVER, MQTT_PORT, 60)
# 啟動網絡循環(huán)
client.loop_start()
# 啟動定時發(fā)布消息
publish_message()
# 主線程保持運行狀態(tài)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Disconnecting from MQTT server...")
client.loop_stop()
client.disconnect()
代碼說明
- 配置部分:設置了MQTT服務器信息、客戶端ID、用戶名、密碼、訂閱主題和發(fā)布主題等。
- 回調函數:
on_connect
: 在連接成功后訂閱指定的主題。on_message
: 收到消息時打印消息內容。
- 定時發(fā)布消息:使用
Timer
類定時發(fā)布消息到指定的主題。 - 主程序:創(chuàng)建并配置MQTT客戶端,連接到MQTT服務器,啟動網絡循環(huán)和定時發(fā)布消息的功能。
運行代碼
運行此代碼后,將:
- 連接到指定的MQTT服務器。
- 訂閱
$oc/devices/65ec636771d845632aff9496_dev1/sys/messages/down
主題。 - 每10秒鐘向
$oc/devices/65ec636771d845632aff9496_dev1/sys/properties/report
主題發(fā)布一次消息。 - 打印所有從服務器接收到的消息。