Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions gcloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,33 @@

"""Client for interacting with the Google Cloud Pub/Sub API."""

import os

from gcloud.client import JSONClient
from gcloud.pubsub.connection import Connection
from gcloud.pubsub.connection import _PublisherAPI
from gcloud.pubsub.connection import _SubscriberAPI
from gcloud.pubsub.connection import _PublisherAPI as JSONPublisherAPI
from gcloud.pubsub.connection import _SubscriberAPI as JSONSubscriberAPI
from gcloud.pubsub.connection import _IAMPolicyAPI
from gcloud.pubsub.subscription import Subscription
from gcloud.pubsub.topic import Topic

try:
from google.pubsub.v1.publisher_api import (
PublisherApi as GeneratedPublisherAPI)
from google.pubsub.v1.subscriber_api import (
SubscriberApi as GeneratedSubscriberAPI)
from gcloud.pubsub._gax import _PublisherAPI as GAXPublisherAPI
from gcloud.pubsub._gax import _SubscriberAPI as GAXSubscriberAPI
except ImportError: # pragma: NO COVER
_HAVE_GAX = False
GeneratedPublisherAPI = GAXPublisherAPI = None
GeneratedSubscriberAPI = GAXSubscriberAPI = None
else:
_HAVE_GAX = True


_USE_GAX = _HAVE_GAX and os.environ.get('GCLOUD_DISABLE_GAX') is None


class Client(JSONClient):
"""Client to bundle configuration needed for API requests.
Expand Down Expand Up @@ -52,14 +70,22 @@ class Client(JSONClient):
def publisher_api(self):
"""Helper for publisher-related API calls."""
if self._publisher_api is None:
self._publisher_api = _PublisherAPI(self.connection)
if _USE_GAX:
generated = GeneratedPublisherAPI()
self._publisher_api = GAXPublisherAPI(generated)
else:
self._publisher_api = JSONPublisherAPI(self.connection)
return self._publisher_api

@property
def subscriber_api(self):
"""Helper for subscriber-related API calls."""
if self._subscriber_api is None:
self._subscriber_api = _SubscriberAPI(self.connection)
if _USE_GAX:
generated = GeneratedSubscriberAPI()
self._subscriber_api = GAXSubscriberAPI(generated)
else:
self._subscriber_api = JSONSubscriberAPI(self.connection)
return self._subscriber_api

@property
Expand Down
80 changes: 76 additions & 4 deletions gcloud/pubsub/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,102 @@ def _getTargetClass(self):
def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def test_publisher_api(self):
def test_publisher_api_wo_gax(self):
from gcloud.pubsub.connection import _PublisherAPI
from gcloud.pubsub import client as MUT
from gcloud._testing import _Monkey
creds = _Credentials()
client = self._makeOne(project=self.PROJECT, credentials=creds)
conn = client.connection = object()
api = client.publisher_api

with _Monkey(MUT, _USE_GAX=False):
api = client.publisher_api

self.assertIsInstance(api, _PublisherAPI)
self.assertTrue(api._connection is conn)
# API instance is cached
again = client.publisher_api
self.assertTrue(again is api)

def test_subscriber_api(self):
def test_publisher_api_w_gax(self):
from gcloud.pubsub import client as MUT
from gcloud._testing import _Monkey

wrapped = object()
_called_with = []

def _generated_api(*args, **kw):
_called_with.append((args, kw))
return wrapped

class _GaxPublisherAPI(object):

def __init__(self, _wrapped):
self._wrapped = _wrapped

creds = _Credentials()
client = self._makeOne(project=self.PROJECT, credentials=creds)

with _Monkey(MUT,
_USE_GAX=True,
GeneratedPublisherAPI=_generated_api,
GAXPublisherAPI=_GaxPublisherAPI):
api = client.publisher_api

self.assertIsInstance(api, _GaxPublisherAPI)
self.assertTrue(api._wrapped is wrapped)
# API instance is cached
again = client.publisher_api
self.assertTrue(again is api)

def test_subscriber_api_wo_gax(self):
from gcloud.pubsub.connection import _SubscriberAPI
from gcloud.pubsub import client as MUT
from gcloud._testing import _Monkey
creds = _Credentials()
client = self._makeOne(project=self.PROJECT, credentials=creds)
conn = client.connection = object()
api = client.subscriber_api

with _Monkey(MUT, _USE_GAX=False):
api = client.subscriber_api

self.assertIsInstance(api, _SubscriberAPI)
self.assertTrue(api._connection is conn)
# API instance is cached
again = client.subscriber_api
self.assertTrue(again is api)

def test_subscriber_api_w_gax(self):
from gcloud.pubsub import client as MUT
from gcloud._testing import _Monkey

wrapped = object()
_called_with = []

def _generated_api(*args, **kw):
_called_with.append((args, kw))
return wrapped

class _GaxSubscriberAPI(object):

def __init__(self, _wrapped):
self._wrapped = _wrapped

creds = _Credentials()
client = self._makeOne(project=self.PROJECT, credentials=creds)

with _Monkey(MUT,
_USE_GAX=True,
GeneratedSubscriberAPI=_generated_api,
GAXSubscriberAPI=_GaxSubscriberAPI):
api = client.subscriber_api

self.assertIsInstance(api, _GaxSubscriberAPI)
self.assertTrue(api._wrapped is wrapped)
# API instance is cached
again = client.subscriber_api
self.assertTrue(again is api)

def test_iam_policy_api(self):
from gcloud.pubsub.connection import _IAMPolicyAPI
creds = _Credentials()
Expand Down