diff --git a/packages/powersync/lib/src/powersync_database.dart b/packages/powersync/lib/src/powersync_database.dart index 3445fd2b..434d3c22 100644 --- a/packages/powersync/lib/src/powersync_database.dart +++ b/packages/powersync/lib/src/powersync_database.dart @@ -3,6 +3,7 @@ import 'dart:isolate'; import 'package:logging/logging.dart'; import 'package:powersync/src/log_internal.dart'; +import 'package:sqlite_async/mutex.dart'; import 'package:sqlite_async/sqlite3.dart' as sqlite; import 'package:sqlite_async/sqlite_async.dart'; @@ -69,6 +70,9 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { /// null when disconnected, present when connecting or connected AbortController? _disconnecter; + /// Use to prevent multiple connections from being opened concurrently + final Mutex _connectMutex = Mutex(); + /// The Logger used by this [PowerSyncDatabase]. /// /// The default is [autoLogger], which logs to the console in debug builds. @@ -190,6 +194,13 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { /// Throttle time between CRUD operations /// Defaults to 10 milliseconds. Duration crudThrottleTime = const Duration(milliseconds: 10)}) async { + _connectMutex.lock(() => + _connect(connector: connector, crudThrottleTime: crudThrottleTime)); + } + + Future _connect( + {required PowerSyncBackendConnector connector, + required Duration crudThrottleTime}) async { await initialize(); // Disconnect if connected diff --git a/packages/powersync/pubspec.yaml b/packages/powersync/pubspec.yaml index 5b9f44d7..820f9d7b 100644 --- a/packages/powersync/pubspec.yaml +++ b/packages/powersync/pubspec.yaml @@ -10,7 +10,7 @@ dependencies: flutter: sdk: flutter - sqlite_async: ^0.6.0 + sqlite_async: ^0.6.1 sqlite3_flutter_libs: ^0.5.15 http: ^1.1.0 uuid: ^4.2.0 diff --git a/packages/powersync/test/streaming_sync_test.dart b/packages/powersync/test/streaming_sync_test.dart index aff9c71c..adda1399 100644 --- a/packages/powersync/test/streaming_sync_test.dart +++ b/packages/powersync/test/streaming_sync_test.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'dart:io'; import 'dart:math'; import 'package:powersync/powersync.dart'; @@ -44,12 +43,8 @@ void main() { var server = await createServer(); credentialsCallback() async { - final endpoint = 'http://${server.address.host}:${server.port}'; return PowerSyncCredentials( - endpoint: endpoint, - token: 'token', - userId: 'u1', - expiresAt: DateTime.now()); + endpoint: server.endpoint, token: 'token'); } final pdb = await setupPowerSync(path: path); @@ -59,12 +54,12 @@ void main() { await Future.delayed(Duration(milliseconds: random.nextInt(100))); if (random.nextBool()) { - server.close(force: true).ignore(); + server.close(); } await pdb.close(); - server.close(force: true).ignore(); + server.close(); } }); @@ -81,18 +76,13 @@ void main() { // [PowerSync] WARNING: 2023-06-29 16:10:17.667537: Sync Isolate error // [Connection closed while receiving data, #0 IOClient.send. (package:http/src/io_client.dart:76:13) - HttpServer? server; + TestServer? server; credentialsCallback() async { if (server == null) { throw AssertionError('No active server'); } - final endpoint = 'http://${server.address.host}:${server.port}'; - return PowerSyncCredentials( - endpoint: endpoint, - token: 'token', - userId: 'u1', - expiresAt: DateTime.now()); + return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); } final pdb = await setupPowerSync(path: path); @@ -107,9 +97,47 @@ void main() { // 2ms: HttpException: HttpServer is not bound to a socket // 20ms: Connection closed while receiving data await Future.delayed(Duration(milliseconds: 20)); - server.close(force: true).ignore(); + server.close(); } await pdb.close(); }); + + test('multiple connect calls', () async { + // Test calling connect() multiple times. + // We check that this does not cause multiple connections to be opened concurrently. + final random = Random(); + var server = await createServer(); + + credentialsCallback() async { + return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); + } + + final pdb = await setupPowerSync(path: path); + pdb.retryDelay = Duration(milliseconds: 5000); + var connector = TestConnector(credentialsCallback); + pdb.connect(connector: connector); + pdb.connect(connector: connector); + + final watch = Stopwatch()..start(); + + // Wait for at least one connection + while (server.connectionCount < 1 && watch.elapsedMilliseconds < 500) { + await Future.delayed(Duration(milliseconds: random.nextInt(10))); + } + // Give some time for a second connection if any + await Future.delayed(Duration(milliseconds: random.nextInt(50))); + + await pdb.close(); + + // Give some time for connections to close + while (server.connectionCount != 0 && watch.elapsedMilliseconds < 1000) { + await Future.delayed(Duration(milliseconds: random.nextInt(10))); + } + + expect(server.connectionCount, equals(0)); + expect(server.maxConnectionCount, equals(1)); + + server.close(); + }); }); } diff --git a/packages/powersync/test/test_server.dart b/packages/powersync/test/test_server.dart index c85101a9..2b9ac8b6 100644 --- a/packages/powersync/test/test_server.dart +++ b/packages/powersync/test/test_server.dart @@ -1,18 +1,67 @@ import 'dart:async'; import 'dart:convert' as convert; import 'dart:io'; +import 'dart:math'; import 'package:http/http.dart' show ByteStream; import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart' as shelf_io; import 'package:shelf_router/shelf_router.dart'; -Future createServer() async { - var app = Router(); +class TestServer { + late HttpServer server; + Router app = Router(); + int connectionCount = 0; + int maxConnectionCount = 0; + int tokenExpiresIn; - app.post('/sync/stream', handleSyncStream); - // Open on an arbitrary open port - var server = await shelf_io.serve(app.call, 'localhost', 0); + TestServer({this.tokenExpiresIn = 65}); + + Future init() async { + app.post('/sync/stream', handleSyncStream); + // Open on an arbitrary open port + server = await shelf_io.serve(app.call, 'localhost', 0); + } + + String get endpoint { + return 'http://${server.address.host}:${server.port}'; + } + + Future handleSyncStream(Request request) async { + connectionCount += 1; + maxConnectionCount = max(connectionCount, maxConnectionCount); + + stream() async* { + try { + var blob = "*" * 5000; + for (var i = 0; i < 50; i++) { + yield {"token_expires_in": tokenExpiresIn, "blob": blob}; + await Future.delayed(Duration(microseconds: 1)); + } + } finally { + connectionCount -= 1; + } + } + + return Response.ok( + encodeNdjson(stream()), + headers: { + 'Content-Type': 'application/x-ndjson', + }, + context: { + 'shelf.io.buffer_output': false, + }, + ); + } + + void close() { + server.close(force: true).ignore(); + } +} + +Future createServer() async { + var server = TestServer(); + await server.init(); return server; } @@ -22,23 +71,3 @@ ByteStream encodeNdjson(Stream jsonInput) { final byteInput = stringInput.transform(convert.utf8.encoder); return ByteStream(byteInput); } - -Future handleSyncStream(Request request) async { - stream() async* { - var blob = "*" * 5000; - for (var i = 0; i < 50; i++) { - yield {"token_expires_in": 5, "blob": blob}; - await Future.delayed(Duration(microseconds: 1)); - } - } - - return Response.ok( - encodeNdjson(stream()), - headers: { - 'Content-Type': 'application/x-ndjson', - }, - context: { - 'shelf.io.buffer_output': false, - }, - ); -}