Skip to content
Merged
20 changes: 20 additions & 0 deletions gcloud/pubsub/_implicit_environ.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,24 @@ def get_default_connection():
return _DEFAULTS.connection


def _require_connection(connection=None):
"""Infer a connection from the environment, if not passed explicitly.

:type connection: :class:`gcloud.pubsub.connection.Connection`
:param connection: Optional.

:rtype: :class:`gcloud.pubsub.connection.Connection`
:returns: A connection based on the current environment.
:raises: :class:`EnvironmentError` if ``connection`` is ``None``, and
cannot be inferred from the environment.
"""
if connection is None:
connection = get_default_connection()

if connection is None:
raise EnvironmentError('Connection could not be inferred.')

return connection


_DEFAULTS = _DefaultsContainer()
15 changes: 5 additions & 10 deletions gcloud/pubsub/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Define API functions (not bound to classes)."""

from gcloud._helpers import get_default_project
from gcloud.pubsub._implicit_environ import get_default_connection
from gcloud.pubsub._implicit_environ import _require_connection
from gcloud.pubsub.subscription import Subscription
from gcloud.pubsub.topic import Topic

Expand Down Expand Up @@ -53,8 +53,7 @@ def list_topics(page_size=None, page_token=None,
if project is None:
project = get_default_project()

if connection is None:
connection = get_default_connection()
connection = _require_connection(connection)

params = {}

Expand All @@ -66,8 +65,7 @@ def list_topics(page_size=None, page_token=None,

path = '/projects/%s/topics' % project
resp = connection.api_request(method='GET', path=path, query_params=params)
topics = [Topic.from_api_repr(resource, connection)
for resource in resp['topics']]
topics = [Topic.from_api_repr(resource) for resource in resp['topics']]
return topics, resp.get('nextPageToken')


Expand Down Expand Up @@ -110,8 +108,7 @@ def list_subscriptions(page_size=None, page_token=None, topic_name=None,
if project is None:
project = get_default_project()

if connection is None:
connection = get_default_connection()
connection = _require_connection(connection)

params = {}

Expand All @@ -128,8 +125,6 @@ def list_subscriptions(page_size=None, page_token=None, topic_name=None,

resp = connection.api_request(method='GET', path=path, query_params=params)
topics = {}
subscriptions = [Subscription.from_api_repr(resource,
connection=connection,
topics=topics)
subscriptions = [Subscription.from_api_repr(resource, topics=topics)
for resource in resp['subscriptions']]
return subscriptions, resp.get('nextPageToken')
107 changes: 67 additions & 40 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from gcloud.exceptions import NotFound
from gcloud.pubsub.message import Message
from gcloud.pubsub.topic import Topic
from gcloud.pubsub._implicit_environ import _require_connection


class Subscription(object):
Expand Down Expand Up @@ -46,17 +47,12 @@ def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
self.push_endpoint = push_endpoint

@classmethod
def from_api_repr(cls, resource, connection=None, topics=None):
def from_api_repr(cls, resource, topics=None):
"""Factory: construct a topic given its API representation

:type resource: dict
:param resource: topic resource representation returned from the API

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the default inferred from the
environment.

:type topics: dict or None
:param topics: A mapping of topic names -> topics. If not passed,
the subscription will have a newly-created topic.
Expand All @@ -68,8 +64,7 @@ def from_api_repr(cls, resource, connection=None, topics=None):
t_name = resource['topic']
topic = topics.get(t_name)
if topic is None:
topic = topics[t_name] = Topic.from_api_repr({'name': t_name},
connection)
topic = topics[t_name] = Topic.from_api_repr({'name': t_name})
_, _, _, name = resource['name'].split('/')
ack_deadline = resource.get('ackDeadlineSeconds')
push_config = resource.get('pushConfig', {})
Expand All @@ -82,11 +77,15 @@ def path(self):
project = self.topic.project
return '/projects/%s/subscriptions/%s' % (project, self.name)

def create(self):
def create(self, connection=None):
"""API call: create the subscription via a PUT request

See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
data = {'topic': self.topic.full_name}

Expand All @@ -96,36 +95,44 @@ def create(self):
if self.push_endpoint is not None:
data['pushConfig'] = {'pushEndpoint': self.push_endpoint}

conn = self.topic.connection
conn.api_request(method='PUT', path=self.path, data=data)
connection = _require_connection(connection)
connection.api_request(method='PUT', path=self.path, data=data)

def exists(self):
def exists(self, connection=None):
"""API call: test existence of the subscription via a GET request

See
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
conn = self.topic.connection
connection = _require_connection(connection)
try:
conn.api_request(method='GET', path=self.path)
connection.api_request(method='GET', path=self.path)
except NotFound:
return False
else:
return True

