diff --git a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart index d84bdaa..f63e0df 100644 --- a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart @@ -8,9 +8,11 @@ import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; +import '../../impl/context.dart'; + /// A simple "synchronous" connection which provides the async SqliteConnection /// implementation using a synchronous SQLite connection -class SyncSqliteConnection extends SqliteConnection with SqliteQueries { +class SyncSqliteConnection with SqliteQueries implements SqliteConnection { final CommonDatabase db; late Mutex mutex; @override @@ -44,7 +46,10 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { return mutex.lock( () { task?.finish(); - return callback(SyncReadContext(db, parent: task)); + return ScopedReadContext.assumeReadLock( + _UnsafeSyncContext(db, parent: task), + callback, + ); }, timeout: lockTimeout, ); @@ -59,7 +64,10 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { return mutex.lock( () { task?.finish(); - return callback(SyncWriteContext(db, parent: task)); + return ScopedWriteContext.assumeWriteLock( + _UnsafeSyncContext(db, parent: task), + callback, + ); }, timeout: lockTimeout, ); @@ -80,12 +88,12 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { } } -class SyncReadContext implements SqliteReadContext { +final class _UnsafeSyncContext extends UnscopedContext { final TimelineTask? task; CommonDatabase db; - SyncReadContext(this.db, {TimelineTask? parent}) + _UnsafeSyncContext(this.db, {TimelineTask? parent}) : task = TimelineTask(parent: parent); @override @@ -129,10 +137,6 @@ class SyncReadContext implements SqliteReadContext { Future getAutoCommit() async { return db.autocommit; } -} - -class SyncWriteContext extends SyncReadContext implements SqliteWriteContext { - SyncWriteContext(super.db, {super.parent}); @override Future execute(String sql, diff --git a/packages/sqlite_async/lib/src/impl/context.dart b/packages/sqlite_async/lib/src/impl/context.dart new file mode 100644 index 0000000..0e3eef6 --- /dev/null +++ b/packages/sqlite_async/lib/src/impl/context.dart @@ -0,0 +1,208 @@ +import 'package:sqlite3/common.dart'; + +import '../sqlite_connection.dart'; + +/// A context that can be used to run both reading and writing queries - +/// basically a [SqliteWriteContext] without the ability to start transactions. +/// +/// Instances of this are not given out to clients - instead, they are wrapped +/// with [ScopedReadContext] and [ScopedWriteContext] after obtaining a lock. +/// Those wrapped views have a shorter lifetime (they can be closed +/// independently, and verify that they're not being used after being closed). +abstract base class UnscopedContext implements SqliteReadContext { + Future execute(String sql, List parameters); + Future executeBatch(String sql, List> parameterSets); + + /// Returns an [UnscopedContext] useful as the outermost transaction. + /// + /// This is called by [ScopedWriteContext.writeTransaction] _after_ executing + /// the first `BEGIN` statement. + /// This is used on the web to assert that the auto-commit state is false + /// before running statements. + UnscopedContext interceptOutermostTransaction() { + return this; + } +} + +/// A view over an [UnscopedContext] implementing [SqliteReadContext]. +final class ScopedReadContext implements SqliteReadContext { + final UnscopedContext _context; + + /// Whether this context view is locked on an inner operation like a + /// transaction. + /// + /// We don't use a mutex because we don't want to serialize access - we just + /// want to forbid concurrent operations. + bool _isLocked = false; + + /// Whether this particular view of a read context has been closed, e.g. + /// because the callback owning it has returned. + bool _closed = false; + + ScopedReadContext(this._context); + + void _checkNotLocked() { + _checkStillOpen(); + + if (_isLocked) { + throw StateError( + 'The context from the callback was locked, e.g. due to a nested ' + 'transaction.'); + } + } + + void _checkStillOpen() { + if (_closed) { + throw StateError('This context to a callback is no longer open. ' + 'Make sure to await all statements on a database to avoid a context ' + 'still being used after its callback has finished.'); + } + } + + @override + bool get closed => _closed || _context.closed; + + @override + Future computeWithDatabase( + Future Function(CommonDatabase db) compute) async { + _checkNotLocked(); + return await _context.computeWithDatabase(compute); + } + + @override + Future get(String sql, [List parameters = const []]) async { + _checkNotLocked(); + return _context.get(sql, parameters); + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return await _context.getAll(sql, parameters); + } + + @override + Future getAutoCommit() async { + _checkStillOpen(); + return _context.getAutoCommit(); + } + + @override + Future getOptional(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return _context.getOptional(sql, parameters); + } + + void invalidate() => _closed = true; + + /// Creates a short-lived wrapper around the [unsafe] context to safely give + /// [callback] read-access to the database. + /// + /// Assumes that a read lock providing sound access to the inner + /// [UnscopedContext] is held until this future returns. + static Future assumeReadLock( + UnscopedContext unsafe, + Future Function(SqliteReadContext) callback, + ) async { + final scoped = ScopedReadContext(unsafe); + try { + return await callback(scoped); + } finally { + scoped.invalidate(); + } + } +} + +final class ScopedWriteContext extends ScopedReadContext + implements SqliteWriteContext { + /// The "depth" of virtual nested transaction. + /// + /// A value of `0` indicates that this is operating outside of a transaction. + /// A value of `1` indicates a regular transaction (guarded with `BEGIN` and + /// `COMMIT` statements). + /// All higher values indicate a nested transaction implemented with + /// savepoint statements. + final int transactionDepth; + + ScopedWriteContext(super._context, {this.transactionDepth = 0}); + + @override + Future execute(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return await _context.execute(sql, parameters); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + _checkNotLocked(); + + return await _context.executeBatch(sql, parameterSets); + } + + @override + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback) async { + _checkNotLocked(); + final (begin, commit, rollback) = _beginCommitRollback(transactionDepth); + ScopedWriteContext? inner; + + final innerContext = transactionDepth == 0 + ? _context.interceptOutermostTransaction() + : _context; + + try { + _isLocked = true; + + await _context.execute(begin, const []); + + inner = ScopedWriteContext(innerContext, + transactionDepth: transactionDepth + 1); + final result = await callback(inner); + await innerContext.execute(commit, const []); + return result; + } catch (e) { + try { + await innerContext.execute(rollback, const []); + } catch (e) { + // In rare cases, a ROLLBACK may fail. + // Safe to ignore. + } + rethrow; + } finally { + _isLocked = false; + inner?.invalidate(); + } + } + + static (String, String, String) _beginCommitRollback(int level) { + return switch (level) { + 0 => ('BEGIN IMMEDIATE', 'COMMIT', 'ROLLBACK'), + final nested => ( + 'SAVEPOINT s$nested', + 'RELEASE s$nested', + 'ROLLBACK TO s$nested' + ) + }; + } + + /// Creates a short-lived wrapper around the [unsafe] context to safely give + /// [callback] access to the database. + /// + /// Assumes that a write lock providing sound access to the inner + /// [UnscopedContext] is held until this future returns. + static Future assumeWriteLock( + UnscopedContext unsafe, + Future Function(SqliteWriteContext) callback, + ) async { + final scoped = ScopedWriteContext(unsafe); + try { + return await callback(scoped); + } finally { + scoped.invalidate(); + } + } +} diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 38c184e..e2df0f3 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -14,6 +14,7 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; import 'package:sqlite_async/src/utils/shared_utils.dart'; +import '../../impl/context.dart'; import 'upstream_updates.dart'; typedef TxCallback = Future Function(CommonDatabase db); @@ -64,8 +65,8 @@ class SqliteConnectionImpl return _isolateClient.closed; } - _TransactionContext _context() { - return _TransactionContext( + _UnsafeContext _context() { + return _UnsafeContext( _isolateClient, profileQueries ? TimelineTask() : null); } @@ -137,7 +138,7 @@ class SqliteConnectionImpl return _connectionMutex.lock(() async { final ctx = _context(); try { - return await callback(ctx); + return await ScopedReadContext.assumeReadLock(ctx, callback); } finally { await ctx.close(); } @@ -160,7 +161,7 @@ class SqliteConnectionImpl return await _writeMutex.lock(() async { final ctx = _context(); try { - return await callback(ctx); + return await ScopedWriteContext.assumeWriteLock(ctx, callback); } finally { await ctx.close(); } @@ -177,14 +178,14 @@ class SqliteConnectionImpl int _nextCtxId = 1; -class _TransactionContext implements SqliteWriteContext { +final class _UnsafeContext extends UnscopedContext { final PortClient _sendPort; bool _closed = false; final int ctxId = _nextCtxId++; final TimelineTask? task; - _TransactionContext(this._sendPort, this.task); + _UnsafeContext(this._sendPort, this.task); @override bool get closed { diff --git a/packages/sqlite_async/lib/src/sqlite_connection.dart b/packages/sqlite_async/lib/src/sqlite_connection.dart index bd2292b..0e360fc 100644 --- a/packages/sqlite_async/lib/src/sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/sqlite_connection.dart @@ -8,7 +8,7 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'common/connection/sync_sqlite_connection.dart'; /// Abstract class representing calls available in a read-only or read-write context. -abstract class SqliteReadContext { +abstract interface class SqliteReadContext { /// Execute a read-only (SELECT) query and return the results. Future getAll(String sql, [List parameters = const []]); @@ -66,7 +66,7 @@ abstract class SqliteReadContext { } /// Abstract class representing calls available in a read-write context. -abstract class SqliteWriteContext extends SqliteReadContext { +abstract interface class SqliteWriteContext extends SqliteReadContext { /// Execute a write query (INSERT, UPDATE, DELETE) and return the results (if any). Future execute(String sql, [List parameters = const []]); @@ -75,13 +75,28 @@ abstract class SqliteWriteContext extends SqliteReadContext { /// parameter set. This is faster than executing separately with each /// parameter set. Future executeBatch(String sql, List> parameterSets); + + /// Open a read-write transaction on this write context. + /// + /// When called on a [SqliteConnection], this takes a global lock - only one + /// write write transaction can execute against the database at a time. This + /// applies even when constructing separate [SqliteDatabase] instances for the + /// same database file. + /// + /// Statements within the transaction must be done on the provided + /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] + /// instance will error. + /// It is forbidden to use the [SqliteWriteContext] after the [callback] + /// completes. + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback); } /// Abstract class representing a connection to the SQLite database. /// /// This package typically pools multiple [SqliteConnection] instances into a /// managed [SqliteDatabase] automatically. -abstract class SqliteConnection extends SqliteWriteContext { +abstract interface class SqliteConnection extends SqliteWriteContext { /// Default constructor for subclasses. SqliteConnection(); @@ -123,6 +138,7 @@ abstract class SqliteConnection extends SqliteWriteContext { /// Statements within the transaction must be done on the provided /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] /// instance will error. + @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}); diff --git a/packages/sqlite_async/lib/src/sqlite_queries.dart b/packages/sqlite_async/lib/src/sqlite_queries.dart index 80bc568..367d23f 100644 --- a/packages/sqlite_async/lib/src/sqlite_queries.dart +++ b/packages/sqlite_async/lib/src/sqlite_queries.dart @@ -107,7 +107,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}) async { return writeLock((ctx) async { - return await internalWriteTransaction(ctx, callback); + return ctx.writeTransaction(callback); }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()'); } diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index c911bbc..9faf928 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -21,24 +21,6 @@ Future internalReadTransaction(SqliteReadContext ctx, } } -Future internalWriteTransaction(SqliteWriteContext ctx, - Future Function(SqliteWriteContext tx) callback) async { - try { - await ctx.execute('BEGIN IMMEDIATE'); - final result = await callback(ctx); - await ctx.execute('COMMIT'); - return result; - } catch (e) { - try { - await ctx.execute('ROLLBACK'); - } catch (e) { - // In rare cases, a ROLLBACK may fail. - // Safe to ignore. - } - rethrow; - } -} - /// Given a SELECT query, return the tables that the query depends on. Future> getSourceTablesText( SqliteReadContext ctx, String sql) async { diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index cd7e215..3e0797b 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -8,9 +8,9 @@ import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; import 'package:sqlite_async/web.dart'; +import '../impl/context.dart'; import 'protocol.dart'; import 'web_mutex.dart'; @@ -94,13 +94,9 @@ class WebDatabase Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { if (_mutex case var mutex?) { - return await mutex.lock(timeout: lockTimeout, () async { - final context = _SharedContext(this); - try { - return await callback(context); - } finally { - context.markClosed(); - } + return await mutex.lock(timeout: lockTimeout, () { + return ScopedReadContext.assumeReadLock( + _UnscopedContext(this), callback); }); } else { // No custom mutex, coordinate locks through shared worker. @@ -108,7 +104,8 @@ class WebDatabase CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock)); try { - return await callback(_SharedContext(this)); + return await ScopedReadContext.assumeReadLock( + _UnscopedContext(this), callback); } finally { await _database.customRequest( CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); @@ -125,30 +122,28 @@ class WebDatabase Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, bool? flush}) { - return writeLock( - (writeContext) => - internalWriteTransaction(writeContext, (context) async { - // All execute calls done in the callback will be checked for the - // autocommit state - return callback(_ExclusiveTransactionContext(this, writeContext)); - }), + return writeLock((writeContext) { + return ScopedWriteContext.assumeWriteLock( + _UnscopedContext(this), + (ctx) async { + return await ctx.writeTransaction(callback); + }, + ); + }, debugContext: 'writeTransaction()', lockTimeout: lockTimeout, flush: flush); } @override - - /// Internal writeLock which intercepts transaction context's to verify auto commit is not active Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext, bool? flush}) async { if (_mutex case var mutex?) { return await mutex.lock(timeout: lockTimeout, () async { - final context = _ExclusiveContext(this); + final context = _UnscopedContext(this); try { - return await callback(context); + return await ScopedWriteContext.assumeWriteLock(context, callback); } finally { - context.markClosed(); if (flush != false) { await this.flush(); } @@ -158,11 +153,10 @@ class WebDatabase // No custom mutex, coordinate locks through shared worker. await _database.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.requestExclusiveLock)); - final context = _ExclusiveContext(this); + final context = _UnscopedContext(this); try { - return await callback(context); + return await ScopedWriteContext.assumeWriteLock(context, callback); } finally { - context.markClosed(); if (flush != false) { await this.flush(); } @@ -179,17 +173,16 @@ class WebDatabase } } -class _SharedContext implements SqliteReadContext { +final class _UnscopedContext extends UnscopedContext { final WebDatabase _database; - bool _contextClosed = false; final TimelineTask? _task; - _SharedContext(this._database) + _UnscopedContext(this._database) : _task = _database.profileQueries ? TimelineTask() : null; @override - bool get closed => _contextClosed || _database.closed; + bool get closed => _database.closed; @override Future computeWithDatabase( @@ -230,14 +223,6 @@ class _SharedContext implements SqliteReadContext { return results.firstOrNull; } - void markClosed() { - _contextClosed = true; - } -} - -class _ExclusiveContext extends _SharedContext implements SqliteWriteContext { - _ExclusiveContext(super.database); - @override Future execute(String sql, [List parameters = const []]) { return _task.timeAsync('execute', sql: sql, parameters: parameters, () { @@ -258,15 +243,17 @@ class _ExclusiveContext extends _SharedContext implements SqliteWriteContext { }); }); } -} - -class _ExclusiveTransactionContext extends _ExclusiveContext { - SqliteWriteContext baseContext; - - _ExclusiveTransactionContext(super.database, this.baseContext); @override - bool get closed => baseContext.closed; + UnscopedContext interceptOutermostTransaction() { + // All execute calls done in the callback will be checked for the + // autocommit state + return _ExclusiveTransactionContext(_database); + } +} + +final class _ExclusiveTransactionContext extends _UnscopedContext { + _ExclusiveTransactionContext(super._database); Future _executeInternal( String sql, List parameters) async { diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index fb98e17..6a315da 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -122,7 +122,7 @@ void main() { ['Test Data']); expect(rs.rows[0], equals(['Test Data'])); }); - expect(await savedTx!.getAutoCommit(), equals(true)); + expect(await db.getAutoCommit(), equals(true)); expect(savedTx!.closed, equals(true)); }); @@ -189,6 +189,73 @@ void main() { ); }); + group('nested transaction', () { + const insert = 'INSERT INTO test_data (description) VALUES(?);'; + late SqliteDatabase db; + + setUp(() async { + db = await testUtils.setupDatabase(path: path); + await createTables(db); + }); + + tearDown(() => db.close()); + + test('run in outer transaction', () async { + await db.writeTransaction((tx) async { + await tx.execute(insert, ['first']); + + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['second']); + }); + + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + expect(await db.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + test('can rollback inner transaction', () async { + await db.writeTransaction((tx) async { + await tx.execute(insert, ['first']); + + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['second']); + }); + + await expectLater(() async { + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['third']); + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(3)); + throw 'rollback please'; + }); + }, throwsA(anything)); + + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + expect(await db.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + test('cannot use outer transaction while inner is active', () async { + await db.writeTransaction((outer) async { + await outer.writeTransaction((inner) async { + await expectLater(outer.execute('SELECT 1'), throwsStateError); + }); + }); + }); + + test('cannot use inner after leaving scope', () async { + await db.writeTransaction((tx) async { + late SqliteWriteContext inner; + await tx.writeTransaction((tx) async { + inner = tx; + }); + + await expectLater(inner.execute('SELECT 1'), throwsStateError); + }); + }); + }); + test('can use raw database instance', () async { final factory = await testUtils.testFactory(); final raw = await factory.openDatabaseForSingleConnection();