Skip to content

Commit c8a0d6e

Browse files
committed
Removing many references to connection in Pub / Sub _http.
Still using a method of a connection object, but this way it can be more easily swapped out for a function defined in that module doing the same task.
1 parent e3b7059 commit c8a0d6e

File tree

4 files changed

+43
-51
lines changed

4 files changed

+43
-51
lines changed

pubsub/google/cloud/pubsub/_http.py

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Create / interact with Google Cloud Pub/Sub connections."""
15+
"""Interact with Google Cloud Pub/Sub via JSON-over-HTTP."""
1616

1717
import base64
1818
import copy
@@ -109,7 +109,7 @@ class _PublisherAPI(object):
109109

110110
def __init__(self, client):
111111
self._client = client
112-
self._connection = client._connection
112+
self.api_request = client._connection.api_request
113113

114114
def list_topics(self, project, page_size=None, page_token=None):
115115
"""API call: list topics for a given project
@@ -131,7 +131,7 @@ def list_topics(self, project, page_size=None, page_token=None):
131131
132132
:rtype: :class:`~google.cloud.iterator.Iterator`
133133
:returns: Iterator of :class:`~google.cloud.pubsub.topic.Topic`
134-
accessible to the current connection.
134+
accessible to the current client.
135135
"""
136136
extra_params = {}
137137
if page_size is not None:
@@ -156,8 +156,7 @@ def topic_create(self, topic_path):
156156
:rtype: dict
157157
:returns: ``Topic`` resource returned from the API.
158158
"""
159-
conn = self._connection
160-
return conn.api_request(method='PUT', path='/%s' % (topic_path,))
159+
return self.api_request(method='PUT', path='/%s' % (topic_path,))
161160

162161
def topic_get(self, topic_path):
163162
"""API call: retrieve a topic
@@ -172,8 +171,7 @@ def topic_get(self, topic_path):
172171
:rtype: dict
173172
:returns: ``Topic`` resource returned from the API.
174173
"""
175-
conn = self._connection
176-
return conn.api_request(method='GET', path='/%s' % (topic_path,))
174+
return self.api_request(method='GET', path='/%s' % (topic_path,))
177175

178176
def topic_delete(self, topic_path):
179177
"""API call: delete a topic
@@ -185,8 +183,7 @@ def topic_delete(self, topic_path):
185183
:param topic_path: the fully-qualified path of the topic, in format
186184
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
187185
"""
188-
conn = self._connection
189-
conn.api_request(method='DELETE', path='/%s' % (topic_path,))
186+
self.api_request(method='DELETE', path='/%s' % (topic_path,))
190187

