diff --git a/docs/bigquery-usage.rst b/docs/bigquery-usage.rst index d8050affe145..52f17d9f8f50 100644 --- a/docs/bigquery-usage.rst +++ b/docs/bigquery-usage.rst @@ -284,6 +284,19 @@ Poll until the job is complete: >>> job.ended datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=) +Retrieve the results: + +.. doctest:: + + >>> results = job.results() + >>> rows, total_count, token = results.fetch_data() # API request + >>> while True: + ... do_something_with(rows) + ... if token is None: + ... break + ... rows, total_count, token = results.fetch_data( + ... page_token=token) # API request + Inserting data (asynchronous) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/google/cloud/bigquery/_helpers.py b/google/cloud/bigquery/_helpers.py index a14a9053d8d4..1dd591908332 100644 --- a/google/cloud/bigquery/_helpers.py +++ b/google/cloud/bigquery/_helpers.py @@ -172,3 +172,55 @@ def _validate(self, value): """ if value not in self.ALLOWED: raise ValueError('Pass one of: %s' ', '.join(self.ALLOWED)) + + +class UDFResource(object): + """Describe a single user-defined function (UDF) resource. + :type udf_type: str + :param udf_type: the type of the resource ('inlineCode' or 'resourceUri') + + :type value: str + :param value: the inline code or resource URI. 
+ + See + https://cloud.google.com/bigquery/user-defined-functions#api + """ + def __init__(self, udf_type, value): + self.udf_type = udf_type + self.value = value + + def __eq__(self, other): + return( + self.udf_type == other.udf_type and + self.value == other.value) + + +class UDFResourcesProperty(object): + """Custom property type, holding :class:`UDFResource` instances.""" + + def __get__(self, instance, owner): + """Descriptor protocol: accessor""" + if instance is None: + return self + return list(instance._udf_resources) + + def __set__(self, instance, value): + """Descriptor protocol: mutator""" + if not all(isinstance(u, UDFResource) for u in value): + raise ValueError("udf items must be UDFResource") + instance._udf_resources = tuple(value) + + +def _build_udf_resources(resources): + """ + :type resources: sequence of :class:`UDFResource` + :param resources: fields to be appended. + + :rtype: mapping + :returns: a mapping describing userDefinedFunctionResources for the query. + """ + udfs = [] + for resource in resources: + udf = {resource.udf_type: resource.value} + udfs.append(udf) + return udfs diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 53490ad841b1..9b1da85a30e4 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -23,62 +23,10 @@ from google.cloud.bigquery.table import Table from google.cloud.bigquery.table import _build_schema_resource from google.cloud.bigquery.table import _parse_schema_resource +from google.cloud.bigquery._helpers import UDFResourcesProperty from google.cloud.bigquery._helpers import _EnumProperty from google.cloud.bigquery._helpers import _TypedProperty - - -class UDFResource(object): - """Describe a single user-defined function (UDF) resource. - :type udf_type: str - :param udf_type: the type of the resource ('inlineCode' or 'resourceUri') - - :type value: str - :param value: the inline code or resource URI. 
- - See - https://cloud.google.com/bigquery/user-defined-functions#api - """ - def __init__(self, udf_type, value): - self.udf_type = udf_type - self.value = value - - def __eq__(self, other): - return( - self.udf_type == other.udf_type and - self.value == other.value) - - -def _build_udf_resources(resources): - """ - :type resources: sequence of :class:`UDFResource` - :param resources: fields to be appended. - - :rtype: mapping - :returns: a mapping describing userDefinedFunctionResources for the query. - """ - udfs = [] - for resource in resources: - udf = {resource.udf_type: resource.value} - udfs.append(udf) - return udfs - - -class UDFResourcesProperty(object): - """Custom property type for :class:`QueryJob`. - - Also used by :class:`~google.cloud.bigquery.query.Query`. - """ - def __get__(self, instance, owner): - """Descriptor protocol: accessor""" - if instance is None: - return self - return list(instance._udf_resources) - - def __set__(self, instance, value): - """Descriptor protocol: mutator""" - if not all(isinstance(u, UDFResource) for u in value): - raise ValueError("udf items must be UDFResource") - instance._udf_resources = tuple(value) +from google.cloud.bigquery._helpers import _build_udf_resources class Compression(_EnumProperty): @@ -952,7 +900,7 @@ class QueryJob(_AsyncJob): :type udf_resources: tuple :param udf_resources: An iterable of - :class:`google.cloud.bigquery.job.UDFResource` + :class:`google.cloud.bigquery._helpers.UDFResource` (empty by default) """ _JOB_TYPE = 'query' @@ -1125,3 +1073,12 @@ def from_api_repr(cls, resource, client): job = cls(name, query, client=client) job._set_properties(resource) return job + + def results(self): + """Construct a QueryResults instance, bound to this job. 
+ + :rtype: :class:`~google.cloud.bigquery.query.QueryResults` + :returns: results instance + """ + from google.cloud.bigquery.query import QueryResults + return QueryResults.from_query_job(self) diff --git a/google/cloud/bigquery/query.py b/google/cloud/bigquery/query.py index bc4e23a2ff82..e7c3826216a0 100644 --- a/google/cloud/bigquery/query.py +++ b/google/cloud/bigquery/query.py @@ -20,9 +20,9 @@ from google.cloud.bigquery._helpers import _rows_from_json from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.job import QueryJob -from google.cloud.bigquery.job import UDFResourcesProperty -from google.cloud.bigquery.job import _build_udf_resources from google.cloud.bigquery.table import _parse_schema_resource +from google.cloud.bigquery._helpers import _build_udf_resources +from google.cloud.bigquery._helpers import UDFResourcesProperty class _SyncQueryConfiguration(object): @@ -65,6 +65,26 @@ def __init__(self, query, client, udf_resources=()): self.udf_resources = udf_resources self._job = None + @classmethod + def from_query_job(cls, job): + """Factory: construct from an existing job. + + :type job: :class:`~google.cloud.bigquery.job.QueryJob` + :param job: existing job + + :rtype: :class:`QueryResults` + :returns: the instance, bound to the job + """ + instance = cls(job.query, job._client, job.udf_resources) + instance._job = job + if job.default_dataset is not None: + instance.default_dataset = job.default_dataset + if job.use_query_cache is not None: + instance.use_query_cache = job.use_query_cache + if job.use_legacy_sql is not None: + instance.use_legacy_sql = job.use_legacy_sql + return instance + @property def project(self): """Project bound to the job. @@ -307,6 +327,9 @@ def run(self, client=None): :param client: the client to use. If not passed, falls back to the ``client`` stored on the current dataset. 
""" + if self._job is not None: + raise ValueError("Query job is already running.") + client = self._require_client(client) path = '/projects/%s/queries' % (self.project,) api_response = client.connection.api_request( diff --git a/unit_tests/bigquery/test__helpers.py b/unit_tests/bigquery/test__helpers.py index 86884f3dab2e..bdf766ad2778 100644 --- a/unit_tests/bigquery/test__helpers.py +++ b/unit_tests/bigquery/test__helpers.py @@ -386,6 +386,76 @@ def __init__(self): self.assertEqual(wrapper._configuration._attr, None) +class Test_UDFResourcesProperty(unittest.TestCase): + + def _getTargetClass(self): + from google.cloud.bigquery._helpers import UDFResourcesProperty + return UDFResourcesProperty + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def _descriptor_and_klass(self): + descriptor = self._makeOne() + + class _Test(object): + _udf_resources = () + udf_resources = descriptor + + return descriptor, _Test + + def test_class_getter(self): + descriptor, klass = self._descriptor_and_klass() + self.assertTrue(klass.udf_resources is descriptor) + + def test_instance_getter_empty(self): + _, klass = self._descriptor_and_klass() + instance = klass() + self.assertEqual(instance.udf_resources, []) + + def test_instance_getter_w_non_empty_list(self): + from google.cloud.bigquery._helpers import UDFResource + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] + _, klass = self._descriptor_and_klass() + instance = klass() + instance._udf_resources = tuple(udf_resources) + + self.assertEqual(instance.udf_resources, udf_resources) + + def test_instance_setter_w_empty_list(self): + from google.cloud.bigquery._helpers import UDFResource + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] + _, klass = self._descriptor_and_klass() + instance = klass() + instance._udf_resources = udf_resources + + instance.udf_resources = [] + + 
self.assertEqual(instance.udf_resources, []) + + def test_instance_setter_w_valid_udf(self): + from google.cloud.bigquery._helpers import UDFResource + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] + _, klass = self._descriptor_and_klass() + instance = klass() + + instance.udf_resources = udf_resources + + self.assertEqual(instance.udf_resources, udf_resources) + + def test_instance_setter_w_bad_udfs(self): + _, klass = self._descriptor_and_klass() + instance = klass() + + with self.assertRaises(ValueError): + instance.udf_resources = ["foo"] + + self.assertEqual(instance.udf_resources, []) + + class _Field(object): def __init__(self, mode, name='unknown', field_type='UNKNOWN', fields=()): diff --git a/unit_tests/bigquery/test_job.py b/unit_tests/bigquery/test_job.py index 512a4aa80d05..9d9ae948254c 100644 --- a/unit_tests/bigquery/test_job.py +++ b/unit_tests/bigquery/test_job.py @@ -15,76 +15,6 @@ import unittest -class Test_UDFResourcesProperty(unittest.TestCase): - - def _getTargetClass(self): - from google.cloud.bigquery.job import UDFResourcesProperty - return UDFResourcesProperty - - def _makeOne(self, *args, **kw): - return self._getTargetClass()(*args, **kw) - - def _descriptor_and_klass(self): - descriptor = self._makeOne() - - class _Test(object): - _udf_resources = () - udf_resources = descriptor - - return descriptor, _Test - - def test_class_getter(self): - descriptor, klass = self._descriptor_and_klass() - self.assertTrue(klass.udf_resources is descriptor) - - def test_instance_getter_empty(self): - _, klass = self._descriptor_and_klass() - instance = klass() - self.assertEqual(instance.udf_resources, []) - - def test_instance_getter_w_non_empty_list(self): - from google.cloud.bigquery.job import UDFResource - RESOURCE_URI = 'gs://some-bucket/js/lib.js' - udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] - _, klass = self._descriptor_and_klass() - instance = klass() - 
instance._udf_resources = tuple(udf_resources) - - self.assertEqual(instance.udf_resources, udf_resources) - - def test_instance_setter_w_empty_list(self): - from google.cloud.bigquery.job import UDFResource - RESOURCE_URI = 'gs://some-bucket/js/lib.js' - udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] - _, klass = self._descriptor_and_klass() - instance = klass() - instance._udf_resources = udf_resources - - instance.udf_resources = [] - - self.assertEqual(instance.udf_resources, []) - - def test_instance_setter_w_valid_udf(self): - from google.cloud.bigquery.job import UDFResource - RESOURCE_URI = 'gs://some-bucket/js/lib.js' - udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] - _, klass = self._descriptor_and_klass() - instance = klass() - - instance.udf_resources = udf_resources - - self.assertEqual(instance.udf_resources, udf_resources) - - def test_instance_setter_w_bad_udfs(self): - _, klass = self._descriptor_and_klass() - instance = klass() - - with self.assertRaises(ValueError): - instance.udf_resources = ["foo"] - - self.assertEqual(instance.udf_resources, []) - - class _Base(object): PROJECT = 'project' SOURCE1 = 'http://example.com/source1.csv' @@ -1456,6 +1386,14 @@ def test_from_api_repr_w_properties(self): self.assertTrue(dataset._client is client) self._verifyResourceProperties(dataset, RESOURCE) + def test_results(self): + from google.cloud.bigquery.query import QueryResults + client = _Client(self.PROJECT) + job = self._makeOne(self.JOB_NAME, self.QUERY, client) + results = job.results() + self.assertIsInstance(results, QueryResults) + self.assertTrue(results._job is job) + def test_begin_w_bound_client(self): PATH = 'projects/%s/jobs' % self.PROJECT RESOURCE = self._makeResource() @@ -1558,7 +1496,7 @@ def test_begin_w_alternate_client(self): self._verifyResourceProperties(job, RESOURCE) def test_begin_w_bound_client_and_udf(self): - from google.cloud.bigquery.job import UDFResource + from google.cloud.bigquery._helpers 
import UDFResource RESOURCE_URI = 'gs://some-bucket/js/lib.js' PATH = 'projects/%s/jobs' % self.PROJECT RESOURCE = self._makeResource() diff --git a/unit_tests/bigquery/test_query.py b/unit_tests/bigquery/test_query.py index f7ff8468c11e..be4b45b84fea 100644 --- a/unit_tests/bigquery/test_query.py +++ b/unit_tests/bigquery/test_query.py @@ -138,6 +138,51 @@ def test_ctor(self): self.assertTrue(query.use_query_cache is None) self.assertTrue(query.use_legacy_sql is None) + def test_from_query_job(self): + from google.cloud.bigquery.dataset import Dataset + from google.cloud.bigquery.job import QueryJob + from google.cloud.bigquery._helpers import UDFResource + DS_NAME = 'DATASET' + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + client = _Client(self.PROJECT) + job = QueryJob( + self.JOB_NAME, self.QUERY, client, + udf_resources=[UDFResource("resourceUri", RESOURCE_URI)]) + dataset = job.default_dataset = Dataset(DS_NAME, client) + job.use_query_cache = True + job.use_legacy_sql = True + klass = self._getTargetClass() + + query = klass.from_query_job(job) + + self.assertEqual(query.query, self.QUERY) + self.assertTrue(query._client is client) + self.assertTrue(query._job is job) + self.assertEqual(query.udf_resources, job.udf_resources) + self.assertTrue(query.default_dataset is dataset) + self.assertTrue(query.use_query_cache) + self.assertTrue(query.use_legacy_sql) + + def test_from_query_job_wo_default_dataset(self): + from google.cloud.bigquery.job import QueryJob + from google.cloud.bigquery._helpers import UDFResource + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + client = _Client(self.PROJECT) + job = QueryJob( + self.JOB_NAME, self.QUERY, client, + udf_resources=[UDFResource("resourceUri", RESOURCE_URI)]) + klass = self._getTargetClass() + + query = klass.from_query_job(job) + + self.assertEqual(query.query, self.QUERY) + self.assertTrue(query._client is client) + self.assertTrue(query._job is job) + self.assertEqual(query.udf_resources, job.udf_resources) + 
self.assertIsNone(query.default_dataset) + self.assertIsNone(query.use_query_cache) + self.assertIsNone(query.use_legacy_sql) + def test_job_wo_jobid(self): client = _Client(self.PROJECT) query = self._makeOne(self.QUERY, client) @@ -175,6 +220,14 @@ def test_schema(self): query._set_properties(resource) self._verifyResourceProperties(query, resource) + def test_run_w_already_has_job(self): + conn = _Connection() + client = _Client(project=self.PROJECT, connection=conn) + query = self._makeOne(self.QUERY, client) + query._job = object() # simulate already running + with self.assertRaises(ValueError): + query.run() + def test_run_w_bound_client(self): PATH = 'projects/%s/queries' % self.PROJECT RESOURCE = self._makeResource(complete=False) @@ -234,7 +287,7 @@ def test_run_w_alternate_client(self): self._verifyResourceProperties(query, RESOURCE) def test_run_w_inline_udf(self): - from google.cloud.bigquery.job import UDFResource + from google.cloud.bigquery._helpers import UDFResource INLINE_UDF_CODE = 'var someCode = "here";' PATH = 'projects/%s/queries' % self.PROJECT RESOURCE = self._makeResource(complete=False) @@ -256,7 +309,7 @@ def test_run_w_inline_udf(self): self._verifyResourceProperties(query, RESOURCE) def test_run_w_udf_resource_uri(self): - from google.cloud.bigquery.job import UDFResource + from google.cloud.bigquery._helpers import UDFResource RESOURCE_URI = 'gs://some-bucket/js/lib.js' PATH = 'projects/%s/queries' % self.PROJECT RESOURCE = self._makeResource(complete=False) @@ -278,7 +331,7 @@ def test_run_w_udf_resource_uri(self): self._verifyResourceProperties(query, RESOURCE) def test_run_w_mixed_udfs(self): - from google.cloud.bigquery.job import UDFResource + from google.cloud.bigquery._helpers import UDFResource RESOURCE_URI = 'gs://some-bucket/js/lib.js' INLINE_UDF_CODE = 'var someCode = "here";' PATH = 'projects/%s/queries' % self.PROJECT