diff --git a/docs/pubsub-usage.rst b/docs/pubsub-usage.rst index 92eeee3e1281..6cf00b967085 100644 --- a/docs/pubsub-usage.rst +++ b/docs/pubsub-usage.rst @@ -158,6 +158,24 @@ Re-synchronize a subscription with the back-end: :start-after: [START subscription_reload] :end-before: [END subscription_reload] +Fetch the IAM policy for a subscription + +.. literalinclude:: pubsub_snippets.py + :start-after: [START subscription_get_iam_policy] + :end-before: [END subscription_get_iam_policy] + +Update the IAM policy for a subscription: + +.. literalinclude:: pubsub_snippets.py + :start-after: [START subscription_set_iam_policy] + :end-before: [END subscription_set_iam_policy] + +Test permissions allowed by the current IAM policy on a subscription: + +.. literalinclude:: pubsub_snippets.py + :start-after: [START subscription_check_iam_permissions] + :end-before: [END subscription_check_iam_permissions] + Delete a subscription: .. literalinclude:: pubsub_snippets.py @@ -193,20 +211,15 @@ Update the acknowlegement deadline for pulled messages: :start-after: [START subscription_modify_ack_deadline] :end-before: [END subscription_modify_ack_deadline] -Fetch the IAM policy for a subscription - -.. literalinclude:: pubsub_snippets.py - :start-after: [START subscription_get_iam_policy] - :end-before: [END subscription_get_iam_policy] - -Update the IAM policy for a subscription: +Fetch pending messages, acknowledging those whose processing doesn't raise an +error: .. literalinclude:: pubsub_snippets.py - :start-after: [START subscription_set_iam_policy] - :end-before: [END subscription_set_iam_policy] + :start-after: [START subscription_pull_autoack] + :end-before: [END subscription_pull_autoack] -Test permissions allowed by the current IAM policy on a subscription: +.. note:: -.. literalinclude:: pubsub_snippets.py - :start-after: [START subscription_check_iam_permissions] - :end-before: [END subscription_check_iam_permissions] + The ``pull`` API request occurs at entry to the ``with`` block, and the + ``acknowlege`` API request occurs at the end, passing only the ``ack_ids`` + which haven't been deleted from ``ack`` diff --git a/docs/pubsub_snippets.py b/docs/pubsub_snippets.py index 4b0fd632b478..f1656e8d17c6 100644 --- a/docs/pubsub_snippets.py +++ b/docs/pubsub_snippets.py @@ -178,21 +178,6 @@ def topic_publish_messages(client, to_delete): # [END topic_publish_message_with_attrs] -@snippet -def topic_batch(client, to_delete): - """Publish multiple messages in a single request.""" - TOPIC_NAME = 'topic_batch-%d' % (_millis(),) - topic = client.topic(TOPIC_NAME) - topic.create() - to_delete.append(topic) - - # [START topic_batch] - with topic.batch() as batch: - batch.publish(b'This is the message payload') - batch.publish(b'Another message payload', extra='EXTRA') - # [END topic_batch] API request on block exit - - @snippet def topic_subscription(client, to_delete): """Create subscriptions to a topic.""" @@ -358,6 +343,52 @@ def log_exception(_): (extras,)) +@snippet +def subscription_pull_w_autoack(client, to_delete): + """Pull messges from a topic, auto-acknowldging them""" + TOPIC_NAME = 'subscription_pull_autoack-%d' % (_millis(),) + SUB_NAME = 'subscription_pull_autoack-defaults-%d' % (_millis(),) + PAYLOAD1 = b'PAYLOAD1' + PAYLOAD2 = b'PAYLOAD2' + EXTRA = 'EXTRA' + topic = client.topic(TOPIC_NAME) + topic.create() + to_delete.append(topic) + + subscription = topic.subscription(SUB_NAME) + subscription.create() + to_delete.append(subscription) + + # [START topic_batch] + with topic.batch() as batch: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, extra=EXTRA) + # [END topic_batch] + + time.sleep(1) # eventually-consistent + + payloads = [] + extras = [] + + def do_something_with(message): # pylint: disable=unused-argument + payloads.append(message.data) + if message.attributes: + extras.append(message.attributes) + + # [START subscription_pull_autoack] + from gcloud.pubsub.subscription import AutoAck + with AutoAck(subscription, max_messages=10) as ack: + for ack_id, message in list(ack.items()): + try: + do_something_with(message) + except Exception: # pylint: disable=broad-except + del ack[ack_id] + # [END subscription_pull_autoack] + + assert set(payloads) == set(PAYLOAD1, PAYLOAD1), "eventual consistency" + assert extras == [{'extra': EXTRA}], "eventual consistency" + + @snippet def subscription_iam_policy(client, to_delete): """Fetch / set a subscription's IAM policy.""" diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index 8ce44e71cef4..e2050fb06211 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -123,6 +123,24 @@ def path(self): """URL path for the subscription's APIs""" return '/%s' % (self.full_name,) + def auto_ack(self, return_immediately=False, max_messages=1, client=None): + """:class:`AutoAck` factory + + :type return_immediately: boolean + :param return_immediately: passed through to :meth:`Subscription.pull` + + :type max_messages: int + :param max_messages: passed through to :meth:`Subscription.pull` + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: passed through to :meth:`Subscription.pull` and + :meth:`Subscription.acknowledge`. + + :rtype: :class:`AutoAck` + :returns: the instance created for the given ``ack_id`` and ``message`` + """ + return AutoAck(self, return_immediately, max_messages, client) + def _require_client(self, client): """Check client or verify over-ride. @@ -420,3 +438,53 @@ def check_iam_permissions(self, permissions, client=None): api = client.iam_policy_api return api.test_iam_permissions( self.full_name, list(permissions)) + + +class AutoAck(dict): + """Wrapper for :meth:`Subscription.pull` results. + + Mapping, tracks messages still-to-be-acknowledged. + + When used as a context manager, acknowledges all messages still in the + mapping on `__exit__`. When processing the pulled messsages, application + code MUST delete messages from the :class:`AutoAck` mapping which are not + successfully processed, e.g.: + + .. code-block: python + + with AutoAck(subscription) as ack: # calls ``subscription.pull`` + for ack_id, message in ack.items(): + try: + do_something_with(message): + except: + del ack[ack_id] + + :type subscription: :class:`Subscription` + :param subscription: subcription to be pulled. + + :type return_immediately: boolean + :param return_immediately: passed through to :meth:`Subscription.pull` + + :type max_messages: int + :param max_messages: passed through to :meth:`Subscription.pull` + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: passed through to :meth:`Subscription.pull` and + :meth:`Subscription.acknowledge`. + """ + def __init__(self, subscription, + return_immediately=False, max_messages=1, client=None): + super(AutoAck, self).__init__() + self._subscription = subscription + self._return_immediately = return_immediately + self._max_messages = max_messages + self._client = client + + def __enter__(self): + items = self._subscription.pull( + self._return_immediately, self._max_messages, self._client) + self.update(items) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._subscription.acknowledge(list(self), self._client) diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index 700dbb82bff5..84965dba7879 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -134,16 +134,40 @@ def test_from_api_repr_w_topics_w_topic_match(self): def test_full_name_and_path(self): PROJECT = 'PROJECT' - SUB_NAME = 'sub_name' - SUB_FULL = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + SUB_FULL = 'projects/%s/subscriptions/%s' % (PROJECT, self.SUB_NAME) SUB_PATH = '/%s' % (SUB_FULL,) TOPIC_NAME = 'topic_name' CLIENT = _Client(project=PROJECT) topic = _Topic(TOPIC_NAME, client=CLIENT) - subscription = self._makeOne(SUB_NAME, topic) + subscription = self._makeOne(self.SUB_NAME, topic) self.assertEqual(subscription.full_name, SUB_FULL) self.assertEqual(subscription.path, SUB_PATH) + def test_autoack_defaults(self): + from gcloud.pubsub.subscription import AutoAck + client = _Client(project=self.PROJECT) + topic = _Topic(self.TOPIC_NAME, client=client) + subscription = self._makeOne(self.SUB_NAME, topic) + auto_ack = subscription.auto_ack() + self.assertTrue(isinstance(auto_ack, AutoAck)) + self.assertTrue(auto_ack._subscription is subscription) + self.assertEqual(auto_ack._return_immediately, False) + self.assertEqual(auto_ack._max_messages, 1) + self.assertTrue(auto_ack._client is None) + + def test_autoack_explicit(self): + from gcloud.pubsub.subscription import AutoAck + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + topic = _Topic(self.TOPIC_NAME, client=client1) + subscription = self._makeOne(self.SUB_NAME, topic) + auto_ack = subscription.auto_ack(True, 10, client2) + self.assertTrue(isinstance(auto_ack, AutoAck)) + self.assertTrue(auto_ack._subscription is subscription) + self.assertEqual(auto_ack._return_immediately, True) + self.assertEqual(auto_ack._max_messages, 10) + self.assertTrue(auto_ack._client is client2) + def test_create_pull_wo_ack_deadline_w_bound_client(self): RESPONSE = { 'topic': self.TOPIC_PATH, @@ -642,6 +666,81 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, return self._subscription_modify_ack_deadline_response +class TestAutoAck(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.subscription import AutoAck + return AutoAck + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_defaults(self): + subscription = _FauxSubscription(()) + auto_ack = self._makeOne(subscription) + self.assertEqual(auto_ack._return_immediately, False) + self.assertEqual(auto_ack._max_messages, 1) + self.assertTrue(auto_ack._client is None) + + def test_ctor_explicit(self): + CLIENT = object() + subscription = _FauxSubscription(()) + auto_ack = self._makeOne( + subscription, return_immediately=True, max_messages=10, + client=CLIENT) + self.assertTrue(auto_ack._subscription is subscription) + self.assertEqual(auto_ack._return_immediately, True) + self.assertEqual(auto_ack._max_messages, 10) + self.assertTrue(auto_ack._client is CLIENT) + + def test___enter___w_defaults(self): + subscription = _FauxSubscription(()) + auto_ack = self._makeOne(subscription) + + with auto_ack as returned: + pass + + self.assertTrue(returned is auto_ack) + self.assertEqual(subscription._return_immediately, False) + self.assertEqual(subscription._max_messages, 1) + self.assertTrue(subscription._client is None) + + def test___enter___w_explicit(self): + CLIENT = object() + subscription = _FauxSubscription(()) + auto_ack = self._makeOne( + subscription, return_immediately=True, max_messages=10, + client=CLIENT) + + with auto_ack as returned: + pass + + self.assertTrue(returned is auto_ack) + self.assertEqual(subscription._return_immediately, True) + self.assertEqual(subscription._max_messages, 10) + self.assertTrue(subscription._client is CLIENT) + + def test___exit___(self): + CLIENT = object() + ACK_ID1, MESSAGE1 = 'ACK_ID1', _FallibleMessage() + ACK_ID2, MESSAGE2 = 'ACK_ID2', _FallibleMessage() + ACK_ID3, MESSAGE3 = 'ACK_ID3', _FallibleMessage(True) + ITEMS = [ + (ACK_ID1, MESSAGE1), + (ACK_ID2, MESSAGE2), + (ACK_ID3, MESSAGE3), + ] + subscription = _FauxSubscription(ITEMS) + auto_ack = self._makeOne(subscription, client=CLIENT) + with auto_ack: + for ack_id, message in list(auto_ack.items()): + if message.fail: + del auto_ack[ack_id] + self.assertEqual(sorted(subscription._acknowledged), + [ACK_ID1, ACK_ID2]) + self.assertTrue(subscription._ack_client is CLIENT) + + class _FauxIAMPolicy(object): def get_iam_policy(self, target_path): @@ -677,3 +776,30 @@ def __init__(self, project): def topic(self, name, timestamp_messages=False): from gcloud.pubsub.topic import Topic return Topic(name, client=self, timestamp_messages=timestamp_messages) + + +class _FallibleMessage(object): + + def __init__(self, fail=False): + self.fail = fail + + +class _FauxSubscription(object): + + def __init__(self, items): + self._items = items + self._mapping = dict(items) + self._acknowledged = set() + + def pull(self, return_immediately=False, max_messages=1, client=None): + self._return_immediately = return_immediately + self._max_messages = max_messages + self._client = client + return self._items + + def acknowledge(self, ack_ids, client=None): + self._ack_client = client + for ack_id in ack_ids: + message = self._mapping[ack_id] + assert not message.fail + self._acknowledged.add(ack_id)