Skip to content

Commit c211446

Browse files
author
Jon Wayne Parrott
authored
Add base future package to google.cloud (#3616)
1 parent cb40d1d commit c211446

File tree

9 files changed

+472
-0
lines changed

9 files changed

+472
-0
lines changed

packages/google-cloud-core/.coveragerc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@ exclude_lines =
1313
pragma: NO COVER
1414
# Ignore debug-only repr
1515
def __repr__
16+
# Ignore abstract methods
17+
raise NotImplementedError
18+
raise NotImplementedError()

packages/google-cloud-core/google/cloud/_helpers.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,29 @@ def _bytes_to_unicode(value):
379379
raise ValueError('%r could not be converted to unicode' % (value,))
380380

381381

382+
def _from_any_pb(pb_type, any_pb):
383+
"""Converts an Any protobuf to the specified message type
384+
385+
Args:
386+
pb_type (type): the type of the message that any_pb stores an instance
387+
of.
388+
any_pb (google.protobuf.any_pb2.Any): the object to be converted.
389+
390+
Returns:
391+
pb_type: An instance of the pb_type message.
392+
393+
Raises:
394+
TypeError: if the message could not be converted.
395+
"""
396+
msg = pb_type()
397+
if not any_pb.Unpack(msg):
398+
raise TypeError(
399+
'Could not convert {} to {}'.format(
400+
any_pb.__class__.__name__, pb_type.__name__))
401+
402+
return msg
403+
404+
382405
def _pb_timestamp_to_datetime(timestamp_pb):
383406
"""Convert a Timestamp protobuf to a datetime object.
384407
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Copyright 2017, Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Futures for dealing with asynchronous operations."""
16+
17+
from google.cloud.future.base import Future
18+
19+
__all__ = [
20+
'Future',
21+
]
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Copyright 2017, Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Private helpers for futures."""
16+
17+
import logging
18+
import threading
19+
20+
21+
_LOGGER = logging.getLogger(__name__)
22+
23+
24+
def start_daemon_thread(*args, **kwargs):
25+
"""Starts a thread and marks it as a daemon thread."""
26+
thread = threading.Thread(*args, **kwargs)
27+
thread.daemon = True
28+
thread.start()
29+
return thread
30+
31+
32+
def safe_invoke_callback(callback, *args, **kwargs):
33+
"""Invoke a callback, swallowing and logging any exceptions."""
34+
# pylint: disable=bare-except
35+
# We intentionally want to swallow all exceptions.
36+
try:
37+
return callback(*args, **kwargs)
38+
except:
39+
_LOGGER.exception('Error while executing Future callback.')
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
# Copyright 2017, Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Abstract and helper bases for Future implementations."""
16+
17+
import abc
18+
19+
import six
20+
21+
from google.cloud.future import _helpers
22+
23+
24+
@six.add_metaclass(abc.ABCMeta)
25+
class Future(object):
26+
# pylint: disable=missing-docstring
27+
# We inherit the interfaces here from concurrent.futures.
28+
29+
"""Future interface.
30+
31+
This interface is based on :class:`concurrent.futures.Future`.
32+
"""
33+
34+
@abc.abstractmethod
35+
def cancel(self):
36+
raise NotImplementedError()
37+
38+
@abc.abstractmethod
39+
def cancelled(self):
40+
raise NotImplementedError()
41+
42+
@abc.abstractmethod
43+
def running(self):
44+
raise NotImplementedError()
45+
46+
@abc.abstractmethod
47+
def done(self):
48+
raise NotImplementedError()
49+
50+
@abc.abstractmethod
51+
def result(self, timeout=None):
52+
raise NotImplementedError()
53+
54+
@abc.abstractmethod
55+
def exception(self, timeout=None):
56+
raise NotImplementedError()
57+
58+
@abc.abstractmethod
59+
def add_done_callback(self, fn):
60+
# pylint: disable=invalid-name
61+
raise NotImplementedError()
62+
63+
@abc.abstractmethod
64+
def set_result(self, result):
65+
raise NotImplementedError()
66+
67+
@abc.abstractmethod
68+
def set_exception(self, exception):
69+
raise NotImplementedError()
70+
71+
72+
class PollingFuture(Future):
73+
"""A Future that needs to poll some service to check its status.
74+
75+
The private :meth:`_blocking_poll` method should be implemented by
76+
subclasses.
77+
78+
.. note: Privacy here is intended to prevent the final class from
79+
overexposing, not to prevent subclasses from accessing methods.
80+
"""
81+
def __init__(self):
82+
super(PollingFuture, self).__init__()
83+
self._result = None
84+
self._exception = None
85+
self._result_set = False
86+
"""bool: Set to True when the result has been set via set_result or
87+
set_exception."""
88+
self._polling_thread = None
89+
self._done_callbacks = []
90+
91+
@abc.abstractmethod
92+
def _blocking_poll(self, timeout=None):
93+
"""Poll and wait for the Future to be resolved.
94+
95+
Args:
96+
timeout (int): How long to wait for the operation to complete.
97+
If None, wait indefinitely.
98+
"""
99+
# pylint: disable=missing-raises
100+
raise NotImplementedError()
101+
102+
def result(self, timeout=None):
103+
"""Get the result of the operation, blocking if necessary.
104+
105+
Args:
106+
timeout (int): How long to wait for the operation to complete.
107+
If None, wait indefinitely.
108+
109+
Returns:
110+
google.protobuf.Message: The Operation's result.
111+
112+
Raises:
113+
google.gax.GaxError: If the operation errors or if the timeout is
114+
reached before the operation completes.
115+
"""
116+
self._blocking_poll()
117+
118+
if self._exception is not None:
119+
# pylint: disable=raising-bad-type
120+
# Pylint doesn't recognize that this is valid in this case.
121+
raise self._exception
122+
123+
return self._result
124+
125+
def exception(self, timeout=None):
126+
"""Get the exception from the operation, blocking if necessary.
127+
128+
Args:
129+
timeout (int): How long to wait for the operation to complete.
130+
If None, wait indefinitely.
131+
132+
Returns:
133+
Optional[google.gax.GaxError]: The operation's error.
134+
"""
135+
self._blocking_poll()
136+
return self._exception
137+
138+
def add_done_callback(self, fn):
139+
"""Add a callback to be executed when the operation is complete.
140+
141+
If the operation is not already complete, this will start a helper
142+
thread to poll for the status of the operation in the background.
143+
144+
Args:
145+
fn (Callable[Future]): The callback to execute when the operation
146+
is complete.
147+
"""
148+
if self._result_set:
149+
_helpers.safe_invoke_callback(fn, self)
150+
return
151+
152+
self._done_callbacks.append(fn)
153+
154+
if self._polling_thread is None:
155+
# The polling thread will exit on its own as soon as the operation
156+
# is done.
157+
self._polling_thread = _helpers.start_daemon_thread(
158+
target=self._blocking_poll)
159+
160+
def _invoke_callbacks(self, *args, **kwargs):
161+
"""Invoke all done callbacks."""
162+
for callback in self._done_callbacks:
163+
_helpers.safe_invoke_callback(callback, *args, **kwargs)
164+
165+
def set_result(self, result):
166+
"""Set the Future's result."""
167+
self._result = result
168+
self._result_set = True
169+
self._invoke_callbacks(self)
170+
171+
def set_exception(self, exception):
172+
"""Set the Future's exception."""
173+
self._exception = exception
174+
self._result_set = True
175+
self._invoke_callbacks(self)

packages/google-cloud-core/tests/unit/future/__init__.py

Whitespace-only changes.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Copyright 2017, Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import mock
16+
17+
from google.cloud.future import _helpers
18+
19+
20+
@mock.patch('threading.Thread', autospec=True)
21+
def test_start_deamon_thread(unused_thread):
22+
deamon_thread = _helpers.start_daemon_thread(target=mock.sentinel.target)
23+
assert deamon_thread.daemon is True
24+
25+
26+
def test_safe_invoke_callback():
27+
callback = mock.Mock(spec=['__call__'], return_value=42)
28+
result = _helpers.safe_invoke_callback(callback, 'a', b='c')
29+
assert result == 42
30+
callback.assert_called_once_with('a', b='c')
31+
32+
33+
def test_safe_invoke_callback_exception():
34+
callback = mock.Mock(spec=['__call__'], side_effect=ValueError())
35+
result = _helpers.safe_invoke_callback(callback, 'a', b='c')
36+
assert result is None
37+
callback.assert_called_once_with('a', b='c')

0 commit comments

Comments
 (0)