Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/pubsub-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,28 @@ Fetch messages for a pull subscription without blocking (none pending):
>>> messages = [recv[1] for recv in received]
>>> [message.message_id for message in messages]
[]


Fetch pending messages, acknowledging those whose processing doesn't raise an
error:

.. doctest::

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name')
>>> with topic.batch() as batch:
... batch.publish('this is the first message_payload')
... batch.publish('this is the second message_payload',
... attr1='value1', attr2='value2')
>>> for ack_id, message in subscription.pull(max_messages=10): # API request
... with subscription.auto_ack(subscription, ack_id, message):
... do_something_with(message)

.. note::

One ``acknowledge`` API request occurs at the end of each ``with`` block,
passing only the ``ack_id`` of the message just processed. If
``do_something_with`` raises an exception, the ``acknowledge`` API
request is skipped.
40 changes: 40 additions & 0 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ def path(self):
project = self.topic.project
return '/projects/%s/subscriptions/%s' % (project, self.name)

def auto_ack(self, ack_id, message):
""":class:`AutoAck` factory

:type ack_id: string
:param ack_id: the ID for acknowledging the message

:type message: :class:`gcloud.pubsub.message.Message`
:param message: the message to be acknowleged

:rtype: :class:`AutoAck`
:returns: the instance created for the given ``ack_id`` and ``message``
"""
return AutoAck(self, ack_id, message)

def _require_client(self, client):
"""Check client or verify over-ride.

Expand Down Expand Up @@ -263,3 +277,29 @@ def delete(self, client=None):
"""
client = self._require_client(client)
client.connection.api_request(method='DELETE', path=self.path)


class AutoAck(object):
"""Automatically acknowlege a single message if processed without error.

:type subscription: :class:`Subscription`
:param subscription: the subscription from which the message was pulled,
and to which it must be acknowledged.

:type ack_id: string
:param ack_id: the ID for acknowledging the message

:type message: :class:`gcloud.pubsub.message.Message`
:param message: the message to be acknowleged
"""
def __init__(self, subscription, ack_id, message):
self.subscription = subscription
self.ack_id = ack_id
self.message = message

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.subscription.acknowledge([self.ack_id])
66 changes: 66 additions & 0 deletions gcloud/pubsub/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@ def test_from_api_repr_w_topics_w_topic_match(self):
self.assertEqual(subscription.ack_deadline, DEADLINE)
self.assertEqual(subscription.push_endpoint, ENDPOINT)

def test_autoack(self):
from gcloud.pubsub.subscription import AutoAck
SUB_NAME = 'sub_name'
ACK_ID = 'ACK_ID'
TOPIC = object()
MESSAGE = object
subscription = self._makeOne(SUB_NAME, TOPIC)
auto_ack = subscription.auto_ack(ACK_ID, MESSAGE)
self.assertTrue(isinstance(auto_ack, AutoAck))
self.assertTrue(auto_ack.subscription is subscription)
self.assertEqual(auto_ack.ack_id, ACK_ID)
self.assertTrue(auto_ack.message is MESSAGE)

def test_create_pull_wo_ack_deadline_w_bound_client(self):
PROJECT = 'PROJECT'
SUB_NAME = 'sub_name'
Expand Down Expand Up @@ -485,6 +498,48 @@ def test_delete_w_alternate_client(self):
self.assertEqual(req['path'], '/%s' % SUB_PATH)


class TestAutoAck(unittest2.TestCase):

def _getTargetClass(self):
from gcloud.pubsub.subscription import AutoAck
return AutoAck

def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def test_ctor(self):
ACK_ID = 'ACK_ID'
MESSAGE = object()
subscription = _FauxSubscription()
auto_ack = self._makeOne(subscription, ACK_ID, MESSAGE)
self.assertTrue(auto_ack.subscription is subscription)
self.assertEqual(auto_ack.ack_id, ACK_ID)
self.assertTrue(auto_ack.message is MESSAGE)

def test_as_context_mgr_no_error(self):
ACK_ID = 'ACK_ID'
MESSAGE = object()
subscription = _FauxSubscription()

with self._makeOne(subscription, ACK_ID, MESSAGE):
pass

self.assertEqual(list(subscription._acknowledged), [ACK_ID])
self.assertEqual(subscription._ack_client, None)

def test_as_context_mgr_w_error(self):
ACK_ID = 'ACK_ID'
MESSAGE = object()
subscription = _FauxSubscription()

with self.assertRaises(ValueError):
with self._makeOne(subscription, ACK_ID, MESSAGE):
raise ValueError()

self.assertEqual(list(subscription._acknowledged), [])
self.assertTrue(getattr(subscription, '_ack_client', self) is self)


class _Connection(object):

def __init__(self, *responses):
Expand Down Expand Up @@ -522,3 +577,14 @@ def __init__(self, project, connection=None):
def topic(self, name, timestamp_messages=False):
from gcloud.pubsub.topic import Topic
return Topic(name, client=self, timestamp_messages=timestamp_messages)


class _FauxSubscription(object):

def __init__(self):
self._acknowledged = set()

def acknowledge(self, ack_ids, client=None):
self._ack_client = client
for ack_id in ack_ids:
self._acknowledged.add(ack_id)