Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Job-Related Types
:toctree: generated

job.Compression
job.ConnectionProperty
job.CreateDisposition
job.DestinationFormat
job.DmlStats
Expand All @@ -64,6 +65,7 @@ Job-Related Types
job.QueryPlanEntryStep
job.QueryPriority
job.ReservationUsage
job.ScriptOptions
job.SourceFormat
job.WriteDisposition
job.SchemaUpdateOption
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from google.cloud.bigquery.format_options import AvroOptions
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.job import Compression
from google.cloud.bigquery.job import ConnectionProperty
from google.cloud.bigquery.job import CopyJob
from google.cloud.bigquery.job import CopyJobConfig
from google.cloud.bigquery.job import CreateDisposition
Expand Down Expand Up @@ -149,6 +150,7 @@
"BigtableOptions",
"BigtableColumnFamily",
"BigtableColumn",
"ConnectionProperty",
"DmlStats",
"CSVOptions",
"GoogleSheetsOptions",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.load import LoadJobConfig
from google.cloud.bigquery.job.query import _contains_order_by
from google.cloud.bigquery.job.query import ConnectionProperty
from google.cloud.bigquery.job.query import DmlStats
from google.cloud.bigquery.job.query import QueryJob
from google.cloud.bigquery.job.query import QueryJobConfig
Expand Down Expand Up @@ -68,6 +69,7 @@
"LoadJob",
"LoadJobConfig",
"_contains_order_by",
"ConnectionProperty",
"DmlStats",
"QueryJob",
"QueryJobConfig",
Expand Down
125 changes: 123 additions & 2 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import copy
import re
import typing
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, Union

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
Expand Down Expand Up @@ -222,6 +222,62 @@ def key_result_statement(self, value: Union[KeyResultStatementKind, None]):
self._properties["keyResultStatement"] = value


class ConnectionProperty:
    """A connection-level property to customize query behavior.

    See the product documentation for the list of supported connection properties:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/ConnectionProperty

    Args:
        key: The key of the connection property to set.
        value: The value of the connection property to set.

    .. versionadded:: 2.29.0
    """

    def __init__(
        self, key: Optional[str] = None, value: Optional[str] = None
    ) -> None:
        # Backing store in API (JSON) form; the properties below read and
        # write this dictionary directly.
        self._properties = {}

        # Assign through the property setters so that ``None`` is routed to
        # ``_helpers._del_sub_prop`` instead of being stored.
        self.key = key
        self.value = value

    @classmethod
    def from_api_repr(cls, resource: Dict[str, str]) -> "ConnectionProperty":
        """Factory: construct instance from the JSON repr.

        Args:
            resource:
                ConnectionProperty representation returned from API.

        Returns:
            A new instance holding a deep copy of ``resource``.
        """
        instance = cls()
        # Deep copy so later changes to ``resource`` do not leak into the
        # instance (and vice versa).
        instance._properties = copy.deepcopy(resource)
        return instance

    def to_api_repr(self) -> Dict[str, str]:
        """Construct the API resource representation.

        Returns:
            A deep copy of the underlying properties dictionary.
        """
        return copy.deepcopy(self._properties)

    @property
    def key(self) -> Optional[str]:
        """The key of the connection property to set."""
        return self._properties.get("key")

    @key.setter
    def key(self, value: Optional[str]) -> None:
        if value is None:
            _helpers._del_sub_prop(self._properties, ["key"])
        else:
            self._properties["key"] = value

    @property
    def value(self) -> Optional[str]:
        """The value of the connection property to set."""
        return self._properties.get("value")

    @value.setter
    def value(self, value: Optional[str]) -> None:
        if value is None:
            _helpers._del_sub_prop(self._properties, ["value"])
        else:
            self._properties["value"] = value


