diff --git a/gcloud/pubsub/_implicit_environ.py b/gcloud/pubsub/_implicit_environ.py index 649b6f6528b8..9e8b90cbeb67 100644 --- a/gcloud/pubsub/_implicit_environ.py +++ b/gcloud/pubsub/_implicit_environ.py @@ -38,4 +38,24 @@ def get_default_connection(): return _DEFAULTS.connection +def _require_connection(connection=None): + """Infer a connection from the environment, if not passed explicitly. + + :type connection: :class:`gcloud.pubsub.connection.Connection` + :param connection: Optional. + + :rtype: :class:`gcloud.pubsub.connection.Connection` + :returns: A connection based on the current environment. + :raises: :class:`EnvironmentError` if ``connection`` is ``None``, and + cannot be inferred from the environment. + """ + if connection is None: + connection = get_default_connection() + + if connection is None: + raise EnvironmentError('Connection could not be inferred.') + + return connection + + _DEFAULTS = _DefaultsContainer() diff --git a/gcloud/pubsub/api.py b/gcloud/pubsub/api.py index 4816a5a1d6a5..6849e17a98d9 100644 --- a/gcloud/pubsub/api.py +++ b/gcloud/pubsub/api.py @@ -15,7 +15,7 @@ """Define API functions (not bound to classes).""" from gcloud._helpers import get_default_project -from gcloud.pubsub._implicit_environ import get_default_connection +from gcloud.pubsub._implicit_environ import _require_connection from gcloud.pubsub.subscription import Subscription from gcloud.pubsub.topic import Topic @@ -53,8 +53,7 @@ def list_topics(page_size=None, page_token=None, if project is None: project = get_default_project() - if connection is None: - connection = get_default_connection() + connection = _require_connection(connection) params = {} @@ -66,8 +65,7 @@ def list_topics(page_size=None, page_token=None, path = '/projects/%s/topics' % project resp = connection.api_request(method='GET', path=path, query_params=params) - topics = [Topic.from_api_repr(resource, connection) - for resource in resp['topics']] + topics = [Topic.from_api_repr(resource) for resource in resp['topics']] return topics, resp.get('nextPageToken') @@ -110,8 +108,7 @@ def list_subscriptions(page_size=None, page_token=None, topic_name=None, if project is None: project = get_default_project() - if connection is None: - connection = get_default_connection() + connection = _require_connection(connection) params = {} @@ -128,8 +125,6 @@ def list_subscriptions(page_size=None, page_token=None, topic_name=None, resp = connection.api_request(method='GET', path=path, query_params=params) topics = {} - subscriptions = [Subscription.from_api_repr(resource, - connection=connection, - topics=topics) + subscriptions = [Subscription.from_api_repr(resource, topics=topics) for resource in resp['subscriptions']] return subscriptions, resp.get('nextPageToken') diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index e526892bd4f2..8ecf1c3f76e8 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -17,6 +17,7 @@ from gcloud.exceptions import NotFound from gcloud.pubsub.message import Message from gcloud.pubsub.topic import Topic +from gcloud.pubsub._implicit_environ import _require_connection class Subscription(object): @@ -46,17 +47,12 @@ def __init__(self, name, topic, ack_deadline=None, push_endpoint=None): self.push_endpoint = push_endpoint @classmethod - def from_api_repr(cls, resource, connection=None, topics=None): + def from_api_repr(cls, resource, topics=None): """Factory: construct a topic given its API representation :type resource: dict :param resource: topic resource representation returned from the API - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the default inferred from the - environment. - :type topics: dict or None :param topics: A mapping of topic names -> topics. If not passed, the subscription will have a newly-created topic. @@ -68,8 +64,7 @@ def from_api_repr(cls, resource, connection=None, topics=None): t_name = resource['topic'] topic = topics.get(t_name) if topic is None: - topic = topics[t_name] = Topic.from_api_repr({'name': t_name}, - connection) + topic = topics[t_name] = Topic.from_api_repr({'name': t_name}) _, _, _, name = resource['name'].split('/') ack_deadline = resource.get('ackDeadlineSeconds') push_config = resource.get('pushConfig', {}) @@ -82,11 +77,15 @@ def path(self): project = self.topic.project return '/projects/%s/subscriptions/%s' % (project, self.name) - def create(self): + def create(self, connection=None): """API call: create the subscription via a PUT request See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ data = {'topic': self.topic.full_name} @@ -96,36 +95,44 @@ def create(self): if self.push_endpoint is not None: data['pushConfig'] = {'pushEndpoint': self.push_endpoint} - conn = self.topic.connection - conn.api_request(method='PUT', path=self.path, data=data) + connection = _require_connection(connection) + connection.api_request(method='PUT', path=self.path, data=data) - def exists(self): + def exists(self, connection=None): """API call: test existence of the subscription via a GET request See https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ - conn = self.topic.connection + connection = _require_connection(connection) try: - conn.api_request(method='GET', path=self.path) + connection.api_request(method='GET', path=self.path) except NotFound: return False else: return True - def reload(self): + def reload(self, connection=None): """API call: sync local subscription configuration via a GET request See https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ - conn = self.topic.connection - data = conn.api_request(method='GET', path=self.path) + connection = _require_connection(connection) + data = connection.api_request(method='GET', path=self.path) self.ack_deadline = data.get('ackDeadline') push_config = data.get('pushConfig', {}) self.push_endpoint = push_config.get('pushEndpoint') - def modify_push_configuration(self, push_endpoint): + def modify_push_configuration(self, push_endpoint, connection=None): """API call: update the push endpoint for the subscription. See: @@ -135,18 +142,22 @@ def modify_push_configuration(self, push_endpoint): :param push_endpoint: URL to which messages will be pushed by the back-end. If None, the application must pull messages. + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ + connection = _require_connection(connection) data = {} config = data['pushConfig'] = {} if push_endpoint is not None: config['pushEndpoint'] = push_endpoint - conn = self.topic.connection - conn.api_request(method='POST', - path='%s:modifyPushConfig' % self.path, - data=data) + connection.api_request(method='POST', + path='%s:modifyPushConfig' % self.path, + data=data) self.push_endpoint = push_endpoint - def pull(self, return_immediately=False, max_messages=1): + def pull(self, return_immediately=False, max_messages=1, connection=None): """API call: retrieve messages for the subscription. See: @@ -161,21 +172,25 @@ def pull(self, return_immediately=False, max_messages=1): :type max_messages: int :param max_messages: the maximum number of messages to return. + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. + :rtype: list of (ack_id, message) tuples :returns: sequence of tuples: ``ack_id`` is the ID to be used in a subsequent call to :meth:`acknowledge`, and ``message`` is an instance of :class:`gcloud.pubsub.message.Message`. """ + connection = _require_connection(connection) data = {'returnImmediately': return_immediately, 'maxMessages': max_messages} - conn = self.topic.connection - response = conn.api_request(method='POST', - path='%s:pull' % self.path, - data=data) + response = connection.api_request(method='POST', + path='%s:pull' % self.path, + data=data) return [(info['ackId'], Message.from_api_repr(info['message'])) for info in response['receivedMessages']] - def acknowledge(self, ack_ids): + def acknowledge(self, ack_ids, connection=None): """API call: acknowledge retrieved messages for the subscription. See: @@ -183,14 +198,18 @@ def acknowledge(self, ack_ids): :type ack_ids: list of string :param ack_ids: ack IDs of messages being acknowledged + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ + connection = _require_connection(connection) data = {'ackIds': ack_ids} - conn = self.topic.connection - conn.api_request(method='POST', - path='%s:acknowledge' % self.path, - data=data) + connection.api_request(method='POST', + path='%s:acknowledge' % self.path, + data=data) - def modify_ack_deadline(self, ack_id, ack_deadline): + def modify_ack_deadline(self, ack_id, ack_deadline, connection=None): """API call: update acknowledgement deadline for a retrieved message. See: @@ -201,18 +220,26 @@ def modify_ack_deadline(self, ack_id, ack_deadline): :type ack_deadline: int :param ack_deadline: new deadline for the message, in seconds + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ + connection = _require_connection(connection) data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline} - conn = self.topic.connection - conn.api_request(method='POST', - path='%s:modifyAckDeadline' % self.path, - data=data) + connection.api_request(method='POST', + path='%s:modifyAckDeadline' % self.path, + data=data) - def delete(self): + def delete(self, connection=None): """API call: delete the subscription via a DELETE request. See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the topic's connection. """ - conn = self.topic.connection - conn.api_request(method='DELETE', path=self.path) + connection = _require_connection(connection) + connection.api_request(method='DELETE', path=self.path) diff --git a/gcloud/pubsub/test__implicit_environ.py b/gcloud/pubsub/test__implicit_environ.py index 6f1f0a85f85f..4e03cb205886 100644 --- a/gcloud/pubsub/test__implicit_environ.py +++ b/gcloud/pubsub/test__implicit_environ.py @@ -23,3 +23,35 @@ def _callFUT(self): def test_wo_override(self): self.assertTrue(self._callFUT() is None) + + +class Test__require_connection(unittest2.TestCase): + + def _callFUT(self, connection=None): + from gcloud.pubsub._implicit_environ import _require_connection + return _require_connection(connection=connection) + + def _monkey(self, connection): + from gcloud.pubsub._testing import _monkey_defaults + return _monkey_defaults(connection=connection) + + def test_implicit_unset(self): + with self._monkey(None): + with self.assertRaises(EnvironmentError): + self._callFUT() + + def test_implicit_unset_passed_explicitly(self): + CONNECTION = object() + with self._monkey(None): + self.assertTrue(self._callFUT(CONNECTION) is CONNECTION) + + def test_implicit_set(self): + IMPLICIT_CONNECTION = object() + with self._monkey(IMPLICIT_CONNECTION): + self.assertTrue(self._callFUT() is IMPLICIT_CONNECTION) + + def test_implicit_set_passed_explicitly(self): + IMPLICIT_CONNECTION = object() + CONNECTION = object() + with self._monkey(IMPLICIT_CONNECTION): + self.assertTrue(self._callFUT(CONNECTION) is CONNECTION) diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index d074bc168564..e3fb7edc7735 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -44,9 +44,8 @@ def test_ctor_explicit(self): self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) - def test_from_api_repr_no_topics_no_connection(self): + def test_from_api_repr_no_topics(self): from gcloud.pubsub.topic import Topic - from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) @@ -58,16 +57,13 @@ def test_from_api_repr_no_topics_no_connection(self): 'name': SUB_PATH, 'ackDeadlineSeconds': DEADLINE, 'pushConfig': {'pushEndpoint': ENDPOINT}} - conn = _Connection() klass = self._getTargetClass() - with _monkey_defaults(connection=conn): - subscription = klass.from_api_repr(resource, connection=conn) + subscription = klass.from_api_repr(resource) self.assertEqual(subscription.name, SUB_NAME) topic = subscription.topic self.assertTrue(isinstance(topic, Topic)) self.assertEqual(topic.name, TOPIC_NAME) self.assertEqual(topic.project, PROJECT) - self.assertTrue(topic.connection is conn) self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) @@ -84,18 +80,15 @@ def test_from_api_repr_w_topics_no_topic_match(self): 'name': SUB_PATH, 'ackDeadlineSeconds': DEADLINE, 'pushConfig': {'pushEndpoint': ENDPOINT}} - conn = _Connection() topics = {} klass = self._getTargetClass() - subscription = klass.from_api_repr(resource, connection=conn, - topics=topics) + subscription = klass.from_api_repr(resource, topics=topics) self.assertEqual(subscription.name, SUB_NAME) topic = subscription.topic self.assertTrue(isinstance(topic, Topic)) self.assertTrue(topic is topics[TOPIC_PATH]) self.assertEqual(topic.name, TOPIC_NAME) self.assertEqual(topic.project, PROJECT) - self.assertTrue(topic.connection is conn) self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) @@ -120,7 +113,8 @@ def test_from_api_repr_w_topics_w_topic_match(self): self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) - def test_create_pull_wo_ack_deadline(self): + def test_create_pull_wo_ack_deadline_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -128,16 +122,17 @@ def test_create_pull_wo_ack_deadline(self): TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) BODY = {'topic': TOPIC_PATH} conn = _Connection({'name': SUB_PATH}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - subscription.create() + with _monkey_defaults(connection=conn): + subscription.create() self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req['data'], BODY) - def test_create_push_w_ack_deadline(self): + def test_create_push_w_ack_deadline_w_explicit_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -149,47 +144,73 @@ def test_create_push_w_ack_deadline(self): 'ackDeadline': DEADLINE, 'pushConfig': {'pushEndpoint': ENDPOINT}} conn = _Connection({'name': SUB_PATH}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic, DEADLINE, ENDPOINT) - subscription.create() + subscription.create(connection=conn) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req['data'], BODY) - def test_exists_miss(self): + def test_exists_miss_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' conn = _Connection() - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - self.assertFalse(subscription.exists()) + with _monkey_defaults(connection=conn): + self.assertFalse(subscription.exists()) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req.get('query_params'), None) - def test_exists_hit(self): + def test_exists_hit_w_explicit_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - self.assertTrue(subscription.exists()) + self.assertTrue(subscription.exists(connection=conn)) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req.get('query_params'), None) - def test_reload(self): + def test_reload_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + conn = _Connection({'name': SUB_PATH, + 'topic': TOPIC_PATH, + 'ackDeadline': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}}) + topic = _Topic(TOPIC_NAME, project=PROJECT) + subscription = self._makeOne(SUB_NAME, topic) + with _monkey_defaults(connection=conn): + subscription.reload() + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + + def test_reload_w_explicit_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -201,9 +222,9 @@ def test_reload(self): 'topic': TOPIC_PATH, 'ackDeadline': DEADLINE, 'pushConfig': {'pushEndpoint': ENDPOINT}}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - subscription.reload() + subscription.reload(connection=conn) self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) self.assertEqual(len(conn._requested), 1) @@ -211,16 +232,18 @@ def test_reload(self): self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) - def test_modify_push_config_w_endpoint(self): + def test_modify_push_config_w_endpoint_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' ENDPOINT = 'https://api.example.com/push' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - subscription.modify_push_configuration(push_endpoint=ENDPOINT) + with _monkey_defaults(connection=conn): + subscription.modify_push_configuration(push_endpoint=ENDPOINT) self.assertEqual(subscription.push_endpoint, ENDPOINT) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -229,16 +252,17 @@ def test_modify_push_config_w_endpoint(self): self.assertEqual(req['data'], {'pushConfig': {'pushEndpoint': ENDPOINT}}) - def test_modify_push_config_wo_endpoint(self): + def test_modify_push_config_wo_endpoint_w_explicit_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' ENDPOINT = 'https://api.example.com/push' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic, push_endpoint=ENDPOINT) - subscription.modify_push_configuration(push_endpoint=None) + subscription.modify_push_configuration(push_endpoint=None, + connection=conn) self.assertEqual(subscription.push_endpoint, None) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -246,9 +270,10 @@ def test_modify_push_config_wo_endpoint(self): self.assertEqual(req['path'], '/%s:modifyPushConfig' % SUB_PATH) self.assertEqual(req['data'], {'pushConfig': {}}) - def test_pull_wo_return_immediately_wo_max_messages(self): + def test_pull_wo_return_immediately_max_messages_w_implicit_conn(self): import base64 from gcloud.pubsub.message import Message + from gcloud.pubsub._testing import _monkey_defaults PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -260,9 +285,10 @@ def test_pull_wo_return_immediately_wo_max_messages(self): MESSAGE = {'messageId': MSG_ID, 'data': B64} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} conn = _Connection({'receivedMessages': [REC_MESSAGE]}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - pulled = subscription.pull() + with _monkey_defaults(connection=conn): + pulled = subscription.pull() self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) @@ -277,7 +303,7 @@ def test_pull_wo_return_immediately_wo_max_messages(self): self.assertEqual(req['data'], {'returnImmediately': False, 'maxMessages': 1}) - def test_pull_w_return_immediately_w_max_messages(self): + def test_pull_w_return_immediately_w_max_messages_w_explicit_conn(self): import base64 from gcloud.pubsub.message import Message PROJECT = 'PROJECT' @@ -291,9 +317,10 @@ def test_pull_w_return_immediately_w_max_messages(self): MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} conn = _Connection({'receivedMessages': [REC_MESSAGE]}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - pulled = subscription.pull(return_immediately=True, max_messages=3) + pulled = subscription.pull(return_immediately=True, max_messages=3, + connection=conn) self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) @@ -308,7 +335,26 @@ def test_pull_w_return_immediately_w_max_messages(self): self.assertEqual(req['data'], {'returnImmediately': True, 'maxMessages': 3}) - def test_acknowledge(self): + def test_acknowledge_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + conn = _Connection({}) + topic = _Topic(TOPIC_NAME, project=PROJECT) + subscription = self._makeOne(SUB_NAME, topic) + with _monkey_defaults(connection=conn): + subscription.acknowledge([ACK_ID1, ACK_ID2]) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH) + self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) + + def test_acknowledge_w_explicit_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -316,16 +362,36 @@ def test_acknowledge(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - subscription.acknowledge([ACK_ID1, ACK_ID2]) + subscription.acknowledge([ACK_ID1, ACK_ID2], connection=conn) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH) self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) - def test_modify_ack_deadline(self): + def test_modify_ack_deadline_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID = 'DEADBEEF' + DEADLINE = 42 + conn = _Connection({}) + topic = _Topic(TOPIC_NAME, project=PROJECT) + subscription = self._makeOne(SUB_NAME, topic) + with _monkey_defaults(connection=conn): + subscription.modify_ack_deadline(ACK_ID, DEADLINE) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:modifyAckDeadline' % SUB_PATH) + self.assertEqual(req['data'], + {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE}) + + def test_modify_ack_deadline_w_explicit_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -333,9 +399,9 @@ def test_modify_ack_deadline(self): ACK_ID = 'DEADBEEF' DEADLINE = 42 conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - subscription.modify_ack_deadline(ACK_ID, DEADLINE) + subscription.modify_ack_deadline(ACK_ID, DEADLINE, connection=conn) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') @@ -343,15 +409,31 @@ def test_modify_ack_deadline(self): self.assertEqual(req['data'], {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE}) - def test_delete(self): + def test_delete_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + conn = _Connection({}) + topic = _Topic(TOPIC_NAME, project=PROJECT) + subscription = self._makeOne(SUB_NAME, topic) + with _monkey_defaults(connection=conn): + subscription.delete() + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + + def test_delete_w_explicit_connection(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, project=PROJECT) subscription = self._makeOne(SUB_NAME, topic) - subscription.delete() + subscription.delete(connection=conn) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'DELETE') @@ -378,9 +460,8 @@ def api_request(self, **kw): class _Topic(object): - def __init__(self, name, project, connection): + def __init__(self, name, project): self.name = name self.project = project - self.connection = connection self.full_name = 'projects/%s/topics/%s' % (project, name) self.path = '/projects/%s/topics/%s' % (project, name) diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index 932eb2f5229f..af90c2705c10 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -24,38 +24,31 @@ def _getTargetClass(self): def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) - def test_ctor_wo_inferred_project_or_connection(self): - from gcloud._testing import _monkey_defaults as _monkey_base_defaults - from gcloud.pubsub._testing import _monkey_defaults + def test_ctor_wo_inferred_project(self): + from gcloud._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' - conn = _Connection() - with _monkey_base_defaults(project=PROJECT): - with _monkey_defaults(connection=conn): - topic = self._makeOne(TOPIC_NAME) + with _monkey_defaults(project=PROJECT): + topic = self._makeOne(TOPIC_NAME) self.assertEqual(topic.name, TOPIC_NAME) self.assertEqual(topic.project, PROJECT) self.assertEqual(topic.full_name, 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) - self.assertTrue(topic.connection is conn) self.assertFalse(topic.timestamp_messages) - def test_ctor_w_explicit_project_connection_and_timestamp(self): + def test_ctor_w_explicit_project_and_timestamp(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' - conn = _Connection() topic = self._makeOne(TOPIC_NAME, project=PROJECT, - connection=conn, timestamp_messages=True) self.assertEqual(topic.name, TOPIC_NAME) self.assertEqual(topic.project, PROJECT) self.assertEqual(topic.full_name, 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) - self.assertTrue(topic.connection is conn) self.assertTrue(topic.timestamp_messages) - def test_from_api_repr_wo_connection(self): + def test_from_api_repr(self): from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' @@ -68,70 +61,61 @@ def test_from_api_repr_wo_connection(self): self.assertEqual(topic.name, TOPIC_NAME) self.assertEqual(topic.project, PROJECT) self.assertEqual(topic.full_name, PATH) - self.assertTrue(topic.connection is conn) - - def test_from_api_repr_w_connection(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - resource = {'name': PATH} - conn = object() - klass = self._getTargetClass() - topic = klass.from_api_repr(resource, connection=conn) - self.assertEqual(topic.name, TOPIC_NAME) - self.assertEqual(topic.project, PROJECT) - self.assertEqual(topic.full_name, PATH) - self.assertTrue(topic.connection is conn) - def test_create(self): + def test_create_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'name': PATH}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - topic.create() + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + with _monkey_defaults(connection=conn): + topic.create() self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % PATH) - def test_exists_miss(self): + def test_create_w_explicit_connection(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection() - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - self.assertFalse(topic.exists()) + conn = _Connection({'name': PATH}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + topic.create(connection=conn) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] - self.assertEqual(req['method'], 'GET') + self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % PATH) - def test_exists_hit(self): + def test_exists_miss_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'name': PATH}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - self.assertTrue(topic.exists()) + conn = _Connection() + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + with _monkey_defaults(connection=conn): + self.assertFalse(topic.exists()) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % PATH) - def test_delete(self): + def test_exists_hit_w_explicit_connection(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - topic.delete() + conn = _Connection({'name': PATH}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + self.assertTrue(topic.exists(connection=conn)) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] - self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % PATH) - def test_publish_single_bytes_wo_attrs(self): + def test_publish_single_bytes_wo_attrs_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults import base64 TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' @@ -142,8 +126,9 @@ def test_publish_single_bytes_wo_attrs(self): 'attributes': {}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - msgid = topic.publish(PAYLOAD) + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + with _monkey_defaults(connection=conn): + msgid = topic.publish(PAYLOAD) self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -151,12 +136,13 @@ def test_publish_single_bytes_wo_attrs(self): self.assertEqual(req['path'], '/%s:publish' % PATH) self.assertEqual(req['data'], {'messages': [MESSAGE]}) - def test_publish_single_bytes_wo_attrs_w_add_timestamp(self): + def test_publish_single_bytes_wo_attrs_w_add_timestamp_explicit_conn(self): import base64 import datetime from gcloud.pubsub import topic as MUT from gcloud._helpers import _RFC3339_MICROS from gcloud._testing import _Monkey + from gcloud.pubsub._testing import _monkey_defaults NOW = datetime.datetime.utcnow() def _utcnow(): @@ -171,10 +157,11 @@ def _utcnow(): 'attributes': {'timestamp': NOW.strftime(_RFC3339_MICROS)}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn, + topic = self._makeOne(TOPIC_NAME, project=PROJECT, timestamp_messages=True) with _Monkey(MUT, _NOW=_utcnow): - msgid = topic.publish(PAYLOAD) + with _monkey_defaults(connection=conn): + msgid = topic.publish(PAYLOAD, connection=conn) self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -184,7 +171,7 @@ def _utcnow(): def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): import base64 - + from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PAYLOAD = b'This is the message text' @@ -195,9 +182,10 @@ def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): 'attributes': {'timestamp': OVERRIDE}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn, + topic = self._makeOne(TOPIC_NAME, project=PROJECT, timestamp_messages=True) - msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE) + with _monkey_defaults(connection=conn): + msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE) self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -207,6 +195,7 @@ def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): def test_publish_single_w_attrs(self): import base64 + from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PAYLOAD = b'This is the message text' @@ -216,8 +205,9 @@ def test_publish_single_w_attrs(self): 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2') + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + with _monkey_defaults(connection=conn): + msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2') self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -225,7 +215,37 @@ def test_publish_single_w_attrs(self): self.assertEqual(req['path'], '/%s:publish' % PATH) self.assertEqual(req['data'], {'messages': [MESSAGE]}) - def test_publish_multiple(self): + def test_publish_multiple_w_implicit_connection(self): + import base64 + from gcloud.pubsub._testing import _monkey_defaults + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + with _monkey_defaults(connection=conn): + with topic.batch() as batch: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_publish_multiple_w_explicit_connection(self): import base64 TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' @@ -235,14 +255,14 @@ def test_publish_multiple(self): B64_2 = base64.b64encode(PAYLOAD2) MSGID1 = 'DEADBEEF' MSGID2 = 'BEADCAFE' - MESSAGE1 = {'data': B64_1, + MESSAGE1 = {'data': B64_1.decode('ascii'), 'attributes': {}} - MESSAGE2 = {'data': B64_2, + MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) - with topic.batch() as batch: + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + with topic.batch(connection=conn) as batch: batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') self.assertEqual(list(batch), [MSGID1, MSGID2]) @@ -254,9 +274,6 @@ def test_publish_multiple(self): self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) def test_publish_multiple_error(self): - class Bugout(Exception): - pass - TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PAYLOAD1 = b'This is the first message text' @@ -264,15 +281,216 @@ class Bugout(Exception): MSGID1 = 'DEADBEEF' MSGID2 = 'BEADCAFE' conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, project=PROJECT) try: with topic.batch() as batch: + batch.publish(PAYLOAD1, connection=conn) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2', + connection=conn) + raise _Bugout() + except _Bugout: + pass + self.assertEqual(list(batch), []) + self.assertEqual(len(conn._requested), 0) + + def test_delete_w_implicit_connection(self): + from gcloud.pubsub._testing import _monkey_defaults + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + with _monkey_defaults(connection=conn): + topic.delete() + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_delete_w_explicit_connection(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT) + topic.delete(connection=conn) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % PATH) + + +class TestBatch(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.topic import Batch + return Batch + + def _makeOne(self, topic): + return self._getTargetClass()(topic) + + def test_ctor_defaults(self): + topic = _Topic() + batch = self._makeOne(topic) + self.assertTrue(batch.topic is topic) + self.assertEqual(len(batch.messages), 0) + self.assertEqual(len(batch.message_ids), 0) + + def test___iter___empty(self): + topic = _Topic() + batch = self._makeOne(topic) + self.assertEqual(list(batch), []) + + def test___iter___non_empty(self): + topic = _Topic() + batch = self._makeOne(topic) + batch.message_ids[:] = ['ONE', 'TWO', 'THREE'] + self.assertEqual(list(batch), ['ONE', 'TWO', 'THREE']) + + def test_publish_bytes_wo_attrs(self): + import base64 + from gcloud.pubsub._testing import _monkey_defaults + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MESSAGE = {'data': B64, + 'attributes': {}} + connection = _Connection() + topic = _Topic() + batch = self._makeOne(topic) + with _monkey_defaults(connection=connection): + batch.publish(PAYLOAD) + self.assertEqual(len(connection._requested), 0) + self.assertEqual(batch.messages, [MESSAGE]) + + def test_publish_bytes_w_add_timestamp(self): + import base64 + from gcloud.pubsub._testing import _monkey_defaults + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MESSAGE = {'data': B64, + 'attributes': {'timestamp': 'TIMESTAMP'}} + connection = _Connection() + topic = _Topic(timestamp_messages=True) + with _monkey_defaults(connection=connection): + batch = self._makeOne(topic) + batch.publish(PAYLOAD) + self.assertEqual(len(connection._requested), 0) + self.assertEqual(batch.messages, [MESSAGE]) + + def test_commit_w_implicit_connection(self): + import base64 + from gcloud.pubsub._testing import _monkey_defaults + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + topic = _Topic() + batch = self._makeOne(topic) + with _monkey_defaults(connection=conn): + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + batch.commit() + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '%s:publish' % topic.path) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_commit_w_explicit_connection(self): + import base64 + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + topic = _Topic() + batch = self._makeOne(topic) + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + batch.commit(connection=conn) + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '%s:publish' % topic.path) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_context_mgr_success(self): + import base64 + from gcloud.pubsub._testing import _monkey_defaults + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + topic = _Topic() + batch = self._makeOne(topic) + + with _monkey_defaults(connection=conn): + with batch as other: batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') - raise Bugout() - except Bugout: + + self.assertTrue(other is batch) + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '%s:publish' % topic.path) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_context_mgr_failure(self): + import base64 + from gcloud.pubsub._testing import _monkey_defaults + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + topic = _Topic() + batch = self._makeOne(topic) + + try: + with _monkey_defaults(connection=conn): + with batch as other: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + raise _Bugout() + except _Bugout: pass + + self.assertTrue(other is batch) self.assertEqual(list(batch), []) + self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2]) self.assertEqual(len(conn._requested), 0) @@ -292,3 +510,19 @@ def api_request(self, **kw): raise NotFound('miss') else: return response + + +class _Topic(object): + + def __init__(self, name="NAME", project="PROJECT", + timestamp_messages=False): + self.path = '/projects/%s/topics/%s' % (project, name) + self.timestamp_messages = timestamp_messages + + def _timestamp_message(self, attrs): + if self.timestamp_messages: + attrs['timestamp'] = 'TIMESTAMP' + + +class _Bugout(Exception): + pass diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index 98a3a74a4550..ab8a61e7779e 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -20,7 +20,7 @@ from gcloud._helpers import get_default_project from gcloud._helpers import _RFC3339_MICROS from gcloud.exceptions import NotFound -from gcloud.pubsub._implicit_environ import get_default_connection +from gcloud.pubsub._implicit_environ import _require_connection _NOW = datetime.datetime.utcnow @@ -40,42 +40,29 @@ class Topic(object): :param project: the project to which the topic belongs. If not passed, falls back to the default inferred from the environment. - :type connection: :class:gcloud.pubsub.connection.Connection - :param connection: the connection to use. If not passed, - falls back to the default inferred from the - :type timestamp_messages: boolean :param timestamp_messages: If true, the topic will add a ``timestamp`` key to the attributes of each published message: the value will be an RFC 3339 timestamp. """ - def __init__(self, name, project=None, connection=None, - timestamp_messages=False): + def __init__(self, name, project=None, timestamp_messages=False): if project is None: project = get_default_project() - if connection is None: - connection = get_default_connection() self.name = name self.project = project - self.connection = connection self.timestamp_messages = timestamp_messages @classmethod - def from_api_repr(cls, resource, connection=None): + def from_api_repr(cls, resource): """Factory: construct a topic given its API representation :type resource: dict :param resource: topic resource representation returned from the API - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the default inferred from the - environment. - :rtype: :class:`gcloud.pubsub.topic.Topic` """ _, project, _, name = resource['name'].split('/') - return cls(name, project, connection) + return cls(name, project) @property def full_name(self): @@ -87,28 +74,49 @@ def path(self): """URL path for the topic's APIs""" return '/%s' % (self.full_name) - def create(self): + def create(self, connection=None): """API call: create the topic via a PUT request See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/create + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the ``connection`` attribute. """ - self.connection.api_request(method='PUT', path=self.path) + connection = _require_connection(connection) + connection.api_request(method='PUT', path=self.path) - def exists(self): + def exists(self, connection=None): """API call: test for the existence of the topic via a GET request See https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/get + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the ``connection`` attribute. """ + connection = _require_connection(connection) + try: - self.connection.api_request(method='GET', path=self.path) + connection.api_request(method='GET', path=self.path) except NotFound: return False else: return True - def publish(self, message, **attrs): + def _timestamp_message(self, attrs): + """Add a timestamp to ``attrs``, if the topic is so configured. + + If ``attrs`` already has the key, do nothing. + + Helper method for ``publish``/``Batch.publish``. + """ + if self.timestamp_messages and 'timestamp' not in attrs: + attrs['timestamp'] = _NOW().strftime(_RFC3339_MICROS) + + def publish(self, message, connection=None, **attrs): """API call: publish a message to a topic via a POST request See: @@ -117,47 +125,65 @@ def publish(self, message, **attrs): :type message: bytes :param message: the message payload + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the ``connection`` attribute. + :type attrs: dict (string -> string) :message attrs: key-value pairs to send as message attributes :rtype: str :returns: message ID assigned by the server to the published message """ - if self.timestamp_messages and 'timestamp' not in attrs: - attrs['timestamp'] = _NOW().strftime(_RFC3339_MICROS) + connection = _require_connection(connection) + + self._timestamp_message(attrs) message_b = base64.b64encode(message).decode('ascii') message_data = {'data': message_b, 'attributes': attrs} data = {'messages': [message_data]} - response = self.connection.api_request(method='POST', - path='%s:publish' % self.path, - data=data) + response = connection.api_request(method='POST', + path='%s:publish' % self.path, + data=data) return response['messageIds'][0] - def batch(self): + def batch(self, connection=None): """Return a batch to use as a context manager. - :rtype: :class:_Batch + :rtype: :class:Batch """ - return _Batch(self) + return Batch(self, connection=connection) - def delete(self): + def delete(self, connection=None): """API call: delete the topic via a DELETE request See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/delete + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the ``connection`` attribute. """ - self.connection.api_request(method='DELETE', path=self.path) + connection = _require_connection(connection) + connection.api_request(method='DELETE', path=self.path) -class _Batch(object): +class Batch(object): """Context manager: collect messages to publish via a single API call. Helper returned by :meth:Topic.batch + + :type topic: :class:`gcloud.pubsub.topic.Topic` + :param topic: the topic being published + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the implicit default. """ - def __init__(self, topic): + def __init__(self, topic, connection=None): self.topic = topic self.messages = [] self.message_ids = [] + self.connection = connection def __enter__(self): return self @@ -178,14 +204,23 @@ def publish(self, message, **attrs): :type attrs: dict (string -> string) :message attrs: key-value pairs to send as message attributes """ + self.topic._timestamp_message(attrs) self.messages.append( - {'data': base64.b64encode(message), 'attributes': attrs}) - - def commit(self): - """Send saved messages as a single API call.""" - conn = self.topic.connection - response = conn.api_request(method='POST', - path='%s:publish' % self.topic.path, - data={'messages': self.messages[:]}) + {'data': base64.b64encode(message).decode('ascii'), + 'attributes': attrs}) + + def commit(self, connection=None): + """Send saved messages as a single API call. + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the ``connection`` attribute. + """ + if connection is None and self.connection is not None: + connection = self.connection + connection = _require_connection(connection) + response = connection.api_request(method='POST', + path='%s:publish' % self.topic.path, + data={'messages': self.messages[:]}) self.message_ids.extend(response['messageIds']) del self.messages[:]