Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions gcloud/pubsub/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
"""Define API Topics."""

import base64
import datetime

import pytz

_RFC3339_MICROS = '%Y-%m-%dT%H:%M:%S.%fZ'


class Message(object):
Expand Down Expand Up @@ -44,6 +49,24 @@ def attributes(self):
self._attributes = {}
return self._attributes

@property
def timestamp(self):
"""Return sortable timestamp from attributes, if passed.

Allows sorting messages in publication order (assuming consistent
clocks across all publishers).

:rtype: datetime
:returns: timestamp (in UTC timezone) parsed from RFC 3339 timestamp
:raises: ValueError if timestamp not in ``attributes``, or if it does
not match the RFC 3339 format.
"""
stamp = self.attributes.get('timestamp')
if stamp is None:
raise ValueError('No timestamp')

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

return datetime.datetime.strptime(stamp, _RFC3339_MICROS).replace(
tzinfo=pytz.UTC)

This comment was marked as spam.

This comment was marked as spam.


@classmethod
def from_api_repr(cls, api_repr):
"""Factory: construct message from API representation.
Expand Down
36 changes: 36 additions & 0 deletions gcloud/pubsub/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,39 @@ def test_from_api_repr_w_attributes(self):
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attributes, ATTRS)

def test_timestamp_no_attributes(self):
DATA = b'DEADBEEF'
MESSAGE_ID = b'12345'
message = self._makeOne(data=DATA, message_id=MESSAGE_ID)

def _to_fail():
return message.timestamp

self.assertRaises(ValueError, _to_fail)

def test_timestamp_wo_timestamp_in_attributes(self):
DATA = b'DEADBEEF'
MESSAGE_ID = b'12345'
ATTRS = {'a': 'b'}
message = self._makeOne(data=DATA, message_id=MESSAGE_ID,
attributes=ATTRS)

def _to_fail():
return message.timestamp

self.assertRaises(ValueError, _to_fail)

def test_timestamp_w_timestamp_in_attributes(self):
from datetime import datetime
from pytz import utc
from gcloud.pubsub.message import _RFC3339_MICROS
DATA = b'DEADBEEF'
MESSAGE_ID = b'12345'
TIMESTAMP = '2015-04-10T18:42:27.131956Z'
naive = datetime.strptime(TIMESTAMP, _RFC3339_MICROS)
timestamp = naive.replace(tzinfo=utc)
ATTRS = {'timestamp': TIMESTAMP}
message = self._makeOne(data=DATA, message_id=MESSAGE_ID,
attributes=ATTRS)
self.assertEqual(message.timestamp, timestamp)
25 changes: 17 additions & 8 deletions regression/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_list_subscriptions(self):

def test_message_pull_mode_e2e(self):
TOPIC_NAME = 'subscribe-me'
topic = Topic(TOPIC_NAME)
topic = Topic(TOPIC_NAME, timestamp_messages=True)
self.assertFalse(topic.exists())
topic.create()
self.to_delete.append(topic)
Expand All @@ -113,14 +113,23 @@ def test_message_pull_mode_e2e(self):
subscription.create()
self.to_delete.append(subscription)

MESSAGE = b'MESSAGE'
EXTRA = 'EXTRA'
topic.publish(MESSAGE, extra=EXTRA)
MESSAGE_1 = b'MESSAGE ONE'
MESSAGE_2 = b'MESSAGE ONE'
EXTRA_1 = 'EXTRA 1'
EXTRA_2 = 'EXTRA 2'
topic.publish(MESSAGE_1, extra=EXTRA_1)
topic.publish(MESSAGE_2, extra=EXTRA_2)

received = subscription.pull()
received = subscription.pull(max_messages=2)
ack_ids = [recv[0] for recv in received]
subscription.acknowledge(ack_ids)
messages = [recv[1] for recv in received]
message, = messages
self.assertEqual(message.data, MESSAGE)
self.assertEqual(message.attributes, {'extra': EXTRA})

def _by_timestamp(message):
return message.timestamp

message1, message2 = sorted(messages, key=_by_timestamp)
self.assertEqual(message1.data, MESSAGE_1)
self.assertEqual(message1.attributes['extra'], EXTRA_1)
self.assertEqual(message2.data, MESSAGE_2)
self.assertEqual(message2.attributes['extra'], EXTRA_2)