Skip to content

Commit e4c8023

Browse files
authored
Merge pull request #115 from powersync-ja/close-connections-on-disconnect
Close http connections immediately on disconnect
2 parents 8e37523 + 6aac801 commit e4c8023

File tree

4 files changed

+131
-39
lines changed

4 files changed

+131
-39
lines changed

packages/powersync/lib/src/powersync_database.dart

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,24 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
223223
/// Throttle time between CRUD operations
224224
/// Defaults to 10 milliseconds.
225225
Duration crudThrottleTime = const Duration(milliseconds: 10)}) async {
226-
_connectMutex.lock(() =>
227-
_connect(connector: connector, crudThrottleTime: crudThrottleTime));
226+
Zone current = Zone.current;
227+
228+
Future<void> reconnect() {
229+
return _connectMutex.lock(() => _connect(
230+
connector: connector,
231+
crudThrottleTime: crudThrottleTime,
232+
// The reconnect function needs to run in the original zone,
233+
// to avoid recursive lock errors.
234+
reconnect: current.bindCallback(reconnect)));
235+
}
236+
237+
await reconnect();
228238
}
229239

230240
Future<void> _connect(
231241
{required PowerSyncBackendConnector connector,
232-
required Duration crudThrottleTime}) async {
242+
required Duration crudThrottleTime,
243+
required Future<void> Function() reconnect}) async {
233244
await initialize();
234245

235246
// Disconnect if connected
@@ -298,7 +309,9 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
298309
logger.severe('Sync Isolate error', message);
299310

300311
// Reconnect
301-
connect(connector: connector);
312+
// Use the param like this instead of directly calling connect(), to avoid recursive
313+
// locks in some edge cases.
314+
reconnect();
302315
});
303316

