From ffde1bd690cc7fe58c46f54ac478ca31e4fb8ea0 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Thu, 10 Aug 2017 13:48:39 -0400
Subject: [PATCH] Drop 'Database.read' and 'Database.execute_sql' convenience
 methods.

Because the context managers they use returned the session to the
database's pool, application code could not safely iterate over the
result sets returned by the methods.

Update docs for 'Snapshot.read' and 'Snapshot.execute_sql' to emphasize
iterating over their result sets before the session is returned to the
database pool (i.e., within the 'with' block which constructs the
snapshot).

Closes #3769.
---
 docs/spanner/snapshot-usage.rst          | 44 ++++++++++++-----
 spanner/google/cloud/spanner/database.py | 62 ------------------------
 spanner/tests/system/test_system.py      | 19 ++++----
 spanner/tests/unit/test_database.py      | 55 ---------------------
 4 files changed, 42 insertions(+), 138 deletions(-)

diff --git a/docs/spanner/snapshot-usage.rst b/docs/spanner/snapshot-usage.rst
index d67533edb8f7..a23ff114c2fa 100644
--- a/docs/spanner/snapshot-usage.rst
+++ b/docs/spanner/snapshot-usage.rst
@@ -45,12 +45,22 @@ fails if the result set is too large,
 
 .. code:: python
 
-    result = snapshot.read(
-        table='table-name', columns=['first_name', 'last_name', 'age'],
-        key_set=['phred@example.com', 'bharney@example.com'])
+    with database.snapshot() as snapshot:
+        result = snapshot.read(
+            table='table-name', columns=['first_name', 'last_name', 'age'],
+            keyset=KeySet(keys=['phred@example.com', 'bharney@example.com']))
 
-    for row in result.rows:
-        print(row)
+        for row in result.rows:
+            print(row)
+
+.. note::
+
+   The result set returned by
+   :meth:`~google.cloud.spanner.snapshot.Snapshot.read` *must not* be
+   iterated after the snapshot's session has been returned to the database's
+   session pool. Therefore, unless your application creates sessions
+   manually, perform all iteration within the context of the
+   ``with database.snapshot()`` block.
 
 .. note::
 
@@ -68,14 +78,24 @@ fails if the result set is too large,
 
 .. code:: python
 
-    QUERY = (
-        'SELECT e.first_name, e.last_name, p.telephone '
-        'FROM employees as e, phones as p '
-        'WHERE p.employee_id == e.employee_id')
-    result = snapshot.execute_sql(QUERY)
+    with database.snapshot() as snapshot:
+        QUERY = (
+            'SELECT e.first_name, e.last_name, p.telephone '
+            'FROM employees as e, phones as p '
+            'WHERE p.employee_id = e.employee_id')
+        result = snapshot.execute_sql(QUERY)
+
+        for row in result.rows:
+            print(row)
+
+.. note::
 
-    for row in result.rows:
-        print(row)
+   The result set returned by
+   :meth:`~google.cloud.spanner.snapshot.Snapshot.execute_sql` *must not* be
+   iterated after the snapshot's session has been returned to the database's
+   session pool. Therefore, unless your application creates sessions
+   manually, perform all iteration within the context of the
+   ``with database.snapshot()`` block.
 
 Next Step
diff --git a/spanner/google/cloud/spanner/database.py b/spanner/google/cloud/spanner/database.py
index a984b88ed4b2..b098f7684b7c 100644
--- a/spanner/google/cloud/spanner/database.py
+++ b/spanner/google/cloud/spanner/database.py
@@ -313,68 +313,6 @@ def session(self):
         """
         return Session(self)
 
-    def read(self, table, columns, keyset, index='', limit=0,
-             resume_token=b''):
-        """Perform a ``StreamingRead`` API request for rows in a table.
-
-        :type table: str
-        :param table: name of the table from which to fetch data
-
-        :type columns: list of str
-        :param columns: names of columns to be retrieved
-
-        :type keyset: :class:`~google.cloud.spanner.keyset.KeySet`
-        :param keyset: keys / ranges identifying rows to be retrieved
-
-        :type index: str
-        :param index: (Optional) name of index to use, rather than the
-                      table's primary key
-
-        :type limit: int
-        :param limit: (Optional) maxiumn number of rows to return
-
-        :type resume_token: bytes
-        :param resume_token: token for resuming previously-interrupted read
-
-        :rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
-        :returns: a result set instance which can be used to consume rows.
-        """
-        with SessionCheckout(self._pool) as session:
-            return session.read(
-                table, columns, keyset, index, limit, resume_token)
-
-    def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
-                    resume_token=b''):
-        """Perform an ``ExecuteStreamingSql`` API request.
-
-        :type sql: str
-        :param sql: SQL query statement
-
-        :type params: dict, {str -> column value}
-        :param params: values for parameter replacement.  Keys must match
-                       the names used in ``sql``.
-
-        :type param_types:
-            dict, {str -> :class:`google.spanner.v1.type_pb2.TypeCode`}
-        :param param_types: (Optional) explicit types for one or more param
-                            values; overrides default type detection on the
-                            back-end.
-
-        :type query_mode:
-            :class:`google.spanner.v1.spanner_pb2.ExecuteSqlRequest.QueryMode`
-        :param query_mode: Mode governing return of results / query plan. See
-            https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
-
-        :type resume_token: bytes
-        :param resume_token: token for resuming previously-interrupted query
-
-        :rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
-        :returns: a result set instance which can be used to consume rows.
-        """
-        with SessionCheckout(self._pool) as session:
-            return session.execute_sql(
-                sql, params, param_types, query_mode, resume_token)
-
     def run_in_transaction(self, func, *args, **kw):
         """Perform a unit of work in a transaction, retrying on abort.
diff --git a/spanner/tests/system/test_system.py b/spanner/tests/system/test_system.py
index fa70573c88de..f20ce592070a 100644
--- a/spanner/tests/system/test_system.py
+++ b/spanner/tests/system/test_system.py
@@ -297,7 +297,7 @@ def test_update_database_ddl(self):
 
         self.assertEqual(len(temp_db.ddl_statements), len(DDL_STATEMENTS))
 
-    def test_db_batch_insert_then_db_snapshot_read_and_db_read(self):
+    def test_db_batch_insert_then_db_snapshot_read(self):
         retry = RetryInstanceState(_has_all_ddl)
         retry(self._db.reload)()
 
@@ -310,10 +310,7 @@ def test_db_batch_insert_then_db_snapshot_read_and_db_read(self):
 
         self._check_row_data(from_snap)
 
-        from_db = list(self._db.read(self.TABLE, self.COLUMNS, self.ALL))
-        self._check_row_data(from_db)
-
-    def test_db_run_in_transaction_then_db_execute_sql(self):
+    def test_db_run_in_transaction_then_snapshot_execute_sql(self):
         retry = RetryInstanceState(_has_all_ddl)
         retry(self._db.reload)()
 
@@ -329,7 +326,8 @@ def _unit_of_work(transaction, test):
 
         self._db.run_in_transaction(_unit_of_work, test=self)
 
-        rows = list(self._db.execute_sql(self.SQL))
+        with self._db.snapshot() as after:
+            rows = list(after.execute_sql(self.SQL))
         self._check_row_data(rows)
 
     def test_db_run_in_transaction_twice(self):
@@ -346,7 +344,8 @@ def _unit_of_work(transaction, test):
         self._db.run_in_transaction(_unit_of_work, test=self)
         self._db.run_in_transaction(_unit_of_work, test=self)
 
-        rows = list(self._db.execute_sql(self.SQL))
+        with self._db.snapshot() as after:
+            rows = list(after.execute_sql(self.SQL))
         self._check_row_data(rows)
 
 
@@ -1085,7 +1084,8 @@ def setUpClass(cls):
 
     def _verify_one_column(self, table_desc):
         sql = 'SELECT chunk_me FROM {}'.format(table_desc.table)
-        rows = list(self._db.execute_sql(sql))
+        with self._db.snapshot() as snapshot:
+            rows = list(snapshot.execute_sql(sql))
         self.assertEqual(len(rows), table_desc.row_count)
         expected = table_desc.value()
         for row in rows:
@@ -1093,7 +1093,8 @@ def _verify_one_column(self, table_desc):
 
     def _verify_two_columns(self, table_desc):
         sql = 'SELECT chunk_me, chunk_me_2 FROM {}'.format(table_desc.table)
-        rows = list(self._db.execute_sql(sql))
+        with self._db.snapshot() as snapshot:
+            rows = list(snapshot.execute_sql(sql))
         self.assertEqual(len(rows), table_desc.row_count)
         expected = table_desc.value()
         for row in rows:
diff --git a/spanner/tests/unit/test_database.py b/spanner/tests/unit/test_database.py
index 40e10ec971a9..c1218599b3b3 100644
--- a/spanner/tests/unit/test_database.py
+++ b/spanner/tests/unit/test_database.py
@@ -621,21 +621,6 @@ def test_session_factory(self):
         self.assertIs(session.session_id, None)
         self.assertIs(session._database, database)
 
-    def test_execute_sql_defaults(self):
-        QUERY = 'SELECT * FROM employees'
-        client = _Client()
-        instance = _Instance(self.INSTANCE_NAME, client=client)
-        pool = _Pool()
-        session = _Session()
-        pool.put(session)
-        session._execute_result = []
-        database = self._make_one(self.DATABASE_ID, instance, pool=pool)
-
-        rows = list(database.execute_sql(QUERY))
-
-        self.assertEqual(rows, [])
-        self.assertEqual(session._executed, (QUERY, None, None, None, b''))
-
     def test_run_in_transaction_wo_args(self):
         import datetime
 
@@ -678,38 +663,6 @@ def test_run_in_transaction_w_args(self):
         self.assertEqual(session._retried,
                          (_unit_of_work, (SINCE,), {'until': UNTIL}))
 
-    def test_read(self):
-        from google.cloud.spanner.keyset import KeySet
-
-        TABLE_NAME = 'citizens'
-        COLUMNS = ['email', 'first_name', 'last_name', 'age']
-        KEYS = ['bharney@example.com', 'phred@example.com']
-        KEYSET = KeySet(keys=KEYS)
-        INDEX = 'email-address-index'
-        LIMIT = 20
-        TOKEN = b'DEADBEEF'
-        client = _Client()
-        instance = _Instance(self.INSTANCE_NAME, client=client)
-        pool = _Pool()
-        session = _Session()
-        pool.put(session)
-        database = self._make_one(self.DATABASE_ID, instance, pool=pool)
-
-        rows = list(database.read(
-            TABLE_NAME, COLUMNS, KEYSET, INDEX, LIMIT, TOKEN))
-
-        self.assertEqual(rows, [])
-
-        (table, columns, key_set, index, limit,
-         resume_token) = session._read_with
-
-        self.assertEqual(table, TABLE_NAME)
-        self.assertEqual(columns, COLUMNS)
-        self.assertEqual(key_set, KEYSET)
-        self.assertEqual(index, INDEX)
-        self.assertEqual(limit, LIMIT)
-        self.assertEqual(resume_token, TOKEN)
-
     def test_batch(self):
         from google.cloud.spanner.database import BatchCheckout
 
@@ -951,18 +904,10 @@ def __init__(self, database=None, name=_BaseTest.SESSION_NAME):
         self._database = database
         self.name = name
 
-    def execute_sql(self, sql, params, param_types, query_mode, resume_token):
-        self._executed = (sql, params, param_types, query_mode, resume_token)
-        return iter(self._rows)
-
     def run_in_transaction(self, func, *args, **kw):
         self._retried = (func, args, kw)
         return self._committed
 
-    def read(self, table, columns, keyset, index, limit, resume_token):
-        self._read_with = (table, columns, keyset, index, limit, resume_token)
-        return iter(self._rows)
-
 
 class _SessionPB(object):
     name = TestDatabase.SESSION_NAME
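
--
A minimal usage sketch (not part of the patch) of the migration this change
forces on callers of the removed convenience methods: result sets must now be
consumed while the snapshot's 'with' block still holds the session.  The
instance ID, database ID, and query below are placeholders:

    from google.cloud import spanner

    client = spanner.Client()
    instance = client.instance('my-instance')    # hypothetical instance ID
    database = instance.database('my-database')  # hypothetical database ID

    # Before (no longer possible): Database.execute_sql checked a session
    # out of the pool and returned it before the caller could iterate, so
    # the streamed result set could not be consumed safely.

    # After: the session stays checked out until the 'with' block exits,
    # so iterate (or materialize via list()) inside the block.
    with database.snapshot() as snapshot:
        results = snapshot.execute_sql('SELECT 1')  # placeholder query
        rows = list(results)  # consume before the session returns to the pool

    print(rows)  # safe: 'rows' is a plain list, independent of the session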