Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion gcloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,42 @@
from gcloud.bigquery._helpers import _TypedProperty


class UDFResource(object):
    """Describe a single user-defined function (UDF) resource.

    :type udf_type: str
    :param udf_type: the type of the resource ('inlineCode' or 'resourceUri')

    :type value: str
    :param value: the inline code or resource URI

    See
    https://cloud.google.com/bigquery/user-defined-functions#api
    """
    def __init__(self, udf_type, value):
        self.udf_type = udf_type
        self.value = value

    def __eq__(self, other):
        """Compare two resources by ``udf_type`` and ``value``.

        :rtype: bool or ``NotImplemented``
        :returns: ``NotImplemented`` when ``other`` is not a
                  :class:`UDFResource`, so that comparing against an
                  unrelated object evaluates to ``False`` instead of
                  raising ``AttributeError``.
        """
        if not isinstance(other, UDFResource):
            return NotImplemented
        return (
            self.udf_type == other.udf_type and
            self.value == other.value)

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`.

        Python 2 does not derive ``!=`` from ``__eq__``, so it has to be
        spelled out for ``a != b`` to agree with ``a == b`` there.
        """
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __repr__(self):
        """Return a constructor-like representation for debugging output."""
        return 'UDFResource(%r, %r)' % (self.udf_type, self.value)


def _build_udf_resources(resources):
"""
:type resources: sequence of :class:`UDFResource`
:param resources: fields to be appended

:rtype: mapping
:returns: a mapping describing userDefinedFunctionResources for the query.
"""
udfs = []
for resource in resources:
udf = {resource.udf_type: resource.value}
udfs.append(udf)
return udfs


class Compression(_EnumProperty):
"""Pseudo-enum for ``compression`` properties."""
GZIP = 'GZIP'
Expand Down Expand Up @@ -888,14 +924,44 @@ class QueryJob(_AsyncJob):
:type client: :class:`gcloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).

:type udf_resources: tuple
:param udf_resources: An iterable of
:class:`gcloud.bigquery.job.UDFResource`
(empty by default)
"""
_JOB_TYPE = 'query'
_UDF_KEY = 'userDefinedFunctionResources'
_udf_resources = None

def __init__(self, name, query, client):
def __init__(self, name, query, client, udf_resources=()):
    # The default for ``udf_resources`` is an immutable empty tuple, so it
    # is safe to share between calls.
    super(QueryJob, self).__init__(name, client)
    self.query = query
    # Assigned through the ``udf_resources`` property setter, which rejects
    # items that are not UDFResource instances (ValueError) and stores the
    # accepted items as a tuple in ``_udf_resources``.
    self.udf_resources = udf_resources
    self._configuration = _AsyncQueryConfiguration()

@property
def udf_resources(self):
    """UDF resources attached to the query.

    See
    https://cloud.google.com/bigquery/user-defined-functions#api

    :rtype: list
    :returns: a new list built from the stored resources on every access.
    """
    stored = self._udf_resources
    return list(stored)

@udf_resources.setter
def udf_resources(self, value):
    """Replace the UDF resources attached to the query.

    :type value: iterable of :class:`UDFResource`
    :param value: the resources to attach; each item defines the type and
                  value of one resource

    :raises: TypeError if 'value' is not iterable, or ValueError if
             any item is not a UDFResource
    """
    # Materialize before validating: a one-shot iterable (e.g. a generator)
    # would be exhausted by the validation pass and then be stored as an
    # empty tuple, silently dropping every resource.
    resources = tuple(value)
    if not all(isinstance(udf, UDFResource) for udf in resources):
        raise ValueError("udf items must be UDFResource")
    self._udf_resources = resources

allow_large_results = _TypedProperty('allow_large_results', bool)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.allowLargeResults
Expand Down Expand Up @@ -979,6 +1045,9 @@ def _populate_config_resource(self, configuration):
configuration['useLegacySql'] = self.use_legacy_sql
if self.write_disposition is not None:
configuration['writeDisposition'] = self.write_disposition
if len(self._udf_resources) > 0:
configuration[self._UDF_KEY] = _build_udf_resources(
self._udf_resources)

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
Expand Down
39 changes: 38 additions & 1 deletion gcloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from gcloud.bigquery._helpers import _rows_from_json
from gcloud.bigquery.dataset import Dataset
from gcloud.bigquery.job import QueryJob
from gcloud.bigquery.job import UDFResource
from gcloud.bigquery.job import _build_udf_resources
from gcloud.bigquery.table import _parse_schema_resource


Expand All @@ -46,12 +48,22 @@ class QueryResults(object):
:type client: :class:`gcloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).

:type udf_resources: tuple
:param udf_resources: An iterable of
:class:`gcloud.bigquery.job.UDFResource`
(empty by default)
"""
def __init__(self, query, client):

_UDF_KEY = 'userDefinedFunctionResources'
_udf_resources = None