class QueryJobConfig(_JobConfig):
"""Configuration options for query jobs.

Expand Down Expand Up @@ -283,6 +339,56 @@ def create_disposition(self):
def create_disposition(self, value):
self._set_sub_prop("createDisposition", value)

@property
def connection_properties(self) -> Optional[List[ConnectionProperty]]:
    """Connection properties which can modify the query behavior.

    The stored ``connectionProperties`` entries are each converted with
    :meth:`ConnectionProperty.from_api_repr`. If the stored value is
    ``None``, ``None`` is returned.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties

    .. versionadded:: 2.29.0
    """
    raw_items = self._get_sub_prop("connectionProperties")
    if raw_items is None:
        return None
    return [ConnectionProperty.from_api_repr(item) for item in raw_items]

@connection_properties.setter
def connection_properties(self, value: Optional[Iterable[ConnectionProperty]]):
    """Store the given properties under ``connectionProperties``.

    ``None`` is stored unchanged; any other iterable is converted to a list
    holding each item's ``to_api_repr()`` result.
    """
    api_value = None if value is None else [prop.to_api_repr() for prop in value]
    self._set_sub_prop("connectionProperties", api_value)

@property
def create_session(self) -> Optional[bool]:
    """Whether the job creates a new session using a random session ID.

    When ``True``, a new session is created. The session identifier is then
    available as the
    :attr:`~google.cloud.bigquery.job.query.QueryJob.session_id` property of
    the query job instance. To keep using that session in subsequent queries,
    pass the identifier as a
    :class:`~google.cloud.bigquery.job.query.ConnectionProperty` value.

    The new session's location will be set to Job.JobReference.location if it
    is present, otherwise it's set to the default location based on existing
    routing logic.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session

    .. versionadded:: 2.29.0
    """
    session_flag = self._get_sub_prop("createSession")
    return session_flag

@create_session.setter
def create_session(self, value: Optional[bool]) -> None:
    """Set ``createSession``, or delete the sub-property when given ``None``."""
    if value is not None:
        self._set_sub_prop("createSession", value)
        return
    self._del_sub_prop("createSession")

@property
def default_dataset(self):
"""google.cloud.bigquery.dataset.DatasetReference: the default dataset
Expand Down Expand Up @@ -613,7 +719,7 @@ def schema_update_options(self, values):

@property
def script_options(self) -> ScriptOptions:
"""Connection properties which can modify the query behavior.
"""Options controlling the execution of scripts.

https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions
"""
Expand Down Expand Up @@ -1026,6 +1132,21 @@ def num_dml_affected_rows(self):
result = int(result)
return result

@property
def session_id(self) -> Optional[str]:
    """The session ID the query was part of, if any.

    Looked up as ``statistics`` -> ``sessionInfo`` -> ``sessionId`` in the job
    properties; ``None`` if any of those keys is missing.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.session_info

    .. versionadded:: 2.29.0
    """
    # NOTE(review): session info did not yet exist in the REST docs when this
    # was written, so the anchor above was a predicted one - confirm that it
    # resolves.
    statistics = self._properties.get("statistics", {})
    session_info = statistics.get("sessionInfo", {})
    return session_info.get("sessionId")

@property
def slot_millis(self):
"""Union[int, None]: Slot-milliseconds used by this query job."""
Expand Down
27 changes: 27 additions & 0 deletions tests/system/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,30 @@ def test_dry_run(bigquery_client: bigquery.Client, scalars_table: str):
assert query_job.dry_run is True
assert query_job.total_bytes_processed > 0
assert len(query_job.schema) > 0


def test_query_session(bigquery_client: bigquery.Client):
    """Create a session with a script, then reuse it from a second query."""
    # A script is used because CREATE TEMPORARY TABLE requires one; a plain
    # statement would not do.
    script = """
