diff --git a/packages/powersync_core/lib/src/abort_controller.dart b/packages/powersync_core/lib/src/abort_controller.dart index 96699c68..d78da3bf 100644 --- a/packages/powersync_core/lib/src/abort_controller.dart +++ b/packages/powersync_core/lib/src/abort_controller.dart @@ -25,7 +25,7 @@ class AbortController { _abortRequested.complete(); } - await _abortCompleter.future; + await onCompletion; } /// Signal that an abort has completed. diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 51c7cc31..6082fbe8 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -120,6 +120,7 @@ class PowerSyncDatabaseImpl required PowerSyncBackendConnector connector, required Duration crudThrottleTime, required AbortController abort, + required Zone asyncWorkZone, Map? params, }) async { final dbRef = database.isolateConnectionFactory(); @@ -157,7 +158,7 @@ class PowerSyncDatabaseImpl await waitForShutdown(); } - receiveMessages.listen((data) async { + Future handleMessage(Object? data) async { if (data is List) { String action = data[0] as String; if (action == "getCredentials") { @@ -192,7 +193,14 @@ class PowerSyncDatabaseImpl record.level, record.message, record.error, record.stackTrace); } } - }); + } + + // This function is called in a Zone marking the connection lock as locked. + // This is used to prevent reentrant calls to the lock (which would be a + // deadlock). However, the lock is returned as soon as this function + // returns - and handleMessage may run later. So, make sure we run those + // callbacks in the parent zone. + receiveMessages.listen(asyncWorkZone.bindUnaryCallback(handleMessage)); receiveUnhandledErrors.listen((message) async { // Sample error: diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index 8966f368..348297ed 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -114,6 +114,7 @@ class PowerSyncDatabaseImpl {required PowerSyncBackendConnector connector, required Duration crudThrottleTime, required AbortController abort, + required Zone asyncWorkZone, Map? params}) { throw UnimplementedError(); } diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 96ac5c4c..1934f463 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -282,6 +282,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { clientParams = params; var thisConnectAborter = AbortController(); + final zone = Zone.current; late void Function() retryHandler; @@ -296,6 +297,9 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { crudThrottleTime: crudThrottleTime, params: params, abort: thisConnectAborter, + // Run follow-up async tasks in the parent zone, a new one is introduced + // while we hold the lock (and async tasks won't hold the sync lock). + asyncWorkZone: zone, ); thisConnectAborter.onCompletion.whenComplete(retryHandler); @@ -347,6 +351,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { required PowerSyncBackendConnector connector, required Duration crudThrottleTime, required AbortController abort, + required Zone asyncWorkZone, Map? params, }); @@ -372,7 +377,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { _abortActiveSync = null; } else { /// Wait for the abort to complete. Continue updating the sync status after completed - await disconnector.onAbort; + await disconnector.onCompletion; } } } diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index acef9399..651bfc6e 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -116,6 +116,7 @@ class PowerSyncDatabaseImpl required PowerSyncBackendConnector connector, required Duration crudThrottleTime, required AbortController abort, + required Zone asyncWorkZone, Map? params, }) async { final crudStream = diff --git a/packages/powersync_core/test/streaming_sync_test.dart b/packages/powersync_core/test/streaming_sync_test.dart index 7b6974ee..b58eb2d1 100644 --- a/packages/powersync_core/test/streaming_sync_test.dart +++ b/packages/powersync_core/test/streaming_sync_test.dart @@ -9,6 +9,7 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:test/test.dart'; +import 'server/sync_server/in_memory_sync_server.dart'; import 'test_server.dart'; import 'utils/abstract_test_utils.dart'; import 'utils/test_utils_impl.dart'; @@ -61,6 +62,45 @@ void main() { server.close(); }); + test('can disconnect in fetchCredentials', () async { + final service = MockSyncService(); + final server = await createServer(mockSyncService: service); + final ignoreLogger = Logger.detached('powersync.test'); + + final pdb = + await testUtils.setupPowerSync(path: path, logger: ignoreLogger); + pdb.retryDelay = Duration(milliseconds: 50); + final connector = TestConnector(expectAsync0(() async { + return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); + })); + + await pdb.connect(connector: connector); + while (server.connectionCount != 1) { + await Future.delayed(const Duration(milliseconds: 100)); + } + + service.addKeepAlive(60); + + final didDisconnect = Completer(); + + connector.fetchCredentialsCallback = expectAsync0(() async { + didDisconnect.complete(pdb.disconnect()); + + throw 'deliberate disconnect'; + }); + + service.addKeepAlive(0); + await didDisconnect.future; + expect(pdb.currentStatus.connected, isFalse); + // The error should be cleared after calling disconnect + expect(pdb.currentStatus.downloadError, isNull); + + // Wait for a short while to make sure the database doesn't reconnect. + for (var i = 0; i < 10; i++) { + expect(pdb.currentStatus.connecting, isFalse); + } + }); + test('can connect as initial operation', () async { final server = await createServer(); final ignoreLogger = Logger.detached('powersync.test'); diff --git a/packages/powersync_core/test/test_server.dart b/packages/powersync_core/test/test_server.dart index 6b456076..372da797 100644 --- a/packages/powersync_core/test/test_server.dart +++ b/packages/powersync_core/test/test_server.dart @@ -8,7 +8,9 @@ import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart' as shelf_io; import 'package:shelf_router/shelf_router.dart'; -class TestServer { +import 'server/sync_server/in_memory_sync_server.dart'; + +final class TestServer { late HttpServer server; Router app = Router(); int maxConnectionCount = 0; @@ -16,10 +18,11 @@ class TestServer { TestServer({this.tokenExpiresIn = 65}); - Future init() async { + Future init({MockSyncService? mockSyncService}) async { app.post('/sync/stream', handleSyncStream); // Open on an arbitrary open port - server = await shelf_io.serve(app.call, 'localhost', 0); + server = await shelf_io.serve( + mockSyncService?.router.call ?? app.call, 'localhost', 0); } String get endpoint { @@ -34,6 +37,9 @@ class TestServer { return server.connectionsInfo(); } + /// The default response if no [MockSyncService] has been passed to [init]. + /// + /// This will emit keepalive messages frequently. Future handleSyncStream(Request request) async { maxConnectionCount = max(connectionCount, maxConnectionCount); @@ -61,9 +67,9 @@ class TestServer { } } -Future createServer() async { +Future createServer({MockSyncService? mockSyncService}) async { var server = TestServer(); - await server.init(); + await server.init(mockSyncService: mockSyncService); return server; } diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index f916f55e..116f319a 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -125,21 +125,21 @@ abstract class AbstractTestUtils { } class TestConnector extends PowerSyncBackendConnector { - final Future Function() _fetchCredentials; - final Future Function(PowerSyncDatabase)? _uploadData; + Future Function() fetchCredentialsCallback; + Future Function(PowerSyncDatabase)? uploadDataCallback; - TestConnector(this._fetchCredentials, + TestConnector(this.fetchCredentialsCallback, {Future Function(PowerSyncDatabase)? uploadData}) - : _uploadData = uploadData; + : uploadDataCallback = uploadData; @override Future fetchCredentials() { - return _fetchCredentials(); + return fetchCredentialsCallback(); } @override Future uploadData(PowerSyncDatabase database) async { - await _uploadData?.call(database); + await uploadDataCallback?.call(database); } }