191188
def topic_publish(self, topic_path, messages):
192189
"""API call: publish one or more messages to a topic
@@ -206,9 +203,8 @@ def topic_publish(self, topic_path, messages):
206203
"""
207204
messages_to_send = copy.deepcopy(messages)
208205
_transform_messages_base64(messages_to_send, _base64_unicode)
209-
conn = self._connection
210206
data = {'messages': messages_to_send}
211-
response = conn.api_request(
207+
response = self.api_request(
212208
method='POST', path='/%s:publish' % (topic_path,), data=data)
213209
return response['messageIds']
214210

@@ -257,7 +253,7 @@ class _SubscriberAPI(object):
257253

258254
def __init__(self, client):
259255
self._client = client
260-
self._connection = client._connection
256+
self.api_request = client._connection.api_request
261257

262258
def list_subscriptions(self, project, page_size=None, page_token=None):
263259
"""API call: list subscriptions for a given project
@@ -328,7 +324,6 @@ def subscription_create(self, subscription_path, topic_path,
328324
:rtype: dict
329325
:returns: ``Subscription`` resource returned from the API.
330326
"""
331-
conn = self._connection
332327
path = '/%s' % (subscription_path,)
333328
resource = {'topic': topic_path}
334329

@@ -338,7 +333,7 @@ def subscription_create(self, subscription_path, topic_path,
338333
if push_endpoint is not None:
339334
resource['pushConfig'] = {'pushEndpoint': push_endpoint}
340335

341-
return conn.api_request(method='PUT', path=path, data=resource)
336+
return self.api_request(method='PUT', path=path, data=resource)
342337

343338
def subscription_get(self, subscription_path):
344339
"""API call: retrieve a subscription
@@ -354,9 +349,8 @@ def subscription_get(self, subscription_path):
354349
:rtype: dict
355350
:returns: ``Subscription`` resource returned from the API.
356351
"""
357-
conn = self._connection
358352
path = '/%s' % (subscription_path,)
359-
return conn.api_request(method='GET', path=path)
353+
return self.api_request(method='GET', path=path)
360354

361355
def subscription_delete(self, subscription_path):
362356
"""API call: delete a subscription
@@ -369,9 +363,8 @@ def subscription_delete(self, subscription_path):
369363
the fully-qualified path of the subscription, in format
370364
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
371365
"""
372-
conn = self._connection
373366
path = '/%s' % (subscription_path,)
374-
conn.api_request(method='DELETE', path=path)
367+
self.api_request(method='DELETE', path=path)
375368

376369
def subscription_modify_push_config(self, subscription_path,
377370
push_endpoint):
@@ -390,10 +383,9 @@ def subscription_modify_push_config(self, subscription_path,
390383
(Optional) URL to which messages will be pushed by the back-end.
391384
If not set, the application must pull messages.
392385
"""
393-
conn = self._connection
394386
path = '/%s:modifyPushConfig' % (subscription_path,)
395387
resource = {'pushConfig': {'pushEndpoint': push_endpoint}}
396-
conn.api_request(method='POST', path=path, data=resource)
388+
self.api_request(method='POST', path=path, data=resource)
397389

398390
def subscription_pull(self, subscription_path, return_immediately=False,
399391
max_messages=1):
@@ -419,13 +411,12 @@ def subscription_pull(self, subscription_path, return_immediately=False,
419411
:rtype: list of dict
420412
:returns: the ``receivedMessages`` element of the response.
421413
"""
422-
conn = self._connection
423414
path = '/%s:pull' % (subscription_path,)
424415
data = {
425416
'returnImmediately': return_immediately,
426417
'maxMessages': max_messages,
427418
}
428-
response = conn.api_request(method='POST', path=path, data=data)
419+
response = self.api_request(method='POST', path=path, data=data)
429420
messages = response.get('receivedMessages', ())
430421
_transform_messages_base64(messages, base64.b64decode, 'message')
431422
return messages
@@ -444,12 +435,11 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
444435
:type ack_ids: list of string
445436
:param ack_ids: ack IDs of messages being acknowledged
446437
"""
447-
conn = self._connection
448438
path = '/%s:acknowledge' % (subscription_path,)
449439
data = {
450440
'ackIds': ack_ids,
451441
}
452-
conn.api_request(method='POST', path=path, data=data)
442+
self.api_request(method='POST', path=path, data=data)
453443

454444
def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
455445
ack_deadline):
@@ -470,24 +460,23 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
470460
:param ack_deadline: the deadline (in seconds) by which messages pulled
471461
from the back-end must be acknowledged.
472462
"""
473-
conn = self._connection
474463
path = '/%s:modifyAckDeadline' % (subscription_path,)
475464
data = {
476465
'ackIds': ack_ids,
477466
'ackDeadlineSeconds': ack_deadline,
478467
}
479-
conn.api_request(method='POST', path=path, data=data)
468+
self.api_request(method='POST', path=path, data=data)
480469

481470

482471
class _IAMPolicyAPI(object):
483472
"""Helper mapping IAM policy-related APIs.
484473
485-
:type connection: :class:`Connection`
486-
:param connection: the connection used to make API requests.
474+
:type client: :class:`~google.cloud.pubsub.client.Client`
475+
:param client: the client used to make API requests.
487476
"""
488477

489-
def __init__(self, connection):
490-
self._connection = connection
478+
def __init__(self, client):
479+
self.api_request = client._connection.api_request
491480

492481
def get_iam_policy(self, target_path):
493482
"""API call: fetch the IAM policy for the target
@@ -502,9 +491,8 @@ def get_iam_policy(self, target_path):
502491
:rtype: dict
503492
:returns: the resource returned by the ``getIamPolicy`` API request.
504493
"""
505-
conn = self._connection
506494
path = '/%s:getIamPolicy' % (target_path,)
507-
return conn.api_request(method='GET', path=path)
495+
return self.api_request(method='GET', path=path)
508496

509497
def set_iam_policy(self, target_path, policy):
510498
"""API call: update the IAM policy for the target
@@ -522,10 +510,9 @@ def set_iam_policy(self, target_path, policy):
522510
:rtype: dict
523511
:returns: the resource returned by the ``setIamPolicy`` API request.
524512
"""
525-
conn = self._connection
526513
wrapped = {'policy': policy}
527514
path = '/%s:setIamPolicy' % (target_path,)
528-
return conn.api_request(method='POST', path=path, data=wrapped)
515+
return self.api_request(method='POST', path=path, data=wrapped)
529516

530517
def test_iam_permissions(self, target_path, permissions):
531518
"""API call: test permissions
@@ -543,10 +530,9 @@ def test_iam_permissions(self, target_path, permissions):
543530
:rtype: dict
544531
:returns: the resource returned by the ``getIamPolicy`` API request.
545532
"""
546-
conn = self._connection
547533
wrapped = {'permissions': permissions}
548534
path = '/%s:testIamPermissions' % (target_path,)
549-
resp = conn.api_request(method='POST', path=path, data=wrapped)
535+
resp = self.api_request(method='POST', path=path, data=wrapped)
550536
return resp.get('permissions', [])
551537

552538

pubsub/google/cloud/pubsub/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def subscriber_api(self):
116116
def iam_policy_api(self):
117117
"""Helper for IAM policy-related API calls."""
118118
if self._iam_policy_api is None:
119-
self._iam_policy_api = _IAMPolicyAPI(self._connection)
119+
self._iam_policy_api = _IAMPolicyAPI(self)
120120
return self._iam_policy_api
121121

122122
def list_topics(self, page_size=None, page_token=None):

pubsub/unit_tests/test__http.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def test_ctor(self):
111111
client = _Client(connection, self.PROJECT)
112112
api = self._make_one(client)
113113
self.assertIs(api._client, client)
114-
self.assertIs(api._connection, connection)
114+
self.assertEqual(api.api_request, connection.api_request)
115115

116116
def test_list_topics_no_paging(self):
117117
from google.cloud.pubsub.topic import Topic
@@ -449,8 +449,8 @@ def test_ctor(self):
449449
connection = _Connection()
450450
client = _Client(connection, self.PROJECT)
451451
api = self._make_one(client)
452-
self.assertIs(api._connection, connection)
453452
self.assertIs(api._client, client)
453+
self.assertEqual(api.api_request, connection.api_request)
454454

455455
def test_list_subscriptions_no_paging(self):
456456
from google.cloud.pubsub.client import Client
@@ -747,8 +747,9 @@ def _get_target_class():
747747

748748
def test_ctor(self):
749749
connection = _Connection()
750-
api = self._make_one(connection)
751-
self.assertIs(api._connection, connection)
750+
client = _Client(connection, None)
751+
api = self._make_one(client)
752+
self.assertEqual(api.api_request, connection.api_request)
752753

753754
def test_get_iam_policy(self):
754755
from google.cloud.pubsub.iam import OWNER_ROLE
@@ -771,7 +772,8 @@ def test_get_iam_policy(self):
771772
],
772773
}
773774
connection = _Connection(RETURNED)
774-
api = self._make_one(connection)
775+
client = _Client(connection, None)
776+
api = self._make_one(client)
775777

776778
policy = api.get_iam_policy(self.TOPIC_PATH)
777779

@@ -802,7 +804,8 @@ def test_set_iam_policy(self):
802804
}
803805
RETURNED = POLICY.copy()
804806
connection = _Connection(RETURNED)
805-
api = self._make_one(connection)
807+
client = _Client(connection, None)
808+
api = self._make_one(client)
806809

