diff --git a/.gitignore b/.gitignore index 5c41ae2..f88fffb 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ assets .idea .vscode +.devcontainer *.db *.db-* test-db diff --git a/lib/src/common/abstract_open_factory.dart b/lib/src/common/abstract_open_factory.dart index e3ffda0..82534b9 100644 --- a/lib/src/common/abstract_open_factory.dart +++ b/lib/src/common/abstract_open_factory.dart @@ -2,7 +2,10 @@ import 'dart:async'; import 'package:meta/meta.dart'; import 'package:sqlite_async/sqlite3_common.dart' as sqlite; +import 'package:sqlite_async/src/common/mutex.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; +import 'package:sqlite_async/src/update_notification.dart'; /// Factory to create new SQLite database connections. /// @@ -11,7 +14,11 @@ import 'package:sqlite_async/src/sqlite_options.dart'; abstract class SqliteOpenFactory { String get path; + /// Opens a direct connection to the SQLite database FutureOr open(SqliteOpenOptions options); + + /// Opens an asynchronous [SqliteConnection] + FutureOr openConnection(SqliteOpenOptions options); } class SqliteOpenOptions { @@ -21,8 +28,21 @@ class SqliteOpenOptions { /// Whether this connection is read-only. final bool readOnly; + /// Mutex to use in [SqliteConnection]s + final Mutex? mutex; + + /// Name used in debug logs + final String? debugName; + + /// Stream of external update notifications + final Stream? updates; + const SqliteOpenOptions( - {required this.primaryConnection, required this.readOnly}); + {required this.primaryConnection, + required this.readOnly, + this.mutex, + this.debugName, + this.updates}); sqlite.OpenMode get openMode { if (primaryConnection) { @@ -55,9 +75,14 @@ abstract class AbstractDefaultSqliteOpenFactory< List pragmaStatements(SqliteOpenOptions options); @protected + + /// Opens a direct connection to a SQLite database connection FutureOr openDB(SqliteOpenOptions options); @override + + /// Opens a direct connection to a SQLite database connection + /// and executes setup pragma statements to initialize the DB FutureOr open(SqliteOpenOptions options) async { var db = await openDB(options); @@ -66,4 +91,10 @@ abstract class AbstractDefaultSqliteOpenFactory< } return db; } + + @override + + /// Opens an asynchronous [SqliteConnection] to a SQLite database + /// and executes setup pragma statements to initialize the DB + FutureOr openConnection(SqliteOpenOptions options); } diff --git a/lib/src/common/isolate_connection_factory.dart b/lib/src/common/isolate_connection_factory.dart index 7a514d9..90dc9b1 100644 --- a/lib/src/common/isolate_connection_factory.dart +++ b/lib/src/common/isolate_connection_factory.dart @@ -32,12 +32,11 @@ abstract class IsolateConnectionFactory factory IsolateConnectionFactory( {required openFactory, required mutex, - SerializedPortClient? upstreamPort}) { + required SerializedPortClient upstreamPort}) { return IsolateConnectionFactoryImpl( - openFactory: openFactory, - mutex: mutex, - upstreamPort: upstreamPort as SerializedPortClient) - as IsolateConnectionFactory; + openFactory: openFactory, + mutex: mutex, + upstreamPort: upstreamPort) as IsolateConnectionFactory; } /// Open a new SqliteConnection. diff --git a/lib/src/common/sqlite_database.dart b/lib/src/common/sqlite_database.dart index b8380c8..8df4174 100644 --- a/lib/src/common/sqlite_database.dart +++ b/lib/src/common/sqlite_database.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; import 'package:sqlite_async/src/impl/sqlite_database_impl.dart'; @@ -26,6 +27,7 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries { final StreamController updatesController = StreamController.broadcast(); + @protected Future get isInitialized; /// Wait for initialization to complete. diff --git a/lib/src/impl/stub_sqlite_database.dart b/lib/src/impl/stub_sqlite_database.dart index adc8bbd..29db641 100644 --- a/lib/src/impl/stub_sqlite_database.dart +++ b/lib/src/impl/stub_sqlite_database.dart @@ -1,3 +1,4 @@ +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/sqlite_database.dart'; @@ -31,6 +32,7 @@ class SqliteDatabaseImpl } @override + @protected Future get isInitialized => throw UnimplementedError(); @override diff --git a/lib/src/impl/stub_sqlite_open_factory.dart b/lib/src/impl/stub_sqlite_open_factory.dart index 437ff4b..1108752 100644 --- a/lib/src/impl/stub_sqlite_open_factory.dart +++ b/lib/src/impl/stub_sqlite_open_factory.dart @@ -1,5 +1,8 @@ +import 'dart:async'; + import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { @@ -16,4 +19,9 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { List pragmaStatements(SqliteOpenOptions options) { throw UnimplementedError(); } + + @override + FutureOr openConnection(SqliteOpenOptions options) { + throw UnimplementedError(); + } } diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index 4f92f27..2dd8374 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -1,25 +1,26 @@ import 'dart:async'; -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; -import 'package:sqlite_async/src/common/mutex.dart'; -import 'package:sqlite_async/src/common/port_channel.dart'; +import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; -import 'package:sqlite_async/src/sqlite_connection.dart'; -import 'package:sqlite_async/src/sqlite_queries.dart'; -import 'package:sqlite_async/src/update_notification.dart'; /// A connection pool with a single write connection and multiple read connections. class SqliteConnectionPool with SqliteQueries implements SqliteConnection { - SqliteConnection? _writeConnection; + final StreamController updatesController = + StreamController.broadcast(); + + @override + + /// The write connection might be recreated if it's closed + /// This will allow the update stream remain constant even + /// after using a new write connection. + late final Stream updates = updatesController.stream; + + SqliteConnectionImpl? _writeConnection; final List _readConnections = []; final AbstractDefaultSqliteOpenFactory _factory; - final SerializedPortClient _upstreamPort; - - @override - final Stream? updates; final int maxReaders; @@ -41,14 +42,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { /// Read connections are opened in read-only mode, and will reject any statements /// that modify the database. SqliteConnectionPool(this._factory, - {this.updates, - this.maxReaders = 5, - SqliteConnection? writeConnection, + {this.maxReaders = 5, + SqliteConnectionImpl? writeConnection, this.debugName, - required this.mutex, - required SerializedPortClient upstreamPort}) - : _writeConnection = writeConnection, - _upstreamPort = upstreamPort; + required this.mutex}) + : _writeConnection = writeConnection { + // Use the write connection's updates + _writeConnection?.updates?.forEach(updatesController.add); + } /// Returns true if the _write_ connection is currently in autocommit mode. @override @@ -117,21 +118,24 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, String? debugContext}) { + {Duration? lockTimeout, String? debugContext}) async { if (closed) { throw AssertionError('Closed'); } if (_writeConnection?.closed == true) { _writeConnection = null; } - _writeConnection ??= SqliteConnectionImpl( - upstreamPort: _upstreamPort, - primary: false, - updates: updates, - debugName: debugName != null ? '$debugName-writer' : null, - mutex: mutex, - readOnly: false, - openFactory: _factory); + + if (_writeConnection == null) { + _writeConnection = (await _factory.openConnection(SqliteOpenOptions( + primaryConnection: true, + debugName: debugName != null ? '$debugName-writer' : null, + mutex: mutex, + readOnly: false))) as SqliteConnectionImpl; + // Expose the new updates on the connection pool + _writeConnection!.updates?.forEach(updatesController.add); + } + return _runZoned(() { return _writeConnection!.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); @@ -163,7 +167,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { ? null : '$debugName-${_readConnections.length + 1}'; var connection = SqliteConnectionImpl( - upstreamPort: _upstreamPort, + upstreamPort: _writeConnection?.upstreamPort, primary: false, updates: updates, debugName: name, @@ -181,6 +185,10 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { } } + SerializedPortClient? get upstreamPort { + return _writeConnection?.upstreamPort; + } + @override Future close() async { closed = true; diff --git a/lib/src/native/database/native_sqlite_connection_impl.dart b/lib/src/native/database/native_sqlite_connection_impl.dart index 9351330..fd05957 100644 --- a/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/lib/src/native/database/native_sqlite_connection_impl.dart @@ -12,18 +12,22 @@ import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/shared_utils.dart'; +import 'upstream_updates.dart'; + typedef TxCallback = Future Function(CommonDatabase db); /// Implements a SqliteConnection using a separate isolate for the database /// operations. -class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { +class SqliteConnectionImpl + with SqliteQueries, UpStreamTableUpdates + implements SqliteConnection { /// Private to this connection final SimpleMutex _connectionMutex = SimpleMutex(); final Mutex _writeMutex; /// Must be a broadcast stream @override - final Stream? updates; + late final Stream? updates; final ParentPortClient _isolateClient = ParentPortClient(); late final Isolate _isolate; final String? debugName; @@ -32,13 +36,16 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { SqliteConnectionImpl( {required openFactory, required Mutex mutex, - required SerializedPortClient upstreamPort, - this.updates, + SerializedPortClient? upstreamPort, + Stream? updates, this.debugName, this.readOnly = false, bool primary = false}) : _writeMutex = mutex { - _open(openFactory, primary: primary, upstreamPort: upstreamPort); + this.upstreamPort = upstreamPort ?? listenForEvents(); + // Accept an incoming stream of updates, or expose one if not given. + this.updates = updates ?? updatesController.stream; + _open(openFactory, primary: primary, upstreamPort: this.upstreamPort); } Future get ready async { @@ -81,13 +88,14 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { paused: true); _isolateClient.tieToIsolate(_isolate); _isolate.resume(_isolate.pauseCapability!); - + isInitialized = _isolateClient.ready; await _isolateClient.ready; }); } @override Future close() async { + eventsPort?.close(); await _connectionMutex.lock(() async { if (readOnly) { await _isolateClient.post(const _SqliteIsolateConnectionClose()); diff --git a/lib/src/native/database/native_sqlite_database.dart b/lib/src/native/database/native_sqlite_database.dart index fd258d2..c57cfd0 100644 --- a/lib/src/native/database/native_sqlite_database.dart +++ b/lib/src/native/database/native_sqlite_database.dart @@ -1,9 +1,8 @@ import 'dart:async'; -import 'dart:isolate'; +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/sqlite_database.dart'; -import 'package:sqlite_async/src/common/port_channel.dart'; import 'package:sqlite_async/src/native/database/connection_pool.dart'; import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart'; import 'package:sqlite_async/src/native/native_isolate_connection_factory.dart'; @@ -13,8 +12,6 @@ import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; -import 'package:sqlite_async/src/utils/native_database_utils.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; /// A SQLite database instance. /// @@ -24,7 +21,7 @@ class SqliteDatabaseImpl with SqliteQueries, SqliteDatabaseMixin implements SqliteDatabase { @override - final AbstractDefaultSqliteOpenFactory openFactory; + final DefaultSqliteOpenFactory openFactory; @override late Stream updates; @@ -32,17 +29,17 @@ class SqliteDatabaseImpl @override int maxReaders; - @override - late Future isInitialized; + /// Global lock to serialize write transactions. + final SimpleMutex mutex = SimpleMutex(); - late final PortServer _eventsPort; + @override + @protected + // Native doesn't require any asynchronous initialization + late Future isInitialized = Future.value(); late final SqliteConnectionImpl _internalConnection; late final SqliteConnectionPool _pool; - /// Global lock to serialize write transactions. - final SimpleMutex mutex = SimpleMutex(); - /// Open a SqliteDatabase. /// /// Only a single SqliteDatabase per [path] should be opened at a time. @@ -71,26 +68,17 @@ class SqliteDatabaseImpl /// 2. Running additional per-connection PRAGMA statements on each connection. /// 3. Creating custom SQLite functions. /// 4. Creating temporary views or triggers. - SqliteDatabaseImpl.withFactory(this.openFactory, - {this.maxReaders = SqliteDatabase.defaultMaxReaders}) { - updates = updatesController.stream; - - _listenForEvents(); - + SqliteDatabaseImpl.withFactory(AbstractDefaultSqliteOpenFactory factory, + {this.maxReaders = SqliteDatabase.defaultMaxReaders}) + : openFactory = factory as DefaultSqliteOpenFactory { _internalConnection = _openPrimaryConnection(debugName: 'sqlite-writer'); _pool = SqliteConnectionPool(openFactory, - upstreamPort: _eventsPort.client(), - updates: updates, writeConnection: _internalConnection, debugName: 'sqlite', maxReaders: maxReaders, mutex: mutex); - - isInitialized = _init(); - } - - Future _init() async { - await _internalConnection.ready; + // Updates get updates from the pool + updates = _pool.updates; } @override @@ -105,50 +93,6 @@ class SqliteDatabaseImpl return _pool.getAutoCommit(); } - void _listenForEvents() { - UpdateNotification? updates; - - Map subscriptions = {}; - - _eventsPort = PortServer((message) async { - if (message is UpdateNotification) { - if (updates == null) { - updates = message; - // Use the mutex to only send updates after the current transaction. - // Do take care to avoid getting a lock for each individual update - - // that could add massive performance overhead. - mutex.lock(() async { - if (updates != null) { - updatesController.add(updates!); - updates = null; - } - }); - } else { - updates!.tables.addAll(message.tables); - } - return null; - } else if (message is InitDb) { - await isInitialized; - return null; - } else if (message is SubscribeToUpdates) { - if (subscriptions.containsKey(message.port)) { - return; - } - final subscription = updatesController.stream.listen((event) { - message.port.send(event); - }); - subscriptions[message.port] = subscription; - return null; - } else if (message is UnsubscribeToUpdates) { - final subscription = subscriptions.remove(message.port); - subscription?.cancel(); - return null; - } else { - throw ArgumentError('Unknown message type: $message'); - } - }); - } - /// A connection factory that can be passed to different isolates. /// /// Use this to access the database in background isolates. @@ -157,25 +101,13 @@ class SqliteDatabaseImpl return IsolateConnectionFactoryImpl( openFactory: openFactory, mutex: mutex.shared, - upstreamPort: _eventsPort.client()); - } - - SqliteConnectionImpl _openPrimaryConnection({String? debugName}) { - return SqliteConnectionImpl( - upstreamPort: _eventsPort.client(), - primary: true, - updates: updates, - debugName: debugName, - mutex: mutex, - readOnly: false, - openFactory: openFactory); + upstreamPort: _pool.upstreamPort!); } @override Future close() async { await _pool.close(); updatesController.close(); - _eventsPort.close(); await mutex.close(); } @@ -222,4 +154,13 @@ class SqliteDatabaseImpl return _pool.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } + + SqliteConnectionImpl _openPrimaryConnection({String? debugName}) { + return SqliteConnectionImpl( + primary: true, + debugName: debugName, + mutex: mutex, + readOnly: false, + openFactory: openFactory); + } } diff --git a/lib/src/native/database/upstream_updates.dart b/lib/src/native/database/upstream_updates.dart new file mode 100644 index 0000000..8d94cc5 --- /dev/null +++ b/lib/src/native/database/upstream_updates.dart @@ -0,0 +1,67 @@ +import 'dart:async'; +import 'dart:isolate'; + +import 'package:meta/meta.dart'; +import 'package:sqlite_async/src/common/port_channel.dart'; +import 'package:sqlite_async/src/update_notification.dart'; +import 'package:sqlite_async/src/utils/native_database_utils.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; + +mixin UpStreamTableUpdates { + final StreamController updatesController = + StreamController.broadcast(); + + late SerializedPortClient upstreamPort; + + @protected + + /// Resolves once the primary connection is initialized + late Future isInitialized; + + @protected + PortServer? eventsPort; + + @protected + SerializedPortClient listenForEvents() { + UpdateNotification? updates; + + Map subscriptions = {}; + + eventsPort = PortServer((message) async { + if (message is UpdateNotification) { + if (updates == null) { + updates = message; + // Use the mutex to only send updates after the current transaction. + // Do take care to avoid getting a lock for each individual update - + // that could add massive performance overhead. + if (updates != null) { + updatesController.add(updates!); + updates = null; + } + } else { + updates!.tables.addAll(message.tables); + } + return null; + } else if (message is InitDb) { + await isInitialized; + return null; + } else if (message is SubscribeToUpdates) { + if (subscriptions.containsKey(message.port)) { + return; + } + final subscription = updatesController.stream.listen((event) { + message.port.send(event); + }); + subscriptions[message.port] = subscription; + return null; + } else if (message is UnsubscribeToUpdates) { + final subscription = subscriptions.remove(message.port); + subscription?.cancel(); + return null; + } else { + throw ArgumentError('Unknown message type: $message'); + } + }); + return upstreamPort = eventsPort!.client(); + } +} diff --git a/lib/src/native/native_isolate_connection_factory.dart b/lib/src/native/native_isolate_connection_factory.dart index d49044d..e9110c4 100644 --- a/lib/src/native/native_isolate_connection_factory.dart +++ b/lib/src/native/native_isolate_connection_factory.dart @@ -2,9 +2,9 @@ import 'dart:async'; import 'dart:isolate'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/port_channel.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; +import 'package:sqlite_async/src/native/native_sqlite_open_factory.dart'; import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/database_utils.dart'; @@ -15,13 +15,13 @@ class IsolateConnectionFactoryImpl with IsolateOpenFactoryMixin implements IsolateConnectionFactory { @override - AbstractDefaultSqliteOpenFactory openFactory; + DefaultSqliteOpenFactory openFactory; @override SerializedMutex mutex; @override - SerializedPortClient upstreamPort; + final SerializedPortClient upstreamPort; IsolateConnectionFactoryImpl( {required this.openFactory, @@ -88,7 +88,7 @@ class _IsolateSqliteConnection extends SqliteConnectionImpl { _IsolateSqliteConnection( {required super.openFactory, required super.mutex, - required super.upstreamPort, + super.upstreamPort, super.updates, super.debugName, super.readOnly = false, diff --git a/lib/src/native/native_sqlite_open_factory.dart b/lib/src/native/native_sqlite_open_factory.dart index c243bd0..99fd8e4 100644 --- a/lib/src/native/native_sqlite_open_factory.dart +++ b/lib/src/native/native_sqlite_open_factory.dart @@ -2,6 +2,8 @@ import 'package:sqlite3/sqlite3.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; +import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; /// Native implementation of [AbstractDefaultSqliteOpenFactory] @@ -37,4 +39,16 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { } return statements; } + + @override + SqliteConnection openConnection(SqliteOpenOptions options) { + return SqliteConnectionImpl( + primary: options.primaryConnection, + readOnly: options.readOnly, + mutex: options.mutex!, + debugName: options.debugName, + updates: options.updates, + openFactory: this, + ); + } } diff --git a/lib/src/sqlite_connection.dart b/lib/src/sqlite_connection.dart index 1ace5b8..b9edd36 100644 --- a/lib/src/sqlite_connection.dart +++ b/lib/src/sqlite_connection.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:sqlite3/common.dart' as sqlite; +import 'package:sqlite_async/src/update_notification.dart'; /// Abstract class representing calls available in a read-only or read-write context. abstract class SqliteReadContext { @@ -74,6 +75,9 @@ abstract class SqliteWriteContext extends SqliteReadContext { /// Abstract class representing a connection to the SQLite database. abstract class SqliteConnection extends SqliteWriteContext { + /// Reports table change update notifications + Stream? get updates; + /// Open a read-only transaction. /// /// Statements within the transaction must be done on the provided diff --git a/lib/src/sqlite_queries.dart b/lib/src/sqlite_queries.dart index 959a97d..d0eab7a 100644 --- a/lib/src/sqlite_queries.dart +++ b/lib/src/sqlite_queries.dart @@ -9,9 +9,6 @@ import 'update_notification.dart'; /// Classes using this need to implement [SqliteConnection.readLock] /// and [SqliteConnection.writeLock]. mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { - /// Broadcast stream that is notified of any table updates - Stream? get updates; - @override Future execute(String sql, [List parameters = const []]) async { diff --git a/lib/src/web/database/connection/drift_sqlite_connection.dart b/lib/src/web/database/connection/drift_sqlite_connection.dart new file mode 100644 index 0000000..2ee7f0b --- /dev/null +++ b/lib/src/web/database/connection/drift_sqlite_connection.dart @@ -0,0 +1,259 @@ +import 'dart:async'; +import 'package:drift/drift.dart'; +import 'package:drift/remote.dart'; +import 'package:drift/wasm.dart'; +import 'package:meta/meta.dart'; +import 'package:sqlite_async/sqlite3_common.dart'; + +import 'package:sqlite_async/src/common/mutex.dart'; + +import 'package:sqlite_async/src/sqlite_connection.dart'; +import 'package:sqlite_async/src/sqlite_queries.dart'; +import 'package:sqlite_async/src/update_notification.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; + +/// Custom function which exposes CommonDatabase.autocommit +const sqliteAsyncAutoCommitCommand = 'sqlite_async_autocommit'; + +class DriftSqliteConnection with SqliteQueries implements SqliteConnection { + WasmDatabaseResult db; + + @override + late Stream updates; + + final Mutex mutex; + + @override + bool closed = false; + + DriftSqliteConnection(this.db, this.mutex) { + // Pass on table updates + updates = db.resolvedExecutor.streamQueries + .updatesForSync(TableUpdateQuery.any()) + .map((tables) { + return UpdateNotification(tables.map((e) => e.table).toSet()); + }); + } + + @override + close() { + closed = true; + return db.resolvedExecutor.close(); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + try { + final result = await db.resolvedExecutor.runBatched(BatchedStatements( + [sql], + parameterSets + .map((e) => ArgumentsForBatchedStatement(0, e)) + .toList())); + return result; + } on DriftRemoteException catch (e) { + if (e.toString().contains('SqliteException')) { + // Drift wraps these in remote errors + throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); + } + rethrow; + } + } + + Future select(String sql, + [List parameters = const []]) async { + try { + final result = await db.resolvedExecutor.runSelect(sql, parameters); + if (result.isEmpty) { + return ResultSet([], [], []); + } + return ResultSet(result.first.keys.toList(), [], + result.map((e) => e.values.toList()).toList()); + } on DriftRemoteException catch (e) { + if (e.toString().contains('SqliteException')) { + // Drift wraps these in remote errors + throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); + } + rethrow; + } + } + + @override + Future readLock(Future Function(SqliteReadContext tx) callback, + {Duration? lockTimeout, + String? debugContext, + bool isTransaction = false}) async { + return _runZoned( + () => mutex.lock(() async { + final context = + DriftReadContext(this, isTransaction: isTransaction); + try { + final result = await callback(context); + return result; + } finally { + context.close(); + } + }, timeout: lockTimeout), + debugContext: debugContext ?? 'execute()'); + } + + @override + Future writeLock(Future Function(SqliteWriteContext tx) callback, + {Duration? lockTimeout, + String? debugContext, + bool isTransaction = false}) async { + return _runZoned( + () => mutex.lock(() async { + final context = + DriftWriteContext(this, isTransaction: isTransaction); + try { + final result = await callback(context); + return result; + } finally { + context.close(); + } + }, timeout: lockTimeout), + debugContext: debugContext ?? 'execute()'); + } + + @override + Future readTransaction( + Future Function(SqliteReadContext tx) callback, + {Duration? lockTimeout}) async { + return readLock((ctx) async { + return await internalReadTransaction(ctx, callback); + }, + lockTimeout: lockTimeout, + debugContext: 'readTransaction()', + isTransaction: true); + } + + @override + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback, + {Duration? lockTimeout}) async { + return writeLock(( + ctx, + ) async { + return await internalWriteTransaction(ctx, callback); + }, + lockTimeout: lockTimeout, + debugContext: 'writeTransaction()', + isTransaction: true); + } + + /// The mutex on individual connections do already error in recursive locks. + /// + /// We duplicate the same check here, to: + /// 1. Also error when the recursive transaction is handled by a different + /// connection (with a different lock). + /// 2. Give a more specific error message when it happens. + T _runZoned(T Function() callback, {required String debugContext}) { + if (Zone.current[this] != null) { + throw LockError( + 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); + } + var zone = Zone.current.fork(zoneValues: {this: true}); + return zone.run(callback); + } + + @override + Future getAutoCommit() async { + return DriftWriteContext(this).getAutoCommit(); + } +} + +class DriftReadContext implements SqliteReadContext { + DriftSqliteConnection db; + bool _closed = false; + + @protected + bool isTransaction; + + DriftReadContext(this.db, {this.isTransaction = false}); + + @override + Future computeWithDatabase( + Future Function(CommonDatabase db) compute) { + throw UnimplementedError(); + } + + @override + Future get(String sql, [List parameters = const []]) async { + return (await getAll(sql, parameters)).first; + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + if (_closed) { + throw SqliteException(0, 'Transaction closed', null, sql); + } + return db.select(sql, parameters); + } + + @override + Future getOptional(String sql, + [List parameters = const []]) async { + return (await db.select(sql, parameters)).firstOrNull; + } + + @override + bool get closed => _closed; + + close() { + _closed = true; + } + + @override + Future getAutoCommit() async { + final response = await db.select('select $sqliteAsyncAutoCommitCommand()'); + if (response.isEmpty) { + return false; + } + + return response.first.values.first != 0; + } +} + +class DriftWriteContext extends DriftReadContext implements SqliteWriteContext { + DriftWriteContext(super.db, {super.isTransaction}); + + @override + Future execute(String sql, + [List parameters = const []]) async { + return getAll(sql, parameters); + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + if (_closed) { + throw SqliteException(0, 'Transaction closed', null, sql); + } + + /// Statements in read/writeTransactions should not execute after ROLLBACK + if (isTransaction && + !sql.toLowerCase().contains('begin') && + await getAutoCommit()) { + throw SqliteException(0, + 'Transaction rolled back by earlier statement. Cannot execute: $sql'); + } + return db.select(sql, parameters); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + return db.executeBatch(sql, parameterSets); + } +} + +class DriftSqliteUser extends QueryExecutorUser { + @override + Future beforeOpen( + QueryExecutor executor, OpeningDetails details) async {} + + @override + int get schemaVersion => 1; +} diff --git a/lib/src/web/database/executor/drift_sql_executor.dart b/lib/src/web/database/executor/drift_sql_executor.dart deleted file mode 100644 index 8911e41..0000000 --- a/lib/src/web/database/executor/drift_sql_executor.dart +++ /dev/null @@ -1,76 +0,0 @@ -import 'dart:async'; - -import 'package:drift/drift.dart'; -import 'package:drift/remote.dart'; -import 'package:drift/wasm.dart'; -import 'package:sqlite3/common.dart'; -import 'sqlite_executor.dart'; - -class DriftWebSQLExecutor extends SQLExecutor { - WasmDatabaseResult db; - - @override - bool closed = false; - - DriftWebSQLExecutor(this.db) { - // Pass on table updates - updateStream = db.resolvedExecutor.streamQueries - .updatesForSync(TableUpdateQuery.any()) - .map((tables) { - return tables.map((e) => e.table).toSet(); - }); - } - - @override - close() { - closed = true; - return db.resolvedExecutor.close(); - } - - @override - Future executeBatch( - String sql, List> parameterSets) async { - try { - final result = await db.resolvedExecutor.runBatched(BatchedStatements( - [sql], - parameterSets - .map((e) => ArgumentsForBatchedStatement(0, e)) - .toList())); - return result; - } on DriftRemoteException catch (e) { - if (e.toString().contains('SqliteException')) { - // Drift wraps these in remote errors - throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); - } - rethrow; - } - } - - @override - Future select(String sql, - [List parameters = const []]) async { - try { - final result = await db.resolvedExecutor.runSelect(sql, parameters); - if (result.isEmpty) { - return ResultSet([], [], []); - } - return ResultSet(result.first.keys.toList(), [], - result.map((e) => e.values.toList()).toList()); - } on DriftRemoteException catch (e) { - if (e.toString().contains('SqliteException')) { - // Drift wraps these in remote errors - throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); - } - rethrow; - } - } -} - -class DriftSqliteUser extends QueryExecutorUser { - @override - Future beforeOpen( - QueryExecutor executor, OpeningDetails details) async {} - - @override - int get schemaVersion => 1; -} diff --git a/lib/src/web/database/executor/sqlite_executor.dart b/lib/src/web/database/executor/sqlite_executor.dart deleted file mode 100644 index c6192fa..0000000 --- a/lib/src/web/database/executor/sqlite_executor.dart +++ /dev/null @@ -1,19 +0,0 @@ -// Abstract class which provides base methods required for Context providers -import 'dart:async'; - -import 'package:sqlite_async/sqlite3_common.dart'; - -/// Abstract class for providing basic SQLite operations -/// Specific DB implementations such as Drift can be adapted to -/// this interface -abstract class SQLExecutor { - bool get closed; - - Stream> updateStream = Stream.empty(); - - Future close(); - - FutureOr select(String sql, [List parameters = const []]); - - FutureOr executeBatch(String sql, List> parameterSets) {} -} diff --git a/lib/src/web/database/web_db_context.dart b/lib/src/web/database/web_db_context.dart deleted file mode 100644 index 1c6b47f..0000000 --- a/lib/src/web/database/web_db_context.dart +++ /dev/null @@ -1,96 +0,0 @@ -import 'dart:async'; - -import 'package:meta/meta.dart'; -import 'package:sqlite_async/sqlite3_common.dart'; -import 'package:sqlite_async/src/sqlite_connection.dart'; -import 'executor/sqlite_executor.dart'; - -/// Custom function which exposes CommonDatabase.autocommit -const sqliteAsyncAutoCommitCommand = 'sqlite_async_autocommit'; - -class WebReadContext implements SqliteReadContext { - SQLExecutor db; - bool _closed = false; - - @protected - bool isTransaction; - - WebReadContext(this.db, {this.isTransaction = false}); - - @override - Future computeWithDatabase( - Future Function(CommonDatabase db) compute) { - throw UnimplementedError(); - } - - @override - Future get(String sql, [List parameters = const []]) async { - return (await getAll(sql, parameters)).first; - } - - @override - Future getAll(String sql, - [List parameters = const []]) async { - if (_closed) { - throw SqliteException(0, 'Transaction closed', null, sql); - } - return db.select(sql, parameters); - } - - @override - Future getOptional(String sql, - [List parameters = const []]) async { - final rows = await getAll(sql, parameters); - return rows.isEmpty ? null : rows.first; - } - - @override - bool get closed => _closed; - - close() { - _closed = true; - } - - @override - Future getAutoCommit() async { - final response = await db.select('select $sqliteAsyncAutoCommitCommand()'); - if (response.isEmpty) { - return false; - } - - return response.first.values.first != 0; - } -} - -class WebWriteContext extends WebReadContext implements SqliteWriteContext { - WebWriteContext(super.db, {super.isTransaction}); - - @override - Future execute(String sql, - [List parameters = const []]) async { - return getAll(sql, parameters); - } - - @override - Future getAll(String sql, - [List parameters = const []]) async { - if (_closed) { - throw SqliteException(0, 'Transaction closed', null, sql); - } - - /// Statements in read/writeTransactions should not execute after ROLLBACK - if (isTransaction && - !sql.toLowerCase().contains('begin') && - await getAutoCommit()) { - throw SqliteException(0, - 'Transaction rolled back by earlier statement. Cannot execute: $sql'); - } - return db.select(sql, parameters); - } - - @override - Future executeBatch( - String sql, List> parameterSets) async { - return db.executeBatch(sql, parameterSets); - } -} diff --git a/lib/src/web/database/web_sqlite_connection_impl.dart b/lib/src/web/database/web_sqlite_connection_impl.dart deleted file mode 100644 index a9df6b2..0000000 --- a/lib/src/web/database/web_sqlite_connection_impl.dart +++ /dev/null @@ -1,144 +0,0 @@ -import 'dart:async'; -import 'package:meta/meta.dart'; -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; -import 'package:sqlite_async/src/common/mutex.dart'; - -import 'package:sqlite_async/src/sqlite_connection.dart'; -import 'package:sqlite_async/src/sqlite_queries.dart'; -import 'package:sqlite_async/src/update_notification.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; -import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart'; - -import 'executor/sqlite_executor.dart'; -import 'web_db_context.dart'; - -/// Web implementation of [SqliteConnection] -class WebSqliteConnectionImpl with SqliteQueries implements SqliteConnection { - @override - bool get closed { - return executor == null || executor!.closed; - } - - @override - late Stream updates; - - late final Mutex mutex; - DefaultSqliteOpenFactory openFactory; - - @protected - final StreamController updatesController = - StreamController.broadcast(); - - @protected - late SQLExecutor? executor; - - @protected - late Future isInitialized; - - WebSqliteConnectionImpl({required this.openFactory, required this.mutex}) { - updates = updatesController.stream; - isInitialized = _init(); - } - - Future _init() async { - executor = await openFactory.openExecutor( - SqliteOpenOptions(primaryConnection: true, readOnly: false)); - - executor!.updateStream.forEach((tables) { - updatesController.add(UpdateNotification(tables)); - }); - } - - @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, - String? debugContext, - bool isTransaction = false}) async { - await isInitialized; - return _runZoned( - () => mutex.lock(() async { - final context = - WebReadContext(executor!, isTransaction: isTransaction); - try { - final result = await callback(context); - return result; - } finally { - context.close(); - } - }, timeout: lockTimeout), - debugContext: debugContext ?? 'execute()'); - } - - @override - Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, - String? debugContext, - bool isTransaction = false}) async { - await isInitialized; - return _runZoned( - () => mutex.lock(() async { - final context = - WebWriteContext(executor!, isTransaction: isTransaction); - try { - final result = await callback(context); - return result; - } finally { - context.close(); - } - }, timeout: lockTimeout), - debugContext: debugContext ?? 'execute()'); - } - - @override - Future readTransaction( - Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout}) async { - return readLock((ctx) async { - return await internalReadTransaction(ctx, callback); - }, - lockTimeout: lockTimeout, - debugContext: 'readTransaction()', - isTransaction: true); - } - - @override - Future writeTransaction( - Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout}) async { - return writeLock(( - ctx, - ) async { - return await internalWriteTransaction(ctx, callback); - }, - lockTimeout: lockTimeout, - debugContext: 'writeTransaction()', - isTransaction: true); - } - - /// The mutex on individual connections do already error in recursive locks. - /// - /// We duplicate the same check here, to: - /// 1. Also error when the recursive transaction is handled by a different - /// connection (with a different lock). - /// 2. Give a more specific error message when it happens. - T _runZoned(T Function() callback, {required String debugContext}) { - if (Zone.current[this] != null) { - throw LockError( - 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); - } - var zone = Zone.current.fork(zoneValues: {this: true}); - return zone.run(callback); - } - - @override - Future close() async { - await isInitialized; - await executor!.close(); - } - - @override - Future getAutoCommit() async { - await isInitialized; - return WebWriteContext(executor!).getAutoCommit(); - } -} diff --git a/lib/src/web/database/web_sqlite_database.dart b/lib/src/web/database/web_sqlite_database.dart index 62e3911..a92ceac 100644 --- a/lib/src/web/database/web_sqlite_database.dart +++ b/lib/src/web/database/web_sqlite_database.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/mutex.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; @@ -10,8 +11,6 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/web/web_mutex.dart'; import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart'; -import 'web_sqlite_connection_impl.dart'; - /// Web implementation of [SqliteDatabase] /// Uses a web worker for SQLite connection class SqliteDatabaseImpl @@ -29,14 +28,14 @@ class SqliteDatabaseImpl int maxReaders; @override + @protected late Future isInitialized; @override AbstractDefaultSqliteOpenFactory openFactory; late final Mutex mutex; - late final IsolateConnectionFactoryImpl _isolateConnectionFactory; - late final WebSqliteConnectionImpl _connection; + late final SqliteConnection _connection; /// Open a SqliteDatabase. /// @@ -70,14 +69,13 @@ class SqliteDatabaseImpl {this.maxReaders = SqliteDatabase.defaultMaxReaders}) { updates = updatesController.stream; mutex = MutexImpl(); - _isolateConnectionFactory = IsolateConnectionFactoryImpl( - openFactory: openFactory as DefaultSqliteOpenFactory, mutex: mutex); - _connection = _isolateConnectionFactory.open(); isInitialized = _init(); } Future _init() async { - _connection.updates.forEach((update) { + _connection = await openFactory.openConnection(SqliteOpenOptions( + primaryConnection: true, readOnly: false, mutex: mutex)); + _connection.updates!.forEach((update) { updatesController.add(update); }); } @@ -85,6 +83,7 @@ class SqliteDatabaseImpl @override Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { + await isInitialized; return _connection.readLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } @@ -92,6 +91,7 @@ class SqliteDatabaseImpl @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { + await isInitialized; return _connection.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } @@ -101,6 +101,7 @@ class SqliteDatabaseImpl Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { + await isInitialized; return _connection.readTransaction(callback, lockTimeout: lockTimeout); } @@ -109,21 +110,24 @@ class SqliteDatabaseImpl Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { + await isInitialized; return _connection.writeTransaction(callback, lockTimeout: lockTimeout); } @override Future close() async { + await isInitialized; return _connection.close(); } @override IsolateConnectionFactoryImpl isolateConnectionFactory() { - return _isolateConnectionFactory; + throw UnimplementedError(); } @override - Future getAutoCommit() { + Future getAutoCommit() async { + await isInitialized; return _connection.getAutoCommit(); } } diff --git a/lib/src/web/web_isolate_connection_factory.dart b/lib/src/web/web_isolate_connection_factory.dart index 68212c2..2fa4f87 100644 --- a/lib/src/web/web_isolate_connection_factory.dart +++ b/lib/src/web/web_isolate_connection_factory.dart @@ -1,12 +1,11 @@ import 'dart:async'; import 'package:sqlite_async/sqlite3_common.dart'; -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; import 'package:sqlite_async/src/common/mutex.dart'; import 'package:sqlite_async/src/common/port_channel.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart'; -import 'database/web_sqlite_connection_impl.dart'; /// An implementation of [IsolateConnectionFactory] for Web /// This uses a web worker instead of an isolate @@ -22,14 +21,14 @@ class IsolateConnectionFactoryImpl IsolateConnectionFactoryImpl( {required this.openFactory, required this.mutex, - SerializedPortClient? upstreamPort}); + required SerializedPortClient upstreamPort}); /// Open a new SqliteConnection. /// /// This opens a single connection in a background execution isolate. @override - WebSqliteConnectionImpl open({String? debugName, bool readOnly = false}) { - return WebSqliteConnectionImpl(mutex: mutex, openFactory: openFactory); + SqliteConnection open({String? debugName, bool readOnly = false}) { + throw UnimplementedError(); } /// Opens a synchronous sqlite.Database directly in the current isolate. @@ -43,8 +42,7 @@ class IsolateConnectionFactoryImpl /// this connection. @override Future openRawDatabase({bool readOnly = false}) async { - return openFactory - .open(SqliteOpenOptions(primaryConnection: false, readOnly: readOnly)); + throw UnimplementedError(); } @override diff --git a/lib/src/web/web_sqlite_open_factory.dart b/lib/src/web/web_sqlite_open_factory.dart index 916d275..8bb7f4e 100644 --- a/lib/src/web/web_sqlite_open_factory.dart +++ b/lib/src/web/web_sqlite_open_factory.dart @@ -2,12 +2,9 @@ import 'dart:async'; import 'package:drift/wasm.dart'; import 'package:sqlite3/wasm.dart'; - -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; -import 'package:sqlite_async/src/sqlite_options.dart'; - -import 'database/executor/drift_sql_executor.dart'; -import 'database/executor/sqlite_executor.dart'; +import 'package:sqlite_async/sqlite_async.dart'; +import 'package:sqlite_async/src/web/database/connection/drift_sqlite_connection.dart'; +import 'package:sqlite_async/src/web/web_mutex.dart'; /// Web implementation of [AbstractDefaultSqliteOpenFactory] class DefaultSqliteOpenFactory @@ -33,23 +30,22 @@ class DefaultSqliteOpenFactory return wasmSqlite.open(path); } - /// Returns a simple asynchronous SQLExecutor which can be used to implement - /// higher order functionality. + @override + /// Currently this only uses the Drift WASM implementation. /// The Drift SQLite package provides built in async Web worker functionality /// and automatic persistence storage selection. /// Due to being asynchronous, the under laying CommonDatabase is not accessible - Future openExecutor(SqliteOpenOptions options) async { + Future openConnection(SqliteOpenOptions options) async { final db = await WasmDatabase.open( databaseName: path, sqlite3Uri: Uri.parse(sqliteOptions.webSqliteOptions.wasmUri), driftWorkerUri: Uri.parse(sqliteOptions.webSqliteOptions.workerUri), ); - final executor = DriftWebSQLExecutor(db); await db.resolvedExecutor.ensureOpen(DriftSqliteUser()); - return executor; + return DriftSqliteConnection(db, options.mutex ?? MutexImpl()); } @override diff --git a/lib/src/web/worker/worker_utils.dart b/lib/src/web/worker/worker_utils.dart index cf5b611..46c276b 100644 --- a/lib/src/web/worker/worker_utils.dart +++ b/lib/src/web/worker/worker_utils.dart @@ -1,5 +1,5 @@ import 'package:sqlite_async/sqlite3_common.dart'; -import 'package:sqlite_async/src/web/database/web_db_context.dart'; +import 'package:sqlite_async/src/web/database/connection/drift_sqlite_connection.dart'; void setupCommonWorkerDB(CommonDatabase database) { /// Exposes autocommit via a query function diff --git a/pubspec.yaml b/pubspec.yaml index bbce796..889e269 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -30,3 +30,11 @@ dev_dependencies: shelf_static: ^1.1.2 stream_channel: ^2.1.2 path: ^1.9.0 + +platforms: + android: + ios: + linux: + macos: + windows: + web: