diff --git a/packages/powersync/lib/src/powersync_database.dart b/packages/powersync/lib/src/powersync_database.dart index 00cadc9e..7570ca71 100644 --- a/packages/powersync/lib/src/powersync_database.dart +++ b/packages/powersync/lib/src/powersync_database.dart @@ -223,13 +223,24 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { /// Throttle time between CRUD operations /// Defaults to 10 milliseconds. Duration crudThrottleTime = const Duration(milliseconds: 10)}) async { - _connectMutex.lock(() => - _connect(connector: connector, crudThrottleTime: crudThrottleTime)); + Zone current = Zone.current; + + Future reconnect() { + return _connectMutex.lock(() => _connect( + connector: connector, + crudThrottleTime: crudThrottleTime, + // The reconnect function needs to run in the original zone, + // to avoid recursive lock errors. + reconnect: current.bindCallback(reconnect))); + } + + await reconnect(); } Future _connect( {required PowerSyncBackendConnector connector, - required Duration crudThrottleTime}) async { + required Duration crudThrottleTime, + required Future Function() reconnect}) async { await initialize(); // Disconnect if connected @@ -298,7 +309,9 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { logger.severe('Sync Isolate error', message); // Reconnect - connect(connector: connector); + // Use the param like this instead of directly calling connect(), to avoid recursive + // locks in some edge cases. + reconnect(); }); disconnected() { @@ -565,6 +578,7 @@ Future _powerSyncDatabaseIsolate( CommonDatabase? db; final mutex = args.dbRef.mutex.open(); + StreamingSyncImplementation? openedStreamingSync; rPort.listen((message) async { if (message is List) { @@ -579,6 +593,9 @@ Future _powerSyncDatabaseIsolate( db = null; updateController.close(); upstreamDbClient.close(); + // Abort any open http requests, and wait for it to be closed properly + await openedStreamingSync?.abort(); + // No kill the Isolate Isolate.current.kill(); } } @@ -625,6 +642,7 @@ Future _powerSyncDatabaseIsolate( uploadCrud: uploadCrud, updateStream: updateController.stream, retryDelay: args.retryDelay); + openedStreamingSync = sync; sync.streamingSync(); sync.statusStream.listen((event) { sPort.send(['status', event]); diff --git a/packages/powersync/lib/src/streaming_sync.dart b/packages/powersync/lib/src/streaming_sync.dart index e7c10097..71325377 100644 --- a/packages/powersync/lib/src/streaming_sync.dart +++ b/packages/powersync/lib/src/streaming_sync.dart @@ -3,6 +3,7 @@ import 'dart:convert' as convert; import 'dart:io'; import 'package:http/http.dart' as http; +import 'package:powersync/src/abort_controller.dart'; import 'package:powersync/src/exceptions.dart'; import 'package:powersync/src/log_internal.dart'; @@ -39,6 +40,10 @@ class StreamingSyncImplementation { SyncStatus lastStatus = const SyncStatus(); + AbortController? _abort; + + bool _safeToClose = true; + StreamingSyncImplementation( {required this.adapter, required this.credentialsCallback, @@ -50,34 +55,74 @@ class StreamingSyncImplementation { statusStream = _statusStreamController.stream; } + /// Close any active streams. + Future abort() async { + // If streamingSync() hasn't been called yet, _abort will be null. + var future = _abort?.abort(); + // This immediately triggers a new iteration in the merged stream, allowing us + // to break immediately. + // However, we still need to close the underlying stream explicitly, otherwise + // the break will wait for the next line of data received on the stream. + _localPingController.add(null); + // According to the documentation, the behavior is undefined when calling + // close() while requests are pending. However, this is no other + // known way to cancel open streams, and this appears to end the stream with + // a consistent ClientException if a request is open. + // We avoid closing the client while opening a request, as that does cause + // unpredicable uncaught errors. + if (_safeToClose) { + _client.close(); + } + // wait for completeAbort() to be called + await future; + + // Now close the client in all cases not covered above + _client.close(); + } + + bool get aborted { + return _abort?.aborted ?? false; + } + Future streamingSync() async { - crudLoop(); - var invalidCredentials = false; - while (true) { - _updateStatus(connecting: true); - try { - if (invalidCredentials && invalidCredentialsCallback != null) { - // This may error. In that case it will be retried again on the next - // iteration. - await invalidCredentialsCallback!(); - invalidCredentials = false; - } - await streamingSyncIteration(); - // Continue immediately - } catch (e, stacktrace) { - final message = _syncErrorMessage(e); - isolateLogger.warning('Sync error: $message', e, stacktrace); - invalidCredentials = true; + try { + _abort = AbortController(); + crudLoop(); + var invalidCredentials = false; + while (!aborted) { + _updateStatus(connecting: true); + try { + if (invalidCredentials && invalidCredentialsCallback != null) { + // This may error. In that case it will be retried again on the next + // iteration. + await invalidCredentialsCallback!(); + invalidCredentials = false; + } + await streamingSyncIteration(); + // Continue immediately + } catch (e, stacktrace) { + if (aborted && e is http.ClientException) { + // Explicit abort requested - ignore. Example error: + // ClientException: Connection closed while receiving data, uri=http://localhost:8080/sync/stream + return; + } + final message = _syncErrorMessage(e); + isolateLogger.warning('Sync error: $message', e, stacktrace); + invalidCredentials = true; - _updateStatus( - connected: false, - connecting: true, - downloading: false, - downloadError: e); + _updateStatus( + connected: false, + connecting: true, + downloading: false, + downloadError: e); - // On error, wait a little before retrying - await Future.delayed(retryDelay); + // On error, wait a little before retrying + // When aborting, don't wait + await Future.any([Future.delayed(retryDelay), _abort!.onAbort]); + } } + } finally { + _abort!.completeAbort(); } } @@ -206,6 +251,10 @@ class StreamingSyncImplementation { bool haveInvalidated = false; await for (var line in merged) { + if (aborted) { + break; + } + _updateStatus(connected: true, connecting: false); if (line is Checkpoint) { targetCheckpoint = line; @@ -338,7 +387,18 @@ class StreamingSyncImplementation { request.headers['Authorization'] = "Token ${credentials.token}"; request.body = convert.jsonEncode(data); - final res = await _client.send(request); + http.StreamedResponse res; + try { + // Do not close the client during the request phase - this causes uncaught errors. + _safeToClose = false; + res = await _client.send(request); + } finally { + _safeToClose = true; + } + if (aborted) { + return; + } + if (res.statusCode == 401) { if (invalidCredentialsCallback != null) { await invalidCredentialsCallback!(); @@ -350,6 +410,9 @@ class StreamingSyncImplementation { // Note: The response stream is automatically closed when this loop errors await for (var line in ndjson(res.stream)) { + if (aborted) { + break; + } yield parseStreamingSyncLine(line as Map); } } diff --git a/packages/powersync/test/streaming_sync_test.dart b/packages/powersync/test/streaming_sync_test.dart index adda1399..52bf6acd 100644 --- a/packages/powersync/test/streaming_sync_test.dart +++ b/packages/powersync/test/streaming_sync_test.dart @@ -59,6 +59,15 @@ void main() { await pdb.close(); + // Give some time for connections to close + final watch = Stopwatch()..start(); + while (server.connectionCount != 0 && watch.elapsedMilliseconds < 100) { + await Future.delayed(Duration(milliseconds: random.nextInt(10))); + } + + expect(server.connectionCount, equals(0)); + expect(server.maxConnectionCount, lessThanOrEqualTo(1)); + server.close(); } }); diff --git a/packages/powersync/test/test_server.dart b/packages/powersync/test/test_server.dart index 2b9ac8b6..8054582e 100644 --- a/packages/powersync/test/test_server.dart +++ b/packages/powersync/test/test_server.dart @@ -11,7 +11,6 @@ import 'package:shelf_router/shelf_router.dart'; class TestServer { late HttpServer server; Router app = Router(); - int connectionCount = 0; int maxConnectionCount = 0; int tokenExpiresIn; @@ -27,19 +26,22 @@ class TestServer { return 'http://${server.address.host}:${server.port}'; } + int get connectionCount { + return server.connectionsInfo().total; + } + + HttpConnectionsInfo connectionsInfo() { + return server.connectionsInfo(); + } + Future handleSyncStream(Request request) async { - connectionCount += 1; maxConnectionCount = max(connectionCount, maxConnectionCount); stream() async* { - try { - var blob = "*" * 5000; - for (var i = 0; i < 50; i++) { - yield {"token_expires_in": tokenExpiresIn, "blob": blob}; - await Future.delayed(Duration(microseconds: 1)); - } - } finally { - connectionCount -= 1; + var blob = "*" * 5000; + for (var i = 0; i < 50; i++) { + yield {"token_expires_in": tokenExpiresIn, "blob": blob}; + await Future.delayed(Duration(microseconds: 1)); } }