def _parse_schema_resource(self, info):
    """Parse a resource fragment into a schema field.

    :type info: mapping
    :param info: should contain a "fields" key to be parsed

    :rtype: list of :class:`SchemaField`, or ``NoneType``
    :returns: a list of parsed fields, or ``None`` if no "fields" key is
              present in ``info``.
    """
    if 'fields' not in info:
        return None

    schema = []
    for r_field in info['fields']:
        name = r_field['name']
        field_type = r_field['type']
        # The back-end omits "mode" when it is the default; per the
        # BigQuery tables resource docs the default mode is NULLABLE.
        mode = r_field.get('mode', 'NULLABLE')
        description = r_field.get('description')
        # RECORD fields carry their own nested "fields" list; recurse.
        sub_fields = self._parse_schema_resource(r_field)
        schema.append(
            SchemaField(name, field_type, mode, description, sub_fields))
    return schema
def fetch_data(self, max_results=None, page_token=None, client=None):
    """API call: retrieve rows from the table via a GET request

    See:
    https://cloud.google.com/bigquery/reference/rest/v2/tabledata/list

    .. note::

       This method assumes that its instance's ``schema`` attribute is
       up-to-date with the schema as defined on the back-end: if the
       two schemas are not identical, the values returned may be
       incomplete.  To ensure that the local copy of the schema is
       up-to-date, call the table's ``reload`` method.

    :type max_results: integer or ``NoneType``
    :param max_results: cap on the number of rows returned.

    :type page_token: string or ``NoneType``
    :param page_token: opaque cursor into the table's rows, as returned
                       by a previous call.

    :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
    :param client: the client to use; defaults to the one bound to the
                   current dataset.

    :rtype: tuple
    :returns: ``(row_data, total_rows, page_token)``: ``row_data`` is a
              list of value tuples, one per row; ``total_rows`` is the
              table's total row count; ``page_token`` is an opaque cursor
              for fetching the next batch (``None`` when exhausted).
    """
    client = self._require_client(client)

    query_params = {}
    if max_results is not None:
        query_params['maxResults'] = max_results
    if page_token is not None:
        query_params['pageToken'] = page_token

    response = client.connection.api_request(
        method='GET',
        path='%s/data' % self.path,
        query_params=query_params)

    def _convert_cell(field, cell):
        # Each cell is decoded by the converter registered for the
        # field's type; REPEATED cells hold a list of values.
        converter = _CELLDATA_FROM_JSON[field.field_type]
        if field.mode == 'REPEATED':
            return [converter(item, field) for item in cell['v']]
        return converter(cell['v'], field)

    rows_data = [
        tuple(_convert_cell(field, cell)
              for field, cell in zip(self._schema, row['f']))
        for row in response.get('rows', ())
    ]

    return rows_data, response.get('totalRows'), response.get('pageToken')
def insert_data(self,
                rows,
                row_ids=None,
                skip_invalid_rows=None,
                ignore_unknown_values=None,
                client=None):
    """API call: insert table data via a POST request

    See:
    https://cloud.google.com/bigquery/reference/rest/v2/tabledata/insertAll

    :type rows: list of tuples
    :param rows: row data to be inserted, one value per schema field.

    :type row_ids: list of string
    :param row_ids: unique ids, one per row being inserted.  If not
                    passed, no de-duplication occurs.

    :type skip_invalid_rows: boolean or ``NoneType``
    :param skip_invalid_rows: skip rows w/ invalid data?

    :type ignore_unknown_values: boolean or ``NoneType``
    :param ignore_unknown_values: ignore columns beyond schema?

    :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
    :param client: the client to use; defaults to the one bound to the
                   current dataset.

    :rtype: list of mappings
    :returns: one mapping per row with insert errors: the "index" key
              identifies the row, and the "errors" key contains a list
              of the mappings describing one or more problems with the
              row.
    """
    client = self._require_client(client)

    payload_rows = []
    for position, row in enumerate(rows):
        json_row = {}
        for field, cell_value in zip(self._schema, row):
            # TIMESTAMP values are serialized through the shared helper
            # so the wire format matches what the back-end expects.
            if field.field_type == 'TIMESTAMP':
                cell_value = _prop_from_datetime(cell_value)
            json_row[field.name] = cell_value

        entry = {'json': json_row}
        if row_ids is not None:
            entry['insertId'] = row_ids[position]
        payload_rows.append(entry)

    payload = {'rows': payload_rows}
    if skip_invalid_rows is not None:
        payload['skipInvalidRows'] = skip_invalid_rows
    if ignore_unknown_values is not None:
        payload['ignoreUnknownValues'] = ignore_unknown_values

    response = client.connection.api_request(
        method='POST',
        path='%s/insertAll' % self.path,
        data=payload)

    return [{'index': int(error['index']), 'errors': error['errors']}
            for error in response.get('insertErrors', ())]
