From 87cbfb960b83e16872bd58e4ce41bb7c7dd29a30 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 14 May 2025 16:26:31 +0200 Subject: [PATCH 1/4] Move options into single class --- .../powersync_core/lib/powersync_core.dart | 1 + .../native/native_powersync_database.dart | 21 ++- .../powersync_database_impl_stub.dart | 13 +- .../lib/src/database/powersync_db_mixin.dart | 28 ++-- .../database/web/web_powersync_database.dart | 15 +- .../lib/src/sync/instruction.dart | 147 ++++++++++++++++++ .../powersync_core/lib/src/sync/options.dart | 90 +++++++++++ .../lib/src/sync/streaming_sync.dart | 19 ++- .../lib/src/web/sync_controller.dart | 36 ++--- .../lib/src/web/sync_worker.dart | 79 +++++----- .../lib/src/web/sync_worker_protocol.dart | 35 +++-- .../test/utils/abstract_test_utils.dart | 5 +- 12 files changed, 370 insertions(+), 119 deletions(-) create mode 100644 packages/powersync_core/lib/src/sync/instruction.dart create mode 100644 packages/powersync_core/lib/src/sync/options.dart diff --git a/packages/powersync_core/lib/powersync_core.dart b/packages/powersync_core/lib/powersync_core.dart index 998c0ce1..b4dfe35d 100644 --- a/packages/powersync_core/lib/powersync_core.dart +++ b/packages/powersync_core/lib/powersync_core.dart @@ -10,6 +10,7 @@ export 'src/exceptions.dart'; export 'src/log.dart'; export 'src/open_factory.dart'; export 'src/schema.dart'; +export 'src/sync/options.dart' hide ResolvedSyncOptions; export 'src/sync/sync_status.dart' hide BucketProgress, InternalSyncDownloadProgress; export 'src/uuid.dart'; diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 6082fbe8..710768fa 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -15,6 +15,7 @@ import 'package:powersync_core/src/log_internal.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import 'package:powersync_core/src/open_factory/native/native_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; +import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync/sync_status.dart'; import 'package:sqlite_async/sqlite3_common.dart'; @@ -118,10 +119,9 @@ class PowerSyncDatabaseImpl @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required Duration crudThrottleTime, + required SyncOptions options, required AbortController abort, required Zone asyncWorkZone, - Map? params, }) async { final dbRef = database.isolateConnectionFactory(); @@ -134,6 +134,7 @@ class PowerSyncDatabaseImpl SendPort? initPort; final hasInitPort = Completer(); final receivedIsolateExit = Completer(); + final resolved = ResolvedSyncOptions(options); Future waitForShutdown() async { // Only complete the abortion signal after the isolate shuts down. This @@ -175,8 +176,8 @@ class PowerSyncDatabaseImpl } else if (action == 'init') { final port = initPort = data[1] as SendPort; hasInitPort.complete(); - var crudStream = - database.onChange(['ps_crud'], throttle: crudThrottleTime); + var crudStream = database + .onChange(['ps_crud'], throttle: resolved.crudThrottleTime); crudUpdateSubscription = crudStream.listen((event) { port.send(['update']); }); @@ -238,8 +239,7 @@ class PowerSyncDatabaseImpl _PowerSyncDatabaseIsolateArgs( receiveMessages.sendPort, dbRef, - retryDelay, - clientParams, + resolved, crudMutex.shared, syncMutex.shared, ), @@ -282,16 +282,14 @@ class PowerSyncDatabaseImpl class _PowerSyncDatabaseIsolateArgs { final SendPort sPort; final IsolateConnectionFactory dbRef; - final Duration retryDelay; - final Map? parameters; + final ResolvedSyncOptions options; final SerializedMutex crudMutex; final SerializedMutex syncMutex; _PowerSyncDatabaseIsolateArgs( this.sPort, this.dbRef, - this.retryDelay, - this.parameters, + this.options, this.crudMutex, this.syncMutex, ); @@ -392,9 +390,8 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { invalidCredentialsCallback: invalidateCredentials, uploadCrud: uploadCrud, crudUpdateTriggerStream: crudUpdateController.stream, - retryDelay: args.retryDelay, + options: args.options, client: http.Client(), - syncParameters: args.parameters, crudMutex: crudMutex, syncMutex: syncMutex, ); diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index 348297ed..83498b17 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -6,6 +6,7 @@ import 'package:powersync_core/sqlite_async.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; +import '../sync/options.dart'; import 'powersync_database.dart'; import '../connector.dart'; @@ -110,12 +111,12 @@ class PowerSyncDatabaseImpl @override @internal - Future connectInternal( - {required PowerSyncBackendConnector connector, - required Duration crudThrottleTime, - required AbortController abort, - required Zone asyncWorkZone, - Map? params}) { + Future connectInternal({ + required PowerSyncBackendConnector connector, + required AbortController abort, + required Zone asyncWorkZone, + required SyncOptions options, + }) { throw UnimplementedError(); } diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 1934f463..e33fbd96 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -13,6 +13,7 @@ import 'package:powersync_core/src/powersync_update_notification.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; import 'package:powersync_core/src/schema_logic.dart' as schema_logic; +import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/sync_status.dart'; mixin PowerSyncDatabaseMixin implements SqliteConnection { @@ -72,6 +73,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Delay between retrying failed requests. /// Defaults to 5 seconds. /// Only has an effect if changed before calling [connect]. + @Deprecated('Set option when calling connect() instead') Duration retryDelay = const Duration(seconds: 5); @protected @@ -269,17 +271,30 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// /// The connection is automatically re-opened if it fails for any reason. /// + /// To set sync parameters used in your sync rules (if any), use + /// [SyncOptions.params]. [SyncOptions] can also be used to tune the behavior + /// of the sync client, see that class for more information. + /// /// Status changes are reported on [statusStream]. Future connect({ required PowerSyncBackendConnector connector, - Duration crudThrottleTime = const Duration(milliseconds: 10), - Map? params, + SyncOptions? options, + @Deprecated('Use SyncOptions.crudThrottleTime instead') + Duration? crudThrottleTime, + @Deprecated('Use SyncOptions.params instead') Map? params, }) async { // The initialization process acquires a sync connect lock (through // updateSchema), so ensure the database is ready before we try to acquire // the lock for the connection. await initialize(); + final resolvedOptions = SyncOptions( + crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime, + // ignore: deprecated_member_use_from_same_package + retryDelay: options?.retryDelay ?? retryDelay, + params: options?.params ?? params, + ); + clientParams = params; var thisConnectAborter = AbortController(); final zone = Zone.current; @@ -294,8 +309,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { await connectInternal( connector: connector, - crudThrottleTime: crudThrottleTime, - params: params, + options: resolvedOptions, abort: thisConnectAborter, // Run follow-up async tasks in the parent zone, a new one is introduced // while we hold the lock (and async tasks won't hold the sync lock). @@ -342,17 +356,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// [connect] method and should not be called elsewhere. /// This method will only be called internally when no other sync client is /// active, so the method should not call [disconnect] itself. - /// - /// The [crudThrottleTime] is the throttle time between CRUD operations, it - /// defaults to 10 milliseconds in [connect]. @protected @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required Duration crudThrottleTime, + required SyncOptions options, required AbortController abort, required Zone asyncWorkZone, - Map? params, }); /// Close the sync connection. diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 651bfc6e..de8bf80c 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -14,6 +14,7 @@ import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import '../../sync/options.dart'; import '../../web/sync_controller.dart'; /// A PowerSync managed database. @@ -114,13 +115,11 @@ class PowerSyncDatabaseImpl @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required Duration crudThrottleTime, required AbortController abort, required Zone asyncWorkZone, - Map? params, + required SyncOptions options, }) async { - final crudStream = - database.onChange(['ps_crud'], throttle: crudThrottleTime); + final resolved = ResolvedSyncOptions(options); final storage = BucketStorage(database); StreamingSync sync; @@ -130,15 +129,16 @@ class PowerSyncDatabaseImpl sync = await SyncWorkerHandle.start( database: this, connector: connector, - crudThrottleTimeMs: crudThrottleTime.inMilliseconds, + options: options, workerUri: Uri.base.resolve('/powersync_sync.worker.js'), - syncParams: params, ); } catch (e) { logger.warning( 'Could not use shared worker for synchronization, falling back to locks.', e, ); + final crudStream = + database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime); sync = StreamingSyncImplementation( adapter: storage, @@ -146,9 +146,8 @@ class PowerSyncDatabaseImpl invalidCredentialsCallback: connector.prefetchCredentials, uploadCrud: () => connector.uploadData(this), crudUpdateTriggerStream: crudStream, - retryDelay: Duration(seconds: 3), + options: resolved, client: BrowserClient(), - syncParameters: params, // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. identifier: database.openFactory.path, diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart new file mode 100644 index 00000000..d3d8020a --- /dev/null +++ b/packages/powersync_core/lib/src/sync/instruction.dart @@ -0,0 +1,147 @@ +import 'sync_status.dart'; + +/// An internal instruction emitted by the sync client in the core extension in +/// response to the Dart SDK passing sync data into the extension. +sealed class Instruction { + factory Instruction.fromJson(Map json) { + return switch (json) { + {'LogLine': final logLine} => + LogLine.fromJson(logLine as Map), + {'UpdateSyncStatus': final updateStatus} => + UpdateSyncStatus.fromJson(updateStatus as Map), + {'EstablishSyncStream': final establish} => + EstablishSyncStream.fromJson(establish as Map), + {'FetchCredentials': final creds} => + FetchCredentials.fromJson(creds as Map), + {'CloseSyncStream': _} => const CloseSyncStream(), + {'FlushFileSystem': _} => const FlushFileSystem(), + {'DidCompleteSync': _} => const DidCompleteSync(), + _ => UnknownSyncLine(json) + }; + } +} + +final class LogLine implements Instruction { + final String severity; + final String line; + + LogLine({required this.severity, required this.line}); + + factory LogLine.fromJson(Map json) { + return LogLine( + severity: json['severity'] as String, + line: json['line'] as String, + ); + } +} + +final class EstablishSyncStream implements Instruction { + final Map request; + + EstablishSyncStream(this.request); + + factory EstablishSyncStream.fromJson(Map json) { + return EstablishSyncStream(json['request'] as Map); + } +} + +final class UpdateSyncStatus implements Instruction { + final CoreSyncStatus status; + + UpdateSyncStatus({required this.status}); + + factory UpdateSyncStatus.fromJson(Map json) { + return UpdateSyncStatus( + status: + CoreSyncStatus.fromJson(json['status'] as Map)); + } +} + +final class CoreSyncStatus implements Instruction { + final bool connected; + final bool connecting; + final List priorityStatus; + final DownloadProgress? downloading; + + CoreSyncStatus({ + required this.connected, + required this.connecting, + required this.priorityStatus, + required this.downloading, + }); + + factory CoreSyncStatus.fromJson(Map json) { + return CoreSyncStatus( + connected: json['connected'] as bool, + connecting: json['connecting'] as bool, + priorityStatus: [ + for (final entry in json['priority_status'] as List) + _priorityStatusFromJson(entry as Map) + ], + downloading: switch (json['downloading']) { + null => null, + final raw as Map => DownloadProgress.fromJson(raw), + }, + ); + } + + static SyncPriorityStatus _priorityStatusFromJson(Map json) { + return ( + priority: BucketPriority(json['priority'] as int), + hasSynced: json['has_synced'] as bool?, + lastSyncedAt: switch (json['last_synced_at']) { + null => null, + final lastSyncedAt as int => + DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000), + }, + ); + } +} + +final class DownloadProgress implements Instruction { + final Map progress; + + DownloadProgress(this.progress); + + factory DownloadProgress.fromJson(Map line) { + return DownloadProgress(line.map((k, v) => + MapEntry(k, _bucketProgressFromJson(v as Map)))); + } + + static BucketProgress _bucketProgressFromJson(Map json) { + return ( + priority: BucketPriority(json['priority'] as int), + atLast: json['at_last'] as int, + sinceLast: json['since_last'] as int, + targetCount: json['target_count'] as int, + ); + } +} + +final class FetchCredentials implements Instruction { + final bool didExpire; + + FetchCredentials(this.didExpire); + + factory FetchCredentials.fromJson(Map line) { + return FetchCredentials(line['did_expire'] as bool); + } +} + +final class CloseSyncStream implements Instruction { + const CloseSyncStream(); +} + +final class FlushFileSystem implements Instruction { + const FlushFileSystem(); +} + +final class DidCompleteSync implements Instruction { + const DidCompleteSync(); +} + +final class UnknownSyncLine implements Instruction { + final Map source; + + UnknownSyncLine(this.source); +} diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart new file mode 100644 index 00000000..68d6880d --- /dev/null +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -0,0 +1,90 @@ +import 'package:collection/collection.dart'; +import 'package:meta/meta.dart'; + +/// Options that affect how the sync client connects to the sync service. +final class SyncOptions { + /// A JSON object that is passed to the sync service and forwarded to sync + /// rules. + /// + /// These [parameters](https://docs.powersync.com/usage/sync-rules/advanced-topics/client-parameters) + /// can be used in sync rules to deliver different data to different clients + /// depending on the values used in [params]. + final Map? params; + + /// A throttle to apply when listening for local database changes before + /// scheduling them for uploads. + /// + /// The throttle is applied to avoid frequent tiny writes in favor of more + /// efficient batched uploads. When set to null, PowerSync defaults to a + /// throtle duration of 10 milliseconds. + final Duration? crudThrottleTime; + + /// How long PowerSync should wait before reconnecting after an error. + /// + /// When set to null, PowerSync defaults to a delay of 5 seconds. + final Duration? retryDelay; + + /// The client implementation to use. + /// + /// This allows switching between the existing [SyncClientImplementation.dart] + /// implementation and a newer one ([SyncClientImplementation.rust]). + /// + /// Note that not setting this field to the default value is experimental. + final SyncClientImplementation syncImplementation; + + const SyncOptions({ + this.crudThrottleTime, + this.retryDelay, + this.params, + this.syncImplementation = SyncClientImplementation.dart, + }); +} + +@internal +extension type ResolvedSyncOptions(SyncOptions source) { + Duration get crudThrottleTime => + source.crudThrottleTime ?? const Duration(milliseconds: 10); + + Duration get retryDelay => source.retryDelay ?? const Duration(seconds: 5); + + Map? get params => source.params ?? const {}; + + (ResolvedSyncOptions, bool) applyFrom(SyncOptions other) { + final newOptions = SyncOptions( + crudThrottleTime: other.crudThrottleTime ?? crudThrottleTime, + retryDelay: other.retryDelay ?? retryDelay, + params: other.params ?? params, + syncImplementation: other.syncImplementation, + ); + + final didChange = !_mapEquality.equals(other.params, params) || + other.crudThrottleTime != crudThrottleTime || + other.retryDelay != retryDelay || + other.syncImplementation != source.syncImplementation; + return (ResolvedSyncOptions(newOptions), didChange); + } + + static const _mapEquality = MapEquality(); +} + +/// Supported sync client implementations. +/// +/// Not using the default implementation (currently [dart], but this may change +/// in the future) is experimental. +@experimental +enum SyncClientImplementation { + /// Decode and handle data received from the sync service in Dart. + /// + /// This is the default option. + dart, + + /// An _experimental_ implementation of the sync client that is written in + /// Rust and shared across the PowerSync SDKs. + /// + /// Since this client decodes sync lines in Rust instead of parsing them in + /// Dart, it can be more performant than the the default [dart] + /// implementation. Since this option has not seen as much real-world testing, + /// it is marked as __experimental__ at the moment! + @experimental + rust, +} diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index a9646f0a..07904a52 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -6,6 +6,7 @@ import 'package:meta/meta.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; import 'package:powersync_core/src/log_internal.dart'; +import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/user_agent/user_agent.dart'; import 'package:sqlite_async/mutex.dart'; @@ -29,6 +30,7 @@ abstract interface class StreamingSync { @internal class StreamingSyncImplementation implements StreamingSync { final BucketStorage adapter; + final ResolvedSyncOptions options; final Future Function() credentialsCallback; final Future Function()? invalidCredentialsCallback; @@ -48,10 +50,6 @@ class StreamingSyncImplementation implements StreamingSync { final StreamController _localPingController = StreamController.broadcast(); - final Duration retryDelay; - - final Map? syncParameters; - AbortController? _abort; bool _safeToClose = true; @@ -68,8 +66,7 @@ class StreamingSyncImplementation implements StreamingSync { this.invalidCredentialsCallback, required this.uploadCrud, required this.crudUpdateTriggerStream, - required this.retryDelay, - this.syncParameters, + required this.options, required http.Client client, Mutex? syncMutex, Mutex? crudMutex, @@ -82,6 +79,8 @@ class StreamingSyncImplementation implements StreamingSync { crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"), _userAgentHeaders = userAgentHeaders(); + Duration get _retryDelay => options.retryDelay; + @override Stream get statusStream => _state.statusStream; @@ -136,7 +135,7 @@ class StreamingSyncImplementation implements StreamingSync { } // Protect sync iterations with exclusivity (if a valid Mutex is provided) await syncMutex.lock(() => streamingSyncIteration(), - timeout: retryDelay); + timeout: _retryDelay); } catch (e, stacktrace) { if (aborted && e is http.ClientException) { // Explicit abort requested - ignore. Example error: @@ -230,7 +229,7 @@ class StreamingSyncImplementation implements StreamingSync { _state.updateStatus((s) => s.uploading = false); } } - }, timeout: retryDelay).whenComplete(() { + }, timeout: _retryDelay).whenComplete(() { assert(identical(_activeCrudUpload, completer)); _activeCrudUpload = null; completer.complete(); @@ -302,7 +301,7 @@ class StreamingSyncImplementation implements StreamingSync { Checkpoint? appliedCheckpoint; var requestStream = streamingSyncRequest( - StreamingSyncRequest(bucketRequests, syncParameters, clientId!)); + StreamingSyncRequest(bucketRequests, options.params, clientId!)); var merged = addBroadcast(requestStream, _localPingController.stream); @@ -534,7 +533,7 @@ class StreamingSyncImplementation implements StreamingSync { /// Delays the standard `retryDelay` Duration, but exits early if /// an abort has been requested. Future _delayRetry() async { - await Future.any([Future.delayed(retryDelay), _abort!.onAbort]); + await Future.any([Future.delayed(_retryDelay), _abort!.onAbort]); } } diff --git a/packages/powersync_core/lib/src/web/sync_controller.dart b/packages/powersync_core/lib/src/web/sync_controller.dart index 9fbd26eb..0c26252e 100644 --- a/packages/powersync_core/lib/src/web/sync_controller.dart +++ b/packages/powersync_core/lib/src/web/sync_controller.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:js_interop'; import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/src/sync/options.dart'; import 'package:sqlite_async/web.dart'; import 'package:web/web.dart'; @@ -12,20 +13,18 @@ import 'sync_worker_protocol.dart'; class SyncWorkerHandle implements StreamingSync { final PowerSyncDatabaseImpl database; final PowerSyncBackendConnector connector; - final int crudThrottleTimeMs; - final Map? syncParams; - + final SyncOptions options; late final WorkerCommunicationChannel _channel; final StreamController _status = StreamController.broadcast(); - SyncWorkerHandle._( - {required this.database, - required this.connector, - required this.crudThrottleTimeMs, - required MessagePort sendToWorker, - required SharedWorker worker, - this.syncParams}) { + SyncWorkerHandle._({ + required this.database, + required this.connector, + required this.options, + required MessagePort sendToWorker, + required SharedWorker worker, + }) { _channel = WorkerCommunicationChannel( port: sendToWorker, errors: EventStreamProviders.errorEvent.forTarget(worker), @@ -77,20 +76,19 @@ class SyncWorkerHandle implements StreamingSync { }); } - static Future start( - {required PowerSyncDatabaseImpl database, - required PowerSyncBackendConnector connector, - required int crudThrottleTimeMs, - required Uri workerUri, - Map? syncParams}) async { + static Future start({ + required PowerSyncDatabaseImpl database, + required PowerSyncBackendConnector connector, + required Uri workerUri, + required SyncOptions options, + }) async { final worker = SharedWorker(workerUri.toString().toJS); final handle = SyncWorkerHandle._( + options: options, database: database, connector: connector, - crudThrottleTimeMs: crudThrottleTimeMs, sendToWorker: worker.port, worker: worker, - syncParams: syncParams, ); // Make sure that the worker is working, or throw immediately. @@ -115,6 +113,6 @@ class SyncWorkerHandle implements StreamingSync { @override Future streamingSync() async { await _channel.startSynchronization( - database.database.openFactory.path, crudThrottleTimeMs, syncParams); + database.database.openFactory.path, ResolvedSyncOptions(options)); } } diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index ba46dddf..ae3869d3 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -13,6 +13,7 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite_async.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; +import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/web.dart'; import 'package:web/web.dart' hide RequestMode; @@ -44,14 +45,11 @@ class _SyncWorker { } _SyncRunner referenceSyncTask( - String databaseIdentifier, - int crudThrottleTimeMs, - String? syncParamsEncoded, - _ConnectedClient client) { + String databaseIdentifier, SyncOptions options, _ConnectedClient client) { return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { return _SyncRunner(databaseIdentifier); }) - ..registerClient(client, crudThrottleTimeMs, syncParamsEncoded); + ..registerClient(client, options); } } @@ -69,8 +67,21 @@ class _ConnectedClient { switch (type) { case SyncWorkerMessageType.startSynchronization: final request = payload as StartSynchronization; - _runner = _worker.referenceSyncTask(request.databaseName, - request.crudThrottleTimeMs, request.syncParamsEncoded, this); + final recoveredOptions = SyncOptions( + crudThrottleTime: + Duration(milliseconds: request.crudThrottleTimeMs), + retryDelay: Duration(milliseconds: request.retryDelayMs), + params: switch (request.syncParamsEncoded) { + null => null, + final encodedParams => + jsonDecode(encodedParams) as Map, + }, + syncImplementation: SyncClientImplementation.values + .byName(request.clientImplementationName), + ); + + _runner = _worker.referenceSyncTask( + request.databaseName, recoveredOptions, this); return (JSObject(), null); case SyncWorkerMessageType.abortSynchronization: _runner?.disconnectClient(this); @@ -110,8 +121,7 @@ class _ConnectedClient { class _SyncRunner { final String identifier; - int crudThrottleTimeMs = 1; - String? syncParamsEncoded; + ResolvedSyncOptions options = ResolvedSyncOptions(SyncOptions()); final StreamGroup<_RunnerEvent> _group = StreamGroup(); final StreamController<_RunnerEvent> _mainEvents = StreamController(); @@ -129,19 +139,12 @@ class _SyncRunner { switch (event) { case _AddConnection( :final client, - :final crudThrottleTimeMs, - :final syncParamsEncoded + :final options, ): connections.add(client); - var reconnect = false; - if (this.crudThrottleTimeMs != crudThrottleTimeMs) { - this.crudThrottleTimeMs = crudThrottleTimeMs; - reconnect = true; - } - if (this.syncParamsEncoded != syncParamsEncoded) { - this.syncParamsEncoded = syncParamsEncoded; - reconnect = true; - } + final (newOptions, reconnect) = this.options.applyFrom(options); + this.options = newOptions; + if (sync == null) { await _requestDatabase(client); } else if (reconnect) { @@ -241,7 +244,6 @@ class _SyncRunner { }); final tables = ['ps_crud']; - final crudThrottleTime = Duration(milliseconds: crudThrottleTimeMs); Stream crudStream = powerSyncUpdateNotifications(Stream.empty()); if (database.updates != null) { @@ -249,25 +251,21 @@ class _SyncRunner { .transform(UpdateNotification.filterTablesTransformer(tables)); crudStream = UpdateNotification.throttleStream( filteredStream, - crudThrottleTime, + options.crudThrottleTime, addOne: UpdateNotification.empty(), ); } - final syncParams = syncParamsEncoded == null - ? null - : jsonDecode(syncParamsEncoded!) as Map; - sync = StreamingSyncImplementation( - adapter: WebBucketStorage(database), - credentialsCallback: client.channel.credentialsCallback, - invalidCredentialsCallback: client.channel.invalidCredentialsCallback, - uploadCrud: client.channel.uploadCrud, - crudUpdateTriggerStream: crudStream, - retryDelay: Duration(seconds: 3), - client: BrowserClient(), - identifier: identifier, - syncParameters: syncParams); + adapter: WebBucketStorage(database), + credentialsCallback: client.channel.credentialsCallback, + invalidCredentialsCallback: client.channel.invalidCredentialsCallback, + uploadCrud: client.channel.uploadCrud, + crudUpdateTriggerStream: crudStream, + options: options, + client: BrowserClient(), + identifier: identifier, + ); sync!.statusStream.listen((event) { _logger.fine('Broadcasting sync event: $event'); for (final client in connections) { @@ -278,10 +276,8 @@ class _SyncRunner { sync!.streamingSync(); } - void registerClient(_ConnectedClient client, int currentCrudThrottleTimeMs, - String? currentSyncParamsEncoded) { - _mainEvents.add(_AddConnection( - client, currentCrudThrottleTimeMs, currentSyncParamsEncoded)); + void registerClient(_ConnectedClient client, SyncOptions options) { + _mainEvents.add(_AddConnection(client, options)); } /// Remove a client, disconnecting if no clients remain.. @@ -299,10 +295,9 @@ sealed class _RunnerEvent {} final class _AddConnection implements _RunnerEvent { final _ConnectedClient client; - final int crudThrottleTimeMs; - final String? syncParamsEncoded; + final SyncOptions options; - _AddConnection(this.client, this.crudThrottleTimeMs, this.syncParamsEncoded); + _AddConnection(this.client, this.options); } final class _RemoveConnection implements _RunnerEvent { diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index cd6b241a..243dc01f 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:js_interop'; import 'package:logging/logging.dart'; +import 'package:powersync_core/src/sync/options.dart'; import 'package:web/web.dart'; import '../connector.dart'; @@ -64,15 +65,20 @@ extension type SyncWorkerMessage._(JSObject _) implements JSObject { @anonymous extension type StartSynchronization._(JSObject _) implements JSObject { - external factory StartSynchronization( - {required String databaseName, - required int crudThrottleTimeMs, - required int requestId, - String? syncParamsEncoded}); + external factory StartSynchronization({ + required String databaseName, + required int crudThrottleTimeMs, + required int requestId, + required int retryDelayMs, + required String clientImplementationName, + String? syncParamsEncoded, + }); external String get databaseName; external int get requestId; external int get crudThrottleTimeMs; + external int get retryDelayMs; + external String get clientImplementationName; external String? get syncParamsEncoded; } @@ -403,17 +409,22 @@ final class WorkerCommunicationChannel { await _numericRequest(SyncWorkerMessageType.ping); } - Future startSynchronization(String databaseName, int crudThrottleTimeMs, - Map? syncParams) async { + Future startSynchronization( + String databaseName, ResolvedSyncOptions options) async { final (id, completion) = _newRequest(); port.postMessage(SyncWorkerMessage( type: SyncWorkerMessageType.startSynchronization.name, payload: StartSynchronization( - databaseName: databaseName, - crudThrottleTimeMs: crudThrottleTimeMs, - requestId: id, - syncParamsEncoded: - syncParams == null ? null : jsonEncode(syncParams)), + databaseName: databaseName, + crudThrottleTimeMs: options.crudThrottleTime.inMilliseconds, + clientImplementationName: options.source.syncImplementation.name, + retryDelayMs: options.retryDelay.inMilliseconds, + requestId: id, + syncParamsEncoded: switch (options.source.params) { + null => null, + final params => jsonEncode(params), + }, + ), )); await completion; } diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 116f319a..017f6c95 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -2,6 +2,7 @@ import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/src/sync/bucket_storage.dart'; +import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; @@ -149,7 +150,9 @@ extension MockSync on PowerSyncDatabase { final impl = StreamingSyncImplementation( adapter: BucketStorage(this), client: client, - retryDelay: const Duration(seconds: 5), + options: ResolvedSyncOptions(SyncOptions( + retryDelay: const Duration(seconds: 5), + )), credentialsCallback: connector.getCredentialsCached, invalidCredentialsCallback: connector.prefetchCredentials, uploadCrud: () => connector.uploadData(this), From 7ca39b30281252382f59f192a9114affd08968ed Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 15 May 2025 09:24:04 +0200 Subject: [PATCH 2/4] Refactor sync client and options --- demos/benchmarks/lib/powersync.dart | 15 +- .../native/native_powersync_database.dart | 33 ++- .../database/web/web_powersync_database.dart | 5 +- .../lib/src/sync/instruction.dart | 147 ----------- .../lib/src/sync/internal_connector.dart | 92 +++++++ .../powersync_core/lib/src/sync/options.dart | 37 +-- .../lib/src/sync/stream_utils.dart | 55 ++-- .../lib/src/sync/streaming_sync.dart | 247 ++++++++++-------- .../lib/src/web/sync_worker.dart | 18 +- .../lib/src/web/sync_worker_protocol.dart | 7 +- packages/powersync_core/pubspec.yaml | 1 + .../powersync_core/test/connected_test.dart | 1 + .../powersync_core/test/disconnect_test.dart | 1 + .../sync_server/in_memory_sync_server.dart | 32 ++- packages/powersync_core/test/stream_test.dart | 27 +- .../test/streaming_sync_test.dart | 34 +-- packages/powersync_core/test/upload_test.dart | 1 + .../test/utils/abstract_test_utils.dart | 14 +- 18 files changed, 372 insertions(+), 395 deletions(-) delete mode 100644 packages/powersync_core/lib/src/sync/instruction.dart create mode 100644 packages/powersync_core/lib/src/sync/internal_connector.dart diff --git a/demos/benchmarks/lib/powersync.dart b/demos/benchmarks/lib/powersync.dart index 8d5baa4d..df192ee0 100644 --- a/demos/benchmarks/lib/powersync.dart +++ b/demos/benchmarks/lib/powersync.dart @@ -88,13 +88,15 @@ Future getDatabasePath() async { var currentConnector = BackendConnector(); +const options = SyncOptions( + params: {'size_bucket': AppConfig.sizeBucket}, + crudThrottleTime: Duration(milliseconds: 1), +); + Future resync() async { await db.disconnectAndClear(); timer.start(db); - db.connect( - connector: currentConnector, - params: {'size_bucket': AppConfig.sizeBucket}, - crudThrottleTime: const Duration(milliseconds: 1)); + db.connect(connector: currentConnector, options: options); } Future openDatabase() async { @@ -106,8 +108,5 @@ Future openDatabase() async { BenchmarkItem.updateItemBenchmarks(); timer.start(db); - db.connect( - connector: currentConnector, - params: {'size_bucket': AppConfig.sizeBucket}, - crudThrottleTime: const Duration(milliseconds: 1)); + db.connect(connector: currentConnector, options: options); } diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 710768fa..01fddb76 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -15,6 +15,7 @@ import 'package:powersync_core/src/log_internal.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import 'package:powersync_core/src/open_factory/native/native_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; +import 'package:powersync_core/src/sync/internal_connector.dart'; import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync/sync_status.dart'; @@ -162,16 +163,21 @@ class PowerSyncDatabaseImpl Future handleMessage(Object? data) async { if (data is List) { String action = data[0] as String; - if (action == "getCredentials") { + if (action == "getCredentialsCached") { await (data[1] as PortCompleter).handle(() async { final token = await connector.getCredentialsCached(); logger.fine('Credentials: $token'); return token; }); - } else if (action == "invalidateCredentials") { + } else if (action == "prefetchCredentials") { logger.fine('Refreshing credentials'); + final invalidate = data[2] as bool; + await (data[1] as PortCompleter).handle(() async { - await connector.prefetchCredentials(); + if (invalidate) { + connector.invalidateCredentials(); + } + return await connector.prefetchCredentials(); }); } else if (action == 'init') { final port = initPort = data[1] as SendPort; @@ -360,15 +366,16 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { sPort.send(['log', copy]); }); - Future loadCredentials() async { + Future getCredentialsCached() async { final r = IsolateResult(); - sPort.send(['getCredentials', r.completer]); + sPort.send(['getCredentialsCached', r.completer]); return r.future; } - Future invalidateCredentials() async { - final r = IsolateResult(); - sPort.send(['invalidateCredentials', r.completer]); + Future prefetchCredentials( + {required bool invalidate}) async { + final r = IsolateResult(); + sPort.send(['prefetchCredentials', r.completer, invalidate]); return r.future; } @@ -386,9 +393,11 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { final storage = BucketStorage(connection); final sync = StreamingSyncImplementation( adapter: storage, - credentialsCallback: loadCredentials, - invalidCredentialsCallback: invalidateCredentials, - uploadCrud: uploadCrud, + connector: InternalConnector( + getCredentialsCached: getCredentialsCached, + prefetchCredentials: prefetchCredentials, + uploadCrud: uploadCrud, + ), crudUpdateTriggerStream: crudUpdateController.stream, options: args.options, client: http.Client(), @@ -426,6 +435,6 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { // This should be rare - any uncaught error is a bug. And in most cases, // it should occur after the database is already open. await shutdown(); - throw error; + Error.throwWithStackTrace(error, stack); }); } diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index de8bf80c..44fa01d9 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -11,6 +11,7 @@ import 'package:powersync_core/src/log.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import 'package:powersync_core/src/open_factory/web/web_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; +import 'package:powersync_core/src/sync/internal_connector.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite_async.dart'; @@ -142,9 +143,7 @@ class PowerSyncDatabaseImpl sync = StreamingSyncImplementation( adapter: storage, - credentialsCallback: connector.getCredentialsCached, - invalidCredentialsCallback: connector.prefetchCredentials, - uploadCrud: () => connector.uploadData(this), + connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, options: resolved, client: BrowserClient(), diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart deleted file mode 100644 index d3d8020a..00000000 --- a/packages/powersync_core/lib/src/sync/instruction.dart +++ /dev/null @@ -1,147 +0,0 @@ -import 'sync_status.dart'; - -/// An internal instruction emitted by the sync client in the core extension in -/// response to the Dart SDK passing sync data into the extension. -sealed class Instruction { - factory Instruction.fromJson(Map json) { - return switch (json) { - {'LogLine': final logLine} => - LogLine.fromJson(logLine as Map), - {'UpdateSyncStatus': final updateStatus} => - UpdateSyncStatus.fromJson(updateStatus as Map), - {'EstablishSyncStream': final establish} => - EstablishSyncStream.fromJson(establish as Map), - {'FetchCredentials': final creds} => - FetchCredentials.fromJson(creds as Map), - {'CloseSyncStream': _} => const CloseSyncStream(), - {'FlushFileSystem': _} => const FlushFileSystem(), - {'DidCompleteSync': _} => const DidCompleteSync(), - _ => UnknownSyncLine(json) - }; - } -} - -final class LogLine implements Instruction { - final String severity; - final String line; - - LogLine({required this.severity, required this.line}); - - factory LogLine.fromJson(Map json) { - return LogLine( - severity: json['severity'] as String, - line: json['line'] as String, - ); - } -} - -final class EstablishSyncStream implements Instruction { - final Map request; - - EstablishSyncStream(this.request); - - factory EstablishSyncStream.fromJson(Map json) { - return EstablishSyncStream(json['request'] as Map); - } -} - -final class UpdateSyncStatus implements Instruction { - final CoreSyncStatus status; - - UpdateSyncStatus({required this.status}); - - factory UpdateSyncStatus.fromJson(Map json) { - return UpdateSyncStatus( - status: - CoreSyncStatus.fromJson(json['status'] as Map)); - } -} - -final class CoreSyncStatus implements Instruction { - final bool connected; - final bool connecting; - final List priorityStatus; - final DownloadProgress? downloading; - - CoreSyncStatus({ - required this.connected, - required this.connecting, - required this.priorityStatus, - required this.downloading, - }); - - factory CoreSyncStatus.fromJson(Map json) { - return CoreSyncStatus( - connected: json['connected'] as bool, - connecting: json['connecting'] as bool, - priorityStatus: [ - for (final entry in json['priority_status'] as List) - _priorityStatusFromJson(entry as Map) - ], - downloading: switch (json['downloading']) { - null => null, - final raw as Map => DownloadProgress.fromJson(raw), - }, - ); - } - - static SyncPriorityStatus _priorityStatusFromJson(Map json) { - return ( - priority: BucketPriority(json['priority'] as int), - hasSynced: json['has_synced'] as bool?, - lastSyncedAt: switch (json['last_synced_at']) { - null => null, - final lastSyncedAt as int => - DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000), - }, - ); - } -} - -final class DownloadProgress implements Instruction { - final Map progress; - - DownloadProgress(this.progress); - - factory DownloadProgress.fromJson(Map line) { - return DownloadProgress(line.map((k, v) => - MapEntry(k, _bucketProgressFromJson(v as Map)))); - } - - static BucketProgress _bucketProgressFromJson(Map json) { - return ( - priority: BucketPriority(json['priority'] as int), - atLast: json['at_last'] as int, - sinceLast: json['since_last'] as int, - targetCount: json['target_count'] as int, - ); - } -} - -final class FetchCredentials implements Instruction { - final bool didExpire; - - FetchCredentials(this.didExpire); - - factory FetchCredentials.fromJson(Map line) { - return FetchCredentials(line['did_expire'] as bool); - } -} - -final class CloseSyncStream implements Instruction { - const CloseSyncStream(); -} - -final class FlushFileSystem implements Instruction { - const FlushFileSystem(); -} - -final class DidCompleteSync implements Instruction { - const DidCompleteSync(); -} - -final class UnknownSyncLine implements Instruction { - final Map source; - - UnknownSyncLine(this.source); -} diff --git a/packages/powersync_core/lib/src/sync/internal_connector.dart b/packages/powersync_core/lib/src/sync/internal_connector.dart new file mode 100644 index 00000000..a18914c7 --- /dev/null +++ b/packages/powersync_core/lib/src/sync/internal_connector.dart @@ -0,0 +1,92 @@ +import 'package:meta/meta.dart'; + +import '../connector.dart'; +import '../database/powersync_database.dart'; + +/// A view over a backend connector that does not require a reference to the +/// PowerSync database. +@internal +abstract interface class InternalConnector { + /// Fetch or return cached credentials. + Future getCredentialsCached(); + + /// Ask the backend connector to fetch a new set of credentials. + /// + /// [invalidate] describes whether the current ([getCredentialsCached]) + /// credentials are already invalid, or whether this call is a pre-fetch. + /// + /// A call to [getCredentialsCached] after this future completes should return + /// the same credentials. + Future prefetchCredentials({bool invalidate = false}); + + /// Requests the connector to upload a crud batch to the backend. + Future uploadCrud(); + + const factory InternalConnector({ + required Future Function() getCredentialsCached, + required Future Function({required bool invalidate}) + prefetchCredentials, + required Future Function() uploadCrud, + }) = _CallbackConnector; + + factory InternalConnector.wrap( + PowerSyncBackendConnector connector, PowerSyncDatabase db) { + return _WrapConnector(connector, db); + } +} + +final class _WrapConnector implements InternalConnector { + final PowerSyncBackendConnector connector; + final PowerSyncDatabase database; + + _WrapConnector(this.connector, this.database); + + @override + Future getCredentialsCached() async { + return connector.getCredentialsCached(); + } + + @override + Future prefetchCredentials({bool invalidate = false}) { + if (invalidate) { + connector.invalidateCredentials(); + } + return connector.prefetchCredentials(); + } + + @override + Future uploadCrud() { + return connector.uploadData(database); + } +} + +final class _CallbackConnector implements InternalConnector { + final Future Function() _getCredentialsCached; + final Future Function({required bool invalidate}) + _prefetchCredentials; + final Future Function() _uploadCrud; + + const _CallbackConnector({ + required Future Function() getCredentialsCached, + required Future Function({required bool invalidate}) + prefetchCredentials, + required Future Function() uploadCrud, + }) : _getCredentialsCached = getCredentialsCached, + _prefetchCredentials = prefetchCredentials, + _uploadCrud = uploadCrud; + + @override + Future getCredentialsCached() { + return _getCredentialsCached(); + } + + @override + Future prefetchCredentials({bool invalidate = false}) { + return _prefetchCredentials(invalidate: invalidate); + } + + @override + Future uploadCrud() { + return _uploadCrud(); + } +} diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index 68d6880d..f13f7e38 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -24,19 +24,10 @@ final class SyncOptions { /// When set to null, PowerSync defaults to a delay of 5 seconds. final Duration? retryDelay; - /// The client implementation to use. - /// - /// This allows switching between the existing [SyncClientImplementation.dart] - /// implementation and a newer one ([SyncClientImplementation.rust]). - /// - /// Note that not setting this field to the default value is experimental. - final SyncClientImplementation syncImplementation; - const SyncOptions({ this.crudThrottleTime, this.retryDelay, this.params, - this.syncImplementation = SyncClientImplementation.dart, }); } @@ -47,44 +38,20 @@ extension type ResolvedSyncOptions(SyncOptions source) { Duration get retryDelay => source.retryDelay ?? const Duration(seconds: 5); - Map? get params => source.params ?? const {}; + Map get params => source.params ?? const {}; (ResolvedSyncOptions, bool) applyFrom(SyncOptions other) { final newOptions = SyncOptions( crudThrottleTime: other.crudThrottleTime ?? crudThrottleTime, retryDelay: other.retryDelay ?? retryDelay, params: other.params ?? params, - syncImplementation: other.syncImplementation, ); final didChange = !_mapEquality.equals(other.params, params) || other.crudThrottleTime != crudThrottleTime || - other.retryDelay != retryDelay || - other.syncImplementation != source.syncImplementation; + other.retryDelay != retryDelay; return (ResolvedSyncOptions(newOptions), didChange); } static const _mapEquality = MapEquality(); } - -/// Supported sync client implementations. -/// -/// Not using the default implementation (currently [dart], but this may change -/// in the future) is experimental. -@experimental -enum SyncClientImplementation { - /// Decode and handle data received from the sync service in Dart. - /// - /// This is the default option. - dart, - - /// An _experimental_ implementation of the sync client that is written in - /// Rust and shared across the PowerSync SDKs. - /// - /// Since this client decodes sync lines in Rust instead of parsing them in - /// Dart, it can be more performant than the the default [dart] - /// implementation. Since this option has not seen as much real-world testing, - /// it is marked as __experimental__ at the moment! - @experimental - rust, -} diff --git a/packages/powersync_core/lib/src/sync/stream_utils.dart b/packages/powersync_core/lib/src/sync/stream_utils.dart index 1360c7bd..a4689cf9 100644 --- a/packages/powersync_core/lib/src/sync/stream_utils.dart +++ b/packages/powersync_core/lib/src/sync/stream_utils.dart @@ -1,10 +1,10 @@ import 'dart:async'; -import 'package:http/http.dart'; import 'dart:convert' as convert; /// Inject a broadcast stream into a normal stream. Stream addBroadcast(Stream a, Stream broadcast) { + assert(broadcast.isBroadcast); return mergeStreams([a, broadcast]); } @@ -18,19 +18,34 @@ Stream mergeStreams(List> streams) { final controller = StreamController(sync: true); List>? subscriptions; + var isClosing = false; controller.onListen = () { subscriptions = streams.map((stream) { - return stream.listen((event) { - return controller.add(event); - }, onDone: () { - controller.close(); - }, onError: controller.addError); + return stream.listen( + (event) { + return controller.add(event); + }, + onError: controller.addError, + onDone: () async { + if (!isClosing) { + isClosing = true; + + try { + await cancelAll(subscriptions!); + } catch (e, s) { + controller.addError(e, s); + } finally { + controller.close(); + } + } + }, + ); }).toList(); }; controller.onCancel = () { - if (subscriptions != null) { + if (subscriptions != null && !isClosing) { // Important: The Future must be returned here. // Since calling cancel on one of the subscriptions may error, // not returning the Future may result in an unhandled error. @@ -53,19 +68,19 @@ Stream mergeStreams(List> streams) { return controller.stream; } -/// Given a raw ByteStream, parse each line as JSON. -Stream ndjson(ByteStream input) { - final textInput = input.transform(convert.utf8.decoder); - final lineInput = textInput.transform(const convert.LineSplitter()); - final jsonInput = lineInput.transform( - StreamTransformer.fromHandlers(handleError: (error, stackTrace, sink) { - /// On Web if the connection is closed, this error will throw, but - /// the stream is never closed. This closes the stream on error. - sink.close(); - }, handleData: (String data, EventSink sink) { - sink.add(convert.jsonDecode(data)); - })); - return jsonInput; +extension ByteStreamToLines on Stream> { + /// Decodes this stream using UTF8 and then splits the text stream by + /// newlines. + Stream get lines { + final textInput = transform(convert.utf8.decoder); + return textInput.transform(const convert.LineSplitter()); + } +} + +extension StreamToJson on Stream { + Stream get parseJson { + return map(convert.jsonDecode); + } } void pauseAll(List> subscriptions) { diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 07904a52..24284751 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:convert' as convert; import 'package:http/http.dart' as http; +import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; @@ -11,8 +12,9 @@ import 'package:powersync_core/src/user_agent/user_agent.dart'; import 'package:sqlite_async/mutex.dart'; import 'bucket_storage.dart'; -import '../connector.dart'; import '../crud.dart'; + +import 'internal_connector.dart'; import 'mutable_sync_status.dart'; import 'stream_utils.dart'; import 'sync_status.dart'; @@ -30,11 +32,11 @@ abstract interface class StreamingSync { @internal class StreamingSyncImplementation implements StreamingSync { final BucketStorage adapter; + final InternalConnector connector; final ResolvedSyncOptions options; - final Future Function() credentialsCallback; - final Future Function()? invalidCredentialsCallback; - final Future Function() uploadCrud; + final Logger logger = isolateLogger; + final Stream crudUpdateTriggerStream; // An internal controller which is used to trigger CRUD uploads internally @@ -45,26 +47,22 @@ class StreamingSyncImplementation implements StreamingSync { StreamController.broadcast(); final http.Client _client; - final SyncStatusStateStream _state = SyncStatusStateStream(); - final StreamController _localPingController = - StreamController.broadcast(); + final SyncStatusStateStream _state = SyncStatusStateStream(); AbortController? _abort; - bool _safeToClose = true; - final Mutex syncMutex, crudMutex; Completer? _activeCrudUpload; + final StreamController _nonLineSyncEvents = + StreamController.broadcast(); final Map _userAgentHeaders; String? clientId; StreamingSyncImplementation({ required this.adapter, - required this.credentialsCallback, - this.invalidCredentialsCallback, - required this.uploadCrud, + required this.connector, required this.crudUpdateTriggerStream, required this.options, required http.Client client, @@ -87,30 +85,34 @@ class StreamingSyncImplementation implements StreamingSync { @override Future abort() async { // If streamingSync() hasn't been called yet, _abort will be null. - var future = _abort?.abort(); - // This immediately triggers a new iteration in the merged stream, allowing us - // to break immediately. - // However, we still need to close the underlying stream explicitly, otherwise - // the break will wait for the next line of data received on the stream. - _localPingController.add(null); - // According to the documentation, the behavior is undefined when calling - // close() while requests are pending. However, this is no other - // known way to cancel open streams, and this appears to end the stream with - // a consistent ClientException if a request is open. - // We avoid closing the client while opening a request, as that does cause - // unpredicable uncaught errors. - if (_safeToClose) { + if (_abort case final abort?) { + final future = abort.abort(); + _internalCrudTriggerController.close(); + + // If a sync iteration is active, the control flow to abort is: + // + // 1. We close the non-line sync event stream here. + // 2. This emits a done event. + // 3. `addBroadcastStream` will cancel all source subscriptions in + // response to that, and then emit a done event too. If there is an + // error while cancelling the stream, it's forwarded by emitting an + // error before closing. + // 4. We break out of the sync loop (either due to an error or because + // all resources have been closed correctly). + // 5. `streamingSync` completes the abort controller, which we await + // here. + await _nonLineSyncEvents.close(); + + // Wait for the abort to complete, which also guarantees that no requests + // are pending. + await Future.wait([ + future, + if (_activeCrudUpload case final activeUpload?) activeUpload.future, + ]); + _client.close(); + _state.close(); } - - await _internalCrudTriggerController.close(); - - // wait for completeAbort() to be called - await future; - - // Now close the client in all cases not covered above - _client.close(); - _state.close(); } bool get aborted { @@ -127,15 +129,16 @@ class StreamingSyncImplementation implements StreamingSync { while (!aborted) { _state.updateStatus((s) => s.setConnectingIfNotConnected()); try { - if (invalidCredentials && invalidCredentialsCallback != null) { + if (invalidCredentials) { // This may error. In that case it will be retried again on the next // iteration. - await invalidCredentialsCallback!(); + await connector.prefetchCredentials(); invalidCredentials = false; } // Protect sync iterations with exclusivity (if a valid Mutex is provided) - await syncMutex.lock(() => streamingSyncIteration(), - timeout: _retryDelay); + await syncMutex.lock(() { + return _streamingSyncIteration(); + }, timeout: _retryDelay); } catch (e, stacktrace) { if (aborted && e is http.ClientException) { // Explicit abort requested - ignore. Example error: @@ -143,7 +146,7 @@ class StreamingSyncImplementation implements StreamingSync { return; } final message = _syncErrorMessage(e); - isolateLogger.warning('Sync error: $message', e, stacktrace); + logger.warning('Sync error: $message', e, stacktrace); invalidCredentials = true; _state.updateStatus((s) => s.applyDownloadError(e)); @@ -195,7 +198,7 @@ class StreamingSyncImplementation implements StreamingSync { _state.updateStatus((s) => s.uploading = true); if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. - isolateLogger.warning( + logger.warning( """Potentially previously uploaded CRUD entries are still present in the upload queue. Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method. The next upload iteration will be delayed."""); @@ -204,7 +207,7 @@ class StreamingSyncImplementation implements StreamingSync { } checkedCrudItem = nextCrudItem; - await uploadCrud(); + await connector.uploadCrud(); _state.updateStatus((s) => s.uploadError = null); } else { // Uploading is completed @@ -213,7 +216,7 @@ class StreamingSyncImplementation implements StreamingSync { } } catch (e, stacktrace) { checkedCrudItem = null; - isolateLogger.warning('Data upload error', e, stacktrace); + logger.warning('Data upload error', e, stacktrace); _state.updateStatus((s) => s.applyUploadError(e)); await _delayRetry(); @@ -221,7 +224,7 @@ class StreamingSyncImplementation implements StreamingSync { // Exit the upload loop if the sync stream is no longer connected break; } - isolateLogger.warning( + logger.warning( "Caught exception when uploading. Upload will retry after a delay", e, stacktrace); @@ -230,6 +233,10 @@ class StreamingSyncImplementation implements StreamingSync { } } }, timeout: _retryDelay).whenComplete(() { + if (!aborted) { + _nonLineSyncEvents.add(const UploadCompleted()); + } + assert(identical(_activeCrudUpload, completer)); _activeCrudUpload = null; completer.complete(); @@ -237,7 +244,7 @@ class StreamingSyncImplementation implements StreamingSync { } Future getWriteCheckpoint() async { - final credentials = await credentialsCallback(); + final credentials = await connector.getCredentialsCached(); if (credentials == null) { throw CredentialsException("Not logged in"); } @@ -252,9 +259,7 @@ class StreamingSyncImplementation implements StreamingSync { final response = await _client.get(uri, headers: headers); if (response.statusCode == 401) { - if (invalidCredentialsCallback != null) { - await invalidCredentialsCallback!(); - } + await connector.prefetchCredentials(); } if (response.statusCode != 200) { throw SyncResponseException.fromResponse(response); @@ -290,20 +295,19 @@ class StreamingSyncImplementation implements StreamingSync { return (initialRequests, localDescriptions); } - Future streamingSyncIteration() async { + Future _streamingSyncIteration() async { var (bucketRequests, bucketMap) = await _collectLocalBucketState(); if (aborted) { return; } Checkpoint? targetCheckpoint; - Checkpoint? validatedCheckpoint; - Checkpoint? appliedCheckpoint; - var requestStream = streamingSyncRequest( - StreamingSyncRequest(bucketRequests, options.params, clientId!)); + var requestStream = _streamingSyncRequest( + StreamingSyncRequest(bucketRequests, options.params, clientId!)) + .map(ReceivedLine.new); - var merged = addBroadcast(requestStream, _localPingController.stream); + var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream); Future? credentialsInvalidation; bool haveInvalidated = false; @@ -311,12 +315,7 @@ class StreamingSyncImplementation implements StreamingSync { // Trigger a CRUD upload on reconnect _internalCrudTriggerController.add(null); - await for (var line in merged) { - if (aborted) { - break; - } - - _state.updateStatus((s) => s.setConnected()); + Future handleLine(StreamingSyncLine line) async { switch (line) { case Checkpoint(): targetCheckpoint = line; @@ -339,10 +338,6 @@ class StreamingSyncImplementation implements StreamingSync { if (result.abort) { return; } - validatedCheckpoint = targetCheckpoint; - if (result.didApply) { - appliedCheckpoint = targetCheckpoint; - } case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority): final result = await adapter.syncLocalDatabase(targetCheckpoint!, forPriority: bucketPriority); @@ -371,7 +366,7 @@ class StreamingSyncImplementation implements StreamingSync { } final diff = line; final Map newBuckets = {}; - for (var checksum in targetCheckpoint.checksums) { + for (var checksum in targetCheckpoint!.checksums) { newBuckets[checksum.bucket] = checksum; } for (var checksum in diff.updatedBuckets) { @@ -393,7 +388,7 @@ class StreamingSyncImplementation implements StreamingSync { bucketMap = newBuckets.map((name, checksum) => MapEntry(name, (name: name, priority: checksum.priority))); await adapter.removeBuckets(diff.removedBuckets); - adapter.setTargetCheckpoint(targetCheckpoint); + adapter.setTargetCheckpoint(targetCheckpoint!); case SyncDataBatch(): // TODO: This increments the counters before actually saving sync // data. Might be fine though? @@ -402,40 +397,44 @@ class StreamingSyncImplementation implements StreamingSync { case StreamingSyncKeepalive(:final tokenExpiresIn): if (tokenExpiresIn == 0) { // Token expired already - stop the connection immediately - invalidCredentialsCallback?.call().ignore(); + connector.prefetchCredentials(invalidate: true).ignore(); break; } else if (tokenExpiresIn <= 30) { // Token expires soon - refresh it in the background - if (credentialsInvalidation == null && - invalidCredentialsCallback != null) { - credentialsInvalidation = invalidCredentialsCallback!().then((_) { - // Token has been refreshed - we should restart the connection. - haveInvalidated = true; - // trigger next loop iteration ASAP, don't wait for another - // message from the server. - _localPingController.add(null); - }, onError: (_) { - // Token refresh failed - retry on next keepalive. - credentialsInvalidation = null; - }); - } + credentialsInvalidation ??= + connector.prefetchCredentials().then((_) { + // Token has been refreshed - we should restart the connection. + haveInvalidated = true; + // trigger next loop iteration ASAP, don't wait for another + // message from the server. + if (!aborted) { + _nonLineSyncEvents.add(TokenRefreshComplete()); + } + }, onError: (_) { + // Token refresh failed - retry on next keepalive. + credentialsInvalidation = null; + }); } case UnknownSyncLine(:final rawData): - isolateLogger.fine('Unknown sync line: $rawData'); - case null: // Local ping - if (targetCheckpoint == appliedCheckpoint) { - if (appliedCheckpoint case final completed?) { - _state.updateStatus((s) => s.applyCheckpointReached(completed)); - } - } else if (validatedCheckpoint == targetCheckpoint) { - final result = await _applyCheckpoint(targetCheckpoint!, _abort); - if (result.abort) { - return; - } - if (result.didApply) { - appliedCheckpoint = targetCheckpoint; - } - } + logger.fine('Unknown sync line: $rawData'); + } + } + + await for (var line in merged) { + if (aborted || haveInvalidated) { + break; + } + + switch (line) { + case ReceivedLine(:final line): + _state.updateStatus((s) => s.setConnected()); + await handleLine(line as StreamingSyncLine); + case UploadCompleted(): + // Only relevant for the Rust sync implementation. + break; + case TokenRefreshComplete(): + // We have a new token, so stop the iteration. + haveInvalidated = true; } if (haveInvalidated) { @@ -459,7 +458,7 @@ class StreamingSyncImplementation implements StreamingSync { // We have pending entries in the local upload queue or are waiting to // confirm a write checkpoint, which prevented this checkpoint from // applying. Wait for that to complete and try again. - isolateLogger.fine('Could not apply checkpoint due to local data. ' + logger.fine('Could not apply checkpoint due to local data. ' 'Waiting for in-progress upload before retrying...'); await Future.any([ pendingUpload.future, @@ -475,21 +474,20 @@ class StreamingSyncImplementation implements StreamingSync { } if (result.checkpointValid && result.ready) { - isolateLogger.fine('validated checkpoint: $targetCheckpoint'); + logger.fine('validated checkpoint: $targetCheckpoint'); _state.updateStatus((s) => s.applyCheckpointReached(targetCheckpoint)); return const (abort: false, didApply: true); } else { - isolateLogger.fine( + logger.fine( 'Could not apply checkpoint. Waiting for next sync complete line'); return const (abort: false, didApply: false); } } - Stream streamingSyncRequest( - StreamingSyncRequest data) async* { - final credentials = await credentialsCallback(); + Future _postStreamRequest(Object? data) async { + final credentials = await connector.getCredentialsCached(); if (credentials == null) { throw CredentialsException('Not logged in'); } @@ -502,32 +500,33 @@ class StreamingSyncImplementation implements StreamingSync { request.body = convert.jsonEncode(data); - http.StreamedResponse res; - try { - // Do not close the client during the request phase - this causes uncaught errors. - _safeToClose = false; - res = await _client.send(request); - } finally { - _safeToClose = true; - } + final res = await _client.send(request); if (aborted) { - return; + return null; } if (res.statusCode == 401) { - if (invalidCredentialsCallback != null) { - await invalidCredentialsCallback!(); - } + await connector.prefetchCredentials(invalidate: true); } if (res.statusCode != 200) { throw await SyncResponseException.fromStreamedResponse(res); } - // Note: The response stream is automatically closed when this loop errors - yield* ndjson(res.stream) + return res; + } + + Stream _rawStreamingSyncRequest(Object? data) async* { + final response = await _postStreamRequest(data); + if (response != null) { + yield* response.stream.lines; + } + } + + Stream _streamingSyncRequest(StreamingSyncRequest data) { + return _rawStreamingSyncRequest(data) + .parseJson .cast>() - .transform(StreamingSyncLine.reader) - .takeWhile((_) => !aborted); + .transform(StreamingSyncLine.reader); } /// Delays the standard `retryDelay` Duration, but exits early if @@ -565,3 +564,19 @@ typedef BucketDescription = ({ String name, int priority, }); + +sealed class SyncEvent {} + +final class ReceivedLine implements SyncEvent { + final Object /* String|Uint8List|StreamingSyncLine */ line; + + const ReceivedLine(this.line); +} + +final class UploadCompleted implements SyncEvent { + const UploadCompleted(); +} + +final class TokenRefreshComplete implements SyncEvent { + const TokenRefreshComplete(); +} diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index ae3869d3..e2443e23 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -13,6 +13,7 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite_async.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; +import 'package:powersync_core/src/sync/internal_connector.dart'; import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/web.dart'; @@ -70,14 +71,15 @@ class _ConnectedClient { final recoveredOptions = SyncOptions( crudThrottleTime: Duration(milliseconds: request.crudThrottleTimeMs), - retryDelay: Duration(milliseconds: request.retryDelayMs), + retryDelay: switch (request.retryDelayMs) { + null => null, + final retryDelay => Duration(milliseconds: retryDelay), + }, params: switch (request.syncParamsEncoded) { null => null, final encodedParams => jsonDecode(encodedParams) as Map, }, - syncImplementation: SyncClientImplementation.values - .byName(request.clientImplementationName), ); _runner = _worker.referenceSyncTask( @@ -258,9 +260,13 @@ class _SyncRunner { sync = StreamingSyncImplementation( adapter: WebBucketStorage(database), - credentialsCallback: client.channel.credentialsCallback, - invalidCredentialsCallback: client.channel.invalidCredentialsCallback, - uploadCrud: client.channel.uploadCrud, + connector: InternalConnector( + getCredentialsCached: client.channel.credentialsCallback, + prefetchCredentials: ({required bool invalidate}) async { + return await client.channel.invalidCredentialsCallback(); + }, + uploadCrud: client.channel.uploadCrud, + ), crudUpdateTriggerStream: crudStream, options: options, client: BrowserClient(), diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index 243dc01f..f4de6cc9 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -69,16 +69,14 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required String databaseName, required int crudThrottleTimeMs, required int requestId, - required int retryDelayMs, - required String clientImplementationName, + required int? retryDelayMs, String? syncParamsEncoded, }); external String get databaseName; external int get requestId; external int get crudThrottleTimeMs; - external int get retryDelayMs; - external String get clientImplementationName; + external int? get retryDelayMs; external String? get syncParamsEncoded; } @@ -417,7 +415,6 @@ final class WorkerCommunicationChannel { payload: StartSynchronization( databaseName: databaseName, crudThrottleTimeMs: options.crudThrottleTime.inMilliseconds, - clientImplementationName: options.source.syncImplementation.name, retryDelayMs: options.retryDelay.inMilliseconds, requestId: id, syncParamsEncoded: switch (options.source.params) { diff --git a/packages/powersync_core/pubspec.yaml b/packages/powersync_core/pubspec.yaml index 691d6711..372d904d 100644 --- a/packages/powersync_core/pubspec.yaml +++ b/packages/powersync_core/pubspec.yaml @@ -37,6 +37,7 @@ dev_dependencies: shelf_router: ^1.1.4 shelf_static: ^1.1.2 stream_channel: ^2.1.2 + fake_async: ^1.3.3 platforms: android: diff --git a/packages/powersync_core/test/connected_test.dart b/packages/powersync_core/test/connected_test.dart index 560b925a..a2188958 100644 --- a/packages/powersync_core/test/connected_test.dart +++ b/packages/powersync_core/test/connected_test.dart @@ -86,6 +86,7 @@ void main() { schema: defaultSchema, maxReaders: 3); // Shorter retry delay, to speed up tests + // ignore: deprecated_member_use_from_same_package db.retryDelay = Duration(milliseconds: 10); addTearDown(() => {db.close()}); await db.initialize(); diff --git a/packages/powersync_core/test/disconnect_test.dart b/packages/powersync_core/test/disconnect_test.dart index a3706537..86f18130 100644 --- a/packages/powersync_core/test/disconnect_test.dart +++ b/packages/powersync_core/test/disconnect_test.dart @@ -29,6 +29,7 @@ void main() { expiresAt: DateTime.now()); } + // ignore: deprecated_member_use_from_same_package db.retryDelay = Duration(milliseconds: 5000); var connector = TestConnector(credentialsCallback); await db.connect(connector: connector); diff --git a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart index a38863eb..6bea9454 100644 --- a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart +++ b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart @@ -6,7 +6,7 @@ import 'package:shelf_router/shelf_router.dart'; final class MockSyncService { // Use a queued stream to make tests easier. - StreamController _controller = StreamController(); + StreamController controller = StreamController(); Completer _listener = Completer(); final router = Router(); @@ -21,15 +21,13 @@ final class MockSyncService { ..post('/sync/stream', (Request request) async { _listener.complete(request); // Respond immediately with a stream - return Response.ok(_controller.stream.transform(utf8.encoder), - headers: { - 'Content-Type': 'application/x-ndjson', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - }, - context: { - "shelf.io.buffer_output": false - }); + return Response.ok(controller.stream.transform(utf8.encoder), headers: { + 'Content-Type': 'application/x-ndjson', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, context: { + "shelf.io.buffer_output": false + }); }) ..get('/write-checkpoint2.json', (request) { return Response.ok(json.encode(writeCheckpoint()), headers: { @@ -42,7 +40,7 @@ final class MockSyncService { // Queue events which will be sent to connected clients. void addRawEvent(String data) { - _controller.add(data); + controller.add(data); } void addLine(Object? message) { @@ -54,22 +52,22 @@ final class MockSyncService { } void endCurrentListener() { - _controller.close(); - _controller = StreamController(); + controller.close(); + controller = StreamController(); _listener = Completer(); } // Clear events. We rely on a buffered controller here. Create a new controller // in order to clear the buffer. Future clearEvents() async { - await _controller.close(); + await controller.close(); _listener = Completer(); - _controller = StreamController(); + controller = StreamController(); } Future stop() async { - if (_controller.hasListener) { - await _controller.close(); + if (controller.hasListener) { + await controller.close(); } } } diff --git a/packages/powersync_core/test/stream_test.dart b/packages/powersync_core/test/stream_test.dart index 852be90d..56521333 100644 --- a/packages/powersync_core/test/stream_test.dart +++ b/packages/powersync_core/test/stream_test.dart @@ -5,6 +5,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'package:async/async.dart'; import 'package:http/http.dart'; import 'package:powersync_core/src/sync/stream_utils.dart'; import 'package:test/test.dart'; @@ -131,11 +132,29 @@ void main() { expect(countS2, greaterThanOrEqualTo(0)); }); + test('addBroadcast - done', () async { + final a = StreamController(); + final b = StreamController.broadcast(); + + final stream = StreamQueue(addBroadcast(a.stream, b.stream)); + a.add('a1'); + await expectLater(stream, emits('a1')); + + expect(a.hasListener, isTrue); + expect(b.hasListener, isTrue); + + b.close(); + await expectLater(stream, emitsDone); + + expect(a.hasListener, isFalse); + expect(b.hasListener, isFalse); + }); + test('ndjson', () async { var sourceData = '{"line": 1}\n{"line": 2}\n'; var sourceBytes = Utf8Codec().encode(sourceData); var sourceStream = ByteStream.fromBytes(sourceBytes); - var parsedStream = ndjson(sourceStream); + var parsedStream = sourceStream.lines.parseJson; var data = await parsedStream.toList(); expect( data, @@ -156,7 +175,7 @@ void main() { } writer(); - var parsedStream = ndjson(ByteStream(pipe.read)); + var parsedStream = ByteStream(pipe.read).lines.parseJson; var data = await parsedStream.toList(); expect( data, @@ -175,7 +194,7 @@ void main() { } writer(); - var parsedStream = ndjson(ByteStream(pipe.read)); + var parsedStream = ByteStream(pipe.read).lines.parseJson; List result = []; Object? error; @@ -204,7 +223,7 @@ void main() { } writer(); - var parsedStream = ndjson(ByteStream(pipe.read)); + var parsedStream = ByteStream(pipe.read).lines.parseJson; Stream stream2 = genStream('S2:', Duration(milliseconds: 50)).asBroadcastStream(); diff --git a/packages/powersync_core/test/streaming_sync_test.dart b/packages/powersync_core/test/streaming_sync_test.dart index b58eb2d1..40becd16 100644 --- a/packages/powersync_core/test/streaming_sync_test.dart +++ b/packages/powersync_core/test/streaming_sync_test.dart @@ -36,7 +36,7 @@ void main() { final pdb = await testUtils.setupPowerSync(path: path, logger: ignoreLogger); - pdb.retryDelay = Duration(milliseconds: 5000); + const options = SyncOptions(retryDelay: Duration(seconds: 5)); final connector = TestConnector(() async { return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); }); @@ -48,7 +48,7 @@ void main() { Future connectAndDisconnect() async { for (var i = 0; i < 10; i++) { await Future.delayed(nextDelay()); - await pdb.connect(connector: connector); + await pdb.connect(connector: connector, options: options); await Future.delayed(nextDelay()); await pdb.disconnect(); @@ -69,12 +69,12 @@ void main() { final pdb = await testUtils.setupPowerSync(path: path, logger: ignoreLogger); - pdb.retryDelay = Duration(milliseconds: 50); + const options = SyncOptions(retryDelay: Duration(seconds: 5)); final connector = TestConnector(expectAsync0(() async { return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); })); - await pdb.connect(connector: connector); + await pdb.connect(connector: connector, options: options); while (server.connectionCount != 1) { await Future.delayed(const Duration(milliseconds: 100)); } @@ -107,11 +107,15 @@ void main() { final pdb = await testUtils.setupPowerSync( path: path, logger: ignoreLogger, initialize: false); - pdb.retryDelay = Duration(milliseconds: 5000); + const options = SyncOptions(retryDelay: Duration(seconds: 5)); - await pdb.connect(connector: TestConnector(() async { - return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); - })); + await pdb.connect( + connector: TestConnector(() async { + return PowerSyncCredentials( + endpoint: server.endpoint, token: 'token'); + }), + options: options, + ); await expectLater( pdb.statusStream, @@ -134,9 +138,9 @@ void main() { } final pdb = await testUtils.setupPowerSync(path: path); - pdb.retryDelay = Duration(milliseconds: 5000); + const options = SyncOptions(retryDelay: Duration(seconds: 5)); var connector = TestConnector(credentialsCallback); - pdb.connect(connector: connector); + pdb.connect(connector: connector, options: options); await Future.delayed(Duration(milliseconds: random.nextInt(100))); if (random.nextBool()) { @@ -182,9 +186,9 @@ void main() { } final pdb = await testUtils.setupPowerSync(path: path); - pdb.retryDelay = const Duration(milliseconds: 5); + const options = SyncOptions(retryDelay: Duration(seconds: 5)); var connector = TestConnector(credentialsCallback); - pdb.connect(connector: connector); + pdb.connect(connector: connector, options: options); for (var i = 0; i < 10; i++) { server = await createServer(); @@ -209,10 +213,10 @@ void main() { } final pdb = await testUtils.setupPowerSync(path: path); - pdb.retryDelay = Duration(milliseconds: 5000); + const options = SyncOptions(retryDelay: Duration(seconds: 5)); var connector = TestConnector(credentialsCallback); - pdb.connect(connector: connector); - pdb.connect(connector: connector); + pdb.connect(connector: connector, options: options); + pdb.connect(connector: connector, options: options); final watch = Stopwatch()..start(); diff --git a/packages/powersync_core/test/upload_test.dart b/packages/powersync_core/test/upload_test.dart index 4bed1499..d0c8f23a 100644 --- a/packages/powersync_core/test/upload_test.dart +++ b/packages/powersync_core/test/upload_test.dart @@ -52,6 +52,7 @@ void main() { await testUtils.setupPowerSync(path: path, logger: testWarningLogger); // Use a short retry delay here. // A zero retry delay makes this test unstable, since it expects `2` error logs later. + // ignore: deprecated_member_use_from_same_package powersync.retryDelay = Duration(milliseconds: 100); var connector = TestConnector(credentialsCallback, uploadData: uploadData); diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 017f6c95..3ea4a319 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -2,6 +2,7 @@ import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/src/sync/bucket_storage.dart'; +import 'package:powersync_core/src/sync/internal_connector.dart'; import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite3_common.dart'; @@ -146,16 +147,15 @@ class TestConnector extends PowerSyncBackendConnector { extension MockSync on PowerSyncDatabase { StreamingSyncImplementation connectWithMockService( - Client client, PowerSyncBackendConnector connector) { + Client client, + PowerSyncBackendConnector connector, { + SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)), + }) { final impl = StreamingSyncImplementation( adapter: BucketStorage(this), client: client, - options: ResolvedSyncOptions(SyncOptions( - retryDelay: const Duration(seconds: 5), - )), - credentialsCallback: connector.getCredentialsCached, - invalidCredentialsCallback: connector.prefetchCredentials, - uploadCrud: () => connector.uploadData(this), + options: ResolvedSyncOptions(options), + connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: database .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)), ); From 832f4a1df7318e6509f6b291786fe007430a7c57 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 May 2025 16:25:36 +0200 Subject: [PATCH 3/4] Allow passing params separately --- .../powersync_core/lib/src/database/powersync_db_mixin.dart | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index e33fbd96..6db00e0d 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -38,6 +38,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Use [attachedLogger] to propagate logs to [Logger.root] for custom logging. Logger get logger; + @Deprecated("This field is unused, pass params to connect() instead") Map? clientParams; /// Current connection status. @@ -281,7 +282,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { SyncOptions? options, @Deprecated('Use SyncOptions.crudThrottleTime instead') Duration? crudThrottleTime, - @Deprecated('Use SyncOptions.params instead') Map? params, + Map? params, }) async { // The initialization process acquires a sync connect lock (through // updateSchema), so ensure the database is ready before we try to acquire @@ -295,6 +296,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { params: options?.params ?? params, ); + // ignore: deprecated_member_use_from_same_package clientParams = params; var thisConnectAborter = AbortController(); final zone = Zone.current; From 1749b3c02eabc28ba6ff907b49b4e153497d425d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 May 2025 17:08:41 +0200 Subject: [PATCH 4/4] Test closing connections --- .../test/in_memory_sync_test.dart | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index dd71100e..3ab8ae76 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -9,6 +9,7 @@ import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; +import 'bucket_storage_test.dart'; import 'server/sync_server/in_memory_sync_server.dart'; import 'utils/abstract_test_utils.dart'; import 'utils/in_memory_http.dart'; @@ -653,6 +654,23 @@ void main() { emits(isSyncStatus(downloading: false, downloadProgress: isNull))); }); }); + + test('stopping closes connections', () async { + final status = await waitForConnection(); + + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '4', + writeCheckpoint: null, + checksums: [checksum(bucket: 'a', checksum: 0)], + ) + }); + + await expectLater(status, emits(isSyncStatus(downloading: true))); + await syncClient.abort(); + + expect(syncService.controller.hasListener, isFalse); + }); }); }