def reload(self):
def reload(self, connection=None):
"""API call: sync local subscription configuration via a GET request

See
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
conn = self.topic.connection
data = conn.api_request(method='GET', path=self.path)
connection = _require_connection(connection)
data = connection.api_request(method='GET', path=self.path)
self.ack_deadline = data.get('ackDeadline')
push_config = data.get('pushConfig', {})
self.push_endpoint = push_config.get('pushEndpoint')

def modify_push_configuration(self, push_endpoint):
def modify_push_configuration(self, push_endpoint, connection=None):
"""API call: update the push endpoint for the subscription.

See:
Expand All @@ -135,18 +142,22 @@ def modify_push_configuration(self, push_endpoint):
:param push_endpoint: URL to which messages will be pushed by the
back-end. If None, the application must pull
messages.

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
connection = _require_connection(connection)
data = {}
config = data['pushConfig'] = {}
if push_endpoint is not None:
config['pushEndpoint'] = push_endpoint
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:modifyPushConfig' % self.path,
data=data)
connection.api_request(method='POST',
path='%s:modifyPushConfig' % self.path,
data=data)
self.push_endpoint = push_endpoint

def pull(self, return_immediately=False, max_messages=1):
def pull(self, return_immediately=False, max_messages=1, connection=None):
"""API call: retrieve messages for the subscription.

See:
Expand All @@ -161,36 +172,44 @@ def pull(self, return_immediately=False, max_messages=1):
:type max_messages: int
:param max_messages: the maximum number of messages to return.

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.

:rtype: list of (ack_id, message) tuples
:returns: sequence of tuples: ``ack_id`` is the ID to be used in a
subsequent call to :meth:`acknowledge`, and ``message``
is an instance of :class:`gcloud.pubsub.message.Message`.
"""
connection = _require_connection(connection)
data = {'returnImmediately': return_immediately,
'maxMessages': max_messages}
conn = self.topic.connection
response = conn.api_request(method='POST',
path='%s:pull' % self.path,
data=data)
response = connection.api_request(method='POST',
path='%s:pull' % self.path,
data=data)
return [(info['ackId'], Message.from_api_repr(info['message']))
for info in response['receivedMessages']]

def acknowledge(self, ack_ids):
def acknowledge(self, ack_ids, connection=None):
"""API call: acknowledge retrieved messages for the subscription.

See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge

:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
connection = _require_connection(connection)
data = {'ackIds': ack_ids}
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:acknowledge' % self.path,
data=data)
connection.api_request(method='POST',
path='%s:acknowledge' % self.path,
data=data)

def modify_ack_deadline(self, ack_id, ack_deadline):
def modify_ack_deadline(self, ack_id, ack_deadline, connection=None):
"""API call: update acknowledgement deadline for a retrieved message.

See:
Expand All @@ -201,18 +220,26 @@ def modify_ack_deadline(self, ack_id, ack_deadline):

:type ack_deadline: int
:param ack_deadline: new deadline for the message, in seconds

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
connection = _require_connection(connection)
data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline}
conn = self.topic.connection
conn.api_request(method='POST',
path='%s:modifyAckDeadline' % self.path,
data=data)
connection.api_request(method='POST',
path='%s:modifyAckDeadline' % self.path,
data=data)

def delete(self):
def delete(self, connection=None):
"""API call: delete the subscription via a DELETE request.

See:
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete

:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the topic's connection.
"""
conn = self.topic.connection
conn.api_request(method='DELETE', path=self.path)
connection = _require_connection(connection)
connection.api_request(method='DELETE', path=self.path)
32 changes: 32 additions & 0 deletions gcloud/pubsub/test__implicit_environ.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,35 @@ def _callFUT(self):

def test_wo_override(self):
self.assertTrue(self._callFUT() is None)


class Test__require_connection(unittest2.TestCase):

def _callFUT(self, connection=None):
from gcloud.pubsub._implicit_environ import _require_connection
return _require_connection(connection=connection)

def _monkey(self, connection):
from gcloud.pubsub._testing import _monkey_defaults
return _monkey_defaults(connection=connection)

def test_implicit_unset(self):
with self._monkey(None):
with self.assertRaises(EnvironmentError):
self._callFUT()

def test_implicit_unset_passed_explicitly(self):
CONNECTION = object()
with self._monkey(None):
self.assertTrue(self._callFUT(CONNECTION) is CONNECTION)

def test_implicit_set(self):
IMPLICIT_CONNECTION = object()
with self._monkey(IMPLICIT_CONNECTION):
self.assertTrue(self._callFUT() is IMPLICIT_CONNECTION)

def test_implicit_set_passed_explicitly(self):
IMPLICIT_CONNECTION = object()
CONNECTION = object()
with self._monkey(IMPLICIT_CONNECTION):
self.assertTrue(self._callFUT(CONNECTION) is CONNECTION)
Loading