value.lower() in ['t', 'true', '1'] + + +def _datetime_from_json(value, field): + if _not_null(value, field): + return _datetime_from_prop(float(value)) + + +def _record_from_json(value, field): + if _not_null(value, field): + record = {} + for subfield, cell in zip(field.fields, value['f']): + converter = _CELLDATA_FROM_JSON[subfield.field_type] + if field.mode == 'REPEATED': + value = [converter(item, field) for item in cell['v']] + else: + value = converter(cell['v'], field) + record[subfield.name] = value + return record + + +def _string_from_json(value, _): + return value + + +_CELLDATA_FROM_JSON = { + 'INTEGER': _int_from_json, + 'FLOAT': _float_from_json, + 'BOOLEAN': _bool_from_json, + 'TIMESTAMP': _datetime_from_json, + 'RECORD': _record_from_json, + 'STRING': _string_from_json, +} diff --git a/gcloud/bigquery/test_table.py b/gcloud/bigquery/test_table.py index dcc30b16e5fb..5c678ebe545f 100644 --- a/gcloud/bigquery/test_table.py +++ b/gcloud/bigquery/test_table.py @@ -105,6 +105,18 @@ def _makeResource(self): 'type': 'TABLE', } + def _verify_field(self, field, r_field): + self.assertEqual(field.name, r_field['name']) + self.assertEqual(field.field_type, r_field['type']) + self.assertEqual(field.mode, r_field['mode']) + + def _verifySchema(self, schema, resource): + r_fields = resource['schema']['fields'] + self.assertEqual(len(schema), len(r_fields)) + + for field, r_field in zip(schema, r_fields): + self._verify_field(field, r_field) + def _verifyResourceProperties(self, table, resource): self.assertEqual(table.created, self.WHEN) self.assertEqual(table.etag, self.ETAG) @@ -127,6 +139,8 @@ def _verifyResourceProperties(self, table, resource): else: self.assertEqual(table.view_query, None) + self._verifySchema(table.schema, resource) + def test_ctor(self): client = _Client(self.PROJECT) dataset = _Dataset(client) @@ -302,6 +316,32 @@ def test_view_query_deleter(self): del table.view_query self.assertEqual(table.view_query, None) + def 
def test__parse_schema_resource_subfields(self):
    client = _Client(self.PROJECT)
    dataset = _Dataset(client)
    table = self._makeOne(self.TABLE_NAME, dataset)
    RESOURCE = self._makeResource()
    # 'REPEATED' is the valid BigQuery mode for repeated fields
    # (the modes are NULLABLE / REQUIRED / REPEATED); 'REPEATABLE'
    # is not something the back-end would ever send.
    RESOURCE['schema']['fields'].append(
        {'name': 'phone',
         'type': 'RECORD',
         'mode': 'REPEATED',
         'fields': [{'name': 'type',
                     'type': 'STRING',
                     'mode': 'REQUIRED'},
                    {'name': 'number',
                     'type': 'STRING',
                     'mode': 'REQUIRED'}]})
    schema = table._parse_schema_resource(RESOURCE['schema'])
    self._verifySchema(schema, RESOURCE)