304317
disconnected() {
@@ -565,6 +578,7 @@ Future<void> _powerSyncDatabaseIsolate(
565578

566579
CommonDatabase? db;
567580
final mutex = args.dbRef.mutex.open();
581+
StreamingSyncImplementation? openedStreamingSync;
568582

569583
rPort.listen((message) async {
570584
if (message is List) {
@@ -579,6 +593,9 @@ Future<void> _powerSyncDatabaseIsolate(
579593
db = null;
580594
updateController.close();
581595
upstreamDbClient.close();
596+
// Abort any open http requests, and wait for it to be closed properly
597+
await openedStreamingSync?.abort();
598+
// No kill the Isolate
582599
Isolate.current.kill();
583600
}
584601
}
@@ -625,6 +642,7 @@ Future<void> _powerSyncDatabaseIsolate(
625642
uploadCrud: uploadCrud,
626643
updateStream: updateController.stream,
627644
retryDelay: args.retryDelay);
645+
openedStreamingSync = sync;
628646
sync.streamingSync();
629647
sync.statusStream.listen((event) {
630648
sPort.send(['status', event]);

packages/powersync/lib/src/streaming_sync.dart

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:convert' as convert;
33
import 'dart:io';
44

55
import 'package:http/http.dart' as http;
6+
import 'package:powersync/src/abort_controller.dart';
67
import 'package:powersync/src/exceptions.dart';
78
import 'package:powersync/src/log_internal.dart';
89

@@ -39,6 +40,10 @@ class StreamingSyncImplementation {
3940

4041
SyncStatus lastStatus = const SyncStatus();
4142

43+
AbortController? _abort;
44+
45+
bool _safeToClose = true;
46+
4247
StreamingSyncImplementation(
4348
{required this.adapter,
4449
required this.credentialsCallback,
@@ -50,34 +55,74 @@ class StreamingSyncImplementation {
5055
statusStream = _statusStreamController.stream;
5156
}
5257

58+
/// Close any active streams.
59+
Future<void> abort() async {
60+
// If streamingSync() hasn't been called yet, _abort will be null.
61+
var future = _abort?.abort();
62+
// This immediately triggers a new iteration in the merged stream, allowing us
63+
// to break immediately.
64+
// However, we still need to close the underlying stream explicitly, otherwise
65+
// the break will wait for the next line of data received on the stream.
66+
_localPingController.add(null);
67+
// According to the documentation, the behavior is undefined when calling
68+
// close() while requests are pending. However, this is no other
69+
// known way to cancel open streams, and this appears to end the stream with
70+
// a consistent ClientException if a request is open.
71+
// We avoid closing the client while opening a request, as that does cause
72+
// unpredicable uncaught errors.
73+
if (_safeToClose) {
74+
_client.close();
75+
}
76+
// wait for completeAbort() to be called
77+
await future;
78+
79+
// Now close the client in all cases not covered above
80+
_client.close();
81+
}
82+
83+
bool get aborted {
84+
return _abort?.aborted ?? false;
85+
}
86+
5387
Future<void> streamingSync() async {
54-
crudLoop();
55-
var invalidCredentials = false;
56-
while (true) {
57-
_updateStatus(connecting: true);
58-
try {
59-
if (invalidCredentials && invalidCredentialsCallback != null) {
60-
// This may error. In that case it will be retried again on the next
61-
// iteration.
62-
await invalidCredentialsCallback!();
63-
invalidCredentials = false;
64-
}
65-
await streamingSyncIteration();
66-
// Continue immediately
67-
} catch (e, stacktrace) {
68-
final message = _syncErrorMessage(e);
69-
isolateLogger.warning('Sync error: $message', e, stacktrace);
70-
invalidCredentials = true;
88+
try {
89+
_abort = AbortController();
90+
crudLoop();
91+
var invalidCredentials = false;
92+
while (!aborted) {
93+
_updateStatus(connecting: true);
94+
try {
95+
if (invalidCredentials && invalidCredentialsCallback != null) {
96+
// This may error. In that case it will be retried again on the next
97+
// iteration.
98+
await invalidCredentialsCallback!();
99+
invalidCredentials = false;
100+
}
101+
await streamingSyncIteration();
102+
// Continue immediately
103+
} catch (e, stacktrace) {
104+
if (aborted && e is http.ClientException) {
105+
// Explicit abort requested - ignore. Example error:
106+
// ClientException: Connection closed while receiving data, uri=http://localhost:8080/sync/stream
107+
return;
108+
}
109+
final message = _syncErrorMessage(e);
110+
isolateLogger.warning('Sync error: $message', e, stacktrace);
111+
invalidCredentials = true;
71112

72-
_updateStatus(
73-
connected: false,
74-
connecting: true,
75-
downloading: false,
76-
downloadError: e);
113+
_updateStatus(
114+
connected: false,
115+
connecting: true,
116+
downloading: false,
117+
downloadError: e);
77118

78-
// On error, wait a little before retrying
79-
await Future.delayed(retryDelay);
119+
// On error, wait a little before retrying
120+
// When aborting, don't wait
121+
await Future.any([Future.delayed(retryDelay), _abort!.onAbort]);
122+
}
80123
}
124+
} finally {
125+
_abort!.completeAbort();
81126
}
82127
}
83128

@@ -206,6 +251,10 @@ class StreamingSyncImplementation {
206251
bool haveInvalidated = false;
207252

208253
await for (var line in merged) {
254+
if (aborted) {
255+
break;
256+
}
257+
209258
_updateStatus(connected: true, connecting: false);
210259
if (line is Checkpoint) {
211260
targetCheckpoint = line;
@@ -338,7 +387,18 @@ class StreamingSyncImplementation {
338387
request.headers['Authorization'] = "Token ${credentials.token}";
339388
request.body = convert.jsonEncode(data);
340389

341-
final res = await _client.send(request);
390+
http.StreamedResponse res;
391+
try {
392+
// Do not close the client during the request phase - this causes uncaught errors.
393+
_safeToClose = false;
394+
res = await _client.send(request);
395+
} finally {
396+
_safeToClose = true;
397+
}
398+
if (aborted) {
399+
return;
400+
}
401+
342402
if (res.statusCode == 401) {
343403
if (invalidCredentialsCallback != null) {
344404
await invalidCredentialsCallback!();
@@ -350,6 +410,9 @@ class StreamingSyncImplementation {
350410

351411
// Note: The response stream is automatically closed when this loop errors
352412
await for (var line in ndjson(res.stream)) {
413+
if (aborted) {
414+
break;
415+
}
353416
yield parseStreamingSyncLine(line as Map<String, dynamic>);
354417
}
355418
}

packages/powersync/test/streaming_sync_test.dart

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ void main() {
5959

6060
await pdb.close();
6161

62+
// Give some time for connections to close
63+
final watch = Stopwatch()..start();
64+
while (server.connectionCount != 0 && watch.elapsedMilliseconds < 100) {
65+
await Future.delayed(Duration(milliseconds: random.nextInt(10)));
66+
}
67+
68+
expect(server.connectionCount, equals(0));
69+
expect(server.maxConnectionCount, lessThanOrEqualTo(1));
70+
6271
server.close();
6372
}
6473
});

packages/powersync/test/test_server.dart

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import 'package:shelf_router/shelf_router.dart';
1111
class TestServer {
1212
late HttpServer server;
1313
Router app = Router();
14-
int connectionCount = 0;
1514
int maxConnectionCount = 0;
1615
int tokenExpiresIn;
1716

@@ -27,19 +26,22 @@ class TestServer {
2726
return 'http://${server.address.host}:${server.port}';
2827
}
2928

29+
int get connectionCount {
30+
return server.connectionsInfo().total;
31+
}
32+
33+
HttpConnectionsInfo connectionsInfo() {
34+
return server.connectionsInfo();
35+
}
36+
3037
Future<Response> handleSyncStream(Request request) async {
31-
connectionCount += 1;
3238
maxConnectionCount = max(connectionCount, maxConnectionCount);
3339

3440
stream() async* {
35-
try {
36-
var blob = "*" * 5000;
37-
for (var i = 0; i < 50; i++) {
38-
yield {"token_expires_in": tokenExpiresIn, "blob": blob};
39-
await Future.delayed(Duration(microseconds: 1));
40-
}
41-
} finally {
42-
connectionCount -= 1;
41+
var blob = "*" * 5000;
42+
for (var i = 0; i < 50; i++) {
43+
yield {"token_expires_in": tokenExpiresIn, "blob": blob};
44+
await Future.delayed(Duration(microseconds: 1));
4345
}
4446
}
4547

0 commit comments

Comments
 (0)