diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 24284751..8b4c0cb0 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -35,7 +35,7 @@ class StreamingSyncImplementation implements StreamingSync { final InternalConnector connector; final ResolvedSyncOptions options; - final Logger logger = isolateLogger; + final Logger logger; final Stream crudUpdateTriggerStream; @@ -68,6 +68,7 @@ class StreamingSyncImplementation implements StreamingSync { required http.Client client, Mutex? syncMutex, Mutex? crudMutex, + Logger? logger, /// A unique identifier for this streaming sync implementation /// A good value is typically the DB file path which it will mutate when syncing. @@ -75,7 +76,8 @@ class StreamingSyncImplementation implements StreamingSync { }) : _client = client, syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"), crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"), - _userAgentHeaders = userAgentHeaders(); + _userAgentHeaders = userAgentHeaders(), + logger = logger ?? isolateLogger; Duration get _retryDelay => options.retryDelay; @@ -122,6 +124,7 @@ class StreamingSyncImplementation implements StreamingSync { @override Future streamingSync() async { try { + assert(_abort == null); _abort = AbortController(); clientId = await adapter.getClientId(); _crudLoop(); @@ -310,7 +313,7 @@ class StreamingSyncImplementation implements StreamingSync { var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream); Future? credentialsInvalidation; - bool haveInvalidated = false; + bool shouldStopIteration = false; // Trigger a CRUD upload on reconnect _internalCrudTriggerController.add(null); @@ -336,6 +339,7 @@ class StreamingSyncImplementation implements StreamingSync { case StreamingSyncCheckpointComplete(): final result = await _applyCheckpoint(targetCheckpoint!, _abort); if (result.abort) { + shouldStopIteration = true; return; } case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority): @@ -345,6 +349,7 @@ class StreamingSyncImplementation implements StreamingSync { // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off // await new Promise((resolve) => setTimeout(resolve, 50)); + shouldStopIteration = true; return; } else if (!result.ready) { // If we have pending uploads, we can't complete new checkpoints @@ -398,13 +403,14 @@ class StreamingSyncImplementation implements StreamingSync { if (tokenExpiresIn == 0) { // Token expired already - stop the connection immediately connector.prefetchCredentials(invalidate: true).ignore(); + shouldStopIteration = true; break; } else if (tokenExpiresIn <= 30) { // Token expires soon - refresh it in the background credentialsInvalidation ??= connector.prefetchCredentials().then((_) { // Token has been refreshed - we should restart the connection. - haveInvalidated = true; + shouldStopIteration = true; // trigger next loop iteration ASAP, don't wait for another // message from the server. if (!aborted) { @@ -421,7 +427,7 @@ class StreamingSyncImplementation implements StreamingSync { } await for (var line in merged) { - if (aborted || haveInvalidated) { + if (aborted || shouldStopIteration) { break; } @@ -434,10 +440,10 @@ class StreamingSyncImplementation implements StreamingSync { break; case TokenRefreshComplete(): // We have a new token, so stop the iteration. - haveInvalidated = true; + shouldStopIteration = true; } - if (haveInvalidated) { + if (shouldStopIteration) { // Stop this connection, so that a new one will be started break; } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 89af5727..d5bf6e5d 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -4,7 +4,6 @@ import 'package:async/async.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite3_common.dart'; -import 'package:powersync_core/src/log_internal.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; @@ -25,6 +24,7 @@ void main() { late CommonDatabase raw; late PowerSyncDatabase database; late MockSyncService syncService; + late Logger logger; late StreamingSync syncClient; var credentialsCallbackCount = 0; @@ -34,7 +34,7 @@ void main() { final (client, server) = inMemoryServer(); server.mount(syncService.router.call); - syncClient = database.connectWithMockService( + final thisSyncClient = syncClient = database.connectWithMockService( client, TestConnector(() async { credentialsCallbackCount++; @@ -44,10 +44,17 @@ void main() { expiresAt: DateTime.now(), ); }, uploadData: (db) => uploadData(db)), + options: const SyncOptions(retryDelay: Duration(milliseconds: 200)), + logger: logger, ); + + addTearDown(() async { + await thisSyncClient.abort(); + }); } setUp(() async { + logger = Logger.detached('powersync.active')..level = Level.ALL; credentialsCallbackCount = 0; syncService = MockSyncService(); @@ -58,7 +65,6 @@ void main() { }); tearDown(() async { - await syncClient.abort(); await database.close(); await syncService.stop(); }); @@ -66,9 +72,9 @@ void main() { Future> waitForConnection( {bool expectNoWarnings = true}) async { if (expectNoWarnings) { - isolateLogger.onRecord.listen((e) { + logger.onRecord.listen((e) { if (e.level >= Level.WARNING) { - fail('Unexpected log: $e'); + fail('Unexpected log: $e, ${e.stackTrace}'); } }); } @@ -700,6 +706,52 @@ void main() { expect(syncService.controller.hasListener, isFalse); }); + + test('closes connection after failed checksum', () async { + final status = await waitForConnection(expectNoWarnings: false); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '4', + writeCheckpoint: null, + checksums: [checksum(bucket: 'a', checksum: 10)], + ) + }); + + await expectLater(status, emits(isSyncStatus(downloading: true))); + syncService.addLine({ + 'checkpoint_complete': {'last_op_id': '10'} + }); + + await pumpEventQueue(); + expect(syncService.controller.hasListener, isFalse); + syncService.endCurrentListener(); + + // Should reconnect after delay. + await Future.delayed(const Duration(milliseconds: 500)); + expect(syncService.controller.hasListener, isTrue); + }); + + test('closes connection after token expires', () async { + final status = await waitForConnection(expectNoWarnings: false); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '4', + writeCheckpoint: null, + checksums: [checksum(bucket: 'a', checksum: 10)], + ) + }); + + await expectLater(status, emits(isSyncStatus(downloading: true))); + syncService.addKeepAlive(0); + + await pumpEventQueue(); + expect(syncService.controller.hasListener, isFalse); + syncService.endCurrentListener(); + + // Should reconnect after delay. + await Future.delayed(const Duration(milliseconds: 500)); + expect(syncService.controller.hasListener, isTrue); + }); }); } diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 3ea4a319..95f71211 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -149,6 +149,7 @@ extension MockSync on PowerSyncDatabase { StreamingSyncImplementation connectWithMockService( Client client, PowerSyncBackendConnector connector, { + Logger? logger, SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)), }) { final impl = StreamingSyncImplementation( @@ -156,6 +157,7 @@ extension MockSync on PowerSyncDatabase { client: client, options: ResolvedSyncOptions(options), connector: InternalConnector.wrap(connector, this), + logger: logger, crudUpdateTriggerStream: database .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)), );