def test_fetch_data_w_bound_client(self):
    import datetime
    import pytz
    from gcloud.bigquery.table import SchemaField
    from gcloud.bigquery._helpers import _prop_from_datetime
    PATH = 'projects/%s/datasets/%s/tables/%s/data' % (
        self.PROJECT, self.DS_NAME, self.TABLE_NAME)
    WHEN_TS = 1437767599.006
    WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(
        tzinfo=pytz.UTC)
    WHEN_1 = WHEN + datetime.timedelta(seconds=1)
    WHEN_2 = WHEN + datetime.timedelta(seconds=2)
    ROWS = 1234
    TOKEN = 'TOKEN'

    def _bq_row(*values):
        # Shape one row the way tabledata.list returns it.
        return {'f': [{'v': value} for value in values]}

    DATA = {
        'totalRows': ROWS,
        'pageToken': TOKEN,
        'rows': [
            _bq_row('Phred Phlyntstone', '32', _prop_from_datetime(WHEN)),
            _bq_row('Bharney Rhubble', '33', _prop_from_datetime(WHEN_1)),
            _bq_row('Wylma Phlyntstone', '29', _prop_from_datetime(WHEN_2)),
            _bq_row('Bhettye Rhubble', None, None),
        ],
    }
    connection = _Connection(DATA)
    client = _Client(project=self.PROJECT, connection=connection)
    dataset = _Dataset(client)
    full_name = SchemaField('full_name', 'STRING', mode='REQUIRED')
    age = SchemaField('age', 'INTEGER', mode='NULLABLE')
    joined = SchemaField('joined', 'TIMESTAMP', mode='NULLABLE')
    table = self._makeOne(self.TABLE_NAME, dataset=dataset,
                          schema=[full_name, age, joined])

    rows, total_rows, page_token = table.fetch_data()

    self.assertEqual(rows, [
        ('Phred Phlyntstone', 32, WHEN),
        ('Bharney Rhubble', 33, WHEN_1),
        ('Wylma Phlyntstone', 29, WHEN_2),
        ('Bhettye Rhubble', None, None),
    ])
    self.assertEqual(total_rows, ROWS)
    self.assertEqual(page_token, TOKEN)

    self.assertEqual(len(connection._requested), 1)
    req = connection._requested[0]
    self.assertEqual(req['method'], 'GET')
    self.assertEqual(req['path'], '/%s' % PATH)
def test_fetch_data_w_alternate_client(self):
    from gcloud.bigquery.table import SchemaField
    PATH = 'projects/%s/datasets/%s/tables/%s/data' % (
        self.PROJECT, self.DS_NAME, self.TABLE_NAME)
    MAX = 10
    ROWS = 1234
    TOKEN = 'TOKEN'

    def _bq_row(*values):
        # Shape one row the way tabledata.list returns it.
        return {'f': [{'v': value} for value in values]}

    # No 'pageToken' in the response: the final batch.
    DATA = {
        'totalRows': ROWS,
        'rows': [
            _bq_row('Phred Phlyntstone', '32', 'true', '3.1415926'),
            _bq_row('Bharney Rhubble', '33', 'false', '1.414'),
            _bq_row('Wylma Phlyntstone', '29', 'true', '2.71828'),
            _bq_row('Bhettye Rhubble', '27', None, None),
        ],
    }
    conn1 = _Connection()
    client1 = _Client(project=self.PROJECT, connection=conn1)
    conn2 = _Connection(DATA)
    client2 = _Client(project=self.PROJECT, connection=conn2)
    dataset = _Dataset(client1)
    full_name = SchemaField('full_name', 'STRING', mode='REQUIRED')
    age = SchemaField('age', 'INTEGER', mode='REQUIRED')
    voter = SchemaField('voter', 'BOOLEAN', mode='NULLABLE')
    score = SchemaField('score', 'FLOAT', mode='NULLABLE')
    table = self._makeOne(self.TABLE_NAME, dataset=dataset,
                          schema=[full_name, age, voter, score])

    rows, total_rows, page_token = table.fetch_data(
        client=client2, max_results=MAX, page_token=TOKEN)

    self.assertEqual(rows, [
        ('Phred Phlyntstone', 32, True, 3.1415926),
        ('Bharney Rhubble', 33, False, 1.414),
        ('Wylma Phlyntstone', 29, True, 2.71828),
        ('Bhettye Rhubble', 27, None, None),
    ])
    self.assertEqual(total_rows, ROWS)
    self.assertEqual(page_token, None)

    # Only the explicitly-passed client is used.
    self.assertEqual(len(conn1._requested), 0)
    self.assertEqual(len(conn2._requested), 1)
    req = conn2._requested[0]
    self.assertEqual(req['method'], 'GET')
    self.assertEqual(req['path'], '/%s' % PATH)
    self.assertEqual(req['query_params'],
                     {'maxResults': MAX, 'pageToken': TOKEN})
