Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/bigquery-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ Poll until the job is complete:
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)

Retrieve the results:

.. doctest::

>>> results = job.results()
>>> rows, total_count, token = results.fetch_data()  # API request
>>> while True:
...     do_something_with(rows)
...     if token is None:
...         break
...     rows, total_count, token = results.fetch_data(
...         page_token=token)  # API request


Inserting data (asynchronous)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
52 changes: 52 additions & 0 deletions google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,55 @@ def _validate(self, value):
"""
if value not in self.ALLOWED:
raise ValueError('Pass one of: %s' ', '.join(self.ALLOWED))


class UDFResource(object):
    """Describe a single user-defined function (UDF) resource.

    :type udf_type: str
    :param udf_type: the type of the resource ('inlineCode' or 'resourceUri')

    :type value: str
    :param value: the inline code or resource URI.

    See
    https://cloud.google.com/bigquery/user-defined-functions#api
    """
    def __init__(self, udf_type, value):
        self.udf_type = udf_type
        self.value = value

    def __eq__(self, other):
        """Compare by ``udf_type`` and ``value``.

        :rtype: bool or ``NotImplemented``
        :returns: ``NotImplemented`` when ``other`` is not a
                  :class:`UDFResource`, so that comparing against an
                  unrelated object yields ``False`` instead of raising
                  ``AttributeError``.
        """
        if not isinstance(other, UDFResource):
            return NotImplemented
        return (
            self.udf_type == other.udf_type and
            self.value == other.value)

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`.

        Defined explicitly because Python 2 does not derive ``!=`` from
        ``__eq__``.
        """
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal


class UDFResourcesProperty(object):
    """Custom property type, holding :class:`UDFResource` instances."""

    def __get__(self, instance, owner):
        """Descriptor protocol: accessor

        :returns: the descriptor itself when accessed on the class,
                  otherwise a new list built from the instance's
                  ``_udf_resources``.
        """
        if instance is None:
            return self
        return list(instance._udf_resources)

    def __set__(self, instance, value):
        """Descriptor protocol: mutator

        :type value: iterable of :class:`UDFResource`
        :param value: the resources to store on the instance (as a tuple
                      in ``_udf_resources``).

        :raises: :class:`ValueError` if any item is not a
                 :class:`UDFResource`.
        """
        # Materialize first:  a one-shot iterable (e.g. a generator) would
        # otherwise be exhausted by the validation pass and stored empty.
        resources = tuple(value)
        if not all(isinstance(u, UDFResource) for u in resources):
            raise ValueError("udf items must be UDFResource")
        instance._udf_resources = resources


def _build_udf_resources(resources):
"""
:type resources: sequence of :class:`UDFResource`
:param resources: fields to be appended.

:rtype: mapping
:returns: a mapping describing userDefinedFunctionResources for the query.
"""
udfs = []
for resource in resources:
udf = {resource.udf_type: resource.value}
udfs.append(udf)
return udfs
67 changes: 12 additions & 55 deletions google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,62 +23,10 @@
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import _build_schema_resource
from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _EnumProperty
from google.cloud.bigquery._helpers import _TypedProperty


