diff --git a/CHANGELOG.md b/CHANGELOG.md index 202e52f..cc76dca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.3.0 + +- Better error messages for recursive transactions. +- Breaking change: Error by default when starting a read transaction within a write transaction. + ## 0.2.1 - Fix update notifications missing the first update. diff --git a/README.md b/README.md index 3bc0fc2..f3f8e6f 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,9 @@ query access. * Automatically convert query args to JSON where applicable, making JSON1 operations simple. * Direct SQL queries - no wrapper classes or code generation required. +See this [blog post](https://www.powersync.co/blog/sqlite-optimizations-for-ultra-high-performance), +explaining why these features are important for using SQLite. + ## Installation ```sh @@ -65,8 +68,8 @@ void main() async { // 1. Atomic persistence (all updates are either applied or rolled back). // 2. Improved throughput. await db.writeTransaction((tx) async { - await db.execute('INSERT INTO test_data(data) values(?)', ['Test3']); - await db.execute('INSERT INTO test_data(data) values(?)', ['Test4']); + await tx.execute('INSERT INTO test_data(data) values(?)', ['Test3']); + await tx.execute('INSERT INTO test_data(data) values(?)', ['Test4']); }); await db.close(); diff --git a/example/basic_example.dart b/example/basic_example.dart index 35100f2..63fda84 100644 --- a/example/basic_example.dart +++ b/example/basic_example.dart @@ -29,8 +29,8 @@ void main() async { // 1. Atomic persistence (all updates are either applied or rolled back). // 2. Improved throughput. await db.writeTransaction((tx) async { - await db.execute('INSERT INTO test_data(data) values(?)', ['Test3']); - await db.execute('INSERT INTO test_data(data) values(?)', ['Test4']); + await tx.execute('INSERT INTO test_data(data) values(?)', ['Test3']); + await tx.execute('INSERT INTO test_data(data) values(?)', ['Test4']); }); // Close database to release resources diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index d4347a6..ff0aa9b 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -50,58 +50,60 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout}) async { + {Duration? lockTimeout, String? debugContext}) async { await _expandPool(); - bool haveLock = false; - var completer = Completer(); + return _runZoned(() async { + bool haveLock = false; + var completer = Completer(); + + var futures = _readConnections.sublist(0).map((connection) async { + try { + return await connection.readLock((ctx) async { + if (haveLock) { + // Already have a different lock - release this one. + return false; + } + haveLock = true; + + var future = callback(ctx); + completer.complete(future); + + // We have to wait for the future to complete before we can release the + // lock. + try { + await future; + } catch (_) { + // Ignore + } + + return true; + }, lockTimeout: lockTimeout, debugContext: debugContext); + } on TimeoutException { + return false; + } + }); + + final stream = Stream.fromFutures(futures); + var gotAny = await stream.any((element) => element); + + if (!gotAny) { + // All TimeoutExceptions + throw TimeoutException('Failed to get a read connection', lockTimeout); + } - var futures = _readConnections.sublist(0).map((connection) async { try { - return await connection.readLock((ctx) async { - if (haveLock) { - // Already have a different lock - release this one. - return false; - } - haveLock = true; - - var future = callback(ctx); - completer.complete(future); - - // We have to wait for the future to complete before we can release the - // lock. - try { - await future; - } catch (_) { - // Ignore - } - - return true; - }, lockTimeout: lockTimeout); - } on TimeoutException { - return false; + return await completer.future; + } catch (e) { + // throw e; + rethrow; } - }); - - final stream = Stream.fromFutures(futures); - var gotAny = await stream.any((element) => element); - - if (!gotAny) { - // All TimeoutExceptions - throw TimeoutException('Failed to get a read connection', lockTimeout); - } - - try { - return await completer.future; - } catch (e) { - // throw e; - rethrow; - } + }, debugContext: debugContext ?? 'get*()'); } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout}) { + {Duration? lockTimeout, String? debugContext}) { if (closed) { throw AssertionError('Closed'); } @@ -113,7 +115,25 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { mutex: mutex, readOnly: false, openFactory: _factory); - return _writeConnection!.writeLock(callback, lockTimeout: lockTimeout); + return _runZoned(() { + return _writeConnection!.writeLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); + }, debugContext: debugContext ?? 'execute()'); + } + + /// The [Mutex] on individual connections do already error in recursive locks. + /// + /// We duplicate the same check here, to: + /// 1. Also error when the recursive transaction is handled by a different + /// connection (with a different lock). + /// 2. Give a more specific error message when it happens. + T _runZoned(T Function() callback, {required String debugContext}) { + if (Zone.current[this] != null) { + throw LockError( + 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); + } + var zone = Zone.current.fork(zoneValues: {this: true}); + return zone.run(callback); } Future _expandPool() async { diff --git a/lib/src/mutex.dart b/lib/src/mutex.dart index cb0277a..578e1d4 100644 --- a/lib/src/mutex.dart +++ b/lib/src/mutex.dart @@ -40,7 +40,7 @@ class SimpleMutex implements Mutex { @override Future lock(Future Function() callback, {Duration? timeout}) async { if (Zone.current[this] != null) { - throw AssertionError('Recursive lock is not allowed'); + throw LockError('Recursive lock is not allowed'); } var zone = Zone.current.fork(zoneValues: {this: true}); @@ -132,7 +132,7 @@ class SharedMutex implements Mutex { @override Future lock(Future Function() callback, {Duration? timeout}) async { if (Zone.current[this] != null) { - throw AssertionError('Recursive lock is not allowed'); + throw LockError('Recursive lock is not allowed'); } return runZoned(() async { await _acquire(timeout: timeout); @@ -223,3 +223,14 @@ class _AcquireMessage { class _UnlockMessage { const _UnlockMessage(); } + +class LockError extends Error { + final String message; + + LockError(this.message); + + @override + String toString() { + return 'LockError: $message'; + } +} diff --git a/lib/src/sqlite_connection.dart b/lib/src/sqlite_connection.dart index ac8316b..a8aea66 100644 --- a/lib/src/sqlite_connection.dart +++ b/lib/src/sqlite_connection.dart @@ -94,13 +94,13 @@ abstract class SqliteConnection extends SqliteWriteContext { /// /// In most cases, [readTransaction] should be used instead. Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout}); + {Duration? lockTimeout, String? debugContext}); /// Takes a global lock, without starting a transaction. /// /// In most cases, [writeTransaction] should be used instead. Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout}); + {Duration? lockTimeout, String? debugContext}); Future close(); } diff --git a/lib/src/sqlite_connection_impl.dart b/lib/src/sqlite_connection_impl.dart index e1e7267..ff8645d 100644 --- a/lib/src/sqlite_connection_impl.dart +++ b/lib/src/sqlite_connection_impl.dart @@ -79,7 +79,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { @override Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout}) async { + {Duration? lockTimeout, String? debugContext}) async { // Private lock to synchronize this with other statements on the same connection, // to ensure that transactions aren't interleaved. return _connectionMutex.lock(() async { @@ -94,7 +94,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { @override Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout}) async { + {Duration? lockTimeout, String? debugContext}) async { final stopWatch = lockTimeout == null ? null : (Stopwatch()..start()); // Private lock to synchronize this with other statements on the same connection, // to ensure that transactions aren't interleaved. diff --git a/lib/src/sqlite_database.dart b/lib/src/sqlite_database.dart index 865b23b..c655ebe 100644 --- a/lib/src/sqlite_database.dart +++ b/lib/src/sqlite_database.dart @@ -209,13 +209,15 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection { @override Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout}) { - return _pool.readLock(callback, lockTimeout: lockTimeout); + {Duration? lockTimeout, String? debugContext}) { + return _pool.readLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout}) { - return _pool.writeLock(callback, lockTimeout: lockTimeout); + {Duration? lockTimeout, String? debugContext}) { + return _pool.writeLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); } } diff --git a/lib/src/sqlite_queries.dart b/lib/src/sqlite_queries.dart index f3b5237..2403d75 100644 --- a/lib/src/sqlite_queries.dart +++ b/lib/src/sqlite_queries.dart @@ -17,7 +17,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { [List parameters = const []]) async { return writeLock((ctx) async { return ctx.execute(sql, parameters); - }); + }, debugContext: 'execute()'); } @override @@ -25,14 +25,14 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { [List parameters = const []]) { return readLock((ctx) async { return ctx.getAll(sql, parameters); - }); + }, debugContext: 'getAll()'); } @override Future get(String sql, [List parameters = const []]) { return readLock((ctx) async { return ctx.get(sql, parameters); - }); + }, debugContext: 'get()'); } @override @@ -40,7 +40,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { [List parameters = const []]) { return readLock((ctx) async { return ctx.getOptional(sql, parameters); - }); + }, debugContext: 'getOptional()'); } @override @@ -103,7 +103,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { {Duration? lockTimeout}) async { return readLock((ctx) async { return await internalReadTransaction(ctx, callback); - }, lockTimeout: lockTimeout); + }, lockTimeout: lockTimeout, debugContext: 'readTransaction()'); } @override @@ -112,7 +112,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { {Duration? lockTimeout}) async { return writeLock((ctx) async { return await internalWriteTransaction(ctx, callback); - }, lockTimeout: lockTimeout); + }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()'); } /// See [SqliteReadContext.computeWithDatabase]. diff --git a/pubspec.yaml b/pubspec.yaml index 8a7a3f6..4003bc1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.2.1 +version: 0.3.0 repository: https://github.com/journeyapps/sqlite_async.dart environment: sdk: '>=2.19.1 <3.0.0' diff --git a/test/basic_test.dart b/test/basic_test.dart index 66aedbe..fef41c1 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -1,8 +1,11 @@ +import 'dart:async'; import 'dart:math'; +import 'package:sqlite3/sqlite3.dart' as sqlite; +import 'package:sqlite_async/mutex.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; -import 'package:sqlite3/sqlite3.dart' as sqlite; + import 'util.dart'; void main() { @@ -108,18 +111,22 @@ void main() { await expectLater(() async { await db.execute( 'INSERT INTO test_data(description) VALUES(?)', ['test']); - }, throwsA((e) => e is AssertionError)); + }, throwsA((e) => e is LockError && e.message.contains('tx.execute'))); }); }); - test('does allow read-only db calls within transaction callback', () async { + test('should not allow read-only db calls within transaction callback', + () async { final db = await setupDatabase(path: path); await createTables(db); await db.writeTransaction((tx) async { - // This uses a different connection, so it's fine. - // Perhaps we should warn on this, since it's likely unintentional? - await db.getAll('SELECT * FROM test_data'); + // This uses a different connection, so it _could_ work. + // But it's likely unintentional and could cause weird bugs, so we don't + // allow it by default. + await expectLater(() async { + await db.getAll('SELECT * FROM test_data'); + }, throwsA((e) => e is LockError && e.message.contains('tx.getAll'))); }); await db.readTransaction((tx) async { @@ -129,24 +136,69 @@ void main() { // opens another connection, but doesn't use it. await expectLater(() async { await db.getAll('SELECT * FROM test_data'); - }, throwsA((e) => e is AssertionError)); + }, throwsA((e) => e is LockError && e.message.contains('tx.getAll'))); }); }); - test('does allow read-only db calls within lock callback', () async { + test('should not allow read-only db calls within lock callback', () async { final db = await setupDatabase(path: path); await createTables(db); // Locks - should behave the same as transactions above await db.writeLock((tx) async { - await db.getAll('SELECT * FROM test_data'); + await expectLater(() async { + await db.getOptional('SELECT * FROM test_data'); + }, + throwsA( + (e) => e is LockError && e.message.contains('tx.getOptional'))); }); await db.readLock((tx) async { await expectLater(() async { + await db.getOptional('SELECT * FROM test_data'); + }, + throwsA( + (e) => e is LockError && e.message.contains('tx.getOptional'))); + }); + }); + + test( + 'should allow read-only db calls within transaction callback in separate zone', + () async { + final db = await setupDatabase(path: path); + await createTables(db); + + // Get a reference to the parent zone (outside the transaction). + final zone = Zone.current; + + // Each of these are fine, since it could use a separate connection. + // Note: In highly concurrent cases, it could exhaust the connection pool and cause a deadlock. + + await db.writeTransaction((tx) async { + // Use the parent zone to avoid the "recursive lock" error. + await zone.fork().run(() async { + await db.getAll('SELECT * FROM test_data'); + }); + }); + + await db.readTransaction((tx) async { + await zone.fork().run(() async { await db.getAll('SELECT * FROM test_data'); - }, throwsA((e) => e is AssertionError)); + }); + }); + + await db.readTransaction((tx) async { + await zone.fork().run(() async { + await db.execute('SELECT * FROM test_data'); + }); }); + + // Note: This would deadlock, since it shares a global write lock. + // await db.writeTransaction((tx) async { + // await zone.fork().run(() async { + // await db.execute('SELECT * FROM test_data'); + // }); + // }); }); test('should allow PRAMGAs', () async {