def __init__(self, query, client, udf_resources=()):
    # The default for ``udf_resources`` is an immutable empty tuple, so it
    # is safe to share between calls.
    self._client = client
    self._properties = {}
    self.query = query
    self._configuration = _SyncQueryConfiguration()
    # Assigned through the ``udf_resources`` property setter, which rejects
    # items that are not UDFResource instances (ValueError) and stores the
    # accepted items as a tuple in ``_udf_resources``.
    self.udf_resources = udf_resources
    self._job = None

@property
Expand Down Expand Up @@ -204,6 +216,28 @@ def schema(self):
"""
return _parse_schema_resource(self._properties.get('schema', {}))

@property
def udf_resources(self):
    """UDF resources attached to the query.

    See
    https://cloud.google.com/bigquery/user-defined-functions#api

    :rtype: list
    :returns: a new list built from the stored resources on every access.
    """
    stored = self._udf_resources
    return list(stored)

@udf_resources.setter
def udf_resources(self, value):
    """Replace the UDF resources attached to the query.

    :type value: iterable of :class:`UDFResource`
    :param value: the resources to attach; each item defines the type and
                  value of one resource

    :raises: TypeError if 'value' is not iterable, or ValueError if
             any item is not a UDFResource
    """
    # Materialize before validating: a one-shot iterable (e.g. a generator)
    # would be exhausted by the validation pass and then be stored as an
    # empty tuple, silently dropping every resource.
    resources = tuple(value)
    if not all(isinstance(udf, UDFResource) for udf in resources):
        raise ValueError("udf items must be UDFResource")
    self._udf_resources = resources

default_dataset = _TypedProperty('default_dataset', Dataset)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#defaultDataset
Expand Down Expand Up @@ -277,6 +311,9 @@ def _build_resource(self):
if self.dry_run is not None:
resource['dryRun'] = self.dry_run

if len(self._udf_resources) > 0:
resource[self._UDF_KEY] = _build_udf_resources(self._udf_resources)

return resource

def run(self, client=None):
Expand Down
60 changes: 58 additions & 2 deletions gcloud/bigquery/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,7 @@ def test_begin_w_bound_client(self):
job = self._makeOne(self.JOB_NAME, self.QUERY, client)

job.begin()

self.assertEqual(job.udf_resources, [])
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
Expand All @@ -1396,7 +1396,7 @@ def test_begin_w_bound_client(self):
},
'configuration': {
'query': {
'query': self.QUERY,
'query': self.QUERY
},
},
}
Expand Down Expand Up @@ -1468,6 +1468,62 @@ def test_begin_w_alternate_client(self):
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_bound_client_and_udf(self):
    # UDF resources passed to the constructor must be sent by begin()
    # under ``userDefinedFunctionResources`` in the query configuration.
    from gcloud.bigquery.job import UDFResource
    RESOURCE_URI = 'gs://some-bucket/js/lib.js'
    PATH = 'projects/%s/jobs' % self.PROJECT
    RESOURCE = self._makeResource()
    # Ensure None for missing server-set props
    del RESOURCE['statistics']['creationTime']
    for server_set_key in ('etag', 'selfLink', 'user_email'):
        del RESOURCE[server_set_key]
    connection = _Connection(RESOURCE)
    client = _Client(project=self.PROJECT, connection=connection)
    UDFS = [UDFResource("resourceUri", RESOURCE_URI)]
    job = self._makeOne(self.JOB_NAME, self.QUERY, client,
                        udf_resources=UDFS)
    QUERY_CONFIG = {
        'query': self.QUERY,
        'userDefinedFunctionResources': [{'resourceUri': RESOURCE_URI}],
    }
    SENT = {
        'jobReference': {
            'projectId': self.PROJECT,
            'jobId': self.JOB_NAME,
        },
        'configuration': {
            'query': QUERY_CONFIG,
        },
    }

    job.begin()

    requested = connection._requested
    self.assertEqual(len(requested), 1)
    request = requested[0]
    self.assertEqual(request['method'], 'POST')
    self.assertEqual(request['path'], '/%s' % PATH)
    self.assertEqual(job.udf_resources,
                     [UDFResource("resourceUri", RESOURCE_URI)])
    self.assertEqual(request['data'], SENT)
    self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_bad_udf(self):
    # Assigning items that are not UDFResource instances must be rejected
    # and must leave the previously stored (empty) resources untouched.
    RESOURCE = self._makeResource()
    # Ensure None for missing server-set props
    del RESOURCE['statistics']['creationTime']
    del RESOURCE['etag']
    del RESOURCE['selfLink']
    del RESOURCE['user_email']
    conn = _Connection(RESOURCE)
    client = _Client(project=self.PROJECT, connection=conn)
    job = self._makeOne(self.JOB_NAME, self.QUERY, client)

    with self.assertRaises(ValueError):
        job.udf_resources = ["foo"]

    # Kept outside the ``assertRaises`` block: nested under it, this line
    # would sit after the raising statement and never execute.
    self.assertEqual(job.udf_resources, [])

def test_exists_miss_w_bound_client(self):
PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
conn = _Connection()
Expand Down
84 changes: 83 additions & 1 deletion gcloud/bigquery/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def test_run_w_bound_client(self):
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
query = self._makeOne(self.QUERY, client)

self.assertEqual(query.udf_resources, [])
query.run()

self.assertEqual(len(conn._requested), 1)
Expand Down Expand Up @@ -233,6 +233,88 @@ def test_run_w_alternate_client(self):
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(query, RESOURCE)

def test_run_w_inline_udf(self):
    # An inline-code UDF assigned after construction must be sent by run()
    # as an ``inlineCode`` entry.
    # UDFResource is imported from the module that defines it, as the
    # other UDF tests do, instead of via the re-export in ``query``.
    from gcloud.bigquery.job import UDFResource
    INLINE_UDF_CODE = 'var someCode = "here";'
    PATH = 'projects/%s/queries' % self.PROJECT
    RESOURCE = self._makeResource(complete=False)
    conn = _Connection(RESOURCE)
    client = _Client(project=self.PROJECT, connection=conn)
    query = self._makeOne(self.QUERY, client)
    query.udf_resources = [UDFResource("inlineCode", INLINE_UDF_CODE)]

    query.run()

    self.assertEqual(len(conn._requested), 1)
    req = conn._requested[0]
    self.assertEqual(req['method'], 'POST')
    self.assertEqual(req['path'], '/%s' % PATH)
    SENT = {
        'query': self.QUERY,
        'userDefinedFunctionResources': [
            {'inlineCode': INLINE_UDF_CODE},
        ],
    }
    self.assertEqual(req['data'], SENT)
    self._verifyResourceProperties(query, RESOURCE)

def test_run_w_udf_resource_uri(self):
    # A UDF given as a ``gs://`` URI must be sent by run() as a
    # ``resourceUri`` entry.
    from gcloud.bigquery.job import UDFResource
    RESOURCE_URI = 'gs://some-bucket/js/lib.js'
    PATH = 'projects/%s/queries' % self.PROJECT
    RESOURCE = self._makeResource(complete=False)
    connection = _Connection(RESOURCE)
    client = _Client(project=self.PROJECT, connection=connection)
    query = self._makeOne(self.QUERY, client)
    query.udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
    SENT = {
        'query': self.QUERY,
        'userDefinedFunctionResources': [{'resourceUri': RESOURCE_URI}],
    }

    query.run()

    requested = connection._requested
    self.assertEqual(len(requested), 1)
    request = requested[0]
    self.assertEqual(request['method'], 'POST')
    self.assertEqual(request['path'], '/%s' % PATH)
    self.assertEqual(request['data'], SENT)
    self._verifyResourceProperties(query, RESOURCE)

def test_run_w_mixed_udfs(self):
    # URI-based and inline UDFs may be combined; run() must send them in
    # the order they were assigned.
    from gcloud.bigquery.job import UDFResource
    RESOURCE_URI = 'gs://some-bucket/js/lib.js'
    INLINE_UDF_CODE = 'var someCode = "here";'
    PATH = 'projects/%s/queries' % self.PROJECT
    RESOURCE = self._makeResource(complete=False)
    connection = _Connection(RESOURCE)
    client = _Client(project=self.PROJECT, connection=connection)
    query = self._makeOne(self.QUERY, client)
    query.udf_resources = [
        UDFResource("resourceUri", RESOURCE_URI),
        UDFResource("inlineCode", INLINE_UDF_CODE),
    ]
    SENT = {
        'query': self.QUERY,
        'userDefinedFunctionResources': [
            {'resourceUri': RESOURCE_URI},
            {"inlineCode": INLINE_UDF_CODE},
        ],
    }

    query.run()

    requested = connection._requested
    self.assertEqual(len(requested), 1)
    request = requested[0]
    self.assertEqual(request['method'], 'POST')
    self.assertEqual(request['path'], '/%s' % PATH)
    self.assertEqual(query.udf_resources,
                     [UDFResource("resourceUri", RESOURCE_URI),
                      UDFResource("inlineCode", INLINE_UDF_CODE)])
    self.assertEqual(request['data'], SENT)
    self._verifyResourceProperties(query, RESOURCE)

def test_run_w_bad_udfs(self):
    # Assigning items that are not UDFResource instances must be rejected
    # and must leave the previously stored (empty) resources untouched.
    RESOURCE = self._makeResource(complete=False)
    conn = _Connection(RESOURCE)
    client = _Client(project=self.PROJECT, connection=conn)
    query = self._makeOne(self.QUERY, client)

    with self.assertRaises(ValueError):
        query.udf_resources = ["foo"]

    # Kept outside the ``assertRaises`` block: nested under it, this line
    # would sit after the raising statement and never execute.
    self.assertEqual(query.udf_resources, [])

def test_fetch_data_query_not_yet_run(self):
conn = _Connection()
client = _Client(project=self.PROJECT, connection=conn)
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tox]
envlist =
py27,py34,py35,cover,docs,lint
py26,py27,py34,py35,cover,docs,lint

[testing]
deps =
Expand Down Expand Up @@ -43,6 +43,7 @@ deps =
basepython =
python2.6
deps =
ordereddict
{[testing]deps}
setenv =
PYTHONPATH = {toxinidir}/_testing
Expand Down