Skip to content

Commit e2ef493

Browse files
committed
Add test
1 parent d74a3b4 commit e2ef493

15 files changed

+571
-80
lines changed

packages/powersync_core/lib/src/sync_types.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ final class _StreamingSyncLineParser
6767
// Insert of adding this batch directly, keep it buffered here for a
6868
// while so that we can add new entries to it.
6969
final timer = Timer(Duration.zero, () {
70-
_pendingBatch = null;
7170
_out.add(_pendingBatch!.$1);
71+
_pendingBatch = null;
7272
});
7373
_pendingBatch = (parsed, timer);
7474
}

packages/powersync_core/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ environment:
88
sdk: ^3.4.3
99

1010
dependencies:
11-
sqlite_async: ^0.11.2
11+
sqlite_async: ^0.11.4
1212
# We only use sqlite3 as a transitive dependency,
1313
# but right now we need a minimum of v2.4.6.
1414
sqlite3: ^2.4.6

packages/powersync_core/test/connected_test.dart

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import 'package:powersync_core/powersync_core.dart';
99
import 'package:test/test.dart';
1010

1111
import 'server/sync_server/mock_sync_server.dart';
12-
import 'streaming_sync_test.dart';
1312
import 'utils/abstract_test_utils.dart';
1413
import 'utils/test_utils_impl.dart';
1514

