Skip to content

Commit 057fda5

Browse files
authored
Handle Suback Packet (#349)
* handle ack packet * update suback packet * update runner image * improve error structure * add suback test
1 parent 33016f1 commit 057fda5

File tree

5 files changed

+62
-7
lines changed

5 files changed

+62
-7
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ env:
1515

1616
jobs:
1717
unit-tests:
18-
runs-on: ubuntu-20.04
18+
runs-on: ubuntu-latest
1919
strategy:
2020
fail-fast: false
2121

AWSIoTPythonSDK/core/protocol/mqtt_core.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from AWSIoTPythonSDK.core.protocol.internal.defaults import METRICS_PREFIX
2929
from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS
3030
from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids
31-
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS
31+
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS, SUBACK_ERROR
3232
from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError
3333
from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectTimeoutException
3434
from AWSIoTPythonSDK.exception.AWSIoTExceptions import disconnectError
@@ -41,7 +41,7 @@
4141
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException
4242
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueFullException
4343
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueDisabledException
44-
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError
44+
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError, subackError
4545
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException
4646
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeError
4747
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeTimeoutException
@@ -58,6 +58,12 @@
5858
from queue import Queue
5959

6060

61+
class SubackPacket(object):
62+
def __init__(self):
63+
self.event = Event()
64+
self.data = None
65+
66+
6167
class MqttCore(object):
6268

6369
_logger = logging.getLogger(__name__)
@@ -298,12 +304,15 @@ def subscribe(self, topic, qos, message_callback=None):
298304
if ClientStatus.STABLE != self._client_status.get_status():
299305
self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None))
300306
else:
301-
event = Event()
302-
rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback)
303-
if not event.wait(self._operation_timeout_sec):
307+
suback = SubackPacket()
308+
rc, mid = self._subscribe_async(topic, qos, self._create_blocking_suback_callback(suback), message_callback)
309+
if not suback.event.wait(self._operation_timeout_sec):
304310
self._internal_async_client.remove_event_callback(mid)
305311
self._logger.error("Subscribe timed out")
306312
raise subscribeTimeoutException()
313+
if suback.data and suback.data[0] == SUBACK_ERROR:
314+
self._logger.error(f"Suback error return code: {suback.data[0]}")
315+
raise subackError(suback=suback.data)
307316
ret = True
308317
return ret
309318

@@ -361,6 +370,12 @@ def ack_callback(mid, data=None):
361370
event.set()
362371
return ack_callback
363372

373+
def _create_blocking_suback_callback(self, ack: SubackPacket):
374+
def ack_callback(mid, data=None):
375+
ack.data = data
376+
ack.event.set()
377+
return ack_callback
378+
364379
def _handle_offline_request(self, type, data):
365380
self._logger.info("Offline request detected!")
366381
offline_request = QueueableRequest(type, data)

AWSIoTPythonSDK/core/protocol/paho/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@
9797
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
9898
CONNACK_REFUSED_NOT_AUTHORIZED = 5
9999

100+
# SUBACK codes
101+
SUBACK_ERROR = 0x80
102+
100103
# Connection state
101104
mqtt_cs_new = 0
102105
mqtt_cs_connected = 1
@@ -137,6 +140,7 @@
137140
MSG_QUEUEING_DROP_OLDEST = 0
138141
MSG_QUEUEING_DROP_NEWEST = 1
139142

143+
140144
if sys.version_info[0] < 3:
141145
sockpair_data = "0"
142146
else:

AWSIoTPythonSDK/exception/AWSIoTExceptions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ class subscribeError(operationError.operationError):
7979
def __init__(self, errorCode):
8080
self.message = "Subscribe Error: " + str(errorCode)
8181

82+
class subackError(operationError.operationError):
83+
def __init__(self, suback=None):
84+
self.message = "Received Error suback. Subscription failed."
85+
self.suback = suback
86+
8287

8388
class subscribeQueueFullException(operationError.operationError):
8489
def __init__(self):

