Skip to content

[Alpha WIP] Open Factory SqliteConnections #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ assets

.idea
.vscode
.devcontainer
*.db
*.db-*
test-db
Expand Down
33 changes: 32 additions & 1 deletion lib/src/common/abstract_open_factory.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import 'dart:async';
import 'package:meta/meta.dart';

import 'package:sqlite_async/sqlite3_common.dart' as sqlite;
import 'package:sqlite_async/src/common/mutex.dart';
import 'package:sqlite_async/src/sqlite_connection.dart';
import 'package:sqlite_async/src/sqlite_options.dart';
import 'package:sqlite_async/src/update_notification.dart';

/// Factory to create new SQLite database connections.
///
Expand All @@ -11,7 +14,11 @@ import 'package:sqlite_async/src/sqlite_options.dart';
abstract class SqliteOpenFactory<Database extends sqlite.CommonDatabase> {
String get path;

/// Opens a direct connection to the SQLite database
FutureOr<Database> open(SqliteOpenOptions options);

/// Opens an asynchronous [SqliteConnection]
FutureOr<SqliteConnection> openConnection(SqliteOpenOptions options);
}

class SqliteOpenOptions {
Expand All @@ -21,8 +28,21 @@ class SqliteOpenOptions {
/// Whether this connection is read-only.
final bool readOnly;

/// Mutex to use in [SqliteConnection]s
final Mutex? mutex;

/// Name used in debug logs
final String? debugName;

/// Stream of external update notifications
final Stream<UpdateNotification>? updates;

const SqliteOpenOptions(
{required this.primaryConnection, required this.readOnly});
{required this.primaryConnection,
required this.readOnly,
this.mutex,
this.debugName,
this.updates});

sqlite.OpenMode get openMode {
if (primaryConnection) {
Expand Down Expand Up @@ -55,9 +75,14 @@ abstract class AbstractDefaultSqliteOpenFactory<
List<String> pragmaStatements(SqliteOpenOptions options);

@protected

/// Opens a direct connection to a SQLite database connection
FutureOr<Database> openDB(SqliteOpenOptions options);

@override

/// Opens a direct connection to a SQLite database connection
/// and executes setup pragma statements to initialize the DB
FutureOr<Database> open(SqliteOpenOptions options) async {
var db = await openDB(options);

Expand All @@ -66,4 +91,10 @@ abstract class AbstractDefaultSqliteOpenFactory<
}
return db;
}

@override

/// Opens an asynchronous [SqliteConnection] to a SQLite database
/// and executes setup pragma statements to initialize the DB
FutureOr<SqliteConnection> openConnection(SqliteOpenOptions options);
}
9 changes: 4 additions & 5 deletions lib/src/common/isolate_connection_factory.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ abstract class IsolateConnectionFactory<Database extends sqlite.CommonDatabase>
factory IsolateConnectionFactory(
{required openFactory,
required mutex,
SerializedPortClient? upstreamPort}) {
required SerializedPortClient upstreamPort}) {
return IsolateConnectionFactoryImpl(
openFactory: openFactory,
mutex: mutex,
upstreamPort: upstreamPort as SerializedPortClient)
as IsolateConnectionFactory<Database>;
openFactory: openFactory,
mutex: mutex,
upstreamPort: upstreamPort) as IsolateConnectionFactory<Database>;
}

/// Open a new SqliteConnection.
Expand Down
2 changes: 2 additions & 0 deletions lib/src/common/sqlite_database.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:async';

import 'package:meta/meta.dart';
import 'package:sqlite_async/src/common/abstract_open_factory.dart';
import 'package:sqlite_async/src/common/isolate_connection_factory.dart';
import 'package:sqlite_async/src/impl/sqlite_database_impl.dart';
Expand All @@ -26,6 +27,7 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries {
final StreamController<UpdateNotification> updatesController =
StreamController.broadcast();

@protected
Future<void> get isInitialized;

/// Wait for initialization to complete.
Expand Down
2 changes: 2 additions & 0 deletions lib/src/impl/stub_sqlite_database.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'package:meta/meta.dart';
import 'package:sqlite_async/src/common/isolate_connection_factory.dart';
import 'package:sqlite_async/src/common/abstract_open_factory.dart';
import 'package:sqlite_async/src/common/sqlite_database.dart';
Expand Down Expand Up @@ -31,6 +32,7 @@ class SqliteDatabaseImpl
}

@override
@protected
Future<void> get isInitialized => throw UnimplementedError();

@override
Expand Down
8 changes: 8 additions & 0 deletions lib/src/impl/stub_sqlite_open_factory.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import 'dart:async';

import 'package:sqlite_async/sqlite3_common.dart';
import 'package:sqlite_async/src/common/abstract_open_factory.dart';
import 'package:sqlite_async/src/sqlite_connection.dart';
import 'package:sqlite_async/src/sqlite_options.dart';

