Skip to content

Commit 75551bc

Browse files
author
Jon Wayne Parrott
authored
Split polling future into its own module (#3662)
1 parent 0a28565 commit 75551bc

File tree

5 files changed

+174
-154
lines changed

5 files changed

+174
-154
lines changed

packages/google-cloud-core/google/cloud/future/base.py

Lines changed: 0 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,8 @@
1515
"""Abstract and helper bases for Future implementations."""
1616

1717
import abc
18-
import concurrent.futures
19-
import functools
20-
import operator
2118

2219
import six
23-
import tenacity
24-
25-
from google.cloud.future import _helpers
2620

2721

2822
@six.add_metaclass(abc.ABCMeta)
@@ -71,146 +65,3 @@ def set_result(self, result):
7165
@abc.abstractmethod
7266
def set_exception(self, exception):
7367
raise NotImplementedError()
74-
75-
76-
class PollingFuture(Future):
77-
"""A Future that needs to poll some service to check its status.
78-
79-
The :meth:`done` method should be implemented by subclasses. The polling
80-
behavior will repeatedly call ``done`` until it returns True.
81-
82-
.. note: Privacy here is intended to prevent the final class from
83-
overexposing, not to prevent subclasses from accessing methods.
84-
"""
85-
def __init__(self):
86-
super(PollingFuture, self).__init__()
87-
self._result = None
88-
self._exception = None
89-
self._result_set = False
90-
"""bool: Set to True when the result has been set via set_result or
91-
set_exception."""
92-
self._polling_thread = None
93-
self._done_callbacks = []
94-
95-
@abc.abstractmethod
96-
def done(self):
97-
"""Checks to see if the operation is complete.
98-
99-
Returns:
100-
bool: True if the operation is complete, False otherwise.
101-
"""
102-
# pylint: disable=redundant-returns-doc, missing-raises-doc
103-
raise NotImplementedError()
104-
105-
def running(self):
106-
"""True if the operation is currently running."""
107-
return not self.done()
108-
109-
def _blocking_poll(self, timeout=None):
110-
"""Poll and wait for the Future to be resolved.
111-
112-
Args:
113-
timeout (int): How long to wait for the operation to complete.
114-
If None, wait indefinitely.
115-
"""
116-
if self._result_set:
117-
return
118-
119-
retry_on = tenacity.retry_if_result(
120-
functools.partial(operator.is_not, True))
121-
# Use exponential backoff with jitter.
122-
wait_on = (
123-
tenacity.wait_exponential(multiplier=1, max=10) +
124-
tenacity.wait_random(0, 1))
125-
126-
if timeout is None:
127-
retry = tenacity.retry(retry=retry_on, wait=wait_on)
128-
else:
129-
retry = tenacity.retry(
130-
retry=retry_on,
131-
wait=wait_on,
132-
stop=tenacity.stop_after_delay(timeout))
133-
134-
try:
135-
retry(self.done)()
136-
except tenacity.RetryError as exc:
137-
six.raise_from(
138-
concurrent.futures.TimeoutError(
139-
'Operation did not complete within the designated '
140-
'timeout.'),
141-
exc)
142-
143-
def result(self, timeout=None):
144-
"""Get the result of the operation, blocking if necessary.
145-
146-
Args:
147-
timeout (int): How long to wait for the operation to complete.
148-
If None, wait indefinitely.
149-
150-
Returns:
151-
google.protobuf.Message: The Operation's result.
152-
153-
Raises:
154-
google.gax.GaxError: If the operation errors or if the timeout is
155-
reached before the operation completes.
156-
"""
157-
self._blocking_poll(timeout=timeout)
158-
159-
if self._exception is not None:
160-
# pylint: disable=raising-bad-type
161-
# Pylint doesn't recognize that this is valid in this case.
162-
raise self._exception
163-
164-
return self._result
165-
166-
def exception(self, timeout=None):
167-
"""Get the exception from the operation, blocking if necessary.
168-
169-
Args:
170-
timeout (int): How long to wait for the operation to complete.
171-
If None, wait indefinitely.
172-
173-
Returns:
174-
Optional[google.gax.GaxError]: The operation's error.
175-
"""
176-
self._blocking_poll()
177-
return self._exception
178-
179-
def add_done_callback(self, fn):
180-
"""Add a callback to be executed when the operation is complete.
181-
182-
If the operation is not already complete, this will start a helper
183-
thread to poll for the status of the operation in the background.
184-
185-
Args:
186-
fn (Callable[Future]): The callback to execute when the operation
187-
is complete.
188-
"""
189-
if self._result_set:
190-
_helpers.safe_invoke_callback(fn, self)
191-
return
192-
193-
self._done_callbacks.append(fn)
194-
195-
if self._polling_thread is None:
196-
# The polling thread will exit on its own as soon as the operation
197-
# is done.
198-
self._polling_thread = _helpers.start_daemon_thread(
199-
target=self._blocking_poll)
200-
201-
def _invoke_callbacks(self, *args, **kwargs):
202-
"""Invoke all done callbacks."""
203-
for callback in self._done_callbacks:
204-
_helpers.safe_invoke_callback(callback, *args, **kwargs)
205-
206-
def set_result(self, result):
207-
"""Set the Future's result."""
208-
self._result = result
209-
self._result_set = True
210-
self._invoke_callbacks(self)
211-
212-
def set_exception(self, exception):
213-
"""Set the Future's exception."""
214-
self._exception = exception
215-
self._result_set = True
216-
self._invoke_callbacks(self)

packages/google-cloud-core/google/cloud/future/operation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323

2424
from google.cloud import _helpers
2525
from google.cloud import exceptions
26-
from google.cloud.future import base
26+
from google.cloud.future import polling
2727

2828

29-
class Operation(base.PollingFuture):
29+
class Operation(polling.PollingFuture):
3030
"""A Future for interacting with a Google API Long-Running Operation.
3131
3232
Args:
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# Copyright 2017, Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Abstract and helper bases for Future implementations."""
16+
17+
import abc
18+
import concurrent.futures
19+
import functools
20+
import operator
21+
22+
import six
23+
import tenacity
24+
25+
from google.cloud.future import _helpers
26+
from google.cloud.future import base
27+
28+
29+
class PollingFuture(base.Future):
30+
"""A Future that needs to poll some service to check its status.
31+
32+
The :meth:`done` method should be implemented by subclasses. The polling
33+
behavior will repeatedly call ``done`` until it returns True.
34+
35+
.. note: Privacy here is intended to prevent the final class from
36+
overexposing, not to prevent subclasses from accessing methods.
37+
"""
38+
def __init__(self):
39+
super(PollingFuture, self).__init__()
40+
self._result = None
41+
self._exception = None
42+
self._result_set = False
43+
"""bool: Set to True when the result has been set via set_result or
44+
set_exception."""
45+
self._polling_thread = None
46+
self._done_callbacks = []
47+
48+
@abc.abstractmethod
49+
def done(self):
50+
"""Checks to see if the operation is complete.
51+
52+
Returns:
53+
bool: True if the operation is complete, False otherwise.
54+
"""
55+
# pylint: disable=redundant-returns-doc, missing-raises-doc
56+
raise NotImplementedError()
57+
58+
def running(self):
59+
"""True if the operation is currently running."""
60+
return not self.done()
61+
62+
def _blocking_poll(self, timeout=None):
63+
"""Poll and wait for the Future to be resolved.
64+
65+
Args:
66+
timeout (int): How long to wait for the operation to complete.
67+
If None, wait indefinitely.
68+
"""
69+
if self._result_set:
70+
return
71+
72+
retry_on = tenacity.retry_if_result(
73+
functools.partial(operator.is_not, True))
74+
# Use exponential backoff with jitter.
75+
wait_on = (
76+
tenacity.wait_exponential(multiplier=1, max=10) +
77+
tenacity.wait_random(0, 1))
78+
79+
if timeout is None:
80+
retry = tenacity.retry(retry=retry_on, wait=wait_on)
81+
else:
82+
retry = tenacity.retry(
83+
retry=retry_on,
84+
wait=wait_on,
85+
stop=tenacity.stop_after_delay(timeout))
86+
87+
try:
88+
retry(self.done)()
89+
except tenacity.RetryError as exc:
90+
six.raise_from(
91+
concurrent.futures.TimeoutError(
92+
'Operation did not complete within the designated '
93+
'timeout.'),
94+
exc)
95+
96+
def result(self, timeout=None):
97+
"""Get the result of the operation, blocking if necessary.
98+
99+
Args:
100+
timeout (int): How long to wait for the operation to complete.
101+
If None, wait indefinitely.
102+
103+
Returns:
104+
google.protobuf.Message: The Operation's result.
105+
106+
Raises:
107+
google.gax.GaxError: If the operation errors or if the timeout is
108+
reached before the operation completes.
109+
"""
110+
self._blocking_poll(timeout=timeout)
111+
112+
if self._exception is not None:
113+
# pylint: disable=raising-bad-type
114+
# Pylint doesn't recognize that this is valid in this case.
115+
raise self._exception
116+
117+
return self._result
118+
119+
def exception(self, timeout=None):
120+
"""Get the exception from the operation, blocking if necessary.
121+
122+
Args:
123+
timeout (int): How long to wait for the operation to complete.
124+
If None, wait indefinitely.
125+
126+
Returns:
127+
Optional[google.gax.GaxError]: The operation's error.
128+
"""
129+
self._blocking_poll()
130+
return self._exception
131+
132+
def add_done_callback(self, fn):
133+
"""Add a callback to be executed when the operation is complete.
134+
135+
If the operation is not already complete, this will start a helper
136+
thread to poll for the status of the operation in the background.
137+
138+
Args:
139+
fn (Callable[Future]): The callback to execute when the operation
140+
is complete.
141+
"""
142+
if self._result_set:
143+
_helpers.safe_invoke_callback(fn, self)
144+
return
145+
146+
self._done_callbacks.append(fn)
147+
148+
if self._polling_thread is None:
149+
# The polling thread will exit on its own as soon as the operation
150+
# is done.
151+
self._polling_thread = _helpers.start_daemon_thread(
152+
target=self._blocking_poll)
153+
154+
def _invoke_callbacks(self, *args, **kwargs):
155+
"""Invoke all done callbacks."""
156+
for callback in self._done_callbacks:
157+
_helpers.safe_invoke_callback(callback, *args, **kwargs)
158+
159+
def set_result(self, result):
160+
"""Set the Future's result."""
161+
self._result = result
162+
self._result_set = True
163+
self._invoke_callbacks(self)
164+
165+
def set_exception(self, exception):
166+
"""Set the Future's exception."""
167+
self._exception = exception
168+
self._result_set = True
169+
self._invoke_callbacks(self)

packages/google-cloud-core/tests/unit/future/test_operation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def make_operation_future(client_operations_responses=None):
6161

6262

6363
def test_constructor():
64-
future, refresh, cancel = make_operation_future()
64+
future, refresh, _ = make_operation_future()
6565

6666
assert future.operation == refresh.responses[0]
6767
assert future.operation.done is False

packages/google-cloud-core/tests/unit/future/test_base.py renamed to packages/google-cloud-core/tests/unit/future/test_polling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import mock
2020
import pytest
2121

22-
from google.cloud.future import base
22+
from google.cloud.future import polling
2323

2424

25-
class PollingFutureImpl(base.PollingFuture):
25+
class PollingFutureImpl(polling.PollingFuture):
2626
def done(self):
2727
return False
2828

0 commit comments

Comments
 (0)