@@ -171,6 +171,98 @@ int MqttClient::messageRetain() const
171171 return -1 ;
172172}
173173
174+ void MqttClient::setClient (arduino::Client* client) {
175+ if (_client != nullptr && _client->connected ()) {
176+ // TODO if the current client is connected we cannot perform the change, first call disconnect
177+ return ;
178+ }
179+
180+ _client = client;
181+ }
182+
183+ void MqttClient::setReceiveCallback (MqttReceiveCallback cbk) {
184+ _cbk = cbk;
185+ }
186+
187+ class MqttReadStream : public IStream {
188+ public:
189+ MqttReadStream (MqttClient& ref, int available)
190+ : ref(ref), _available(available) { }
191+
192+ size_t readBytes (uint8_t * buf, size_t s) override {
193+ size_t to_read = s < _available ? s : _available;
194+ to_read = ref.readBytes (buf, to_read);
195+ _available -= to_read;
196+ return to_read;
197+ }
198+
199+ int available () override { return _available; }
200+
201+ int read () override {
202+ if (_available > 0 ) {
203+ _available--;
204+ return ref.read ();
205+ } else {
206+ return -1 ; // TODO return proper error code
207+ }
208+ }
209+ private:
210+ MqttClient& ref;
211+ int _available;
212+ };
213+
214+ class ArduinoMqttOStream : public MqttOStream {
215+ public:
216+ // TODO change pointer to reference, since it won't change
217+ ArduinoMqttOStream (MqttClient &ref, error_t err=0 )
218+ : MqttOStream(err), ref(ref) { }// TODO replace err default value with success
219+
220+ ~ArduinoMqttOStream () {
221+ ref.endMessage ();
222+ }
223+
224+ size_t write (uint8_t a) override {
225+ if (rc == 1 ) {
226+ return ref.write (a);
227+ }
228+ return 0 ;
229+ }
230+
231+ size_t write (const uint8_t *buffer, size_t size) override {
232+ if (rc == 1 ) {
233+ return ref.write (buffer, size);
234+ }
235+ return 0 ;
236+ }
237+
238+ int availableForWrite () override { return 0 ; }
239+
240+ private:
241+ MqttClient& ref;
242+ };
243+
244+
245+ error_t MqttClient::publish (Topic t, uint8_t payload[], size_t size, MqttQos qos, MqttPublishFlag flags) {
246+ int error = this ->beginMessage (t, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
247+
248+ if (error == 0 ) { // TODO replace this with a proper enum value
249+ return error;
250+ }
251+
252+ int res = this ->write (payload, size);
253+ this ->endMessage ();
254+
255+ return res;
256+ }
257+
258+ MqttOStream&& MqttClient::publish(Topic t, MqttQos qos, MqttPublishFlag flags) {
259+ int error = this ->beginMessage (
260+ t, (flags & RetainEnabled) == RetainEnabled,
261+ static_cast <uint8_t >(qos), (flags & DupEnabled) == DupEnabled);
262+
263+ return std::move (ArduinoMqttOStream (*this , error));
264+ }
265+
174266int MqttClient::beginMessage (const char * topic, unsigned long size, bool retain, uint8_t qos, bool dup)
175267{
176268 _txMessageTopic = topic;
@@ -259,6 +351,20 @@ int MqttClient::endMessage()
259351 return 1 ;
260352}
261353
354+ void MqttClient::setWill (
355+ Topic willTopic, const uint8_t * will_message, size_t will_size, MqttQos qos, MqttPublishFlag flags) {
356+ int error = this ->beginWill (willTopic, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
357+
358+ if (error == 0 ) { // TODO replace this with a proper enum value
359+ return ;
360+ }
361+
362+ int res = this ->write (will_message, will_size);
363+ this ->endWill ();
364+
365+ return ;
366+ }
367+
262368int MqttClient::beginWill (const char * topic, unsigned short size, bool retain, uint8_t qos)
263369{
264370 int topicLength = strlen (topic);
@@ -314,7 +420,7 @@ int MqttClient::endWill()
314420 return 1 ;
315421}
316422
317- int MqttClient::subscribe (const char * topic, uint8_t qos)
423+ error_t MqttClient::subscribe (Topic topic, MqttQos qos)
318424{
319425 int topicLength = strlen (topic);
320426 int remainingLength = topicLength + 5 ;
@@ -362,12 +468,12 @@ int MqttClient::subscribe(const char* topic, uint8_t qos)
362468 return 0 ;
363469}
364470
365- int MqttClient::subscribe (const String& topic, uint8_t qos)
471+ error_t MqttClient::subscribe (const String& topic, MqttQos qos)
366472{
367473 return subscribe (topic.c_str (), qos);
368474}
369475
370- int MqttClient::unsubscribe (const char * topic)
476+ error_t MqttClient::unsubscribe (Topic topic)
371477{
372478 int topicLength = strlen (topic);
373479 int remainingLength = topicLength + 4 ;
@@ -565,16 +671,19 @@ void MqttClient::poll()
565671 } else {
566672 _rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
567673
568- if (_onMessage) {
674+ if (_cbk) {
675+ MqttReadStream stream (*this , _rxLength);
676+ _cbk (_rxMessageTopic.c_str (), stream);
677+ } else if (_onMessage) {
569678#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
570679 _onMessage (this ,_rxLength);
571680#else
572681 _onMessage (_rxLength);
573682#endif
683+ }
574684
575- if (_rxLength == 0 ) {
576- _rxState = MQTT_CLIENT_RX_STATE_READ_TYPE;
577- }
685+ if ((_onMessage || _cbk) && _rxLength == 0 ) {
686+ _rxState = MQTT_CLIENT_RX_STATE_READ_TYPE;
578687 }
579688 }
580689 }
@@ -592,7 +701,10 @@ void MqttClient::poll()
592701
593702 _rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
594703
595- if (_onMessage) {
704+ if (_cbk) {
705+ MqttReadStream stream (*this , _rxLength);
706+ _cbk (_rxMessageTopic.c_str (), stream);
707+ } else if (_onMessage) {
596708#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
597709 _onMessage (this ,_rxLength);
598710#else
@@ -647,12 +759,12 @@ void MqttClient::poll()
647759 }
648760}
649761
650- int MqttClient::connect (IPAddress ip, uint16_t port)
762+ error_t MqttClient::connect (IPAddress ip, uint16_t port)
651763{
652764 return connect (ip, NULL , port);
653765}
654766
655- int MqttClient::connect (const char *host, uint16_t port)
767+ error_t MqttClient::connect (const char *host, uint16_t port)
656768{
657769 return connect ((uint32_t )0 , host, port);
658770}
@@ -833,7 +945,7 @@ int MqttClient::subscribeQoS() const
833945 return _subscribeQos;
834946}
835947
836- int MqttClient::connect (IPAddress ip, const char * host, uint16_t port)
948+ error_t MqttClient::connect (IPAddress ip, const char * host, uint16_t port)
837949{
838950 if (clientConnected ()) {
839951 _client->stop ();
@@ -1041,7 +1153,7 @@ void MqttClient::pubcomp(uint16_t id)
10411153 endPacket ();
10421154}
10431155
1044- void MqttClient::ping ()
1156+ error_t MqttClient::ping ()
10451157{
10461158 uint8_t packetBuffer[2 ];
10471159
0 commit comments