Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions docs/pubsub-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,24 @@ Re-synchronize a subscription with the back-end:
:start-after: [START subscription_reload]
:end-before: [END subscription_reload]

Fetch the IAM policy for a subscription

.. literalinclude:: pubsub_snippets.py
:start-after: [START subscription_get_iam_policy]
:end-before: [END subscription_get_iam_policy]

Update the IAM policy for a subscription:

.. literalinclude:: pubsub_snippets.py
:start-after: [START subscription_set_iam_policy]
:end-before: [END subscription_set_iam_policy]

Test permissions allowed by the current IAM policy on a subscription:

.. literalinclude:: pubsub_snippets.py
:start-after: [START subscription_check_iam_permissions]
:end-before: [END subscription_check_iam_permissions]

Delete a subscription:

.. literalinclude:: pubsub_snippets.py
Expand Down Expand Up @@ -193,20 +211,15 @@ Update the acknowlegement deadline for pulled messages:
:start-after: [START subscription_modify_ack_deadline]
:end-before: [END subscription_modify_ack_deadline]

Fetch the IAM policy for a subscription

.. literalinclude:: pubsub_snippets.py
:start-after: [START subscription_get_iam_policy]
:end-before: [END subscription_get_iam_policy]

Update the IAM policy for a subscription:
Fetch pending messages, acknowledging those whose processing doesn't raise an

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

error:

.. literalinclude:: pubsub_snippets.py
:start-after: [START subscription_set_iam_policy]
:end-before: [END subscription_set_iam_policy]
:start-after: [START subscription_pull_autoack]
:end-before: [END subscription_pull_autoack]

Test permissions allowed by the current IAM policy on a subscription:
.. note::

.. literalinclude:: pubsub_snippets.py
:start-after: [START subscription_check_iam_permissions]
:end-before: [END subscription_check_iam_permissions]
The ``pull`` API request occurs at entry to the ``with`` block, and the
``acknowlege`` API request occurs at the end, passing only the ``ack_ids``
which haven't been deleted from ``ack``
61 changes: 46 additions & 15 deletions docs/pubsub_snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,6 @@ def topic_publish_messages(client, to_delete):
# [END topic_publish_message_with_attrs]


@snippet
def topic_batch(client, to_delete):
"""Publish multiple messages in a single request."""
TOPIC_NAME = 'topic_batch-%d' % (_millis(),)
topic = client.topic(TOPIC_NAME)
topic.create()
to_delete.append(topic)

# [START topic_batch]
with topic.batch() as batch:
batch.publish(b'This is the message payload')
batch.publish(b'Another message payload', extra='EXTRA')
# [END topic_batch] API request on block exit


@snippet
def topic_subscription(client, to_delete):
"""Create subscriptions to a topic."""
Expand Down Expand Up @@ -358,6 +343,52 @@ def log_exception(_):
(extras,))


@snippet
def subscription_pull_w_autoack(client, to_delete):
"""Pull messges from a topic, auto-acknowldging them"""
TOPIC_NAME = 'subscription_pull_autoack-%d' % (_millis(),)
SUB_NAME = 'subscription_pull_autoack-defaults-%d' % (_millis(),)
PAYLOAD1 = b'PAYLOAD1'
PAYLOAD2 = b'PAYLOAD2'
EXTRA = 'EXTRA'
topic = client.topic(TOPIC_NAME)
topic.create()
to_delete.append(topic)

subscription = topic.subscription(SUB_NAME)
subscription.create()
to_delete.append(subscription)

# [START topic_batch]
with topic.batch() as batch:
batch.publish(PAYLOAD1)
batch.publish(PAYLOAD2, extra=EXTRA)
# [END topic_batch]

time.sleep(1) # eventually-consistent

payloads = []
extras = []

def do_something_with(message): # pylint: disable=unused-argument
payloads.append(message.data)
if message.attributes:
extras.append(message.attributes)

# [START subscription_pull_autoack]
from gcloud.pubsub.subscription import AutoAck
with AutoAck(subscription, max_messages=10) as ack:
for ack_id, message in list(ack.items()):
try:
do_something_with(message)
except Exception: # pylint: disable=broad-except
del ack[ack_id]
# [END subscription_pull_autoack]

