Skip to content

Commit 1f9268f

Browse files
authored
Merge pull request #242 from powersync-ja/feat/group-sync-lines
Small streaming sync improvements
2 parents 15d1682 + 6c5337b commit 1f9268f

18 files changed

+1002
-331
lines changed

packages/powersync_core/lib/src/bucket_storage.dart

Lines changed: 0 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -344,121 +344,6 @@ class BucketState {
344344
}
345345
}
346346

347-
class SyncDataBatch {
348-
List<SyncBucketData> buckets;
349-
350-
SyncDataBatch(this.buckets);
351-
}
352-
353-
class SyncBucketData {
354-
final String bucket;
355-
final List<OplogEntry> data;
356-
final bool hasMore;
357-
final String? after;
358-
final String? nextAfter;
359-
360-
const SyncBucketData(
361-
{required this.bucket,
362-
required this.data,
363-
this.hasMore = false,
364-
this.after,
365-
this.nextAfter});
366-
367-
SyncBucketData.fromJson(Map<String, dynamic> json)
368-
: bucket = json['bucket'] as String,
369-
hasMore = json['has_more'] as bool? ?? false,
370-
after = json['after'] as String?,
371-
nextAfter = json['next_after'] as String?,
372-
data = (json['data'] as List)
373-
.map((e) => OplogEntry.fromJson(e as Map<String, dynamic>))
374-
.toList();
375-
376-
Map<String, dynamic> toJson() {
377-
return {
378-
'bucket': bucket,
379-
'has_more': hasMore,
380-
'after': after,
381-
'next_after': nextAfter,
382-
'data': data
383-
};
384-
}
385-
}
386-
387-
class OplogEntry {
388-
final String opId;
389-
390-
final OpType? op;
391-
392-
/// rowType + rowId uniquely identifies an entry in the local database.
393-
final String? rowType;
394-
final String? rowId;
395-
396-
/// Together with rowType and rowId, this uniquely identifies a source entry
397-
/// per bucket in the oplog. There may be multiple source entries for a single
398-
/// "rowType + rowId" combination.
399-
final String? subkey;
400-
401-
final String? data;
402-
final int checksum;
403-
404-
const OplogEntry(
405-
{required this.opId,
406-
required this.op,
407-
this.subkey,
408-
this.rowType,
409-
this.rowId,
410-
this.data,
411-
required this.checksum});
412-
413-
OplogEntry.fromJson(Map<String, dynamic> json)
414-
: opId = json['op_id'] as String,
415-
op = OpType.fromJson(json['op'] as String),
416-
rowType = json['object_type'] as String?,
417-
rowId = json['object_id'] as String?,
418-
checksum = json['checksum'] as int,
419-
data = switch (json['data']) {
420-
String data => data,
421-
var other => jsonEncode(other),
422-
},
423-
subkey = switch (json['subkey']) {
424-
String subkey => subkey,
425-
_ => null,
426-
};
427-
428-
Map<String, dynamic>? get parsedData {
429-
return switch (data) {
430-
final data? => jsonDecode(data) as Map<String, dynamic>,
431-
null => null,
432-
};
433-
}
434-
435-
/// Key to uniquely represent a source entry in a bucket.
436-
/// This is used to supersede old entries.
437-
/// Relevant for put and remove ops.
438-
String get key {
439-
return "$rowType/$rowId/$subkey";
440-
}
441-
442-
Map<String, dynamic> toJson() {
443-
return {
444-
'op_id': opId,
445-
'op': op?.toJson(),
446-
'object_type': rowType,
447-
'object_id': rowId,
448-
'checksum': checksum,
449-
'subkey': subkey,
450-
'data': data
451-
};
452-
}
453-
}
454-
455-
class SqliteOp {
456-
String sql;
457-
List<dynamic> args;
458-
459-
SqliteOp(this.sql, this.args);
460-
}
461-
462347
class SyncLocalDatabaseResult {
463348
final bool ready;
464349
final bool checkpointValid;

packages/powersync_core/lib/src/stream_utils.dart

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,6 @@ Stream<Object?> ndjson(ByteStream input) {
6868
return jsonInput;
6969
}
7070

71-
/// Given a raw ByteStream, parse each line as JSON.
72-
Stream<String> newlines(ByteStream input) {
73-
final textInput = input.transform(convert.utf8.decoder);
74-
final lineInput = textInput.transform(const convert.LineSplitter());
75-
return lineInput;
76-
}
77-
7871
void pauseAll(List<StreamSubscription<void>> subscriptions) {
7972
for (var sub in subscriptions) {
8073
sub.pause();

packages/powersync_core/lib/src/streaming_sync.dart

Lines changed: 101 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class StreamingSyncImplementation implements StreamingSync {
5353

5454
late final http.Client _client;
5555

56-
final StreamController<String?> _localPingController =
56+
final StreamController<Null> _localPingController =
5757
StreamController.broadcast();
5858

5959
final Duration retryDelay;
@@ -340,96 +340,19 @@ class StreamingSyncImplementation implements StreamingSync {
340340
}
341341

342342
_updateStatus(connected: true, connecting: false);
343-
if (line is Checkpoint) {
344-
targetCheckpoint = line;
345-
final Set<String> bucketsToDelete = {...bucketSet};
346-
final Set<String> newBuckets = {};
347-
for (final checksum in line.checksums) {
348-
newBuckets.add(checksum.bucket);
349-
bucketsToDelete.remove(checksum.bucket);
350-
}
351-
bucketSet = newBuckets;
352-
await adapter.removeBuckets([...bucketsToDelete]);
353-
_updateStatus(downloading: true);
354-
} else if (line is StreamingSyncCheckpointComplete) {
355-
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
356-
if (!result.checkpointValid) {
357-
// This means checksums failed. Start again with a new checkpoint.
358-
// TODO: better back-off
359-
// await new Promise((resolve) => setTimeout(resolve, 50));
360-
return false;
361-
} else if (!result.ready) {
362-
// Checksums valid, but need more data for a consistent checkpoint.
363-
// Continue waiting.
364-
} else {
365-
appliedCheckpoint = targetCheckpoint;
366-
367-
_updateStatus(
368-
downloading: false,
369-
downloadError: _noError,
370-
lastSyncedAt: DateTime.now());
371-
}
372-
373-
validatedCheckpoint = targetCheckpoint;
374-
} else if (line is StreamingSyncCheckpointDiff) {
375-
// TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint
376-
if (targetCheckpoint == null) {
377-
throw PowerSyncProtocolException(
378-
'Checkpoint diff without previous checkpoint');
379-
}
380-
_updateStatus(downloading: true);
381-
final diff = line;
382-
final Map<String, BucketChecksum> newBuckets = {};
383-
for (var checksum in targetCheckpoint.checksums) {
384-
newBuckets[checksum.bucket] = checksum;
385-
}
386-
for (var checksum in diff.updatedBuckets) {
387-
newBuckets[checksum.bucket] = checksum;
388-
}
389-
for (var bucket in diff.removedBuckets) {
390-
newBuckets.remove(bucket);
391-
}
392-
393-
final newCheckpoint = Checkpoint(
394-
lastOpId: diff.lastOpId,
395-
checksums: [...newBuckets.values],
396-
writeCheckpoint: diff.writeCheckpoint);
397-
targetCheckpoint = newCheckpoint;
398-
399-
bucketSet = Set.from(newBuckets.keys);
400-
await adapter.removeBuckets(diff.removedBuckets);
401-
adapter.setTargetCheckpoint(targetCheckpoint);
402-
} else if (line is SyncBucketData) {
403-
_updateStatus(downloading: true);
404-
await adapter.saveSyncData(SyncDataBatch([line]));
405-
} else if (line is StreamingSyncKeepalive) {
406-
if (line.tokenExpiresIn == 0) {
407-
// Token expired already - stop the connection immediately
408-
invalidCredentialsCallback?.call().ignore();
409-
break;
410-
} else if (line.tokenExpiresIn <= 30) {
411-
// Token expires soon - refresh it in the background
412-
if (credentialsInvalidation == null &&
413-
invalidCredentialsCallback != null) {
414-
credentialsInvalidation = invalidCredentialsCallback!().then((_) {
415-
// Token has been refreshed - we should restart the connection.
416-
haveInvalidated = true;
417-
// trigger next loop iteration ASAP, don't wait for another
418-
// message from the server.
419-
_localPingController.add(null);
420-
}, onError: (_) {
421-
// Token refresh failed - retry on next keepalive.
422-
credentialsInvalidation = null;
423-
});
343+
switch (line) {
344+
case Checkpoint():
345+
targetCheckpoint = line;
346+
final Set<String> bucketsToDelete = {...bucketSet};
347+
final Set<String> newBuckets = {};
348+
for (final checksum in line.checksums) {
349+
newBuckets.add(checksum.bucket);
350+
bucketsToDelete.remove(checksum.bucket);
424351
}
425-
}
426-
} else {
427-
if (targetCheckpoint == appliedCheckpoint) {
428-
_updateStatus(
429-
downloading: false,
430-
downloadError: _noError,
431-
lastSyncedAt: DateTime.now());
432-
} else if (validatedCheckpoint == targetCheckpoint) {
352+
bucketSet = newBuckets;
353+
await adapter.removeBuckets([...bucketsToDelete]);
354+
_updateStatus(downloading: true);
355+
case StreamingSyncCheckpointComplete():
433356
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
434357
if (!result.checkpointValid) {
435358
// This means checksums failed. Start again with a new checkpoint.
@@ -447,7 +370,88 @@ class StreamingSyncImplementation implements StreamingSync {
447370
downloadError: _noError,
448371
lastSyncedAt: DateTime.now());
449372
}
450-
}
373+
374+
validatedCheckpoint = targetCheckpoint;
375+
case StreamingSyncCheckpointDiff():
376+
// TODO: It may be faster to just keep track of the diff, instead of
377+
// the entire checkpoint
378+
if (targetCheckpoint == null) {
379+
throw PowerSyncProtocolException(
380+
'Checkpoint diff without previous checkpoint');
381+
}
382+
_updateStatus(downloading: true);
383+
final diff = line;
384+
final Map<String, BucketChecksum> newBuckets = {};
385+
for (var checksum in targetCheckpoint.checksums) {
386+
newBuckets[checksum.bucket] = checksum;
387+
}
388+
for (var checksum in diff.updatedBuckets) {
389+
newBuckets[checksum.bucket] = checksum;
390+
}
391+
for (var bucket in diff.removedBuckets) {
392+
newBuckets.remove(bucket);
393+
}
394+
395+
final newCheckpoint = Checkpoint(
396+
lastOpId: diff.lastOpId,
397+
checksums: [...newBuckets.values],
398+
writeCheckpoint: diff.writeCheckpoint);
399+
targetCheckpoint = newCheckpoint;
400+
401+
bucketSet = Set.from(newBuckets.keys);
402+
await adapter.removeBuckets(diff.removedBuckets);
403+
adapter.setTargetCheckpoint(targetCheckpoint);
404+
case SyncDataBatch():
405+
_updateStatus(downloading: true);
406+
await adapter.saveSyncData(line);
407+
case StreamingSyncKeepalive(:final tokenExpiresIn):
408+
if (tokenExpiresIn == 0) {
409+
// Token expired already - stop the connection immediately
410+
invalidCredentialsCallback?.call().ignore();
411+
break;
412+
} else if (tokenExpiresIn <= 30) {
413+
// Token expires soon - refresh it in the background
414+
if (credentialsInvalidation == null &&
415+
invalidCredentialsCallback != null) {
416+
credentialsInvalidation = invalidCredentialsCallback!().then((_) {
417+
// Token has been refreshed - we should restart the connection.
418+
haveInvalidated = true;
419+
// trigger next loop iteration ASAP, don't wait for another
420+
// message from the server.
421+
_localPingController.add(null);
422+
}, onError: (_) {
423+
// Token refresh failed - retry on next keepalive.
424+
credentialsInvalidation = null;
425+
});
426+
}
427+
}
428+
case UnknownSyncLine(:final rawData):
429+
isolateLogger.fine('Unknown sync line: $rawData');
430+
case null: // Local ping
431+
if (targetCheckpoint == appliedCheckpoint) {
432+
_updateStatus(
433+
downloading: false,
434+
downloadError: _noError,
435+
lastSyncedAt: DateTime.now());
436+
} else if (validatedCheckpoint == targetCheckpoint) {
437+
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
438+
if (!result.checkpointValid) {
439+
// This means checksums failed. Start again with a new checkpoint.
440+
// TODO: better back-off
441+
// await new Promise((resolve) => setTimeout(resolve, 50));
442+
return false;
443+
} else if (!result.ready) {
444+
// Checksums valid, but need more data for a consistent checkpoint.
445+
// Continue waiting.
446+
} else {
447+
appliedCheckpoint = targetCheckpoint;
448+
449+
_updateStatus(
450+
downloading: false,
451+
downloadError: _noError,
452+
lastSyncedAt: DateTime.now());
453+
}
454+
}
451455
}
452456

453457
if (haveInvalidated) {
@@ -458,7 +462,8 @@ class StreamingSyncImplementation implements StreamingSync {
458462
return true;
459463
}
460464

461-
Stream<Object?> streamingSyncRequest(StreamingSyncRequest data) async* {
465+
Stream<StreamingSyncLine> streamingSyncRequest(
466+
StreamingSyncRequest data) async* {
462467
final credentials = await credentialsCallback();
463468
if (credentials == null) {
464469
throw CredentialsException('Not logged in');
@@ -494,12 +499,10 @@ class StreamingSyncImplementation implements StreamingSync {
494499
}
495500

496501
// Note: The response stream is automatically closed when this loop errors
497-
await for (var line in ndjson(res.stream)) {
498-
if (aborted) {
499-
break;
500-
}
501-
yield parseStreamingSyncLine(line as Map<String, dynamic>);
502-
}
502+
yield* ndjson(res.stream)
503+
.cast<Map<String, dynamic>>()
504+
.transform(StreamingSyncLine.reader)
505+
.takeWhile((_) => !aborted);
503506
}
504507

505508
/// Delays the standard `retryDelay` Duration, but exits early if

0 commit comments

Comments
 (0)