From 3d1bb30bf62ee51ffda6a432979877ddca17b60a Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Fri, 18 Mar 2016 13:21:15 -0400 Subject: [PATCH 1/2] Add support for auto-acknowledging pulled messages. Follows @jgeewax's suggested implementation in: https://github.com/GoogleCloudPlatform/gcloud-python/issues/798#issuecomment-135787991 --- docs/pubsub-usage.rst | 26 +++++++++++++++ gcloud/pubsub/subscription.py | 26 +++++++++++++++ gcloud/pubsub/test_subscription.py | 53 ++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+) diff --git a/docs/pubsub-usage.rst b/docs/pubsub-usage.rst index 99d01d2cde50..fb4711c550a9 100644 --- a/docs/pubsub-usage.rst +++ b/docs/pubsub-usage.rst @@ -272,3 +272,29 @@ Fetch messages for a pull subscription without blocking (none pending): >>> messages = [recv[1] for recv in received] >>> [message.message_id for message in messages] [] + + +Fetch pending messages, acknowledging those whose processing doesn't raise an +error: + +.. doctest:: + + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = topic.subscription('subscription_name') + >>> with topic.batch() as batch: + ... batch.publish('this is the first message_payload') + ... batch.publish('this is the second message_payload', + ... attr1='value1', attr2='value2') + >>> from gcloud.pubsub.subscription import AutoAck + >>> for ack_id, message in subscription.pull(max_messages=10): # API request + ... with AutoAck(subscription, ack_id, message): + ... do_something_with(message) + +.. note:: + + One ``acknowledge`` API request occurs at the end of each ``with`` block, + passing only the ``ack_id`` of the message just processed. If + ``do_something_with`` raises an exception, the ``acknowledge`` API + request is skipped. diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index e3da8a06f5b4..441e623a4912 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -263,3 +263,29 @@ def delete(self, client=None): """ client = self._require_client(client) client.connection.api_request(method='DELETE', path=self.path) + + +class AutoAck(object): + """Automatically acknowlege a single message if processed without error. + + :type subscription: :class:`Subscription` + :param subscription: the subscription from which the message was pulled, + and to which it must be acknowledged. + + :type ack_id: string + :param ack_id: the ID for acknowledging the message + + :type message: :class:`gcloud.pubsub.message.Message` + :param message: the message to be acknowleged + """ + def __init__(self, subscription, ack_id, message): + self.subscription = subscription + self.ack_id = ack_id + self.message = message + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is None: + self.subscription.acknowledge([self.ack_id]) diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index 494454c15b21..7c142bdb2947 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -485,6 +485,48 @@ def test_delete_w_alternate_client(self): self.assertEqual(req['path'], '/%s' % SUB_PATH) +class TestAutoAck(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.subscription import AutoAck + return AutoAck + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor(self): + ACK_ID = 'ACK_ID' + MESSAGE = object() + subscription = _FauxSubscription() + auto_ack = self._makeOne(subscription, ACK_ID, MESSAGE) + self.assertTrue(auto_ack.subscription is subscription) + self.assertEqual(auto_ack.ack_id, ACK_ID) + self.assertTrue(auto_ack.message is MESSAGE) + + def test_as_context_mgr_no_error(self): + ACK_ID = 'ACK_ID' + MESSAGE = object() + subscription = _FauxSubscription() + + with self._makeOne(subscription, ACK_ID, MESSAGE): + pass + + self.assertEqual(list(subscription._acknowledged), [ACK_ID]) + self.assertEqual(subscription._ack_client, None) + + def test_as_context_mgr_w_error(self): + ACK_ID = 'ACK_ID' + MESSAGE = object() + subscription = _FauxSubscription() + + with self.assertRaises(ValueError): + with self._makeOne(subscription, ACK_ID, MESSAGE): + raise ValueError() + + self.assertEqual(list(subscription._acknowledged), []) + self.assertTrue(getattr(subscription, '_ack_client', self) is self) + + class _Connection(object): def __init__(self, *responses): @@ -522,3 +564,14 @@ def __init__(self, project, connection=None): def topic(self, name, timestamp_messages=False): from gcloud.pubsub.topic import Topic return Topic(name, client=self, timestamp_messages=timestamp_messages) + + +class _FauxSubscription(object): + + def __init__(self): + self._acknowledged = set() + + def acknowledge(self, ack_ids, client=None): + self._ack_client = client + for ack_id in ack_ids: + self._acknowledged.add(ack_id) From 539d53d913c24cefdd9a14bf922ff3dab1c7c001 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Sun, 20 Mar 2016 19:02:37 -0400 Subject: [PATCH 2/2] Add 'Subscription.auto_ack' factory. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1637#discussion_r56763064 --- docs/pubsub-usage.rst | 3 +-- gcloud/pubsub/subscription.py | 14 ++++++++++++++ gcloud/pubsub/test_subscription.py | 13 +++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/docs/pubsub-usage.rst b/docs/pubsub-usage.rst index fb4711c550a9..685f256978e3 100644 --- a/docs/pubsub-usage.rst +++ b/docs/pubsub-usage.rst @@ -287,9 +287,8 @@ error: ... batch.publish('this is the first message_payload') ... batch.publish('this is the second message_payload', ... attr1='value1', attr2='value2') - >>> from gcloud.pubsub.subscription import AutoAck >>> for ack_id, message in subscription.pull(max_messages=10): # API request - ... with AutoAck(subscription, ack_id, message): + ... with subscription.auto_ack(subscription, ack_id, message): ... do_something_with(message) .. note:: diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index 441e623a4912..8cd42e9d9a2e 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -84,6 +84,20 @@ def path(self): project = self.topic.project return '/projects/%s/subscriptions/%s' % (project, self.name) + def auto_ack(self, ack_id, message): + """:class:`AutoAck` factory + + :type ack_id: string + :param ack_id: the ID for acknowledging the message + + :type message: :class:`gcloud.pubsub.message.Message` + :param message: the message to be acknowleged + + :rtype: :class:`AutoAck` + :returns: the instance created for the given ``ack_id`` and ``message`` + """ + return AutoAck(self, ack_id, message) + def _require_client(self, client): """Check client or verify over-ride. diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index 7c142bdb2947..dd01f149a5fd 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -116,6 +116,19 @@ def test_from_api_repr_w_topics_w_topic_match(self): self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) + def test_autoack(self): + from gcloud.pubsub.subscription import AutoAck + SUB_NAME = 'sub_name' + ACK_ID = 'ACK_ID' + TOPIC = object() + MESSAGE = object + subscription = self._makeOne(SUB_NAME, TOPIC) + auto_ack = subscription.auto_ack(ACK_ID, MESSAGE) + self.assertTrue(isinstance(auto_ack, AutoAck)) + self.assertTrue(auto_ack.subscription is subscription) + self.assertEqual(auto_ack.ack_id, ACK_ID) + self.assertTrue(auto_ack.message is MESSAGE) + def test_create_pull_wo_ack_deadline_w_bound_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name'