diff --git a/gcloud/pubsub/client.py b/gcloud/pubsub/client.py index aea40257591e..7f242eb2ee3b 100644 --- a/gcloud/pubsub/client.py +++ b/gcloud/pubsub/client.py @@ -17,6 +17,9 @@ from gcloud.client import JSONClient from gcloud.pubsub.connection import Connection +from gcloud.pubsub.connection import _PublisherAPI +from gcloud.pubsub.connection import _SubscriberAPI +from gcloud.pubsub.connection import _IAMPolicyAPI from gcloud.pubsub.subscription import Subscription from gcloud.pubsub.topic import Topic @@ -43,6 +46,28 @@ class Client(JSONClient): """ _connection_class = Connection + _publisher_api = _subscriber_api = _iam_policy_api = None + + @property + def publisher_api(self): + """Helper for publisher-related API calls.""" + if self._publisher_api is None: + self._publisher_api = _PublisherAPI(self.connection) + return self._publisher_api + + @property + def subscriber_api(self): + """Helper for subscriber-related API calls.""" + if self._subscriber_api is None: + self._subscriber_api = _SubscriberAPI(self.connection) + return self._subscriber_api + + @property + def iam_policy_api(self): + """Helper for IAM policy-related API calls.""" + if self._iam_policy_api is None: + self._iam_policy_api = _IAMPolicyAPI(self.connection) + return self._iam_policy_api def list_topics(self, page_size=None, page_token=None): """List topics for the project associated with this client. @@ -65,8 +90,8 @@ def list_topics(self, page_size=None, page_token=None): more topics can be retrieved with another call (pass that value as ``page_token``). """ - conn = self.connection - resources, next_token = conn.list_topics( + api = self.publisher_api + resources, next_token = api.list_topics( self.project, page_size, page_token) topics = [Topic.from_api_repr(resource, self) for resource in resources] @@ -96,8 +121,8 @@ def list_subscriptions(self, page_size=None, page_token=None): more topics can be retrieved with another call (pass that value as ``page_token``). """ - conn = self.connection - resources, next_token = conn.list_subscriptions( + api = self.subscriber_api + resources, next_token = api.list_subscriptions( self.project, page_size, page_token) topics = {} subscriptions = [Subscription.from_api_repr(resource, self, diff --git a/gcloud/pubsub/connection.py b/gcloud/pubsub/connection.py index 63b956d45713..01024b61bf53 100644 --- a/gcloud/pubsub/connection.py +++ b/gcloud/pubsub/connection.py @@ -89,8 +89,19 @@ def build_api_url(self, path, query_params=None, path, query_params=query_params, api_base_url=api_base_url, api_version=api_version) + +class _PublisherAPI(object): + """Helper mapping publisher-related APIs. + + :type connection: :class:`Connection` + :param connection: the connection used to make API requests. + """ + + def __init__(self, connection): + self._connection = connection + def list_topics(self, project, page_size=None, page_token=None): - """List topics for the project associated with this client. + """List topics for the project associated with this API. See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list @@ -113,6 +124,7 @@ def list_topics(self, project, page_size=None, page_token=None): more topics can be retrieved with another call (pass that value as ``page_token``). """ + conn = self._connection params = {} if page_size is not None: @@ -122,45 +134,9 @@ def list_topics(self, project, page_size=None, page_token=None): params['pageToken'] = page_token path = '/projects/%s/topics' % (project,) - resp = self.api_request(method='GET', path=path, query_params=params) + resp = conn.api_request(method='GET', path=path, query_params=params) return resp.get('topics', ()), resp.get('nextPageToken') - def list_subscriptions(self, project, page_size=None, page_token=None): - """List subscriptions for the project associated with this client. - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list - - :type project: string - :param project: project ID - - :type page_size: int - :param page_size: maximum number of subscriptions to return, If not - passed, defaults to a value set by the API. - - :type page_token: string - :param page_token: opaque marker for the next "page" of subscriptions. - If not passed, the API will return the first page - of subscriptions. - - :rtype: tuple, (list, str) - :returns: list of ``Subscription`` resource dicts, plus a - "next page token" string: if not None, indicates that - more subscriptions can be retrieved with another call (pass - that value as ``page_token``). - """ - params = {} - - if page_size is not None: - params['pageSize'] = page_size - - if page_token is not None: - params['pageToken'] = page_token - - path = '/projects/%s/subscriptions' % (project,) - resp = self.api_request(method='GET', path=path, query_params=params) - return resp.get('subscriptions', ()), resp.get('nextPageToken') - def topic_create(self, topic_path): """API call: create a topic via a PUT request @@ -174,7 +150,8 @@ def topic_create(self, topic_path): :rtype: dict :returns: ``Topic`` resource returned from the API. """ - return self.api_request(method='PUT', path='/%s' % (topic_path,)) + conn = self._connection + return conn.api_request(method='PUT', path='/%s' % (topic_path,)) def topic_get(self, topic_path): """API call: retrieve a topic via a GET request @@ -189,7 +166,8 @@ def topic_get(self, topic_path): :rtype: dict :returns: ``Topic`` resource returned from the API. """ - return self.api_request(method='GET', path='/%s' % (topic_path,)) + conn = self._connection + return conn.api_request(method='GET', path='/%s' % (topic_path,)) def topic_delete(self, topic_path): """API call: delete a topic via a DELETE request @@ -201,7 +179,8 @@ def topic_delete(self, topic_path): :param topic_path: the fully-qualfied path of the topic, in format ``projects//topics/``. """ - self.api_request(method='DELETE', path='/%s' % (topic_path,)) + conn = self._connection + conn.api_request(method='DELETE', path='/%s' % (topic_path,)) def topic_publish(self, topic_path, messages): """API call: publish a message to a topic via a POST request @@ -219,8 +198,9 @@ def topic_publish(self, topic_path, messages): :rtype: list of string :returns: list of opaque IDs for published messages. """ + conn = self._connection data = {'messages': messages} - response = self.api_request( + response = conn.api_request( method='POST', path='/%s:publish' % (topic_path,), data=data) return response['messageIds'] @@ -248,6 +228,7 @@ def topic_list_subscriptions(self, topic_path, page_size=None, :returns: fully-qualified names of subscriptions for the supplied topic. """ + conn = self._connection params = {} if page_size is not None: @@ -257,65 +238,56 @@ def topic_list_subscriptions(self, topic_path, page_size=None, params['pageToken'] = page_token path = '/%s/subscriptions' % (topic_path,) - resp = self.api_request(method='GET', path=path, query_params=params) + resp = conn.api_request(method='GET', path=path, query_params=params) return resp.get('subscriptions', ()), resp.get('nextPageToken') - def get_iam_policy(self, target_path): - """Fetch the IAM policy for the target. - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/getIamPolicy - https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/getIamPolicy +class _SubscriberAPI(object): + """Helper mapping subscriber-related APIs. - :type target_path: string - :param target_path: the path of the target object. + :type connection: :class:`Connection` + :param connection: the connection used to make API requests. + """ - :rtype: dict - :returns: the resource returned by the ``getIamPolicy`` API request. - """ - path = '/%s:getIamPolicy' % (target_path,) - return self.api_request(method='GET', path=path) + def __init__(self, connection): + self._connection = connection - def set_iam_policy(self, target_path, policy): - """Update the IAM policy for the target. + def list_subscriptions(self, project, page_size=None, page_token=None): + """List subscriptions for the project associated with this API. See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/setIamPolicy - https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/setIamPolicy - - :type target_path: string - :param target_path: the path of the target object. + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list - :type policy: dict - :param policy: the new policy resource. + :type project: string + :param project: project ID - :rtype: dict - :returns: the resource returned by the ``setIamPolicy`` API request. - """ - wrapped = {'policy': policy} - path = '/%s:setIamPolicy' % (target_path,) - return self.api_request(method='POST', path=path, data=wrapped) + :type page_size: int + :param page_size: maximum number of subscriptions to return, If not + passed, defaults to a value set by the API. - def test_iam_permissions(self, target_path, permissions): - """Update the IAM policy for the target. + :type page_token: string + :param page_token: opaque marker for the next "page" of subscriptions. + If not passed, the API will return the first page + of subscriptions. - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/testIamPermissions - https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/testIamPermissions + :rtype: tuple, (list, str) + :returns: list of ``Subscription`` resource dicts, plus a + "next page token" string: if not None, indicates that + more subscriptions can be retrieved with another call (pass + that value as ``page_token``). + """ + conn = self._connection + params = {} - :type target_path: string - :param target_path: the path of the target object. + if page_size is not None: + params['pageSize'] = page_size - :type permissions: list of string - :param permissions: the permissions to check + if page_token is not None: + params['pageToken'] = page_token - :rtype: dict - :returns: the resource returned by the ``getIamPolicy`` API request. - """ - wrapped = {'permissions': permissions} - path = '/%s:testIamPermissions' % (target_path,) - resp = self.api_request(method='POST', path=path, data=wrapped) - return resp.get('permissions', []) + path = '/projects/%s/subscriptions' % (project,) + resp = conn.api_request(method='GET', path=path, query_params=params) + return resp.get('subscriptions', ()), resp.get('nextPageToken') def subscription_create(self, subscription_path, topic_path, ack_deadline=None, push_endpoint=None): @@ -346,6 +318,7 @@ def subscription_create(self, subscription_path, topic_path, :rtype: dict :returns: ``Subscription`` resource returned from the API. """ + conn = self._connection path = '/%s' % (subscription_path,) resource = {'topic': topic_path} @@ -355,7 +328,7 @@ def subscription_create(self, subscription_path, topic_path, if push_endpoint is not None: resource['pushConfig'] = {'pushEndpoint': push_endpoint} - return self.api_request(method='PUT', path=path, data=resource) + return conn.api_request(method='PUT', path=path, data=resource) def subscription_get(self, subscription_path): """API call: retrieve a subscription via a GET request @@ -371,8 +344,9 @@ def subscription_get(self, subscription_path): :rtype: dict :returns: ``Subscription`` resource returned from the API. """ + conn = self._connection path = '/%s' % (subscription_path,) - return self.api_request(method='GET', path=path) + return conn.api_request(method='GET', path=path) def subscription_delete(self, subscription_path): """API call: delete a subscription via a DELETE request @@ -385,8 +359,9 @@ def subscription_delete(self, subscription_path): in format ``projects//subscriptions/``. """ + conn = self._connection path = '/%s' % (subscription_path,) - self.api_request(method='DELETE', path=path) + conn.api_request(method='DELETE', path=path) def subscription_modify_push_config(self, subscription_path, push_endpoint): @@ -405,9 +380,10 @@ def subscription_modify_push_config(self, subscription_path, back-end. If not set, the application must pull messages. """ + conn = self._connection path = '/%s:modifyPushConfig' % (subscription_path,) resource = {'pushConfig': {'pushEndpoint': push_endpoint}} - self.api_request(method='POST', path=path, data=resource) + conn.api_request(method='POST', path=path, data=resource) def subscription_pull(self, subscription_path, return_immediately=False, max_messages=1): @@ -433,12 +409,13 @@ def subscription_pull(self, subscription_path, return_immediately=False, :rtype: list of dict :returns: the ``receivedMessages`` element of the response. """ + conn = self._connection path = '/%s:pull' % (subscription_path,) data = { 'returnImmediately': return_immediately, 'maxMessages': max_messages, } - response = self.api_request(method='POST', path=path, data=data) + response = conn.api_request(method='POST', path=path, data=data) return response['receivedMessages'] def subscription_acknowledge(self, subscription_path, ack_ids): @@ -455,11 +432,12 @@ def subscription_acknowledge(self, subscription_path, ack_ids): :type ack_ids: list of string :param ack_ids: ack IDs of messages being acknowledged """ + conn = self._connection path = '/%s:acknowledge' % (subscription_path,) data = { 'ackIds': ack_ids, } - self.api_request(method='POST', path=path, data=data) + conn.api_request(method='POST', path=path, data=data) def subscription_modify_ack_deadline(self, subscription_path, ack_ids, ack_deadline): @@ -480,9 +458,81 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, :param ack_deadline: the deadline (in seconds) by which messages pulled from the back-end must be acknowledged. """ + conn = self._connection path = '/%s:modifyAckDeadline' % (subscription_path,) data = { 'ackIds': ack_ids, 'ackDeadlineSeconds': ack_deadline, } - self.api_request(method='POST', path=path, data=data) + conn.api_request(method='POST', path=path, data=data) + + +class _IAMPolicyAPI(object): + """Helper mapping IAM policy-related APIs. + + :type connection: :class:`Connection` + :param connection: the connection used to make API requests. + """ + + def __init__(self, connection): + self._connection = connection + + def get_iam_policy(self, target_path): + """Fetch the IAM policy for the target. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/getIamPolicy + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/getIamPolicy + + :type target_path: string + :param target_path: the path of the target object. + + :rtype: dict + :returns: the resource returned by the ``getIamPolicy`` API request. + """ + conn = self._connection + path = '/%s:getIamPolicy' % (target_path,) + return conn.api_request(method='GET', path=path) + + def set_iam_policy(self, target_path, policy): + """Update the IAM policy for the target. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/setIamPolicy + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/setIamPolicy + + :type target_path: string + :param target_path: the path of the target object. + + :type policy: dict + :param policy: the new policy resource. + + :rtype: dict + :returns: the resource returned by the ``setIamPolicy`` API request. + """ + conn = self._connection + wrapped = {'policy': policy} + path = '/%s:setIamPolicy' % (target_path,) + return conn.api_request(method='POST', path=path, data=wrapped) + + def test_iam_permissions(self, target_path, permissions): + """Update the IAM policy for the target. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/testIamPermissions + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/testIamPermissions + + :type target_path: string + :param target_path: the path of the target object. + + :type permissions: list of string + :param permissions: the permissions to check + + :rtype: dict + :returns: the resource returned by the ``getIamPolicy`` API request. + """ + conn = self._connection + wrapped = {'permissions': permissions} + path = '/%s:testIamPermissions' % (target_path,) + resp = conn.api_request(method='POST', path=path, data=wrapped) + return resp.get('permissions', []) diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index 0d7fb5aa99dd..5f01a9db8baf 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -144,7 +144,8 @@ def create(self, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - client.connection.subscription_create( + api = client.subscriber_api + api.subscription_create( self.full_name, self.topic.full_name, self.ack_deadline, self.push_endpoint) @@ -159,8 +160,9 @@ def exists(self, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) + api = client.subscriber_api try: - client.connection.subscription_get(self.full_name) + api.subscription_get(self.full_name) except NotFound: return False else: @@ -177,7 +179,8 @@ def reload(self, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - data = client.connection.subscription_get(self.full_name) + api = client.subscriber_api + data = api.subscription_get(self.full_name) self.ack_deadline = data.get('ackDeadlineSeconds') push_config = data.get('pushConfig', {}) self.push_endpoint = push_config.get('pushEndpoint') @@ -193,7 +196,8 @@ def delete(self, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - client.connection.subscription_delete(self.full_name) + api = client.subscriber_api + api.subscription_delete(self.full_name) def modify_push_configuration(self, push_endpoint, client=None): """API call: update the push endpoint for the subscription. @@ -211,8 +215,8 @@ def modify_push_configuration(self, push_endpoint, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - client.connection.subscription_modify_push_config( - self.full_name, push_endpoint) + api = client.subscriber_api + api.subscription_modify_push_config(self.full_name, push_endpoint) self.push_endpoint = push_endpoint def pull(self, return_immediately=False, max_messages=1, client=None): @@ -240,7 +244,8 @@ def pull(self, return_immediately=False, max_messages=1, client=None): is an instance of :class:`gcloud.pubsub.message.Message`. """ client = self._require_client(client) - response = client.connection.subscription_pull( + api = client.subscriber_api + response = api.subscription_pull( self.full_name, return_immediately, max_messages) return [(info['ackId'], Message.from_api_repr(info['message'])) for info in response] @@ -259,7 +264,8 @@ def acknowledge(self, ack_ids, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - client.connection.subscription_acknowledge(self.full_name, ack_ids) + api = client.subscriber_api + api.subscription_acknowledge(self.full_name, ack_ids) def modify_ack_deadline(self, ack_ids, ack_deadline, client=None): """API call: update acknowledgement deadline for a retrieved message. @@ -278,7 +284,8 @@ def modify_ack_deadline(self, ack_ids, ack_deadline, client=None): ``client`` stored on the current subscription's topic. """ client = self._require_client(client) - client.connection.subscription_modify_ack_deadline( + api = client.subscriber_api + api.subscription_modify_ack_deadline( self.full_name, ack_ids, ack_deadline) def get_iam_policy(self, client=None): @@ -296,7 +303,8 @@ def get_iam_policy(self, client=None): ``getIamPolicy`` API request. """ client = self._require_client(client) - resp = client.connection.get_iam_policy(self.full_name) + api = client.iam_policy_api + resp = api.get_iam_policy(self.full_name) return Policy.from_api_repr(resp) def set_iam_policy(self, policy, client=None): @@ -318,8 +326,9 @@ def set_iam_policy(self, policy, client=None): ``setIamPolicy`` API request. """ client = self._require_client(client) + api = client.iam_policy_api resource = policy.to_api_repr() - resp = client.connection.set_iam_policy(self.full_name, resource) + resp = api.set_iam_policy(self.full_name, resource) return Policy.from_api_repr(resp) def check_iam_permissions(self, permissions, client=None): @@ -339,5 +348,6 @@ def check_iam_permissions(self, permissions, client=None): :returns: subset of ``permissions`` allowed by current IAM policy. """ client = self._require_client(client) - return client.connection.test_iam_permissions( + api = client.iam_policy_api + return api.test_iam_permissions( self.full_name, list(permissions)) diff --git a/gcloud/pubsub/test_client.py b/gcloud/pubsub/test_client.py index f4e1a89836d3..7a5a54d56e87 100644 --- a/gcloud/pubsub/test_client.py +++ b/gcloud/pubsub/test_client.py @@ -29,22 +29,58 @@ def _getTargetClass(self): def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) + def test_publisher_api(self): + from gcloud.pubsub.connection import _PublisherAPI + creds = _Credentials() + client = self._makeOne(project=self.PROJECT, credentials=creds) + conn = client.connection = object() + api = client.publisher_api + self.assertIsInstance(api, _PublisherAPI) + self.assertTrue(api._connection is conn) + # API instance is cached + again = client.publisher_api + self.assertTrue(again is api) + + def test_subscriber_api(self): + from gcloud.pubsub.connection import _SubscriberAPI + creds = _Credentials() + client = self._makeOne(project=self.PROJECT, credentials=creds) + conn = client.connection = object() + api = client.subscriber_api + self.assertIsInstance(api, _SubscriberAPI) + self.assertTrue(api._connection is conn) + # API instance is cached + again = client.subscriber_api + self.assertTrue(again is api) + + def test_iam_policy_api(self): + from gcloud.pubsub.connection import _IAMPolicyAPI + creds = _Credentials() + client = self._makeOne(project=self.PROJECT, credentials=creds) + conn = client.connection = object() + api = client.iam_policy_api + self.assertIsInstance(api, _IAMPolicyAPI) + self.assertTrue(api._connection is conn) + # API instance is cached + again = client.iam_policy_api + self.assertTrue(again is api) + def test_list_topics_no_paging(self): from gcloud.pubsub.topic import Topic creds = _Credentials() client = self._makeOne(project=self.PROJECT, credentials=creds) - conn = client.connection = _Connection() - conn._list_topics_response = [{'name': self.TOPIC_PATH}], None + client.connection = object() + api = client._publisher_api = _FauxPublisherAPI() + api._list_topics_response = [{'name': self.TOPIC_PATH}], None topics, next_page_token = client.list_topics() self.assertEqual(len(topics), 1) - self.assertTrue(isinstance(topics[0], Topic)) + self.assertIsInstance(topics[0], Topic) self.assertEqual(topics[0].name, self.TOPIC_NAME) self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._listed_topics, (self.PROJECT, None, None)) + self.assertEqual(api._listed_topics, (self.PROJECT, None, None)) def test_list_topics_with_paging(self): from gcloud.pubsub.topic import Topic @@ -53,51 +89,51 @@ def test_list_topics_with_paging(self): SIZE = 1 creds = _Credentials() client = self._makeOne(project=self.PROJECT, credentials=creds) - conn = client.connection = _Connection() - conn._list_topics_response = [{'name': self.TOPIC_PATH}], TOKEN2 + client.connection = object() + api = client._publisher_api = _FauxPublisherAPI() + api._list_topics_response = [{'name': self.TOPIC_PATH}], TOKEN2 topics, next_page_token = client.list_topics(SIZE, TOKEN1) self.assertEqual(len(topics), 1) - self.assertTrue(isinstance(topics[0], Topic)) + self.assertIsInstance(topics[0], Topic) self.assertEqual(topics[0].name, self.TOPIC_NAME) self.assertEqual(next_page_token, TOKEN2) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._listed_topics, (self.PROJECT, 1, TOKEN1)) + self.assertEqual(api._listed_topics, (self.PROJECT, 1, TOKEN1)) def test_list_topics_missing_key(self): creds = _Credentials() client = self._makeOne(project=self.PROJECT, credentials=creds) - conn = client.connection = _Connection() - conn._list_topics_response = (), None + client.connection = object() + api = client._publisher_api = _FauxPublisherAPI() + api._list_topics_response = (), None topics, next_page_token = client.list_topics() self.assertEqual(len(topics), 0) self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._listed_topics, (self.PROJECT, None, None)) + self.assertEqual(api._listed_topics, (self.PROJECT, None, None)) def test_list_subscriptions_no_paging(self): from gcloud.pubsub.subscription import Subscription SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} creds = _Credentials() client = self._makeOne(project=self.PROJECT, credentials=creds) - conn = client.connection = _Connection() - conn._list_subscriptions_response = [SUB_INFO], None + client.connection = object() + api = client._subscriber_api = _FauxSubscriberAPI() + api._list_subscriptions_response = [SUB_INFO], None subscriptions, next_page_token = client.list_subscriptions() self.assertEqual(len(subscriptions), 1) - self.assertTrue(isinstance(subscriptions[0], Subscription)) + self.assertIsInstance(subscriptions[0], Subscription) self.assertEqual(subscriptions[0].name, self.SUB_NAME) self.assertEqual(subscriptions[0].topic.name, self.TOPIC_NAME) self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._listed_subscriptions, + self.assertEqual(api._listed_subscriptions, (self.PROJECT, None, None)) def test_list_subscriptions_with_paging(self): @@ -114,22 +150,22 @@ def test_list_subscriptions_with_paging(self): TOKEN1 = 'TOKEN1' TOKEN2 = 'TOKEN2' SIZE = 1 - conn = client.connection = _Connection() - conn._list_subscriptions_response = [SUB_INFO], TOKEN2 + client.connection = object() + api = client._subscriber_api = _FauxSubscriberAPI() + api._list_subscriptions_response = [SUB_INFO], TOKEN2 subscriptions, next_page_token = client.list_subscriptions( SIZE, TOKEN1) self.assertEqual(len(subscriptions), 1) - self.assertTrue(isinstance(subscriptions[0], Subscription)) + self.assertIsInstance(subscriptions[0], Subscription) self.assertEqual(subscriptions[0].name, self.SUB_NAME) self.assertEqual(subscriptions[0].topic.name, self.TOPIC_NAME) self.assertEqual(subscriptions[0].ack_deadline, ACK_DEADLINE) self.assertEqual(subscriptions[0].push_endpoint, PUSH_ENDPOINT) self.assertEqual(next_page_token, TOKEN2) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._listed_subscriptions, + self.assertEqual(api._listed_subscriptions, (self.PROJECT, SIZE, TOKEN1)) def test_list_subscriptions_w_missing_key(self): @@ -137,16 +173,16 @@ def test_list_subscriptions_w_missing_key(self): creds = _Credentials() client = self._makeOne(project=PROJECT, credentials=creds) - conn = client.connection = _Connection() - conn._list_subscriptions_response = (), None + client.connection = object() + api = client._subscriber_api = _FauxSubscriberAPI() + api._list_subscriptions_response = (), None subscriptions, next_page_token = client.list_subscriptions() self.assertEqual(len(subscriptions), 0) self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._listed_subscriptions, + self.assertEqual(api._listed_subscriptions, (self.PROJECT, None, None)) def test_topic(self): @@ -177,16 +213,15 @@ def create_scoped(self, scope): return self -class _Connection(object): - - def __init__(self, *responses): - self._responses = responses - self._requested = [] +class _FauxPublisherAPI(object): def list_topics(self, project, page_size, page_token): self._listed_topics = (project, page_size, page_token) return self._list_topics_response + +class _FauxSubscriberAPI(object): + def list_subscriptions(self, project, page_size, page_token): self._listed_subscriptions = (project, page_size, page_token) return self._list_subscriptions_response diff --git a/gcloud/pubsub/test_connection.py b/gcloud/pubsub/test_connection.py index d25338126135..03895dda0bab 100644 --- a/gcloud/pubsub/test_connection.py +++ b/gcloud/pubsub/test_connection.py @@ -15,7 +15,7 @@ import unittest2 -class TestConnection(unittest2.TestCase): +class _Base(unittest2.TestCase): PROJECT = 'PROJECT' LIST_TOPICS_PATH = 'projects/%s/topics' % (PROJECT,) LIST_SUBSCRIPTIONS_PATH = 'projects/%s/subscriptions' % (PROJECT,) @@ -25,13 +25,16 @@ class TestConnection(unittest2.TestCase): SUB_NAME = 'subscription_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + +class TestConnection(_Base): + def _getTargetClass(self): from gcloud.pubsub.connection import Connection return Connection - def _makeOne(self, *args, **kw): - return self._getTargetClass()(*args, **kw) - def test_default_url(self): conn = self._makeOne() klass = self._getTargetClass() @@ -110,39 +113,40 @@ def test_build_api_url_w_base_url_override(self): self.assertEqual(conn.build_api_url('/foo', api_base_url=base_url2), URI) - def _verify_uri(self, uri, expected_path, **expected_qs): - from six.moves.urllib import parse - klass = self._getTargetClass() - scheme, netloc, path, query, _ = parse.urlsplit(uri) - self.assertEqual('%s://%s' % (scheme, netloc), klass.API_BASE_URL) - self.assertEqual(path, '/%s/%s' % (klass.API_VERSION, expected_path)) - qs = dict(parse.parse_qsl(query)) - self.assertEqual(qs, expected_qs) + +class Test_PublisherAPI(_Base): + + def _getTargetClass(self): + from gcloud.pubsub.connection import _PublisherAPI + return _PublisherAPI + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor(self): + connection = _Connection() + api = self._makeOne(connection) + self.assertTrue(api._connection is connection) def test_list_topics_no_paging(self): - import json RETURNED = {'topics': [{'name': self.TOPIC_PATH}]} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - topics, next_token = conn.list_topics(self.PROJECT) + topics, next_token = api.list_topics(self.PROJECT) self.assertEqual(len(topics), 1) topic = topics[0] - self.assertTrue(isinstance(topic, dict)) + self.assertIsInstance(topic, dict) self.assertEqual(topic['name'], self.TOPIC_PATH) self.assertEqual(next_token, None) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], self.LIST_TOPICS_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_TOPICS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) def test_list_topics_with_paging(self): - import json TOKEN1 = 'TOKEN1' TOKEN2 = 'TOKEN2' SIZE = 1 @@ -150,225 +154,115 @@ def test_list_topics_with_paging(self): 'topics': [{'name': self.TOPIC_PATH}], 'nextPageToken': 'TOKEN2', } - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - topics, next_token = conn.list_topics( + topics, next_token = api.list_topics( self.PROJECT, page_token=TOKEN1, page_size=SIZE) self.assertEqual(len(topics), 1) topic = topics[0] - self.assertTrue(isinstance(topic, dict)) + self.assertIsInstance(topic, dict) self.assertEqual(topic['name'], self.TOPIC_PATH) self.assertEqual(next_token, TOKEN2) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], self.LIST_TOPICS_PATH, - pageToken=TOKEN1, pageSize=str(SIZE)) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_TOPICS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], + {'pageToken': TOKEN1, 'pageSize': SIZE}) def test_list_topics_missing_key(self): - import json RETURNED = {} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - topics, next_token = conn.list_topics(self.PROJECT) + topics, next_token = api.list_topics(self.PROJECT) self.assertEqual(len(topics), 0) self.assertEqual(next_token, None) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], self.LIST_TOPICS_PATH) - self.assertEqual(http._called_with['body'], None) - - def test_list_subscriptions_no_paging(self): - import json - SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} - RETURNED = {'subscriptions': [SUB_INFO]} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) - - subscriptions, next_token = conn.list_subscriptions(self.PROJECT) - - self.assertEqual(len(subscriptions), 1) - subscription = subscriptions[0] - self.assertTrue(isinstance(subscription, dict)) - self.assertEqual(subscription['name'], self.SUB_PATH) - self.assertEqual(subscription['topic'], self.TOPIC_PATH) - self.assertEqual(next_token, None) - - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], - self.LIST_SUBSCRIPTIONS_PATH) - self.assertEqual(http._called_with['body'], None) - - def test_list_subscriptions_with_paging(self): - import json - TOKEN1 = 'TOKEN1' - TOKEN2 = 'TOKEN2' - SIZE = 1 - SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} - RETURNED = { - 'subscriptions': [SUB_INFO], - 'nextPageToken': 'TOKEN2', - } - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) - - subscriptions, next_token = conn.list_subscriptions( - self.PROJECT, page_token=TOKEN1, page_size=SIZE) - - self.assertEqual(len(subscriptions), 1) - subscription = subscriptions[0] - self.assertTrue(isinstance(subscription, dict)) - self.assertEqual(subscription['name'], self.SUB_PATH) - self.assertEqual(subscription['topic'], self.TOPIC_PATH) - self.assertEqual(next_token, TOKEN2) - - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], - self.LIST_SUBSCRIPTIONS_PATH, - pageToken=TOKEN1, pageSize=str(SIZE)) - self.assertEqual(http._called_with['body'], None) - - def test_list_subscriptions_missing_key(self): - import json - RETURNED = {} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) - - subscriptions, next_token = conn.list_subscriptions(self.PROJECT) - - self.assertEqual(len(subscriptions), 0) - self.assertEqual(next_token, None) - - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], - self.LIST_SUBSCRIPTIONS_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_TOPICS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) def test_topic_create(self): - import json RETURNED = {'name': self.TOPIC_PATH} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - resource = conn.topic_create(self.TOPIC_PATH) + resource = api.topic_create(self.TOPIC_PATH) self.assertEqual(resource, RETURNED) - self.assertEqual(http._called_with['method'], 'PUT') - self._verify_uri(http._called_with['uri'], self.TOPIC_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'PUT') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) def test_topic_get(self): - import json RETURNED = {'name': self.TOPIC_PATH} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - resource = conn.topic_get(self.TOPIC_PATH) + resource = api.topic_get(self.TOPIC_PATH) self.assertEqual(resource, RETURNED) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], self.TOPIC_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) def test_topic_delete(self): - import json - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps({})) - conn = self._makeOne(http=http) + RETURNED = {} + connection = _Connection(RETURNED) + api = self._makeOne(connection) - conn.topic_delete(self.TOPIC_PATH) + api.topic_delete(self.TOPIC_PATH) - self.assertEqual(http._called_with['method'], 'DELETE') - self._verify_uri(http._called_with['uri'], self.TOPIC_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'DELETE') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) def test_topic_publish(self): import base64 - import json PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} RETURNED = {'messageIds': [MSGID]} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - resource = conn.topic_publish(self.TOPIC_PATH, [MESSAGE]) + resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) self.assertEqual(resource, [MSGID]) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], - '%s:publish' % (self.TOPIC_PATH,)) - self.assertEqual(http._called_with['body'], - json.dumps({'messages': [MESSAGE]})) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:publish' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], + {'messages': [MESSAGE]}) def test_topic_list_subscriptions_no_paging(self): - import json SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} RETURNED = {'subscriptions': [SUB_INFO]} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - subscriptions, next_token = conn.topic_list_subscriptions( + subscriptions, next_token = api.topic_list_subscriptions( self.TOPIC_PATH) self.assertEqual(len(subscriptions), 1) subscription = subscriptions[0] - self.assertTrue(isinstance(subscription, dict)) + self.assertIsInstance(subscription, dict) self.assertEqual(subscription['name'], self.SUB_PATH) self.assertEqual(subscription['topic'], self.TOPIC_PATH) self.assertEqual(next_token, None) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], - self.LIST_TOPIC_SUBSCRIPTIONS_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_TOPIC_SUBSCRIPTIONS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) def test_topic_list_subscriptions_with_paging(self): - import json TOKEN1 = 'TOKEN1' TOKEN2 = 'TOKEN2' SIZE = 1 @@ -377,182 +271,135 @@ def test_topic_list_subscriptions_with_paging(self): 'subscriptions': [SUB_INFO], 'nextPageToken': 'TOKEN2', } - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - subscriptions, next_token = conn.topic_list_subscriptions( + subscriptions, next_token = api.topic_list_subscriptions( self.TOPIC_PATH, page_token=TOKEN1, page_size=SIZE) self.assertEqual(len(subscriptions), 1) subscription = subscriptions[0] - self.assertTrue(isinstance(subscription, dict)) + self.assertIsInstance(subscription, dict) self.assertEqual(subscription['name'], self.SUB_PATH) self.assertEqual(subscription['topic'], self.TOPIC_PATH) self.assertEqual(next_token, TOKEN2) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], - self.LIST_TOPIC_SUBSCRIPTIONS_PATH, - pageToken=TOKEN1, pageSize=str(SIZE)) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_TOPIC_SUBSCRIPTIONS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], + {'pageToken': TOKEN1, 'pageSize': SIZE}) def test_topic_list_subscriptions_missing_key(self): - import json RETURNED = {} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - subscriptions, next_token = conn.topic_list_subscriptions( + subscriptions, next_token = api.topic_list_subscriptions( self.TOPIC_PATH) self.assertEqual(len(subscriptions), 0) self.assertEqual(next_token, None) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], - self.LIST_TOPIC_SUBSCRIPTIONS_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_TOPIC_SUBSCRIPTIONS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) - def test_get_iam_policy(self): - import json - from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE - PATH = '%s:getIamPolicy' % (self.TOPIC_PATH,) - OWNER1 = 'user:phred@example.com' - OWNER2 = 'group:cloud-logs@google.com' - EDITOR1 = 'domain:google.com' - EDITOR2 = 'user:phred@example.com' - VIEWER1 = 'serviceAccount:1234-abcdef@service.example.com' - VIEWER2 = 'user:phred@example.com' - RETURNED = { - 'etag': 'DEADBEEF', - 'version': 17, - 'bindings': [ - {'role': OWNER_ROLE, 'members': [OWNER1, OWNER2]}, - {'role': EDITOR_ROLE, 'members': [EDITOR1, EDITOR2]}, - {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, - ], - } - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) - policy = conn.get_iam_policy(self.TOPIC_PATH) +class Test_SubscriberAPI(_Base): - self.assertEqual(policy, RETURNED) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], PATH) - self.assertEqual(http._called_with['body'], None) + def _getTargetClass(self): + from gcloud.pubsub.connection import _SubscriberAPI + return _SubscriberAPI - def test_set_iam_policy(self): - import json - from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE - PATH = '%s:setIamPolicy' % (self.TOPIC_PATH,) - OWNER1 = 'user:phred@example.com' - OWNER2 = 'group:cloud-logs@google.com' - EDITOR1 = 'domain:google.com' - EDITOR2 = 'user:phred@example.com' - VIEWER1 = 'serviceAccount:1234-abcdef@service.example.com' - VIEWER2 = 'user:phred@example.com' - POLICY = { - 'etag': 'DEADBEEF', - 'version': 17, - 'bindings': [ - {'role': OWNER_ROLE, 'members': [OWNER1, OWNER2]}, - {'role': EDITOR_ROLE, 'members': [EDITOR1, EDITOR2]}, - {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, - ], - } - RETURNED = POLICY.copy() - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) - policy = conn.set_iam_policy(self.TOPIC_PATH, POLICY) + def test_ctor(self): + connection = _Connection() + api = self._makeOne(connection) + self.assertTrue(api._connection is connection) - self.assertEqual(policy, RETURNED) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], PATH) - self.assertEqual(http._called_with['body'], - json.dumps({'policy': POLICY})) + def test_list_subscriptions_no_paging(self): + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + RETURNED = {'subscriptions': [SUB_INFO]} + connection = _Connection(RETURNED) + api = self._makeOne(connection) - def test_test_iam_permissions(self): - import json - from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE - PATH = '%s:testIamPermissions' % (self.TOPIC_PATH,) - ALL_ROLES = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE] - ALLOWED = ALL_ROLES[1:] - RETURNED = {'permissions': ALLOWED} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', + subscriptions, next_token = api.list_subscriptions(self.PROJECT) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, dict) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, None) + + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_SUBSCRIPTIONS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) + + def test_list_subscriptions_with_paging(self): + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} + RETURNED = { + 'subscriptions': [SUB_INFO], + 'nextPageToken': 'TOKEN2', } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - allowed = conn.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES) + subscriptions, next_token = api.list_subscriptions( + self.PROJECT, page_token=TOKEN1, page_size=SIZE) - self.assertEqual(allowed, ALLOWED) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], PATH) - self.assertEqual(http._called_with['body'], - json.dumps({'permissions': ALL_ROLES})) + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, dict) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, TOKEN2) - def test_test_iam_permissions_missing_key(self): - import json - from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE - PATH = '%s:testIamPermissions' % (self.TOPIC_PATH,) - ALL_ROLES = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE] + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_SUBSCRIPTIONS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], + {'pageToken': TOKEN1, 'pageSize': SIZE}) + + def test_list_subscriptions_missing_key(self): RETURNED = {} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - allowed = conn.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES) + subscriptions, next_token = api.list_subscriptions(self.PROJECT) - self.assertEqual(allowed, []) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], PATH) - self.assertEqual(http._called_with['body'], - json.dumps({'permissions': ALL_ROLES})) + self.assertEqual(len(subscriptions), 0) + self.assertEqual(next_token, None) + + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_SUBSCRIPTIONS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) def test_subscription_create_defaults(self): - import json RESOURCE = {'topic': self.TOPIC_PATH} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } RETURNED = RESOURCE.copy() RETURNED['name'] = self.SUB_PATH - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - resource = conn.subscription_create(self.SUB_PATH, self.TOPIC_PATH) + resource = api.subscription_create(self.SUB_PATH, self.TOPIC_PATH) self.assertEqual(resource, RETURNED) - self.assertEqual(http._called_with['method'], 'PUT') - self._verify_uri(http._called_with['uri'], self.SUB_PATH) - self.assertEqual(http._called_with['body'], json.dumps(RESOURCE)) + self.assertEqual(connection._called_with['method'], 'PUT') + path = '/%s' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], RESOURCE) def test_subscription_create_explicit(self): - import json ACK_DEADLINE = 90 PUSH_ENDPOINT = 'https://api.example.com/push' RESOURCE = { @@ -562,26 +409,22 @@ def test_subscription_create_explicit(self): 'pushEndpoint': PUSH_ENDPOINT, }, } - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } RETURNED = RESOURCE.copy() RETURNED['name'] = self.SUB_PATH - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - resource = conn.subscription_create( + resource = api.subscription_create( self.SUB_PATH, self.TOPIC_PATH, ack_deadline=ACK_DEADLINE, push_endpoint=PUSH_ENDPOINT) self.assertEqual(resource, RETURNED) - self.assertEqual(http._called_with['method'], 'PUT') - self._verify_uri(http._called_with['uri'], self.SUB_PATH) - self.assertEqual(http._called_with['body'], json.dumps(RESOURCE)) + self.assertEqual(connection._called_with['method'], 'PUT') + path = '/%s' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], RESOURCE) def test_subscription_get(self): - import json ACK_DEADLINE = 90 PUSH_ENDPOINT = 'https://api.example.com/push' RETURNED = { @@ -590,60 +433,45 @@ def test_subscription_get(self): 'ackDeadlineSeconds': ACK_DEADLINE, 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}, } - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - resource = conn.subscription_get(self.SUB_PATH) + resource = api.subscription_get(self.SUB_PATH) self.assertEqual(resource, RETURNED) - self.assertEqual(http._called_with['method'], 'GET') - self._verify_uri(http._called_with['uri'], self.SUB_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) def test_subscription_delete(self): - import json RETURNED = {} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - conn.subscription_delete(self.SUB_PATH) + api.subscription_delete(self.SUB_PATH) - self.assertEqual(http._called_with['method'], 'DELETE') - self._verify_uri(http._called_with['uri'], self.SUB_PATH) - self.assertEqual(http._called_with['body'], None) + self.assertEqual(connection._called_with['method'], 'DELETE') + path = '/%s' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) def test_subscription_modify_push_config(self): - import json PUSH_ENDPOINT = 'https://api.example.com/push' BODY = { 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}, } RETURNED = {} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - conn.subscription_modify_push_config(self.SUB_PATH, PUSH_ENDPOINT) + api.subscription_modify_push_config(self.SUB_PATH, PUSH_ENDPOINT) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], - '%s:modifyPushConfig' % (self.SUB_PATH,)) - self.assertEqual(http._called_with['body'], json.dumps(BODY)) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:modifyPushConfig' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) def test_subscription_pull_defaults(self): import base64 - import json PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') ACK_ID = 'DEADBEEF' @@ -652,28 +480,23 @@ def test_subscription_pull_defaults(self): RETURNED = { 'receivedMessages': [{'ackId': ACK_ID, 'message': MESSAGE}], } - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } + connection = _Connection(RETURNED) + api = self._makeOne(connection) BODY = { 'returnImmediately': False, 'maxMessages': 1, } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) - received = conn.subscription_pull(self.SUB_PATH) + received = api.subscription_pull(self.SUB_PATH) self.assertEqual(received, RETURNED['receivedMessages']) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], - '%s:pull' % (self.SUB_PATH,)) - self.assertEqual(http._called_with['body'], json.dumps(BODY)) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:pull' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) def test_subscription_pull_explicit(self): import base64 - import json PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') ACK_ID = 'DEADBEEF' @@ -682,51 +505,41 @@ def test_subscription_pull_explicit(self): RETURNED = { 'receivedMessages': [{'ackId': ACK_ID, 'message': MESSAGE}], } - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } + connection = _Connection(RETURNED) + api = self._makeOne(connection) MAX_MESSAGES = 10 BODY = { 'returnImmediately': True, 'maxMessages': MAX_MESSAGES, } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) - received = conn.subscription_pull( + received = api.subscription_pull( self.SUB_PATH, return_immediately=True, max_messages=MAX_MESSAGES) self.assertEqual(received, RETURNED['receivedMessages']) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], - '%s:pull' % (self.SUB_PATH,)) - self.assertEqual(http._called_with['body'], json.dumps(BODY)) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:pull' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) def test_subscription_acknowledge(self): - import json ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' BODY = { 'ackIds': [ACK_ID1, ACK_ID2], } RETURNED = {} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - conn.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) + api.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], - '%s:acknowledge' % (self.SUB_PATH,)) - self.assertEqual(http._called_with['body'], json.dumps(BODY)) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:acknowledge' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) def test_subscription_modify_ack_deadline(self): - import json ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' NEW_DEADLINE = 90 @@ -735,31 +548,128 @@ def test_subscription_modify_ack_deadline(self): 'ackDeadlineSeconds': NEW_DEADLINE, } RETURNED = {} - HEADERS = { - 'status': '200', - 'content-type': 'application/json', - } - http = _Http(HEADERS, json.dumps(RETURNED)) - conn = self._makeOne(http=http) + connection = _Connection(RETURNED) + api = self._makeOne(connection) - conn.subscription_modify_ack_deadline( + api.subscription_modify_ack_deadline( self.SUB_PATH, [ACK_ID1, ACK_ID2], NEW_DEADLINE) - self.assertEqual(http._called_with['method'], 'POST') - self._verify_uri(http._called_with['uri'], - '%s:modifyAckDeadline' % (self.SUB_PATH,)) - self.assertEqual(http._called_with['body'], json.dumps(BODY)) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:modifyAckDeadline' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) + + +class Test_IAMPolicyAPI(_Base): + + def _getTargetClass(self): + from gcloud.pubsub.connection import _IAMPolicyAPI + return _IAMPolicyAPI + + def test_ctor(self): + connection = _Connection() + api = self._makeOne(connection) + self.assertTrue(api._connection is connection) + + def test_get_iam_policy(self): + from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE + OWNER1 = 'user:phred@example.com' + OWNER2 = 'group:cloud-logs@google.com' + EDITOR1 = 'domain:google.com' + EDITOR2 = 'user:phred@example.com' + VIEWER1 = 'serviceAccount:1234-abcdef@service.example.com' + VIEWER2 = 'user:phred@example.com' + RETURNED = { + 'etag': 'DEADBEEF', + 'version': 17, + 'bindings': [ + {'role': OWNER_ROLE, 'members': [OWNER1, OWNER2]}, + {'role': EDITOR_ROLE, 'members': [EDITOR1, EDITOR2]}, + {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, + ], + } + connection = _Connection(RETURNED) + api = self._makeOne(connection) + + policy = api.get_iam_policy(self.TOPIC_PATH) + + self.assertEqual(policy, RETURNED) + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s:getIamPolicy' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + + def test_set_iam_policy(self): + from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE + OWNER1 = 'user:phred@example.com' + OWNER2 = 'group:cloud-logs@google.com' + EDITOR1 = 'domain:google.com' + EDITOR2 = 'user:phred@example.com' + VIEWER1 = 'serviceAccount:1234-abcdef@service.example.com' + VIEWER2 = 'user:phred@example.com' + POLICY = { + 'etag': 'DEADBEEF', + 'version': 17, + 'bindings': [ + {'role': OWNER_ROLE, 'members': [OWNER1, OWNER2]}, + {'role': EDITOR_ROLE, 'members': [EDITOR1, EDITOR2]}, + {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, + ], + } + RETURNED = POLICY.copy() + connection = _Connection(RETURNED) + api = self._makeOne(connection) + + policy = api.set_iam_policy(self.TOPIC_PATH, POLICY) + + self.assertEqual(policy, RETURNED) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:setIamPolicy' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], + {'policy': POLICY}) + + def test_test_iam_permissions(self): + from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE + ALL_ROLES = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE] + ALLOWED = ALL_ROLES[1:] + RETURNED = {'permissions': ALLOWED} + connection = _Connection(RETURNED) + api = self._makeOne(connection) + + allowed = api.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES) + + self.assertEqual(allowed, ALLOWED) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:testIamPermissions' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], + {'permissions': ALL_ROLES}) + + def test_test_iam_permissions_missing_key(self): + from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE + ALL_ROLES = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE] + RETURNED = {} + connection = _Connection(RETURNED) + api = self._makeOne(connection) + + allowed = api.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES) + + self.assertEqual(allowed, []) + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:testIamPermissions' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], + {'permissions': ALL_ROLES}) -class _Http(object): +class _Connection(object): _called_with = None - def __init__(self, headers, content): - from httplib2 import Response - self._response = Response(headers) - self._content = content + def __init__(self, *responses): + self._responses = responses - def request(self, **kw): + def api_request(self, **kw): self._called_with = kw - return self._response, self._content + response, self._responses = self._responses[0], self._responses[1:] + return response diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index cb4cbb75ba83..fdcc03aa4d50 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -78,7 +78,7 @@ def test_from_api_repr_no_topics(self): subscription = klass.from_api_repr(resource, client) self.assertEqual(subscription.name, self.SUB_NAME) topic = subscription.topic - self.assertTrue(isinstance(topic, Topic)) + self.assertIsInstance(topic, Topic) self.assertEqual(topic.name, self.TOPIC_NAME) self.assertEqual(topic.project, self.PROJECT) self.assertEqual(subscription.ack_deadline, self.DEADLINE) @@ -110,7 +110,7 @@ def test_from_api_repr_w_topics_no_topic_match(self): subscription = klass.from_api_repr(resource, client, topics=topics) self.assertEqual(subscription.name, self.SUB_NAME) topic = subscription.topic - self.assertTrue(isinstance(topic, Topic)) + self.assertIsInstance(topic, Topic) self.assertTrue(topic is topics[self.TOPIC_PATH]) self.assertEqual(topic.name, self.TOPIC_NAME) self.assertEqual(topic.project, self.PROJECT) @@ -137,16 +137,15 @@ def test_create_pull_wo_ack_deadline_w_bound_client(self): 'topic': self.TOPIC_PATH, 'name': self.SUB_PATH, } - conn = _Connection() - conn._subscription_create_response = RESPONSE - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_create_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) subscription.create() - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_created, + self.assertEqual(api._subscription_created, (self.SUB_PATH, self.TOPIC_PATH, None, None)) def test_create_push_w_ack_deadline_w_alternate_client(self): @@ -156,51 +155,42 @@ def test_create_push_w_ack_deadline_w_alternate_client(self): 'ackDeadlineSeconds': self.DEADLINE, 'pushConfig': {'pushEndpoint': self.ENDPOINT} } - conn = _Connection() - conn._subscription_create_response = RESPONSE - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._subscription_create_response = RESPONSE - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_create_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic, self.DEADLINE, self.ENDPOINT) subscription.create(client=client2) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) self.assertEqual( - conn2._subscription_created, + api._subscription_created, (self.SUB_PATH, self.TOPIC_PATH, self.DEADLINE, self.ENDPOINT)) def test_exists_miss_w_bound_client(self): - conn = _Connection() - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) self.assertFalse(subscription.exists()) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_got, self.SUB_PATH) + self.assertEqual(api._subscription_got, self.SUB_PATH) def test_exists_hit_w_alternate_client(self): RESPONSE = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._subscription_get_response = RESPONSE - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_get_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) self.assertTrue(subscription.exists(client=client2)) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._subscription_got, self.SUB_PATH) + self.assertEqual(api._subscription_got, self.SUB_PATH) def test_reload_w_bound_client(self): RESPONSE = { @@ -209,9 +199,9 @@ def test_reload_w_bound_client(self): 'ackDeadlineSeconds': self.DEADLINE, 'pushConfig': {'pushEndpoint': self.ENDPOINT}, } - conn = _Connection() - conn._subscription_get_response = RESPONSE - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_get_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) @@ -219,19 +209,17 @@ def test_reload_w_bound_client(self): self.assertEqual(subscription.ack_deadline, self.DEADLINE) self.assertEqual(subscription.push_endpoint, self.ENDPOINT) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_got, self.SUB_PATH) + self.assertEqual(api._subscription_got, self.SUB_PATH) def test_reload_w_alternate_client(self): RESPONSE = { 'name': self.SUB_PATH, 'topic': self.TOPIC_PATH, } - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._subscription_get_response = RESPONSE - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_get_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic, self.DEADLINE, self.ENDPOINT) @@ -240,60 +228,52 @@ def test_reload_w_alternate_client(self): self.assertEqual(subscription.ack_deadline, None) self.assertEqual(subscription.push_endpoint, None) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._subscription_got, self.SUB_PATH) + self.assertEqual(api._subscription_got, self.SUB_PATH) def test_delete_w_bound_client(self): RESPONSE = {} - conn = _Connection() - conn._subscription_delete_response = RESPONSE - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_delete_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) subscription.delete() - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_deleted, self.SUB_PATH) + self.assertEqual(api._subscription_deleted, self.SUB_PATH) def test_delete_w_alternate_client(self): RESPONSE = {} - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._subscription_delete_response = RESPONSE - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_delete_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic, self.DEADLINE, self.ENDPOINT) subscription.delete(client=client2) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._subscription_deleted, self.SUB_PATH) + self.assertEqual(api._subscription_deleted, self.SUB_PATH) def test_modify_push_config_w_endpoint_w_bound_client(self): - conn = _Connection() - conn._subscription_modify_push_config_response = {} - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_modify_push_config_response = {} topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) subscription.modify_push_configuration(push_endpoint=self.ENDPOINT) self.assertEqual(subscription.push_endpoint, self.ENDPOINT) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_modified_push_config, + self.assertEqual(api._subscription_modified_push_config, (self.SUB_PATH, self.ENDPOINT)) def test_modify_push_config_wo_endpoint_w_alternate_client(self): - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._subscription_modify_push_config_response = {} - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_modify_push_config_response = {} topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic, push_endpoint=self.ENDPOINT) @@ -302,9 +282,7 @@ def test_modify_push_config_wo_endpoint_w_alternate_client(self): client=client2) self.assertEqual(subscription.push_endpoint, None) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._subscription_modified_push_config, + self.assertEqual(api._subscription_modified_push_config, (self.SUB_PATH, None)) def test_pull_wo_return_immediately_max_messages_w_bound_client(self): @@ -316,9 +294,9 @@ def test_pull_wo_return_immediately_max_messages_w_bound_client(self): B64 = base64.b64encode(PAYLOAD) MESSAGE = {'messageId': MSG_ID, 'data': B64} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} - conn = _Connection() - conn._subscription_pull_response = [REC_MESSAGE] - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_pull_response = [REC_MESSAGE] topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) @@ -327,12 +305,11 @@ def test_pull_wo_return_immediately_max_messages_w_bound_client(self): self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) - self.assertTrue(isinstance(message, Message)) + self.assertIsInstance(message, Message) self.assertEqual(message.data, PAYLOAD) self.assertEqual(message.message_id, MSG_ID) self.assertEqual(message.attributes, {}) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_pulled, + self.assertEqual(api._subscription_pulled, (self.SUB_PATH, False, 1)) def test_pull_w_return_immediately_w_max_messages_w_alt_client(self): @@ -344,11 +321,10 @@ def test_pull_w_return_immediately_w_max_messages_w_alt_client(self): B64 = base64.b64encode(PAYLOAD) MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._subscription_pull_response = [REC_MESSAGE] - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_pull_response = [REC_MESSAGE] topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) @@ -358,94 +334,83 @@ def test_pull_w_return_immediately_w_max_messages_w_alt_client(self): self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) - self.assertTrue(isinstance(message, Message)) + self.assertIsInstance(message, Message) self.assertEqual(message.data, PAYLOAD) self.assertEqual(message.message_id, MSG_ID) self.assertEqual(message.attributes, {'a': 'b'}) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._subscription_pulled, + self.assertEqual(api._subscription_pulled, (self.SUB_PATH, True, 3)) def test_pull_wo_receivedMessages(self): - conn = _Connection({}) - conn._subscription_pull_response = {} - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_pull_response = {} topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) pulled = subscription.pull(return_immediately=False) self.assertEqual(len(pulled), 0) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_pulled, + self.assertEqual(api._subscription_pulled, (self.SUB_PATH, False, 1)) def test_acknowledge_w_bound_client(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' - conn = _Connection() - conn._subscription_acknowlege_response = {} - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_acknowlege_response = {} topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) subscription.acknowledge([ACK_ID1, ACK_ID2]) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_acked, + self.assertEqual(api._subscription_acked, (self.SUB_PATH, [ACK_ID1, ACK_ID2])) def test_acknowledge_w_alternate_client(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._subscription_acknowlege_response = {} - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_acknowlege_response = {} topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) subscription.acknowledge([ACK_ID1, ACK_ID2], client=client2) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._subscription_acked, + self.assertEqual(api._subscription_acked, (self.SUB_PATH, [ACK_ID1, ACK_ID2])) def test_modify_ack_deadline_w_bound_client(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' - conn = _Connection() - conn._subscription_modify_ack_deadline_response = {} - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_modify_ack_deadline_response = {} topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) subscription.modify_ack_deadline([ACK_ID1, ACK_ID2], self.DEADLINE) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._subscription_modified_ack_deadline, + self.assertEqual(api._subscription_modified_ack_deadline, (self.SUB_PATH, [ACK_ID1, ACK_ID2], self.DEADLINE)) def test_modify_ack_deadline_w_alternate_client(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._subscription_modify_ack_deadline_response = {} - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_modify_ack_deadline_response = {} topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) subscription.modify_ack_deadline( [ACK_ID1, ACK_ID2], self.DEADLINE, client=client2) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._subscription_modified_ack_deadline, + self.assertEqual(api._subscription_modified_ack_deadline, (self.SUB_PATH, [ACK_ID1, ACK_ID2], self.DEADLINE)) def test_get_iam_policy_w_bound_client(self): @@ -465,9 +430,9 @@ def test_get_iam_policy_w_bound_client(self): {'role': VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, ], } - conn = _Connection() - conn._get_iam_policy_response = POLICY - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.iam_policy_api = _FauxIAMPolicy() + api._get_iam_policy_response = POLICY topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) @@ -478,18 +443,16 @@ def test_get_iam_policy_w_bound_client(self): self.assertEqual(sorted(policy.owners), [OWNER2, OWNER1]) self.assertEqual(sorted(policy.editors), [EDITOR1, EDITOR2]) self.assertEqual(sorted(policy.viewers), [VIEWER1, VIEWER2]) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._got_iam_policy, self.SUB_PATH) + self.assertEqual(api._got_iam_policy, self.SUB_PATH) def test_get_iam_policy_w_alternate_client(self): POLICY = { 'etag': 'ACAB', } - conn1 = _Connection() - conn2 = _Connection() - conn2._get_iam_policy_response = POLICY - client1 = _Client(project=self.PROJECT, connection=conn1) - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.iam_policy_api = _FauxIAMPolicy() + api._get_iam_policy_response = POLICY topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) @@ -501,9 +464,7 @@ def test_get_iam_policy_w_alternate_client(self): self.assertEqual(sorted(policy.editors), []) self.assertEqual(sorted(policy.viewers), []) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._got_iam_policy, self.SUB_PATH) + self.assertEqual(api._got_iam_policy, self.SUB_PATH) def test_set_iam_policy_w_bound_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE @@ -526,9 +487,9 @@ def test_set_iam_policy_w_bound_client(self): RESPONSE = POLICY.copy() RESPONSE['etag'] = 'ABACABAF' RESPONSE['version'] = 18 - conn = _Connection() - conn._set_iam_policy_response = RESPONSE - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.iam_policy_api = _FauxIAMPolicy() + api._set_iam_policy_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) policy = Policy('DEADBEEF', 17) @@ -546,17 +507,15 @@ def test_set_iam_policy_w_bound_client(self): self.assertEqual(sorted(new_policy.owners), [OWNER1, OWNER2]) self.assertEqual(sorted(new_policy.editors), [EDITOR1, EDITOR2]) self.assertEqual(sorted(new_policy.viewers), [VIEWER1, VIEWER2]) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._set_iam_policy, (self.SUB_PATH, POLICY)) + self.assertEqual(api._set_iam_policy, (self.SUB_PATH, POLICY)) def test_set_iam_policy_w_alternate_client(self): from gcloud.pubsub.iam import Policy RESPONSE = {'etag': 'ACAB'} - conn1 = _Connection() - conn2 = _Connection() - conn2._set_iam_policy_response = RESPONSE - client1 = _Client(project=self.PROJECT, connection=conn1) - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.iam_policy_api = _FauxIAMPolicy() + api._set_iam_policy_response = RESPONSE topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) @@ -568,51 +527,41 @@ def test_set_iam_policy_w_alternate_client(self): self.assertEqual(sorted(new_policy.owners), []) self.assertEqual(sorted(new_policy.editors), []) self.assertEqual(sorted(new_policy.viewers), []) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._set_iam_policy, (self.SUB_PATH, {})) + self.assertEqual(api._set_iam_policy, (self.SUB_PATH, {})) def test_check_iam_permissions_w_bound_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE ROLES = [VIEWER_ROLE, EDITOR_ROLE, OWNER_ROLE] - conn = _Connection() - conn._test_iam_permissions_response = ROLES[:-1] - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.iam_policy_api = _FauxIAMPolicy() + api._test_iam_permissions_response = ROLES[:-1] topic = _Topic(self.TOPIC_NAME, client=client) subscription = self._makeOne(self.SUB_NAME, topic) allowed = subscription.check_iam_permissions(ROLES) self.assertEqual(allowed, ROLES[:-1]) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._tested_iam_permissions, + self.assertEqual(api._tested_iam_permissions, (self.SUB_PATH, ROLES)) def test_check_iam_permissions_w_alternate_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE ROLES = [VIEWER_ROLE, EDITOR_ROLE, OWNER_ROLE] - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._test_iam_permissions_response = [] - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.iam_policy_api = _FauxIAMPolicy() + api._test_iam_permissions_response = [] topic = _Topic(self.TOPIC_NAME, client=client1) subscription = self._makeOne(self.SUB_NAME, topic) allowed = subscription.check_iam_permissions(ROLES, client=client2) self.assertEqual(len(allowed), 0) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._tested_iam_permissions, + self.assertEqual(api._tested_iam_permissions, (self.SUB_PATH, ROLES)) -class _Connection(object): # pylint: disable=too-many-instance-attributes - - def __init__(self, *responses): - self._responses = responses - self._requested = [] +class _FauxSubscribererAPI(object): def subscription_create(self, subscription_path, topic_path, ack_deadline=None, push_endpoint=None): @@ -654,6 +603,9 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, subscription_path, ack_ids, ack_deadline) return self._subscription_modify_ack_deadline_response + +class _FauxIAMPolicy(object): + def get_iam_policy(self, target_path): self._got_iam_policy = target_path return self._get_iam_policy_response @@ -679,9 +631,10 @@ def __init__(self, name, client): class _Client(object): - def __init__(self, project, connection=None): + connection = None + + def __init__(self, project): self.project = project - self.connection = connection def topic(self, name, timestamp_messages=False): from gcloud.pubsub.topic import Topic diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index c1844dc60a69..20a44141ffcd 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -58,78 +58,66 @@ def test_from_api_repr_with_bad_client(self): resource, client=client) def test_create_w_bound_client(self): - conn = _Connection() - conn._topic_create_response = {'name': self.TOPIC_PATH} - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_create_response = {'name': self.TOPIC_PATH} topic = self._makeOne(self.TOPIC_NAME, client=client) topic.create() - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_created, self.TOPIC_PATH) + self.assertEqual(api._topic_created, self.TOPIC_PATH) def test_create_w_alternate_client(self): - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._topic_create_response = {'name': self.TOPIC_PATH} - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.publisher_api = _FauxPublisherAPI() + api._topic_create_response = {'name': self.TOPIC_PATH} topic = self._makeOne(self.TOPIC_NAME, client=client1) topic.create(client=client2) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._topic_created, self.TOPIC_PATH) + self.assertEqual(api._topic_created, self.TOPIC_PATH) def test_exists_miss_w_bound_client(self): - conn = _Connection() - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() topic = self._makeOne(self.TOPIC_NAME, client=client) self.assertFalse(topic.exists()) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_got, self.TOPIC_PATH) + self.assertEqual(api._topic_got, self.TOPIC_PATH) def test_exists_hit_w_alternate_client(self): - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._topic_get_response = {'name': self.TOPIC_PATH} - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.publisher_api = _FauxPublisherAPI() + api._topic_get_response = {'name': self.TOPIC_PATH} topic = self._makeOne(self.TOPIC_NAME, client=client1) self.assertTrue(topic.exists(client=client2)) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._topic_got, self.TOPIC_PATH) + self.assertEqual(api._topic_got, self.TOPIC_PATH) def test_delete_w_bound_client(self): - conn = _Connection() - conn._topic_delete_response = {} - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_delete_response = {} topic = self._makeOne(self.TOPIC_NAME, client=client) topic.delete() - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_deleted, self.TOPIC_PATH) + self.assertEqual(api._topic_deleted, self.TOPIC_PATH) def test_delete_w_alternate_client(self): - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._topic_delete_response = {} - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.publisher_api = _FauxPublisherAPI() + api._topic_delete_response = {} topic = self._makeOne(self.TOPIC_NAME, client=client1) topic.delete(client=client2) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._topic_deleted, self.TOPIC_PATH) + self.assertEqual(api._topic_deleted, self.TOPIC_PATH) def test_publish_single_bytes_wo_attrs_w_bound_client(self): import base64 @@ -137,16 +125,15 @@ def test_publish_single_bytes_wo_attrs_w_bound_client(self): B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} - conn = _Connection() - conn._topic_publish_response = [MSGID] - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID] topic = self._makeOne(self.TOPIC_NAME, client=client) msgid = topic.publish(PAYLOAD) self.assertEqual(msgid, MSGID) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_published, (self.TOPIC_PATH, [MESSAGE])) + self.assertEqual(api._topic_published, (self.TOPIC_PATH, [MESSAGE])) def test_publish_single_bytes_wo_attrs_w_add_timestamp_alt_client(self): import base64 @@ -166,11 +153,10 @@ def _utcnow(): 'data': B64, 'attributes': {'timestamp': NOW.strftime(_RFC3339_MICROS)}, } - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._topic_publish_response = [MSGID] - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID] topic = self._makeOne(self.TOPIC_NAME, client=client1, timestamp_messages=True) @@ -178,9 +164,7 @@ def _utcnow(): msgid = topic.publish(PAYLOAD, client=client2) self.assertEqual(msgid, MSGID) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._topic_published, (self.TOPIC_PATH, [MESSAGE])) + self.assertEqual(api._topic_published, (self.TOPIC_PATH, [MESSAGE])) def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): import base64 @@ -190,17 +174,16 @@ def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): OVERRIDE = '2015-04-10T16:46:22.868399Z' MESSAGE = {'data': B64, 'attributes': {'timestamp': OVERRIDE}} - conn = _Connection() - conn._topic_publish_response = [MSGID] - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID] topic = self._makeOne(self.TOPIC_NAME, client=client, timestamp_messages=True) msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE) self.assertEqual(msgid, MSGID) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_published, (self.TOPIC_PATH, [MESSAGE])) + self.assertEqual(api._topic_published, (self.TOPIC_PATH, [MESSAGE])) def test_publish_single_w_attrs(self): import base64 @@ -209,16 +192,15 @@ def test_publish_single_w_attrs(self): MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn = _Connection() - conn._topic_publish_response = [MSGID] - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID] topic = self._makeOne(self.TOPIC_NAME, client=client) msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2') self.assertEqual(msgid, MSGID) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_published, (self.TOPIC_PATH, [MESSAGE])) + self.assertEqual(api._topic_published, (self.TOPIC_PATH, [MESSAGE])) def test_publish_multiple_w_bound_client(self): import base64 @@ -232,9 +214,9 @@ def test_publish_multiple_w_bound_client(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn = _Connection() - conn._topic_publish_response = [MSGID1, MSGID2] - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID1, MSGID2] topic = self._makeOne(self.TOPIC_NAME, client=client) with topic.batch() as batch: @@ -243,8 +225,7 @@ def test_publish_multiple_w_bound_client(self): self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_published, + self.assertEqual(api._topic_published, (self.TOPIC_PATH, [MESSAGE1, MESSAGE2])) def test_publish_multiple_w_alternate_client(self): @@ -260,11 +241,10 @@ def test_publish_multiple_w_alternate_client(self): 'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}, } - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._topic_publish_response = [MSGID1, MSGID2] - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID1, MSGID2] topic = self._makeOne(self.TOPIC_NAME, client=client1) with topic.batch(client=client2) as batch: @@ -273,16 +253,14 @@ def test_publish_multiple_w_alternate_client(self): self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._topic_published, + self.assertEqual(api._topic_published, (self.TOPIC_PATH, [MESSAGE1, MESSAGE2])) def test_publish_multiple_error(self): PAYLOAD1 = b'This is the first message text' PAYLOAD2 = b'This is the second message text' - conn = _Connection() client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() topic = self._makeOne(self.TOPIC_NAME, client=client) try: @@ -294,8 +272,7 @@ def test_publish_multiple_error(self): pass self.assertEqual(list(batch), []) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(getattr(conn, '_topic_published', self), self) + self.assertEqual(getattr(api, '_topic_published', self), self) def test_subscription(self): from gcloud.pubsub.subscription import Subscription @@ -304,7 +281,7 @@ def test_subscription(self): SUBSCRIPTION_NAME = 'subscription_name' subscription = topic.subscription(SUBSCRIPTION_NAME) - self.assertTrue(isinstance(subscription, Subscription)) + self.assertIsInstance(subscription, Subscription) self.assertEqual(subscription.name, SUBSCRIPTION_NAME) self.assertTrue(subscription.topic is topic) @@ -319,9 +296,9 @@ def test_list_subscriptions_no_paging(self): SUBS_LIST = [SUB_PATH_1, SUB_PATH_2] TOKEN = 'TOKEN' - conn = _Connection() - conn._topic_list_subscriptions_response = SUBS_LIST, TOKEN - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_list_subscriptions_response = SUBS_LIST, TOKEN topic = self._makeOne(self.TOPIC_NAME, client=client) subscriptions, next_page_token = topic.list_subscriptions() @@ -329,18 +306,17 @@ def test_list_subscriptions_no_paging(self): self.assertEqual(len(subscriptions), 2) subscription = subscriptions[0] - self.assertTrue(isinstance(subscription, Subscription)) + self.assertIsInstance(subscription, Subscription) self.assertEqual(subscriptions[0].name, SUB_NAME_1) self.assertTrue(subscription.topic is topic) subscription = subscriptions[1] - self.assertTrue(isinstance(subscription, Subscription)) + self.assertIsInstance(subscription, Subscription) self.assertEqual(subscriptions[1].name, SUB_NAME_2) self.assertTrue(subscription.topic is topic) self.assertEqual(next_page_token, TOKEN) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_listed, + self.assertEqual(api._topic_listed, (self.TOPIC_PATH, None, None)) def test_list_subscriptions_with_paging(self): @@ -355,9 +331,9 @@ def test_list_subscriptions_with_paging(self): PAGE_SIZE = 10 TOKEN = 'TOKEN' - conn = _Connection() - conn._topic_list_subscriptions_response = SUBS_LIST, None - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_list_subscriptions_response = SUBS_LIST, None topic = self._makeOne(self.TOPIC_NAME, client=client) subscriptions, next_page_token = topic.list_subscriptions( @@ -366,24 +342,23 @@ def test_list_subscriptions_with_paging(self): self.assertEqual(len(subscriptions), 2) subscription = subscriptions[0] - self.assertTrue(isinstance(subscription, Subscription)) + self.assertIsInstance(subscription, Subscription) self.assertEqual(subscriptions[0].name, SUB_NAME_1) self.assertTrue(subscription.topic is topic) subscription = subscriptions[1] - self.assertTrue(isinstance(subscription, Subscription)) + self.assertIsInstance(subscription, Subscription) self.assertEqual(subscriptions[1].name, SUB_NAME_2) self.assertTrue(subscription.topic is topic) self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_listed, + self.assertEqual(api._topic_listed, (self.TOPIC_PATH, PAGE_SIZE, TOKEN)) def test_list_subscriptions_missing_key(self): - conn = _Connection() - conn._topic_list_subscriptions_response = (), None - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.publisher_api = _FauxPublisherAPI() + api._topic_list_subscriptions_response = (), None topic = self._makeOne(self.TOPIC_NAME, client=client) subscriptions, next_page_token = topic.list_subscriptions() @@ -391,8 +366,7 @@ def test_list_subscriptions_missing_key(self): self.assertEqual(len(subscriptions), 0) self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_listed, + self.assertEqual(api._topic_listed, (self.TOPIC_PATH, None, None)) def test_get_iam_policy_w_bound_client(self): @@ -413,9 +387,9 @@ def test_get_iam_policy_w_bound_client(self): ], } - conn = _Connection() - conn._get_iam_policy_response = POLICY - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.iam_policy_api = _FauxIAMPolicy() + api._get_iam_policy_response = POLICY topic = self._makeOne(self.TOPIC_NAME, client=client) policy = topic.get_iam_policy() @@ -425,19 +399,17 @@ def test_get_iam_policy_w_bound_client(self): self.assertEqual(sorted(policy.owners), [OWNER2, OWNER1]) self.assertEqual(sorted(policy.editors), [EDITOR1, EDITOR2]) self.assertEqual(sorted(policy.viewers), [VIEWER1, VIEWER2]) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._got_iam_policy, self.TOPIC_PATH) + self.assertEqual(api._got_iam_policy, self.TOPIC_PATH) def test_get_iam_policy_w_alternate_client(self): POLICY = { 'etag': 'ACAB', } - conn1 = _Connection() - conn2 = _Connection() - conn2._get_iam_policy_response = POLICY - client1 = _Client(project=self.PROJECT, connection=conn1) - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.iam_policy_api = _FauxIAMPolicy() + api._get_iam_policy_response = POLICY topic = self._makeOne(self.TOPIC_NAME, client=client1) policy = topic.get_iam_policy(client=client2) @@ -448,9 +420,7 @@ def test_get_iam_policy_w_alternate_client(self): self.assertEqual(sorted(policy.editors), []) self.assertEqual(sorted(policy.viewers), []) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._got_iam_policy, self.TOPIC_PATH) + self.assertEqual(api._got_iam_policy, self.TOPIC_PATH) def test_set_iam_policy_w_bound_client(self): from gcloud.pubsub.iam import Policy @@ -474,9 +444,9 @@ def test_set_iam_policy_w_bound_client(self): RESPONSE['etag'] = 'ABACABAF' RESPONSE['version'] = 18 - conn = _Connection() - conn._set_iam_policy_response = RESPONSE - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.iam_policy_api = _FauxIAMPolicy() + api._set_iam_policy_response = RESPONSE topic = self._makeOne(self.TOPIC_NAME, client=client) policy = Policy('DEADBEEF', 17) policy.owners.add(OWNER1) @@ -493,18 +463,16 @@ def test_set_iam_policy_w_bound_client(self): self.assertEqual(sorted(new_policy.owners), [OWNER1, OWNER2]) self.assertEqual(sorted(new_policy.editors), [EDITOR1, EDITOR2]) self.assertEqual(sorted(new_policy.viewers), [VIEWER1, VIEWER2]) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._set_iam_policy, (self.TOPIC_PATH, POLICY)) + self.assertEqual(api._set_iam_policy, (self.TOPIC_PATH, POLICY)) def test_set_iam_policy_w_alternate_client(self): from gcloud.pubsub.iam import Policy RESPONSE = {'etag': 'ACAB'} - conn1 = _Connection() - conn2 = _Connection() - conn2._set_iam_policy_response = RESPONSE - client1 = _Client(project=self.PROJECT, connection=conn1) - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.iam_policy_api = _FauxIAMPolicy() + api._set_iam_policy_response = RESPONSE topic = self._makeOne(self.TOPIC_NAME, client=client1) policy = Policy() @@ -516,41 +484,35 @@ def test_set_iam_policy_w_alternate_client(self): self.assertEqual(sorted(new_policy.editors), []) self.assertEqual(sorted(new_policy.viewers), []) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._set_iam_policy, (self.TOPIC_PATH, {})) + self.assertEqual(api._set_iam_policy, (self.TOPIC_PATH, {})) def test_check_iam_permissions_w_bound_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE ROLES = [VIEWER_ROLE, EDITOR_ROLE, OWNER_ROLE] - conn = _Connection() - conn._test_iam_permissions_response = ROLES[:-1] - client = _Client(project=self.PROJECT, connection=conn) + client = _Client(project=self.PROJECT) + api = client.iam_policy_api = _FauxIAMPolicy() + api._test_iam_permissions_response = ROLES[:-1] topic = self._makeOne(self.TOPIC_NAME, client=client) allowed = topic.check_iam_permissions(ROLES) self.assertEqual(allowed, ROLES[:-1]) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._tested_iam_permissions, + self.assertEqual(api._tested_iam_permissions, (self.TOPIC_PATH, ROLES)) def test_check_iam_permissions_w_alternate_client(self): from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE ROLES = [VIEWER_ROLE, EDITOR_ROLE, OWNER_ROLE] - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection() - conn2._test_iam_permissions_response = [] - client2 = _Client(project=self.PROJECT, connection=conn2) + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.iam_policy_api = _FauxIAMPolicy() + api._test_iam_permissions_response = [] topic = self._makeOne(self.TOPIC_NAME, client=client1) allowed = topic.check_iam_permissions(ROLES, client=client2) self.assertEqual(len(allowed), 0) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._tested_iam_permissions, + self.assertEqual(api._tested_iam_permissions, (self.TOPIC_PATH, ROLES)) @@ -592,12 +554,10 @@ def test_publish_bytes_wo_attrs(self): B64 = base64.b64encode(PAYLOAD).decode('ascii') MESSAGE = {'data': B64, 'attributes': {}} - connection = _Connection() - client = _Client(project=self.PROJECT, connection=connection) + client = _Client(project=self.PROJECT) topic = _Topic() batch = self._makeOne(topic, client=client) batch.publish(PAYLOAD) - self.assertEqual(len(connection._requested), 0) self.assertEqual(batch.messages, [MESSAGE]) def test_publish_bytes_w_add_timestamp(self): @@ -606,12 +566,10 @@ def test_publish_bytes_w_add_timestamp(self): B64 = base64.b64encode(PAYLOAD).decode('ascii') MESSAGE = {'data': B64, 'attributes': {'timestamp': 'TIMESTAMP'}} - connection = _Connection() - client = _Client(project=self.PROJECT, connection=connection) + client = _Client(project=self.PROJECT) topic = _Topic(timestamp_messages=True) batch = self._makeOne(topic, client=client) batch.publish(PAYLOAD) - self.assertEqual(len(connection._requested), 0) self.assertEqual(batch.messages, [MESSAGE]) def test_commit_w_bound_client(self): @@ -626,9 +584,9 @@ def test_commit_w_bound_client(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn = _Connection() - conn._topic_publish_response = [MSGID1, MSGID2] - client = _Client(project='PROJECT', connection=conn) + client = _Client(project='PROJECT') + api = client.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID1, MSGID2] topic = _Topic() batch = self._makeOne(topic, client=client) @@ -638,8 +596,7 @@ def test_commit_w_bound_client(self): self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_published, + self.assertEqual(api._topic_published, (topic.full_name, [MESSAGE1, MESSAGE2])) def test_commit_w_alternate_client(self): @@ -654,11 +611,10 @@ def test_commit_w_alternate_client(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn1 = _Connection() - client1 = _Client(project='PROJECT', connection=conn1) - conn2 = _Connection() - conn2._topic_publish_response = [MSGID1, MSGID2] - client2 = _Client(project='PROJECT', connection=conn2) + client1 = _Client(project='PROJECT') + client2 = _Client(project='PROJECT') + api = client2.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID1, MSGID2] topic = _Topic() batch = self._makeOne(topic, client=client1) @@ -668,9 +624,7 @@ def test_commit_w_alternate_client(self): self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 0) - self.assertEqual(conn2._topic_published, + self.assertEqual(api._topic_published, (topic.full_name, [MESSAGE1, MESSAGE2])) def test_context_mgr_success(self): @@ -685,9 +639,9 @@ def test_context_mgr_success(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn = _Connection() - conn._topic_publish_response = [MSGID1, MSGID2] - client = _Client(project='PROJECT', connection=conn) + client = _Client(project='PROJECT') + api = client.publisher_api = _FauxPublisherAPI() + api._topic_publish_response = [MSGID1, MSGID2] topic = _Topic() batch = self._makeOne(topic, client=client) @@ -698,8 +652,7 @@ def test_context_mgr_success(self): self.assertTrue(other is batch) self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(conn._topic_published, + self.assertEqual(api._topic_published, (topic.full_name, [MESSAGE1, MESSAGE2])) def test_context_mgr_failure(self): @@ -708,14 +661,12 @@ def test_context_mgr_failure(self): PAYLOAD2 = b'This is the second message text' B64_1 = base64.b64encode(PAYLOAD1) B64_2 = base64.b64encode(PAYLOAD2) - MSGID1 = 'DEADBEEF' - MSGID2 = 'BEADCAFE' MESSAGE1 = {'data': B64_1.decode('ascii'), 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - client = _Client(project='PROJECT', connection=conn) + client = _Client(project='PROJECT') + api = client.publisher_api = _FauxPublisherAPI() topic = _Topic() batch = self._makeOne(topic, client=client) @@ -730,15 +681,10 @@ def test_context_mgr_failure(self): self.assertTrue(other is batch) self.assertEqual(list(batch), []) self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2]) - self.assertEqual(len(conn._requested), 0) - self.assertEqual(getattr(conn, '_topic_published', self), self) - + self.assertEqual(getattr(api, '_topic_published', self), self) -class _Connection(object): # pylint: disable=too-many-instance-attributes - def __init__(self, *responses): - self._responses = responses - self._requested = [] +class _FauxPublisherAPI(object): def topic_create(self, topic_path): self._topic_created = topic_path @@ -765,6 +711,9 @@ def topic_list_subscriptions(self, topic_path, page_size=None, self._topic_listed = topic_path, page_size, page_token return self._topic_list_subscriptions_response + +class _FauxIAMPolicy(object): + def get_iam_policy(self, target_path): self._got_iam_policy = target_path return self._get_iam_policy_response @@ -793,9 +742,10 @@ def _timestamp_message(self, attrs): class _Client(object): - def __init__(self, project, connection=None): + connection = None + + def __init__(self, project): self.project = project - self.connection = connection class _Bugout(Exception): diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index 9aa8fa171a8a..1e01d52f221b 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -123,7 +123,8 @@ def create(self, client=None): ``client`` stored on the current topic. """ client = self._require_client(client) - client.connection.topic_create(topic_path=self.full_name) + api = client.publisher_api + api.topic_create(topic_path=self.full_name) def exists(self, client=None): """API call: test for the existence of the topic via a GET request @@ -136,9 +137,10 @@ def exists(self, client=None): ``client`` stored on the current topic. """ client = self._require_client(client) + api = client.publisher_api try: - client.connection.topic_get(topic_path=self.full_name) + api.topic_get(topic_path=self.full_name) except NotFound: return False else: @@ -155,7 +157,8 @@ def delete(self, client=None): ``client`` stored on the current topic. """ client = self._require_client(client) - client.connection.topic_delete(topic_path=self.full_name) + api = client.publisher_api + api.topic_delete(topic_path=self.full_name) def _timestamp_message(self, attrs): """Add a timestamp to ``attrs``, if the topic is so configured. @@ -187,12 +190,12 @@ def publish(self, message, client=None, **attrs): :returns: message ID assigned by the server to the published message """ client = self._require_client(client) + api = client.publisher_api self._timestamp_message(attrs) message_b = base64.b64encode(message).decode('ascii') message_data = {'data': message_b, 'attributes': attrs} - message_ids = client.connection.topic_publish( - self.full_name, [message_data]) + message_ids = api.topic_publish(self.full_name, [message_data]) return message_ids[0] def batch(self, client=None): @@ -234,8 +237,8 @@ def list_subscriptions(self, page_size=None, page_token=None, client=None): value as ``page_token``). """ client = self._require_client(client) - conn = client.connection - sub_paths, next_token = conn.topic_list_subscriptions( + api = client.publisher_api + sub_paths, next_token = api.topic_list_subscriptions( self.full_name, page_size, page_token) subscriptions = [] for sub_path in sub_paths: @@ -258,7 +261,8 @@ def get_iam_policy(self, client=None): ``getIamPolicy`` API request. """ client = self._require_client(client) - resp = client.connection.get_iam_policy(self.full_name) + api = client.iam_policy_api + resp = api.get_iam_policy(self.full_name) return Policy.from_api_repr(resp) def set_iam_policy(self, policy, client=None): @@ -280,8 +284,9 @@ def set_iam_policy(self, policy, client=None): ``setIamPolicy`` API request. """ client = self._require_client(client) + api = client.iam_policy_api resource = policy.to_api_repr() - resp = client.connection.set_iam_policy(self.full_name, resource) + resp = api.set_iam_policy(self.full_name, resource) return Policy.from_api_repr(resp) def check_iam_permissions(self, permissions, client=None): @@ -301,7 +306,8 @@ def check_iam_permissions(self, permissions, client=None): :returns: subset of ``permissions`` allowed by current IAM policy. """ client = self._require_client(client) - return client.connection.test_iam_permissions( + api = client.iam_policy_api + return api.test_iam_permissions( self.full_name, list(permissions)) @@ -355,7 +361,7 @@ def commit(self, client=None): """ if client is None: client = self.client - message_ids = client.connection.topic_publish( - self.topic.full_name, self.messages[:]) + api = client.publisher_api + message_ids = api.topic_publish(self.topic.full_name, self.messages[:]) self.message_ids.extend(message_ids) del self.messages[:]