Skip to content

MQTT5 support. #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions src/MQTTClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ inline lwmqtt_err_t lwmqtt_arduino_network_write(void *ref, uint8_t *buffer, siz
}

static void MQTTClientHandler(lwmqtt_client_t * /*client*/, void *ref, lwmqtt_string_t topic,
lwmqtt_message_t message) {
lwmqtt_message_t message, lwmqtt_serialized_properties_t props) {
// get callback
auto cb = (MQTTClientCallback *)ref;

Expand All @@ -72,7 +72,7 @@ static void MQTTClientHandler(lwmqtt_client_t * /*client*/, void *ref, lwmqtt_st

// call the advanced callback and return if available
if (cb->advanced != nullptr) {
cb->advanced(cb->client, terminated_topic, (char *)message.payload, (int)message.payload_len);
cb->advanced(cb->client, terminated_topic, (char *)message.payload, (int)message.payload_len, props);
return;
}

Expand Down Expand Up @@ -118,7 +118,7 @@ MQTTClient::~MQTTClient() {
free(this->writeBuf);
}

void MQTTClient::begin(const char hostname[], int port, Client &client) {
void MQTTClient::begin(const char hostname[], int port, Client &client, lwmqtt_protocol_t protocol) {
// set hostname and port
this->setHost(hostname, port);

Expand All @@ -127,6 +127,7 @@ void MQTTClient::begin(const char hostname[], int port, Client &client) {

// initialize client
lwmqtt_init(&this->client, this->writeBuf, this->bufSize, this->readBuf, this->bufSize);
lwmqtt_set_protocol(&this->client, protocol);

// set timers
lwmqtt_set_timers(&this->client, &this->timer1, &this->timer2, lwmqtt_arduino_timer_set, lwmqtt_arduino_timer_get);
Expand Down Expand Up @@ -222,7 +223,8 @@ void MQTTClient::setOptions(int keepAlive, bool cleanSession, int timeout) {
this->timeout = (uint32_t)timeout;
}

bool MQTTClient::publish(const char topic[], const char payload[], int length, bool retained, int qos) {
bool MQTTClient::publish(const char topic[], const char payload[], int length, bool retained, int qos,
lwmqtt_properties_t props) {
// return immediately if not connected
if (!this->connected()) {
return false;
Expand All @@ -236,7 +238,7 @@ bool MQTTClient::publish(const char topic[], const char payload[], int length, b
message.qos = lwmqtt_qos_t(qos);

// publish message
this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, this->timeout);
this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, props, this->timeout);
if (this->_lastError != LWMQTT_SUCCESS) {
// close connection
this->close();
Expand Down Expand Up @@ -294,14 +296,15 @@ bool MQTTClient::connect(const char clientId[], const char username[], const cha
return true;
}

bool MQTTClient::subscribe(const char topic[], int qos) {
bool MQTTClient::subscribe(const char topic[], int qos, lwmqtt_properties_t props) {
// return immediately if not connected
if (!this->connected()) {
return false;
}

// subscribe to topic
this->_lastError = lwmqtt_subscribe_one(&this->client, lwmqtt_string(topic), (lwmqtt_qos_t)qos, this->timeout);
lwmqtt_sub_options_t subopts = {.qos = (lwmqtt_qos_t)qos};
this->_lastError = lwmqtt_subscribe_one(&this->client, lwmqtt_string(topic), subopts, props, this->timeout);
if (this->_lastError != LWMQTT_SUCCESS) {
// close connection
this->close();
Expand All @@ -312,14 +315,14 @@ bool MQTTClient::subscribe(const char topic[], int qos) {
return true;
}

bool MQTTClient::unsubscribe(const char topic[]) {
bool MQTTClient::unsubscribe(const char topic[], lwmqtt_properties_t props) {
// return immediately if not connected
if (!this->connected()) {
return false;
}

// unsubscribe from topic
this->_lastError = lwmqtt_unsubscribe_one(&this->client, lwmqtt_string(topic), this->timeout);
this->_lastError = lwmqtt_unsubscribe_one(&this->client, lwmqtt_string(topic), props, this->timeout);
if (this->_lastError != LWMQTT_SUCCESS) {
// close connection
this->close();
Expand Down Expand Up @@ -368,14 +371,14 @@ bool MQTTClient::connected() {
return this->netClient != nullptr && this->netClient->connected() == 1 && this->_connected;
}

bool MQTTClient::disconnect() {
bool MQTTClient::disconnect(uint8_t reason, lwmqtt_properties_t props) {
// return immediately if not connected anymore
if (!this->connected()) {
return false;
}

// cleanly disconnect
this->_lastError = lwmqtt_disconnect(&this->client, this->timeout);
this->_lastError = lwmqtt_disconnect(&this->client, reason, props, this->timeout);

// close
this->close();
Expand Down
40 changes: 32 additions & 8 deletions src/MQTTClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ typedef struct {
class MQTTClient;

typedef void (*MQTTClientCallbackSimple)(String &topic, String &payload);
typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length);
typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length,
lwmqtt_serialized_properties_t props);

typedef struct {
MQTTClient *client = nullptr;
Expand All @@ -50,7 +51,7 @@ class MQTTClient {
lwmqtt_arduino_network_t network = {nullptr};
lwmqtt_arduino_timer_t timer1 = {0, nullptr};
lwmqtt_arduino_timer_t timer2 = {0, nullptr};
lwmqtt_client_t client = {0};
lwmqtt_client_t client = {LWMQTT_MQTT311, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

bool _connected = false;
lwmqtt_return_code_t _returnCode = (lwmqtt_return_code_t)0;
Expand All @@ -61,8 +62,9 @@ class MQTTClient {

~MQTTClient();

void begin(const char hostname[], Client &client) { this->begin(hostname, 1883, client); }
void begin(const char hostname[], int port, Client &client);
void begin(const char hostname[], Client &client) { this->begin(hostname, 1883, client, LWMQTT_MQTT311); }
void begin(const char hostname[], int port, Client &client) { this->begin(hostname, port, client, LWMQTT_MQTT311); }
void begin(const char hostname[], int port, Client &client, lwmqtt_protocol_t protocol);

void onMessage(MQTTClientCallbackSimple cb);
void onMessageAdvanced(MQTTClientCallbackAdvanced cb);
Expand Down Expand Up @@ -91,6 +93,10 @@ class MQTTClient {
bool publish(const String &topic, const String &payload, bool retained, int qos) {
return this->publish(topic.c_str(), payload.c_str(), retained, qos);
}
bool publish(const String &topic, const String &payload, bool retained, int qos, lwmqtt_properties_t props) {
return this->publish(topic.c_str(), payload.c_str(), payload.length(), retained, qos, props);
}

bool publish(const char topic[], const String &payload) { return this->publish(topic, payload.c_str()); }
bool publish(const char topic[], const String &payload, bool retained, int qos) {
return this->publish(topic, payload.c_str(), retained, qos);
Expand All @@ -104,23 +110,41 @@ class MQTTClient {
bool publish(const char topic[], const char payload[], int length) {
return this->publish(topic, payload, length, false, 0);
}
bool publish(const char topic[], const char payload[], int length, bool retained, int qos);
bool publish(const char topic[], const char payload[], int length, bool retained, int qos) {
lwmqtt_properties_t props = lwmqtt_empty_props;
return this->publish(topic, payload, length, retained, qos, props);
}

bool publish(const char topic[], const char payload[], int length, bool retained, int qos, lwmqtt_properties_t props);

bool subscribe(const String &topic) { return this->subscribe(topic.c_str()); }
bool subscribe(const String &topic, int qos) { return this->subscribe(topic.c_str(), qos); }
bool subscribe(const char topic[]) { return this->subscribe(topic, 0); }
bool subscribe(const char topic[], int qos);
bool subscribe(const char topic[], int qos) {
lwmqtt_properties_t props = lwmqtt_empty_props;
return this->subscribe(topic, qos, props);
}
bool subscribe(const char topic[], int qos, lwmqtt_properties_t props);

bool unsubscribe(const String &topic) { return this->unsubscribe(topic.c_str()); }
bool unsubscribe(const char topic[]);
bool unsubscribe(const char topic[]) {
lwmqtt_properties_t props = lwmqtt_empty_props;
return this->unsubscribe(topic, props);
}
bool unsubscribe(const char topic[], lwmqtt_properties_t props);

bool loop();
bool connected();

lwmqtt_err_t lastError() { return this->_lastError; }
lwmqtt_return_code_t returnCode() { return this->_returnCode; }

bool disconnect();
bool disconnect() {
lwmqtt_properties_t props = lwmqtt_empty_props;
return this->disconnect(0, props);
}

bool disconnect(uint8_t reason, lwmqtt_properties_t props);

private:
void close();
Expand Down
Loading