Skip to content

Commit ee506c6

Browse files
author
Jon Wayne Parrott
committed
Add basic future interface to bigquery
1 parent dde3d9b commit ee506c6

File tree

1 file changed

+56
-2
lines changed
  • bigquery/google/cloud/bigquery

1 file changed

+56
-2
lines changed

bigquery/google/cloud/bigquery/job.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414

1515
"""Define API Jobs."""
1616

17+
import threading
18+
1719
import six
1820

21+
from google.cloud import exceptions
1922
from google.cloud.exceptions import NotFound
2023
from google.cloud._helpers import _datetime_from_microseconds
2124
from google.cloud.bigquery.dataset import Dataset
@@ -27,6 +30,7 @@
2730
from google.cloud.bigquery._helpers import UDFResourcesProperty
2831
from google.cloud.bigquery._helpers import _EnumProperty
2932
from google.cloud.bigquery._helpers import _TypedProperty
33+
import google.cloud.future.base
3034

3135

3236
class Compression(_EnumProperty):
@@ -82,14 +86,15 @@ class WriteDisposition(_EnumProperty):
8286
ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)
8387

8488

85-
class _BaseJob(object):
89+
class _BaseJob(google.cloud.future.base.PollingFuture):
8690
"""Base class for jobs.
8791
8892
:type client: :class:`google.cloud.bigquery.client.Client`
8993
:param client: A client which holds credentials and project configuration
9094
for the dataset (which requires a project).
9195
"""
9296
def __init__(self, client):
97+
super(_BaseJob, self).__init__()
9398
self._client = client
9499
self._properties = {}
95100

@@ -131,6 +136,8 @@ class _AsyncJob(_BaseJob):
131136
def __init__(self, name, client):
132137
super(_AsyncJob, self).__init__(client)
133138
self.name = name
139+
self._result_set = False
140+
self._completion_lock = threading.Lock()
134141

135142
@property
136143
def job_type(self):
@@ -273,6 +280,9 @@ def _set_properties(self, api_response):
273280
self._properties.clear()
274281
self._properties.update(cleaned)
275282

283+
# For Future interface
284+
self._set_future_result()
285+
276286
@classmethod
277287
def _get_resource_config(cls, resource):
278288
"""Helper for :meth:`from_api_repr`
@@ -345,7 +355,7 @@ def exists(self, client=None):
345355
return True
346356

347357
def reload(self, client=None):
348-
"""API call: refresh job properties via a GET request
358+
"""API call: refresh job properties via a GET request.
349359
350360
See
351361
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
@@ -371,12 +381,56 @@ def cancel(self, client=None):
371381
``NoneType``
372382
:param client: the client to use. If not passed, falls back to the
373383
``client`` stored on the current dataset.
384+
385+
:rtype: bool
386+
:returns: Boolean indicating that the cancel request was sent.
374387
"""
375388
client = self._require_client(client)
376389

377390
api_response = client._connection.api_request(
378391
method='POST', path='%s/cancel' % (self.path,))
379392
self._set_properties(api_response['job'])
393+
return True
394+
395+
# The following methods implement the PollingFuture interface. Note that
396+
# the methods above are from the pre-Future interface and are left for
397+
# compatibility. The only "overloaded" method is :meth:`cancel`, which
398+
# satisfies both interfaces.
399+
400+
def _set_future_result(self):
401+
"""Set the result or exception from the job if it is complete."""
402+
# This must be done in a lock to prevent the polling thread
403+
# and main thread from both executing the completion logic
404+
# at the same time.
405+
with self._completion_lock:
406+
# If the operation isn't complete or if the result has already been
407+
# set, do not call set_result/set_exception again.
408+
# Note: self._result_set is set to True in set_result and
409+
# set_exception, in case those methods are invoked directly.
410+
if not self.state != 'DONE' or self._result_set:
411+
return
412+
413+
if self.error_result:
414+
exception = exceptions.GoogleCloudError(
415+
self.error_result, errors=self.errors)
416+
self.set_exception(exception)
417+
else:
418+
self.set_result(self)
419+
420+
def done(self):
421+
# Do not refresh is the state is already done, as the job will not
422+
# change once complete.
423+
if self.state != 'DONE':
424+
self.reload()
425+
return self.state == 'DONE'
426+
427+
def result(self, timeout=None):
428+
if self.state is None:
429+
self.begin()
430+
return super(self, _AsyncJob).result(timeout=timeout)
431+
432+
def cancelled(self):
433+
return False
380434

381435

382436
class _LoadConfiguration(object):

0 commit comments

Comments
 (0)