assert set(payloads) == set(PAYLOAD1, PAYLOAD1), "eventual consistency"
assert extras == [{'extra': EXTRA}], "eventual consistency"


@snippet
def subscription_iam_policy(client, to_delete):
"""Fetch / set a subscription's IAM policy."""
Expand Down
68 changes: 68 additions & 0 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,24 @@ def path(self):
"""URL path for the subscription's APIs"""
return '/%s' % (self.full_name,)

def auto_ack(self, return_immediately=False, max_messages=1, client=None):
""":class:`AutoAck` factory

:type return_immediately: boolean
:param return_immediately: passed through to :meth:`Subscription.pull`

:type max_messages: int
:param max_messages: passed through to :meth:`Subscription.pull`

:type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
:param client: passed through to :meth:`Subscription.pull` and
:meth:`Subscription.acknowledge`.

:rtype: :class:`AutoAck`
:returns: the instance created for the given ``ack_id`` and ``message``
"""
return AutoAck(self, return_immediately, max_messages, client)

def _require_client(self, client):
"""Check client or verify over-ride.

Expand Down Expand Up @@ -420,3 +438,53 @@ def check_iam_permissions(self, permissions, client=None):
api = client.iam_policy_api
return api.test_iam_permissions(
self.full_name, list(permissions))


class AutoAck(dict):
"""Wrapper for :meth:`Subscription.pull` results.

Mapping, tracks messages still-to-be-acknowledged.

When used as a context manager, acknowledges all messages still in the
mapping on `__exit__`. When processing the pulled messsages, application
code MUST delete messages from the :class:`AutoAck` mapping which are not
successfully processed, e.g.:

.. code-block: python

with AutoAck(subscription) as ack: # calls ``subscription.pull``
for ack_id, message in ack.items():
try:
do_something_with(message):
except:
del ack[ack_id]

:type subscription: :class:`Subscription`
:param subscription: subcription to be pulled.

:type return_immediately: boolean
:param return_immediately: passed through to :meth:`Subscription.pull`

:type max_messages: int
:param max_messages: passed through to :meth:`Subscription.pull`

