diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b226d7e..9c331c0 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -32,19 +32,19 @@ jobs: include: - sqlite_version: "3440200" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3440200.tar.gz" - dart_sdk: 3.2.4 + dart_sdk: 3.3.3 - sqlite_version: "3430200" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3430200.tar.gz" - dart_sdk: 3.2.4 + dart_sdk: 3.3.3 - sqlite_version: "3420000" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3420000.tar.gz" - dart_sdk: 3.2.4 + dart_sdk: 3.3.3 - sqlite_version: "3410100" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3410100.tar.gz" - dart_sdk: 3.2.4 + dart_sdk: 3.3.3 - sqlite_version: "3380000" sqlite_url: "https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz" - dart_sdk: 3.2.0 + dart_sdk: 3.3.3 steps: - uses: actions/checkout@v3 - uses: dart-lang/setup-dart@v1 diff --git a/.gitignore b/.gitignore index f88fffb..271119a 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,5 @@ assets test-db sqlite-autoconf-* doc + +build diff --git a/CHANGELOG.md b/CHANGELOG.md index e9a2a23..fa59c99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,25 @@ +## 0.7.0-alpha.3 + +- Add latest changes from master + ## 0.7.0-alpha.2 -- Fix re-using a shared Mutex from https://github.com/powersync-ja/sqlite_async.dart/pull/31 +- Fix re-using a shared Mutex from ## 0.7.0-alpha.1 - Added initial support for web platform. +## 0.6.1 + +- Fix errors when closing a `SqliteDatabase`. +- Configure SQLite `busy_timeout` (30s default). This fixes "database is locked (code 5)" error when using multiple `SqliteDatabase` instances for the same database. +- Fix errors when opening multiple connections at the same time, e.g. when running multiple read queries concurrently + right after opening the dtaabase. +- Improved error handling when an Isolate crashes with an uncaught error. +- Rewrite connection pool logic to fix performance issues when multiple read connections are open. +- Fix using `SqliteDatabase.isolateConnectionFactory()` in multiple isolates. + ## 0.6.0 - Allow catching errors and continuing the transaction. This is technically a breaking change, although it should not be an issue in most cases. diff --git a/lib/src/common/abstract_open_factory.dart b/lib/src/common/abstract_open_factory.dart index 82534b9..e421e20 100644 --- a/lib/src/common/abstract_open_factory.dart +++ b/lib/src/common/abstract_open_factory.dart @@ -86,8 +86,21 @@ abstract class AbstractDefaultSqliteOpenFactory< FutureOr open(SqliteOpenOptions options) async { var db = await openDB(options); + // Pragma statements don't have the same BUSY_TIMEOUT behavior as normal statements. + // We add a manual retry loop for those. for (var statement in pragmaStatements(options)) { - db.execute(statement); + for (var tries = 0; tries < 30; tries++) { + try { + db.execute(statement); + break; + } on sqlite.SqliteException catch (e) { + if (e.resultCode == sqlite.SqlError.SQLITE_BUSY && tries < 29) { + continue; + } else { + rethrow; + } + } + } } return db; } diff --git a/lib/src/common/port_channel.dart b/lib/src/common/port_channel.dart index c32b7b9..8b05feb 100644 --- a/lib/src/common/port_channel.dart +++ b/lib/src/common/port_channel.dart @@ -18,8 +18,11 @@ abstract class PortClient { class ParentPortClient implements PortClient { late Future sendPortFuture; SendPort? sendPort; - ReceivePort receivePort = ReceivePort(); + final ReceivePort _receivePort = ReceivePort(); + final ReceivePort _errorPort = ReceivePort(); bool closed = false; + Object? _closeError; + String? _isolateDebugName; int _nextId = 1; Map> handlers = HashMap(); @@ -30,7 +33,7 @@ class ParentPortClient implements PortClient { sendPortFuture.then((value) { sendPort = value; }); - receivePort.listen((message) { + _receivePort.listen((message) { if (message is _InitMessage) { assert(!initCompleter.isCompleted); initCompleter.complete(message.port); @@ -57,24 +60,35 @@ class ParentPortClient implements PortClient { } close(); }); + _errorPort.listen((message) { + final [error, stackTraceString] = message; + final stackTrace = stackTraceString == null + ? null + : StackTrace.fromString(stackTraceString); + if (!initCompleter.isCompleted) { + initCompleter.completeError(error, stackTrace); + } + _close(IsolateError(cause: error, isolateDebugName: _isolateDebugName), + stackTrace); + }); } Future get ready async { await sendPortFuture; } - void _cancelAll(Object error) { + void _cancelAll(Object error, [StackTrace? stackTrace]) { var handlers = this.handlers; this.handlers = {}; for (var message in handlers.values) { - message.completeError(error); + message.completeError(error, stackTrace); } } @override Future post(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? const ClosedException(); } var completer = Completer.sync(); var id = _nextId++; @@ -87,27 +101,39 @@ class ParentPortClient implements PortClient { @override void fire(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? ClosedException(); } final port = sendPort ?? await sendPortFuture; port.send(_FireMessage(message)); } RequestPortServer server() { - return RequestPortServer(receivePort.sendPort); + return RequestPortServer(_receivePort.sendPort); } - void close() async { + void _close([Object? error, StackTrace? stackTrace]) { if (!closed) { closed = true; - receivePort.close(); - _cancelAll(const ClosedException()); + _receivePort.close(); + _errorPort.close(); + if (error == null) { + _cancelAll(const ClosedException()); + } else { + _closeError = error; + _cancelAll(error, stackTrace); + } } } + void close() { + _close(); + } + tieToIsolate(Isolate isolate) { - isolate.addOnExitListener(receivePort.sendPort, response: _closeMessage); + _isolateDebugName = isolate.debugName; + isolate.addErrorListener(_errorPort.sendPort); + isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); } } @@ -261,6 +287,27 @@ class _RequestMessage { class ClosedException implements Exception { const ClosedException(); + + @override + String toString() { + return 'ClosedException'; + } +} + +class IsolateError extends Error { + final Object cause; + final String? isolateDebugName; + + IsolateError({required this.cause, this.isolateDebugName}); + + @override + String toString() { + if (isolateDebugName != null) { + return 'IsolateError in $isolateDebugName: $cause'; + } else { + return 'IsolateError: $cause'; + } + } } class _PortChannelResult { diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index 2dd8374..56d9c12 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:collection'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart'; @@ -18,7 +19,9 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { SqliteConnectionImpl? _writeConnection; - final List _readConnections = []; + final Set _allReadConnections = {}; + final Queue _availableReadConnections = Queue(); + final Queue<_PendingItem> _queue = Queue(); final AbstractDefaultSqliteOpenFactory _factory; @@ -55,72 +58,84 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future getAutoCommit() async { if (_writeConnection == null) { - throw AssertionError('Closed'); + throw ClosedException(); } return await _writeConnection!.getAutoCommit(); } - @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) async { - await _expandPool(); - - return _runZoned(() async { - bool haveLock = false; - var completer = Completer(); + void _nextRead() { + if (_queue.isEmpty) { + // Wait for queue item + return; + } else if (closed) { + while (_queue.isNotEmpty) { + final nextItem = _queue.removeFirst(); + nextItem.completer.completeError(const ClosedException()); + } + return; + } - var futures = _readConnections.sublist(0).map((connection) async { - if (connection.closed) { - _readConnections.remove(connection); - } - try { - return await connection.readLock((ctx) async { - if (haveLock) { - // Already have a different lock - release this one. - return false; - } - haveLock = true; - - var future = callback(ctx); - completer.complete(future); - - // We have to wait for the future to complete before we can release the - // lock. - try { - await future; - } catch (_) { - // Ignore - } - - return true; - }, lockTimeout: lockTimeout, debugContext: debugContext); - } on TimeoutException { - return false; - } - }); + while (_availableReadConnections.isNotEmpty && + _availableReadConnections.last.closed) { + // Remove connections that may have errored + final connection = _availableReadConnections.removeLast(); + _allReadConnections.remove(connection); + } - final stream = Stream.fromFutures(futures); - var gotAny = await stream.any((element) => element); + if (_availableReadConnections.isEmpty && + _allReadConnections.length == maxReaders) { + // Wait for available connection + return; + } - if (!gotAny) { - // All TimeoutExceptions - throw TimeoutException('Failed to get a read connection', lockTimeout); + var nextItem = _queue.removeFirst(); + while (nextItem.completer.isCompleted) { + // This item already timed out - try the next one if available + if (_queue.isEmpty) { + return; } + nextItem = _queue.removeFirst(); + } + nextItem.lockTimer?.cancel(); + + nextItem.completer.complete(Future.sync(() async { + final nextConnection = _availableReadConnections.isEmpty + ? await _expandPool() + : _availableReadConnections.removeLast(); try { - return await completer.future; - } catch (e) { - // throw e; - rethrow; + // At this point the connection is expected to be available immediately. + // No need to calculate a new lockTimeout here. + final result = await nextConnection.readLock(nextItem.callback); + return result; + } finally { + _availableReadConnections.add(nextConnection); + Timer.run(_nextRead); } - }, debugContext: debugContext ?? 'get*()'); + })); + } + + @override + Future readLock(ReadCallback callback, + {Duration? lockTimeout, String? debugContext}) async { + if (closed) { + throw ClosedException(); + } + final zone = _getZone(debugContext: debugContext ?? 'get*()'); + final item = _PendingItem((ctx) { + return zone.runUnary(callback, ctx); + }, lockTimeout: lockTimeout); + _queue.add(item); + _nextRead(); + + return (await item.future) as T; } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { if (closed) { - throw AssertionError('Closed'); + throw ClosedException(); } if (_writeConnection?.closed == true) { _writeConnection = null; @@ -149,40 +164,38 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { /// connection (with a different lock). /// 2. Give a more specific error message when it happens. T _runZoned(T Function() callback, {required String debugContext}) { + return _getZone(debugContext: debugContext).run(callback); + } + + Zone _getZone({required String debugContext}) { if (Zone.current[this] != null) { throw LockError( 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); } - var zone = Zone.current.fork(zoneValues: {this: true}); - return zone.run(callback); + return Zone.current.fork(zoneValues: {this: true}); } - Future _expandPool() async { - if (closed || _readConnections.length >= maxReaders) { - return; - } - bool hasCapacity = _readConnections.any((connection) => !connection.locked); - if (!hasCapacity) { - var name = debugName == null - ? null - : '$debugName-${_readConnections.length + 1}'; - var connection = SqliteConnectionImpl( - upstreamPort: _writeConnection?.upstreamPort, - primary: false, - updates: updates, - debugName: name, - mutex: mutex, - readOnly: true, - openFactory: _factory); - _readConnections.add(connection); - - // Edge case: - // If we don't await here, there is a chance that a different connection - // is used for the transaction, and that it finishes and deletes the database - // while this one is still opening. This is specifically triggered in tests. - // To avoid that, we wait for the connection to be ready. - await connection.ready; - } + Future _expandPool() async { + var name = debugName == null + ? null + : '$debugName-${_allReadConnections.length + 1}'; + var connection = SqliteConnectionImpl( + upstreamPort: upstreamPort, + primary: false, + updates: updates, + debugName: name, + mutex: mutex, + readOnly: true, + openFactory: _factory); + _allReadConnections.add(connection); + + // Edge case: + // If we don't await here, there is a chance that a different connection + // is used for the transaction, and that it finishes and deletes the database + // while this one is still opening. This is specifically triggered in tests. + // To avoid that, we wait for the connection to be ready. + await connection.ready; + return connection; } SerializedPortClient? get upstreamPort { @@ -192,7 +205,15 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future close() async { closed = true; - for (var connection in _readConnections) { + + // It is possible that `readLock()` removes connections from the pool while we're + // closing connections, but not possible for new connections to be added. + // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration" + final toClose = _allReadConnections.toList(); + for (var connection in toClose) { + // Wait for connection initialization, so that any existing readLock() + // requests go through before closing. + await connection.ready; await connection.close(); } // Closing the write connection cleans up the journal files (-shm and -wal files). @@ -201,3 +222,34 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { await _writeConnection?.close(); } } + +typedef ReadCallback = Future Function(SqliteReadContext tx); + +class _PendingItem { + ReadCallback callback; + Completer completer = Completer.sync(); + late Future future = completer.future; + DateTime? deadline; + final Duration? lockTimeout; + late final Timer? lockTimer; + + _PendingItem(this.callback, {this.lockTimeout}) { + if (lockTimeout != null) { + deadline = DateTime.now().add(lockTimeout!); + lockTimer = Timer(lockTimeout!, () { + // Note: isCompleted is true when `nextItem.completer.complete` is called, not when the result is available. + // This matches the behavior we need for a timeout on the lock, but not the entire operation. + if (!completer.isCompleted) { + // completer.completeError( + // TimeoutException('Failed to get a read connection', lockTimeout)); + completer.complete(Future.sync(() async { + throw TimeoutException( + 'Failed to get a read connection', lockTimeout); + })); + } + }); + } else { + lockTimer = null; + } + } +} diff --git a/lib/src/native/native_sqlite_open_factory.dart b/lib/src/native/native_sqlite_open_factory.dart index 99fd8e4..5773be9 100644 --- a/lib/src/native/native_sqlite_open_factory.dart +++ b/lib/src/native/native_sqlite_open_factory.dart @@ -1,4 +1,4 @@ -import 'package:sqlite3/sqlite3.dart'; +import 'package:sqlite_async/sqlite3.dart' as sqlite; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; @@ -15,7 +15,7 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { @override CommonDatabase openDB(SqliteOpenOptions options) { final mode = options.openMode; - var db = sqlite3.open(path, mode: mode, mutex: false); + var db = sqlite.sqlite3.open(path, mode: mode, mutex: false); return db; } @@ -23,6 +23,12 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { List pragmaStatements(SqliteOpenOptions options) { List statements = []; + if (sqliteOptions.lockTimeout != null) { + // May be replaced by a Dart-level retry mechanism in the future + statements.add( + 'PRAGMA busy_timeout = ${sqliteOptions.lockTimeout!.inMilliseconds}'); + } + if (options.primaryConnection && sqliteOptions.journalMode != null) { // Persisted - only needed on the primary connection statements diff --git a/lib/src/sqlite_connection.dart b/lib/src/sqlite_connection.dart index b9edd36..f92d318 100644 --- a/lib/src/sqlite_connection.dart +++ b/lib/src/sqlite_connection.dart @@ -90,7 +90,8 @@ abstract class SqliteConnection extends SqliteWriteContext { /// Open a read-write transaction. /// /// This takes a global lock - only one write transaction can execute against - /// the database at a time. + /// the database at a time. This applies even when constructing separate + /// [SqliteDatabase] instances for the same database file. /// /// Statements within the transaction must be done on the provided /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] @@ -110,6 +111,9 @@ abstract class SqliteConnection extends SqliteWriteContext { /// Takes a read lock, without starting a transaction. /// + /// The lock only applies to a single [SqliteConnection], and multiple + /// connections may hold read locks at the same time. + /// /// In most cases, [readTransaction] should be used instead. Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}); @@ -117,6 +121,10 @@ abstract class SqliteConnection extends SqliteWriteContext { /// Takes a global lock, without starting a transaction. /// /// In most cases, [writeTransaction] should be used instead. + /// + /// The lock applies to all [SqliteConnection] instances for a [SqliteDatabase]. + /// Locks for separate [SqliteDatabase] instances on the same database file + /// may be held concurrently. Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}); diff --git a/lib/src/sqlite_options.dart b/lib/src/sqlite_options.dart index 6da7255..3767213 100644 --- a/lib/src/sqlite_options.dart +++ b/lib/src/sqlite_options.dart @@ -25,17 +25,24 @@ class SqliteOptions { final WebSqliteOptions webSqliteOptions; + /// Timeout waiting for locks to be released by other connections. + /// Defaults to 30 seconds. + /// Set to null or [Duration.zero] to fail immediately when the database is locked. + final Duration? lockTimeout; + const SqliteOptions.defaults() : journalMode = SqliteJournalMode.wal, journalSizeLimit = 6 * 1024 * 1024, // 1.5x the default checkpoint size synchronous = SqliteSynchronous.normal, - webSqliteOptions = const WebSqliteOptions.defaults(); + webSqliteOptions = const WebSqliteOptions.defaults(), + lockTimeout = const Duration(seconds: 30); const SqliteOptions( {this.journalMode = SqliteJournalMode.wal, this.journalSizeLimit = 6 * 1024 * 1024, this.synchronous = SqliteSynchronous.normal, - this.webSqliteOptions = const WebSqliteOptions.defaults()}); + this.webSqliteOptions = const WebSqliteOptions.defaults(), + this.lockTimeout = const Duration(seconds: 30)}); } /// SQLite journal mode. Set on the primary connection. diff --git a/pubspec.yaml b/pubspec.yaml index 4a8ad38..aacb854 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,9 +1,9 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.7.0-alpha.2 +version: 0.7.0-alpha.3 repository: https://github.com/powersync-ja/sqlite_async.dart environment: - sdk: '>=3.2.0 <4.0.0' + sdk: ">=3.2.0 <4.0.0" dependencies: drift: ^2.15.0 @@ -13,7 +13,7 @@ dependencies: # url: https://github.com/powersync-ja/drift.git # ref: test # branch name # path: drift - sqlite3: '^2.3.0' + sqlite3: "^2.3.0" js: ^0.6.3 async: ^2.10.0 collection: ^1.17.0 diff --git a/test/basic_native_test.dart b/test/native/basic_test.dart similarity index 70% rename from test/basic_native_test.dart rename to test/native/basic_test.dart index 0315ff9..c1fa4aa 100644 --- a/test/basic_native_test.dart +++ b/test/native/basic_test.dart @@ -6,7 +6,7 @@ import 'package:sqlite3/common.dart' as sqlite; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; -import 'utils/test_utils_impl.dart'; +import '../utils/test_utils_impl.dart'; final testUtils = TestUtils(); @@ -67,6 +67,37 @@ void main() { } }); + test('Concurrency 2', () async { + final db1 = await testUtils.setupDatabase(path: path, maxReaders: 3); + final db2 = await testUtils.setupDatabase(path: path, maxReaders: 3); + + await db1.initialize(); + await createTables(db1); + await db2.initialize(); + print("${DateTime.now()} start"); + + var futures1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11].map((i) { + return db1.execute( + "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *", + [ + i, + 5 + Random().nextInt(20) + ]).then((value) => print("${DateTime.now()} $value")); + }).toList(); + + var futures2 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11].map((i) { + return db2.execute( + "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 2 ' || datetime() as connection RETURNING *", + [ + i, + 5 + Random().nextInt(20) + ]).then((value) => print("${DateTime.now()} $value")); + }).toList(); + await Future.wait(futures1); + await Future.wait(futures2); + print("${DateTime.now()} done"); + }); + test('read-only transactions', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); @@ -220,8 +251,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific error message may change + expect( + caughtError.toString(), + equals( + "IsolateError in sqlite-writer: Invalid argument(s): uncaught async error")); // Check that we can still continue afterwards final computed = await db.computeWithDatabase((db) async { @@ -247,8 +281,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific message may change + expect( + caughtError.toString(), + matches(RegExp( + r'IsolateError in sqlite-\d+: Invalid argument\(s\): uncaught async error'))); } // Check that we can still continue afterwards @@ -259,6 +296,51 @@ void main() { }); expect(computed, equals(5)); }); + + test('closing', () async { + // Test race condition in SqliteConnectionPool: + // 1. Open two concurrent queries, which opens two connection. + // 2. Second connection takes longer to open than first. + // 3. Call db.close(). + // 4. Now second connection is ready. Second query has two connections to choose from. + // 5. However, first connection is closed, so it's removed from the pool. + // 6. Triggers `Concurrent modification during iteration: Instance(length:1) of '_GrowableList'` + final db = await testUtils.setupDatabase(path: path, initStatements: [ + // Second connection to sleep more than first connection + 'SELECT test_sleep(test_connection_number() * 10)' + ]); + await db.initialize(); + + final future1 = db.get('SELECT test_sleep(10) as sleep'); + final future2 = db.get('SELECT test_sleep(10) as sleep'); + + await db.close(); + + await future1; + await future2; + }); + + test('lockTimeout', () async { + final db = await testUtils.setupDatabase(path: path, maxReaders: 2); + await db.initialize(); + + final f1 = db.readTransaction((tx) async { + await tx.get('select test_sleep(100)'); + }, lockTimeout: const Duration(milliseconds: 200)); + + final f2 = db.readTransaction((tx) async { + await tx.get('select test_sleep(100)'); + }, lockTimeout: const Duration(milliseconds: 200)); + + // At this point, both read connections are in use + await expectLater(() async { + await db.readLock((tx) async { + await tx.get('select test_sleep(10)'); + }, lockTimeout: const Duration(milliseconds: 2)); + }, throwsA((e) => e is TimeoutException)); + + await Future.wait([f1, f2]); + }); }); } diff --git a/test/watch_native_test.dart b/test/native/watch_test.dart similarity index 99% rename from test/watch_native_test.dart rename to test/native/watch_test.dart index 727408f..e6a5953 100644 --- a/test/watch_native_test.dart +++ b/test/native/watch_test.dart @@ -8,7 +8,7 @@ import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/utils/database_utils.dart'; import 'package:test/test.dart'; -import 'utils/test_utils_impl.dart'; +import '../utils/test_utils_impl.dart'; final testUtils = TestUtils(); diff --git a/test/utils/abstract_test_utils.dart b/test/utils/abstract_test_utils.dart index 6a6f094..65cf132 100644 --- a/test/utils/abstract_test_utils.dart +++ b/test/utils/abstract_test_utils.dart @@ -28,8 +28,12 @@ abstract class AbstractTestUtils { } /// Creates a SqliteDatabaseConnection - Future setupDatabase({String? path}) async { - final db = SqliteDatabase.withFactory(await testFactory(path: path)); + Future setupDatabase( + {String? path, + List initStatements = const [], + int maxReaders = SqliteDatabase.defaultMaxReaders}) async { + final db = SqliteDatabase.withFactory(await testFactory(path: path), + maxReaders: maxReaders); await db.initialize(); return db; } diff --git a/test/utils/native_test_utils.dart b/test/utils/native_test_utils.dart index d6a8615..2d7a924 100644 --- a/test/utils/native_test_utils.dart +++ b/test/utils/native_test_utils.dart @@ -17,7 +17,8 @@ class TestSqliteOpenFactory extends TestDefaultSqliteOpenFactory { TestSqliteOpenFactory( {required super.path, super.sqliteOptions, - super.sqlitePath = defaultSqlitePath}); + super.sqlitePath = defaultSqlitePath, + initStatements}); @override FutureOr open(SqliteOpenOptions options) async { @@ -88,8 +89,12 @@ class TestUtils extends AbstractTestUtils { Future testFactory( {String? path, String sqlitePath = defaultSqlitePath, + List initStatements = const [], SqliteOptions options = const SqliteOptions.defaults()}) async { return TestSqliteOpenFactory( - path: path ?? dbPath(), sqlitePath: sqlitePath, sqliteOptions: options); + path: path ?? dbPath(), + sqlitePath: sqlitePath, + sqliteOptions: options, + initStatements: initStatements); } } diff --git a/test/utils/web_test_utils.dart b/test/utils/web_test_utils.dart index bbfe82d..6efb044 100644 --- a/test/utils/web_test_utils.dart +++ b/test/utils/web_test_utils.dart @@ -46,7 +46,10 @@ class TestUtils extends AbstractTestUtils { } @override - Future setupDatabase({String? path}) async { + Future setupDatabase( + {String? path, + List initStatements = const [], + int maxReaders = SqliteDatabase.defaultMaxReaders}) async { await _isInitialized; return super.setupDatabase(path: path); }