-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt_client.py
152 lines (132 loc) · 5.04 KB
/
mqtt_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import json
import logging
from typing import Optional
import json
import logging
from typing import Optional
import paho.mqtt.client as mqtt
logger = logging.getLogger(__name__)
class MQTTClient:
"""
A class to handle MQTT connections and publish JSON-wrapped messages.
"""
def __init__(
self,
server: str,
port: int = 1883,
topic: str = "default/topic",
client_id: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
keepalive: int = 60,
tls: bool = False,
tls_ca_cert: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
):
"""
Initialize the MQTT client.
:param server: MQTT broker address.
:param port: MQTT broker port. Default is 1883.
:param topic: MQTT topic to publish messages to.
:param client_id: MQTT client ID. If None, a random ID is generated.
:param username: Username for MQTT broker authentication.
:param password: Password for MQTT broker authentication.
:param keepalive: Keepalive interval in seconds. Default is 60.
:param tls: Whether to use TLS. Default is False.
:param tls_ca_cert: Path to CA certificate file.
:param tls_certfile: Path to client certificate file.
:param tls_keyfile: Path to client key file.
"""
self.server = server
self.port = port
self.topic = topic
self.client_id = client_id or f"mqtt_client_{id(self)}"
self.username = username
self.password = password
self.keepalive = keepalive
self.tls = tls
self.tls_ca_cert = tls_ca_cert
self.tls_certfile = tls_certfile
self.tls_keyfile = tls_keyfile
self.client = mqtt.Client(client_id=self.client_id)
if self.username and self.password:
self.client.username_pw_set(self.username, self.password)
if self.tls:
if not self.tls_ca_cert:
raise ValueError("CA certificate path must be provided for TLS.")
self.client.tls_set(
ca_certs=self.tls_ca_cert,
certfile=self.tls_certfile,
keyfile=self.tls_keyfile,
)
# Assign callbacks
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_publish = self.on_publish
# Setup logging
self.logger = logger
self.logger.debug(f"Initialization parameters: {self.__dict__}")
def on_connect(self, client, userdata, flags, rc):
"""
Callback when the client connects to the broker.
:param client: The client instance.
:param userdata: Private user data.
:param flags: Response flags.
:param rc: Connection result.
"""
if rc == 0:
self.logger.info(f"Connected to MQTT Broker: {self.server}:{self.port}")
else:
self.logger.error(f"Failed to connect, return code {rc}")
def on_disconnect(self, client, userdata, rc):
"""
Callback when the client disconnects from the broker.
:param client: The client instance.
:param userdata: Private user data.
:param rc: Disconnection result.
"""
self.logger.info("Disconnected from MQTT Broker")
if rc != 0:
self.logger.warning(f"Unexpected disconnection. Return code: {rc}")
def on_publish(self, client, userdata, mid):
"""
Callback when a message is published.
:param client: The client instance.
:param userdata: Private user data.
:param mid: Message ID.
"""
self.logger.debug(f"Message {mid} published.")
def connect(self):
"""
Connect to the MQTT broker and start the network loop.
"""
self.logger.debug(f"Connecting to MQTT Broker: {self.server}:{self.port}")
try:
self.client.connect(self.server, self.port, self.keepalive)
self.client.loop_start()
except Exception as e:
self.logger.error(f"Failed to connect to MQTT Broker: {e}")
raise
def disconnect(self):
"""
Disconnect from the MQTT broker and stop the network loop.
"""
self.client.loop_stop()
self.client.disconnect()
def publish_command(self, command: str):
"""
Wrap the command in a JSON message and publish to the MQTT topic.
:param command: The command string to publish.
"""
message = {"msg": command}
payload = json.dumps(message)
try:
result = self.client.publish(self.topic, payload)
if result.rc != mqtt.MQTT_ERR_SUCCESS:
self.logger.error(f"Failed to publish message: {result}")
else:
self.logger.info(f"Published message to {self.topic}: {payload}")
except Exception as e:
self.logger.error(f"Exception during publish: {e}")
raise