diff --git a/gcloud/pubsub/client.py b/gcloud/pubsub/client.py index ecafae99af7d..aea40257591e 100644 --- a/gcloud/pubsub/client.py +++ b/gcloud/pubsub/client.py @@ -65,20 +65,12 @@ def list_topics(self, page_size=None, page_token=None): more topics can be retrieved with another call (pass that value as ``page_token``). """ - params = {} - - if page_size is not None: - params['pageSize'] = page_size - - if page_token is not None: - params['pageToken'] = page_token - - path = '/projects/%s/topics' % (self.project,) - resp = self.connection.api_request(method='GET', path=path, - query_params=params) + conn = self.connection + resources, next_token = conn.list_topics( + self.project, page_size, page_token) topics = [Topic.from_api_repr(resource, self) - for resource in resp.get('topics', ())] - return topics, resp.get('nextPageToken') + for resource in resources] + return topics, next_token def list_subscriptions(self, page_size=None, page_token=None): """List subscriptions for the project associated with this client. @@ -104,23 +96,14 @@ def list_subscriptions(self, page_size=None, page_token=None): more topics can be retrieved with another call (pass that value as ``page_token``). """ - params = {} - - if page_size is not None: - params['pageSize'] = page_size - - if page_token is not None: - params['pageToken'] = page_token - - path = '/projects/%s/subscriptions' % (self.project,) - - resp = self.connection.api_request(method='GET', path=path, - query_params=params) + conn = self.connection + resources, next_token = conn.list_subscriptions( + self.project, page_size, page_token) topics = {} subscriptions = [Subscription.from_api_repr(resource, self, topics=topics) - for resource in resp.get('subscriptions', ())] - return subscriptions, resp.get('nextPageToken') + for resource in resources] + return subscriptions, next_token def topic(self, name, timestamp_messages=False): """Creates a topic bound to the current client. diff --git a/gcloud/pubsub/connection.py b/gcloud/pubsub/connection.py index f7e187eff704..63b956d45713 100644 --- a/gcloud/pubsub/connection.py +++ b/gcloud/pubsub/connection.py @@ -88,3 +88,401 @@ def build_api_url(self, path, query_params=None, return super(Connection, self.__class__).build_api_url( path, query_params=query_params, api_base_url=api_base_url, api_version=api_version) + + def list_topics(self, project, page_size=None, page_token=None): + """List topics for the project associated with this client. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list + + :type project: string + :param project: project ID + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :rtype: tuple, (list, str) + :returns: list of ``Topic`` resource dicts, plus a + "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + params = {} + + if page_size is not None: + params['pageSize'] = page_size + + if page_token is not None: + params['pageToken'] = page_token + + path = '/projects/%s/topics' % (project,) + resp = self.api_request(method='GET', path=path, query_params=params) + return resp.get('topics', ()), resp.get('nextPageToken') + + def list_subscriptions(self, project, page_size=None, page_token=None): + """List subscriptions for the project associated with this client. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list + + :type project: string + :param project: project ID + + :type page_size: int + :param page_size: maximum number of subscriptions to return, If not + passed, defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of subscriptions. + If not passed, the API will return the first page + of subscriptions. + + :rtype: tuple, (list, str) + :returns: list of ``Subscription`` resource dicts, plus a + "next page token" string: if not None, indicates that + more subscriptions can be retrieved with another call (pass + that value as ``page_token``). + """ + params = {} + + if page_size is not None: + params['pageSize'] = page_size + + if page_token is not None: + params['pageToken'] = page_token + + path = '/projects/%s/subscriptions' % (project,) + resp = self.api_request(method='GET', path=path, query_params=params) + return resp.get('subscriptions', ()), resp.get('nextPageToken') + + def topic_create(self, topic_path): + """API call: create a topic via a PUT request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create + + :type topic_path: string + :param topic_path: the fully-qualfied path of the new topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + """ + return self.api_request(method='PUT', path='/%s' % (topic_path,)) + + def topic_get(self, topic_path): + """API call: retrieve a topic via a GET request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get + + :type topic_path: string + :param topic_path: the fully-qualfied path of the topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + """ + return self.api_request(method='GET', path='/%s' % (topic_path,)) + + def topic_delete(self, topic_path): + """API call: delete a topic via a DELETE request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/delete + + :type topic_path: string + :param topic_path: the fully-qualfied path of the topic, in format + ``projects//topics/``. + """ + self.api_request(method='DELETE', path='/%s' % (topic_path,)) + + def topic_publish(self, topic_path, messages): + """API call: publish a message to a topic via a POST request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish + + :type topic_path: string + :param topic_path: the fully-qualfied path of the topic, in format + ``projects//topics/``. + + :type messages: list of dict + :param messages: messages to be published. + + :rtype: list of string + :returns: list of opaque IDs for published messages. + """ + data = {'messages': messages} + response = self.api_request( + method='POST', path='/%s:publish' % (topic_path,), data=data) + return response['messageIds'] + + def topic_list_subscriptions(self, topic_path, page_size=None, + page_token=None): + """API call: list subscriptions bound to a topic via a GET request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list + + :type topic_path: string + :param topic_path: the fully-qualfied path of the topic, in format + ``projects//topics/``. + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :rtype: list of strings + :returns: fully-qualified names of subscriptions for the supplied + topic. + """ + params = {} + + if page_size is not None: + params['pageSize'] = page_size + + if page_token is not None: + params['pageToken'] = page_token + + path = '/%s/subscriptions' % (topic_path,) + resp = self.api_request(method='GET', path=path, query_params=params) + return resp.get('subscriptions', ()), resp.get('nextPageToken') + + def get_iam_policy(self, target_path): + """Fetch the IAM policy for the target. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/getIamPolicy + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/getIamPolicy + + :type target_path: string + :param target_path: the path of the target object. + + :rtype: dict + :returns: the resource returned by the ``getIamPolicy`` API request. + """ + path = '/%s:getIamPolicy' % (target_path,) + return self.api_request(method='GET', path=path) + + def set_iam_policy(self, target_path, policy): + """Update the IAM policy for the target. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/setIamPolicy + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/setIamPolicy + + :type target_path: string + :param target_path: the path of the target object. + + :type policy: dict + :param policy: the new policy resource. + + :rtype: dict + :returns: the resource returned by the ``setIamPolicy`` API request. + """ + wrapped = {'policy': policy} + path = '/%s:setIamPolicy' % (target_path,) + return self.api_request(method='POST', path=path, data=wrapped) + + def test_iam_permissions(self, target_path, permissions): + """Update the IAM policy for the target. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/testIamPermissions + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/testIamPermissions + + :type target_path: string + :param target_path: the path of the target object. + + :type permissions: list of string + :param permissions: the permissions to check + + :rtype: dict + :returns: the resource returned by the ``getIamPolicy`` API request. + """ + wrapped = {'permissions': permissions} + path = '/%s:testIamPermissions' % (target_path,) + resp = self.api_request(method='POST', path=path, data=wrapped) + return resp.get('permissions', []) + + def subscription_create(self, subscription_path, topic_path, + ack_deadline=None, push_endpoint=None): + """API call: create a subscription via a PUT request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/create + + :type subscription_path: string + :param subscription_path: the fully-qualfied path of the new + subscription, in format + ``projects//subscriptions/``. + + :type topic_path: string + :param topic_path: the fully-qualfied path of the topic being + subscribed, in format + ``projects//topics/``. + + :type ack_deadline: int, or ``NoneType`` + :param ack_deadline: the deadline (in seconds) by which messages pulled + from the back-end must be acknowledged. + + :type push_endpoint: string, or ``NoneType`` + :param push_endpoint: URL to which messages will be pushed by the + back-end. If not set, the application must pull + messages. + + :rtype: dict + :returns: ``Subscription`` resource returned from the API. + """ + path = '/%s' % (subscription_path,) + resource = {'topic': topic_path} + + if ack_deadline is not None: + resource['ackDeadlineSeconds'] = ack_deadline + + if push_endpoint is not None: + resource['pushConfig'] = {'pushEndpoint': push_endpoint} + + return self.api_request(method='PUT', path=path, data=resource) + + def subscription_get(self, subscription_path): + """API call: retrieve a subscription via a GET request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get + + :type subscription_path: string + :param subscription_path: the fully-qualfied path of the subscription, + in format + ``projects//subscriptions/``. + + :rtype: dict + :returns: ``Subscription`` resource returned from the API. + """ + path = '/%s' % (subscription_path,) + return self.api_request(method='GET', path=path) + + def subscription_delete(self, subscription_path): + """API call: delete a subscription via a DELETE request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete + + :type subscription_path: string + :param subscription_path: the fully-qualfied path of the subscription, + in format + ``projects//subscriptions/``. + """ + path = '/%s' % (subscription_path,) + self.api_request(method='DELETE', path=path) + + def subscription_modify_push_config(self, subscription_path, + push_endpoint): + """API call: update push config of a subscription via a POST request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig + + :type subscription_path: string + :param subscription_path: the fully-qualfied path of the new + subscription, in format + ``projects//subscriptions/``. + + :type push_endpoint: string, or ``NoneType`` + :param push_endpoint: URL to which messages will be pushed by the + back-end. If not set, the application must pull + messages. + """ + path = '/%s:modifyPushConfig' % (subscription_path,) + resource = {'pushConfig': {'pushEndpoint': push_endpoint}} + self.api_request(method='POST', path=path, data=resource) + + def subscription_pull(self, subscription_path, return_immediately=False, + max_messages=1): + """API call: update push config of a subscription via a POST request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig + + :type subscription_path: string + :param subscription_path: the fully-qualfied path of the new + subscription, in format + ``projects//subscriptions/``. + + :type return_immediately: boolean + :param return_immediately: if True, the back-end returns even if no + messages are available; if False, the API + call blocks until one or more messages are + available. + + :type max_messages: int + :param max_messages: the maximum number of messages to return. + + :rtype: list of dict + :returns: the ``receivedMessages`` element of the response. + """ + path = '/%s:pull' % (subscription_path,) + data = { + 'returnImmediately': return_immediately, + 'maxMessages': max_messages, + } + response = self.api_request(method='POST', path=path, data=data) + return response['receivedMessages'] + + def subscription_acknowledge(self, subscription_path, ack_ids): + """API call: acknowledge retrieved messages for the subscription. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig + + :type subscription_path: string + :param subscription_path: the fully-qualfied path of the new + subscription, in format + ``projects//subscriptions/``. + + :type ack_ids: list of string + :param ack_ids: ack IDs of messages being acknowledged + """ + path = '/%s:acknowledge' % (subscription_path,) + data = { + 'ackIds': ack_ids, + } + self.api_request(method='POST', path=path, data=data) + + def subscription_modify_ack_deadline(self, subscription_path, ack_ids, + ack_deadline): + """API call: acknowledge retrieved messages for the subscription. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline + + :type subscription_path: string + :param subscription_path: the fully-qualfied path of the new + subscription, in format + ``projects//subscriptions/``. + + :type ack_ids: list of string + :param ack_ids: ack IDs of messages being acknowledged + + :type ack_deadline: int + :param ack_deadline: the deadline (in seconds) by which messages pulled + from the back-end must be acknowledged. + """ + path = '/%s:modifyAckDeadline' % (subscription_path,) + data = { + 'ackIds': ack_ids, + 'ackDeadlineSeconds': ack_deadline, + } + self.api_request(method='POST', path=path, data=data) diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index 5eecbc115f83..0d7fb5aa99dd 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -114,9 +114,9 @@ def project(self): return self._client.project @property - def path(self): + def full_name(self): """URL path for the subscription's APIs""" - return '/projects/%s/subscriptions/%s' % (self.project, self.name) + return 'projects/%s/subscriptions/%s' % (self.project, self.name) def _require_client(self, client): """Check client or verify over-ride. @@ -143,16 +143,10 @@ def create(self, client=None): :param client: the client to use. If not passed, falls back to the ``client`` stored on the current subscription's topic. """ - data = {'topic': self.topic.full_name} - - if self.ack_deadline is not None: - data['ackDeadlineSeconds'] = self.ack_deadline - - if self.push_endpoint is not None: - data['pushConfig'] = {'pushEndpoint': self.push_endpoint} - client = self._require_client(client) - client.connection.api_request(method='PUT', path=self.path, data=data) + client.connection.subscription_create( + self.full_name, self.topic.full_name, self.ack_deadline, + self.push_endpoint) def exists(self, client=None): """API call: test existence of the subscription via a GET request @@ -166,7 +160,7 @@ def exists(self, client=None): """ client = self._require_client(client) try: - client.connection.api_request(method='GET', path=self.path) + client.connection.subscription_get(self.full_name) except NotFound: return False else: @@ -183,11 +177,24 @@ def reload(self, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - data = client.connection.api_request(method='GET', path=self.path) + data = client.connection.subscription_get(self.full_name) self.ack_deadline = data.get('ackDeadlineSeconds') push_config = data.get('pushConfig', {}) self.push_endpoint = push_config.get('pushEndpoint') + def delete(self, client=None): + """API call: delete the subscription via a DELETE request. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + client = self._require_client(client) + client.connection.subscription_delete(self.full_name) + def modify_push_configuration(self, push_endpoint, client=None): """API call: update the push endpoint for the subscription. @@ -204,13 +211,8 @@ def modify_push_configuration(self, push_endpoint, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - data = {} - config = data['pushConfig'] = {} - if push_endpoint is not None: - config['pushEndpoint'] = push_endpoint - client.connection.api_request( - method='POST', path='%s:modifyPushConfig' % (self.path,), - data=data) + client.connection.subscription_modify_push_config( + self.full_name, push_endpoint) self.push_endpoint = push_endpoint def pull(self, return_immediately=False, max_messages=1, client=None): @@ -238,12 +240,10 @@ def pull(self, return_immediately=False, max_messages=1, client=None): is an instance of :class:`gcloud.pubsub.message.Message`. """ client = self._require_client(client) - data = {'returnImmediately': return_immediately, - 'maxMessages': max_messages} - response = client.connection.api_request( - method='POST', path='%s:pull' % (self.path,), data=data) + response = client.connection.subscription_pull( + self.full_name, return_immediately, max_messages) return [(info['ackId'], Message.from_api_repr(info['message'])) - for info in response.get('receivedMessages', ())] + for info in response] def acknowledge(self, ack_ids, client=None): """API call: acknowledge retrieved messages for the subscription. @@ -259,18 +259,16 @@ def acknowledge(self, ack_ids, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - data = {'ackIds': ack_ids} - client.connection.api_request( - method='POST', path='%s:acknowledge' % (self.path,), data=data) + client.connection.subscription_acknowledge(self.full_name, ack_ids) - def modify_ack_deadline(self, ack_id, ack_deadline, client=None): + def modify_ack_deadline(self, ack_ids, ack_deadline, client=None): """API call: update acknowledgement deadline for a retrieved message. See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline - :type ack_id: string - :param ack_id: ack ID of message being updated + :type ack_ids: list of string + :param ack_ids: ack IDs of messages being updated :type ack_deadline: int :param ack_deadline: new deadline for the message, in seconds @@ -280,23 +278,8 @@ def modify_ack_deadline(self, ack_id, ack_deadline, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - data = {'ackIds': [ack_id], 'ackDeadlineSeconds': ack_deadline} - client.connection.api_request( - method='POST', path='%s:modifyAckDeadline' % (self.path,), - data=data) - - def delete(self, client=None): - """API call: delete the subscription via a DELETE request. - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete - - :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current subscription's topic. - """ - client = self._require_client(client) - client.connection.api_request(method='DELETE', path=self.path) + client.connection.subscription_modify_ack_deadline( + self.full_name, ack_ids, ack_deadline) def get_iam_policy(self, client=None): """Fetch the IAM policy for the subscription. @@ -313,8 +296,7 @@ def get_iam_policy(self, client=None): ``getIamPolicy`` API request. """ client = self._require_client(client) - path = '%s:getIamPolicy' % (self.path,) - resp = client.connection.api_request(method='GET', path=path) + resp = client.connection.get_iam_policy(self.full_name) return Policy.from_api_repr(resp) def set_iam_policy(self, policy, client=None): @@ -336,11 +318,8 @@ def set_iam_policy(self, policy, client=None): ``setIamPolicy`` API request. """ client = self._require_client(client) - path = '%s:setIamPolicy' % (self.path,) resource = policy.to_api_repr() - wrapped = {'policy': resource} - resp = client.connection.api_request( - method='POST', path=path, data=wrapped) + resp = client.connection.set_iam_policy(self.full_name, resource) return Policy.from_api_repr(resp) def check_iam_permissions(self, permissions, client=None): @@ -360,10 +339,5 @@ def check_iam_permissions(self, permissions, client=None): :returns: subset of ``permissions`` allowed by current IAM policy. """ client = self._require_client(client) - path = '%s:testIamPermissions' % (self.path,) - data = { - 'permissions': list(permissions), - } - resp = client.connection.api_request( - method='POST', path=path, data=data) - return resp.get('permissions', ()) + return client.connection.test_iam_permissions( + self.full_name, list(permissions)) diff --git a/gcloud/pubsub/test_client.py b/gcloud/pubsub/test_client.py index 54d54cc72162..f4e1a89836d3 100644 --- a/gcloud/pubsub/test_client.py +++ b/gcloud/pubsub/test_client.py @@ -16,6 +16,11 @@ class TestClient(unittest2.TestCase): + PROJECT = 'PROJECT' + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_NAME = 'subscription_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) def _getTargetClass(self): from gcloud.pubsub.client import Client @@ -26,182 +31,130 @@ def _makeOne(self, *args, **kw): def test_list_topics_no_paging(self): from gcloud.pubsub.topic import Topic - PROJECT = 'PROJECT' - CREDS = _Credentials() - - CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + creds = _Credentials() + client = self._makeOne(project=self.PROJECT, credentials=creds) + conn = client.connection = _Connection() + conn._list_topics_response = [{'name': self.TOPIC_PATH}], None - TOPIC_NAME = 'topic_name' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + topics, next_page_token = client.list_topics() - RETURNED = {'topics': [{'name': TOPIC_PATH}]} - # Replace the connection on the client with one of our own. - CLIENT_OBJ.connection = _Connection(RETURNED) - - # Execute request. - topics, next_page_token = CLIENT_OBJ.list_topics() - # Test values are correct. self.assertEqual(len(topics), 1) self.assertTrue(isinstance(topics[0], Topic)) - self.assertEqual(topics[0].name, TOPIC_NAME) + self.assertEqual(topics[0].name, self.TOPIC_NAME) self.assertEqual(next_page_token, None) - self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) - req = CLIENT_OBJ.connection._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) - self.assertEqual(req['query_params'], {}) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._listed_topics, (self.PROJECT, None, None)) def test_list_topics_with_paging(self): from gcloud.pubsub.topic import Topic - PROJECT = 'PROJECT' - CREDS = _Credentials() - - CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) - - TOPIC_NAME = 'topic_name' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) TOKEN1 = 'TOKEN1' TOKEN2 = 'TOKEN2' SIZE = 1 - RETURNED = {'topics': [{'name': TOPIC_PATH}], - 'nextPageToken': TOKEN2} - # Replace the connection on the client with one of our own. - CLIENT_OBJ.connection = _Connection(RETURNED) - - # Execute request. - topics, next_page_token = CLIENT_OBJ.list_topics(SIZE, TOKEN1) - # Test values are correct. + creds = _Credentials() + client = self._makeOne(project=self.PROJECT, credentials=creds) + conn = client.connection = _Connection() + conn._list_topics_response = [{'name': self.TOPIC_PATH}], TOKEN2 + + topics, next_page_token = client.list_topics(SIZE, TOKEN1) + self.assertEqual(len(topics), 1) self.assertTrue(isinstance(topics[0], Topic)) - self.assertEqual(topics[0].name, TOPIC_NAME) + self.assertEqual(topics[0].name, self.TOPIC_NAME) self.assertEqual(next_page_token, TOKEN2) - self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) - req = CLIENT_OBJ.connection._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) - self.assertEqual(req['query_params'], - {'pageSize': SIZE, 'pageToken': TOKEN1}) - def test_list_topics_missing_key(self): - PROJECT = 'PROJECT' - CREDS = _Credentials() + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._listed_topics, (self.PROJECT, 1, TOKEN1)) - CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + def test_list_topics_missing_key(self): + creds = _Credentials() + client = self._makeOne(project=self.PROJECT, credentials=creds) + conn = client.connection = _Connection() + conn._list_topics_response = (), None - RETURNED = {} - # Replace the connection on the client with one of our own. - CLIENT_OBJ.connection = _Connection(RETURNED) + topics, next_page_token = client.list_topics() - # Execute request. - topics, next_page_token = CLIENT_OBJ.list_topics() - # Test values are correct. self.assertEqual(len(topics), 0) self.assertEqual(next_page_token, None) - self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) - req = CLIENT_OBJ.connection._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) - self.assertEqual(req['query_params'], {}) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._listed_topics, (self.PROJECT, None, None)) def test_list_subscriptions_no_paging(self): from gcloud.pubsub.subscription import Subscription - PROJECT = 'PROJECT' - CREDS = _Credentials() - - CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) - - SUB_NAME = 'subscription_name' - SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) - TOPIC_NAME = 'topic_name' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - SUB_INFO = [{'name': SUB_PATH, 'topic': TOPIC_PATH}] - RETURNED = {'subscriptions': SUB_INFO} - # Replace the connection on the client with one of our own. - CLIENT_OBJ.connection = _Connection(RETURNED) - - # Execute request. - subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions() - # Test values are correct. + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + creds = _Credentials() + client = self._makeOne(project=self.PROJECT, credentials=creds) + conn = client.connection = _Connection() + conn._list_subscriptions_response = [SUB_INFO], None + + subscriptions, next_page_token = client.list_subscriptions() + self.assertEqual(len(subscriptions), 1) self.assertTrue(isinstance(subscriptions[0], Subscription)) - self.assertEqual(subscriptions[0].name, SUB_NAME) - self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) + self.assertEqual(subscriptions[0].name, self.SUB_NAME) + self.assertEqual(subscriptions[0].topic.name, self.TOPIC_NAME) self.assertEqual(next_page_token, None) - self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) - req = CLIENT_OBJ.connection._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) - self.assertEqual(req['query_params'], {}) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._listed_subscriptions, + (self.PROJECT, None, None)) def test_list_subscriptions_with_paging(self): from gcloud.pubsub.subscription import Subscription - PROJECT = 'PROJECT' - CREDS = _Credentials() - - CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) - - SUB_NAME = 'subscription_name' - SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) - TOPIC_NAME = 'topic_name' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + creds = _Credentials() + client = self._makeOne(project=self.PROJECT, credentials=creds) ACK_DEADLINE = 42 PUSH_ENDPOINT = 'https://push.example.com/endpoint' + SUB_INFO = {'name': self.SUB_PATH, + 'topic': self.TOPIC_PATH, + 'ackDeadlineSeconds': ACK_DEADLINE, + 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}} TOKEN1 = 'TOKEN1' TOKEN2 = 'TOKEN2' SIZE = 1 - SUB_INFO = [{'name': SUB_PATH, - 'topic': TOPIC_PATH, - 'ackDeadlineSeconds': ACK_DEADLINE, - 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}}] - RETURNED = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN2} - # Replace the connection on the client with one of our own. - CLIENT_OBJ.connection = _Connection(RETURNED) - - # Execute request. - subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions( + conn = client.connection = _Connection() + conn._list_subscriptions_response = [SUB_INFO], TOKEN2 + + subscriptions, next_page_token = client.list_subscriptions( SIZE, TOKEN1) - # Test values are correct. + self.assertEqual(len(subscriptions), 1) self.assertTrue(isinstance(subscriptions[0], Subscription)) - self.assertEqual(subscriptions[0].name, SUB_NAME) - self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) + self.assertEqual(subscriptions[0].name, self.SUB_NAME) + self.assertEqual(subscriptions[0].topic.name, self.TOPIC_NAME) self.assertEqual(subscriptions[0].ack_deadline, ACK_DEADLINE) self.assertEqual(subscriptions[0].push_endpoint, PUSH_ENDPOINT) self.assertEqual(next_page_token, TOKEN2) - self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) - req = CLIENT_OBJ.connection._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) - self.assertEqual(req['query_params'], - {'pageSize': SIZE, 'pageToken': TOKEN1}) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._listed_subscriptions, + (self.PROJECT, SIZE, TOKEN1)) def test_list_subscriptions_w_missing_key(self): PROJECT = 'PROJECT' - CREDS = _Credentials() + creds = _Credentials() - CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + client = self._makeOne(project=PROJECT, credentials=creds) + conn = client.connection = _Connection() + conn._list_subscriptions_response = (), None - RETURNED = {} - # Replace the connection on the client with one of our own. - CLIENT_OBJ.connection = _Connection(RETURNED) + subscriptions, next_page_token = client.list_subscriptions() - # Execute request. - subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions() - # Test values are correct. self.assertEqual(len(subscriptions), 0) self.assertEqual(next_page_token, None) - self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) - req = CLIENT_OBJ.connection._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) - self.assertEqual(req['query_params'], {}) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._listed_subscriptions, + (self.PROJECT, None, None)) def test_topic(self): PROJECT = 'PROJECT' TOPIC_NAME = 'TOPIC_NAME' - CREDS = _Credentials() + creds = _Credentials() - client_obj = self._makeOne(project=PROJECT, credentials=CREDS) + client_obj = self._makeOne(project=PROJECT, credentials=creds) new_topic = client_obj.topic(TOPIC_NAME) self.assertEqual(new_topic.name, TOPIC_NAME) self.assertTrue(new_topic._client is client_obj) @@ -230,7 +183,10 @@ def __init__(self, *responses): self._responses = responses self._requested = [] - def api_request(self, **kw): - self._requested.append(kw) - response, self._responses = self._responses[0], self._responses[1:] - return response + def list_topics(self, project, page_size, page_token): + self._listed_topics = (project, page_size, page_token) + return self._list_topics_response + + def list_subscriptions(self, project, page_size, page_token): + self._listed_subscriptions = (project, page_size, page_token) + return self._list_subscriptions_response diff --git a/gcloud/pubsub/test_connection.py b/gcloud/pubsub/test_connection.py index 581c61091e3a..d25338126135 100644 --- a/gcloud/pubsub/test_connection.py +++ b/gcloud/pubsub/test_connection.py @@ -16,6 +16,14 @@ class TestConnection(unittest2.TestCase): + PROJECT = 'PROJECT' + LIST_TOPICS_PATH = 'projects/%s/topics' % (PROJECT,) + LIST_SUBSCRIPTIONS_PATH = 'projects/%s/subscriptions' % (PROJECT,) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + LIST_TOPIC_SUBSCRIPTIONS_PATH = '%s/subscriptions' % (TOPIC_PATH,) + SUB_NAME = 'subscription_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) def _getTargetClass(self): from gcloud.pubsub.connection import Connection @@ -99,5 +107,659 @@ def test_build_api_url_w_base_url_override(self): conn.API_VERSION, 'foo', ]) - self.assertEqual( - conn.build_api_url('/foo', api_base_url=base_url2), URI) + self.assertEqual(conn.build_api_url('/foo', api_base_url=base_url2), + URI) + + def _verify_uri(self, uri, expected_path, **expected_qs): + from six.moves.urllib import parse + klass = self._getTargetClass() + scheme, netloc, path, query, _ = parse.urlsplit(uri) + self.assertEqual('%s://%s' % (scheme, netloc), klass.API_BASE_URL) + self.assertEqual(path, '/%s/%s' % (klass.API_VERSION, expected_path)) + qs = dict(parse.parse_qsl(query)) + self.assertEqual(qs, expected_qs) + + def test_list_topics_no_paging(self): + import json + RETURNED = {'topics': [{'name': self.TOPIC_PATH}]} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + topics, next_token = conn.list_topics(self.PROJECT) + + self.assertEqual(len(topics), 1) + topic = topics[0] + self.assertTrue(isinstance(topic, dict)) + self.assertEqual(topic['name'], self.TOPIC_PATH) + self.assertEqual(next_token, None) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], self.LIST_TOPICS_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_list_topics_with_paging(self): + import json + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + RETURNED = { + 'topics': [{'name': self.TOPIC_PATH}], + 'nextPageToken': 'TOKEN2', + } + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + topics, next_token = conn.list_topics( + self.PROJECT, page_token=TOKEN1, page_size=SIZE) + + self.assertEqual(len(topics), 1) + topic = topics[0] + self.assertTrue(isinstance(topic, dict)) + self.assertEqual(topic['name'], self.TOPIC_PATH) + self.assertEqual(next_token, TOKEN2) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], self.LIST_TOPICS_PATH, + pageToken=TOKEN1, pageSize=str(SIZE)) + self.assertEqual(http._called_with['body'], None) + + def test_list_topics_missing_key(self): + import json + RETURNED = {} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + topics, next_token = conn.list_topics(self.PROJECT) + + self.assertEqual(len(topics), 0) + self.assertEqual(next_token, None) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], self.LIST_TOPICS_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_list_subscriptions_no_paging(self): + import json + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + RETURNED = {'subscriptions': [SUB_INFO]} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + subscriptions, next_token = conn.list_subscriptions(self.PROJECT) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertTrue(isinstance(subscription, dict)) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, None) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], + self.LIST_SUBSCRIPTIONS_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_list_subscriptions_with_paging(self): + import json + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + RETURNED = { + 'subscriptions': [SUB_INFO], + 'nextPageToken': 'TOKEN2', + } + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + subscriptions, next_token = conn.list_subscriptions( + self.PROJECT, page_token=TOKEN1, page_size=SIZE) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertTrue(isinstance(subscription, dict)) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, TOKEN2) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], + self.LIST_SUBSCRIPTIONS_PATH, + pageToken=TOKEN1, pageSize=str(SIZE)) + self.assertEqual(http._called_with['body'], None) + + def test_list_subscriptions_missing_key(self): + import json + RETURNED = {} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + subscriptions, next_token = conn.list_subscriptions(self.PROJECT) + + self.assertEqual(len(subscriptions), 0) + self.assertEqual(next_token, None) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], + self.LIST_SUBSCRIPTIONS_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_topic_create(self): + import json + RETURNED = {'name': self.TOPIC_PATH} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + resource = conn.topic_create(self.TOPIC_PATH) + + self.assertEqual(resource, RETURNED) + self.assertEqual(http._called_with['method'], 'PUT') + self._verify_uri(http._called_with['uri'], self.TOPIC_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_topic_get(self): + import json + RETURNED = {'name': self.TOPIC_PATH} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + resource = conn.topic_get(self.TOPIC_PATH) + + self.assertEqual(resource, RETURNED) + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], self.TOPIC_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_topic_delete(self): + import json + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps({})) + conn = self._makeOne(http=http) + + conn.topic_delete(self.TOPIC_PATH) + + self.assertEqual(http._called_with['method'], 'DELETE') + self._verify_uri(http._called_with['uri'], self.TOPIC_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_topic_publish(self): + import base64 + import json + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, 'attributes': {}} + RETURNED = {'messageIds': [MSGID]} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + resource = conn.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + self.assertEqual(resource, [MSGID]) + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], + '%s:publish' % (self.TOPIC_PATH,)) + self.assertEqual(http._called_with['body'], + json.dumps({'messages': [MESSAGE]})) + + def test_topic_list_subscriptions_no_paging(self): + import json + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + RETURNED = {'subscriptions': [SUB_INFO]} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + subscriptions, next_token = conn.topic_list_subscriptions( + self.TOPIC_PATH) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertTrue(isinstance(subscription, dict)) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, None) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], + self.LIST_TOPIC_SUBSCRIPTIONS_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_topic_list_subscriptions_with_paging(self): + import json + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + RETURNED = { + 'subscriptions': [SUB_INFO], + 'nextPageToken': 'TOKEN2', + } + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + subscriptions, next_token = conn.topic_list_subscriptions( + self.TOPIC_PATH, page_token=TOKEN1, page_size=SIZE) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertTrue(isinstance(subscription, dict)) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, TOKEN2) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], + self.LIST_TOPIC_SUBSCRIPTIONS_PATH, + pageToken=TOKEN1, pageSize=str(SIZE)) + self.assertEqual(http._called_with['body'], None) + + def test_topic_list_subscriptions_missing_key(self): + import json + RETURNED = {} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + subscriptions, next_token = conn.topic_list_subscriptions( + self.TOPIC_PATH) + + self.assertEqual(len(subscriptions), 0) + self.assertEqual(next_token, None) + + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], + self.LIST_TOPIC_SUBSCRIPTIONS_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_get_iam_policy(self): + import json + from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE + PATH = '%s:getIamPolicy' % (self.TOPIC_PATH,) + OWNER1 = 'user:phred@example.com' + OWNER2 = 'group:cloud-logs@google.com' + EDITOR1 = 'domain:google.com' + EDITOR2 = 'user:phred@example.com' + VIEWER1 = 'serviceAccount:1234-abcdef@service.example.com' + VIEWER2 = 'user:phred@example.com' + RETURNED = { + 'etag': 'DEADBEEF', + 'version': 17, + 'bindings': [ + {'role': OWNER_ROLE, 'members': [OWNER1, OWNER2]}, + {'role': EDITOR_ROLE, 'members': [EDITOR1, EDITOR2]}, + {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, + ], + } + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + policy = conn.get_iam_policy(self.TOPIC_PATH) + + self.assertEqual(policy, RETURNED) + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], PATH) + self.assertEqual(http._called_with['body'], None) + + def test_set_iam_policy(self): + import json + from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE + PATH = '%s:setIamPolicy' % (self.TOPIC_PATH,) + OWNER1 = 'user:phred@example.com' + OWNER2 = 'group:cloud-logs@google.com' + EDITOR1 = 'domain:google.com' + EDITOR2 = 'user:phred@example.com' + VIEWER1 = 'serviceAccount:1234-abcdef@service.example.com' + VIEWER2 = 'user:phred@example.com' + POLICY = { + 'etag': 'DEADBEEF', + 'version': 17, + 'bindings': [ + {'role': OWNER_ROLE, 'members': [OWNER1, OWNER2]}, + {'role': EDITOR_ROLE, 'members': [EDITOR1, EDITOR2]}, + {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, + ], + } + RETURNED = POLICY.copy() + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + policy = conn.set_iam_policy(self.TOPIC_PATH, POLICY) + + self.assertEqual(policy, RETURNED) + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], PATH) + self.assertEqual(http._called_with['body'], + json.dumps({'policy': POLICY})) + + def test_test_iam_permissions(self): + import json + from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE + PATH = '%s:testIamPermissions' % (self.TOPIC_PATH,) + ALL_ROLES = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE] + ALLOWED = ALL_ROLES[1:] + RETURNED = {'permissions': ALLOWED} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + allowed = conn.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES) + + self.assertEqual(allowed, ALLOWED) + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], PATH) + self.assertEqual(http._called_with['body'], + json.dumps({'permissions': ALL_ROLES})) + + def test_test_iam_permissions_missing_key(self): + import json + from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE + PATH = '%s:testIamPermissions' % (self.TOPIC_PATH,) + ALL_ROLES = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE] + RETURNED = {} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + allowed = conn.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES) + + self.assertEqual(allowed, []) + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], PATH) + self.assertEqual(http._called_with['body'], + json.dumps({'permissions': ALL_ROLES})) + + def test_subscription_create_defaults(self): + import json + RESOURCE = {'topic': self.TOPIC_PATH} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + RETURNED = RESOURCE.copy() + RETURNED['name'] = self.SUB_PATH + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + resource = conn.subscription_create(self.SUB_PATH, self.TOPIC_PATH) + + self.assertEqual(resource, RETURNED) + self.assertEqual(http._called_with['method'], 'PUT') + self._verify_uri(http._called_with['uri'], self.SUB_PATH) + self.assertEqual(http._called_with['body'], json.dumps(RESOURCE)) + + def test_subscription_create_explicit(self): + import json + ACK_DEADLINE = 90 + PUSH_ENDPOINT = 'https://api.example.com/push' + RESOURCE = { + 'topic': self.TOPIC_PATH, + 'ackDeadlineSeconds': ACK_DEADLINE, + 'pushConfig': { + 'pushEndpoint': PUSH_ENDPOINT, + }, + } + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + RETURNED = RESOURCE.copy() + RETURNED['name'] = self.SUB_PATH + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + resource = conn.subscription_create( + self.SUB_PATH, self.TOPIC_PATH, + ack_deadline=ACK_DEADLINE, push_endpoint=PUSH_ENDPOINT) + + self.assertEqual(resource, RETURNED) + self.assertEqual(http._called_with['method'], 'PUT') + self._verify_uri(http._called_with['uri'], self.SUB_PATH) + self.assertEqual(http._called_with['body'], json.dumps(RESOURCE)) + + def test_subscription_get(self): + import json + ACK_DEADLINE = 90 + PUSH_ENDPOINT = 'https://api.example.com/push' + RETURNED = { + 'topic': self.TOPIC_PATH, + 'name': self.SUB_PATH, + 'ackDeadlineSeconds': ACK_DEADLINE, + 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}, + } + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + resource = conn.subscription_get(self.SUB_PATH) + + self.assertEqual(resource, RETURNED) + self.assertEqual(http._called_with['method'], 'GET') + self._verify_uri(http._called_with['uri'], self.SUB_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_subscription_delete(self): + import json + RETURNED = {} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + conn.subscription_delete(self.SUB_PATH) + + self.assertEqual(http._called_with['method'], 'DELETE') + self._verify_uri(http._called_with['uri'], self.SUB_PATH) + self.assertEqual(http._called_with['body'], None) + + def test_subscription_modify_push_config(self): + import json + PUSH_ENDPOINT = 'https://api.example.com/push' + BODY = { + 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}, + } + RETURNED = {} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + conn.subscription_modify_push_config(self.SUB_PATH, PUSH_ENDPOINT) + + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], + '%s:modifyPushConfig' % (self.SUB_PATH,)) + self.assertEqual(http._called_with['body'], json.dumps(BODY)) + + def test_subscription_pull_defaults(self): + import base64 + import json + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + ACK_ID = 'DEADBEEF' + MSG_ID = 'BEADCAFE' + MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} + RETURNED = { + 'receivedMessages': [{'ackId': ACK_ID, 'message': MESSAGE}], + } + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + BODY = { + 'returnImmediately': False, + 'maxMessages': 1, + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + received = conn.subscription_pull(self.SUB_PATH) + + self.assertEqual(received, RETURNED['receivedMessages']) + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], + '%s:pull' % (self.SUB_PATH,)) + self.assertEqual(http._called_with['body'], json.dumps(BODY)) + + def test_subscription_pull_explicit(self): + import base64 + import json + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + ACK_ID = 'DEADBEEF' + MSG_ID = 'BEADCAFE' + MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} + RETURNED = { + 'receivedMessages': [{'ackId': ACK_ID, 'message': MESSAGE}], + } + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + MAX_MESSAGES = 10 + BODY = { + 'returnImmediately': True, + 'maxMessages': MAX_MESSAGES, + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + received = conn.subscription_pull( + self.SUB_PATH, return_immediately=True, max_messages=MAX_MESSAGES) + + self.assertEqual(received, RETURNED['receivedMessages']) + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], + '%s:pull' % (self.SUB_PATH,)) + self.assertEqual(http._called_with['body'], json.dumps(BODY)) + + def test_subscription_acknowledge(self): + import json + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + BODY = { + 'ackIds': [ACK_ID1, ACK_ID2], + } + RETURNED = {} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + conn.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) + + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], + '%s:acknowledge' % (self.SUB_PATH,)) + self.assertEqual(http._called_with['body'], json.dumps(BODY)) + + def test_subscription_modify_ack_deadline(self): + import json + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + NEW_DEADLINE = 90 + BODY = { + 'ackIds': [ACK_ID1, ACK_ID2], + 'ackDeadlineSeconds': NEW_DEADLINE, + } + RETURNED = {} + HEADERS = { + 'status': '200', + 'content-type': 'application/json', + } + http = _Http(HEADERS, json.dumps(RETURNED)) + conn = self._makeOne(http=http) + + conn.subscription_modify_ack_deadline( + self.SUB_PATH, [ACK_ID1, ACK_ID2], NEW_DEADLINE) + + self.assertEqual(http._called_with['method'], 'POST') + self._verify_uri(http._called_with['uri'], + '%s:modifyAckDeadline' % (self.SUB_PATH,)) + self.assertEqual(http._called_with['body'], json.dumps(BODY)) + + +class _Http(object): + + _called_with = None + + def __init__(self, headers, content): + from httplib2 import Response + self._response = Response(headers) + self._content = content + + def request(self, **kw): + self._called_with = kw + return self._response, self._content diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index d187e8cd07d6..cb4cbb75ba83 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -133,154 +133,197 @@ def test_from_api_repr_w_topics_w_topic_match(self): self.assertEqual(subscription.push_endpoint, self.ENDPOINT) def test_create_pull_wo_ack_deadline_w_bound_client(self): - PATH = '/%s' % (self.SUB_PATH,) - BODY = {'topic': self.TOPIC_PATH} - conn = _Connection({'name': self.SUB_PATH}) + RESPONSE = { + 'topic': self.TOPIC_PATH, + 'name': self.SUB_PATH, + } + conn = _Connection() + conn._subscription_create_response = RESPONSE client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) + subscription.create() - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'PUT') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], BODY) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_created, + (self.SUB_PATH, self.TOPIC_PATH, None, None)) def test_create_push_w_ack_deadline_w_alternate_client(self): - PATH = '/%s' % (self.SUB_PATH,) - BODY = {'topic': self.TOPIC_PATH, - 'ackDeadlineSeconds': self.DEADLINE, - 'pushConfig': {'pushEndpoint': self.ENDPOINT}} - conn1 = _Connection({'name': self.SUB_PATH}) + RESPONSE = { + 'topic': self.TOPIC_PATH, + 'name': self.SUB_PATH, + 'ackDeadlineSeconds': self.DEADLINE, + 'pushConfig': {'pushEndpoint': self.ENDPOINT} + } + conn = _Connection() + conn._subscription_create_response = RESPONSE + conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection({'name': self.SUB_PATH}) + conn2 = _Connection() + conn2._subscription_create_response = RESPONSE client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic, self.DEADLINE, self.ENDPOINT) + subscription.create(client=client2) + self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'PUT') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], BODY) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual( + conn2._subscription_created, + (self.SUB_PATH, self.TOPIC_PATH, self.DEADLINE, self.ENDPOINT)) def test_exists_miss_w_bound_client(self): - PATH = '/%s' % (self.SUB_PATH,) conn = _Connection() client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) + self.assertFalse(subscription.exists()) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], PATH) - self.assertEqual(req.get('query_params'), None) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_got, self.SUB_PATH) def test_exists_hit_w_alternate_client(self): - PATH = '/%s' % (self.SUB_PATH,) - conn1 = _Connection({'name': self.SUB_PATH, 'topic': self.TOPIC_PATH}) + RESPONSE = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection({'name': self.SUB_PATH, 'topic': self.TOPIC_PATH}) + conn2 = _Connection() + conn2._subscription_get_response = RESPONSE client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) + self.assertTrue(subscription.exists(client=client2)) + self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], PATH) - self.assertEqual(req.get('query_params'), None) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._subscription_got, self.SUB_PATH) def test_reload_w_bound_client(self): - PATH = '/%s' % (self.SUB_PATH,) - conn = _Connection({'name': self.SUB_PATH, - 'topic': self.TOPIC_PATH, - 'ackDeadlineSeconds': self.DEADLINE, - 'pushConfig': {'pushEndpoint': self.ENDPOINT}}) + RESPONSE = { + 'name': self.SUB_PATH, + 'topic': self.TOPIC_PATH, + 'ackDeadlineSeconds': self.DEADLINE, + 'pushConfig': {'pushEndpoint': self.ENDPOINT}, + } + conn = _Connection() + conn._subscription_get_response = RESPONSE client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) + subscription.reload() + self.assertEqual(subscription.ack_deadline, self.DEADLINE) self.assertEqual(subscription.push_endpoint, self.ENDPOINT) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], PATH) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_got, self.SUB_PATH) def test_reload_w_alternate_client(self): - PATH = '/%s' % (self.SUB_PATH,) + RESPONSE = { + 'name': self.SUB_PATH, + 'topic': self.TOPIC_PATH, + } conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection({'name': self.SUB_PATH, - 'topic': self.TOPIC_PATH, - 'ackDeadlineSeconds': self.DEADLINE, - 'pushConfig': {'pushEndpoint': self.ENDPOINT}}) + conn2 = _Connection() + conn2._subscription_get_response = RESPONSE client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) - subscription = self._makeOne(self.SUB_NAME, topic) + subscription = self._makeOne(self.SUB_NAME, topic, + self.DEADLINE, self.ENDPOINT) + subscription.reload(client=client2) - self.assertEqual(subscription.ack_deadline, self.DEADLINE) - self.assertEqual(subscription.push_endpoint, self.ENDPOINT) + + self.assertEqual(subscription.ack_deadline, None) + self.assertEqual(subscription.push_endpoint, None) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], PATH) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._subscription_got, self.SUB_PATH) + + def test_delete_w_bound_client(self): + RESPONSE = {} + conn = _Connection() + conn._subscription_delete_response = RESPONSE + client = _Client(project=self.PROJECT, connection=conn) + topic = _Topic(self.TOPIC_NAME, client=client) + subscription = self._makeOne(self.SUB_NAME, topic) + + subscription.delete() + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_deleted, self.SUB_PATH) + + def test_delete_w_alternate_client(self): + RESPONSE = {} + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection() + conn2._subscription_delete_response = RESPONSE + client2 = _Client(project=self.PROJECT, connection=conn2) + topic = _Topic(self.TOPIC_NAME, client=client1) + subscription = self._makeOne(self.SUB_NAME, topic, + self.DEADLINE, self.ENDPOINT) + + subscription.delete(client=client2) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._subscription_deleted, self.SUB_PATH) def test_modify_push_config_w_endpoint_w_bound_client(self): - PATH = '/%s:modifyPushConfig' % (self.SUB_PATH,) - conn = _Connection({}) + conn = _Connection() + conn._subscription_modify_push_config_response = {} client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) + subscription.modify_push_configuration(push_endpoint=self.ENDPOINT) + self.assertEqual(subscription.push_endpoint, self.ENDPOINT) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], - {'pushConfig': {'pushEndpoint': self.ENDPOINT}}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_modified_push_config, + (self.SUB_PATH, self.ENDPOINT)) def test_modify_push_config_wo_endpoint_w_alternate_client(self): - PATH = '/%s:modifyPushConfig' % (self.SUB_PATH,) - conn1 = _Connection({}) + conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection({}) + conn2 = _Connection() + conn2._subscription_modify_push_config_response = {} client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic, push_endpoint=self.ENDPOINT) + subscription.modify_push_configuration(push_endpoint=None, client=client2) + self.assertEqual(subscription.push_endpoint, None) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], {'pushConfig': {}}) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._subscription_modified_push_config, + (self.SUB_PATH, None)) def test_pull_wo_return_immediately_max_messages_w_bound_client(self): import base64 from gcloud.pubsub.message import Message - PATH = '/%s:pull' % (self.SUB_PATH,) ACK_ID = 'DEADBEEF' MSG_ID = 'BEADCAFE' PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD) MESSAGE = {'messageId': MSG_ID, 'data': B64} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} - conn = _Connection({'receivedMessages': [REC_MESSAGE]}) + conn = _Connection() + conn._subscription_pull_response = [REC_MESSAGE] client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) + pulled = subscription.pull() + self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) @@ -288,17 +331,13 @@ def test_pull_wo_return_immediately_max_messages_w_bound_client(self): self.assertEqual(message.data, PAYLOAD) self.assertEqual(message.message_id, MSG_ID) self.assertEqual(message.attributes, {}) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], - {'returnImmediately': False, 'maxMessages': 1}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_pulled, + (self.SUB_PATH, False, 1)) def test_pull_w_return_immediately_w_max_messages_w_alt_client(self): import base64 from gcloud.pubsub.message import Message - PATH = '/%s:pull' % (self.SUB_PATH,) ACK_ID = 'DEADBEEF' MSG_ID = 'BEADCAFE' PAYLOAD = b'This is the message text' @@ -307,12 +346,15 @@ def test_pull_w_return_immediately_w_max_messages_w_alt_client(self): REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection({'receivedMessages': [REC_MESSAGE]}) + conn2 = _Connection() + conn2._subscription_pull_response = [REC_MESSAGE] client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) + pulled = subscription.pull(return_immediately=True, max_messages=3, client=client2) + self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) @@ -321,120 +363,90 @@ def test_pull_w_return_immediately_w_max_messages_w_alt_client(self): self.assertEqual(message.message_id, MSG_ID) self.assertEqual(message.attributes, {'a': 'b'}) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], - {'returnImmediately': True, 'maxMessages': 3}) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._subscription_pulled, + (self.SUB_PATH, True, 3)) def test_pull_wo_receivedMessages(self): - PATH = '/%s:pull' % (self.SUB_PATH,) conn = _Connection({}) + conn._subscription_pull_response = {} client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) + pulled = subscription.pull(return_immediately=False) + self.assertEqual(len(pulled), 0) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], - {'returnImmediately': False, 'maxMessages': 1}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_pulled, + (self.SUB_PATH, False, 1)) def test_acknowledge_w_bound_client(self): - PATH = '/%s:acknowledge' % (self.SUB_PATH,) ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' - conn = _Connection({}) + conn = _Connection() + conn._subscription_acknowlege_response = {} client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) + subscription.acknowledge([ACK_ID1, ACK_ID2]) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_acked, + (self.SUB_PATH, [ACK_ID1, ACK_ID2])) def test_acknowledge_w_alternate_client(self): - PATH = '/%s:acknowledge' % (self.SUB_PATH,) ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' - conn1 = _Connection({}) + conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection({}) + conn2 = _Connection() + conn2._subscription_acknowlege_response = {} client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) + subscription.acknowledge([ACK_ID1, ACK_ID2], client=client2) + self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._subscription_acked, + (self.SUB_PATH, [ACK_ID1, ACK_ID2])) def test_modify_ack_deadline_w_bound_client(self): - PATH = '/%s:modifyAckDeadline' % (self.SUB_PATH,) - ACK_ID = 'DEADBEEF' - SENT = {'ackIds': [ACK_ID], 'ackDeadlineSeconds': self.DEADLINE} - conn = _Connection({}) + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + conn = _Connection() + conn._subscription_modify_ack_deadline_response = {} client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) - subscription.modify_ack_deadline(ACK_ID, self.DEADLINE) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], SENT) + + subscription.modify_ack_deadline([ACK_ID1, ACK_ID2], self.DEADLINE) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._subscription_modified_ack_deadline, + (self.SUB_PATH, [ACK_ID1, ACK_ID2], self.DEADLINE)) def test_modify_ack_deadline_w_alternate_client(self): - PATH = '/%s:modifyAckDeadline' % (self.SUB_PATH,) - ACK_ID = 'DEADBEEF' - SENT = {'ackIds': [ACK_ID], 'ackDeadlineSeconds': self.DEADLINE} - conn1 = _Connection({}) + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection({}) + conn2 = _Connection() + conn2._subscription_modify_ack_deadline_response = {} client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) - subscription.modify_ack_deadline(ACK_ID, self.DEADLINE, client=client2) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], SENT) - def test_delete_w_bound_client(self): - PATH = '/%s' % (self.SUB_PATH,) - conn = _Connection({}) - client = _Client(project=self.PROJECT, connection=conn) - topic = _Topic(self.TOPIC_NAME, client=client) - subscription = self._makeOne(self.SUB_NAME, topic) - subscription.delete() - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'DELETE') - self.assertEqual(req['path'], PATH) + subscription.modify_ack_deadline( + [ACK_ID1, ACK_ID2], self.DEADLINE, client=client2) - def test_delete_w_alternate_client(self): - PATH = '/%s' % (self.SUB_PATH,) - conn1 = _Connection({}) - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection({}) - client2 = _Client(project=self.PROJECT, connection=conn2) - topic = _Topic(self.TOPIC_NAME, client=client1) - subscription = self._makeOne(self.SUB_NAME, topic) - subscription.delete(client=client2) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'DELETE') - self.assertEqual(req['path'], PATH) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._subscription_modified_ack_deadline, + (self.SUB_PATH, [ACK_ID1, ACK_ID2], self.DEADLINE)) def test_get_iam_policy_w_bound_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE @@ -453,9 +465,8 @@ def test_get_iam_policy_w_bound_client(self): {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, ], } - PATH = '/%s:getIamPolicy' % (self.SUB_PATH,) - - conn = _Connection(POLICY) + conn = _Connection() + conn._get_iam_policy_response = POLICY client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) @@ -467,20 +478,16 @@ def test_get_iam_policy_w_bound_client(self): self.assertEqual(sorted(policy.owners), [OWNER2, OWNER1]) self.assertEqual(sorted(policy.editors), [EDITOR1, EDITOR2]) self.assertEqual(sorted(policy.viewers), [VIEWER1, VIEWER2]) - - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], PATH) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._got_iam_policy, self.SUB_PATH) def test_get_iam_policy_w_alternate_client(self): POLICY = { 'etag': 'ACAB', } - PATH = '/%s:getIamPolicy' % (self.SUB_PATH,) - conn1 = _Connection() - conn2 = _Connection(POLICY) + conn2 = _Connection() + conn2._get_iam_policy_response = POLICY client1 = _Client(project=self.PROJECT, connection=conn1) client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) @@ -495,10 +502,8 @@ def test_get_iam_policy_w_alternate_client(self): self.assertEqual(sorted(policy.viewers), []) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], PATH) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._got_iam_policy, self.SUB_PATH) def test_set_iam_policy_w_bound_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE @@ -521,9 +526,8 @@ def test_set_iam_policy_w_bound_client(self): RESPONSE = POLICY.copy() RESPONSE['etag'] = 'ABACABAF' RESPONSE['version'] = 18 - PATH = '/%s:setIamPolicy' % (self.SUB_PATH,) - - conn = _Connection(RESPONSE) + conn = _Connection() + conn._set_iam_policy_response = RESPONSE client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) @@ -542,20 +546,15 @@ def test_set_iam_policy_w_bound_client(self): self.assertEqual(sorted(new_policy.owners), [OWNER1, OWNER2]) self.assertEqual(sorted(new_policy.editors), [EDITOR1, EDITOR2]) self.assertEqual(sorted(new_policy.viewers), [VIEWER1, VIEWER2]) - - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], {'policy': POLICY}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._set_iam_policy, (self.SUB_PATH, POLICY)) def test_set_iam_policy_w_alternate_client(self): from gcloud.pubsub.iam import Policy RESPONSE = {'etag': 'ACAB'} - PATH = '/%s:setIamPolicy' % (self.SUB_PATH,) - conn1 = _Connection() - conn2 = _Connection(RESPONSE) + conn2 = _Connection() + conn2._set_iam_policy_response = RESPONSE client1 = _Client(project=self.PROJECT, connection=conn1) client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) @@ -569,25 +568,15 @@ def test_set_iam_policy_w_alternate_client(self): self.assertEqual(sorted(new_policy.owners), []) self.assertEqual(sorted(new_policy.editors), []) self.assertEqual(sorted(new_policy.viewers), []) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], {'policy': {}}) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._set_iam_policy, (self.SUB_PATH, {})) def test_check_iam_permissions_w_bound_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE ROLES = [VIEWER_ROLE, EDITOR_ROLE, OWNER_ROLE] - PATH = '/%s:testIamPermissions' % (self.SUB_PATH,) - REQUESTED = { - 'permissions': ROLES, - } - RESPONSE = { - 'permissions': ROLES[:-1], - } - conn = _Connection(RESPONSE) + conn = _Connection() + conn._test_iam_permissions_response = ROLES[:-1] client = _Client(project=self.PROJECT, connection=conn) topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) @@ -595,23 +584,17 @@ def test_check_iam_permissions_w_bound_client(self): allowed = subscription.check_iam_permissions(ROLES) self.assertEqual(allowed, ROLES[:-1]) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], REQUESTED) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._tested_iam_permissions, + (self.SUB_PATH, ROLES)) def test_check_iam_permissions_w_alternate_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE ROLES = [VIEWER_ROLE, EDITOR_ROLE, OWNER_ROLE] - PATH = '/%s:testIamPermissions' % (self.SUB_PATH,) - REQUESTED = { - 'permissions': ROLES, - } - RESPONSE = {} conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection(RESPONSE) + conn2 = _Connection() + conn2._test_iam_permissions_response = [] client2 = _Client(project=self.PROJECT, connection=conn2) topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) @@ -620,29 +603,68 @@ def test_check_iam_permissions_w_alternate_client(self): self.assertEqual(len(allowed), 0) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], PATH) - self.assertEqual(req['data'], REQUESTED) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._tested_iam_permissions, + (self.SUB_PATH, ROLES)) -class _Connection(object): +class _Connection(object): # pylint: disable=too-many-instance-attributes def __init__(self, *responses): self._responses = responses self._requested = [] - def api_request(self, **kw): - from gcloud.exceptions import NotFound - self._requested.append(kw) + def subscription_create(self, subscription_path, topic_path, + ack_deadline=None, push_endpoint=None): + self._subscription_created = ( + subscription_path, topic_path, ack_deadline, push_endpoint) + return self._subscription_create_response + def subscription_get(self, subscription_path): + from gcloud.exceptions import NotFound + self._subscription_got = subscription_path try: - response, self._responses = self._responses[0], self._responses[1:] - except: - raise NotFound('miss') - else: - return response + return self._subscription_get_response + except AttributeError: + raise NotFound(subscription_path) + + def subscription_delete(self, subscription_path): + self._subscription_deleted = subscription_path + return self._subscription_delete_response + + def subscription_modify_push_config( + self, subscription_path, push_endpoint): + self._subscription_modified_push_config = ( + subscription_path, push_endpoint) + return self._subscription_modify_push_config_response + + def subscription_pull(self, subscription_path, return_immediately, + max_messages): + self._subscription_pulled = ( + subscription_path, return_immediately, max_messages) + return self._subscription_pull_response + + def subscription_acknowledge(self, subscription_path, ack_ids): + self._subscription_acked = (subscription_path, ack_ids) + return self._subscription_acknowlege_response + + def subscription_modify_ack_deadline(self, subscription_path, ack_ids, + ack_deadline): + self._subscription_modified_ack_deadline = ( + subscription_path, ack_ids, ack_deadline) + return self._subscription_modify_ack_deadline_response + + def get_iam_policy(self, target_path): + self._got_iam_policy = target_path + return self._get_iam_policy_response + + def set_iam_policy(self, target_path, policy): + self._set_iam_policy = target_path, policy + return self._set_iam_policy_response + + def test_iam_permissions(self, target_path, permissions): + self._tested_iam_permissions = target_path, permissions + return self._test_iam_permissions_response class _Topic(object): diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index c6967bfe4b72..c1844dc60a69 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -16,6 +16,9 @@ class TestTopic(unittest2.TestCase): + PROJECT = 'PROJECT' + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) def _getTargetClass(self): from gcloud.pubsub.topic import Topic @@ -25,120 +28,125 @@ def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) def test_ctor_w_explicit_timestamp(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - CLIENT = _Client(project=PROJECT) - topic = self._makeOne(TOPIC_NAME, - client=CLIENT, + client = _Client(project=self.PROJECT) + topic = self._makeOne(self.TOPIC_NAME, + client=client, timestamp_messages=True) - self.assertEqual(topic.name, TOPIC_NAME) - self.assertEqual(topic.project, PROJECT) - self.assertEqual(topic.full_name, - 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) + self.assertEqual(topic.name, self.TOPIC_NAME) + self.assertEqual(topic.project, self.PROJECT) + self.assertEqual(topic.full_name, self.TOPIC_PATH) self.assertTrue(topic.timestamp_messages) def test_from_api_repr(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - CLIENT = _Client(project=PROJECT) - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - resource = {'name': PATH} + client = _Client(project=self.PROJECT) + resource = {'name': self.TOPIC_PATH} klass = self._getTargetClass() - topic = klass.from_api_repr(resource, client=CLIENT) - self.assertEqual(topic.name, TOPIC_NAME) - self.assertTrue(topic._client is CLIENT) - self.assertEqual(topic.project, PROJECT) - self.assertEqual(topic.full_name, PATH) + topic = klass.from_api_repr(resource, client=client) + self.assertEqual(topic.name, self.TOPIC_NAME) + self.assertTrue(topic._client is client) + self.assertEqual(topic.project, self.PROJECT) + self.assertEqual(topic.full_name, self.TOPIC_PATH) def test_from_api_repr_with_bad_client(self): - TOPIC_NAME = 'topic_name' PROJECT1 = 'PROJECT1' PROJECT2 = 'PROJECT2' - CLIENT = _Client(project=PROJECT1) - PATH = 'projects/%s/topics/%s' % (PROJECT2, TOPIC_NAME) + client = _Client(project=PROJECT1) + PATH = 'projects/%s/topics/%s' % (PROJECT2, self.TOPIC_NAME) resource = {'name': PATH} klass = self._getTargetClass() self.assertRaises(ValueError, klass.from_api_repr, - resource, client=CLIENT) + resource, client=client) def test_create_w_bound_client(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'name': PATH}) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + conn._topic_create_response = {'name': self.TOPIC_PATH} + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) + topic.create() - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'PUT') - self.assertEqual(req['path'], '/%s' % PATH) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_created, self.TOPIC_PATH) def test_create_w_alternate_client(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn1 = _Connection({'name': PATH}) - CLIENT1 = _Client(project=PROJECT, connection=conn1) - conn2 = _Connection({'name': PATH}) - CLIENT2 = _Client(project=PROJECT, connection=conn2) - topic = self._makeOne(TOPIC_NAME, client=CLIENT1) - topic.create(client=CLIENT2) + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection() + conn2._topic_create_response = {'name': self.TOPIC_PATH} + client2 = _Client(project=self.PROJECT, connection=conn2) + topic = self._makeOne(self.TOPIC_NAME, client=client1) + + topic.create(client=client2) + self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'PUT') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._topic_created, self.TOPIC_PATH) def test_exists_miss_w_bound_client(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection() - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) + self.assertFalse(topic.exists()) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_got, self.TOPIC_PATH) def test_exists_hit_w_alternate_client(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn1 = _Connection({'name': PATH}) - CLIENT1 = _Client(project=PROJECT, connection=conn1) - conn2 = _Connection({'name': PATH}) - CLIENT2 = _Client(project=PROJECT, connection=conn2) - topic = self._makeOne(TOPIC_NAME, client=CLIENT1) - self.assertTrue(topic.exists(client=CLIENT2)) + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection() + conn2._topic_get_response = {'name': self.TOPIC_PATH} + client2 = _Client(project=self.PROJECT, connection=conn2) + topic = self._makeOne(self.TOPIC_NAME, client=client1) + + self.assertTrue(topic.exists(client=client2)) + self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._topic_got, self.TOPIC_PATH) + + def test_delete_w_bound_client(self): + conn = _Connection() + conn._topic_delete_response = {} + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) + + topic.delete() + + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_deleted, self.TOPIC_PATH) + + def test_delete_w_alternate_client(self): + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection() + conn2._topic_delete_response = {} + client2 = _Client(project=self.PROJECT, connection=conn2) + topic = self._makeOne(self.TOPIC_NAME, client=client1) + + topic.delete(client=client2) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._topic_deleted, self.TOPIC_PATH) def test_publish_single_bytes_wo_attrs_w_bound_client(self): import base64 - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' - MESSAGE = {'data': B64, - 'attributes': {}} - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'messageIds': [MSGID]}) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + MESSAGE = {'data': B64, 'attributes': {}} + conn = _Connection() + conn._topic_publish_response = [MSGID] + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) + msgid = topic.publish(PAYLOAD) + self.assertEqual(msgid, MSGID) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s:publish' % PATH) - self.assertEqual(req['data'], {'messages': [MESSAGE]}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_published, (self.TOPIC_PATH, [MESSAGE])) def test_publish_single_bytes_wo_attrs_w_add_timestamp_alt_client(self): import base64 @@ -151,80 +159,69 @@ def test_publish_single_bytes_wo_attrs_w_add_timestamp_alt_client(self): def _utcnow(): return NOW - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' - MESSAGE = {'data': B64, - 'attributes': {'timestamp': NOW.strftime(_RFC3339_MICROS)}} - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn1 = _Connection({'messageIds': [MSGID]}) - CLIENT1 = _Client(project=PROJECT, connection=conn1) - conn2 = _Connection({'messageIds': [MSGID]}) - CLIENT2 = _Client(project=PROJECT, connection=conn2) - - topic = self._makeOne(TOPIC_NAME, client=CLIENT1, + MESSAGE = { + 'data': B64, + 'attributes': {'timestamp': NOW.strftime(_RFC3339_MICROS)}, + } + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection() + conn2._topic_publish_response = [MSGID] + client2 = _Client(project=self.PROJECT, connection=conn2) + + topic = self._makeOne(self.TOPIC_NAME, client=client1, timestamp_messages=True) with _Monkey(MUT, _NOW=_utcnow): - msgid = topic.publish(PAYLOAD, client=CLIENT2) + msgid = topic.publish(PAYLOAD, client=client2) self.assertEqual(msgid, MSGID) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s:publish' % PATH) - self.assertEqual(req['data'], {'messages': [MESSAGE]}) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._topic_published, (self.TOPIC_PATH, [MESSAGE])) def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): import base64 - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' OVERRIDE = '2015-04-10T16:46:22.868399Z' MESSAGE = {'data': B64, 'attributes': {'timestamp': OVERRIDE}} - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'messageIds': [MSGID]}) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT, + conn = _Connection() + conn._topic_publish_response = [MSGID] + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client, timestamp_messages=True) + msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE) + self.assertEqual(msgid, MSGID) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s:publish' % PATH) - self.assertEqual(req['data'], {'messages': [MESSAGE]}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_published, (self.TOPIC_PATH, [MESSAGE])) def test_publish_single_w_attrs(self): import base64 - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'messageIds': [MSGID]}) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + conn._topic_publish_response = [MSGID] + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) + msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2') + self.assertEqual(msgid, MSGID) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s:publish' % PATH) - self.assertEqual(req['data'], {'messages': [MESSAGE]}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_published, (self.TOPIC_PATH, [MESSAGE])) def test_publish_multiple_w_bound_client(self): import base64 - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' PAYLOAD1 = b'This is the first message text' PAYLOAD2 = b'This is the second message text' B64_1 = base64.b64encode(PAYLOAD1) @@ -235,63 +232,59 @@ def test_publish_multiple_w_bound_client(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + conn._topic_publish_response = [MSGID1, MSGID2] + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) + with topic.batch() as batch: batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s:publish' % PATH) - self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_published, + (self.TOPIC_PATH, [MESSAGE1, MESSAGE2])) def test_publish_multiple_w_alternate_client(self): import base64 - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' PAYLOAD1 = b'This is the first message text' PAYLOAD2 = b'This is the second message text' B64_1 = base64.b64encode(PAYLOAD1) B64_2 = base64.b64encode(PAYLOAD2) MSGID1 = 'DEADBEEF' MSGID2 = 'BEADCAFE' - MESSAGE1 = {'data': B64_1.decode('ascii'), - 'attributes': {}} - MESSAGE2 = {'data': B64_2.decode('ascii'), - 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn1 = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT1 = _Client(project=PROJECT, connection=conn1) - conn2 = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT2 = _Client(project=PROJECT, connection=conn2) - topic = self._makeOne(TOPIC_NAME, client=CLIENT1) - with topic.batch(client=CLIENT2) as batch: + MESSAGE1 = {'data': B64_1.decode('ascii'), 'attributes': {}} + MESSAGE2 = { + 'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}, + } + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection() + conn2._topic_publish_response = [MSGID1, MSGID2] + client2 = _Client(project=self.PROJECT, connection=conn2) + topic = self._makeOne(self.TOPIC_NAME, client=client1) + + with topic.batch(client=client2) as batch: batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s:publish' % PATH) - self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._topic_published, + (self.TOPIC_PATH, [MESSAGE1, MESSAGE2])) def test_publish_multiple_error(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' PAYLOAD1 = b'This is the first message text' PAYLOAD2 = b'This is the second message text' - MSGID1 = 'DEADBEEF' - MSGID2 = 'BEADCAFE' - conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT = _Client(project=PROJECT) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + client = _Client(project=self.PROJECT) + topic = self._makeOne(self.TOPIC_NAME, client=client) + try: with topic.batch() as batch: batch.publish(PAYLOAD1) @@ -299,44 +292,15 @@ def test_publish_multiple_error(self): raise _Bugout() except _Bugout: pass + self.assertEqual(list(batch), []) self.assertEqual(len(conn._requested), 0) - - def test_delete_w_bound_client(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({}) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) - topic.delete() - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'DELETE') - self.assertEqual(req['path'], '/%s' % PATH) - - def test_delete_w_alternate_client(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn1 = _Connection({}) - CLIENT1 = _Client(project=PROJECT, connection=conn1) - conn2 = _Connection({}) - CLIENT2 = _Client(project=PROJECT, connection=conn2) - topic = self._makeOne(TOPIC_NAME, client=CLIENT1) - topic.delete(client=CLIENT2) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'DELETE') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(getattr(conn, '_topic_published', self), self) def test_subscription(self): from gcloud.pubsub.subscription import Subscription - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - CLIENT = _Client(project=PROJECT) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + client = _Client(project=self.PROJECT) + topic = self._makeOne(self.TOPIC_NAME, client=client) SUBSCRIPTION_NAME = 'subscription_name' subscription = topic.subscription(SUBSCRIPTION_NAME) @@ -346,24 +310,22 @@ def test_subscription(self): def test_list_subscriptions_no_paging(self): from gcloud.pubsub.subscription import Subscription - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' SUB_NAME_1 = 'subscription_1' - SUB_PATH_1 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_1) + SUB_PATH_1 = 'projects/%s/subscriptions/%s' % ( + self.PROJECT, SUB_NAME_1) SUB_NAME_2 = 'subscription_2' - SUB_PATH_2 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_2) - TOPIC_NAME = 'topic_name' + SUB_PATH_2 = 'projects/%s/subscriptions/%s' % ( + self.PROJECT, SUB_NAME_2) SUBS_LIST = [SUB_PATH_1, SUB_PATH_2] TOKEN = 'TOKEN' - RETURNED = {'subscriptions': SUBS_LIST, 'nextPageToken': TOKEN} - conn = _Connection(RETURNED) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + conn._topic_list_subscriptions_response = SUBS_LIST, TOKEN + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) - # Execute request. subscriptions, next_page_token = topic.list_subscriptions() - # Test values are correct. + self.assertEqual(len(subscriptions), 2) subscription = subscriptions[0] @@ -377,36 +339,30 @@ def test_list_subscriptions_no_paging(self): self.assertTrue(subscription.topic is topic) self.assertEqual(next_page_token, TOKEN) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], - '/projects/%s/topics/%s/subscriptions' - % (PROJECT, TOPIC_NAME)) - self.assertEqual(req['query_params'], {}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_listed, + (self.TOPIC_PATH, None, None)) def test_list_subscriptions_with_paging(self): from gcloud.pubsub.subscription import Subscription - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' SUB_NAME_1 = 'subscription_1' - SUB_PATH_1 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_1) + SUB_PATH_1 = 'projects/%s/subscriptions/%s' % ( + self.PROJECT, SUB_NAME_1) SUB_NAME_2 = 'subscription_2' - SUB_PATH_2 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_2) - TOPIC_NAME = 'topic_name' + SUB_PATH_2 = 'projects/%s/subscriptions/%s' % ( + self.PROJECT, SUB_NAME_2) SUBS_LIST = [SUB_PATH_1, SUB_PATH_2] PAGE_SIZE = 10 TOKEN = 'TOKEN' - RETURNED = {'subscriptions': SUBS_LIST} - conn = _Connection(RETURNED) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + conn._topic_list_subscriptions_response = SUBS_LIST, None + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) - # Execute request. subscriptions, next_page_token = topic.list_subscriptions( page_size=PAGE_SIZE, page_token=TOKEN) - # Test values are correct. + self.assertEqual(len(subscriptions), 2) subscription = subscriptions[0] @@ -420,37 +376,24 @@ def test_list_subscriptions_with_paging(self): self.assertTrue(subscription.topic is topic) self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], - '/projects/%s/topics/%s/subscriptions' - % (PROJECT, TOPIC_NAME)) - self.assertEqual(req['query_params'], - {'pageSize': PAGE_SIZE, 'pageToken': TOKEN}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_listed, + (self.TOPIC_PATH, PAGE_SIZE, TOKEN)) def test_list_subscriptions_missing_key(self): - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - TOPIC_NAME = 'topic_name' - - conn = _Connection({}) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + conn._topic_list_subscriptions_response = (), None + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) - # Execute request. subscriptions, next_page_token = topic.list_subscriptions() - # Test values are correct. + self.assertEqual(len(subscriptions), 0) self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], - '/projects/%s/topics/%s/subscriptions' - % (PROJECT, TOPIC_NAME)) - self.assertEqual(req['query_params'], {}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_listed, + (self.TOPIC_PATH, None, None)) def test_get_iam_policy_w_bound_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE @@ -469,14 +412,11 @@ def test_get_iam_policy_w_bound_client(self): {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, ], } - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - TOPIC_NAME = 'topic_name' - PATH = 'projects/%s/topics/%s:getIamPolicy' % (PROJECT, TOPIC_NAME) - conn = _Connection(POLICY) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + conn._get_iam_policy_response = POLICY + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) policy = topic.get_iam_policy() @@ -485,28 +425,22 @@ def test_get_iam_policy_w_bound_client(self): self.assertEqual(sorted(policy.owners), [OWNER2, OWNER1]) self.assertEqual(sorted(policy.editors), [EDITOR1, EDITOR2]) self.assertEqual(sorted(policy.viewers), [VIEWER1, VIEWER2]) - - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._got_iam_policy, self.TOPIC_PATH) def test_get_iam_policy_w_alternate_client(self): POLICY = { 'etag': 'ACAB', } - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - TOPIC_NAME = 'topic_name' - PATH = 'projects/%s/topics/%s:getIamPolicy' % (PROJECT, TOPIC_NAME) conn1 = _Connection() - conn2 = _Connection(POLICY) - CLIENT1 = _Client(project=PROJECT, connection=conn1) - CLIENT2 = _Client(project=PROJECT, connection=conn2) - topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + conn2 = _Connection() + conn2._get_iam_policy_response = POLICY + client1 = _Client(project=self.PROJECT, connection=conn1) + client2 = _Client(project=self.PROJECT, connection=conn2) + topic = self._makeOne(self.TOPIC_NAME, client=client1) - policy = topic.get_iam_policy(client=CLIENT2) + policy = topic.get_iam_policy(client=client2) self.assertEqual(policy.etag, 'ACAB') self.assertEqual(policy.version, None) @@ -515,10 +449,8 @@ def test_get_iam_policy_w_alternate_client(self): self.assertEqual(sorted(policy.viewers), []) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._got_iam_policy, self.TOPIC_PATH) def test_set_iam_policy_w_bound_client(self): from gcloud.pubsub.iam import Policy @@ -541,14 +473,11 @@ def test_set_iam_policy_w_bound_client(self): RESPONSE = POLICY.copy() RESPONSE['etag'] = 'ABACABAF' RESPONSE['version'] = 18 - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - TOPIC_NAME = 'topic_name' - PATH = 'projects/%s/topics/%s:setIamPolicy' % (PROJECT, TOPIC_NAME) - - conn = _Connection(RESPONSE) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + + conn = _Connection() + conn._set_iam_policy_response = RESPONSE + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) policy = Policy('DEADBEEF', 17) policy.owners.add(OWNER1) policy.owners.add(OWNER2) @@ -564,29 +493,22 @@ def test_set_iam_policy_w_bound_client(self): self.assertEqual(sorted(new_policy.owners), [OWNER1, OWNER2]) self.assertEqual(sorted(new_policy.editors), [EDITOR1, EDITOR2]) self.assertEqual(sorted(new_policy.viewers), [VIEWER1, VIEWER2]) - - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) - self.assertEqual(req['data'], {'policy': POLICY}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._set_iam_policy, (self.TOPIC_PATH, POLICY)) def test_set_iam_policy_w_alternate_client(self): from gcloud.pubsub.iam import Policy RESPONSE = {'etag': 'ACAB'} - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - TOPIC_NAME = 'topic_name' - PATH = 'projects/%s/topics/%s:setIamPolicy' % (PROJECT, TOPIC_NAME) conn1 = _Connection() - conn2 = _Connection(RESPONSE) - CLIENT1 = _Client(project=PROJECT, connection=conn1) - CLIENT2 = _Client(project=PROJECT, connection=conn2) - topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + conn2 = _Connection() + conn2._set_iam_policy_response = RESPONSE + client1 = _Client(project=self.PROJECT, connection=conn1) + client2 = _Client(project=self.PROJECT, connection=conn2) + topic = self._makeOne(self.TOPIC_NAME, client=client1) policy = Policy() - new_policy = topic.set_iam_policy(policy, client=CLIENT2) + new_policy = topic.set_iam_policy(policy, client=client2) self.assertEqual(new_policy.etag, 'ACAB') self.assertEqual(new_policy.version, None) @@ -595,67 +517,45 @@ def test_set_iam_policy_w_alternate_client(self): self.assertEqual(sorted(new_policy.viewers), []) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) - self.assertEqual(req['data'], {'policy': {}}) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._set_iam_policy, (self.TOPIC_PATH, {})) def test_check_iam_permissions_w_bound_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s:testIamPermissions' % ( - PROJECT, TOPIC_NAME) ROLES = [VIEWER_ROLE, EDITOR_ROLE, OWNER_ROLE] - REQUESTED = { - 'permissions': ROLES, - } - RESPONSE = { - 'permissions': ROLES[:-1], - } - conn = _Connection(RESPONSE) - CLIENT = _Client(project=PROJECT, connection=conn) - topic = self._makeOne(TOPIC_NAME, client=CLIENT) + conn = _Connection() + conn._test_iam_permissions_response = ROLES[:-1] + client = _Client(project=self.PROJECT, connection=conn) + topic = self._makeOne(self.TOPIC_NAME, client=client) allowed = topic.check_iam_permissions(ROLES) self.assertEqual(allowed, ROLES[:-1]) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) - self.assertEqual(req['data'], REQUESTED) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._tested_iam_permissions, + (self.TOPIC_PATH, ROLES)) def test_check_iam_permissions_w_alternate_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - PATH = 'projects/%s/topics/%s:testIamPermissions' % ( - PROJECT, TOPIC_NAME) ROLES = [VIEWER_ROLE, EDITOR_ROLE, OWNER_ROLE] - REQUESTED = { - 'permissions': ROLES, - } - RESPONSE = {} conn1 = _Connection() - CLIENT1 = _Client(project=PROJECT, connection=conn1) - conn2 = _Connection(RESPONSE) - CLIENT2 = _Client(project=PROJECT, connection=conn2) - topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection() + conn2._test_iam_permissions_response = [] + client2 = _Client(project=self.PROJECT, connection=conn2) + topic = self._makeOne(self.TOPIC_NAME, client=client1) - allowed = topic.check_iam_permissions(ROLES, client=CLIENT2) + allowed = topic.check_iam_permissions(ROLES, client=client2) self.assertEqual(len(allowed), 0) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) - self.assertEqual(req['data'], REQUESTED) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._tested_iam_permissions, + (self.TOPIC_PATH, ROLES)) class TestBatch(unittest2.TestCase): + PROJECT = 'PROJECT' def _getTargetClass(self): from gcloud.pubsub.topic import Batch @@ -666,10 +566,10 @@ def _makeOne(self, *args, **kwargs): def test_ctor_defaults(self): topic = _Topic() - CLIENT = _Client(project='PROJECT') - batch = self._makeOne(topic, CLIENT) + client = _Client(project=self.PROJECT) + batch = self._makeOne(topic, client) self.assertTrue(batch.topic is topic) - self.assertTrue(batch.client is CLIENT) + self.assertTrue(batch.client is client) self.assertEqual(len(batch.messages), 0) self.assertEqual(len(batch.message_ids), 0) @@ -693,9 +593,9 @@ def test_publish_bytes_wo_attrs(self): MESSAGE = {'data': B64, 'attributes': {}} connection = _Connection() - CLIENT = _Client(project='PROJECT', connection=connection) + client = _Client(project=self.PROJECT, connection=connection) topic = _Topic() - batch = self._makeOne(topic, client=CLIENT) + batch = self._makeOne(topic, client=client) batch.publish(PAYLOAD) self.assertEqual(len(connection._requested), 0) self.assertEqual(batch.messages, [MESSAGE]) @@ -707,9 +607,9 @@ def test_publish_bytes_w_add_timestamp(self): MESSAGE = {'data': B64, 'attributes': {'timestamp': 'TIMESTAMP'}} connection = _Connection() - CLIENT = _Client(project='PROJECT', connection=connection) + client = _Client(project=self.PROJECT, connection=connection) topic = _Topic(timestamp_messages=True) - batch = self._makeOne(topic, client=CLIENT) + batch = self._makeOne(topic, client=client) batch.publish(PAYLOAD) self.assertEqual(len(connection._requested), 0) self.assertEqual(batch.messages, [MESSAGE]) @@ -726,20 +626,21 @@ def test_commit_w_bound_client(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT = _Client(project='PROJECT', connection=conn) + conn = _Connection() + conn._topic_publish_response = [MSGID1, MSGID2] + client = _Client(project='PROJECT', connection=conn) topic = _Topic() - batch = self._makeOne(topic, client=CLIENT) + batch = self._makeOne(topic, client=client) + batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') batch.commit() + self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '%s:publish' % topic.path) - self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_published, + (topic.full_name, [MESSAGE1, MESSAGE2])) def test_commit_w_alternate_client(self): import base64 @@ -753,23 +654,24 @@ def test_commit_w_alternate_client(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn1 = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT1 = _Client(project='PROJECT', connection=conn1) - conn2 = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT2 = _Client(project='PROJECT', connection=conn2) + conn1 = _Connection() + client1 = _Client(project='PROJECT', connection=conn1) + conn2 = _Connection() + conn2._topic_publish_response = [MSGID1, MSGID2] + client2 = _Client(project='PROJECT', connection=conn2) topic = _Topic() - batch = self._makeOne(topic, client=CLIENT1) + batch = self._makeOne(topic, client=client1) + batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') - batch.commit(client=CLIENT2) + batch.commit(client=client2) + self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '%s:publish' % topic.path) - self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + self.assertEqual(len(conn2._requested), 0) + self.assertEqual(conn2._topic_published, + (topic.full_name, [MESSAGE1, MESSAGE2])) def test_context_mgr_success(self): import base64 @@ -783,10 +685,11 @@ def test_context_mgr_success(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT = _Client(project='PROJECT', connection=conn) + conn = _Connection() + conn._topic_publish_response = [MSGID1, MSGID2] + client = _Client(project='PROJECT', connection=conn) topic = _Topic() - batch = self._makeOne(topic, client=CLIENT) + batch = self._makeOne(topic, client=client) with batch as other: batch.publish(PAYLOAD1) @@ -795,11 +698,9 @@ def test_context_mgr_success(self): self.assertTrue(other is batch) self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '%s:publish' % topic.path) - self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + self.assertEqual(len(conn._requested), 0) + self.assertEqual(conn._topic_published, + (topic.full_name, [MESSAGE1, MESSAGE2])) def test_context_mgr_failure(self): import base64 @@ -814,9 +715,9 @@ def test_context_mgr_failure(self): MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - CLIENT = _Client(project='PROJECT', connection=conn) + client = _Client(project='PROJECT', connection=conn) topic = _Topic() - batch = self._makeOne(topic, client=CLIENT) + batch = self._makeOne(topic, client=client) try: with batch as other: @@ -830,31 +731,59 @@ def test_context_mgr_failure(self): self.assertEqual(list(batch), []) self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2]) self.assertEqual(len(conn._requested), 0) + self.assertEqual(getattr(conn, '_topic_published', self), self) -class _Connection(object): +class _Connection(object): # pylint: disable=too-many-instance-attributes def __init__(self, *responses): self._responses = responses self._requested = [] - def api_request(self, **kw): - from gcloud.exceptions import NotFound - self._requested.append(kw) + def topic_create(self, topic_path): + self._topic_created = topic_path + return self._topic_create_response + def topic_get(self, topic_path): + from gcloud.exceptions import NotFound + self._topic_got = topic_path try: - response, self._responses = self._responses[0], self._responses[1:] - except: - raise NotFound('miss') - else: - return response + return self._topic_get_response + except AttributeError: + raise NotFound(topic_path) + + def topic_delete(self, topic_path): + self._topic_deleted = topic_path + return self._topic_delete_response + + def topic_publish(self, topic_path, messages): + self._topic_published = topic_path, messages + return self._topic_publish_response + + def topic_list_subscriptions(self, topic_path, page_size=None, + page_token=None): + self._topic_listed = topic_path, page_size, page_token + return self._topic_list_subscriptions_response + + def get_iam_policy(self, target_path): + self._got_iam_policy = target_path + return self._get_iam_policy_response + + def set_iam_policy(self, target_path, policy): + self._set_iam_policy = target_path, policy + return self._set_iam_policy_response + + def test_iam_permissions(self, target_path, permissions): + self._tested_iam_permissions = target_path, permissions + return self._test_iam_permissions_response class _Topic(object): def __init__(self, name="NAME", project="PROJECT", timestamp_messages=False): - self.path = '/projects/%s/topics/%s' % (project, name) + self.full_name = 'projects/%s/topics/%s' % (project, name) + self.path = '/%s' % (self.full_name,) self.timestamp_messages = timestamp_messages def _timestamp_message(self, attrs): diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index 5e442afeeb59..9aa8fa171a8a 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -98,11 +98,6 @@ def full_name(self): """Fully-qualified name used in topic / subscription APIs""" return 'projects/%s/topics/%s' % (self.project, self.name) - @property - def path(self): - """URL path for the topic's APIs""" - return '/%s' % (self.full_name) - def _require_client(self, client): """Check client or verify over-ride. @@ -128,7 +123,7 @@ def create(self, client=None): ``client`` stored on the current topic. """ client = self._require_client(client) - client.connection.api_request(method='PUT', path=self.path) + client.connection.topic_create(topic_path=self.full_name) def exists(self, client=None): """API call: test for the existence of the topic via a GET request @@ -143,12 +138,25 @@ def exists(self, client=None): client = self._require_client(client) try: - client.connection.api_request(method='GET', path=self.path) + client.connection.topic_get(topic_path=self.full_name) except NotFound: return False else: return True + def delete(self, client=None): + """API call: delete the topic via a DELETE request + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/delete + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + """ + client = self._require_client(client) + client.connection.topic_delete(topic_path=self.full_name) + def _timestamp_message(self, attrs): """Add a timestamp to ``attrs``, if the topic is so configured. @@ -183,10 +191,9 @@ def publish(self, message, client=None, **attrs): self._timestamp_message(attrs) message_b = base64.b64encode(message).decode('ascii') message_data = {'data': message_b, 'attributes': attrs} - data = {'messages': [message_data]} - response = client.connection.api_request( - method='POST', path='%s:publish' % (self.path,), data=data) - return response['messageIds'][0] + message_ids = client.connection.topic_publish( + self.full_name, [message_data]) + return message_ids[0] def batch(self, client=None): """Return a batch to use as a context manager. @@ -201,19 +208,6 @@ def batch(self, client=None): client = self._require_client(client) return Batch(self, client) - def delete(self, client=None): - """API call: delete the topic via a DELETE request - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/delete - - :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current topic. - """ - client = self._require_client(client) - client.connection.api_request(method='DELETE', path=self.path) - def list_subscriptions(self, page_size=None, page_token=None, client=None): """List subscriptions for the project associated with this client. @@ -240,24 +234,14 @@ def list_subscriptions(self, page_size=None, page_token=None, client=None): value as ``page_token``). """ client = self._require_client(client) - params = {} - - if page_size is not None: - params['pageSize'] = page_size - - if page_token is not None: - params['pageToken'] = page_token - - path = '/projects/%s/topics/%s/subscriptions' % ( - self.project, self.name) - - resp = client.connection.api_request(method='GET', path=path, - query_params=params) + conn = client.connection + sub_paths, next_token = conn.topic_list_subscriptions( + self.full_name, page_size, page_token) subscriptions = [] - for sub_path in resp.get('subscriptions', ()): + for sub_path in sub_paths: sub_name = subscription_name_from_path(sub_path, self.project) subscriptions.append(Subscription(sub_name, self)) - return subscriptions, resp.get('nextPageToken') + return subscriptions, next_token def get_iam_policy(self, client=None): """Fetch the IAM policy for the topic. @@ -274,8 +258,7 @@ def get_iam_policy(self, client=None): ``getIamPolicy`` API request. """ client = self._require_client(client) - path = '%s:getIamPolicy' % (self.path,) - resp = client.connection.api_request(method='GET', path=path) + resp = client.connection.get_iam_policy(self.full_name) return Policy.from_api_repr(resp) def set_iam_policy(self, policy, client=None): @@ -297,11 +280,8 @@ def set_iam_policy(self, policy, client=None): ``setIamPolicy`` API request. """ client = self._require_client(client) - path = '%s:setIamPolicy' % (self.path,) resource = policy.to_api_repr() - wrapped = {'policy': resource} - resp = client.connection.api_request( - method='POST', path=path, data=wrapped) + resp = client.connection.set_iam_policy(self.full_name, resource) return Policy.from_api_repr(resp) def check_iam_permissions(self, permissions, client=None): @@ -321,13 +301,8 @@ def check_iam_permissions(self, permissions, client=None): :returns: subset of ``permissions`` allowed by current IAM policy. """ client = self._require_client(client) - path = '%s:testIamPermissions' % (self.path,) - data = { - 'permissions': list(permissions), - } - resp = client.connection.api_request( - method='POST', path=path, data=data) - return resp.get('permissions', ()) + return client.connection.test_iam_permissions( + self.full_name, list(permissions)) class Batch(object): @@ -380,8 +355,7 @@ def commit(self, client=None): """ if client is None: client = self.client - response = client.connection.api_request( - method='POST', path='%s:publish' % self.topic.path, - data={'messages': self.messages[:]}) - self.message_ids.extend(response['messageIds']) + message_ids = client.connection.topic_publish( + self.topic.full_name, self.messages[:]) + self.message_ids.extend(message_ids) del self.messages[:] diff --git a/scripts/pylintrc_default b/scripts/pylintrc_default index 1f254578f5d7..b4d01032619d 100644 --- a/scripts/pylintrc_default +++ b/scripts/pylintrc_default @@ -290,6 +290,8 @@ good-names = i, j, k, ex, Run, _, # Regular expression matching correct method names # DEFAULT: method-rgx=[a-z_][a-z0-9_]{2,30}$ +# RATIONALE: mapping long API names onto connection methods +method-rgx=[a-z_][a-z0-9_]{2,35}$ # Naming hint for method names # DEFAULT: method-name-hint=[a-z_][a-z0-9_]{2,30}$