-
Notifications
You must be signed in to change notification settings - Fork 319
feat: Support passing struct data to the DB API #718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
586a022
13f9107
aea7bae
6f26130
c386448
71c8614
6f5b345
e08f6f6
411c336
ef2b323
654b108
525b8fd
462f2eb
904d2ce
a6393e6
e63e8b7
2f2bdcd
91b0028
35555aa
0d81b80
d3f959c
5554301
8830113
a72cafb
cba9697
12bd941
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,18 +18,34 @@ | |
import decimal | ||
import functools | ||
import numbers | ||
import re | ||
import typing | ||
|
||
from google.cloud import bigquery | ||
from google.cloud.bigquery import table, enums | ||
from google.cloud.bigquery import table, enums, query | ||
from google.cloud.bigquery.dbapi import exceptions | ||
|
||
|
||
_NUMERIC_SERVER_MIN = decimal.Decimal("-9.9999999999999999999999999999999999999E+28") | ||
_NUMERIC_SERVER_MAX = decimal.Decimal("9.9999999999999999999999999999999999999E+28") | ||
|
||
type_parameters_re = re.compile( | ||
r""" | ||
\( | ||
\s*[0-9]+\s* | ||
(, | ||
\s*[0-9]+\s* | ||
)* | ||
\) | ||
""", | ||
re.VERBOSE, | ||
) | ||
|
||
|
||
def _parameter_type(name, value, query_parameter_type=None, value_doc=""): | ||
if query_parameter_type: | ||
# Strip type parameters | ||
query_parameter_type = type_parameters_re.sub("", query_parameter_type) | ||
try: | ||
parameter_type = getattr( | ||
enums.SqlParameterScalarTypes, query_parameter_type.upper() | ||
|
@@ -113,6 +129,197 @@ def array_to_query_parameter(value, name=None, query_parameter_type=None): | |
return bigquery.ArrayQueryParameter(name, array_type, value) | ||
|
||
|
||
def _parse_struct_fields( | ||
fields, | ||
base, | ||
parse_struct_field=re.compile( | ||
r""" | ||
(?:(\w+)\s+) # field name | ||
([A-Z0-9<> ,()]+) # Field type | ||
$""", | ||
re.VERBOSE | re.IGNORECASE, | ||
).match, | ||
): | ||
# Split a string of struct fields. They're defined by commas, but | ||
# we have to avoid splitting on commas internal to fields. For | ||
# example: | ||
# name string, children array<struct<name string, bdate date>> | ||
# | ||
# only has 2 top-level fields. | ||
fields = fields.split(",") | ||
fields = list(reversed(fields)) # in the off chance that there are very many | ||
while fields: | ||
field = fields.pop() | ||
while fields and field.count("<") != field.count(">"): | ||
field += "," + fields.pop() | ||
|
||
m = parse_struct_field(field.strip()) | ||
if not m: | ||
raise exceptions.ProgrammingError( | ||
f"Invalid struct field, {field}, in {base}" | ||
) | ||
yield m.group(1, 2) | ||
|
||
|
||
SCALAR, ARRAY, STRUCT = "sar" | ||
|
||
|
||
def _parse_type( | ||
type_, | ||
name, | ||
base, | ||
complex_query_parameter_parse=re.compile( | ||
r""" | ||
\s* | ||
(ARRAY|STRUCT|RECORD) # Type | ||
\s* | ||
<([A-Z0-9<> ,()]+)> # Subtype(s) | ||
\s*$ | ||
""", | ||
re.IGNORECASE | re.VERBOSE, | ||
).match, | ||
): | ||
if "<" not in type_: | ||
# Scalar | ||
|
||
# Strip type parameters | ||
type_ = type_parameters_re.sub("", type_).strip() | ||
try: | ||
type_ = getattr(enums.SqlParameterScalarTypes, type_.upper()) | ||
except AttributeError: | ||
raise exceptions.ProgrammingError( | ||
f"The given parameter type, {type_}," | ||
f"{' for ' + name if name else ''}" | ||
f" is not a valid BigQuery scalar type, in {base}." | ||
) | ||
if name: | ||
type_ = type_.with_name(name) | ||
return SCALAR, type_ | ||
|
||
m = complex_query_parameter_parse(type_) | ||
if not m: | ||
raise exceptions.ProgrammingError(f"Invalid parameter type, {type_}") | ||
tname, sub = m.group(1, 2) | ||
if tname.upper() == "ARRAY": | ||
sub_type = complex_query_parameter_type(None, sub, base) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This allows for arrays of scalars, right? Looks like it from looking at the code, but then I'm a bit confused why we need the scalar logic in lines 183-197. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need the code on line 183-197 because we support arrays and structs of scalars. Basically, we allow arrays of anything (other than arrays). To handle that, we recurse to handle whatever's inside the |
||
if isinstance(sub_type, query.ArrayQueryParameterType): | ||
raise exceptions.ProgrammingError(f"Array can't contain an array in {base}") | ||
sub_type._complex__src = sub | ||
return ARRAY, sub_type | ||
else: | ||
return STRUCT, _parse_struct_fields(sub, base) | ||
|
||
|
||
def complex_query_parameter_type(name: typing.Optional[str], type_: str, base: str): | ||
"""Construct a parameter type (`StructQueryParameterType`) for a complex type | ||
|
||
or a non-complex type that's part of a complex type. | ||
|
||
Examples: | ||
|
||
array<struct<x float64, y float64>> | ||
|
||
struct<name string, children array<struct<name string, bdate date>>> | ||
|
||
This is used for computing array types. | ||
""" | ||
|
||
type_type, sub_type = _parse_type(type_, name, base) | ||
if type_type == SCALAR: | ||
type_ = sub_type | ||
elif type_type == ARRAY: | ||
type_ = query.ArrayQueryParameterType(sub_type, name=name) | ||
elif type_type == STRUCT: | ||
fields = [ | ||
complex_query_parameter_type(field_name, field_type, base) | ||
for field_name, field_type in sub_type | ||
] | ||
type_ = query.StructQueryParameterType(*fields, name=name) | ||
else: # pragma: NO COVER | ||
raise AssertionError("Bad type_type", type_type) # Can't happen :) | ||
|
||
return type_ | ||
|
||
|
||
def complex_query_parameter( | ||
name: typing.Optional[str], value, type_: str, base: typing.Optional[str] = None | ||
): | ||
plamut marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Construct a query parameter for a complex type (array or struct record) | ||
|
||
or for a subtype, which may not be complex | ||
|
||
Examples: | ||
|
||
array<struct<x float64, y float64>> | ||
|
||
struct<name string, children array<struct<name string, bdate date>>> | ||
|
||
""" | ||
base = base or type_ | ||
|
||
type_type, sub_type = _parse_type(type_, name, base) | ||
|
||
if type_type == SCALAR: | ||
param = query.ScalarQueryParameter(name, sub_type._type, value) | ||
elif type_type == ARRAY: | ||
if not array_like(value): | ||
raise exceptions.ProgrammingError( | ||
f"Array type with non-array-like value" | ||
f" with type {type(value).__name__}" | ||
) | ||
param = query.ArrayQueryParameter( | ||
name, | ||
sub_type, | ||
value | ||
if isinstance(sub_type, query.ScalarQueryParameterType) | ||
else [ | ||
complex_query_parameter(None, v, sub_type._complex__src, base) | ||
for v in value | ||
], | ||
) | ||
elif type_type == STRUCT: | ||
if not isinstance(value, collections_abc.Mapping): | ||
raise exceptions.ProgrammingError(f"Non-mapping value for type {type_}") | ||
value_keys = set(value) | ||
fields = [] | ||
for field_name, field_type in sub_type: | ||
if field_name not in value: | ||
raise exceptions.ProgrammingError( | ||
f"No field value for {field_name} in {type_}" | ||
) | ||
value_keys.remove(field_name) | ||
fields.append( | ||
complex_query_parameter(field_name, value[field_name], field_type, base) | ||
) | ||
if value_keys: | ||
raise exceptions.ProgrammingError(f"Extra data keys for {type_}") | ||
|
||
param = query.StructQueryParameter(name, *fields) | ||
else: # pragma: NO COVER | ||
raise AssertionError("Bad type_type", type_type) # Can't happen :) | ||
|
||
return param | ||
|
||
|
||
def _dispatch_parameter(type_, value, name=None): | ||
if type_ is not None and "<" in type_: | ||
param = complex_query_parameter(name, value, type_) | ||
elif isinstance(value, collections_abc.Mapping): | ||
raise NotImplementedError( | ||
f"STRUCT-like parameter values are not supported" | ||
f"{' (parameter ' + name + ')' if name else ''}," | ||
f" unless an explicit type is give in the parameter placeholder" | ||
f" (e.g. '%({name if name else ''}:struct<...>)s')." | ||
) | ||
elif array_like(value): | ||
param = array_to_query_parameter(value, name, type_) | ||
else: | ||
param = scalar_to_query_parameter(value, name, type_) | ||
|
||
return param | ||
|
||
|
||
def to_query_parameters_list(parameters, parameter_types): | ||
"""Converts a sequence of parameter values into query parameters. | ||
|
||
|
@@ -126,19 +333,10 @@ def to_query_parameters_list(parameters, parameter_types): | |
List[google.cloud.bigquery.query._AbstractQueryParameter]: | ||
A list of query parameters. | ||
""" | ||
result = [] | ||
|
||
for value, type_ in zip(parameters, parameter_types): | ||
if isinstance(value, collections_abc.Mapping): | ||
raise NotImplementedError("STRUCT-like parameter values are not supported.") | ||
elif array_like(value): | ||
param = array_to_query_parameter(value, None, type_) | ||
else: | ||
param = scalar_to_query_parameter(value, None, type_) | ||
|
||
result.append(param) | ||
|
||
return result | ||
return [ | ||
_dispatch_parameter(type_, value) | ||
for value, type_ in zip(parameters, parameter_types) | ||
] | ||
|
||
|
||
def to_query_parameters_dict(parameters, query_parameter_types): | ||
|
@@ -154,28 +352,10 @@ def to_query_parameters_dict(parameters, query_parameter_types): | |
List[google.cloud.bigquery.query._AbstractQueryParameter]: | ||
A list of named query parameters. | ||
""" | ||
result = [] | ||
|
||
for name, value in parameters.items(): | ||
if isinstance(value, collections_abc.Mapping): | ||
raise NotImplementedError( | ||
"STRUCT-like parameter values are not supported " | ||
"(parameter {}).".format(name) | ||
) | ||
else: | ||
query_parameter_type = query_parameter_types.get(name) | ||
if array_like(value): | ||
param = array_to_query_parameter( | ||
value, name=name, query_parameter_type=query_parameter_type | ||
) | ||
else: | ||
param = scalar_to_query_parameter( | ||
value, name=name, query_parameter_type=query_parameter_type, | ||
) | ||
|
||
result.append(param) | ||
|
||
return result | ||
return [ | ||
_dispatch_parameter(query_parameter_types.get(name), value, name) | ||
for name, value in parameters.items() | ||
] | ||
|
||
|
||
def to_query_parameters(parameters, parameter_types): | ||
|
Uh oh!
There was an error while loading. Please reload this page.