test/core/protocol/test_mqtt_core.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import AWSIoTPythonSDK
22
from AWSIoTPythonSDK.core.protocol.mqtt_core import MqttCore
3+
from AWSIoTPythonSDK.core.protocol.mqtt_core import SubackPacket
34
from AWSIoTPythonSDK.core.protocol.internal.clients import InternalAsyncMqttClient
45
from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatusContainer
56
from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatus
@@ -20,6 +21,7 @@
2021
from AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueFullException
2122
from AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueDisabledException
2223
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError
24+
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subackError
2325
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException
2426
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueFullException
2527
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException
@@ -29,6 +31,7 @@
2931
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueDisabledException
3032
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS
3133
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_ERRNO
34+
from AWSIoTPythonSDK.core.protocol.paho.client import SUBACK_ERROR
3235
from AWSIoTPythonSDK.core.protocol.paho.client import MQTTv311
3336
from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS
3437
try:
@@ -61,6 +64,7 @@
6164
KEY_EXPECTED_QUEUE_APPEND_RESULT = "ExpectedQueueAppendResult"
6265
KEY_EXPECTED_REQUEST_MID_OVERRIDE = "ExpectedRequestMidOverride"
6366
KEY_EXPECTED_REQUEST_TIMEOUT = "ExpectedRequestTimeout"
67+
KEY_EXPECTED_ACK_RESULT = "ExpectedAckPacketResult"
6468
SUCCESS_RC_EXPECTED_VALUES = {
6569
KEY_EXPECTED_REQUEST_RC : DUMMY_SUCCESS_RC
6670
}
@@ -73,6 +77,10 @@
7377
NO_TIMEOUT_EXPECTED_VALUES = {
7478
KEY_EXPECTED_REQUEST_TIMEOUT : False
7579
}
80+
ERROR_SUBACK_EXPECTED_VALUES = {
81+
KEY_EXPECTED_ACK_RESULT : (SUBACK_ERROR, None)
82+
}
83+
7684
QUEUED_EXPECTED_VALUES = {
7785
KEY_EXPECTED_QUEUE_APPEND_RESULT : AppendResults.APPEND_SUCCESS
7886
}
@@ -121,6 +129,9 @@ def setup_class(cls):
121129
RequestTypes.SUBSCRIBE: subscribeError,
122130
RequestTypes.UNSUBSCRIBE: unsubscribeError
123131
}
132+
cls.ack_error = {
133+
RequestTypes.SUBSCRIBE : subackError,
134+
}
124135
cls.request_queue_full = {
125136
RequestTypes.PUBLISH : publishQueueFullException,
126137
RequestTypes.SUBSCRIBE: subscribeQueueFullException,
@@ -518,6 +529,9 @@ def test_subscribe_success(self):
518529

519530
def test_subscribe_timeout(self):
520531
self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, TIMEOUT_EXPECTED_VALUES)
532+
533+
def test_subscribe_error_suback(self):
534+
self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, ERROR_SUBACK_EXPECTED_VALUES)
521535

522536
def test_subscribe_queued(self):
523537
self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, QUEUED_EXPECTED_VALUES)
@@ -547,6 +561,7 @@ def _internal_test_sync_api_with(self, request_type, expected_values):
547561
expected_request_mid = expected_values.get(KEY_EXPECTED_REQUEST_MID_OVERRIDE)
548562
expected_timeout = expected_values.get(KEY_EXPECTED_REQUEST_TIMEOUT)
549563
expected_append_result = expected_values.get(KEY_EXPECTED_QUEUE_APPEND_RESULT)
564+
expected_suback_result = expected_values.get(KEY_EXPECTED_ACK_RESULT)
550565

551566
if expected_request_mid is None:
552567
expected_request_mid = DUMMY_REQUEST_MID
@@ -562,7 +577,16 @@ def _internal_test_sync_api_with(self, request_type, expected_values):
562577
self.invoke_mqtt_core_sync_api[request_type](self, message_callback)
563578
else:
564579
self.python_event_mock.wait.return_value = True
565-
assert self.invoke_mqtt_core_sync_api[request_type](self, message_callback) is True
580+
if expected_suback_result is not None:
581+
self._use_mock_python_suback()
582+
# mock the suback with expected suback result
583+
self.python_suback_mock.data = expected_suback_result
584+
if expected_suback_result[0] == SUBACK_ERROR:
585+
with pytest.raises(self.ack_error[request_type]):
586+
self.invoke_mqtt_core_sync_api[request_type](self, message_callback)
587+
self.python_suback_patcher.stop()
588+
else:
589+
assert self.invoke_mqtt_core_sync_api[request_type](self, message_callback) is True
566590

567591
if expected_append_result is not None:
568592
self.client_status_mock.get_status.return_value = ClientStatus.ABNORMAL_DISCONNECT
@@ -583,3 +607,10 @@ def _use_mock_python_event(self):
583607
self.python_event_constructor = self.python_event_patcher.start()
584608
self.python_event_mock = MagicMock()
585609
self.python_event_constructor.return_value = self.python_event_mock
610+
611+
# Create a SubackPacket mock, which would mock the data in SubackPacket
612+
def _use_mock_python_suback(self):
613+
self.python_suback_patcher = patch(PATCH_MODULE_LOCATION + "SubackPacket", spec=SubackPacket)
614+
self.python_suback_constructor = self.python_suback_patcher.start()
615+
self.python_suback_mock = MagicMock()
616+
self.python_suback_constructor.return_value = self.python_suback_mock

0 commit comments

Comments
 (0)