packages/powersync_core/test/disconnect_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import 'package:powersync_core/powersync_core.dart';
22
import 'package:powersync_core/sqlite_async.dart';
33
import 'package:test/test.dart';
4-
import 'streaming_sync_test.dart';
4+
import 'utils/abstract_test_utils.dart';
55
import 'utils/test_utils_impl.dart';
66
import 'watch_test.dart';
77

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import 'package:async/async.dart';
2+
import 'package:logging/logging.dart';
3+
import 'package:powersync_core/powersync_core.dart';
4+
import 'package:powersync_core/sqlite3_common.dart';
5+
import 'package:powersync_core/src/log_internal.dart';
6+
import 'package:powersync_core/src/streaming_sync.dart';
7+
import 'package:powersync_core/src/sync_types.dart';
8+
import 'package:test/test.dart';
9+
10+
import 'server/sync_server/in_memory_sync_server.dart';
11+
import 'utils/abstract_test_utils.dart';
12+
import 'utils/in_memory_http.dart';
13+
import 'utils/test_utils_impl.dart';
14+
15+
void main() {
16+
group('in-memory sync tests', () {
17+
late final testUtils = TestUtils();
18+
19+
late TestPowerSyncFactory factory;
20+
late CommonDatabase raw;
21+
late PowerSyncDatabase database;
22+
late MockSyncService syncService;
23+
late StreamingSyncImplementation syncClient;
24+
25+
setUp(() async {
26+
final (client, server) = inMemoryServer();
27+
syncService = MockSyncService();
28+
server.mount(syncService.router.call);
29+
30+
factory = await testUtils.testFactory();
31+
(raw, database) = await factory.openInMemoryDatabase();
32+
await database.initialize();
33+
syncClient = database.connectWithMockService(
34+
client,
35+
TestConnector(() async {
36+
return PowerSyncCredentials(
37+
endpoint: server.url.toString(),
38+
token: 'token not used here',
39+
expiresAt: DateTime.now(),
40+
);
41+
}),
42+
);
43+
});
44+
45+
tearDown(() async {
46+
await syncClient.abort();
47+
await database.close();
48+
await syncService.stop();
49+
});
50+
51+
Future<StreamQueue<SyncStatus>> waitForConnection(
52+
{bool expectNoWarnings = true}) async {
53+
if (expectNoWarnings) {
54+
isolateLogger.onRecord.listen((e) {
55+
if (e.level >= Level.WARNING) {
56+
fail('Unexpected log: $e');
57+
}
58+
});
59+
}
60+
syncClient.streamingSync();
61+
await syncService.waitForListener;
62+
63+
expect(database.currentStatus.lastSyncedAt, isNull);
64+
expect(database.currentStatus.downloading, isFalse);
65+
final status = StreamQueue(database.statusStream);
66+
addTearDown(status.cancel);
67+
68+
syncService.addKeepAlive();
69+
await expectLater(
70+
status, emits(isSyncStatus(connected: true, hasSynced: false)));
71+
return status;
72+
}
73+
74+
test('persists completed sync information', () async {
75+
final status = await waitForConnection();
76+
77+
syncService.addLine({
78+
'checkpoint': Checkpoint(
79+
lastOpId: '0',
80+
writeCheckpoint: null,
81+
checksums: [BucketChecksum(bucket: 'bkt', checksum: 0)],
82+
)
83+
});
84+
await expectLater(status, emits(isSyncStatus(downloading: true)));
85+
86+
syncService.addLine({
87+
'checkpoint_complete': {'last_op_id': '0'}
88+
});
89+
await expectLater(
90+
status, emits(isSyncStatus(downloading: false, hasSynced: true)));
91+
92+
final independentDb = factory.wrapRaw(raw);
93+
// Even though this database doesn't have a sync client attached to it,
94+
// is should reconstruct hasSynced from the database.
95+
await independentDb.initialize();
96+
expect(independentDb.currentStatus.hasSynced, isTrue);
97+
});
98+
99+
test('can save independent buckets in same transaction', () async {
100+
final status = await waitForConnection();
101+
102+
syncService.addLine({
103+
'checkpoint': Checkpoint(
104+
lastOpId: '0',
105+
writeCheckpoint: null,
106+
checksums: [
107+
BucketChecksum(bucket: 'a', checksum: 0),
108+
BucketChecksum(bucket: 'b', checksum: 0),
109+
],
110+
)
111+
});
112+
await expectLater(status, emits(isSyncStatus(downloading: true)));
113+
114+
var commits = 0;
115+
raw.commits.listen((_) => commits++);
116+
117+
syncService
118+
..addLine({
119+
'data': {
120+
'bucket': 'a',
121+
'data': <Map<String, Object?>>[
122+
{
123+
'op_id': '1',
124+
'op': 'PUT',
125+
'object_type': 'a',
126+
'object_id': '1',
127+
'checksum': 0,
128+
'data': {},
129+
}
130+
],
131+
}
132+
})
133+
..addLine({
134+
'data': {
135+
'bucket': 'b',
136+
'data': <Map<String, Object?>>[
137+
{
138+
'op_id': '2',
139+
'op': 'PUT',
140+
'object_type': 'b',
141+
'object_id': '1',
142+
'checksum': 0,
143+
'data': {},
144+
}
145+
],
146+
}
147+
});
148+
149+
// Wait for the operations to be inserted.
150+
while (raw.select('SELECT * FROM ps_oplog;').length < 2) {
151+
await pumpEventQueue();
152+
}
153+
154+
// The two buckets should have been inserted in a single transaction
155+
// because the messages were received in quick succession.
156+
expect(commits, 1);
157+
});
158+
});
159+
}
160+
161+
TypeMatcher<SyncStatus> isSyncStatus(
162+
{Object? downloading, Object? connected, Object? hasSynced}) {
163+
var matcher = isA<SyncStatus>();
164+
if (downloading != null) {
165+
matcher = matcher.having((e) => e.downloading, 'downloading', downloading);
166+
}
167+
if (connected != null) {
168+
matcher = matcher.having((e) => e.connected, 'connected', connected);
169+
}
170+
if (hasSynced != null) {
171+
matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced);
172+
}
173+
174+
return matcher;
175+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import 'dart:async';
2+
import 'dart:convert';
3+
4+
import 'package:shelf/shelf.dart';
5+
import 'package:shelf_router/shelf_router.dart';
6+
7+
final class MockSyncService {
8+
// Use a queued stream to make tests easier.
9+
StreamController<String> _controller = StreamController<String>();
10+
Completer<void> _listener = Completer();
11+
12+
final router = Router();
13+
14+
MockSyncService() {
15+
router
16+
..post('/sync/stream', (Request request) async {
17+
_listener.complete();
18+
// Respond immediately with a stream
19+
return Response.ok(_controller.stream.transform(utf8.encoder),
20+
headers: {
21+
'Content-Type': 'application/x-ndjson',
22+
'Cache-Control': 'no-cache',
23+
'Connection': 'keep-alive',
24+
},
25+
context: {
26+
"shelf.io.buffer_output": false
27+
});
28+
})
29+
..get('/write-checkpoint2.json', (request) {
30+
return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: {
31+
'Content-Type': 'application/json',
32+
});
33+
});
34+
}
35+
36+
Future<void> get waitForListener => _listener.future;
37+
38+
// Queue events which will be sent to connected clients.
39+
void addRawEvent(String data) {
40+
_controller.add(data);
41+
}
42+
43+
void addLine(Object? message) {
44+
addRawEvent('${json.encode(message)}\n');
45+
}
46+
47+
void addKeepAlive([int tokenExpiresIn = 3600]) {
48+
addLine({'token_expires_in': tokenExpiresIn});
49+
}
50+
51+
// Clear events. We rely on a buffered controller here. Create a new controller
52+
// in order to clear the buffer.
53+
Future<void> clearEvents() async {
54+
await _controller.close();
55+
_listener = Completer();
56+
_controller = StreamController<String>();
57+
}
58+
59+
Future<void> stop() async {
60+
if (_controller.hasListener) {
61+
await _controller.close();
62+
}
63+
}
64+
}
Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,37 @@
11
import 'dart:async';
2-
import 'dart:convert';
32
import 'dart:io';
43

