
Commit 6c5337b

Add size restriction on internal buffer
1 parent: e2ef493


2 files changed: +97 -1 lines changed


packages/powersync_core/lib/src/sync_types.dart

Lines changed: 14 additions & 1 deletion
@@ -59,10 +59,20 @@ final class _StreamingSyncLineParser
   @override
   void add(Map<String, dynamic> event) {
     final parsed = StreamingSyncLine.fromJson(event);
-    if (parsed is SyncDataBatch) {
+
+    // Buffer small batches and group them to reduce the number of transactions
+    // used to store them.
+    if (parsed is SyncDataBatch && parsed.totalOperations <= 100) {
       if (_pendingBatch case (final batch, _)?) {
         // Add this line to the pending batch of data items
         batch.buckets.addAll(parsed.buckets);
+
+        if (batch.totalOperations >= 1000) {
+          // This is unlikely to happen since we're only buffering for a single
+          // event loop iteration, but make sure we're not keeping huge amounts
+          // of data in memory.
+          _flushBatch();
+        }
       } else {
         // Instead of adding this batch directly, keep it buffered here for a
         // while so that we can add new entries to it.

@@ -244,6 +254,9 @@ class BucketRequest {
 final class SyncDataBatch extends StreamingSyncLine {
   List<SyncBucketData> buckets;

+  int get totalOperations =>
+      buckets.fold(0, (prev, data) => prev + data.data.length);
+
   SyncDataBatch(this.buckets);
 }


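In short, the parser now holds small sync lines (at most 100 operations each) in a pending SyncDataBatch and merges later small lines into it, flushing early once the buffer reaches 1000 operations. Below is a minimal standalone sketch of that accumulate-and-flush pattern; Batch, Coalescer, and the handling of large batches are illustrative assumptions rather than the actual _StreamingSyncLineParser internals, and only the 100/1000 thresholds come from the diff.

// Hypothetical sketch of the buffer-and-flush pattern; not the real parser.
class Batch {
  final List<List<Object?>> buckets;
  Batch(this.buckets);

  // Same shape as SyncDataBatch.totalOperations: sum operations across buckets.
  int get totalOperations =>
      buckets.fold(0, (prev, bucket) => prev + bucket.length);
}

class Coalescer {
  Batch? _pending;
  final void Function(Batch) emit;
  Coalescer(this.emit);

  void add(Batch incoming) {
    if (incoming.totalOperations <= 100) {
      if (_pending case final pending?) {
        // Merge this small batch into the buffered one.
        pending.buckets.addAll(incoming.buckets);
        // Safety valve: don't let the in-memory buffer grow without bound.
        if (pending.totalOperations >= 1000) _flush();
      } else {
        // Nothing buffered yet: hold on to this batch for a while.
        _pending = incoming;
      }
    } else {
      // Assumption: batches above the threshold flush the buffer and pass
      // through unmerged (the diff elides this branch).
      _flush();
      emit(incoming);
    }
  }

  void _flush() {
    if (_pending case final pending?) {
      _pending = null;
      emit(pending);
    }
  }
}

Feeding, say, three 50-operation batches through Coalescer leaves a single 150-operation batch pending; per the comment in the diff, the real parser only keeps it buffered for a single event-loop iteration before flushing.
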
packages/powersync_core/test/sync_types_test.dart

Lines changed: 83 additions & 0 deletions
@@ -131,5 +131,88 @@ void main() {
       })
       ..close();
   });
+
+  test('does not combine large batches', () async {
+    final source = StreamController<Map<String, dynamic>>();
+    expect(
+      source.stream.transform(StreamingSyncLine.reader),
+      emitsInOrder([
+        isA<SyncDataBatch>()
+            .having((e) => e.totalOperations, 'totalOperations', 1),
+        isA<SyncDataBatch>()
+            .having((e) => e.totalOperations, 'totalOperations', 150),
+      ]),
+    );
+
+    source
+      ..add({
+        'data': {
+          'bucket': 'a',
+          'data': <Map<String, Object?>>[
+            {
+              'op_id': '0',
+              'op': 'PUT',
+              'object_type': 'a',
+              'object_id': '0',
+              'checksum': 0,
+              'data': {},
+            }
+          ],
+          'hasMore': false
+        }
+      })
+      ..add({
+        'data': {
+          'bucket': 'a',
+          'data': <Map<String, Object?>>[
+            for (var i = 1; i <= 150; i++)
+              {
+                'op_id': '$i',
+                'op': 'PUT',
+                'object_type': 'a',
+                'object_id': '$i',
+                'checksum': 0,
+                'data': {},
+              }
+          ],
+          'hasMore': false
+        }
+      });
+  });
+
+  test('flushes when internal buffer gets too large', () {
+    final source = StreamController<Map<String, dynamic>>();
+    expect(
+      source.stream.transform(StreamingSyncLine.reader),
+      emitsInOrder([
+        isA<SyncDataBatch>()
+            .having((e) => e.totalOperations, 'totalOperations', 1000),
+        isA<SyncDataBatch>()
+            .having((e) => e.totalOperations, 'totalOperations', 500),
+      ]),
+    );
+
+    // Add 1500 operations in chunks of 100 items. This should emit a
+    // 1000-item chunk and another one for the rest.
+    for (var i = 0; i < 15; i++) {
+      source.add({
+        'data': {
+          'bucket': 'a',
+          'data': <Map<String, Object?>>[
+            for (var i = 0; i < 100; i++)
+              {
+                'op_id': '1',
+                'op': 'PUT',
+                'object_type': 'a',
+                'object_id': '1',
+                'checksum': 0,
+                'data': {},
+              }
+          ],
+          'hasMore': false
+        }
+      });
+    }
+  });
 });
 }
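
The comment in the implementation notes that data is only buffered for a single event-loop iteration, which is why the second test needs the 1000-operation cap to see a flush at all: its fifteen source.add calls run synchronously in one iteration. As a rough illustration of that idea, a transformer along these lines coalesces exactly the events added synchronously; the microtask scheduling is an assumption for the sketch, since the diff does not show how _flushBatch is scheduled.

import 'dart:async';

// Hypothetical sketch: merge events added within one event-loop iteration by
// flushing in a microtask. The real _StreamingSyncLineParser may schedule its
// flush differently; only the observable batching behavior is comparable.
StreamTransformer<T, List<T>> coalescePerIteration<T>() {
  List<T>? pending;
  return StreamTransformer<T, List<T>>.fromHandlers(
    handleData: (value, sink) {
      if (pending case final buffer?) {
        // Still in the same iteration: merge into the pending buffer.
        buffer.add(value);
      } else {
        final buffer = pending = [value];
        // Emit once the current iteration's synchronous work has finished.
        scheduleMicrotask(() {
          pending = null;
          sink.add(buffer);
        });
      }
    },
  );
}

Under that model, the fifteen synchronous adds in the last test share one buffer, so it is the size cap rather than the iteration boundary that splits the output into a 1000-operation batch followed by a 500-operation one.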

Comments (0)