From 042e062e928c12050853b690fedfce09c30c1bb4 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 15 Dec 2016 10:17:07 -0800 Subject: [PATCH 1/3] Removing references to cnxn in Pub/Sub _gax module. --- pubsub/google/cloud/pubsub/_gax.py | 56 +++++++++++++++++----------- pubsub/google/cloud/pubsub/client.py | 8 +++- pubsub/unit_tests/test__gax.py | 31 ++++----------- pubsub/unit_tests/test_client.py | 14 +++++-- 4 files changed, 58 insertions(+), 51 deletions(-) diff --git a/pubsub/google/cloud/pubsub/_gax.py b/pubsub/google/cloud/pubsub/_gax.py index 97b73db687aa..2ca9c1be6c02 100644 --- a/pubsub/google/cloud/pubsub/_gax.py +++ b/pubsub/google/cloud/pubsub/_gax.py @@ -512,48 +512,60 @@ def _received_message_pb_to_mapping(received_message_pb): } -def make_gax_publisher_api(connection): +def make_gax_publisher_api(credentials, host=None, secure=True): """Create an instance of the GAX Publisher API. - If the ``connection`` is intended for a local emulator, then - an insecure ``channel`` is created pointing at the local - Pub / Sub server. + If the ``secure=False`` then we create an insecure ``channel`` + pointing at the local Pub / Sub emulator. - :type connection: :class:`~google.cloud.pubsub._http.Connection` - :param connection: The connection that holds configuration details. + :type credentials: :class:`~google.auth.credentials.Credentials` + :param credentials: Credentials for getting access tokens. + + :type host: str + :param host: (Optional) The host for an insecure channel. Only + used if ``secure=False``. + + :type secure: bool + :param secure: (Optional) Indicates if we should create a secure + or insecure channel. Defaults to :data:`True`. :rtype: :class:`.publisher_client.PublisherClient` - :returns: A publisher API instance with the proper connection - configuration. + :returns: A publisher API instance with the proper channel. """ - if connection.in_emulator: - channel = insecure_channel(connection.host) + if not secure: + channel = insecure_channel(host) else: channel = make_secure_channel( - connection.credentials, DEFAULT_USER_AGENT, + credentials, DEFAULT_USER_AGENT, PublisherClient.SERVICE_ADDRESS) return PublisherClient(channel=channel) -def make_gax_subscriber_api(connection): +def make_gax_subscriber_api(credentials, host=None, secure=True): """Create an instance of the GAX Subscriber API. - If the ``connection`` is intended for a local emulator, then - an insecure ``channel`` is created pointing at the local - Pub / Sub server. + If the ``secure=False`` then we create an insecure ``channel`` + pointing at the local Pub / Sub emulator. + + :type credentials: :class:`~google.auth.credentials.Credentials` + :param credentials: Credentials for getting access tokens. + + :type host: str + :param host: (Optional) The host for an insecure channel. Only + used if ``secure=False``. - :type connection: :class:`~google.cloud.pubsub._http.Connection` - :param connection: The connection that holds configuration details. + :type secure: bool + :param secure: (Optional) Indicates if we should create a secure + or insecure channel. Defaults to :data:`True`. :rtype: :class:`.subscriber_client.SubscriberClient` - :returns: A subscriber API instance with the proper connection - configuration. + :returns: A subscriber API instance with the proper channel. """ - if connection.in_emulator: - channel = insecure_channel(connection.host) + if not secure: + channel = insecure_channel(host) else: channel = make_secure_channel( - connection.credentials, DEFAULT_USER_AGENT, + credentials, DEFAULT_USER_AGENT, SubscriberClient.SERVICE_ADDRESS) return SubscriberClient(channel=channel) diff --git a/pubsub/google/cloud/pubsub/client.py b/pubsub/google/cloud/pubsub/client.py index 271d231c2329..6388da2cbdbf 100644 --- a/pubsub/google/cloud/pubsub/client.py +++ b/pubsub/google/cloud/pubsub/client.py @@ -91,7 +91,9 @@ def publisher_api(self): """Helper for publisher-related API calls.""" if self._publisher_api is None: if self._use_gax: - generated = make_gax_publisher_api(self._connection) + generated = make_gax_publisher_api( + self._credentials, host=self._connection.host, + secure=not self._connection.in_emulator) self._publisher_api = GAXPublisherAPI(generated, self) else: self._publisher_api = JSONPublisherAPI(self) @@ -102,7 +104,9 @@ def subscriber_api(self): """Helper for subscriber-related API calls.""" if self._subscriber_api is None: if self._use_gax: - generated = make_gax_subscriber_api(self._connection) + generated = make_gax_subscriber_api( + self._credentials, host=self._connection.host, + secure=not self._connection.in_emulator) self._subscriber_api = GAXSubscriberAPI(generated, self) else: self._subscriber_api = JSONSubscriberAPI(self) diff --git a/pubsub/unit_tests/test__gax.py b/pubsub/unit_tests/test__gax.py index 4f2037d7c4dd..f9682f02e14b 100644 --- a/pubsub/unit_tests/test__gax.py +++ b/pubsub/unit_tests/test__gax.py @@ -900,9 +900,9 @@ def test_subscription_modify_ack_deadline_error(self): @unittest.skipUnless(_HAVE_GAX, 'No gax-python') class Test_make_gax_publisher_api(_Base, unittest.TestCase): - def _call_fut(self, connection): + def _call_fut(self, *args, **kwargs): from google.cloud.pubsub._gax import make_gax_publisher_api - return make_gax_publisher_api(connection) + return make_gax_publisher_api(*args, **kwargs) def test_live_api(self): from google.cloud.pubsub._gax import DEFAULT_USER_AGENT @@ -924,14 +924,12 @@ def make_channel(*args): mock_publisher_api.SERVICE_ADDRESS = host creds = _make_credentials() - connection = _Connection(in_emulator=False, - credentials=creds) patch = mock.patch.multiple( 'google.cloud.pubsub._gax', PublisherClient=mock_publisher_api, make_secure_channel=make_channel) with patch: - result = self._call_fut(connection) + result = self._call_fut(creds) self.assertIs(result, mock_result) self.assertEqual(channels, [channel_obj]) @@ -953,13 +951,12 @@ def mock_insecure_channel(host): return mock_channel host = 'CURR_HOST:1234' - connection = _Connection(in_emulator=True, host=host) patch = mock.patch.multiple( 'google.cloud.pubsub._gax', PublisherClient=mock_publisher_api, insecure_channel=mock_insecure_channel) with patch: - result = self._call_fut(connection) + result = self._call_fut(None, host=host, secure=False) self.assertIs(result, mock_result) self.assertEqual(channels, [mock_channel]) @@ -969,9 +966,9 @@ def mock_insecure_channel(host): @unittest.skipUnless(_HAVE_GAX, 'No gax-python') class Test_make_gax_subscriber_api(_Base, unittest.TestCase): - def _call_fut(self, connection): + def _call_fut(self, *args, **kwargs): from google.cloud.pubsub._gax import make_gax_subscriber_api - return make_gax_subscriber_api(connection) + return make_gax_subscriber_api(*args, **kwargs) def test_live_api(self): from google.cloud.pubsub._gax import DEFAULT_USER_AGENT @@ -993,14 +990,12 @@ def make_channel(*args): mock_subscriber_api.SERVICE_ADDRESS = host creds = _make_credentials() - connection = _Connection(in_emulator=False, - credentials=creds) patch = mock.patch.multiple( 'google.cloud.pubsub._gax', SubscriberClient=mock_subscriber_api, make_secure_channel=make_channel) with patch: - result = self._call_fut(connection) + result = self._call_fut(creds) self.assertIs(result, mock_result) self.assertEqual(channels, [channel_obj]) @@ -1022,13 +1017,12 @@ def mock_insecure_channel(host): return mock_channel host = 'CURR_HOST:1234' - connection = _Connection(in_emulator=True, host=host) patch = mock.patch.multiple( 'google.cloud.pubsub._gax', SubscriberClient=mock_subscriber_api, insecure_channel=mock_insecure_channel) with patch: - result = self._call_fut(connection) + result = self._call_fut(None, host=host, secure=False) self.assertIs(result, mock_result) self.assertEqual(channels, [mock_channel]) @@ -1207,15 +1201,6 @@ def __init__(self, received_messages): self.received_messages = received_messages -class _Connection(object): - - def __init__(self, in_emulator=False, host=None, - credentials=None): - self.in_emulator = in_emulator - self.host = host - self.credentials = credentials - - class _Client(object): def __init__(self, project): diff --git a/pubsub/unit_tests/test_client.py b/pubsub/unit_tests/test_client.py index 3bde01417359..7ba9a5b50d8c 100644 --- a/pubsub/unit_tests/test_client.py +++ b/pubsub/unit_tests/test_client.py @@ -69,6 +69,8 @@ def test_no_gax_ctor(self): self.assertIsInstance(api, _PublisherAPI) def test_publisher_api_w_gax(self): + from google.cloud.pubsub import _http + wrapped = object() _called_with = [] @@ -100,8 +102,9 @@ def __init__(self, _wrapped, client): # API instance is cached again = client.publisher_api self.assertIs(again, api) - args = (client._connection,) - self.assertEqual(_called_with, [(args, {})]) + args = (creds,) + kwargs = {'host': _http.Connection.API_BASE_URL, 'secure': True} + self.assertEqual(_called_with, [(args, kwargs)]) def test_subscriber_api_wo_gax(self): from google.cloud.pubsub._http import _SubscriberAPI @@ -121,6 +124,8 @@ def test_subscriber_api_wo_gax(self): self.assertIs(again, api) def test_subscriber_api_w_gax(self): + from google.cloud.pubsub import _http + wrapped = object() _called_with = [] @@ -152,8 +157,9 @@ def __init__(self, _wrapped, client): # API instance is cached again = client.subscriber_api self.assertIs(again, api) - args = (client._connection,) - self.assertEqual(_called_with, [(args, {})]) + args = (creds,) + kwargs = {'host': _http.Connection.API_BASE_URL, 'secure': True} + self.assertEqual(_called_with, [(args, kwargs)]) def test_iam_policy_api(self): from google.cloud.pubsub._http import _IAMPolicyAPI From d3f72162bc07a2443f94d400125a673e2c9d2df3 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 15 Dec 2016 10:47:34 -0800 Subject: [PATCH 2/3] Removing many references to connection in Pub / Sub _http. Still using a method of a connection object, but this way it can be more easily swapped out for a function defined in that module doing the same task. --- pubsub/google/cloud/pubsub/_http.py | 58 +++++++++++----------------- pubsub/google/cloud/pubsub/client.py | 2 +- pubsub/unit_tests/test__http.py | 21 ++++++---- pubsub/unit_tests/test_client.py | 13 ++++--- 4 files changed, 43 insertions(+), 51 deletions(-) diff --git a/pubsub/google/cloud/pubsub/_http.py b/pubsub/google/cloud/pubsub/_http.py index 635c43bdaab5..583413e313b6 100644 --- a/pubsub/google/cloud/pubsub/_http.py +++ b/pubsub/google/cloud/pubsub/_http.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Create / interact with Google Cloud Pub/Sub connections.""" +"""Interact with Google Cloud Pub/Sub via JSON-over-HTTP.""" import base64 import copy @@ -109,7 +109,7 @@ class _PublisherAPI(object): def __init__(self, client): self._client = client - self._connection = client._connection + self.api_request = client._connection.api_request def list_topics(self, project, page_size=None, page_token=None): """API call: list topics for a given project @@ -131,7 +131,7 @@ def list_topics(self, project, page_size=None, page_token=None): :rtype: :class:`~google.cloud.iterator.Iterator` :returns: Iterator of :class:`~google.cloud.pubsub.topic.Topic` - accessible to the current connection. + accessible to the current client. """ extra_params = {} if page_size is not None: @@ -156,8 +156,7 @@ def topic_create(self, topic_path): :rtype: dict :returns: ``Topic`` resource returned from the API. """ - conn = self._connection - return conn.api_request(method='PUT', path='/%s' % (topic_path,)) + return self.api_request(method='PUT', path='/%s' % (topic_path,)) def topic_get(self, topic_path): """API call: retrieve a topic @@ -172,8 +171,7 @@ def topic_get(self, topic_path): :rtype: dict :returns: ``Topic`` resource returned from the API. """ - conn = self._connection - return conn.api_request(method='GET', path='/%s' % (topic_path,)) + return self.api_request(method='GET', path='/%s' % (topic_path,)) def topic_delete(self, topic_path): """API call: delete a topic @@ -185,8 +183,7 @@ def topic_delete(self, topic_path): :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. """ - conn = self._connection - conn.api_request(method='DELETE', path='/%s' % (topic_path,)) + self.api_request(method='DELETE', path='/%s' % (topic_path,)) def topic_publish(self, topic_path, messages): """API call: publish one or more messages to a topic @@ -206,9 +203,8 @@ def topic_publish(self, topic_path, messages): """ messages_to_send = copy.deepcopy(messages) _transform_messages_base64(messages_to_send, _base64_unicode) - conn = self._connection data = {'messages': messages_to_send} - response = conn.api_request( + response = self.api_request( method='POST', path='/%s:publish' % (topic_path,), data=data) return response['messageIds'] @@ -257,7 +253,7 @@ class _SubscriberAPI(object): def __init__(self, client): self._client = client - self._connection = client._connection + self.api_request = client._connection.api_request def list_subscriptions(self, project, page_size=None, page_token=None): """API call: list subscriptions for a given project @@ -328,7 +324,6 @@ def subscription_create(self, subscription_path, topic_path, :rtype: dict :returns: ``Subscription`` resource returned from the API. """ - conn = self._connection path = '/%s' % (subscription_path,) resource = {'topic': topic_path} @@ -338,7 +333,7 @@ def subscription_create(self, subscription_path, topic_path, if push_endpoint is not None: resource['pushConfig'] = {'pushEndpoint': push_endpoint} - return conn.api_request(method='PUT', path=path, data=resource) + return self.api_request(method='PUT', path=path, data=resource) def subscription_get(self, subscription_path): """API call: retrieve a subscription @@ -354,9 +349,8 @@ def subscription_get(self, subscription_path): :rtype: dict :returns: ``Subscription`` resource returned from the API. """ - conn = self._connection path = '/%s' % (subscription_path,) - return conn.api_request(method='GET', path=path) + return self.api_request(method='GET', path=path) def subscription_delete(self, subscription_path): """API call: delete a subscription @@ -369,9 +363,8 @@ def subscription_delete(self, subscription_path): the fully-qualified path of the subscription, in format ``projects//subscriptions/``. """ - conn = self._connection path = '/%s' % (subscription_path,) - conn.api_request(method='DELETE', path=path) + self.api_request(method='DELETE', path=path) def subscription_modify_push_config(self, subscription_path, push_endpoint): @@ -390,10 +383,9 @@ def subscription_modify_push_config(self, subscription_path, (Optional) URL to which messages will be pushed by the back-end. If not set, the application must pull messages. """ - conn = self._connection path = '/%s:modifyPushConfig' % (subscription_path,) resource = {'pushConfig': {'pushEndpoint': push_endpoint}} - conn.api_request(method='POST', path=path, data=resource) + self.api_request(method='POST', path=path, data=resource) def subscription_pull(self, subscription_path, return_immediately=False, max_messages=1): @@ -419,13 +411,12 @@ def subscription_pull(self, subscription_path, return_immediately=False, :rtype: list of dict :returns: the ``receivedMessages`` element of the response. """ - conn = self._connection path = '/%s:pull' % (subscription_path,) data = { 'returnImmediately': return_immediately, 'maxMessages': max_messages, } - response = conn.api_request(method='POST', path=path, data=data) + response = self.api_request(method='POST', path=path, data=data) messages = response.get('receivedMessages', ()) _transform_messages_base64(messages, base64.b64decode, 'message') return messages @@ -444,12 +435,11 @@ def subscription_acknowledge(self, subscription_path, ack_ids): :type ack_ids: list of string :param ack_ids: ack IDs of messages being acknowledged """ - conn = self._connection path = '/%s:acknowledge' % (subscription_path,) data = { 'ackIds': ack_ids, } - conn.api_request(method='POST', path=path, data=data) + self.api_request(method='POST', path=path, data=data) def subscription_modify_ack_deadline(self, subscription_path, ack_ids, ack_deadline): @@ -470,24 +460,23 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, :param ack_deadline: the deadline (in seconds) by which messages pulled from the back-end must be acknowledged. """ - conn = self._connection path = '/%s:modifyAckDeadline' % (subscription_path,) data = { 'ackIds': ack_ids, 'ackDeadlineSeconds': ack_deadline, } - conn.api_request(method='POST', path=path, data=data) + self.api_request(method='POST', path=path, data=data) class _IAMPolicyAPI(object): """Helper mapping IAM policy-related APIs. - :type connection: :class:`Connection` - :param connection: the connection used to make API requests. + :type client: :class:`~google.cloud.pubsub.client.Client` + :param client: the client used to make API requests. """ - def __init__(self, connection): - self._connection = connection + def __init__(self, client): + self.api_request = client._connection.api_request def get_iam_policy(self, target_path): """API call: fetch the IAM policy for the target @@ -502,9 +491,8 @@ def get_iam_policy(self, target_path): :rtype: dict :returns: the resource returned by the ``getIamPolicy`` API request. """ - conn = self._connection path = '/%s:getIamPolicy' % (target_path,) - return conn.api_request(method='GET', path=path) + return self.api_request(method='GET', path=path) def set_iam_policy(self, target_path, policy): """API call: update the IAM policy for the target @@ -522,10 +510,9 @@ def set_iam_policy(self, target_path, policy): :rtype: dict :returns: the resource returned by the ``setIamPolicy`` API request. """ - conn = self._connection wrapped = {'policy': policy} path = '/%s:setIamPolicy' % (target_path,) - return conn.api_request(method='POST', path=path, data=wrapped) + return self.api_request(method='POST', path=path, data=wrapped) def test_iam_permissions(self, target_path, permissions): """API call: test permissions @@ -543,10 +530,9 @@ def test_iam_permissions(self, target_path, permissions): :rtype: dict :returns: the resource returned by the ``getIamPolicy`` API request. """ - conn = self._connection wrapped = {'permissions': permissions} path = '/%s:testIamPermissions' % (target_path,) - resp = conn.api_request(method='POST', path=path, data=wrapped) + resp = self.api_request(method='POST', path=path, data=wrapped) return resp.get('permissions', []) diff --git a/pubsub/google/cloud/pubsub/client.py b/pubsub/google/cloud/pubsub/client.py index 6388da2cbdbf..6e2f0e06ffd6 100644 --- a/pubsub/google/cloud/pubsub/client.py +++ b/pubsub/google/cloud/pubsub/client.py @@ -116,7 +116,7 @@ def subscriber_api(self): def iam_policy_api(self): """Helper for IAM policy-related API calls.""" if self._iam_policy_api is None: - self._iam_policy_api = _IAMPolicyAPI(self._connection) + self._iam_policy_api = _IAMPolicyAPI(self) return self._iam_policy_api def list_topics(self, page_size=None, page_token=None): diff --git a/pubsub/unit_tests/test__http.py b/pubsub/unit_tests/test__http.py index 955fc06a9104..e60ebf480684 100644 --- a/pubsub/unit_tests/test__http.py +++ b/pubsub/unit_tests/test__http.py @@ -111,7 +111,7 @@ def test_ctor(self): client = _Client(connection, self.PROJECT) api = self._make_one(client) self.assertIs(api._client, client) - self.assertIs(api._connection, connection) + self.assertEqual(api.api_request, connection.api_request) def test_list_topics_no_paging(self): from google.cloud.pubsub.topic import Topic @@ -449,8 +449,8 @@ def test_ctor(self): connection = _Connection() client = _Client(connection, self.PROJECT) api = self._make_one(client) - self.assertIs(api._connection, connection) self.assertIs(api._client, client) + self.assertEqual(api.api_request, connection.api_request) def test_list_subscriptions_no_paging(self): from google.cloud.pubsub.client import Client @@ -747,8 +747,9 @@ def _get_target_class(): def test_ctor(self): connection = _Connection() - api = self._make_one(connection) - self.assertIs(api._connection, connection) + client = _Client(connection, None) + api = self._make_one(client) + self.assertEqual(api.api_request, connection.api_request) def test_get_iam_policy(self): from google.cloud.pubsub.iam import OWNER_ROLE @@ -771,7 +772,8 @@ def test_get_iam_policy(self): ], } connection = _Connection(RETURNED) - api = self._make_one(connection) + client = _Client(connection, None) + api = self._make_one(client) policy = api.get_iam_policy(self.TOPIC_PATH) @@ -802,7 +804,8 @@ def test_set_iam_policy(self): } RETURNED = POLICY.copy() connection = _Connection(RETURNED) - api = self._make_one(connection) + client = _Client(connection, None) + api = self._make_one(client) policy = api.set_iam_policy(self.TOPIC_PATH, POLICY) @@ -822,7 +825,8 @@ def test_test_iam_permissions(self): ALLOWED = ALL_ROLES[1:] RETURNED = {'permissions': ALLOWED} connection = _Connection(RETURNED) - api = self._make_one(connection) + client = _Client(connection, None) + api = self._make_one(client) allowed = api.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES) @@ -841,7 +845,8 @@ def test_test_iam_permissions_missing_key(self): ALL_ROLES = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE] RETURNED = {} connection = _Connection(RETURNED) - api = self._make_one(connection) + client = _Client(connection, None) + api = self._make_one(client) allowed = api.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES) diff --git a/pubsub/unit_tests/test_client.py b/pubsub/unit_tests/test_client.py index 7ba9a5b50d8c..d0119b22853e 100644 --- a/pubsub/unit_tests/test_client.py +++ b/pubsub/unit_tests/test_client.py @@ -46,11 +46,11 @@ def test_publisher_api_wo_gax(self): project=self.PROJECT, credentials=creds, use_gax=False) - conn = client._connection = object() + conn = client._connection = _Connection() api = client.publisher_api self.assertIsInstance(api, _PublisherAPI) - self.assertIs(api._connection, conn) + self.assertEqual(api.api_request, conn.api_request) # API instance is cached again = client.publisher_api self.assertIs(again, api) @@ -114,11 +114,11 @@ def test_subscriber_api_wo_gax(self): project=self.PROJECT, credentials=creds, use_gax=False) - conn = client._connection = object() + conn = client._connection = _Connection() api = client.subscriber_api self.assertIsInstance(api, _SubscriberAPI) - self.assertIs(api._connection, conn) + self.assertEqual(api.api_request, conn.api_request) # API instance is cached again = client.subscriber_api self.assertIs(again, api) @@ -165,10 +165,11 @@ def test_iam_policy_api(self): from google.cloud.pubsub._http import _IAMPolicyAPI creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) - conn = client._connection = object() + conn = client._connection = _Connection() + api = client.iam_policy_api self.assertIsInstance(api, _IAMPolicyAPI) - self.assertIs(api._connection, conn) + self.assertEqual(api.api_request, conn.api_request) # API instance is cached again = client.iam_policy_api self.assertIs(again, api) From a46184c5db3f05f8f311c5f957ead363290cf9a4 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 15 Dec 2016 22:27:01 -0800 Subject: [PATCH 3/3] Re-factor of GAX Pub / Sub channel helpers. --- pubsub/google/cloud/pubsub/_gax.py | 34 ++++++++++++---------------- pubsub/google/cloud/pubsub/client.py | 18 ++++++++++----- pubsub/unit_tests/test__gax.py | 4 ++-- pubsub/unit_tests/test_client.py | 34 +++++++++++++++++++++------- 4 files changed, 54 insertions(+), 36 deletions(-) diff --git a/pubsub/google/cloud/pubsub/_gax.py b/pubsub/google/cloud/pubsub/_gax.py index 2ca9c1be6c02..582cb8d0e128 100644 --- a/pubsub/google/cloud/pubsub/_gax.py +++ b/pubsub/google/cloud/pubsub/_gax.py @@ -512,27 +512,24 @@ def _received_message_pb_to_mapping(received_message_pb): } -def make_gax_publisher_api(credentials, host=None, secure=True): +def make_gax_publisher_api(credentials=None, host=None): """Create an instance of the GAX Publisher API. - If the ``secure=False`` then we create an insecure ``channel`` - pointing at the local Pub / Sub emulator. + If the ``credentials`` are omitted, then we create an insecure + ``channel`` pointing at the local Pub / Sub emulator. :type credentials: :class:`~google.auth.credentials.Credentials` - :param credentials: Credentials for getting access tokens. + :param credentials: (Optional) Credentials for getting access + tokens. :type host: str :param host: (Optional) The host for an insecure channel. Only - used if ``secure=False``. - - :type secure: bool - :param secure: (Optional) Indicates if we should create a secure - or insecure channel. Defaults to :data:`True`. + used if ``credentials`` are omitted. :rtype: :class:`.publisher_client.PublisherClient` :returns: A publisher API instance with the proper channel. """ - if not secure: + if credentials is None: channel = insecure_channel(host) else: channel = make_secure_channel( @@ -541,27 +538,24 @@ def make_gax_publisher_api(credentials, host=None, secure=True): return PublisherClient(channel=channel) -def make_gax_subscriber_api(credentials, host=None, secure=True): +def make_gax_subscriber_api(credentials=None, host=None): """Create an instance of the GAX Subscriber API. - If the ``secure=False`` then we create an insecure ``channel`` - pointing at the local Pub / Sub emulator. + If the ``credentials`` are omitted, then we create an insecure + ``channel`` pointing at the local Pub / Sub emulator. :type credentials: :class:`~google.auth.credentials.Credentials` - :param credentials: Credentials for getting access tokens. + :param credentials: (Optional) Credentials for getting access + tokens. :type host: str :param host: (Optional) The host for an insecure channel. Only - used if ``secure=False``. - - :type secure: bool - :param secure: (Optional) Indicates if we should create a secure - or insecure channel. Defaults to :data:`True`. + used if ``credentials`` are omitted. :rtype: :class:`.subscriber_client.SubscriberClient` :returns: A subscriber API instance with the proper channel. """ - if not secure: + if credentials is None: channel = insecure_channel(host) else: channel = make_secure_channel( diff --git a/pubsub/google/cloud/pubsub/client.py b/pubsub/google/cloud/pubsub/client.py index 6e2f0e06ffd6..689113631b12 100644 --- a/pubsub/google/cloud/pubsub/client.py +++ b/pubsub/google/cloud/pubsub/client.py @@ -91,9 +91,12 @@ def publisher_api(self): """Helper for publisher-related API calls.""" if self._publisher_api is None: if self._use_gax: - generated = make_gax_publisher_api( - self._credentials, host=self._connection.host, - secure=not self._connection.in_emulator) + if self._connection.in_emulator: + generated = make_gax_publisher_api( + host=self._connection.host) + else: + generated = make_gax_publisher_api( + credentials=self._credentials) self._publisher_api = GAXPublisherAPI(generated, self) else: self._publisher_api = JSONPublisherAPI(self) @@ -104,9 +107,12 @@ def subscriber_api(self): """Helper for subscriber-related API calls.""" if self._subscriber_api is None: if self._use_gax: - generated = make_gax_subscriber_api( - self._credentials, host=self._connection.host, - secure=not self._connection.in_emulator) + if self._connection.in_emulator: + generated = make_gax_subscriber_api( + host=self._connection.host) + else: + generated = make_gax_subscriber_api( + credentials=self._credentials) self._subscriber_api = GAXSubscriberAPI(generated, self) else: self._subscriber_api = JSONSubscriberAPI(self) diff --git a/pubsub/unit_tests/test__gax.py b/pubsub/unit_tests/test__gax.py index f9682f02e14b..aeb2cfc229cf 100644 --- a/pubsub/unit_tests/test__gax.py +++ b/pubsub/unit_tests/test__gax.py @@ -956,7 +956,7 @@ def mock_insecure_channel(host): PublisherClient=mock_publisher_api, insecure_channel=mock_insecure_channel) with patch: - result = self._call_fut(None, host=host, secure=False) + result = self._call_fut(host=host) self.assertIs(result, mock_result) self.assertEqual(channels, [mock_channel]) @@ -1022,7 +1022,7 @@ def mock_insecure_channel(host): SubscriberClient=mock_subscriber_api, insecure_channel=mock_insecure_channel) with patch: - result = self._call_fut(None, host=host, secure=False) + result = self._call_fut(host=host) self.assertIs(result, mock_result) self.assertEqual(channels, [mock_channel]) diff --git a/pubsub/unit_tests/test_client.py b/pubsub/unit_tests/test_client.py index d0119b22853e..34b4cd4d6b8b 100644 --- a/pubsub/unit_tests/test_client.py +++ b/pubsub/unit_tests/test_client.py @@ -68,7 +68,7 @@ def test_no_gax_ctor(self): api = client.publisher_api self.assertIsInstance(api, _PublisherAPI) - def test_publisher_api_w_gax(self): + def _publisher_api_w_gax_helper(self, emulator=False): from google.cloud.pubsub import _http wrapped = object() @@ -88,6 +88,7 @@ def __init__(self, _wrapped, client): client = self._make_one( project=self.PROJECT, credentials=creds, use_gax=True) + client._connection.in_emulator = emulator patch = mock.patch.multiple( 'google.cloud.pubsub.client', @@ -102,9 +103,17 @@ def __init__(self, _wrapped, client): # API instance is cached again = client.publisher_api self.assertIs(again, api) - args = (creds,) - kwargs = {'host': _http.Connection.API_BASE_URL, 'secure': True} - self.assertEqual(_called_with, [(args, kwargs)]) + if emulator: + kwargs = {'host': _http.Connection.API_BASE_URL} + else: + kwargs = {'credentials': creds} + self.assertEqual(_called_with, [((), kwargs)]) + + def test_publisher_api_w_gax(self): + self._publisher_api_w_gax_helper() + + def test_publisher_api_w_gax_and_emulator(self): + self._publisher_api_w_gax_helper(emulator=True) def test_subscriber_api_wo_gax(self): from google.cloud.pubsub._http import _SubscriberAPI @@ -123,7 +132,7 @@ def test_subscriber_api_wo_gax(self): again = client.subscriber_api self.assertIs(again, api) - def test_subscriber_api_w_gax(self): + def _subscriber_api_w_gax_helper(self, emulator=False): from google.cloud.pubsub import _http wrapped = object() @@ -143,6 +152,7 @@ def __init__(self, _wrapped, client): client = self._make_one( project=self.PROJECT, credentials=creds, use_gax=True) + client._connection.in_emulator = emulator patch = mock.patch.multiple( 'google.cloud.pubsub.client', @@ -157,9 +167,17 @@ def __init__(self, _wrapped, client): # API instance is cached again = client.subscriber_api self.assertIs(again, api) - args = (creds,) - kwargs = {'host': _http.Connection.API_BASE_URL, 'secure': True} - self.assertEqual(_called_with, [(args, kwargs)]) + if emulator: + kwargs = {'host': _http.Connection.API_BASE_URL} + else: + kwargs = {'credentials': creds} + self.assertEqual(_called_with, [((), kwargs)]) + + def test_subscriber_api_w_gax(self): + self._subscriber_api_w_gax_helper() + + def test_subscriber_api_w_gax_and_emulator(self): + self._subscriber_api_w_gax_helper(emulator=True) def test_iam_policy_api(self): from google.cloud.pubsub._http import _IAMPolicyAPI