class UDFResource(object):
    """Describe a single user-defined function (UDF) resource.

    :type udf_type: str
    :param udf_type: the type of the resource ('inlineCode' or 'resourceUri')

    :type value: str
    :param value: the inline code or resource URI.

    See
    https://cloud.google.com/bigquery/user-defined-functions#api
    """
    def __init__(self, udf_type, value):
        self.udf_type = udf_type
        self.value = value

    def __eq__(self, other):
        """Compare by ``udf_type`` and ``value``.

        :rtype: bool or ``NotImplemented``
        :returns: ``NotImplemented`` when ``other`` is not a
                  :class:`UDFResource`, so that comparing against an
                  unrelated object yields ``False`` instead of raising
                  ``AttributeError``.
        """
        if not isinstance(other, UDFResource):
            return NotImplemented
        return (
            self.udf_type == other.udf_type and
            self.value == other.value)

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`.

        Defined explicitly because Python 2 does not derive ``!=`` from
        ``__eq__``.
        """
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal


def _build_udf_resources(resources):
"""
:type resources: sequence of :class:`UDFResource`
:param resources: fields to be appended.

:rtype: mapping
:returns: a mapping describing userDefinedFunctionResources for the query.
"""
udfs = []
for resource in resources:
udf = {resource.udf_type: resource.value}
udfs.append(udf)
return udfs


class UDFResourcesProperty(object):
    """Custom property type for :class:`QueryJob`.

    Also used by :class:`~google.cloud.bigquery.query.Query`.
    """
    def __get__(self, instance, owner):
        """Descriptor protocol: accessor

        :returns: the descriptor itself when accessed on the class,
                  otherwise a new list built from the instance's
                  ``_udf_resources``.
        """
        if instance is None:
            return self
        return list(instance._udf_resources)

    def __set__(self, instance, value):
        """Descriptor protocol: mutator

        :type value: iterable of :class:`UDFResource`
        :param value: the resources to store on the instance (as a tuple
                      in ``_udf_resources``).

        :raises: :class:`ValueError` if any item is not a
                 :class:`UDFResource`.
        """
        # Materialize first:  a one-shot iterable (e.g. a generator) would
        # otherwise be exhausted by the validation pass and stored empty.
        resources = tuple(value)
        if not all(isinstance(u, UDFResource) for u in resources):
            raise ValueError("udf items must be UDFResource")
        instance._udf_resources = resources
from google.cloud.bigquery._helpers import _build_udf_resources


class Compression(_EnumProperty):
Expand Down Expand Up @@ -952,7 +900,7 @@ class QueryJob(_AsyncJob):

:type udf_resources: tuple
:param udf_resources: An iterable of
:class:`google.cloud.bigquery.job.UDFResource`
:class:`google.cloud.bigquery._helpers.UDFResource`
(empty by default)
"""
_JOB_TYPE = 'query'
Expand Down Expand Up @@ -1125,3 +1073,12 @@ def from_api_repr(cls, resource, client):
job = cls(name, query, client=client)
job._set_properties(resource)
return job

def results(self):
    """Build a results object tied to this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: the instance produced by ``QueryResults.from_query_job``
              for this job.
    """
    # Imported at call time:  the ``query`` module itself imports
    # ``QueryJob`` from this module, so a top-level import would be circular.
    from google.cloud.bigquery.query import QueryResults
    bound = QueryResults.from_query_job(self)
    return bound
27 changes: 25 additions & 2 deletions google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
from google.cloud.bigquery._helpers import _rows_from_json
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.job import UDFResourcesProperty
from google.cloud.bigquery.job import _build_udf_resources
from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import _build_udf_resources
from google.cloud.bigquery._helpers import UDFResourcesProperty


class _SyncQueryConfiguration(object):
Expand Down Expand Up @@ -65,6 +65,26 @@ def __init__(self, query, client, udf_resources=()):
self.udf_resources = udf_resources
self._job = None

@classmethod
def from_query_job(cls, job):
    """Factory: build an instance that mirrors an existing query job.

    :type job: :class:`~google.cloud.bigquery.job.QueryJob`
    :param job: existing job

    :rtype: :class:`QueryResults`
    :returns: a new instance created from the job's query, client and
              UDF resources, with ``_job`` set to ``job``.
    """
    instance = cls(job.query, job._client, job.udf_resources)
    instance._job = job
    # Carry over only the options that are actually set on the job;
    # anything left as ``None`` is not assigned on the new instance.
    for option in ('default_dataset', 'use_query_cache', 'use_legacy_sql'):
        setting = getattr(job, option)
        if setting is not None:
            setattr(instance, option, setting)
    return instance

@property
def project(self):
"""Project bound to the job.
Expand Down Expand Up @@ -307,6 +327,9 @@ def run(self, client=None):
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.
"""
if self._job is not None:
raise ValueError("Query job is already running.")

client = self._require_client(client)
path = '/projects/%s/queries' % (self.project,)
api_response = client.connection.api_request(
Expand Down
70 changes: 70 additions & 0 deletions unit_tests/bigquery/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,76 @@ def __init__(self):
self.assertEqual(wrapper._configuration._attr, None)


class Test_UDFResourcesProperty(unittest.TestCase):

def _getTargetClass(self):
from google.cloud.bigquery._helpers import UDFResourcesProperty
return UDFResourcesProperty

def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def _descriptor_and_klass(self):
descriptor = self._makeOne()

class _Test(object):
_udf_resources = ()
udf_resources = descriptor

return descriptor, _Test

def test_class_getter(self):
descriptor, klass = self._descriptor_and_klass()
self.assertTrue(klass.udf_resources is descriptor)

def test_instance_getter_empty(self):
_, klass = self._descriptor_and_klass()
instance = klass()
self.assertEqual(instance.udf_resources, [])

def test_instance_getter_w_non_empty_list(self):
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()
instance._udf_resources = tuple(udf_resources)

self.assertEqual(instance.udf_resources, udf_resources)

def test_instance_setter_w_empty_list(self):
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()
instance._udf_resources = udf_resources

instance.udf_resources = []

self.assertEqual(instance.udf_resources, [])

def test_instance_setter_w_valid_udf(self):
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()

instance.udf_resources = udf_resources

self.assertEqual(instance.udf_resources, udf_resources)

def test_instance_setter_w_bad_udfs(self):
_, klass = self._descriptor_and_klass()
instance = klass()

with self.assertRaises(ValueError):
instance.udf_resources = ["foo"]

self.assertEqual(instance.udf_resources, [])


class _Field(object):

def __init__(self, mode, name='unknown', field_type='UNKNOWN', fields=()):
Expand Down
80 changes: 9 additions & 71 deletions unit_tests/bigquery/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,76 +15,6 @@
import unittest


class Test_UDFResourcesProperty(unittest.TestCase):

def _getTargetClass(self):
from google.cloud.bigquery.job import UDFResourcesProperty
return UDFResourcesProperty

def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def _descriptor_and_klass(self):
descriptor = self._makeOne()

class _Test(object):
_udf_resources = ()
udf_resources = descriptor

return descriptor, _Test

def test_class_getter(self):
descriptor, klass = self._descriptor_and_klass()
self.assertTrue(klass.udf_resources is descriptor)

def test_instance_getter_empty(self):
_, klass = self._descriptor_and_klass()
instance = klass()
self.assertEqual(instance.udf_resources, [])

def test_instance_getter_w_non_empty_list(self):
from google.cloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()
instance._udf_resources = tuple(udf_resources)

self.assertEqual(instance.udf_resources, udf_resources)

def test_instance_setter_w_empty_list(self):
from google.cloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()
instance._udf_resources = udf_resources

instance.udf_resources = []

self.assertEqual(instance.udf_resources, [])

def test_instance_setter_w_valid_udf(self):
from google.cloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()

instance.udf_resources = udf_resources

self.assertEqual(instance.udf_resources, udf_resources)

def test_instance_setter_w_bad_udfs(self):
_, klass = self._descriptor_and_klass()
instance = klass()

with self.assertRaises(ValueError):
instance.udf_resources = ["foo"]

self.assertEqual(instance.udf_resources, [])


class _Base(object):
PROJECT = 'project'
SOURCE1 = 'http://example.com/source1.csv'
Expand Down Expand Up @@ -1456,6 +1386,14 @@ def test_from_api_repr_w_properties(self):
self.assertTrue(dataset._client is client)
self._verifyResourceProperties(dataset, RESOURCE)

def test_results(self):
    # ``results()`` must return a QueryResults bound to the very same job.
    from google.cloud.bigquery.query import QueryResults
    job = self._makeOne(self.JOB_NAME, self.QUERY, _Client(self.PROJECT))
    results = job.results()
    self.assertIsInstance(results, QueryResults)
    self.assertIs(results._job, job)

def test_begin_w_bound_client(self):
PATH = 'projects/%s/jobs' % self.PROJECT
RESOURCE = self._makeResource()
Expand Down Expand Up @@ -1558,7 +1496,7 @@ def test_begin_w_alternate_client(self):
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_bound_client_and_udf(self):
from google.cloud.bigquery.job import UDFResource
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
PATH = 'projects/%s/jobs' % self.PROJECT
RESOURCE = self._makeResource()
Expand Down
Loading