def test_fetch_data_w_repeated_fields(self):
    from gcloud.bigquery.table import SchemaField
    PATH = 'projects/%s/datasets/%s/tables/%s/data' % (
        self.PROJECT, self.DS_NAME, self.TABLE_NAME)
    ROWS = 1234
    TOKEN = 'TOKEN'
    # One repeated RECORD cell whose sub-fields are themselves repeated.
    struct_cell = {'f': [{'v': ['1', '2']},
                         {'v': ['3.1415', '1.414']}]}
    DATA = {
        'totalRows': ROWS,
        'pageToken': TOKEN,
        'rows': [
            {'f': [
                {'v': ['red', 'green']},
                {'v': [struct_cell]},
            ]},
        ]
    }
    connection = _Connection(DATA)
    client = _Client(project=self.PROJECT, connection=connection)
    dataset = _Dataset(client)
    color = SchemaField('color', 'STRING', mode='REPEATED')
    index = SchemaField('index', 'INTEGER', 'REPEATED')
    score = SchemaField('score', 'FLOAT', 'REPEATED')
    struct = SchemaField('struct', 'RECORD', mode='REPEATED',
                         fields=[index, score])
    table = self._makeOne(self.TABLE_NAME, dataset=dataset,
                          schema=[color, struct])

    rows, total_rows, page_token = table.fetch_data()

    self.assertEqual(rows, [
        (['red', 'green'],
         [{'index': [1, 2], 'score': [3.1415, 1.414]}]),
    ])
    self.assertEqual(total_rows, ROWS)
    self.assertEqual(page_token, TOKEN)

    self.assertEqual(len(connection._requested), 1)
    req = connection._requested[0]
    self.assertEqual(req['method'], 'GET')
    self.assertEqual(req['path'], '/%s' % PATH)
def test_fetch_data_w_record_schema(self):
    from gcloud.bigquery.table import SchemaField
    PATH = 'projects/%s/datasets/%s/tables/%s/data' % (
        self.PROJECT, self.DS_NAME, self.TABLE_NAME)
    ROWS = 1234
    TOKEN = 'TOKEN'

    def _phone_cell(area, number, rank):
        # Shape a nested RECORD cell as tabledata.list returns it.
        return {'v': {'f': [{'v': area}, {'v': number}, {'v': rank}]}}

    DATA = {
        'totalRows': ROWS,
        'pageToken': TOKEN,
        'rows': [
            {'f': [{'v': 'Phred Phlyntstone'},
                   _phone_cell('800', '555-1212', 1)]},
            {'f': [{'v': 'Bharney Rhubble'},
                   _phone_cell('877', '768-5309', 2)]},
            {'f': [{'v': 'Wylma Phlyntstone'},
                   {'v': None}]},
        ]
    }
    connection = _Connection(DATA)
    client = _Client(project=self.PROJECT, connection=connection)
    dataset = _Dataset(client)
    full_name = SchemaField('full_name', 'STRING', mode='REQUIRED')
    area_code = SchemaField('area_code', 'STRING', 'REQUIRED')
    local_number = SchemaField('local_number', 'STRING', 'REQUIRED')
    rank = SchemaField('rank', 'INTEGER', 'REQUIRED')
    phone = SchemaField('phone', 'RECORD', mode='NULLABLE',
                        fields=[area_code, local_number, rank])
    table = self._makeOne(self.TABLE_NAME, dataset=dataset,
                          schema=[full_name, phone])

    rows, total_rows, page_token = table.fetch_data()

    self.assertEqual(rows, [
        ('Phred Phlyntstone', {'area_code': '800',
                               'local_number': '555-1212',
                               'rank': 1}),
        ('Bharney Rhubble', {'area_code': '877',
                             'local_number': '768-5309',
                             'rank': 2}),
        ('Wylma Phlyntstone', None),
    ])
    self.assertEqual(total_rows, ROWS)
    self.assertEqual(page_token, TOKEN)

    self.assertEqual(len(connection._requested), 1)
    req = connection._requested[0]
    self.assertEqual(req['method'], 'GET')
    self.assertEqual(req['path'], '/%s' % PATH)