5-
import 'package:shelf/shelf.dart';
64
import 'package:shelf/shelf_io.dart' as io;
7-
import 'package:shelf_router/shelf_router.dart';
5+
6+
import 'in_memory_sync_server.dart';
87

98
// A basic Mock PowerSync service server which queues commands
109
// which clients can receive via connecting to the `/sync/stream` route.
1110
// This assumes only one client will ever be connected at a time.
1211
class TestHttpServerHelper {
13-
// Use a queued stream to make tests easier.
14-
StreamController<String> _controller = StreamController<String>();
12+
final MockSyncService service = MockSyncService();
1513
late HttpServer _server;
14+
1615
Uri get uri => Uri.parse('http://localhost:${_server.port}');
1716

1817
Future<void> start() async {
19-
final router = Router()
20-
..post('/sync/stream', (Request request) async {
21-
// Respond immediately with a stream
22-
return Response.ok(_controller.stream.transform(utf8.encoder),
23-
headers: {
24-
'Content-Type': 'application/x-ndjson',
25-
'Cache-Control': 'no-cache',
26-
'Connection': 'keep-alive',
27-
},
28-
context: {
29-
"shelf.io.buffer_output": false
30-
});
31-
})
32-
..get('/write-checkpoint2.json', (request) {
33-
return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: {
34-
'Content-Type': 'application/json',
35-
});
36-
});
37-
38-
_server = await io.serve(router.call, 'localhost', 0);
18+
_server = await io.serve(service.router.call, 'localhost', 0);
3919
print('Test server running at ${_server.address}:${_server.port}');
4020
}
4121

4222
// Queue events which will be sent to connected clients.
4323
void addEvent(String data) {
44-
_controller.add(data);
24+
service.addRawEvent(data);
4525
}
4626

4727
// Clear events. We rely on a buffered controller here. Create a new controller
4828
// in order to clear the buffer.
4929
Future<void> clearEvents() async {
50-
await _controller.close();
51-
_controller = StreamController<String>();
30+
await service.clearEvents();
5231
}
5332

5433
Future<void> stop() async {
55-
await _controller.close();
34+
await service.stop();
5635
await _server.close();
5736
}
5837
}

packages/powersync_core/test/streaming_sync_test.dart

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,11 @@ import 'package:powersync_core/powersync_core.dart';
99
import 'package:test/test.dart';
1010

1111
import 'test_server.dart';
12+
import 'utils/abstract_test_utils.dart';
1213
import 'utils/test_utils_impl.dart';
1314

1415
final testUtils = TestUtils();
1516

16-
class TestConnector extends PowerSyncBackendConnector {
17-
final Future<PowerSyncCredentials?> Function() _fetchCredentials;
18-
final Future<void> Function(PowerSyncDatabase)? _uploadData;
19-
20-
TestConnector(this._fetchCredentials,
21-
{Future<void> Function(PowerSyncDatabase)? uploadData})
22-
: _uploadData = uploadData;
23-
24-
@override
25-
Future<PowerSyncCredentials?> fetchCredentials() {
26-
return _fetchCredentials();
27-
}
28-
29-
@override
30-
Future<void> uploadData(PowerSyncDatabase database) async {
31-
await _uploadData?.call(database);
32-
}
33-
}
34-
3517
void main() {
3618
group('Streaming Sync Test', () {
3719
late String path;

0 commit comments

Comments
 (0)