Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions gcloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,30 @@ def _require_client(self, client):
client = self._dataset._client
return client

def _parse_schema_resource(self, info):
"""Parse a resource fragment into a schema field.

:type info: mapping
:param info: should contain a "fields" key to be parsed

:rtype: list of :class:`SchemaField`, or ``NoneType``
:returns: a list of parsed fields, or ``None`` if no "fields" key is
present in ``info``.
"""
if 'fields' not in info:
return None

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


schema = []
for r_field in info['fields']:
name = r_field['name']
field_type = r_field['type']
mode = r_field['mode']
description = r_field.get('description')
sub_fields = self._parse_schema_resource(r_field)
schema.append(
SchemaField(name, field_type, mode, description, sub_fields))
return schema

def _set_properties(self, api_response):
"""Update properties from resource in body of ``api_response``

Expand All @@ -320,6 +344,8 @@ def _set_properties(self, api_response):
"""
self._properties.clear()
cleaned = api_response.copy()
schema = cleaned.pop('schema', {})
self.schema = self._parse_schema_resource(schema)
if 'creationTime' in cleaned:
cleaned['creationTime'] = float(cleaned['creationTime'])
if 'lastModifiedTime' in cleaned:
Expand Down Expand Up @@ -525,3 +551,185 @@ def delete(self, client=None):
"""
client = self._require_client(client)
client.connection.api_request(method='DELETE', path=self.path)

def fetch_data(self, max_results=None, page_token=None, client=None):
"""API call: fetch the table data via a GET request

See:
https://cloud.google.com/bigquery/reference/rest/v2/tabledata/list

.. note::

This method assumes that its instance's ``schema`` attribute is
up-to-date with the schema as defined on the back-end: if the
two schemas are not identical, the values returned may be
incomplete. To ensure that the local copy of the schema is
up-to-date, call the table's ``reload`` method.

:type max_results: integer or ``NoneType``
:param max_results: maximum number of rows to return.

:type page_token: string or ``NoneType``
:param page_token: token representing a cursor into the table's rows.

:type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.

:rtype: tuple
:returns: ``(row_data, total_rows, page_token)``, where ``row_data``
is a list of tuples, one per result row, containing only
the values; ``total_rows`` is a count of the total number
of rows in the table; and ``page_token`` is an opaque
string which can be used to fetch the next batch of rows
(``None`` if no further batches can be fetched).
"""
client = self._require_client(client)
params = {}

if max_results is not None:
params['maxResults'] = max_results

if page_token is not None:
params['pageToken'] = page_token

response = client.connection.api_request(method='GET',
path='%s/data' % self.path,
query_params=params)
total_rows = response.get('totalRows')

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

page_token = response.get('pageToken')
rows_data = []

for row in response.get('rows', ()):
row_data = []
for field, cell in zip(self._schema, row['f']):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

converter = _CELLDATA_FROM_JSON[field.field_type]
if field.mode == 'REPEATED':
row_data.append([converter(item, field)
for item in cell['v']])
else:
row_data.append(converter(cell['v'], field))
rows_data.append(tuple(row_data))

return rows_data, total_rows, page_token

def insert_data(self,
rows,
row_ids=None,
skip_invalid_rows=None,
ignore_unknown_values=None,
client=None):
"""API call: insert table data via a POST request

See:
https://cloud.google.com/bigquery/reference/rest/v2/tabledata/insertAll

:type rows: list of tuples
:param rows: row data to be inserted

:type row_ids: list of string
:param row_ids: Unique ids, one per row being inserted. If not
passed, no de-duplication occurs.

:type skip_invalid_rows: boolean or ``NoneType``
:param skip_invalid_rows: skip rows w/ invalid data?

:type ignore_unknown_values: boolean or ``NoneType``
:param ignore_unknown_values: ignore columns beyond schema?

:type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.

:rtype: list of mappings
:returns: One mapping per row with insert errors: the "index" key
identifies the row, and the "errors" key contains a list
of the mappings describing one or more problems with the
row.
"""
client = self._require_client(client)
rows_info = []
data = {'rows': rows_info}

for index, row in enumerate(rows):
row_info = {}

for field, value in zip(self._schema, row):
if field.field_type == 'TIMESTAMP':
value = _prop_from_datetime(value)
row_info[field.name] = value

info = {'json': row_info}
if row_ids is not None:
info['insertId'] = row_ids[index]

rows_info.append(info)

if skip_invalid_rows is not None:
data['skipInvalidRows'] = skip_invalid_rows

if ignore_unknown_values is not None:
data['ignoreUnknownValues'] = ignore_unknown_values

response = client.connection.api_request(
method='POST',
path='%s/insertAll' % self.path,
data=data)
errors = []

for error in response.get('insertErrors', ()):
errors.append({'index': int(error['index']),
'errors': error['errors']})

return errors


def _not_null(value, field):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

return value is not None or field.mode != 'NULLABLE'


def _int_from_json(value, field):
if _not_null(value, field):
return int(value)


def _float_from_json(value, field):
if _not_null(value, field):
return float(value)


def _bool_from_json(value, field):
if _not_null(value, field):
return value.lower() in ['t', 'true', '1']


def _datetime_from_json(value, field):
if _not_null(value, field):

This comment was marked as spam.

return _datetime_from_prop(float(value))


def _record_from_json(value, field):
if _not_null(value, field):
record = {}
for subfield, cell in zip(field.fields, value['f']):
converter = _CELLDATA_FROM_JSON[subfield.field_type]
if field.mode == 'REPEATED':
value = [converter(item, field) for item in cell['v']]
else:
value = converter(cell['v'], field)
record[subfield.name] = value
return record


def _string_from_json(value, _):
return value


_CELLDATA_FROM_JSON = {
'INTEGER': _int_from_json,
'FLOAT': _float_from_json,
'BOOLEAN': _bool_from_json,
'TIMESTAMP': _datetime_from_json,

This comment was marked as spam.

This comment was marked as spam.

'RECORD': _record_from_json,
'STRING': _string_from_json,
}

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

Loading