def test_insert_data_w_bound_client(self):
    import datetime
    import pytz
    from gcloud.bigquery._helpers import _prop_from_datetime
    from gcloud.bigquery.table import SchemaField
    WHEN_TS = 1437767599.006
    WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(
        tzinfo=pytz.UTC)
    PATH = 'projects/%s/datasets/%s/tables/%s/insertAll' % (
        self.PROJECT, self.DS_NAME, self.TABLE_NAME)
    connection = _Connection({})
    client = _Client(project=self.PROJECT, connection=connection)
    dataset = _Dataset(client)
    full_name = SchemaField('full_name', 'STRING', mode='REQUIRED')
    age = SchemaField('age', 'INTEGER', mode='REQUIRED')
    joined = SchemaField('joined', 'TIMESTAMP', mode='NULLABLE')
    table = self._makeOne(self.TABLE_NAME, dataset=dataset,
                          schema=[full_name, age, joined])
    ROWS = [
        ('Phred Phlyntstone', 32, WHEN),
        ('Bharney Rhubble', 33, WHEN + datetime.timedelta(seconds=1)),
        ('Wylma Phlyntstone', 29, WHEN + datetime.timedelta(seconds=2)),
        ('Bhettye Rhubble', 27, None),
    ]
    # Expected payload: timestamps are serialized via the shared helper.
    SENT = {
        'rows': [{'json': {'full_name': name,
                           'age': age_value,
                           'joined': _prop_from_datetime(stamp)}}
                 for name, age_value, stamp in ROWS],
    }

    errors = table.insert_data(ROWS)

    self.assertEqual(errors, [])
    self.assertEqual(len(connection._requested), 1)
    req = connection._requested[0]
    self.assertEqual(req['method'], 'POST')
    self.assertEqual(req['path'], '/%s' % PATH)
    self.assertEqual(req['data'], SENT)
def test_insert_data_w_repeated_fields(self):
    from gcloud.bigquery.table import SchemaField
    PATH = 'projects/%s/datasets/%s/tables/%s/insertAll' % (
        self.PROJECT, self.DS_NAME, self.TABLE_NAME)
    connection = _Connection({})
    client = _Client(project=self.PROJECT, connection=connection)
    dataset = _Dataset(client)
    color = SchemaField('color', 'STRING', mode='REPEATED')
    index = SchemaField('index', 'INTEGER', 'REPEATED')
    score = SchemaField('score', 'FLOAT', 'REPEATED')
    struct = SchemaField('struct', 'RECORD', mode='REPEATED',
                         fields=[index, score])
    table = self._makeOne(self.TABLE_NAME, dataset=dataset,
                          schema=[color, struct])
    ROWS = [
        (['red', 'green'], [{'index': [1, 2], 'score': [3.1415, 1.414]}]),
    ]
    SENT = {
        'rows': [{'json': {'color': colors, 'struct': structs}}
                 for colors, structs in ROWS],
    }

    errors = table.insert_data(ROWS)

    self.assertEqual(errors, [])
    self.assertEqual(len(connection._requested), 1)
    req = connection._requested[0]
    self.assertEqual(req['method'], 'POST')
    self.assertEqual(req['path'], '/%s' % PATH)
    self.assertEqual(req['data'], SENT)
def test_insert_data_w_record_schema(self):
    from gcloud.bigquery.table import SchemaField
    PATH = 'projects/%s/datasets/%s/tables/%s/insertAll' % (
        self.PROJECT, self.DS_NAME, self.TABLE_NAME)
    connection = _Connection({})
    client = _Client(project=self.PROJECT, connection=connection)
    dataset = _Dataset(client)
    full_name = SchemaField('full_name', 'STRING', mode='REQUIRED')
    area_code = SchemaField('area_code', 'STRING', 'REQUIRED')
    local_number = SchemaField('local_number', 'STRING', 'REQUIRED')
    rank = SchemaField('rank', 'INTEGER', 'REQUIRED')
    phone = SchemaField('phone', 'RECORD', mode='NULLABLE',
                        fields=[area_code, local_number, rank])
    table = self._makeOne(self.TABLE_NAME, dataset=dataset,
                          schema=[full_name, phone])
    ROWS = [
        ('Phred Phlyntstone', {'area_code': '800',
                               'local_number': '555-1212',
                               'rank': 1}),
        ('Bharney Rhubble', {'area_code': '877',
                             'local_number': '768-5309',
                             'rank': 2}),
        ('Wylma Phlyntstone', None),
    ]
    # RECORD values are passed through as mappings (or None).
    SENT = {
        'rows': [{'json': {'full_name': name, 'phone': phone_value}}
                 for name, phone_value in ROWS],
    }

    errors = table.insert_data(ROWS)

    self.assertEqual(errors, [])
    self.assertEqual(len(connection._requested), 1)
    req = connection._requested[0]
    self.assertEqual(req['method'], 'POST')
    self.assertEqual(req['path'], '/%s' % PATH)
    self.assertEqual(req['data'], SENT)