class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory {
Expand All @@ -16,4 +19,9 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory {
List<String> pragmaStatements(SqliteOpenOptions options) {
throw UnimplementedError();
}

@override
FutureOr<SqliteConnection> openConnection(SqliteOpenOptions options) {
throw UnimplementedError();
}
}
64 changes: 36 additions & 28 deletions lib/src/native/database/connection_pool.dart
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import 'dart:async';

import 'package:sqlite_async/src/common/abstract_open_factory.dart';
import 'package:sqlite_async/src/common/mutex.dart';
import 'package:sqlite_async/src/common/port_channel.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart';
import 'package:sqlite_async/src/native/native_isolate_mutex.dart';
import 'package:sqlite_async/src/sqlite_connection.dart';
import 'package:sqlite_async/src/sqlite_queries.dart';
import 'package:sqlite_async/src/update_notification.dart';

/// A connection pool with a single write connection and multiple read connections.
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
SqliteConnection? _writeConnection;
final StreamController<UpdateNotification> updatesController =
StreamController.broadcast();

@override

/// The write connection might be recreated if it's closed
/// This will allow the update stream remain constant even
/// after using a new write connection.
late final Stream<UpdateNotification> updates = updatesController.stream;

SqliteConnectionImpl? _writeConnection;

final List<SqliteConnectionImpl> _readConnections = [];

final AbstractDefaultSqliteOpenFactory _factory;
final SerializedPortClient _upstreamPort;

@override
final Stream<UpdateNotification>? updates;

final int maxReaders;

Expand All @@ -41,14 +42,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
/// Read connections are opened in read-only mode, and will reject any statements
/// that modify the database.
SqliteConnectionPool(this._factory,
{this.updates,
this.maxReaders = 5,
SqliteConnection? writeConnection,
{this.maxReaders = 5,
SqliteConnectionImpl? writeConnection,
this.debugName,
required this.mutex,
required SerializedPortClient upstreamPort})
: _writeConnection = writeConnection,
_upstreamPort = upstreamPort;
required this.mutex})
: _writeConnection = writeConnection {
// Use the write connection's updates
_writeConnection?.updates?.forEach(updatesController.add);
}

/// Returns true if the _write_ connection is currently in autocommit mode.
@override
Expand Down Expand Up @@ -117,21 +118,24 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {

@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) {
{Duration? lockTimeout, String? debugContext}) async {
if (closed) {
throw AssertionError('Closed');
}
if (_writeConnection?.closed == true) {
_writeConnection = null;
}
_writeConnection ??= SqliteConnectionImpl(
upstreamPort: _upstreamPort,
primary: false,
updates: updates,
debugName: debugName != null ? '$debugName-writer' : null,
mutex: mutex,
readOnly: false,
openFactory: _factory);

if (_writeConnection == null) {
_writeConnection = (await _factory.openConnection(SqliteOpenOptions(
primaryConnection: true,
debugName: debugName != null ? '$debugName-writer' : null,
mutex: mutex,
readOnly: false))) as SqliteConnectionImpl;
// Expose the new updates on the connection pool
_writeConnection!.updates?.forEach(updatesController.add);
}

return _runZoned(() {
return _writeConnection!.writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
Expand Down Expand Up @@ -163,7 +167,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
? null
: '$debugName-${_readConnections.length + 1}';
var connection = SqliteConnectionImpl(
upstreamPort: _upstreamPort,
upstreamPort: _writeConnection?.upstreamPort,
primary: false,
updates: updates,
debugName: name,
Expand All @@ -181,6 +185,10 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
}
}

SerializedPortClient? get upstreamPort {
return _writeConnection?.upstreamPort;
}

@override
Future<void> close() async {
closed = true;
Expand Down
20 changes: 14 additions & 6 deletions lib/src/native/database/native_sqlite_connection_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ import 'package:sqlite_async/src/sqlite_queries.dart';
import 'package:sqlite_async/src/update_notification.dart';
import 'package:sqlite_async/src/utils/shared_utils.dart';

import 'upstream_updates.dart';

typedef TxCallback<T> = Future<T> Function(CommonDatabase db);

/// Implements a SqliteConnection using a separate isolate for the database
/// operations.
class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
class SqliteConnectionImpl
with SqliteQueries, UpStreamTableUpdates
implements SqliteConnection {
/// Private to this connection
final SimpleMutex _connectionMutex = SimpleMutex();
final Mutex _writeMutex;

/// Must be a broadcast stream
@override
final Stream<UpdateNotification>? updates;
late final Stream<UpdateNotification>? updates;
final ParentPortClient _isolateClient = ParentPortClient();
late final Isolate _isolate;
final String? debugName;
Expand All @@ -32,13 +36,16 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
SqliteConnectionImpl(
{required openFactory,
required Mutex mutex,
required SerializedPortClient upstreamPort,
this.updates,
SerializedPortClient? upstreamPort,
Stream<UpdateNotification>? updates,
this.debugName,
this.readOnly = false,
bool primary = false})
: _writeMutex = mutex {
_open(openFactory, primary: primary, upstreamPort: upstreamPort);
this.upstreamPort = upstreamPort ?? listenForEvents();
// Accept an incoming stream of updates, or expose one if not given.
this.updates = updates ?? updatesController.stream;
_open(openFactory, primary: primary, upstreamPort: this.upstreamPort);
}

Future<void> get ready async {
Expand Down Expand Up @@ -81,13 +88,14 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
paused: true);
_isolateClient.tieToIsolate(_isolate);
_isolate.resume(_isolate.pauseCapability!);

isInitialized = _isolateClient.ready;
await _isolateClient.ready;
});
}

@override
Future<void> close() async {
eventsPort?.close();
await _connectionMutex.lock(() async {
if (readOnly) {
await _isolateClient.post(const _SqliteIsolateConnectionClose());
Expand Down
Loading