2020sys .path .append ("../lib/" )
2121import ssl
2222import time
23- import paho .mqtt .client as mqtt
23+ import thread
24+ import protocol .paho .client as mqtt
2425from threading import Lock
2526from exception .AWSIoTExceptions import connectError
2627from exception .AWSIoTExceptions import connectTimeoutException
@@ -45,6 +46,10 @@ class mqttCore:
4546 _unsubscribeSent = False
4647 _connectdisconnectTimeout = 0 # Default connect/disconnect timeout set to 0 second
4748 _mqttOperationTimeout = 0 # Default MQTT operation timeout set to 0 second
49+ # Use websocket
50+ _useWebsocket = False
51+ # Subscribe record
52+ _subscribePool = dict ()
4853 # Broker information
4954 _host = ""
5055 _port = - 1
@@ -76,13 +81,25 @@ def getMQTTOperationTimeoutSecond(self):
7681 def setUserData (self , srcUserData ):
7782 self ._pahoClient .user_data_set (srcUserData )
7883
79- def createPahoClient (self , clientID , cleanSession , userdata , protocol ):
80- return mqtt .Client (clientID , cleanSession , userdata , protocol ) # Throw exception when error happens
84+ def createPahoClient (self , clientID , cleanSession , userdata , protocol , useWebsocket ):
85+ return mqtt .Client (clientID , cleanSession , userdata , protocol , useWebsocket ) # Throw exception when error happens
86+
87+ def _doResubscribe (self ):
88+ if self ._connectResultCode == 0 : # If this is a successful connect...
89+ for key in self ._subscribePool .keys ():
90+ qos , callback = self ._subscribePool .get (key )
91+ try :
92+ self .subscribe (key , qos , callback )
93+ except subscribeError :
94+ pass
95+ except subscribeTimeoutException :
96+ pass # Subscribe error resulted from network error, will redo subscription in the next re-connect
8197
8298 # Callbacks
8399 def on_connect (self , client , userdata , flags , rc ):
84100 self ._disconnectResultCode = sys .maxint
85101 self ._connectResultCode = rc
102+ thread .start_new_thread (self ._doResubscribe , ())
86103 self ._log .writeLog ("Connect result code " + str (rc ))
87104
88105 def on_disconnect (self , client , userdata , rc ):
@@ -103,18 +120,19 @@ def on_message(self, client, userdata, message):
103120 self ._log .writeLog ("Received (No custom callback registered) : message: " + str (message .payload ) + " from topic: " + str (message .topic ))
104121
105122 ####### API starts here #######
106- def __init__ (self , clientID , cleanSession , protocol , srcLogManager ):
123+ def __init__ (self , clientID , cleanSession , protocol , srcLogManager , srcUseWebsocket = False ):
107124 if clientID is None or cleanSession is None or protocol is None or srcLogManager is None :
108125 raise TypeError ("None type inputs detected." )
109126 self ._log = srcLogManager
110127 self ._clientID = clientID
111- self ._pahoClient = self .createPahoClient (clientID , cleanSession , None , protocol ) # User data is set to None as default
128+ self ._pahoClient = self .createPahoClient (clientID , cleanSession , None , protocol , srcUseWebsocket ) # User data is set to None as default
112129 self ._log .writeLog ("Paho MQTT Client init." )
113130 self ._pahoClient .on_connect = self .on_connect
114131 self ._pahoClient .on_disconnect = self .on_disconnect
115132 self ._pahoClient .on_message = self .on_message
116133 self ._pahoClient .on_subscribe = self .on_subscribe
117134 self ._pahoClient .on_unsubscribe = self .on_unsubscribe
135+ self ._useWebsocket = srcUseWebsocket
118136 self ._log .writeLog ("Register Paho MQTT Client callbacks." )
119137 self ._log .writeLog ("mqttCore init." )
120138
@@ -135,7 +153,10 @@ def connect(self, keepAliveInterval=60):
135153 # Return connect succeeded/failed
136154 ret = False
137155 # TLS configuration
138- self ._pahoClient .tls_set (self ._cafile , self ._cert , self ._key , ssl .CERT_REQUIRED , ssl .PROTOCOL_SSLv23 ) # Throw exception...
156+ if self ._useWebsocket :
157+ self ._pahoClient .tls_set (ca_certs = self ._cafile , cert_reqs = ssl .CERT_REQUIRED , tls_version = ssl .PROTOCOL_SSLv23 )
158+ else :
159+ self ._pahoClient .tls_set (self ._cafile , self ._cert , self ._key , ssl .CERT_REQUIRED , ssl .PROTOCOL_SSLv23 ) # Throw exception...
139160 # Connect
140161 self ._pahoClient .connect (self ._host , self ._port , keepAliveInterval ) # Throw exception...
141162 self ._pahoClient .loop_start ()
@@ -215,6 +236,7 @@ def subscribe(self, topic, qos, callback):
215236 if (self ._subscribeSent ):
216237 ret = rc == 0
217238 if (ret ):
239+ self ._subscribePool [topic ] = (qos , callback )
218240 self ._log .writeLog ("Subscribe request " + str (mid ) + " succeeded. Time consumption: " + str (float (TenmsCount ) * 10 ) + "ms." )
219241 else :
220242 if (callback is not None ):
@@ -252,6 +274,10 @@ def unsubscribe(self, topic):
252274 if (self ._unsubscribeSent ):
253275 ret = rc == 0
254276 if (ret ):
277+ try :
278+ del self ._subscribePool [topic ]
279+ except KeyError :
280+ pass # Ignore topic that is never subscribed to
255281 self ._log .writeLog ("Unsubscribe request " + str (mid ) + " succeeded. Time consumption: " + str (float (TenmsCount ) * 10 ) + "ms." )
256282 self ._pahoClient .message_callback_remove (topic )
257283 self ._log .writeLog ("Remove the callback." )
0 commit comments