DECLARE my_number INT64;
SET my_number = 123;
CREATE TEMPORARY TABLE tbl_temp AS SELECT my_number AS foo;
"""
    session_config = bigquery.QueryJobConfig(create_session=True)
    session_job = bigquery_client.query(script, job_config=session_config)
    session_job.result()

    session_id = session_job.session_id
    assert session_id is not None

    session_property = bigquery.ConnectionProperty(key="session_id", value=session_id)
    followup_config = bigquery.QueryJobConfig(
        connection_properties=[session_property]
    )
    followup_job = bigquery_client.query(
        "SELECT * FROM tbl_temp", job_config=followup_config
    )
    # No error from result() if the session works.
    row_iterator = followup_job.result()

    rows = list(row_iterator)
    assert len(rows) == 1
    assert list(rows[0].items()) == [("foo", 123)]
27 changes: 27 additions & 0 deletions tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,17 @@ def test_from_api_repr_w_properties(self):
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_with_session_info(self):
    """A job built from a resource with session info exposes its session ID."""
    target_class = self._get_target_class()
    client = _make_client(project=self.PROJECT)
    resource = self._make_resource()
    session_info = {"sessionId": "abc123=="}
    resource["statistics"] = {"sessionInfo": session_info}

    job = target_class.from_api_repr(resource, client=client)

    self.assertIs(job._client, client)
    self.assertEqual(job.session_id, "abc123==")

def test_cancelled(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, self.QUERY, client)
Expand Down Expand Up @@ -859,6 +870,22 @@ def test_undeclared_query_parameters(self):
self.assertEqual(struct.struct_types, {"count": "INT64"})
self.assertEqual(struct.struct_values, {"count": 123})

def test_session_id(self):
    """``session_id`` stays ``None`` until the full statistics path exists."""
    expected_id = "aHR0cHM6Ly95b3V0dS5iZS9kUXc0dzlXZ1hjUQ=="  # not random

    client = _make_client(project=self.PROJECT)
    job = self._make_one(self.JOB_ID, self.QUERY, client)
    assert job.session_id is None

    # Build the nested resource one level at a time, checking after each step.
    statistics = {}
    job._properties["statistics"] = statistics
    assert job.session_id is None

    session_info = {}
    statistics["sessionInfo"] = session_info
    assert job.session_id is None

    session_info["sessionId"] = expected_id
    assert job.session_id == expected_id

def test_estimated_bytes_processed(self):
est_bytes = 123456

Expand Down
72 changes: 72 additions & 0 deletions tests/unit/job/test_query_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import operator as op

import pytest

from .helpers import _Base
Expand Down Expand Up @@ -60,6 +62,27 @@ def test_ctor_w_string_destinaton(self):
expected = table.TableReference.from_string(destination)
self.assertEqual(config.destination, expected)

def test_create_session_w_none(self):
    """A freshly constructed config reports ``create_session`` as ``None``."""
    config_class = self._get_target_class()
    config = config_class()
    assert config.create_session is None

def test_create_session_w_value(self):
    """The getter reflects a ``createSession`` value placed in the resource."""
    config_class = self._get_target_class()
    config = config_class()
    query_resource = config._properties["query"]
    query_resource["createSession"] = True
    assert config.create_session

def test_create_session_setter_w_none(self):
    """Assigning ``None`` removes ``createSession`` from the query resource."""
    config_class = self._get_target_class()
    config = config_class()
    config._properties["query"]["createSession"] = True

    config.create_session = None

    query_resource = config._properties["query"]
    assert "createSession" not in query_resource

def test_create_session_setter_w_value(self):
    """Assigning ``True`` overrides a previously stored ``False``."""
    config_class = self._get_target_class()
    config = config_class()
    config._properties["query"]["createSession"] = False

    config.create_session = True

    assert config.create_session

def test_default_dataset_w_string(self):
from google.cloud.bigquery import dataset

Expand Down Expand Up @@ -309,3 +332,52 @@ def test_from_api_repr_with_script_options(self):
self.assertEqual(
script_options.key_result_statement, KeyResultStatementKind.LAST
)

def test_to_api_repr_with_no_connection_properties(self):
    """Assigning ``None`` is kept as an explicit ``None`` in the API repr."""
    job_config = self._make_one()
    job_config.connection_properties = None

    api_repr = job_config.to_api_repr()

    expected = {"query": {"connectionProperties": None}}
    assert api_repr == expected
    assert job_config.connection_properties is None

def test_to_api_repr_with_connection_properties(self):
    """Connection properties given as a tuple serialize to a list of dicts."""
    from google.cloud.bigquery import ConnectionProperty

    first = ConnectionProperty(key="foo", value="bar")
    second = ConnectionProperty(key="baz", value="quux")
    job_config = self._make_one()
    job_config.connection_properties = (first, second)

    api_repr = job_config.to_api_repr()

    expected = {
        "query": {
            "connectionProperties": [
                {"key": "foo", "value": "bar"},
                {"key": "baz", "value": "quux"},
            ]
        }
    }
    assert api_repr == expected

def test_from_api_repr_with_connection_properties(self):
    """Properties in the API resource come back as ``ConnectionProperty`` objects."""
    from google.cloud.bigquery import ConnectionProperty

    api_resource = {
        "query": {
            "connectionProperties": [
                {"key": "foo", "value": "bar"},
                {"key": "baz", "value": "quux"},
            ]
        },
    }
    config_class = self._get_target_class()

    job_config = config_class.from_api_repr(api_resource)

    # Sort by key so the assertions do not depend on the property order.
    by_key = sorted(job_config.connection_properties, key=op.attrgetter("key"))

    assert len(by_key) == 2
    for prop in by_key:
        assert isinstance(prop, ConnectionProperty)
    first, second = by_key
    assert (first.key, first.value) == ("baz", "quux")
    assert (second.key, second.value) == ("foo", "bar")