807810
policy = api.set_iam_policy(self.TOPIC_PATH, POLICY)
808811

@@ -822,7 +825,8 @@ def test_test_iam_permissions(self):
822825
ALLOWED = ALL_ROLES[1:]
823826
RETURNED = {'permissions': ALLOWED}
824827
connection = _Connection(RETURNED)
825-
api = self._make_one(connection)
828+
client = _Client(connection, None)
829+
api = self._make_one(client)
826830

827831
allowed = api.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES)
828832

@@ -841,7 +845,8 @@ def test_test_iam_permissions_missing_key(self):
841845
ALL_ROLES = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE]
842846
RETURNED = {}
843847
connection = _Connection(RETURNED)
844-
api = self._make_one(connection)
848+
client = _Client(connection, None)
849+
api = self._make_one(client)
845850

846851
allowed = api.test_iam_permissions(self.TOPIC_PATH, ALL_ROLES)
847852

pubsub/unit_tests/test_client.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ def test_publisher_api_wo_gax(self):
4646
project=self.PROJECT, credentials=creds,
4747
use_gax=False)
4848

49-
conn = client._connection = object()
49+
conn = client._connection = _Connection()
5050
api = client.publisher_api
5151

5252
self.assertIsInstance(api, _PublisherAPI)
53-
self.assertIs(api._connection, conn)
53+
self.assertEqual(api.api_request, conn.api_request)
5454
# API instance is cached
5555
again = client.publisher_api
5656
self.assertIs(again, api)
@@ -114,11 +114,11 @@ def test_subscriber_api_wo_gax(self):
114114
project=self.PROJECT, credentials=creds,
115115
use_gax=False)
116116

117-
conn = client._connection = object()
117+
conn = client._connection = _Connection()
118118
api = client.subscriber_api
119119

120120
self.assertIsInstance(api, _SubscriberAPI)
121-
self.assertIs(api._connection, conn)
121+
self.assertEqual(api.api_request, conn.api_request)
122122
# API instance is cached
123123
again = client.subscriber_api
124124
self.assertIs(again, api)
@@ -165,10 +165,11 @@ def test_iam_policy_api(self):
165165
from google.cloud.pubsub._http import _IAMPolicyAPI
166166
creds = _make_credentials()
167167
client = self._make_one(project=self.PROJECT, credentials=creds)
168-
conn = client._connection = object()
168+
conn = client._connection = _Connection()
169+
169170
api = client.iam_policy_api
170171
self.assertIsInstance(api, _IAMPolicyAPI)
171-
self.assertIs(api._connection, conn)
172+
self.assertEqual(api.api_request, conn.api_request)
172173
# API instance is cached
173174
again = client.iam_policy_api
174175
self.assertIs(again, api)

0 commit comments

Comments
 (0)