From 7a6afaa3ac02527bea3e17d4847d639b8b8c805f Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Thu, 26 Jan 2017 13:34:10 -0800 Subject: [PATCH 1/4] Pubsub batch autocommitting. This PR adds some functionality to the Batch object: * The ability to specify `max_messages` and have the batch automatically call `commit` when the number of messages gets that high. * The ability to specify `max_interval` and have the batch automatically commit when a publish occurs and the batch is at least as old as the specified interval. This is one of two changes requested by the PubSub team. --- pubsub/google/cloud/pubsub/topic.py | 46 ++++++++++++++++++- pubsub/unit_tests/test_topic.py | 71 +++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/pubsub/google/cloud/pubsub/topic.py b/pubsub/google/cloud/pubsub/topic.py index 12c6c3a68450..15e400b4a620 100644 --- a/pubsub/google/cloud/pubsub/topic.py +++ b/pubsub/google/cloud/pubsub/topic.py @@ -14,6 +14,8 @@ """Define API Topics.""" +import time + from google.cloud._helpers import _datetime_to_rfc3339 from google.cloud._helpers import _NOW from google.cloud.exceptions import NotFound @@ -408,15 +410,40 @@ class Batch(object): :type topic: :class:`google.cloud.pubsub.topic.Topic` :param topic: the topic being published - :type client: :class:`google.cloud.pubsub.client.Client` :param client: The client to use. + :type client: :class:`google.cloud.pubsub.client.Client` + + :param max_interval: The maximum interval, in seconds, before the batch + will automatically commit. Note that this does not + run a background loop; it just checks when each + message is published. Therefore, this is intended + for situations where messages are published at + reasonably regular intervals. Defaults to infinity + (off). + :type max_interval: float + + :param max_messages: The maximum number of messages to hold in the batch + before automatically committing. Defaults to infinity + (off). 
+ :type max_messages: float """ - def __init__(self, topic, client): + INFINITY = float('inf') + + def __init__(self, topic, client, max_interval=INFINITY, + max_messages=INFINITY): self.topic = topic self.messages = [] self.message_ids = [] self.client = client + # Set the autocommit rules. If the interval or number of messages + # is exceeded, then the .publish() method will imply a commit. + self._max_interval = max_interval + self._max_messages = max_messages + + # Set the initial starting timestamp (used against the interval). + self._start_timestamp = float(time.time()) + def __enter__(self): return self @@ -435,12 +462,27 @@ def publish(self, message, **attrs): :type attrs: dict (string -> string) :param attrs: key-value pairs to send as message attributes + + :rtype: None + :returns: None """ self.topic._timestamp_message(attrs) self.messages.append( {'data': message, 'attributes': attrs}) + # If too much time has elapsed since the first message + # was added, autocommit. + if self._max_interval < self.INFINITY: + if float(time.time()) - self._start_timestamp > self._max_interval: + self._start_timestamp = float(time.time()) + return self.commit() + + # If the number of messages on the list is greater than the + # maximum allowed, autocommit (with the batch's client). + if len(self.messages) >= self._max_messages: + return self.commit() + def commit(self, client=None): """Send saved messages as a single API call. diff --git a/pubsub/unit_tests/test_topic.py b/pubsub/unit_tests/test_topic.py index 5009e53a0a89..6a41323d0657 100644 --- a/pubsub/unit_tests/test_topic.py +++ b/pubsub/unit_tests/test_topic.py @@ -779,6 +779,77 @@ def test_context_mgr_failure(self): self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2]) self.assertEqual(getattr(api, '_topic_published', self), self) + def test_message_count_autocommit(self): + """Establish that if the batch is assigned to take a maximum + number of messages, that it commits when it reaches that maximum. 
+ """ + client = _Client(project='PROJECT') + topic = _Topic(name='TOPIC') + + # Track commits, but do not perform them. + Batch = self._get_target_class() + with mock.patch.object(Batch, 'commit') as commit: + with self._make_one(topic, client=client, max_messages=5) as batch: + self.assertIsInstance(batch, self._get_target_class()) + + # Publish four messages and establish that the batch does + # not commit. + for i in range(0, 4): + batch.publish({ + 'attributes': {}, + 'data': 'Batch message %d.' % i, + }) + commit.assert_not_called() + + # Publish a fifth message and observe the commit. + batch.publish({ + 'attributes': {}, + 'data': 'The final call to trigger a commit!', + }) + commit.assert_called_once_with() + + # There should be a second commit after the context manager + # exits. + self.assertEqual(commit.call_count, 2) + + @mock.patch('time.time') + def test_message_time_autocommit(self, mock_time): + """Establish that if the batch is sufficiently old, that it commits + the next time it receives a publish. + """ + client = _Client(project='PROJECT') + topic = _Topic(name='TOPIC') + + # Track commits, but do not perform them. + Batch = self._get_target_class() + with mock.patch.object(Batch, 'commit') as commit: + mock_time.return_value = 0.0 + with self._make_one(topic, client=client, max_interval=5) as batch: + self.assertIsInstance(batch, self._get_target_class()) + + # Publish some messages and establish that the batch does + # not commit. + for i in range(0, 10): + batch.publish({ + 'attributes': {}, + 'data': 'Batch message %d.' % i, + }) + commit.assert_not_called() + + # Move time ahead so that this batch is too old. + mock_time.return_value = 10.0 + + # Publish another message and observe the commit. + batch.publish({ + 'attributes': {}, + 'data': 'The final call to trigger a commit!', + }) + commit.assert_called_once_with() + + # There should be a second commit after the context manager + # exits. 
+ self.assertEqual(commit.call_count, 2) + class _FauxPublisherAPI(object): _api_called = 0 From eab0b7b04baf7bf0b69c5b4ac644b70673fee723 Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Fri, 27 Jan 2017 09:35:13 -0800 Subject: [PATCH 2/4] Addressing comments from @dhermes. --- pubsub/google/cloud/pubsub/topic.py | 16 ++++++++-------- pubsub/unit_tests/test_topic.py | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pubsub/google/cloud/pubsub/topic.py b/pubsub/google/cloud/pubsub/topic.py index 15e400b4a620..36b9004ad36c 100644 --- a/pubsub/google/cloud/pubsub/topic.py +++ b/pubsub/google/cloud/pubsub/topic.py @@ -442,7 +442,7 @@ def __init__(self, topic, client, max_interval=INFINITY, self._max_messages = max_messages # Set the initial starting timestamp (used against the interval). - self._start_timestamp = float(time.time()) + self._start_timestamp = time.time() def __enter__(self): return self @@ -462,9 +462,6 @@ def publish(self, message, **attrs): :type attrs: dict (string -> string) :param attrs: key-value pairs to send as message attributes - - :rtype: None - :returns: None """ self.topic._timestamp_message(attrs) self.messages.append( @@ -474,14 +471,17 @@ def publish(self, message, **attrs): # If too much time has elapsed since the first message # was added, autocommit. if self._max_interval < self.INFINITY: - if float(time.time()) - self._start_timestamp > self._max_interval: - self._start_timestamp = float(time.time()) - return self.commit() + now = time.time() + if now - self._start_timestamp > self._max_interval: + self.commit() + self._start_timestamp = now + return # If the number of messages on the list is greater than the # maximum allowed, autocommit (with the batch's client). if len(self.messages) >= self._max_messages: - return self.commit() + self.commit() + return def commit(self, client=None): """Send saved messages as a single API call. 
diff --git a/pubsub/unit_tests/test_topic.py b/pubsub/unit_tests/test_topic.py index 6a41323d0657..f264b4dcd036 100644 --- a/pubsub/unit_tests/test_topic.py +++ b/pubsub/unit_tests/test_topic.py @@ -790,14 +790,14 @@ def test_message_count_autocommit(self): Batch = self._get_target_class() with mock.patch.object(Batch, 'commit') as commit: with self._make_one(topic, client=client, max_messages=5) as batch: - self.assertIsInstance(batch, self._get_target_class()) + self.assertIsInstance(batch, Batch) # Publish four messages and establish that the batch does # not commit. for i in range(0, 4): batch.publish({ 'attributes': {}, - 'data': 'Batch message %d.' % i, + 'data': 'Batch message %d.' % (i,), }) commit.assert_not_called() @@ -825,14 +825,14 @@ def test_message_time_autocommit(self, mock_time): with mock.patch.object(Batch, 'commit') as commit: mock_time.return_value = 0.0 with self._make_one(topic, client=client, max_interval=5) as batch: - self.assertIsInstance(batch, self._get_target_class()) + self.assertIsInstance(batch, Batch) # Publish some messages and establish that the batch does # not commit. for i in range(0, 10): batch.publish({ 'attributes': {}, - 'data': 'Batch message %d.' % i, + 'data': 'Batch message %d.' % (i,), }) commit.assert_not_called() From d35b49067ab052ea3fe0346250efbd2e6de5da8c Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Fri, 27 Jan 2017 10:06:30 -0800 Subject: [PATCH 3/4] Remove unneeded -lt check @dhermes. --- pubsub/google/cloud/pubsub/topic.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pubsub/google/cloud/pubsub/topic.py b/pubsub/google/cloud/pubsub/topic.py index 36b9004ad36c..4183ff68bf33 100644 --- a/pubsub/google/cloud/pubsub/topic.py +++ b/pubsub/google/cloud/pubsub/topic.py @@ -470,12 +470,11 @@ def publish(self, message, **attrs): # If too much time has elapsed since the first message # was added, autocommit. 
- if self._max_interval < self.INFINITY: - now = time.time() - if now - self._start_timestamp > self._max_interval: - self.commit() - self._start_timestamp = now - return + now = time.time() + if now - self._start_timestamp > self._max_interval: + self.commit() + self._start_timestamp = now + return # If the number of messages on the list is greater than the # maximum allowed, autocommit (with the batch's client). From 7cc6ea7cbeca28d682c88a7a32141dc234624be1 Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Fri, 27 Jan 2017 10:18:39 -0800 Subject: [PATCH 4/4] Make INFINITY have a leading underscore. @dhermes --- pubsub/google/cloud/pubsub/topic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pubsub/google/cloud/pubsub/topic.py b/pubsub/google/cloud/pubsub/topic.py index 4183ff68bf33..0dd5b4fda038 100644 --- a/pubsub/google/cloud/pubsub/topic.py +++ b/pubsub/google/cloud/pubsub/topic.py @@ -427,10 +427,10 @@ class Batch(object): (off). :type max_messages: float """ - INFINITY = float('inf') + _INFINITY = float('inf') - def __init__(self, topic, client, max_interval=INFINITY, - max_messages=INFINITY): + def __init__(self, topic, client, max_interval=_INFINITY, + max_messages=_INFINITY): self.topic = topic self.messages = [] self.message_ids = []