diff --git a/src/MQTTClient.cpp b/src/MQTTClient.cpp index 1ac5095..6b3be86 100644 --- a/src/MQTTClient.cpp +++ b/src/MQTTClient.cpp @@ -56,7 +56,7 @@ inline lwmqtt_err_t lwmqtt_arduino_network_write(void *ref, uint8_t *buffer, siz } static void MQTTClientHandler(lwmqtt_client_t * /*client*/, void *ref, lwmqtt_string_t topic, - lwmqtt_message_t message) { + lwmqtt_message_t message, lwmqtt_serialized_properties_t props) { // get callback auto cb = (MQTTClientCallback *)ref; @@ -72,7 +72,7 @@ static void MQTTClientHandler(lwmqtt_client_t * /*client*/, void *ref, lwmqtt_st // call the advanced callback and return if available if (cb->advanced != nullptr) { - cb->advanced(cb->client, terminated_topic, (char *)message.payload, (int)message.payload_len); + cb->advanced(cb->client, terminated_topic, (char *)message.payload, (int)message.payload_len, props); return; } @@ -118,7 +118,7 @@ MQTTClient::~MQTTClient() { free(this->writeBuf); } -void MQTTClient::begin(const char hostname[], int port, Client &client) { +void MQTTClient::begin(const char hostname[], int port, Client &client, lwmqtt_protocol_t protocol) { // set hostname and port this->setHost(hostname, port); @@ -127,6 +127,7 @@ void MQTTClient::begin(const char hostname[], int port, Client &client) { // initialize client lwmqtt_init(&this->client, this->writeBuf, this->bufSize, this->readBuf, this->bufSize); + lwmqtt_set_protocol(&this->client, protocol); // set timers lwmqtt_set_timers(&this->client, &this->timer1, &this->timer2, lwmqtt_arduino_timer_set, lwmqtt_arduino_timer_get); @@ -222,7 +223,8 @@ void MQTTClient::setOptions(int keepAlive, bool cleanSession, int timeout) { this->timeout = (uint32_t)timeout; } -bool MQTTClient::publish(const char topic[], const char payload[], int length, bool retained, int qos) { +bool MQTTClient::publish(const char topic[], const char payload[], int length, bool retained, int qos, + lwmqtt_properties_t props) { // return immediately if not connected if (!this->connected()) { return false; @@ -236,7 +238,7 @@ bool MQTTClient::publish(const char topic[], const char payload[], int length, b message.qos = lwmqtt_qos_t(qos); // publish message - this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, this->timeout); + this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, props, this->timeout); if (this->_lastError != LWMQTT_SUCCESS) { // close connection this->close(); @@ -294,14 +296,15 @@ bool MQTTClient::connect(const char clientId[], const char username[], const cha return true; } -bool MQTTClient::subscribe(const char topic[], int qos) { +bool MQTTClient::subscribe(const char topic[], int qos, lwmqtt_properties_t props) { // return immediately if not connected if (!this->connected()) { return false; } // subscribe to topic - this->_lastError = lwmqtt_subscribe_one(&this->client, lwmqtt_string(topic), (lwmqtt_qos_t)qos, this->timeout); + lwmqtt_sub_options_t subopts = {.qos = (lwmqtt_qos_t)qos}; + this->_lastError = lwmqtt_subscribe_one(&this->client, lwmqtt_string(topic), subopts, props, this->timeout); if (this->_lastError != LWMQTT_SUCCESS) { // close connection this->close(); @@ -312,14 +315,14 @@ bool MQTTClient::subscribe(const char topic[], int qos) { return true; } -bool MQTTClient::unsubscribe(const char topic[]) { +bool MQTTClient::unsubscribe(const char topic[], lwmqtt_properties_t props) { // return immediately if not connected if (!this->connected()) { return false; } // unsubscribe from topic - this->_lastError = lwmqtt_unsubscribe_one(&this->client, lwmqtt_string(topic), this->timeout); + this->_lastError = lwmqtt_unsubscribe_one(&this->client, lwmqtt_string(topic), props, this->timeout); if (this->_lastError != LWMQTT_SUCCESS) { // close connection this->close(); @@ -368,14 +371,14 @@ bool MQTTClient::connected() { return this->netClient != nullptr && this->netClient->connected() == 1 && this->_connected; } -bool MQTTClient::disconnect() { +bool MQTTClient::disconnect(uint8_t reason, lwmqtt_properties_t props) { // return immediately if not connected anymore if (!this->connected()) { return false; } // cleanly disconnect - this->_lastError = lwmqtt_disconnect(&this->client, this->timeout); + this->_lastError = lwmqtt_disconnect(&this->client, reason, props, this->timeout); // close this->close(); diff --git a/src/MQTTClient.h b/src/MQTTClient.h index b92db19..7cf6cec 100644 --- a/src/MQTTClient.h +++ b/src/MQTTClient.h @@ -23,7 +23,8 @@ typedef struct { class MQTTClient; typedef void (*MQTTClientCallbackSimple)(String &topic, String &payload); -typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length); +typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length, + lwmqtt_serialized_properties_t props); typedef struct { MQTTClient *client = nullptr; @@ -50,7 +51,7 @@ class MQTTClient { lwmqtt_arduino_network_t network = {nullptr}; lwmqtt_arduino_timer_t timer1 = {0, nullptr}; lwmqtt_arduino_timer_t timer2 = {0, nullptr}; - lwmqtt_client_t client = {0}; + lwmqtt_client_t client = {LWMQTT_MQTT311, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; bool _connected = false; lwmqtt_return_code_t _returnCode = (lwmqtt_return_code_t)0; @@ -61,8 +62,9 @@ class MQTTClient { ~MQTTClient(); - void begin(const char hostname[], Client &client) { this->begin(hostname, 1883, client); } - void begin(const char hostname[], int port, Client &client); + void begin(const char hostname[], Client &client) { this->begin(hostname, 1883, client, LWMQTT_MQTT311); } + void begin(const char hostname[], int port, Client &client) { this->begin(hostname, port, client, LWMQTT_MQTT311); } + void begin(const char hostname[], int port, Client &client, lwmqtt_protocol_t protocol); void onMessage(MQTTClientCallbackSimple cb); void onMessageAdvanced(MQTTClientCallbackAdvanced cb); @@ -91,6 +93,10 @@ class MQTTClient { bool publish(const String &topic, const String &payload, bool retained, int qos) { return this->publish(topic.c_str(), payload.c_str(), retained, qos); } + bool publish(const String &topic, const String &payload, bool retained, int qos, lwmqtt_properties_t props) { + return this->publish(topic.c_str(), payload.c_str(), payload.length(), retained, qos, props); + } + bool publish(const char topic[], const String &payload) { return this->publish(topic, payload.c_str()); } bool publish(const char topic[], const String &payload, bool retained, int qos) { return this->publish(topic, payload.c_str(), retained, qos); @@ -104,15 +110,28 @@ class MQTTClient { bool publish(const char topic[], const char payload[], int length) { return this->publish(topic, payload, length, false, 0); } - bool publish(const char topic[], const char payload[], int length, bool retained, int qos); + bool publish(const char topic[], const char payload[], int length, bool retained, int qos) { + lwmqtt_properties_t props = lwmqtt_empty_props; + return this->publish(topic, payload, length, retained, qos, props); + } + + bool publish(const char topic[], const char payload[], int length, bool retained, int qos, lwmqtt_properties_t props); bool subscribe(const String &topic) { return this->subscribe(topic.c_str()); } bool subscribe(const String &topic, int qos) { return this->subscribe(topic.c_str(), qos); } bool subscribe(const char topic[]) { return this->subscribe(topic, 0); } - bool subscribe(const char topic[], int qos); + bool subscribe(const char topic[], int qos) { + lwmqtt_properties_t props = lwmqtt_empty_props; + return this->subscribe(topic, qos, props); + } + bool subscribe(const char topic[], int qos, lwmqtt_properties_t props); bool unsubscribe(const String &topic) { return this->unsubscribe(topic.c_str()); } - bool unsubscribe(const char topic[]); + bool unsubscribe(const char topic[]) { + lwmqtt_properties_t props = lwmqtt_empty_props; + return this->unsubscribe(topic, props); + } + bool unsubscribe(const char topic[], lwmqtt_properties_t props); bool loop(); bool connected(); @@ -120,7 +139,12 @@ class MQTTClient { lwmqtt_err_t lastError() { return this->_lastError; } lwmqtt_return_code_t returnCode() { return this->_returnCode; } - bool disconnect(); + bool disconnect() { + lwmqtt_properties_t props = lwmqtt_empty_props; + return this->disconnect(0, props); + } + + bool disconnect(uint8_t reason, lwmqtt_properties_t props); private: void close(); diff --git a/src/lwmqtt/client.c b/src/lwmqtt/client.c index b02c31f..9a4d83f 100644 --- a/src/lwmqtt/client.c +++ b/src/lwmqtt/client.c @@ -5,6 +5,7 @@ void lwmqtt_init(lwmqtt_client_t *client, uint8_t *write_buf, size_t write_buf_s client->last_packet_id = 1; client->keep_alive_interval = 0; client->pong_pending = false; + client->protocol = LWMQTT_MQTT311; client->write_buf = write_buf; client->write_buf_size = write_buf_size; @@ -24,6 +25,8 @@ void lwmqtt_init(lwmqtt_client_t *client, uint8_t *write_buf, size_t write_buf_s client->timer_get = NULL; } +void lwmqtt_set_protocol(lwmqtt_client_t *client, lwmqtt_protocol_t prot) { client->protocol = prot; } + void lwmqtt_set_network(lwmqtt_client_t *client, void *ref, lwmqtt_network_read_t read, lwmqtt_network_write_t write) { client->network = ref; client->network_read = read; @@ -205,14 +208,16 @@ static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_p uint16_t packet_id; lwmqtt_string_t topic; lwmqtt_message_t msg; - err = lwmqtt_decode_publish(client->read_buf, client->read_buf_size, &dup, &packet_id, &topic, &msg); + lwmqtt_serialized_properties_t props; + err = lwmqtt_decode_publish(client->read_buf, client->read_buf_size, client->protocol, &dup, &packet_id, &topic, + &msg, &props); if (err != LWMQTT_SUCCESS) { return err; } // call callback if set if (client->callback != NULL) { - client->callback(client, client->callback_ref, topic, msg); + client->callback(client, client->callback_ref, topic, msg, props); } // break early on qos zero @@ -230,7 +235,9 @@ static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_p // encode ack packet size_t len; - err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, ack_type, false, packet_id); + lwmqtt_properties_t ackprops = lwmqtt_empty_props; + err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, client->protocol, ack_type, false, + packet_id, 0, ackprops); if (err != LWMQTT_SUCCESS) { return err; } @@ -249,14 +256,19 @@ static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_p // decode pubrec packet bool dup; uint16_t packet_id; - err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_PUBREC_PACKET, &dup, &packet_id); + uint8_t status; + lwmqtt_serialized_properties_t ackprops; + err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, client->protocol, LWMQTT_PUBREC_PACKET, &dup, + &packet_id, &status, &ackprops); if (err != LWMQTT_SUCCESS) { return err; } // encode pubrel packet size_t len; - err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, LWMQTT_PUBREL_PACKET, 0, packet_id); + lwmqtt_properties_t props = lwmqtt_empty_props; + err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, client->protocol, LWMQTT_PUBREL_PACKET, + 0, packet_id, 0, props); if (err != LWMQTT_SUCCESS) { return err; } @@ -275,14 +287,19 @@ static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_p // decode pubrec packet bool dup; uint16_t packet_id; - err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_PUBREL_PACKET, &dup, &packet_id); + uint8_t status; + lwmqtt_serialized_properties_t ackprops; + err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, client->protocol, LWMQTT_PUBREL_PACKET, &dup, + &packet_id, &status, &ackprops); if (err != LWMQTT_SUCCESS) { return err; } // encode pubcomp packet size_t len; - err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, LWMQTT_PUBCOMP_PACKET, 0, packet_id); + lwmqtt_properties_t props = lwmqtt_empty_props; + err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, client->protocol, LWMQTT_PUBCOMP_PACKET, + 0, packet_id, 0, props); if (err != LWMQTT_SUCCESS) { return err; } @@ -305,7 +322,9 @@ static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_p } // handle all other packets - default: { break; } + default: { + break; + } } return LWMQTT_SUCCESS; @@ -357,8 +376,8 @@ lwmqtt_err_t lwmqtt_connect(lwmqtt_client_t *client, lwmqtt_options_t options, l // set command timer client->timer_set(client->command_timer, timeout); - // save keep alive interval (take 75% to be a little earlier than actually needed) - client->keep_alive_interval = (uint32_t)(options.keep_alive) * 750; + // save keep alive interval + client->keep_alive_interval = (uint32_t)(options.keep_alive) * 1000; // set keep alive timer client->timer_set(client->keep_alive_timer, client->keep_alive_interval); @@ -371,7 +390,8 @@ lwmqtt_err_t lwmqtt_connect(lwmqtt_client_t *client, lwmqtt_options_t options, l // encode connect packet size_t len; - lwmqtt_err_t err = lwmqtt_encode_connect(client->write_buf, client->write_buf_size, &len, options, will); + lwmqtt_err_t err = + lwmqtt_encode_connect(client->write_buf, client->write_buf_size, &len, client->protocol, options, will); if (err != LWMQTT_SUCCESS) { return err; } @@ -393,7 +413,7 @@ lwmqtt_err_t lwmqtt_connect(lwmqtt_client_t *client, lwmqtt_options_t options, l // decode connack packet bool session_present; - err = lwmqtt_decode_connack(client->read_buf, client->read_buf_size, &session_present, return_code); + err = lwmqtt_decode_connack(client->read_buf, client->read_buf_size, client->protocol, &session_present, return_code); if (err != LWMQTT_SUCCESS) { return err; } @@ -406,15 +426,15 @@ lwmqtt_err_t lwmqtt_connect(lwmqtt_client_t *client, lwmqtt_options_t options, l return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_subscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, lwmqtt_qos_t *qos, - uint32_t timeout) { +lwmqtt_err_t lwmqtt_subscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, + lwmqtt_sub_options_t *opts, lwmqtt_properties_t props, uint32_t timeout) { // set command timer client->timer_set(client->command_timer, timeout); // encode subscribe packet size_t len; - lwmqtt_err_t err = lwmqtt_encode_subscribe(client->write_buf, client->write_buf_size, &len, - lwmqtt_get_next_packet_id(client), count, topic_filter, qos); + lwmqtt_err_t err = lwmqtt_encode_subscribe(client->write_buf, client->write_buf_size, &len, client->protocol, + lwmqtt_get_next_packet_id(client), count, topic_filter, opts, props); if (err != LWMQTT_SUCCESS) { return err; } @@ -438,13 +458,15 @@ lwmqtt_err_t lwmqtt_subscribe(lwmqtt_client_t *client, int count, lwmqtt_string_ int suback_count = 0; lwmqtt_qos_t granted_qos[count]; uint16_t packet_id; - err = lwmqtt_decode_suback(client->read_buf, client->read_buf_size, &packet_id, count, &suback_count, granted_qos); + err = lwmqtt_decode_suback(client->read_buf, client->read_buf_size, &packet_id, client->protocol, count, + &suback_count, granted_qos); if (err != LWMQTT_SUCCESS) { return err; } // check suback codes for (int i = 0; i < suback_count; i++) { + // TODO: Reverse this and just consider successes. if (granted_qos[i] == LWMQTT_QOS_FAILURE) { return LWMQTT_FAILED_SUBSCRIPTION; } @@ -453,19 +475,20 @@ lwmqtt_err_t lwmqtt_subscribe(lwmqtt_client_t *client, int count, lwmqtt_string_ return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_subscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic_filter, lwmqtt_qos_t qos, - uint32_t timeout) { - return lwmqtt_subscribe(client, 1, &topic_filter, &qos, timeout); +lwmqtt_err_t lwmqtt_subscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic_filter, lwmqtt_sub_options_t opts, + lwmqtt_properties_t props, uint32_t timeout) { + return lwmqtt_subscribe(client, 1, &topic_filter, &opts, props, timeout); } -lwmqtt_err_t lwmqtt_unsubscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, uint32_t timeout) { +lwmqtt_err_t lwmqtt_unsubscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, + lwmqtt_properties_t props, uint32_t timeout) { // set command timer client->timer_set(client->command_timer, timeout); // encode unsubscribe packet size_t len; - lwmqtt_err_t err = lwmqtt_encode_unsubscribe(client->write_buf, client->write_buf_size, &len, - lwmqtt_get_next_packet_id(client), count, topic_filter); + lwmqtt_err_t err = lwmqtt_encode_unsubscribe(client->write_buf, client->write_buf_size, &len, client->protocol, + lwmqtt_get_next_packet_id(client), count, topic_filter, props); if (err != LWMQTT_SUCCESS) { return err; } @@ -486,22 +509,32 @@ lwmqtt_err_t lwmqtt_unsubscribe(lwmqtt_client_t *client, int count, lwmqtt_strin } // decode unsuback packet - bool dup; uint16_t packet_id; - err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_UNSUBACK_PACKET, &dup, &packet_id); + int ret_count; + lwmqtt_unsubscribe_status_t statuses[count]; + err = lwmqtt_decode_unsuback(client->read_buf, client->read_buf_size, &packet_id, client->protocol, count, &ret_count, + statuses); if (err != LWMQTT_SUCCESS) { return err; } + for (int i = 0; i < ret_count; i++) { + if (statuses[i] != LWMQTT_UNSUB_SUCCESS) { + // TODO: It might be nice to bubble this up (or the properties?) + return LWMQTT_FAILED_UNSUBSCRIPTION; + } + } + return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_unsubscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic_filter, uint32_t timeout) { - return lwmqtt_unsubscribe(client, 1, &topic_filter, timeout); +lwmqtt_err_t lwmqtt_unsubscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic_filter, lwmqtt_properties_t props, + uint32_t timeout) { + return lwmqtt_unsubscribe(client, 1, &topic_filter, props, timeout); } lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_string_t topic, lwmqtt_message_t message, - uint32_t timeout) { + lwmqtt_properties_t props, uint32_t timeout) { // set command timer client->timer_set(client->command_timer, timeout); @@ -513,8 +546,8 @@ lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_string_t topic, lwmq // encode publish packet size_t len = 0; - lwmqtt_err_t err = - lwmqtt_encode_publish(client->write_buf, client->write_buf_size, &len, 0, packet_id, topic, message); + lwmqtt_err_t err = lwmqtt_encode_publish(client->write_buf, client->write_buf_size, &len, client->protocol, 0, + packet_id, topic, message, props); if (err != LWMQTT_SUCCESS) { return err; } @@ -549,7 +582,10 @@ lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_string_t topic, lwmq // decode ack packet bool dup; - err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, ack_type, &dup, &packet_id); + uint8_t status; + lwmqtt_serialized_properties_t ackprops; + err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, client->protocol, ack_type, &dup, &packet_id, + &status, &ackprops); if (err != LWMQTT_SUCCESS) { return err; } @@ -557,24 +593,18 @@ lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_string_t topic, lwmq return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_disconnect(lwmqtt_client_t *client, uint32_t timeout) { +lwmqtt_err_t lwmqtt_disconnect(lwmqtt_client_t *client, uint8_t reason, lwmqtt_properties_t props, uint32_t timeout) { // set command timer client->timer_set(client->command_timer, timeout); - // encode disconnect packet - size_t len; - lwmqtt_err_t err = lwmqtt_encode_zero(client->write_buf, client->write_buf_size, &len, LWMQTT_DISCONNECT_PACKET); - if (err != LWMQTT_SUCCESS) { - return err; - } - - // send disconnected packet - err = lwmqtt_send_packet_in_buffer(client, len); + size_t len = 0; + lwmqtt_err_t err = + lwmqtt_encode_disconnect(client->write_buf, client->write_buf_size, &len, client->protocol, reason, props); if (err != LWMQTT_SUCCESS) { return err; } - return LWMQTT_SUCCESS; + return lwmqtt_send_packet_in_buffer(client, len); } lwmqtt_err_t lwmqtt_keep_alive(lwmqtt_client_t *client, uint32_t timeout) { @@ -616,3 +646,119 @@ lwmqtt_err_t lwmqtt_keep_alive(lwmqtt_client_t *client, uint32_t timeout) { return LWMQTT_SUCCESS; } + +lwmqtt_err_t lwmqtt_property_visitor(void *ref, lwmqtt_serialized_properties_t props, lwmqtt_property_callbacks_t cb) { + uint8_t *p = props.start; + uint8_t *end = p + props.size; + lwmqtt_err_t err; + + while (p < end) { + uint8_t prop, bval; + uint16_t i16val; + uint32_t i32val; + lwmqtt_string_t strval, k, v; + err = lwmqtt_read_byte(&p, end, &prop); + if (err != LWMQTT_SUCCESS) { + return err; + } + + switch (prop) { + // one byte + case LWMQTT_PROP_PAYLOAD_FORMAT_INDICATOR: + case LWMQTT_PROP_REQUEST_PROBLEM_INFORMATION: + case LWMQTT_PROP_MAXIMUM_QOS: + case LWMQTT_PROP_RETAIN_AVAILABLE: + case LWMQTT_PROP_REQUEST_RESPONSE_INFORMATION: + case LWMQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE: + case LWMQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE: + case LWMQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE: + err = lwmqtt_read_byte(&p, end, &bval); + if (err != LWMQTT_SUCCESS) { + return err; + } + if (cb.byte_prop) { + cb.byte_prop(ref, prop, bval); + } + break; + + // two byte int + case LWMQTT_PROP_SERVER_KEEP_ALIVE: + case LWMQTT_PROP_RECEIVE_MAXIMUM: + case LWMQTT_PROP_TOPIC_ALIAS_MAXIMUM: + case LWMQTT_PROP_TOPIC_ALIAS: + err = lwmqtt_read_num(&p, end, &i16val); + if (err != LWMQTT_SUCCESS) { + return err; + } + if (cb.int16_prop) { + cb.int16_prop(ref, prop, i16val); + } + break; + + // 4 byte int + case LWMQTT_PROP_MESSAGE_EXPIRY_INTERVAL: + case LWMQTT_PROP_SESSION_EXPIRY_INTERVAL: + case LWMQTT_PROP_WILL_DELAY_INTERVAL: + case LWMQTT_PROP_MAXIMUM_PACKET_SIZE: + err = lwmqtt_read_num32(&p, end, &i32val); + if (err != LWMQTT_SUCCESS) { + return err; + } + if (cb.int32_prop) { + cb.int32_prop(ref, prop, i32val); + } + break; + + // Variable byte int + case LWMQTT_PROP_SUBSCRIPTION_IDENTIFIER: + err = lwmqtt_read_varnum(&p, end, &i32val); + if (err != LWMQTT_SUCCESS) { + return err; + } + if (cb.int32_prop) { + cb.int32_prop(ref, prop, i32val); + } + break; + + // UTF-8 string + case LWMQTT_PROP_CONTENT_TYPE: + case LWMQTT_PROP_RESPONSE_TOPIC: + case LWMQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER: + case LWMQTT_PROP_AUTHENTICATION_METHOD: + case LWMQTT_PROP_RESPONSE_INFORMATION: + case LWMQTT_PROP_SERVER_REFERENCE: + case LWMQTT_PROP_REASON_STRING: + + // Arbitrary blobs as the same encoding. + case LWMQTT_PROP_CORRELATION_DATA: + case LWMQTT_PROP_AUTHENTICATION_DATA: + err = lwmqtt_read_string(&p, end, &strval); + if (err != LWMQTT_SUCCESS) { + return err; + } + if (cb.str_prop) { + cb.str_prop(ref, prop, strval); + } + break; + + case LWMQTT_PROP_USER_PROPERTY: + err = lwmqtt_read_string(&p, end, &k); + if (err != LWMQTT_SUCCESS) { + return err; + } + err = lwmqtt_read_string(&p, end, &v); + if (err != LWMQTT_SUCCESS) { + return err; + } + if (cb.user_prop) { + cb.user_prop(ref, k, v); + } + break; + + default: + return LWMQTT_MISSING_OR_WRONG_PACKET; + } + } + + return LWMQTT_SUCCESS; +} diff --git a/src/lwmqtt/helpers.c b/src/lwmqtt/helpers.c index 9c78f4e..96e556d 100644 --- a/src/lwmqtt/helpers.c +++ b/src/lwmqtt/helpers.c @@ -81,6 +81,36 @@ lwmqtt_err_t lwmqtt_write_num(uint8_t **buf, const uint8_t *buf_end, uint16_t nu return LWMQTT_SUCCESS; } +lwmqtt_err_t lwmqtt_write_num32(uint8_t **buf, const uint8_t *buf_end, uint32_t num) { + // check buffer size + if ((size_t)(buf_end - (*buf)) < 4) { + return LWMQTT_BUFFER_TOO_SHORT; + } + + // write bytes + (*buf)[0] = (uint8_t)(num >> 24); + (*buf)[1] = (uint8_t)((num >> 16) & 0xff); + (*buf)[2] = (uint8_t)((num >> 8) & 0xff); + (*buf)[3] = (uint8_t)(num & 0xff); + + // adjust pointer + *buf += 4; + + return LWMQTT_SUCCESS; +} + +lwmqtt_err_t lwmqtt_read_num32(uint8_t **buf, const uint8_t *buf_end, uint32_t *num) { + if ((size_t)(buf_end - (*buf)) < 4) { + return LWMQTT_BUFFER_TOO_SHORT; + } + + // read four byte integer + *num = ((uint32_t)(*buf)[0] << 24) | ((uint32_t)(*buf)[1] << 16) | ((uint32_t)(*buf)[2] << 8) | (uint32_t)(*buf)[3]; + + *buf += 4; + return LWMQTT_SUCCESS; +} + lwmqtt_err_t lwmqtt_read_string(uint8_t **buf, const uint8_t *buf_end, lwmqtt_string_t *str) { // read length uint16_t len; @@ -247,3 +277,162 @@ lwmqtt_err_t lwmqtt_write_varnum(uint8_t **buf, const uint8_t *buf_end, uint32_t return LWMQTT_SUCCESS; } + +static lwmqtt_err_t write_prop(uint8_t **buf, const uint8_t *buf_end, lwmqtt_property_t prop) { + lwmqtt_err_t err = lwmqtt_write_byte(buf, buf_end, prop.prop); + if (err != LWMQTT_SUCCESS) { + return err; + } + + switch (prop.prop) { + // one byte + case LWMQTT_PROP_PAYLOAD_FORMAT_INDICATOR: + case LWMQTT_PROP_REQUEST_PROBLEM_INFORMATION: + case LWMQTT_PROP_MAXIMUM_QOS: + case LWMQTT_PROP_RETAIN_AVAILABLE: + case LWMQTT_PROP_REQUEST_RESPONSE_INFORMATION: + case LWMQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE: + case LWMQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE: + case LWMQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE: + return lwmqtt_write_byte(buf, buf_end, prop.value.byte); + + // two byte int + case LWMQTT_PROP_SERVER_KEEP_ALIVE: + case LWMQTT_PROP_RECEIVE_MAXIMUM: + case LWMQTT_PROP_TOPIC_ALIAS_MAXIMUM: + case LWMQTT_PROP_TOPIC_ALIAS: + return lwmqtt_write_num(buf, buf_end, prop.value.int16); + + // 4 byte int + case LWMQTT_PROP_MESSAGE_EXPIRY_INTERVAL: + case LWMQTT_PROP_SESSION_EXPIRY_INTERVAL: + case LWMQTT_PROP_WILL_DELAY_INTERVAL: + case LWMQTT_PROP_MAXIMUM_PACKET_SIZE: + return lwmqtt_write_num32(buf, buf_end, prop.value.int32); + + // Variable byte int + case LWMQTT_PROP_SUBSCRIPTION_IDENTIFIER: + return lwmqtt_write_varnum(buf, buf_end, prop.value.int32); + + // UTF-8 string + case LWMQTT_PROP_CONTENT_TYPE: + case LWMQTT_PROP_RESPONSE_TOPIC: + case LWMQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER: + case LWMQTT_PROP_AUTHENTICATION_METHOD: + case LWMQTT_PROP_RESPONSE_INFORMATION: + case LWMQTT_PROP_SERVER_REFERENCE: + case LWMQTT_PROP_REASON_STRING: + + // Arbitrary blobs as the same encoding. + case LWMQTT_PROP_CORRELATION_DATA: + case LWMQTT_PROP_AUTHENTICATION_DATA: + return lwmqtt_write_string(buf, buf_end, prop.value.str); + + case LWMQTT_PROP_USER_PROPERTY: + lwmqtt_write_string(buf, buf_end, prop.value.pair.k); + lwmqtt_write_string(buf, buf_end, prop.value.pair.v); + } + + return LWMQTT_SUCCESS; +} + +static size_t proplen(lwmqtt_property_t prop) { + int ll; + switch (prop.prop) { + // one byte + case LWMQTT_PROP_PAYLOAD_FORMAT_INDICATOR: + case LWMQTT_PROP_REQUEST_PROBLEM_INFORMATION: + case LWMQTT_PROP_MAXIMUM_QOS: + case LWMQTT_PROP_RETAIN_AVAILABLE: + case LWMQTT_PROP_REQUEST_RESPONSE_INFORMATION: + case LWMQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE: + case LWMQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE: + case LWMQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE: + return 2; + + // two byte int + case LWMQTT_PROP_SERVER_KEEP_ALIVE: + case LWMQTT_PROP_RECEIVE_MAXIMUM: + case LWMQTT_PROP_TOPIC_ALIAS_MAXIMUM: + case LWMQTT_PROP_TOPIC_ALIAS: + return 3; + + // 4 byte int + case LWMQTT_PROP_MESSAGE_EXPIRY_INTERVAL: + case LWMQTT_PROP_SESSION_EXPIRY_INTERVAL: + case LWMQTT_PROP_WILL_DELAY_INTERVAL: + case LWMQTT_PROP_MAXIMUM_PACKET_SIZE: + return 5; + + // Variable byte int + case LWMQTT_PROP_SUBSCRIPTION_IDENTIFIER: + lwmqtt_varnum_length(prop.value.int32, &ll); + return 1 + ll; + + // UTF-8 string + case LWMQTT_PROP_CONTENT_TYPE: + case LWMQTT_PROP_RESPONSE_TOPIC: + case LWMQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER: + case LWMQTT_PROP_AUTHENTICATION_METHOD: + case LWMQTT_PROP_RESPONSE_INFORMATION: + case LWMQTT_PROP_SERVER_REFERENCE: + case LWMQTT_PROP_REASON_STRING: + + // Arbitrary blobs are the same encoding. + case LWMQTT_PROP_CORRELATION_DATA: + case LWMQTT_PROP_AUTHENTICATION_DATA: + return 3 + prop.value.str.len; + + case LWMQTT_PROP_USER_PROPERTY: + return 1 + 2 + prop.value.pair.k.len + 2 + prop.value.pair.v.len; + } + return 0; +} + +// Length of the properties, not including their length. +static size_t propsintlen(lwmqtt_properties_t props) { + uint32_t l = 0; + + for (int i = 0; i < props.len; i++) { + l += proplen(props.props[i]); + } + + return l; +} + +// Length of a properties set as it may appear on the wire (including +// the length of the length). +size_t lwmqtt_propslen(lwmqtt_protocol_t prot, lwmqtt_properties_t props) { + if (prot == LWMQTT_MQTT311) { + return 0; + } + + uint32_t l = propsintlen(props); + int ll; + // lwmqtt_err_t err = + lwmqtt_varnum_length(l, &ll); + + return l + ll; +} + +lwmqtt_err_t lwmqtt_write_props(uint8_t **buf, const uint8_t *buf_end, lwmqtt_protocol_t prot, + lwmqtt_properties_t props) { + if (prot == LWMQTT_MQTT311) { + return LWMQTT_SUCCESS; + } + + size_t len = propsintlen(props); + lwmqtt_err_t err = lwmqtt_write_varnum(buf, buf_end, len); + if (err != LWMQTT_SUCCESS) { + return err; + } + + for (int i = 0; i < props.len; i++) { + err = write_prop(buf, buf_end, props.props[i]); + if (err != LWMQTT_SUCCESS) { + return err; + } + } + + return LWMQTT_SUCCESS; +} diff --git a/src/lwmqtt/helpers.h b/src/lwmqtt/helpers.h index 978eaf4..c614c95 100644 --- a/src/lwmqtt/helpers.h +++ b/src/lwmqtt/helpers.h @@ -65,6 +65,10 @@ lwmqtt_err_t lwmqtt_read_num(uint8_t **buf, const uint8_t *buf_end, uint16_t *nu */ lwmqtt_err_t lwmqtt_write_num(uint8_t **buf, const uint8_t *buf_end, uint16_t num); +lwmqtt_err_t lwmqtt_write_num32(uint8_t **buf, const uint8_t *buf_end, uint32_t num); + +lwmqtt_err_t lwmqtt_read_num32(uint8_t **buf, const uint8_t *buf_end, uint32_t *num); + /** * Reads a string from the specified buffer into the passed object. The pointer is incremented by the bytes read. * @@ -134,4 +138,9 @@ lwmqtt_err_t lwmqtt_read_varnum(uint8_t **buf, const uint8_t *buf_end, uint32_t */ lwmqtt_err_t lwmqtt_write_varnum(uint8_t **buf, const uint8_t *buf_end, uint32_t varnum); +size_t lwmqtt_propslen(lwmqtt_protocol_t prot, lwmqtt_properties_t props); + +lwmqtt_err_t lwmqtt_write_props(uint8_t **buf, const uint8_t *buf_end, lwmqtt_protocol_t prot, + lwmqtt_properties_t props); + #endif diff --git a/src/lwmqtt/lwmqtt.h b/src/lwmqtt/lwmqtt.h index 7a7f142..f326e67 100644 --- a/src/lwmqtt/lwmqtt.h +++ b/src/lwmqtt/lwmqtt.h @@ -5,6 +5,8 @@ #include #include +typedef enum { LWMQTT_MQTT311, LWMQTT_MQTT5 } lwmqtt_protocol_t; + /** * The error type used by all exposed APIs. * @@ -27,6 +29,8 @@ typedef enum { LWMQTT_FAILED_SUBSCRIPTION = -11, LWMQTT_SUBACK_ARRAY_OVERFLOW = -12, LWMQTT_PONG_TIMEOUT = -13, + LWMQTT_FAILED_UNSUBSCRIPTION = -14, + LWMQTT_PUBACK_NACKED = -15, } lwmqtt_err_t; /** @@ -63,7 +67,85 @@ int lwmqtt_strcmp(lwmqtt_string_t a, const char *b); /** * The available QOS levels. */ -typedef enum { LWMQTT_QOS0 = 0, LWMQTT_QOS1 = 1, LWMQTT_QOS2 = 2, LWMQTT_QOS_FAILURE = 128 } lwmqtt_qos_t; +typedef enum __attribute__((__packed__)) { + LWMQTT_QOS0 = 0, + LWMQTT_QOS1 = 1, + LWMQTT_QOS2 = 2, + LWMQTT_QOS_FAILURE = 128 +} lwmqtt_qos_t; + +typedef enum __attribute__((__packed__)) { + LWMQTT_SUB_SEND_ON_SUB = 0, + LWMQTT_SUB_SEND_ON_SUB_NEW = 1, + LWMQTT_SUB_NO_SEND_ON_SUB = 2 +} lwmqtt_retain_handling_t; + +typedef struct { + lwmqtt_qos_t qos; + lwmqtt_retain_handling_t retain_handling : 3; + bool retain_as_published : 1; + bool no_local : 1; +} lwmqtt_sub_options_t; + +#define lwmqtt_default_sub_options \ + { LWMQTT_QOS0, LWMQTT_SUB_SEND_ON_SUB, false, false } + +typedef enum { + LWMQTT_PROP_PAYLOAD_FORMAT_INDICATOR = 0x01, + LWMQTT_PROP_MESSAGE_EXPIRY_INTERVAL = 0x02, + LWMQTT_PROP_CONTENT_TYPE = 0x03, + LWMQTT_PROP_RESPONSE_TOPIC = 0x08, + LWMQTT_PROP_CORRELATION_DATA = 0x09, + LWMQTT_PROP_SUBSCRIPTION_IDENTIFIER = 0x0B, + LWMQTT_PROP_SESSION_EXPIRY_INTERVAL = 0x11, + LWMQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER = 0x12, + LWMQTT_PROP_SERVER_KEEP_ALIVE = 0x13, + LWMQTT_PROP_AUTHENTICATION_METHOD = 0x15, + LWMQTT_PROP_AUTHENTICATION_DATA = 0x16, + LWMQTT_PROP_REQUEST_PROBLEM_INFORMATION = 0x17, + LWMQTT_PROP_WILL_DELAY_INTERVAL = 0x18, + LWMQTT_PROP_REQUEST_RESPONSE_INFORMATION = 0x19, + LWMQTT_PROP_RESPONSE_INFORMATION = 0x1A, + LWMQTT_PROP_SERVER_REFERENCE = 0x1C, + LWMQTT_PROP_REASON_STRING = 0x1F, + LWMQTT_PROP_RECEIVE_MAXIMUM = 0x21, + LWMQTT_PROP_TOPIC_ALIAS_MAXIMUM = 0x22, + LWMQTT_PROP_TOPIC_ALIAS = 0x23, + LWMQTT_PROP_MAXIMUM_QOS = 0x24, + LWMQTT_PROP_RETAIN_AVAILABLE = 0x25, + LWMQTT_PROP_USER_PROPERTY = 0x26, + LWMQTT_PROP_MAXIMUM_PACKET_SIZE = 0x27, + LWMQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE = 0x28, + LWMQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE = 0x29, + LWMQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE = 0x2A, +} lwmqtt_prop_t; + +typedef struct { + lwmqtt_prop_t prop; + union { + uint8_t byte; + uint32_t int32; + uint16_t int16; + lwmqtt_string_t str; + struct { + lwmqtt_string_t k; + lwmqtt_string_t v; + } pair; + } value; +} lwmqtt_property_t; + +typedef struct { + uint16_t len; + lwmqtt_property_t *props; +} lwmqtt_properties_t; + +#define lwmqtt_empty_props \ + { 0, NULL } + +typedef struct { + size_t size; + uint8_t *start; +} lwmqtt_serialized_properties_t; /** * The message object used to publish and receive messages. @@ -139,12 +221,15 @@ typedef int32_t (*lwmqtt_timer_get_t)(void *ref); * recommended to call any further lwmqtt methods in the callback as this might result in weird call stacks. The * callback should place the received messages in a queue and dispatch them after the caller has returned. */ -typedef void (*lwmqtt_callback_t)(lwmqtt_client_t *client, void *ref, lwmqtt_string_t str, lwmqtt_message_t msg); +typedef void (*lwmqtt_callback_t)(lwmqtt_client_t *client, void *ref, lwmqtt_string_t str, lwmqtt_message_t msg, + lwmqtt_serialized_properties_t props); /** * The client object. */ struct lwmqtt_client_t { + lwmqtt_protocol_t protocol; + uint16_t last_packet_id; uint32_t keep_alive_interval; bool pong_pending; @@ -177,6 +262,8 @@ struct lwmqtt_client_t { void lwmqtt_init(lwmqtt_client_t *client, uint8_t *write_buf, size_t write_buf_size, uint8_t *read_buf, size_t read_buf_size); +void lwmqtt_set_protocol(lwmqtt_client_t *client, lwmqtt_protocol_t prot); + /** * Will set the network reference and callbacks for this client object. * @@ -216,13 +303,14 @@ typedef struct { lwmqtt_qos_t qos; bool retained; lwmqtt_string_t payload; + lwmqtt_properties_t properties; } lwmqtt_will_t; /** * The default initializer for the will object. */ #define lwmqtt_default_will \ - { lwmqtt_default_string, LWMQTT_QOS0, false, lwmqtt_default_string } + { lwmqtt_default_string, LWMQTT_QOS0, false, lwmqtt_default_string, lwmqtt_empty_props } /** * The object containing the connection options for a client. @@ -233,13 +321,14 @@ typedef struct { bool clean_session; lwmqtt_string_t username; lwmqtt_string_t password; + lwmqtt_properties_t properties; } lwmqtt_options_t; /** * The default initializer for the options object. */ #define lwmqtt_default_options \ - { lwmqtt_default_string, 60, true, lwmqtt_default_string, lwmqtt_default_string } + { lwmqtt_default_string, 60, true, lwmqtt_default_string, lwmqtt_default_string, lwmqtt_empty_props } /** * The available return codes transported by the connack packet. @@ -281,7 +370,8 @@ lwmqtt_err_t lwmqtt_connect(lwmqtt_client_t *client, lwmqtt_options_t options, l * @param timeout - The command timeout. * @return An error value. */ -lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_string_t topic, lwmqtt_message_t msg, uint32_t timeout); +lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_string_t topic, lwmqtt_message_t msg, + lwmqtt_properties_t props, uint32_t timeout); /** * Will send a subscribe packet with multiple topic filters plus QOS levels and wait for the suback to complete. @@ -291,12 +381,12 @@ lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_string_t topic, lwmq * @param client - The client object. * @param count - The number of topic filters and QOS levels. * @param topic_filter - The list of topic filters. - * @param qos - The list of QOS levels. + * @param opts - The list of subscription options. * @param timeout - The command timeout. * @return An error value. */ -lwmqtt_err_t lwmqtt_subscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, lwmqtt_qos_t *qos, - uint32_t timeout); +lwmqtt_err_t lwmqtt_subscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, + lwmqtt_sub_options_t *opts, lwmqtt_properties_t props, uint32_t timeout); /** * Will send a subscribe packet with a single topic filter plus QOS level and wait for the suback to complete. @@ -305,12 +395,22 @@ lwmqtt_err_t lwmqtt_subscribe(lwmqtt_client_t *client, int count, lwmqtt_string_ * * @param client - The client object. * @param topic_filter - The topic filter. - * @param qos - The QOS level. + * @param qos - The subscription options. * @param timeout - The command timeout. * @return An error value. */ -lwmqtt_err_t lwmqtt_subscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic_filter, lwmqtt_qos_t qos, - uint32_t timeout); +lwmqtt_err_t lwmqtt_subscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic_filter, lwmqtt_sub_options_t opts, + lwmqtt_properties_t props, uint32_t timeout); + +typedef enum { + LWMQTT_UNSUB_SUCCESS = 0, + LWMQTT_UNSUB_NO_SUB_EXISTED = 0x11, + LWMQTT_UNSUB_UNSPECIFIED_ERROR = 0x80, + LWMQTT_UNSUB_IMPL_SPECIFIC_ERROR = 0x83, + LWMQTT_UNSUB_NOT_AUTHORIZED = 0x87, + LWMQTT_UNSUB_TOPIC_FILTER_INVALID = 0x8f, + LWMQTT_UNSUB_PACKET_ID_IN_USE = 0x91, +} lwmqtt_unsubscribe_status_t; /** * Will send an unsubscribe packet with multiple topic filters and wait for the unsuback to complete. @@ -323,7 +423,8 @@ lwmqtt_err_t lwmqtt_subscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic * @param timeout - The command timeout. * @return An error value. */ -lwmqtt_err_t lwmqtt_unsubscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, uint32_t timeout); +lwmqtt_err_t lwmqtt_unsubscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, + lwmqtt_properties_t props, uint32_t timeout); /** * Will send an unsubscribe packet with a single topic filter and wait for the unsuback to complete. @@ -335,7 +436,8 @@ lwmqtt_err_t lwmqtt_unsubscribe(lwmqtt_client_t *client, int count, lwmqtt_strin * @param timeout - The command timeout. * @return An error value. */ -lwmqtt_err_t lwmqtt_unsubscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic_filter, uint32_t timeout); +lwmqtt_err_t lwmqtt_unsubscribe_one(lwmqtt_client_t *client, lwmqtt_string_t topic_filter, lwmqtt_properties_t props, + uint32_t timeout); /** * Will send a disconnect packet and finish the client. @@ -344,7 +446,7 @@ lwmqtt_err_t lwmqtt_unsubscribe_one(lwmqtt_client_t *client, lwmqtt_string_t top * @param timeout - The command timeout. * @return An error value. */ -lwmqtt_err_t lwmqtt_disconnect(lwmqtt_client_t *client, uint32_t timeout); +lwmqtt_err_t lwmqtt_disconnect(lwmqtt_client_t *client, uint8_t reason, lwmqtt_properties_t props, uint32_t timeout); /** * Will yield control to the client and receive incoming packets from the network. @@ -378,4 +480,14 @@ lwmqtt_err_t lwmqtt_yield(lwmqtt_client_t *client, size_t available, uint32_t ti */ lwmqtt_err_t lwmqtt_keep_alive(lwmqtt_client_t *client, uint32_t timeout); +typedef struct { + void (*byte_prop)(void *ref, lwmqtt_prop_t prop, uint8_t value); + void (*int16_prop)(void *ref, lwmqtt_prop_t prop, int16_t value); + void (*int32_prop)(void *ref, lwmqtt_prop_t prop, int32_t value); + void (*str_prop)(void *ref, lwmqtt_prop_t prop, lwmqtt_string_t value); + void (*user_prop)(void *ref, lwmqtt_string_t key, lwmqtt_string_t val); +} lwmqtt_property_callbacks_t; + +lwmqtt_err_t lwmqtt_property_visitor(void *ref, lwmqtt_serialized_properties_t props, lwmqtt_property_callbacks_t cb); + #endif // LWMQTT_H diff --git a/src/lwmqtt/packet.c b/src/lwmqtt/packet.c index 512b44d..ed4bd6c 100644 --- a/src/lwmqtt/packet.c +++ b/src/lwmqtt/packet.c @@ -1,5 +1,8 @@ #include "packet.h" +static lwmqtt_err_t decode_props(uint8_t **buf, const uint8_t *buf_len, lwmqtt_protocol_t protocol, + lwmqtt_serialized_properties_t *props); + lwmqtt_err_t lwmqtt_detect_packet_type(uint8_t *buf, size_t buf_len, lwmqtt_packet_type_t *packet_type) { // set default packet type *packet_type = LWMQTT_NO_PACKET; @@ -55,14 +58,14 @@ lwmqtt_err_t lwmqtt_detect_remaining_length(uint8_t *buf, size_t buf_len, uint32 return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_options_t options, - lwmqtt_will_t *will) { +lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + lwmqtt_options_t options, lwmqtt_will_t *will) { // prepare pointers uint8_t *buf_ptr = buf; uint8_t *buf_end = buf + buf_len; // fixed header is 10 - uint32_t rem_len = 10; + uint32_t rem_len = 10 + lwmqtt_propslen(protocol, options.properties); // add client id to remaining length rem_len += options.client_id.len + 2; @@ -70,16 +73,17 @@ lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lw // add will if present to remaining length if (will != NULL) { rem_len += will->topic.len + 2 + will->payload.len + 2; + rem_len += lwmqtt_propslen(protocol, will->properties); } // add username if present to remaining length if (options.username.len > 0) { rem_len += options.username.len + 2; + } - // add password if present to remaining length - if (options.password.len > 0) { - rem_len += options.password.len + 2; - } + // add password if present to remaining length + if ((options.username.len > 0 || protocol == LWMQTT_MQTT5) && options.password.len > 0) { + rem_len += options.password.len + 2; } // check remaining length length @@ -112,7 +116,7 @@ lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lw } // write version number - err = lwmqtt_write_byte(&buf_ptr, buf_end, 4); + err = lwmqtt_write_byte(&buf_ptr, buf_end, protocol == LWMQTT_MQTT311 ? 4 : 5); if (err != LWMQTT_SUCCESS) { return err; } @@ -133,11 +137,10 @@ lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lw // set username flag if present if (options.username.len > 0) { lwmqtt_write_bits(&flags, 1, 7, 1); + } - // set password flag if present - if (options.password.len > 0) { - lwmqtt_write_bits(&flags, 1, 6, 1); - } + if ((options.username.len > 0 || protocol == LWMQTT_MQTT5) && options.password.len > 0) { + lwmqtt_write_bits(&flags, 1, 6, 1); } // write flags @@ -152,6 +155,12 @@ lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lw return err; } + // write connection properties + err = lwmqtt_write_props(&buf_ptr, buf_end, protocol, options.properties); + if (err != LWMQTT_SUCCESS) { + return err; + } + // write client id err = lwmqtt_write_string(&buf_ptr, buf_end, options.client_id); if (err != LWMQTT_SUCCESS) { @@ -160,6 +169,11 @@ lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lw // write will if present if (will != NULL) { + err = lwmqtt_write_props(&buf_ptr, buf_end, protocol, will->properties); + if (err != LWMQTT_SUCCESS) { + return err; + } + // write topic err = lwmqtt_write_string(&buf_ptr, buf_end, will->topic); if (err != LWMQTT_SUCCESS) { @@ -188,7 +202,7 @@ lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lw } // write password if present - if (options.username.len > 0 && options.password.len > 0) { + if ((options.username.len > 0 || protocol == LWMQTT_MQTT5) && options.password.len > 0) { err = lwmqtt_write_string(&buf_ptr, buf_end, options.password); if (err != LWMQTT_SUCCESS) { return err; @@ -201,7 +215,7 @@ lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lw return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_decode_connack(uint8_t *buf, size_t buf_len, bool *session_present, +lwmqtt_err_t lwmqtt_decode_connack(uint8_t *buf, size_t buf_len, lwmqtt_protocol_t protocol, bool *session_present, lwmqtt_return_code_t *return_code) { // prepare pointers uint8_t *buf_ptr = buf; @@ -227,7 +241,7 @@ lwmqtt_err_t lwmqtt_decode_connack(uint8_t *buf, size_t buf_len, bool *session_p } // check remaining length - if (rem_len != 2) { + if (protocol == LWMQTT_MQTT311 && rem_len != 2) { return LWMQTT_REMAINING_LENGTH_MISMATCH; } @@ -300,12 +314,16 @@ lwmqtt_err_t lwmqtt_encode_zero(uint8_t *buf, size_t buf_len, size_t *len, lwmqt return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_decode_ack(uint8_t *buf, size_t buf_len, lwmqtt_packet_type_t packet_type, bool *dup, - uint16_t *packet_id) { +lwmqtt_err_t lwmqtt_decode_ack(uint8_t *buf, size_t buf_len, lwmqtt_protocol_t protocol, + lwmqtt_packet_type_t packet_type, bool *dup, uint16_t *packet_id, uint8_t *status, + lwmqtt_serialized_properties_t *props) { // prepare pointer uint8_t *buf_ptr = buf; uint8_t *buf_end = buf + buf_len; + *status = 0; + props->size = 0; + // read header uint8_t header = 0; lwmqtt_err_t err = lwmqtt_read_byte(&buf_ptr, buf_end, &header); @@ -329,7 +347,7 @@ lwmqtt_err_t lwmqtt_decode_ack(uint8_t *buf, size_t buf_len, lwmqtt_packet_type_ } // check remaining length - if (rem_len != 2) { + if (protocol == LWMQTT_MQTT311 && rem_len != 2) { return LWMQTT_REMAINING_LENGTH_MISMATCH; } @@ -339,11 +357,33 @@ lwmqtt_err_t lwmqtt_decode_ack(uint8_t *buf, size_t buf_len, lwmqtt_packet_type_ return err; } + rem_len -= 2; + + if (rem_len > 0) { + lwmqtt_err_t err = lwmqtt_read_byte(&buf_ptr, buf_end, status); + if (err != LWMQTT_SUCCESS) { + return err; + } + rem_len--; + } + + if (rem_len > 0) { + err = decode_props(&buf_ptr, buf_end, protocol, props); + if (err != LWMQTT_SUCCESS) { + return err; + } + } + + if (*status != 0) { + return LWMQTT_PUBACK_NACKED; + } + return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_encode_ack(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_packet_type_t packet_type, bool dup, - uint16_t packet_id) { +lwmqtt_err_t lwmqtt_encode_ack(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + lwmqtt_packet_type_t packet_type, bool dup, uint16_t packet_id, uint8_t status, + lwmqtt_properties_t props) { // prepare pointer uint8_t *buf_ptr = buf; uint8_t *buf_end = buf + buf_len; @@ -366,8 +406,13 @@ lwmqtt_err_t lwmqtt_encode_ack(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt return err; } + size_t rem_len = 2 + (protocol == LWMQTT_MQTT5 ? 1 : 0); + if (props.len > 0) { + rem_len += lwmqtt_propslen(protocol, props); + } + // write remaining length - err = lwmqtt_write_varnum(&buf_ptr, buf_end, 2); + err = lwmqtt_write_varnum(&buf_ptr, buf_end, rem_len); if (err != LWMQTT_SUCCESS) { return err; } @@ -378,14 +423,46 @@ lwmqtt_err_t lwmqtt_encode_ack(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt return err; } + if (protocol == LWMQTT_MQTT5) { + err = lwmqtt_write_byte(&buf_ptr, buf_end, status); + if (err != LWMQTT_SUCCESS) { + return err; + } + } + + if (props.len > 0) { + err = lwmqtt_write_props(&buf_ptr, buf_end, protocol, props); + if (err != LWMQTT_SUCCESS) { + return err; + } + } + // set written length *len = buf_ptr - buf; return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_decode_publish(uint8_t *buf, size_t buf_len, bool *dup, uint16_t *packet_id, lwmqtt_string_t *topic, - lwmqtt_message_t *msg) { +static lwmqtt_err_t decode_props(uint8_t **buf, const uint8_t *buf_len, lwmqtt_protocol_t protocol, + lwmqtt_serialized_properties_t *props) { + if (protocol == LWMQTT_MQTT311) { + return LWMQTT_SUCCESS; + } + uint32_t prop_len; + lwmqtt_err_t err = lwmqtt_read_varnum(buf, buf_len, &prop_len); + if (err != LWMQTT_SUCCESS) { + return err; + } + props->size = (size_t)prop_len; + props->start = *buf; + + *buf += prop_len; + return LWMQTT_SUCCESS; +} + +lwmqtt_err_t lwmqtt_decode_publish(uint8_t *buf, size_t buf_len, lwmqtt_protocol_t protocol, bool *dup, + uint16_t *packet_id, lwmqtt_string_t *topic, lwmqtt_message_t *msg, + lwmqtt_serialized_properties_t *props) { // prepare pointer uint8_t *buf_ptr = buf; uint8_t *buf_end = buf + buf_len; @@ -460,6 +537,11 @@ lwmqtt_err_t lwmqtt_decode_publish(uint8_t *buf, size_t buf_len, bool *dup, uint *packet_id = 0; } + err = decode_props(&buf_ptr, buf_end, protocol, props); + if (err != LWMQTT_SUCCESS) { + return err; + } + // set payload length msg->payload_len = buf_end - buf_ptr; @@ -472,14 +554,15 @@ lwmqtt_err_t lwmqtt_decode_publish(uint8_t *buf, size_t buf_len, bool *dup, uint return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_encode_publish(uint8_t *buf, size_t buf_len, size_t *len, bool dup, uint16_t packet_id, - lwmqtt_string_t topic, lwmqtt_message_t msg) { +lwmqtt_err_t lwmqtt_encode_publish(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, bool dup, + uint16_t packet_id, lwmqtt_string_t topic, lwmqtt_message_t msg, + lwmqtt_properties_t props) { // prepare pointer uint8_t *buf_ptr = buf; uint8_t *buf_end = buf + buf_len; // calculate remaining length - uint32_t rem_len = 2 + topic.len + (uint32_t)msg.payload_len; + uint32_t rem_len = 2 + topic.len + (uint32_t)msg.payload_len + lwmqtt_propslen(protocol, props); if (msg.qos > 0) { rem_len += 2; } @@ -532,6 +615,11 @@ lwmqtt_err_t lwmqtt_encode_publish(uint8_t *buf, size_t buf_len, size_t *len, bo } } + err = lwmqtt_write_props(&buf_ptr, buf_end, protocol, props); + if (err != LWMQTT_SUCCESS) { + return err; + } + // write payload err = lwmqtt_write_data(&buf_ptr, buf_end, msg.payload, msg.payload_len); if (err != LWMQTT_SUCCESS) { @@ -544,14 +632,20 @@ lwmqtt_err_t lwmqtt_encode_publish(uint8_t *buf, size_t buf_len, size_t *len, bo return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_encode_subscribe(uint8_t *buf, size_t buf_len, size_t *len, uint16_t packet_id, int count, - lwmqtt_string_t *topic_filters, lwmqtt_qos_t *qos_levels) { +static uint8_t encode_sub_opt(lwmqtt_sub_options_t opt) { + return (uint8_t)opt.qos | ((uint8_t)opt.retain_handling << 4) | (opt.retain_as_published ? 0x08 : 0) | + (opt.no_local ? 0x04 : 0); +} + +lwmqtt_err_t lwmqtt_encode_subscribe(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + uint16_t packet_id, int count, lwmqtt_string_t *topic_filters, + lwmqtt_sub_options_t *sub_options, lwmqtt_properties_t props) { // prepare pointer uint8_t *buf_ptr = buf; uint8_t *buf_end = buf + buf_len; // calculate remaining length - uint32_t rem_len = 2; + uint32_t rem_len = 2 + lwmqtt_propslen(protocol, props); for (int i = 0; i < count; i++) { rem_len += 2 + topic_filters[i].len + 1; } @@ -590,6 +684,11 @@ lwmqtt_err_t lwmqtt_encode_subscribe(uint8_t *buf, size_t buf_len, size_t *len, return err; } + err = lwmqtt_write_props(&buf_ptr, buf_end, protocol, props); + if (err != LWMQTT_SUCCESS) { + return err; + } + // write all subscriptions for (int i = 0; i < count; i++) { // write topic @@ -598,8 +697,8 @@ lwmqtt_err_t lwmqtt_encode_subscribe(uint8_t *buf, size_t buf_len, size_t *len, return err; } - // write qos level - err = lwmqtt_write_byte(&buf_ptr, buf_end, (uint8_t)qos_levels[i]); + // write subscription options + err = lwmqtt_write_byte(&buf_ptr, buf_end, encode_sub_opt(sub_options[i])); if (err != LWMQTT_SUCCESS) { return err; } @@ -611,8 +710,8 @@ lwmqtt_err_t lwmqtt_encode_subscribe(uint8_t *buf, size_t buf_len, size_t *len, return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet_id, int max_count, int *count, - lwmqtt_qos_t *granted_qos_levels) { +lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet_id, lwmqtt_protocol_t protocol, + int max_count, int *count, lwmqtt_qos_t *granted_qos_levels) { // prepare pointer uint8_t *buf_ptr = buf; uint8_t *buf_end = buf + buf_len; @@ -635,6 +734,7 @@ lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet if (err != LWMQTT_SUCCESS) { return err; } + uint8_t *end = buf_ptr + rem_len; // check remaining length (packet id + min. one suback code) if (rem_len < 3) { @@ -647,8 +747,14 @@ lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet return err; } + lwmqtt_serialized_properties_t props; + err = decode_props(&buf_ptr, buf_end, protocol, &props); + if (err != LWMQTT_SUCCESS) { + return err; + } + // read all suback codes - for (*count = 0; *count < (int)rem_len - 2; (*count)++) { + for (*count = 0; buf_ptr < end; (*count)++) { // check max count if (*count > max_count) { return LWMQTT_SUBACK_ARRAY_OVERFLOW; @@ -673,7 +779,7 @@ lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet granted_qos_levels[*count] = LWMQTT_QOS2; break; default: - granted_qos_levels[*count] = LWMQTT_QOS_FAILURE; + granted_qos_levels[*count] = protocol == LWMQTT_MQTT311 ? 0x80 : (lwmqtt_qos_t)raw_qos_level; break; } } @@ -681,14 +787,80 @@ lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet return LWMQTT_SUCCESS; } -lwmqtt_err_t lwmqtt_encode_unsubscribe(uint8_t *buf, size_t buf_len, size_t *len, uint16_t packet_id, int count, - lwmqtt_string_t *topic_filters) { +lwmqtt_err_t lwmqtt_decode_unsuback(uint8_t *buf, size_t buf_len, uint16_t *packet_id, lwmqtt_protocol_t protocol, + int max_count, int *count, lwmqtt_unsubscribe_status_t *statuses) { + // prepare pointer + uint8_t *buf_ptr = buf; + uint8_t *buf_end = buf + buf_len; + + // read header + uint8_t header; + lwmqtt_err_t err = lwmqtt_read_byte(&buf_ptr, buf_end, &header); + if (err != LWMQTT_SUCCESS) { + return err; + } + + // check packet type + if (lwmqtt_read_bits(header, 4, 4) != LWMQTT_UNSUBACK_PACKET) { + return LWMQTT_MISSING_OR_WRONG_PACKET; + } + + // read remaining length + uint32_t rem_len; + err = lwmqtt_read_varnum(&buf_ptr, buf_end, &rem_len); + if (err != LWMQTT_SUCCESS) { + return err; + } + uint8_t *end = buf_ptr + rem_len; + + // read packet id + err = lwmqtt_read_num(&buf_ptr, buf_end, packet_id); + if (err != LWMQTT_SUCCESS) { + return err; + } + + if (protocol == LWMQTT_MQTT311) { + for (int i = 0; i < max_count; i++) { + statuses[i] = LWMQTT_UNSUB_SUCCESS; + } + *count = max_count; + return LWMQTT_SUCCESS; + } + + lwmqtt_serialized_properties_t props = {0, 0}; + err = decode_props(&buf_ptr, buf_end, protocol, &props); + if (err != LWMQTT_SUCCESS) { + return err; + } + + // read all suback codes + for (*count = 0; buf_ptr < end; (*count)++) { + // check max count + if (*count > max_count) { + return LWMQTT_SUBACK_ARRAY_OVERFLOW; + } + + // read qos level + uint8_t st; + err = lwmqtt_read_byte(&buf_ptr, buf_end, &st); + if (err != LWMQTT_SUCCESS) { + return err; + } + statuses[*count] = (lwmqtt_unsubscribe_status_t)st; + } + + return LWMQTT_SUCCESS; +} + +lwmqtt_err_t lwmqtt_encode_unsubscribe(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + uint16_t packet_id, int count, lwmqtt_string_t *topic_filters, + lwmqtt_properties_t props) { // prepare pointer uint8_t *buf_ptr = buf; uint8_t *buf_end = buf + buf_len; // calculate remaining length - uint32_t rem_len = 2; + uint32_t rem_len = 2 + lwmqtt_propslen(protocol, props); for (int i = 0; i < count; i++) { rem_len += 2 + topic_filters[i].len; } @@ -727,6 +899,11 @@ lwmqtt_err_t lwmqtt_encode_unsubscribe(uint8_t *buf, size_t buf_len, size_t *len return err; } + err = lwmqtt_write_props(&buf_ptr, buf_end, protocol, props); + if (err != LWMQTT_SUCCESS) { + return err; + } + // write topics for (int i = 0; i < count; i++) { err = lwmqtt_write_string(&buf_ptr, buf_end, topic_filters[i]); @@ -740,3 +917,41 @@ lwmqtt_err_t lwmqtt_encode_unsubscribe(uint8_t *buf, size_t buf_len, size_t *len return LWMQTT_SUCCESS; } + +lwmqtt_err_t lwmqtt_encode_disconnect(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + uint8_t reason, lwmqtt_properties_t props) { + uint8_t *buf_ptr = buf; + uint8_t *buf_end = buf_ptr + buf_len; + + uint8_t header = 0; + lwmqtt_write_bits(&header, LWMQTT_DISCONNECT_PACKET, 4, 4); + lwmqtt_err_t err = lwmqtt_write_byte(&buf_ptr, buf_end, header); + if (err != LWMQTT_SUCCESS) { + return err; + } + + uint32_t rem_len = 0; + if (protocol == LWMQTT_MQTT5) { + rem_len = 1 + lwmqtt_propslen(protocol, props); + } + + err = lwmqtt_write_varnum(&buf_ptr, buf_end, rem_len); + if (err != LWMQTT_SUCCESS) { + return err; + } + + if (protocol == LWMQTT_MQTT5) { + err = lwmqtt_write_byte(&buf_ptr, buf_end, reason); + if (err != LWMQTT_SUCCESS) { + return err; + } + + err = lwmqtt_write_props(&buf_ptr, buf_end, protocol, props); + if (err != LWMQTT_SUCCESS) { + return err; + } + } + + *len = buf_ptr - buf; + return LWMQTT_SUCCESS; +} diff --git a/src/lwmqtt/packet.h b/src/lwmqtt/packet.h index 5fe9e50..70b7ad0 100644 --- a/src/lwmqtt/packet.h +++ b/src/lwmqtt/packet.h @@ -53,27 +53,29 @@ lwmqtt_err_t lwmqtt_detect_remaining_length(uint8_t *buf, size_t buf_len, uint32 * @param buf - The buffer into which the packet will be encoded. * @param buf_len - The length of the specified buffer. * @param len - The encoded length of the packet. + * @param protocol - The MQTT protocol to use for this connection. * @param options - The options to be used to build the connect packet. * @param will - The last will and testament. * @return An error value. */ -lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_options_t options, - lwmqtt_will_t *will); +lwmqtt_err_t lwmqtt_encode_connect(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + lwmqtt_options_t options, lwmqtt_will_t *will); /** * Decodes a connack packet from the supplied buffer. * * @param buf - The raw buffer data. * @param buf_len - The length of the specified buffer. + * @param protocol - The protocol to parse the decode as. * @param session_present - The session present flag. * @param return_code - The return code. * @return An error value. */ -lwmqtt_err_t lwmqtt_decode_connack(uint8_t *buf, size_t buf_len, bool *session_present, +lwmqtt_err_t lwmqtt_decode_connack(uint8_t *buf, size_t buf_len, lwmqtt_protocol_t protocol, bool *session_present, lwmqtt_return_code_t *return_code); /** - * Encodes a zero (disconnect, pingreq) packet into the supplied buffer. + * Encodes a zero (pingreq) packet into the supplied buffer. * * @param buf - The buffer into which the packet will be encoded. * @param buf_len - The length of the specified buffer. @@ -83,6 +85,9 @@ lwmqtt_err_t lwmqtt_decode_connack(uint8_t *buf, size_t buf_len, bool *session_p */ lwmqtt_err_t lwmqtt_encode_zero(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_packet_type_t packet_type); +lwmqtt_err_t lwmqtt_encode_disconnect(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + uint8_t reason, lwmqtt_properties_t props); + /** * Decodes an ack (puback, pubrec, pubrel, pubcomp, unsuback) packet from the supplied buffer. * @@ -93,8 +98,9 @@ lwmqtt_err_t lwmqtt_encode_zero(uint8_t *buf, size_t buf_len, size_t *len, lwmqt * @param packet_id - The packet id. * @return An error value. */ -lwmqtt_err_t lwmqtt_decode_ack(uint8_t *buf, size_t buf_len, lwmqtt_packet_type_t packet_type, bool *dup, - uint16_t *packet_id); +lwmqtt_err_t lwmqtt_decode_ack(uint8_t *buf, size_t buf_len, lwmqtt_protocol_t protocol, + lwmqtt_packet_type_t packet_type, bool *dup, uint16_t *packet_id, uint8_t *status, + lwmqtt_serialized_properties_t *props); /** * Encodes an ack (puback, pubrec, pubrel, pubcomp) packet into the supplied buffer. @@ -107,8 +113,9 @@ lwmqtt_err_t lwmqtt_decode_ack(uint8_t *buf, size_t buf_len, lwmqtt_packet_type_ * @param packet_id - The packet id. * @return An error value. */ -lwmqtt_err_t lwmqtt_encode_ack(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_packet_type_t packet_type, bool dup, - uint16_t packet_id); +lwmqtt_err_t lwmqtt_encode_ack(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + lwmqtt_packet_type_t packet_type, bool dup, uint16_t packet_id, uint8_t status, + lwmqtt_properties_t props); /** * Decodes a publish packet from the supplied buffer. @@ -121,8 +128,9 @@ lwmqtt_err_t lwmqtt_encode_ack(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt * @parma msg - The message. * @return An error value. */ -lwmqtt_err_t lwmqtt_decode_publish(uint8_t *buf, size_t buf_len, bool *dup, uint16_t *packet_id, lwmqtt_string_t *topic, - lwmqtt_message_t *msg); +lwmqtt_err_t lwmqtt_decode_publish(uint8_t *buf, size_t buf_len, lwmqtt_protocol_t protocol, bool *dup, + uint16_t *packet_id, lwmqtt_string_t *topic, lwmqtt_message_t *msg, + lwmqtt_serialized_properties_t *props); /** * Encodes a publish packet into the supplied buffer. @@ -136,8 +144,9 @@ lwmqtt_err_t lwmqtt_decode_publish(uint8_t *buf, size_t buf_len, bool *dup, uint * @param msg - The message. * @return An error value. */ -lwmqtt_err_t lwmqtt_encode_publish(uint8_t *buf, size_t buf_len, size_t *len, bool dup, uint16_t packet_id, - lwmqtt_string_t topic, lwmqtt_message_t msg); +lwmqtt_err_t lwmqtt_encode_publish(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, bool dup, + uint16_t packet_id, lwmqtt_string_t topic, lwmqtt_message_t msg, + lwmqtt_properties_t props); /** * Encodes a subscribe packet into the supplied buffer. @@ -148,11 +157,12 @@ lwmqtt_err_t lwmqtt_encode_publish(uint8_t *buf, size_t buf_len, size_t *len, bo * @param packet_id - The packet id. * @param count - The number of members in the topic_filters and qos_levels array. * @param topic_filters - The array of topic filter. - * @param qos_levels - The array of requested QoS levels. + * @param sub_options - The array of requested subscription options. * @return An error value. */ -lwmqtt_err_t lwmqtt_encode_subscribe(uint8_t *buf, size_t buf_len, size_t *len, uint16_t packet_id, int count, - lwmqtt_string_t *topic_filters, lwmqtt_qos_t *qos_levels); +lwmqtt_err_t lwmqtt_encode_subscribe(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t prototocol, + uint16_t packet_id, int count, lwmqtt_string_t *topic_filters, + lwmqtt_sub_options_t *sub_options, lwmqtt_properties_t props); /** * Decodes a suback packet from the supplied buffer. @@ -165,8 +175,11 @@ lwmqtt_err_t lwmqtt_encode_subscribe(uint8_t *buf, size_t buf_len, size_t *len, * @param granted_qos_levels - The granted QoS levels. * @return An error value. */ -lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet_id, int max_count, int *count, - lwmqtt_qos_t *granted_qos_levels); +lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet_id, lwmqtt_protocol_t protocol, + int max_count, int *count, lwmqtt_qos_t *granted_qos_levels); + +lwmqtt_err_t lwmqtt_decode_unsuback(uint8_t *buf, size_t buf_len, uint16_t *packet_id, lwmqtt_protocol_t protocol, + int max_count, int *count, lwmqtt_unsubscribe_status_t *statuses); /** * Encodes the supplied unsubscribe data into the supplied buffer, ready for sending @@ -179,7 +192,8 @@ lwmqtt_err_t lwmqtt_decode_suback(uint8_t *buf, size_t buf_len, uint16_t *packet * @param topic_filters - The array of topic filters. * @return An error value. */ -lwmqtt_err_t lwmqtt_encode_unsubscribe(uint8_t *buf, size_t buf_len, size_t *len, uint16_t packet_id, int count, - lwmqtt_string_t *topic_filters); +lwmqtt_err_t lwmqtt_encode_unsubscribe(uint8_t *buf, size_t buf_len, size_t *len, lwmqtt_protocol_t protocol, + uint16_t packet_id, int count, lwmqtt_string_t *topic_filters, + lwmqtt_properties_t props); #endif // LWMQTT_PACKET_H