diff --git a/packages/powersync_core/lib/src/bucket_storage.dart b/packages/powersync_core/lib/src/bucket_storage.dart
index 6815e8d9..9df64f17 100644
--- a/packages/powersync_core/lib/src/bucket_storage.dart
+++ b/packages/powersync_core/lib/src/bucket_storage.dart
@@ -344,121 +344,6 @@ class BucketState {
   }
 }
 
-class SyncDataBatch {
-  List<SyncBucketData> buckets;
-
-  SyncDataBatch(this.buckets);
-}
-
-class SyncBucketData {
-  final String bucket;
-  final List<OplogEntry> data;
-  final bool hasMore;
-  final String? after;
-  final String? nextAfter;
-
-  const SyncBucketData(
-      {required this.bucket,
-      required this.data,
-      this.hasMore = false,
-      this.after,
-      this.nextAfter});
-
-  SyncBucketData.fromJson(Map<String, dynamic> json)
-      : bucket = json['bucket'] as String,
-        hasMore = json['has_more'] as bool? ?? false,
-        after = json['after'] as String?,
-        nextAfter = json['next_after'] as String?,
-        data = (json['data'] as List)
-            .map((e) => OplogEntry.fromJson(e as Map<String, dynamic>))
-            .toList();
-
-  Map<String, dynamic> toJson() {
-    return {
-      'bucket': bucket,
-      'has_more': hasMore,
-      'after': after,
-      'next_after': nextAfter,
-      'data': data
-    };
-  }
-}
-
-class OplogEntry {
-  final String opId;
-
-  final OpType? op;
-
-  /// rowType + rowId uniquely identifies an entry in the local database.
-  final String? rowType;
-  final String? rowId;
-
-  /// Together with rowType and rowId, this uniquely identifies a source entry
-  /// per bucket in the oplog. There may be multiple source entries for a single
-  /// "rowType + rowId" combination.
-  final String? subkey;
-
-  final String? data;
-  final int checksum;
-
-  const OplogEntry(
-      {required this.opId,
-      required this.op,
-      this.subkey,
-      this.rowType,
-      this.rowId,
-      this.data,
-      required this.checksum});
-
-  OplogEntry.fromJson(Map<String, dynamic> json)
-      : opId = json['op_id'] as String,
-        op = OpType.fromJson(json['op'] as String),
-        rowType = json['object_type'] as String?,
-        rowId = json['object_id'] as String?,
-        checksum = json['checksum'] as int,
-        data = switch (json['data']) {
-          String data => data,
-          var other => jsonEncode(other),
-        },
-        subkey = switch (json['subkey']) {
-          String subkey => subkey,
-          _ => null,
-        };
-
-  Map<String, dynamic>? get parsedData {
-    return switch (data) {
-      final data? => jsonDecode(data) as Map<String, dynamic>,
-      null => null,
-    };
-  }
-
-  /// Key to uniquely represent a source entry in a bucket.
-  /// This is used to supersede old entries.
-  /// Relevant for put and remove ops.
-  String get key {
-    return "$rowType/$rowId/$subkey";
-  }
-
-  Map<String, dynamic> toJson() {
-    return {
-      'op_id': opId,
-      'op': op?.toJson(),
-      'object_type': rowType,
-      'object_id': rowId,
-      'checksum': checksum,
-      'subkey': subkey,
-      'data': data
-    };
-  }
-}
-
-class SqliteOp {
-  String sql;
-  List<Object?> args;
-
-  SqliteOp(this.sql, this.args);
-}
-
 class SyncLocalDatabaseResult {
   final bool ready;
   final bool checkpointValid;
diff --git a/packages/powersync_core/lib/src/stream_utils.dart b/packages/powersync_core/lib/src/stream_utils.dart
index 46be2f42..1360c7bd 100644
--- a/packages/powersync_core/lib/src/stream_utils.dart
+++ b/packages/powersync_core/lib/src/stream_utils.dart
@@ -68,13 +68,6 @@ Stream<Object?> ndjson(ByteStream input) {
   return jsonInput;
 }
 
-/// Given a raw ByteStream, parse each line as JSON.
-Stream<String> newlines(ByteStream input) {
-  final textInput = input.transform(convert.utf8.decoder);
-  final lineInput = textInput.transform(const convert.LineSplitter());
-  return lineInput;
-}
-
 void pauseAll(List<StreamSubscription<void>> subscriptions) {
   for (var sub in subscriptions) {
     sub.pause();
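A note on the helper kept above: `ndjson` decodes a newline-delimited JSON byte stream into one value per line, which is the only line-based decoding the sync client still needs once `newlines` is removed. A minimal standalone sketch of that idea (illustrative only, not the package's actual implementation):

    import 'dart:async';
    import 'dart:convert';

    /// Decodes newline-delimited JSON: one decoded value per non-empty line.
    Stream<Object?> decodeNdjson(Stream<List<int>> bytes) {
      return bytes
          .transform(utf8.decoder)
          .transform(const LineSplitter())
          .where((line) => line.isNotEmpty) // tolerate a trailing newline
          .map(jsonDecode);
    }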
diff --git a/packages/powersync_core/lib/src/streaming_sync.dart b/packages/powersync_core/lib/src/streaming_sync.dart
index f8a15566..045cc8af 100644
--- a/packages/powersync_core/lib/src/streaming_sync.dart
+++ b/packages/powersync_core/lib/src/streaming_sync.dart
@@ -53,7 +53,7 @@ class StreamingSyncImplementation implements StreamingSync {
 
   late final http.Client _client;
 
-  final StreamController _localPingController =
+  final StreamController<Null> _localPingController =
       StreamController.broadcast();
 
   final Duration retryDelay;
@@ -340,96 +340,19 @@ class StreamingSyncImplementation implements StreamingSync {
       }
       _updateStatus(connected: true, connecting: false);
 
-      if (line is Checkpoint) {
-        targetCheckpoint = line;
-        final Set<String> bucketsToDelete = {...bucketSet};
-        final Set<String> newBuckets = {};
-        for (final checksum in line.checksums) {
-          newBuckets.add(checksum.bucket);
-          bucketsToDelete.remove(checksum.bucket);
-        }
-        bucketSet = newBuckets;
-        await adapter.removeBuckets([...bucketsToDelete]);
-        _updateStatus(downloading: true);
-      } else if (line is StreamingSyncCheckpointComplete) {
-        final result = await adapter.syncLocalDatabase(targetCheckpoint!);
-        if (!result.checkpointValid) {
-          // This means checksums failed. Start again with a new checkpoint.
-          // TODO: better back-off
-          // await new Promise((resolve) => setTimeout(resolve, 50));
-          return false;
-        } else if (!result.ready) {
-          // Checksums valid, but need more data for a consistent checkpoint.
-          // Continue waiting.
-        } else {
-          appliedCheckpoint = targetCheckpoint;
-
-          _updateStatus(
-              downloading: false,
-              downloadError: _noError,
-              lastSyncedAt: DateTime.now());
-        }
-
-        validatedCheckpoint = targetCheckpoint;
-      } else if (line is StreamingSyncCheckpointDiff) {
-        // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint
-        if (targetCheckpoint == null) {
-          throw PowerSyncProtocolException(
-              'Checkpoint diff without previous checkpoint');
-        }
-        _updateStatus(downloading: true);
-        final diff = line;
-        final Map<String, BucketChecksum> newBuckets = {};
-        for (var checksum in targetCheckpoint.checksums) {
-          newBuckets[checksum.bucket] = checksum;
-        }
-        for (var checksum in diff.updatedBuckets) {
-          newBuckets[checksum.bucket] = checksum;
-        }
-        for (var bucket in diff.removedBuckets) {
-          newBuckets.remove(bucket);
-        }
-
-        final newCheckpoint = Checkpoint(
-            lastOpId: diff.lastOpId,
-            checksums: [...newBuckets.values],
-            writeCheckpoint: diff.writeCheckpoint);
-        targetCheckpoint = newCheckpoint;
-
-        bucketSet = Set.from(newBuckets.keys);
-        await adapter.removeBuckets(diff.removedBuckets);
-        adapter.setTargetCheckpoint(targetCheckpoint);
-      } else if (line is SyncBucketData) {
-        _updateStatus(downloading: true);
-        await adapter.saveSyncData(SyncDataBatch([line]));
-      } else if (line is StreamingSyncKeepalive) {
-        if (line.tokenExpiresIn == 0) {
-          // Token expired already - stop the connection immediately
-          invalidCredentialsCallback?.call().ignore();
-          break;
-        } else if (line.tokenExpiresIn <= 30) {
-          // Token expires soon - refresh it in the background
-          if (credentialsInvalidation == null &&
-              invalidCredentialsCallback != null) {
-            credentialsInvalidation = invalidCredentialsCallback!().then((_) {
-              // Token has been refreshed - we should restart the connection.
-              haveInvalidated = true;
-              // trigger next loop iteration ASAP, don't wait for another
-              // message from the server.
-              _localPingController.add(null);
-            }, onError: (_) {
-              // Token refresh failed - retry on next keepalive.
-              credentialsInvalidation = null;
-            });
+      switch (line) {
+        case Checkpoint():
+          targetCheckpoint = line;
+          final Set<String> bucketsToDelete = {...bucketSet};
+          final Set<String> newBuckets = {};
+          for (final checksum in line.checksums) {
+            newBuckets.add(checksum.bucket);
+            bucketsToDelete.remove(checksum.bucket);
           }
-        }
-      } else {
-        if (targetCheckpoint == appliedCheckpoint) {
-          _updateStatus(
-              downloading: false,
-              downloadError: _noError,
-              lastSyncedAt: DateTime.now());
-        } else if (validatedCheckpoint == targetCheckpoint) {
+          bucketSet = newBuckets;
+          await adapter.removeBuckets([...bucketsToDelete]);
+          _updateStatus(downloading: true);
+        case StreamingSyncCheckpointComplete():
           final result = await adapter.syncLocalDatabase(targetCheckpoint!);
           if (!result.checkpointValid) {
             // This means checksums failed. Start again with a new checkpoint.
@@ -447,7 +370,88 @@ class StreamingSyncImplementation implements StreamingSync {
               downloadError: _noError,
               lastSyncedAt: DateTime.now());
-          }
+
+          validatedCheckpoint = targetCheckpoint;
+        case StreamingSyncCheckpointDiff():
+          // TODO: It may be faster to just keep track of the diff, instead of
+          // the entire checkpoint
+          if (targetCheckpoint == null) {
+            throw PowerSyncProtocolException(
+                'Checkpoint diff without previous checkpoint');
+          }
+          _updateStatus(downloading: true);
+          final diff = line;
+          final Map<String, BucketChecksum> newBuckets = {};
+          for (var checksum in targetCheckpoint.checksums) {
+            newBuckets[checksum.bucket] = checksum;
+          }
+          for (var checksum in diff.updatedBuckets) {
+            newBuckets[checksum.bucket] = checksum;
+          }
+          for (var bucket in diff.removedBuckets) {
+            newBuckets.remove(bucket);
+          }
+
+          final newCheckpoint = Checkpoint(
+              lastOpId: diff.lastOpId,
+              checksums: [...newBuckets.values],
+              writeCheckpoint: diff.writeCheckpoint);
+          targetCheckpoint = newCheckpoint;
+
+          bucketSet = Set.from(newBuckets.keys);
+          await adapter.removeBuckets(diff.removedBuckets);
+          adapter.setTargetCheckpoint(targetCheckpoint);
+        case SyncDataBatch():
+          _updateStatus(downloading: true);
+          await adapter.saveSyncData(line);
+        case StreamingSyncKeepalive(:final tokenExpiresIn):
+          if (tokenExpiresIn == 0) {
+            // Token expired already - stop the connection immediately
+            invalidCredentialsCallback?.call().ignore();
+            break;
+          } else if (tokenExpiresIn <= 30) {
+            // Token expires soon - refresh it in the background
+            if (credentialsInvalidation == null &&
+                invalidCredentialsCallback != null) {
+              credentialsInvalidation = invalidCredentialsCallback!().then((_) {
+                // Token has been refreshed - we should restart the connection.
+                haveInvalidated = true;
+                // trigger next loop iteration ASAP, don't wait for another
+                // message from the server.
+                _localPingController.add(null);
+              }, onError: (_) {
+                // Token refresh failed - retry on next keepalive.
+                credentialsInvalidation = null;
+              });
+            }
+          }
+        case UnknownSyncLine(:final rawData):
+          isolateLogger.fine('Unknown sync line: $rawData');
+        case null: // Local ping
+          if (targetCheckpoint == appliedCheckpoint) {
+            _updateStatus(
+                downloading: false,
+                downloadError: _noError,
+                lastSyncedAt: DateTime.now());
+          } else if (validatedCheckpoint == targetCheckpoint) {
+            final result = await adapter.syncLocalDatabase(targetCheckpoint!);
+            if (!result.checkpointValid) {
+              // This means checksums failed. Start again with a new checkpoint.
+              // TODO: better back-off
+              // await new Promise((resolve) => setTimeout(resolve, 50));
+              return false;
+            } else if (!result.ready) {
+              // Checksums valid, but need more data for a consistent checkpoint.
+              // Continue waiting.
+            } else {
+              appliedCheckpoint = targetCheckpoint;
+
+              _updateStatus(
+                  downloading: false,
+                  downloadError: _noError,
+                  lastSyncedAt: DateTime.now());
+            }
+          }
       }
 
       if (haveInvalidated) {
@@ -458,7 +462,8 @@
     return true;
   }
 
-  Stream<Object?> streamingSyncRequest(StreamingSyncRequest data) async* {
+  Stream<StreamingSyncLine> streamingSyncRequest(
+      StreamingSyncRequest data) async* {
     final credentials = await credentialsCallback();
     if (credentials == null) {
       throw CredentialsException('Not logged in');
     }
@@ -494,12 +499,10 @@
     }
 
     // Note: The response stream is automatically closed when this loop errors
-    await for (var line in ndjson(res.stream)) {
-      if (aborted) {
-        break;
-      }
-      yield parseStreamingSyncLine(line as Map<String, dynamic>);
-    }
+    yield* ndjson(res.stream)
+        .cast<Map<String, dynamic>>()
+        .transform(StreamingSyncLine.reader)
+        .takeWhile((_) => !aborted);
   }
 
   /// Delays the standard `retryDelay` Duration, but exits early if
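The `StreamingSyncLine.reader` transformer introduced in sync_types.dart below is built with `StreamTransformer.fromBind` and `Stream.eventTransformed`: each subscription gets its own `EventSink` instance, which is what lets the parser keep per-subscription state (the pending `SyncDataBatch`). A minimal sketch of the same pattern, using a hypothetical uppercasing sink purely to show the wiring:

    import 'dart:async';

    // One sink is constructed per subscription, so state kept in here is
    // never shared between listeners.
    final class _UppercaseSink implements EventSink<String> {
      final EventSink<String> _out;
      _UppercaseSink(this._out);

      @override
      void add(String event) => _out.add(event.toUpperCase());

      @override
      void addError(Object error, [StackTrace? stackTrace]) =>
          _out.addError(error, stackTrace);

      @override
      void close() => _out.close();
    }

    final uppercase = StreamTransformer<String, String>.fromBind(
        (source) => Stream.eventTransformed(source, _UppercaseSink.new));

    void main() async {
      print(await Stream.fromIterable(['a', 'b']).transform(uppercase).toList());
      // Prints: [A, B]
    }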
diff --git a/packages/powersync_core/lib/src/sync_types.dart b/packages/powersync_core/lib/src/sync_types.dart
index 5ff51de0..e5d4ab28 100644
--- a/packages/powersync_core/lib/src/sync_types.dart
+++ b/packages/powersync_core/lib/src/sync_types.dart
@@ -1,6 +1,118 @@
+import 'dart:async';
+import 'dart:convert';
+
 import 'bucket_storage.dart';
 
-class Checkpoint {
+/// Messages sent from the sync service.
+sealed class StreamingSyncLine {
+  const StreamingSyncLine();
+
+  /// Parses a [StreamingSyncLine] from JSON.
+  static StreamingSyncLine fromJson(Map<String, dynamic> line) {
+    if (line.containsKey('checkpoint')) {
+      return Checkpoint.fromJson(line['checkpoint'] as Map<String, dynamic>);
+    } else if (line.containsKey('checkpoint_diff')) {
+      return StreamingSyncCheckpointDiff.fromJson(
+          line['checkpoint_diff'] as Map<String, dynamic>);
+    } else if (line.containsKey('checkpoint_complete')) {
+      return StreamingSyncCheckpointComplete.fromJson(
+          line['checkpoint_complete'] as Map<String, dynamic>);
+    } else if (line.containsKey('data')) {
+      return SyncDataBatch([
+        SyncBucketData.fromJson(line['data'] as Map<String, dynamic>),
+      ]);
+    } else if (line.containsKey('token_expires_in')) {
+      return StreamingSyncKeepalive.fromJson(line);
+    } else {
+      return UnknownSyncLine(line);
+    }
+  }
+
+  /// A [StreamTransformer] that turns a stream emitting raw JSON objects into
+  /// a stream emitting [StreamingSyncLine]s.
+  static StreamTransformer<Map<String, dynamic>, StreamingSyncLine> reader =
+      StreamTransformer.fromBind((source) {
+    return Stream.eventTransformed(source, _StreamingSyncLineParser.new);
+  });
+}
+
+final class _StreamingSyncLineParser
+    implements EventSink<Map<String, dynamic>> {
+  final EventSink<StreamingSyncLine> _out;
+
+  /// When we receive multiple `data` lines in quick succession, group them into
+  /// a single batch. This will make the sync client insert them with a single
+  /// transaction, which is more efficient than inserting them individually.
+  (SyncDataBatch, Timer)? _pendingBatch;
+
+  _StreamingSyncLineParser(this._out);
+
+  void _flushBatch() {
+    if (_pendingBatch case (final pending, final timer)?) {
+      timer.cancel();
+      _pendingBatch = null;
+      _out.add(pending);
+    }
+  }
+
+  @override
+  void add(Map<String, dynamic> event) {
+    final parsed = StreamingSyncLine.fromJson(event);
+
+    // Buffer small batches and group them to reduce the number of transactions
+    // used to store them.
+    if (parsed is SyncDataBatch && parsed.totalOperations <= 100) {
+      if (_pendingBatch case (final batch, _)?) {
+        // Add this line to the pending batch of data items.
+        batch.buckets.addAll(parsed.buckets);
+
+        if (batch.totalOperations >= 1000) {
+          // This is unlikely to happen since we're only buffering for a single
+          // event loop iteration, but make sure we're not keeping huge amounts
+          // of data in memory.
+          _flushBatch();
+        }
+      } else {
+        // Instead of adding this batch directly, keep it buffered here for a
+        // while so that we can add new entries to it.
+        final timer = Timer(Duration.zero, () {
+          _out.add(_pendingBatch!.$1);
+          _pendingBatch = null;
+        });
+        _pendingBatch = (parsed, timer);
+      }
+    } else {
+      _flushBatch();
+      _out.add(parsed);
+    }
+  }
+
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    _flushBatch();
+    _out.addError(error, stackTrace);
+  }
+
+  @override
+  void close() {
+    _flushBatch();
+    _out.close();
+  }
+}
+
+/// A message from the sync service that this client doesn't support.
+final class UnknownSyncLine implements StreamingSyncLine {
+  final Map<String, dynamic> rawData;
+
+  const UnknownSyncLine(this.rawData);
+}
+
+/// Indicates that a checkpoint is available, along with checksums for each
+/// bucket in the checkpoint.
+///
+/// Note: Called `StreamingSyncCheckpoint` in the sync service protocol.
+final class Checkpoint extends StreamingSyncLine {
   final String lastOpId;
   final String? writeCheckpoint;
   final List<BucketChecksum> checksums;
@@ -26,7 +138,7 @@
   }
 }
 
-class BucketChecksum {
+final class BucketChecksum {
   final String bucket;
   final int checksum;
@@ -47,16 +159,12 @@
         lastOpId = json['last_op_id'] as String?;
 }
 
-class StreamingSyncCheckpoint {
-  Checkpoint checkpoint;
-
-  StreamingSyncCheckpoint(this.checkpoint);
-
-  StreamingSyncCheckpoint.fromJson(Map<String, dynamic> json)
-      : checkpoint = Checkpoint.fromJson(json);
-}
-
-class StreamingSyncCheckpointDiff {
+/// A variant of [Checkpoint] that may be sent when the server has already sent
+/// a [Checkpoint] message before.
+///
+/// It has the same conceptual meaning as a [Checkpoint] message, but only
+/// contains details about changed buckets as an optimization.
+final class StreamingSyncCheckpointDiff extends StreamingSyncLine {
   String lastOpId;
   List<BucketChecksum> updatedBuckets;
   List<String> removedBuckets;
@@ -74,7 +182,12 @@
         removedBuckets = (json['removed_buckets'] as List).cast<String>();
 }
 
-class StreamingSyncCheckpointComplete {
+/// Sent after the last [SyncBucketData] message for a checkpoint.
+///
+/// Since this indicates that we may have a consistent view of the data, the
+/// client may make previous [SyncBucketData] rows visible to the application
+/// at this point.
+final class StreamingSyncCheckpointComplete extends StreamingSyncLine {
   String lastOpId;
 
   StreamingSyncCheckpointComplete(this.lastOpId);
@@ -83,7 +196,12 @@
       : lastOpId = json['last_op_id'] as String;
 }
 
-class StreamingSyncKeepalive {
+/// Sent as a periodic ping to keep the connection alive and to notify the
+/// client about the remaining lifetime of the JWT.
+///
+/// When the token is nearing its expiry date, the client may ask for another
+/// one and open a new sync session with that token.
+final class StreamingSyncKeepalive extends StreamingSyncLine {
   int tokenExpiresIn;
 
   StreamingSyncKeepalive(this.tokenExpiresIn);
@@ -92,24 +210,6 @@
       : tokenExpiresIn = json['token_expires_in'] as int;
 }
 
-Object? parseStreamingSyncLine(Map<String, dynamic> line) {
-  if (line.containsKey('checkpoint')) {
-    return Checkpoint.fromJson(line['checkpoint'] as Map<String, dynamic>);
-  } else if (line.containsKey('checkpoint_diff')) {
-    return StreamingSyncCheckpointDiff.fromJson(
-        line['checkpoint_diff'] as Map<String, dynamic>);
-  } else if (line.containsKey('checkpoint_complete')) {
-    return StreamingSyncCheckpointComplete.fromJson(
-        line['checkpoint_complete'] as Map<String, dynamic>);
-  } else if (line.containsKey('data')) {
-    return SyncBucketData.fromJson(line['data'] as Map<String, dynamic>);
-  } else if (line.containsKey('token_expires_in')) {
-    return StreamingSyncKeepalive.fromJson(line);
-  } else {
-    return null;
-  }
-}
-
 class StreamingSyncRequest {
   List<BucketRequest> buckets;
   bool includeChecksum = true;
@@ -145,3 +245,119 @@
       'after': after,
     };
   }
 }
+
+/// A batch of sync operations being delivered from the sync service.
+///
+/// Note that the service will always send individual [SyncBucketData] lines,
+/// but we group them into [SyncDataBatch]es because writing multiple entries
+/// at once improves performance.
+final class SyncDataBatch extends StreamingSyncLine {
+  List<SyncBucketData> buckets;
+
+  int get totalOperations =>
+      buckets.fold(0, (prev, data) => prev + data.data.length);
+
+  SyncDataBatch(this.buckets);
+}
+
+final class SyncBucketData {
+  final String bucket;
+  final List<OplogEntry> data;
+  final bool hasMore;
+  final String? after;
+  final String? nextAfter;
+
+  const SyncBucketData(
+      {required this.bucket,
+      required this.data,
+      this.hasMore = false,
+      this.after,
+      this.nextAfter});
+
+  SyncBucketData.fromJson(Map<String, dynamic> json)
+      : bucket = json['bucket'] as String,
+        hasMore = json['has_more'] as bool? ?? false,
+        after = json['after'] as String?,
+        nextAfter = json['next_after'] as String?,
+        data = (json['data'] as List)
+            .map((e) => OplogEntry.fromJson(e as Map<String, dynamic>))
+            .toList();
+
+  Map<String, dynamic> toJson() {
+    return {
+      'bucket': bucket,
+      'has_more': hasMore,
+      'after': after,
+      'next_after': nextAfter,
+      'data': data
+    };
+  }
+}
+
+class OplogEntry {
+  final String opId;
+
+  final OpType? op;
+
+  /// rowType + rowId uniquely identifies an entry in the local database.
+  final String? rowType;
+  final String? rowId;
+
+  /// Together with rowType and rowId, this uniquely identifies a source entry
+  /// per bucket in the oplog. There may be multiple source entries for a single
+  /// "rowType + rowId" combination.
+  final String? subkey;
+
+  final String? data;
+  final int checksum;
+
+  const OplogEntry(
+      {required this.opId,
+      required this.op,
+      this.subkey,
+      this.rowType,
+      this.rowId,
+      this.data,
+      required this.checksum});
+
+  OplogEntry.fromJson(Map<String, dynamic> json)
+      : opId = json['op_id'] as String,
+        op = OpType.fromJson(json['op'] as String),
+        rowType = json['object_type'] as String?,
+        rowId = json['object_id'] as String?,
+        checksum = json['checksum'] as int,
+        data = switch (json['data']) {
+          String data => data,
+          var other => jsonEncode(other),
+        },
+        subkey = switch (json['subkey']) {
+          String subkey => subkey,
+          _ => null,
+        };
+
+  Map<String, dynamic>? get parsedData {
+    return switch (data) {
+      final data? => jsonDecode(data) as Map<String, dynamic>,
+      null => null,
+    };
+  }
+
+  /// Key to uniquely represent a source entry in a bucket.
+  /// This is used to supersede old entries.
+  /// Relevant for put and remove ops.
+  String get key {
+    return "$rowType/$rowId/$subkey";
+  }
+
+  Map<String, dynamic> toJson() {
+    return {
+      'op_id': opId,
+      'op': op?.toJson(),
+      'object_type': rowType,
+      'object_id': rowId,
+      'checksum': checksum,
+      'subkey': subkey,
+      'data': data
+    };
+  }
+}
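One consequence of the parser above is worth spelling out: consecutive small `data` lines are held back for a single event-loop turn (the `Timer(Duration.zero, ...)`) and then emitted as one `SyncDataBatch`, while any other line type, an error, or `close()` flushes the pending batch first, so ordering is preserved. A rough usage sketch of the transformer, mirroring the tests added later in this diff (the import path assumes use from within this package's tests):

    import 'dart:async';

    import 'package:powersync_core/src/sync_types.dart';

    Future<void> main() async {
      final lines = StreamController<Map<String, dynamic>>();
      final parsed = lines.stream.transform(StreamingSyncLine.reader).toList();

      // Two small data lines added in the same event-loop turn...
      lines
        ..add({
          'data': {'bucket': 'a', 'data': <Map<String, dynamic>>[]}
        })
        ..add({
          'data': {'bucket': 'b', 'data': <Map<String, dynamic>>[]}
        });
      await lines.close();

      // ...are delivered as a single SyncDataBatch covering both buckets.
      final batch = (await parsed).single as SyncDataBatch;
      print(batch.buckets.length); // 2
    }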
diff --git a/packages/powersync_core/pubspec.yaml b/packages/powersync_core/pubspec.yaml
index f1c1cf29..be72adaa 100644
--- a/packages/powersync_core/pubspec.yaml
+++ b/packages/powersync_core/pubspec.yaml
@@ -8,7 +8,7 @@ environment:
   sdk: ^3.4.3
 
 dependencies:
-  sqlite_async: ^0.11.2
+  sqlite_async: ^0.11.4
   # We only use sqlite3 as a transitive dependency,
   # but right now we need a minimum of v2.4.6.
   sqlite3: ^2.4.6
diff --git a/packages/powersync_core/test/connected_test.dart b/packages/powersync_core/test/connected_test.dart
index d42afc52..560b925a 100644
--- a/packages/powersync_core/test/connected_test.dart
+++ b/packages/powersync_core/test/connected_test.dart
@@ -9,7 +9,6 @@ import 'package:powersync_core/powersync_core.dart';
 import 'package:test/test.dart';
 
 import 'server/sync_server/mock_sync_server.dart';
-import 'streaming_sync_test.dart';
 import 'utils/abstract_test_utils.dart';
 import 'utils/test_utils_impl.dart';
diff --git a/packages/powersync_core/test/disconnect_test.dart b/packages/powersync_core/test/disconnect_test.dart
index 489928ef..a3706537 100644
--- a/packages/powersync_core/test/disconnect_test.dart
+++ b/packages/powersync_core/test/disconnect_test.dart
@@ -1,7 +1,7 @@
 import 'package:powersync_core/powersync_core.dart';
 import 'package:powersync_core/sqlite_async.dart';
 import 'package:test/test.dart';
-import 'streaming_sync_test.dart';
+import 'utils/abstract_test_utils.dart';
 import 'utils/test_utils_impl.dart';
 import 'watch_test.dart';
diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart
new file mode 100644
index 00000000..ee5a0fc1
--- /dev/null
+++ b/packages/powersync_core/test/in_memory_sync_test.dart
@@ -0,0 +1,175 @@
+import 'package:async/async.dart';
+import 'package:logging/logging.dart';
+import 'package:powersync_core/powersync_core.dart';
+import 'package:powersync_core/sqlite3_common.dart';
+import 'package:powersync_core/src/log_internal.dart';
+import 'package:powersync_core/src/streaming_sync.dart';
+import 'package:powersync_core/src/sync_types.dart';
+import 'package:test/test.dart';
+
+import 'server/sync_server/in_memory_sync_server.dart';
+import 'utils/abstract_test_utils.dart';
+import 'utils/in_memory_http.dart';
+import 'utils/test_utils_impl.dart';
+
+void main() {
+  group('in-memory sync tests', () {
+    late final testUtils = TestUtils();
+
+    late TestPowerSyncFactory factory;
+    late CommonDatabase raw;
+    late PowerSyncDatabase database;
+    late MockSyncService syncService;
+    late StreamingSyncImplementation syncClient;
+
+    setUp(() async {
+      final (client, server) = inMemoryServer();
+      syncService = MockSyncService();
+      server.mount(syncService.router.call);
+
+      factory = await testUtils.testFactory();
+      (raw, database) = await factory.openInMemoryDatabase();
+      await database.initialize();
+      syncClient = database.connectWithMockService(
+        client,
+        TestConnector(() async {
+          return PowerSyncCredentials(
+            endpoint: server.url.toString(),
+            token: 'token not used here',
+            expiresAt: DateTime.now(),
+          );
+        }),
+      );
+    });
+
+    tearDown(() async {
+      await syncClient.abort();
+      await database.close();
+      await syncService.stop();
+    });
+
+    Future<StreamQueue<SyncStatus>> waitForConnection(
+        {bool expectNoWarnings = true}) async {
+      if (expectNoWarnings) {
+        isolateLogger.onRecord.listen((e) {
+          if (e.level >= Level.WARNING) {
+            fail('Unexpected log: $e');
+          }
+        });
+      }
+      syncClient.streamingSync();
+      await syncService.waitForListener;
+
+      expect(database.currentStatus.lastSyncedAt, isNull);
+      expect(database.currentStatus.downloading, isFalse);
+      final status = StreamQueue(database.statusStream);
+      addTearDown(status.cancel);
+
+      syncService.addKeepAlive();
+      await expectLater(
+          status, emits(isSyncStatus(connected: true, hasSynced: false)));
+      return status;
+    }
+
+    test('persists completed sync information', () async {
+      final status = await waitForConnection();
+
+      syncService.addLine({
+        'checkpoint': Checkpoint(
+          lastOpId: '0',
+          writeCheckpoint: null,
+          checksums: [BucketChecksum(bucket: 'bkt', checksum: 0)],
+        )
+      });
+      await expectLater(status, emits(isSyncStatus(downloading: true)));
+
+      syncService.addLine({
+        'checkpoint_complete': {'last_op_id': '0'}
+      });
+      await expectLater(
+          status, emits(isSyncStatus(downloading: false, hasSynced: true)));
+
+      final independentDb = factory.wrapRaw(raw);
+      // Even though this database doesn't have a sync client attached to it,
+      // it should reconstruct hasSynced from the database.
+      await independentDb.initialize();
+      expect(independentDb.currentStatus.hasSynced, isTrue);
+    });
+
+    test('can save independent buckets in same transaction', () async {
+      final status = await waitForConnection();
+
+      syncService.addLine({
+        'checkpoint': Checkpoint(
+          lastOpId: '0',
+          writeCheckpoint: null,
+          checksums: [
+            BucketChecksum(bucket: 'a', checksum: 0),
+            BucketChecksum(bucket: 'b', checksum: 0),
+          ],
+        )
+      });
+      await expectLater(status, emits(isSyncStatus(downloading: true)));
+
+      var commits = 0;
+      raw.commits.listen((_) => commits++);
+
+      syncService
+        ..addLine({
+          'data': {
+            'bucket': 'a',
+            'data': <Map<String, dynamic>>[
+              {
+                'op_id': '1',
+                'op': 'PUT',
+                'object_type': 'a',
+                'object_id': '1',
+                'checksum': 0,
+                'data': {},
+              }
+            ],
+          }
+        })
+        ..addLine({
+          'data': {
+            'bucket': 'b',
+            'data': <Map<String, dynamic>>[
+              {
+                'op_id': '2',
+                'op': 'PUT',
+                'object_type': 'b',
+                'object_id': '1',
+                'checksum': 0,
+                'data': {},
+              }
+            ],
+          }
+        });
+
+      // Wait for the operations to be inserted.
+      while (raw.select('SELECT * FROM ps_oplog;').length < 2) {
+        await pumpEventQueue();
+      }
+
+      // The two buckets should have been inserted in a single transaction
+      // because the messages were received in quick succession.
+      expect(commits, 1);
+    });
+  });
+}
+
+TypeMatcher<SyncStatus> isSyncStatus(
+    {Object? downloading, Object? connected, Object? hasSynced}) {
+  var matcher = isA<SyncStatus>();
+  if (downloading != null) {
+    matcher = matcher.having((e) => e.downloading, 'downloading', downloading);
+  }
+  if (connected != null) {
+    matcher = matcher.having((e) => e.connected, 'connected', connected);
+  }
+  if (hasSynced != null) {
+    matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced);
+  }
+
+  return matcher;
+}
diff --git a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart
new file mode 100644
index 00000000..b31dffc2
--- /dev/null
+++ b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart
@@ -0,0 +1,64 @@
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:shelf/shelf.dart';
+import 'package:shelf_router/shelf_router.dart';
+
+final class MockSyncService {
+  // Use a queued stream to make tests easier.
+  StreamController<String> _controller = StreamController();
+  Completer<void> _listener = Completer();
+
+  final router = Router();
+
+  MockSyncService() {
+    router
+      ..post('/sync/stream', (Request request) async {
+        _listener.complete();
+        // Respond immediately with a stream
+        return Response.ok(_controller.stream.transform(utf8.encoder),
+            headers: {
+              'Content-Type': 'application/x-ndjson',
+              'Cache-Control': 'no-cache',
+              'Connection': 'keep-alive',
+            },
+            context: {
+              "shelf.io.buffer_output": false
+            });
+      })
+      ..get('/write-checkpoint2.json', (request) {
+        return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: {
+          'Content-Type': 'application/json',
+        });
+      });
+  }
+
+  Future<void> get waitForListener => _listener.future;
+
+  // Queue events which will be sent to connected clients.
+  void addRawEvent(String data) {
+    _controller.add(data);
+  }
+
+  void addLine(Object? message) {
+    addRawEvent('${json.encode(message)}\n');
+  }
+
+  void addKeepAlive([int tokenExpiresIn = 3600]) {
+    addLine({'token_expires_in': tokenExpiresIn});
+  }
+
+  // Clear events. We rely on a buffered controller here. Create a new
+  // controller in order to clear the buffer.
+  Future<void> clearEvents() async {
+    await _controller.close();
+    _listener = Completer();
+    _controller = StreamController();
+  }
+
+  Future<void> stop() async {
+    if (_controller.hasListener) {
+      await _controller.close();
+    }
+  }
+}
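Because `MockSyncService` just forwards queued strings over the open NDJSON response, test scripts stay declarative. A small sketch of driving it (assuming its router has been mounted on a server, as in the tests above):

    Future<void> driveSync(MockSyncService syncService) async {
      // Emitted as '{"token_expires_in":3600}\n' on the open NDJSON response.
      syncService.addKeepAlive();
      syncService.addLine({
        'checkpoint_complete': {'last_op_id': '10'},
      });

      // Closing the controller ends the client's /sync/stream response.
      await syncService.stop();
    }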
diff --git a/packages/powersync_core/test/server/sync_server/mock_sync_server.dart b/packages/powersync_core/test/server/sync_server/mock_sync_server.dart
index 9844692f..e4710f3d 100644
--- a/packages/powersync_core/test/server/sync_server/mock_sync_server.dart
+++ b/packages/powersync_core/test/server/sync_server/mock_sync_server.dart
@@ -1,58 +1,37 @@
 import 'dart:async';
-import 'dart:convert';
 import 'dart:io';
 
-import 'package:shelf/shelf.dart';
 import 'package:shelf/shelf_io.dart' as io;
-import 'package:shelf_router/shelf_router.dart';
+
+import 'in_memory_sync_server.dart';
 
 // A basic Mock PowerSync service server which queues commands
 // which clients can receive via connecting to the `/sync/stream` route.
 // This assumes only one client will ever be connected at a time.
 class TestHttpServerHelper {
-  // Use a queued stream to make tests easier.
-  StreamController<String> _controller = StreamController();
+  final MockSyncService service = MockSyncService();
   late HttpServer _server;
+
   Uri get uri => Uri.parse('http://localhost:${_server.port}');
 
   Future<void> start() async {
-    final router = Router()
-      ..post('/sync/stream', (Request request) async {
-        // Respond immediately with a stream
-        return Response.ok(_controller.stream.transform(utf8.encoder),
-            headers: {
-              'Content-Type': 'application/x-ndjson',
-              'Cache-Control': 'no-cache',
-              'Connection': 'keep-alive',
-            },
-            context: {
-              "shelf.io.buffer_output": false
-            });
-      })
-      ..get('/write-checkpoint2.json', (request) {
-        return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: {
-          'Content-Type': 'application/json',
-        });
-      });
-
-    _server = await io.serve(router.call, 'localhost', 0);
+    _server = await io.serve(service.router.call, 'localhost', 0);
     print('Test server running at ${_server.address}:${_server.port}');
   }
 
   // Queue events which will be sent to connected clients.
   void addEvent(String data) {
-    _controller.add(data);
+    service.addRawEvent(data);
  }
 
   // Clear events. We rely on a buffered controller here. Create a new controller
   // in order to clear the buffer.
   Future<void> clearEvents() async {
-    await _controller.close();
-    _controller = StreamController();
+    await service.clearEvents();
   }
 
   Future<void> stop() async {
-    await _controller.close();
+    await service.stop();
     await _server.close();
   }
 }
diff --git a/packages/powersync_core/test/streaming_sync_test.dart b/packages/powersync_core/test/streaming_sync_test.dart
index ae9e95b1..ed528869 100644
--- a/packages/powersync_core/test/streaming_sync_test.dart
+++ b/packages/powersync_core/test/streaming_sync_test.dart
@@ -9,29 +9,11 @@ import 'package:powersync_core/powersync_core.dart';
 import 'package:test/test.dart';
 
 import 'test_server.dart';
+import 'utils/abstract_test_utils.dart';
 import 'utils/test_utils_impl.dart';
 
 final testUtils = TestUtils();
 
-class TestConnector extends PowerSyncBackendConnector {
-  final Future<PowerSyncCredentials?> Function() _fetchCredentials;
-  final Future<void> Function(PowerSyncDatabase)? _uploadData;
-
-  TestConnector(this._fetchCredentials,
-      {Future<void> Function(PowerSyncDatabase)? uploadData})
-      : _uploadData = uploadData;
-
-  @override
-  Future<PowerSyncCredentials?> fetchCredentials() {
-    return _fetchCredentials();
-  }
-
-  @override
-  Future<void> uploadData(PowerSyncDatabase database) async {
-    await _uploadData?.call(database);
-  }
-}
-
 void main() {
   group('Streaming Sync Test', () {
     late String path;
diff --git a/packages/powersync_core/test/sync_types_test.dart b/packages/powersync_core/test/sync_types_test.dart
new file mode 100644
index 00000000..fa4e71fa
--- /dev/null
+++ b/packages/powersync_core/test/sync_types_test.dart
@@ -0,0 +1,218 @@
+import 'dart:async';
+
+import 'package:powersync_core/src/sync_types.dart';
+import 'package:test/test.dart';
+
+void main() {
+  group('Sync types', () {
+    test('parses JSON stream', () {
+      final source = StreamController<Map<String, dynamic>>();
+      expect(
+        source.stream.transform(StreamingSyncLine.reader),
+        emitsInOrder([
+          isA<StreamingSyncKeepalive>(),
+          isA<SyncDataBatch>(),
+          isA<StreamingSyncCheckpointComplete>(),
+          isA<Checkpoint>(),
+          isA<StreamingSyncCheckpointDiff>(),
+          isA<UnknownSyncLine>(),
+          emitsDone,
+        ]),
+      );
+
+      source
+        ..add({'token_expires_in': 10})
+        ..add({
+          'data': {
+            'bucket': 'a',
+            'data': <Map<String, dynamic>>[],
+            'hasMore': false
+          }
+        })
+        ..add({
+          'checkpoint_complete': {'last_op_id': '10'}
+        })
+        ..add({
+          'checkpoint': {
+            'last_op_id': '10',
+            'write_checkpoint': null,
+            'buckets': <Map<String, dynamic>>[],
+          }
+        })
+        ..add({
+          'checkpoint_diff': {
+            'last_op_id': '10',
+            'write_checkpoint': null,
+            'updated_buckets': <Map<String, dynamic>>[],
+            'removed_buckets': <String>[],
+          }
+        })
+        ..add({'invalid_line': ''})
+        ..close();
+    });
+
+    test('can group data lines', () {
+      final source = StreamController<Map<String, dynamic>>();
+      expect(
+        source.stream.transform(StreamingSyncLine.reader),
+        emits(
+          isA<SyncDataBatch>()
+              .having((e) => e.buckets, 'buckets', hasLength(2)),
+        ),
+      );
+
+      source
+        ..add({
+          'data': {
+            'bucket': 'a',
+            'data': <Map<String, dynamic>>[],
+            'hasMore': false
+          }
+        })
+        ..add({
+          'data': {
+            'bucket': 'b',
+            'data': <Map<String, dynamic>>[],
+            'hasMore': false
+          }
+        });
+    });
+
+    test('flushes pending data lines before closing', () {
+      final source = StreamController<Map<String, dynamic>>();
+      expect(
+        source.stream.transform(StreamingSyncLine.reader),
+        emitsInOrder([
+          isA<SyncDataBatch>(),
+          emitsDone,
+        ]),
+      );
+
+      source
+        ..add({
+          'data': {
+            'bucket': 'a',
+            'data': <Map<String, dynamic>>[],
+            'hasMore': false
+          }
+        })
+        ..close();
+    });
+
+    test('data line grouping keeps order', () {
+      final source = StreamController<Map<String, dynamic>>();
+      expect(
+        source.stream.transform(StreamingSyncLine.reader),
+        emitsInOrder([
+          isA<SyncDataBatch>(),
+          isA<StreamingSyncCheckpointComplete>(),
+          isA<SyncDataBatch>(),
+          emitsDone,
+        ]),
+      );
+
+      source
+        ..add({
+          'data': {
+            'bucket': 'a',
+            'data': <Map<String, dynamic>>[],
+            'hasMore': false
+          }
+        })
+        ..add({
+          'checkpoint_complete': {'last_op_id': '10'}
+        })
+        ..add({
+          'data': {
+            'bucket': 'b',
+            'data': <Map<String, dynamic>>[],
+            'hasMore': false
+          }
+        })
+        ..close();
+    });
+
+    test('does not combine large batches', () async {
+      final source = StreamController<Map<String, dynamic>>();
+      expect(
+        source.stream.transform(StreamingSyncLine.reader),
+        emitsInOrder([
+          isA<SyncDataBatch>()
+              .having((e) => e.totalOperations, 'totalOperations', 1),
+          isA<SyncDataBatch>()
+              .having((e) => e.totalOperations, 'totalOperations', 150),
+        ]),
+      );
+
+      source
+        ..add({
+          'data': {
+            'bucket': 'a',
+            'data': <Map<String, dynamic>>[
+              {
+                'op_id': '0',
+                'op': 'PUT',
+                'object_type': 'a',
+                'object_id': '0',
+                'checksum': 0,
+                'data': {},
+              }
+            ],
+            'hasMore': false
+          }
+        })
+        ..add({
+          'data': {
+            'bucket': 'a',
+            'data': <Map<String, dynamic>>[
+              for (var i = 1; i <= 150; i++)
+                {
+                  'op_id': '$i',
+                  'op': 'PUT',
+                  'object_type': 'a',
+                  'object_id': '$i',
+                  'checksum': 0,
+                  'data': {},
+                }
+            ],
+            'hasMore': false
+          }
+        });
+    });
+
+    test('flushes when internal buffer gets too large', () {
+      final source = StreamController<Map<String, dynamic>>();
+      expect(
+        source.stream.transform(StreamingSyncLine.reader),
+        emitsInOrder([
+          isA<SyncDataBatch>()
+              .having((e) => e.totalOperations, 'totalOperations', 1000),
+          isA<SyncDataBatch>()
+              .having((e) => e.totalOperations, 'totalOperations', 500),
+        ]),
+      );
+
+      // Add 1500 operations in chunks of 100 items. This should emit an
+      // 1000-item chunk and another one for the rest.
+      for (var i = 0; i < 15; i++) {
+        source.add({
+          'data': {
+            'bucket': 'a',
+            'data': <Map<String, dynamic>>[
+              for (var i = 0; i < 100; i++)
+                {
+                  'op_id': '1',
+                  'op': 'PUT',
+                  'object_type': 'a',
+                  'object_id': '1',
+                  'checksum': 0,
+                  'data': {},
+                }
+            ],
+            'hasMore': false
+          }
+        });
+      }
+    });
+  });
+}
diff --git a/packages/powersync_core/test/upload_test.dart b/packages/powersync_core/test/upload_test.dart
index 62e952ed..4bed1499 100644
--- a/packages/powersync_core/test/upload_test.dart
+++ b/packages/powersync_core/test/upload_test.dart
@@ -14,23 +14,6 @@ const testId2 = "2290de4f-0488-4e50-abed-f8e8eb1d0b43";
 const partialWarning =
     'Potentially previously uploaded CRUD entries are still present';
 
-class TestConnector extends PowerSyncBackendConnector {
-  final Future<PowerSyncCredentials?> Function() _fetchCredentials;
-  final Future<void> Function(PowerSyncDatabase database) _uploadData;
-
-  TestConnector(this._fetchCredentials, this._uploadData);
-
-  @override
-  Future<PowerSyncCredentials?> fetchCredentials() {
-    return _fetchCredentials();
-  }
-
-  @override
-  Future<void> uploadData(PowerSyncDatabase database) async {
-    return _uploadData(database);
-  }
-}
-
 void main() {
   group('CRUD Tests', () {
     late PowerSyncDatabase powersync;
@@ -70,7 +53,8 @@ void main() {
       // Use a short retry delay here.
       // A zero retry delay makes this test unstable, since it expects `2` error logs later.
       powersync.retryDelay = Duration(milliseconds: 100);
-      var connector = TestConnector(credentialsCallback, uploadData);
+      var connector =
+          TestConnector(credentialsCallback, uploadData: uploadData);
       powersync.connect(connector: connector);
 
       // Create something with CRUD in it.
diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart
index 561e5c3b..48c73e03 100644
--- a/packages/powersync_core/test/utils/abstract_test_utils.dart
+++ b/packages/powersync_core/test/utils/abstract_test_utils.dart
@@ -1,5 +1,8 @@
+import 'package:http/http.dart';
 import 'package:logging/logging.dart';
 import 'package:powersync_core/powersync_core.dart';
+import 'package:powersync_core/src/bucket_storage.dart';
+import 'package:powersync_core/src/streaming_sync.dart';
 import 'package:sqlite_async/sqlite3_common.dart';
 import 'package:sqlite_async/sqlite_async.dart';
 import 'package:test_api/src/backend/invoker.dart';
@@ -52,6 +55,23 @@ Logger _makeTestLogger({Level level = Level.ALL, String? name}) {
   return logger;
 }
 
+abstract mixin class TestPowerSyncFactory implements PowerSyncOpenFactory {
+  Future<CommonDatabase> openRawInMemoryDatabase();
+
+  Future<(CommonDatabase, PowerSyncDatabase)> openInMemoryDatabase() async {
+    final raw = await openRawInMemoryDatabase();
+    return (raw, wrapRaw(raw));
+  }
+
+  PowerSyncDatabase wrapRaw(CommonDatabase raw) {
+    return PowerSyncDatabase.withDatabase(
+      schema: schema,
+      database: SqliteDatabase.singleConnection(
+          SqliteConnection.synchronousWrapper(raw)),
+    );
+  }
+}
+
 abstract class AbstractTestUtils {
   String get _testName => Invoker.current!.liveTest.test.name;
 
@@ -63,12 +83,10 @@ abstract class AbstractTestUtils {
   }
 
   /// Generates a test open factory
-  Future<PowerSyncOpenFactory> testFactory(
+  Future<TestPowerSyncFactory> testFactory(
      {String? path,
      String sqlitePath = '',
-      SqliteOptions options = const SqliteOptions.defaults()}) async {
-    return PowerSyncOpenFactory(path: path ?? dbPath(), sqliteOptions: options);
-  }
+      SqliteOptions options = const SqliteOptions.defaults()});
 
   /// Creates a SqliteDatabaseConnection
   Future<PowerSyncDatabase> setupPowerSync(
@@ -93,3 +111,41 @@
   /// Deletes any DB data
   Future<void> cleanDb({required String path});
 }
+
+class TestConnector extends PowerSyncBackendConnector {
+  final Future<PowerSyncCredentials?> Function() _fetchCredentials;
+  final Future<void> Function(PowerSyncDatabase)? _uploadData;
+
+  TestConnector(this._fetchCredentials,
+      {Future<void> Function(PowerSyncDatabase)? uploadData})
+      : _uploadData = uploadData;
+
+  @override
+  Future<PowerSyncCredentials?> fetchCredentials() {
+    return _fetchCredentials();
+  }
+
+  @override
+  Future<void> uploadData(PowerSyncDatabase database) async {
+    await _uploadData?.call(database);
+  }
+}
+
+extension MockSync on PowerSyncDatabase {
+  StreamingSyncImplementation connectWithMockService(
+      Client client, PowerSyncBackendConnector connector) {
+    final impl = StreamingSyncImplementation(
+      adapter: BucketStorage(this),
+      client: client,
+      retryDelay: const Duration(seconds: 5),
+      credentialsCallback: connector.getCredentialsCached,
+      invalidCredentialsCallback: connector.prefetchCredentials,
+      uploadCrud: () => connector.uploadData(this),
+      crudUpdateTriggerStream: database
+          .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)),
+    );
+    impl.statusStream.listen(setStatus);
+
+    return impl;
+  }
+}
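`connectWithMockService` is designed to pair with `inMemoryServer()` from the next file. A condensed wiring sketch, mirroring the `setUp` in in_memory_sync_test.dart (`database` is assumed to be an initialized `PowerSyncDatabase`):

    Future<StreamingSyncImplementation> wireMockSync(
        PowerSyncDatabase database) async {
      final (client, server) = inMemoryServer();
      final syncService = MockSyncService();
      server.mount(syncService.router.call);

      final syncClient = database.connectWithMockService(
        client,
        TestConnector(() async => PowerSyncCredentials(
              endpoint: server.url.toString(),
              token: 'token not used here',
              expiresAt: DateTime.now(),
            )),
      );
      syncClient.streamingSync(); // starts the sync loop against the mock
      return syncClient;
    }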
diff --git a/packages/powersync_core/test/utils/in_memory_http.dart b/packages/powersync_core/test/utils/in_memory_http.dart
new file mode 100644
index 00000000..61550e3c
--- /dev/null
+++ b/packages/powersync_core/test/utils/in_memory_http.dart
@@ -0,0 +1,56 @@
+import 'package:http/http.dart';
+import 'package:http/testing.dart';
+import 'package:shelf/shelf.dart' as shelf;
+
+final Uri mockHttpUri = Uri.parse('https://testing.powersync.com/');
+
+/// Returns a [Client] that can send HTTP requests to the returned
+/// [shelf.Server].
+///
+/// The server can be used to serve shelf routes via [shelf.Server.mount].
+(Client, shelf.Server) inMemoryServer() {
+  final server = _MockServer();
+  final client = MockClient.streaming(server.handleRequest);
+
+  return (client, server);
+}
+
+final class _MockServer implements shelf.Server {
+  shelf.Handler? _handler;
+
+  @override
+  void mount(shelf.Handler handler) {
+    if (_handler != null) {
+      throw StateError('already has a handler');
+    }
+
+    _handler = handler;
+  }
+
+  @override
+  Future<void> close() async {}
+
+  @override
+  Uri get url => mockHttpUri;
+
+  Future<StreamedResponse> handleRequest(
+      BaseRequest request, ByteStream body) async {
+    if (_handler case final endpoint?) {
+      final shelfRequest = shelf.Request(
+        request.method,
+        request.url,
+        headers: request.headers,
+        body: body,
+      );
+      final shelfResponse = await endpoint(shelfRequest);
+
+      return StreamedResponse(
+        shelfResponse.read(),
+        shelfResponse.statusCode,
+        headers: shelfResponse.headers,
+      );
+    } else {
+      throw StateError('Request before handler was set on mock server');
+    }
+  }
+}
diff --git a/packages/powersync_core/test/utils/native_test_utils.dart b/packages/powersync_core/test/utils/native_test_utils.dart
index ab8cfd6a..915af2a0 100644
--- a/packages/powersync_core/test/utils/native_test_utils.dart
+++ b/packages/powersync_core/test/utils/native_test_utils.dart
@@ -2,6 +2,7 @@ import 'dart:async';
 import 'dart:ffi';
 import 'dart:io';
 import 'package:powersync_core/powersync_core.dart';
+import 'package:sqlite3/sqlite3.dart';
 import 'package:sqlite_async/sqlite3_common.dart';
 import 'package:sqlite_async/sqlite_async.dart';
 import 'package:sqlite3/open.dart' as sqlite_open;
@@ -10,20 +11,31 @@ import 'abstract_test_utils.dart';
 
 const defaultSqlitePath = 'libsqlite3.so.0';
 
-class TestOpenFactory extends PowerSyncOpenFactory {
+class TestOpenFactory extends PowerSyncOpenFactory with TestPowerSyncFactory {
   TestOpenFactory({required super.path});
 
-  @override
-  CommonDatabase open(SqliteOpenOptions options) {
+  void applyOpenOverride() {
     sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.linux, () {
       return DynamicLibrary.open('libsqlite3.so.0');
     });
     sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () {
       return DynamicLibrary.open('libsqlite3.dylib');
     });
+  }
+
+  @override
+  CommonDatabase open(SqliteOpenOptions options) {
+    applyOpenOverride();
     return super.open(options);
   }
 
+  @override
+  void enableExtension() {
+    var powersyncLib = getLibraryForPlatform();
+    sqlite3.ensureExtensionLoaded(SqliteExtension.inLibrary(
+        DynamicLibrary.open(powersyncLib), 'sqlite3_powersync_init'));
+  }
+
   @override
   String getLibraryForPlatform({String? path = "."}) {
     switch (Abi.current()) {
@@ -52,6 +64,22 @@ class TestOpenFactory extends PowerSyncOpenFactory {
       );
     }
   }
+
+  @override
+  Future<CommonDatabase> openRawInMemoryDatabase() async {
+    applyOpenOverride();
+
+    try {
+      enableExtension();
+    } on PowersyncNotReadyException catch (e) {
+      autoLogger.severe(e.message);
+      rethrow;
+    }
+
+    final db = sqlite3.openInMemory();
+    setupFunctions(db);
+    return db;
+  }
 }
 
 class TestUtils extends AbstractTestUtils {
diff --git a/packages/powersync_core/test/utils/stub_test_utils.dart b/packages/powersync_core/test/utils/stub_test_utils.dart
index 3f86512c..3755e47f 100644
--- a/packages/powersync_core/test/utils/stub_test_utils.dart
+++ b/packages/powersync_core/test/utils/stub_test_utils.dart
@@ -1,6 +1,16 @@
+import 'package:sqlite_async/src/sqlite_options.dart';
+
 import 'abstract_test_utils.dart';
 
 class TestUtils extends AbstractTestUtils {
+  @override
+  Future<TestPowerSyncFactory> testFactory(
+      {String? path,
+      String sqlitePath = '',
+      SqliteOptions options = const SqliteOptions.defaults()}) {
+    throw UnimplementedError();
+  }
+
   @override
   Future<void> cleanDb({required String path}) {
     throw UnimplementedError();
diff --git a/packages/powersync_core/test/utils/web_test_utils.dart b/packages/powersync_core/test/utils/web_test_utils.dart
index 71a462a2..678af784 100644
--- a/packages/powersync_core/test/utils/web_test_utils.dart
+++ b/packages/powersync_core/test/utils/web_test_utils.dart
@@ -3,7 +3,7 @@ import 'dart:js_interop';
 
 import 'package:logging/logging.dart';
 import 'package:powersync_core/powersync_core.dart';
-import 'package:sqlite_async/sqlite3_common.dart';
+import 'package:sqlite3/wasm.dart';
 import 'package:sqlite_async/sqlite_async.dart';
 import 'package:test/test.dart';
 import 'package:web/web.dart' show Blob, BlobPropertyBag;
@@ -12,6 +12,29 @@
 import 'abstract_test_utils.dart';
 
 @JS('URL.createObjectURL')
 external String _createObjectURL(Blob blob);
 
+class TestOpenFactory extends PowerSyncOpenFactory with TestPowerSyncFactory {
+  TestOpenFactory({required super.path, super.sqliteOptions});
+
+  @override
+  Future<CommonDatabase> openRawInMemoryDatabase() async {
+    final sqlite = await WasmSqlite3.loadFromUrl(
+        Uri.parse(sqliteOptions.webSqliteOptions.wasmUri));
+    sqlite.registerVirtualFileSystem(InMemoryFileSystem(), makeDefault: true);
+
+    final db = sqlite.openInMemory();
+
+    try {
+      enableExtension();
+    } on PowersyncNotReadyException catch (e) {
+      autoLogger.severe(e.message);
+      rethrow;
+    }
+
+    setupFunctions(db);
+    return db;
+  }
+}
+
 class TestUtils extends AbstractTestUtils {
   late Future<void> _isInitialized;
   late final String sqlite3WASMUri;
@@ -39,7 +62,7 @@ class TestUtils extends AbstractTestUtils {
   Future<void> cleanDb({required String path}) async {}
 
   @override
-  Future<PowerSyncOpenFactory> testFactory(
+  Future<TestPowerSyncFactory> testFactory(
       {String? path,
       String? sqlitePath,
       SqliteOptions options = const SqliteOptions.defaults()}) async {
@@ -48,7 +71,7 @@ class TestUtils extends AbstractTestUtils {
     final webOptions = SqliteOptions(
         webSqliteOptions:
             WebSqliteOptions(wasmUri: sqlite3WASMUri, workerUri: workerUri));
-    return super.testFactory(path: path, options: webOptions);
+    return TestOpenFactory(path: path ?? '', sqliteOptions: webOptions);
   }
 
   @override