:type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
:param client: passed through to :meth:`Subscription.pull` and
:meth:`Subscription.acknowledge`.
"""
def __init__(self, subscription,
return_immediately=False, max_messages=1, client=None):
super(AutoAck, self).__init__()

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

self._subscription = subscription
self._return_immediately = return_immediately
self._max_messages = max_messages
self._client = client

def __enter__(self):
items = self._subscription.pull(
self._return_immediately, self._max_messages, self._client)
self.update(items)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self._subscription.acknowledge(list(self), self._client)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

132 changes: 129 additions & 3 deletions gcloud/pubsub/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,40 @@ def test_from_api_repr_w_topics_w_topic_match(self):

def test_full_name_and_path(self):
PROJECT = 'PROJECT'
SUB_NAME = 'sub_name'
SUB_FULL = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
SUB_FULL = 'projects/%s/subscriptions/%s' % (PROJECT, self.SUB_NAME)
SUB_PATH = '/%s' % (SUB_FULL,)
TOPIC_NAME = 'topic_name'
CLIENT = _Client(project=PROJECT)
topic = _Topic(TOPIC_NAME, client=CLIENT)
subscription = self._makeOne(SUB_NAME, topic)
subscription = self._makeOne(self.SUB_NAME, topic)
self.assertEqual(subscription.full_name, SUB_FULL)
self.assertEqual(subscription.path, SUB_PATH)

def test_autoack_defaults(self):
from gcloud.pubsub.subscription import AutoAck
client = _Client(project=self.PROJECT)
topic = _Topic(self.TOPIC_NAME, client=client)
subscription = self._makeOne(self.SUB_NAME, topic)
auto_ack = subscription.auto_ack()
self.assertTrue(isinstance(auto_ack, AutoAck))
self.assertTrue(auto_ack._subscription is subscription)
self.assertEqual(auto_ack._return_immediately, False)
self.assertEqual(auto_ack._max_messages, 1)
self.assertTrue(auto_ack._client is None)

def test_autoack_explicit(self):
from gcloud.pubsub.subscription import AutoAck
client1 = _Client(project=self.PROJECT)
client2 = _Client(project=self.PROJECT)
topic = _Topic(self.TOPIC_NAME, client=client1)
subscription = self._makeOne(self.SUB_NAME, topic)
auto_ack = subscription.auto_ack(True, 10, client2)
self.assertTrue(isinstance(auto_ack, AutoAck))
self.assertTrue(auto_ack._subscription is subscription)
self.assertEqual(auto_ack._return_immediately, True)
self.assertEqual(auto_ack._max_messages, 10)
self.assertTrue(auto_ack._client is client2)

def test_create_pull_wo_ack_deadline_w_bound_client(self):
RESPONSE = {
'topic': self.TOPIC_PATH,
Expand Down Expand Up @@ -642,6 +666,81 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
return self._subscription_modify_ack_deadline_response


class TestAutoAck(unittest2.TestCase):

def _getTargetClass(self):
from gcloud.pubsub.subscription import AutoAck
return AutoAck

def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def test_ctor_defaults(self):
subscription = _FauxSubscription(())
auto_ack = self._makeOne(subscription)
self.assertEqual(auto_ack._return_immediately, False)
self.assertEqual(auto_ack._max_messages, 1)
self.assertTrue(auto_ack._client is None)

def test_ctor_explicit(self):
CLIENT = object()
subscription = _FauxSubscription(())
auto_ack = self._makeOne(
subscription, return_immediately=True, max_messages=10,
client=CLIENT)
self.assertTrue(auto_ack._subscription is subscription)
self.assertEqual(auto_ack._return_immediately, True)
self.assertEqual(auto_ack._max_messages, 10)
self.assertTrue(auto_ack._client is CLIENT)

def test___enter___w_defaults(self):
subscription = _FauxSubscription(())
auto_ack = self._makeOne(subscription)

with auto_ack as returned:
pass

self.assertTrue(returned is auto_ack)
self.assertEqual(subscription._return_immediately, False)
self.assertEqual(subscription._max_messages, 1)
self.assertTrue(subscription._client is None)

def test___enter___w_explicit(self):
CLIENT = object()
subscription = _FauxSubscription(())
auto_ack = self._makeOne(
subscription, return_immediately=True, max_messages=10,
client=CLIENT)

with auto_ack as returned:
pass

self.assertTrue(returned is auto_ack)
self.assertEqual(subscription._return_immediately, True)
self.assertEqual(subscription._max_messages, 10)
self.assertTrue(subscription._client is CLIENT)

def test___exit___(self):
CLIENT = object()
ACK_ID1, MESSAGE1 = 'ACK_ID1', _FallibleMessage()
ACK_ID2, MESSAGE2 = 'ACK_ID2', _FallibleMessage()
ACK_ID3, MESSAGE3 = 'ACK_ID3', _FallibleMessage(True)
ITEMS = [
(ACK_ID1, MESSAGE1),
(ACK_ID2, MESSAGE2),
(ACK_ID3, MESSAGE3),
]
subscription = _FauxSubscription(ITEMS)
auto_ack = self._makeOne(subscription, client=CLIENT)
with auto_ack:
for ack_id, message in list(auto_ack.items()):
if message.fail:
del auto_ack[ack_id]
self.assertEqual(sorted(subscription._acknowledged),
[ACK_ID1, ACK_ID2])
self.assertTrue(subscription._ack_client is CLIENT)


class _FauxIAMPolicy(object):

def get_iam_policy(self, target_path):
Expand Down Expand Up @@ -677,3 +776,30 @@ def __init__(self, project):
def topic(self, name, timestamp_messages=False):
from gcloud.pubsub.topic import Topic
return Topic(name, client=self, timestamp_messages=timestamp_messages)


class _FallibleMessage(object):

def __init__(self, fail=False):
self.fail = fail


class _FauxSubscription(object):

def __init__(self, items):
self._items = items
self._mapping = dict(items)
self._acknowledged = set()

def pull(self, return_immediately=False, max_messages=1, client=None):
self._return_immediately = return_immediately
self._max_messages = max_messages
self._client = client
return self._items

def acknowledge(self, ack_ids, client=None):
self._ack_client = client
for ack_id in ack_ids:
message = self._mapping[ack_id]
assert not message.fail
self._acknowledged.add(ack_id)