Skip to content

Close http connections immediately on disconnect #115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions packages/powersync/lib/src/powersync_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,24 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
/// Throttle time between CRUD operations
/// Defaults to 10 milliseconds.
Duration crudThrottleTime = const Duration(milliseconds: 10)}) async {
_connectMutex.lock(() =>
_connect(connector: connector, crudThrottleTime: crudThrottleTime));
Zone current = Zone.current;

Future<void> reconnect() {
return _connectMutex.lock(() => _connect(
connector: connector,
crudThrottleTime: crudThrottleTime,
// The reconnect function needs to run in the original zone,
// to avoid recursive lock errors.
reconnect: current.bindCallback(reconnect)));
}

await reconnect();
}

Future<void> _connect(
{required PowerSyncBackendConnector connector,
required Duration crudThrottleTime}) async {
required Duration crudThrottleTime,
required Future<void> Function() reconnect}) async {
await initialize();

// Disconnect if connected
Expand Down Expand Up @@ -298,7 +309,9 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
logger.severe('Sync Isolate error', message);

// Reconnect
connect(connector: connector);
// Use the param like this instead of directly calling connect(), to avoid recursive
// locks in some edge cases.
reconnect();
});

disconnected() {
Expand Down Expand Up @@ -565,6 +578,7 @@ Future<void> _powerSyncDatabaseIsolate(

CommonDatabase? db;
final mutex = args.dbRef.mutex.open();
StreamingSyncImplementation? openedStreamingSync;

rPort.listen((message) async {
if (message is List) {
Expand All @@ -579,6 +593,9 @@ Future<void> _powerSyncDatabaseIsolate(
db = null;
updateController.close();
upstreamDbClient.close();
// Abort any open http requests, and wait for it to be closed properly
await openedStreamingSync?.abort();
// No kill the Isolate
Isolate.current.kill();
}
}
Expand Down Expand Up @@ -625,6 +642,7 @@ Future<void> _powerSyncDatabaseIsolate(
uploadCrud: uploadCrud,
updateStream: updateController.stream,
retryDelay: args.retryDelay);
openedStreamingSync = sync;
sync.streamingSync();
sync.statusStream.listen((event) {
sPort.send(['status', event]);
Expand Down
113 changes: 88 additions & 25 deletions packages/powersync/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:convert' as convert;
import 'dart:io';

import 'package:http/http.dart' as http;
import 'package:powersync/src/abort_controller.dart';
import 'package:powersync/src/exceptions.dart';
import 'package:powersync/src/log_internal.dart';

Expand Down Expand Up @@ -39,6 +40,10 @@ class StreamingSyncImplementation {

SyncStatus lastStatus = const SyncStatus();

AbortController? _abort;

bool _safeToClose = true;

StreamingSyncImplementation(
{required this.adapter,
required this.credentialsCallback,
Expand All @@ -50,34 +55,74 @@ class StreamingSyncImplementation {
statusStream = _statusStreamController.stream;
}

/// Close any active streams.
Future<void> abort() async {
// If streamingSync() hasn't been called yet, _abort will be null.
var future = _abort?.abort();
// This immediately triggers a new iteration in the merged stream, allowing us
// to break immediately.
// However, we still need to close the underlying stream explicitly, otherwise
// the break will wait for the next line of data received on the stream.
_localPingController.add(null);
// According to the documentation, the behavior is undefined when calling
// close() while requests are pending. However, this is no other
// known way to cancel open streams, and this appears to end the stream with
// a consistent ClientException if a request is open.
// We avoid closing the client while opening a request, as that does cause
// unpredicable uncaught errors.
if (_safeToClose) {
_client.close();
}
// wait for completeAbort() to be called
await future;

// Now close the client in all cases not covered above
_client.close();
}

bool get aborted {
return _abort?.aborted ?? false;
}

Future<void> streamingSync() async {
crudLoop();
var invalidCredentials = false;
while (true) {
_updateStatus(connecting: true);
try {
if (invalidCredentials && invalidCredentialsCallback != null) {
// This may error. In that case it will be retried again on the next
// iteration.
await invalidCredentialsCallback!();
invalidCredentials = false;
}
await streamingSyncIteration();
// Continue immediately
} catch (e, stacktrace) {
final message = _syncErrorMessage(e);
isolateLogger.warning('Sync error: $message', e, stacktrace);
invalidCredentials = true;
try {
_abort = AbortController();
crudLoop();
var invalidCredentials = false;
while (!aborted) {
_updateStatus(connecting: true);
try {
if (invalidCredentials && invalidCredentialsCallback != null) {
// This may error. In that case it will be retried again on the next
// iteration.
await invalidCredentialsCallback!();
invalidCredentials = false;
}
await streamingSyncIteration();
// Continue immediately
} catch (e, stacktrace) {
if (aborted && e is http.ClientException) {
// Explicit abort requested - ignore. Example error:
// ClientException: Connection closed while receiving data, uri=http://localhost:8080/sync/stream
return;
}
final message = _syncErrorMessage(e);
isolateLogger.warning('Sync error: $message', e, stacktrace);
invalidCredentials = true;

_updateStatus(
connected: false,
connecting: true,
downloading: false,
downloadError: e);
_updateStatus(
connected: false,
connecting: true,
downloading: false,
downloadError: e);

// On error, wait a little before retrying
await Future.delayed(retryDelay);
// On error, wait a little before retrying
// When aborting, don't wait
await Future.any([Future.delayed(retryDelay), _abort!.onAbort]);
}
}
} finally {
_abort!.completeAbort();
}
}

Expand Down Expand Up @@ -206,6 +251,10 @@ class StreamingSyncImplementation {
bool haveInvalidated = false;

await for (var line in merged) {
if (aborted) {
break;
}

_updateStatus(connected: true, connecting: false);
if (line is Checkpoint) {
targetCheckpoint = line;
Expand Down Expand Up @@ -338,7 +387,18 @@ class StreamingSyncImplementation {
request.headers['Authorization'] = "Token ${credentials.token}";
request.body = convert.jsonEncode(data);

final res = await _client.send(request);
http.StreamedResponse res;
try {
// Do not close the client during the request phase - this causes uncaught errors.
_safeToClose = false;
res = await _client.send(request);
} finally {
_safeToClose = true;
}
if (aborted) {
return;
}

if (res.statusCode == 401) {
if (invalidCredentialsCallback != null) {
await invalidCredentialsCallback!();
Expand All @@ -350,6 +410,9 @@ class StreamingSyncImplementation {

// Note: The response stream is automatically closed when this loop errors
await for (var line in ndjson(res.stream)) {
if (aborted) {
break;
}
yield parseStreamingSyncLine(line as Map<String, dynamic>);
}
}
Expand Down
9 changes: 9 additions & 0 deletions packages/powersync/test/streaming_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ void main() {

await pdb.close();

// Give some time for connections to close
final watch = Stopwatch()..start();
while (server.connectionCount != 0 && watch.elapsedMilliseconds < 100) {
await Future.delayed(Duration(milliseconds: random.nextInt(10)));
}

expect(server.connectionCount, equals(0));
expect(server.maxConnectionCount, lessThanOrEqualTo(1));

server.close();
}
});
Expand Down
22 changes: 12 additions & 10 deletions packages/powersync/test/test_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import 'package:shelf_router/shelf_router.dart';
class TestServer {
late HttpServer server;
Router app = Router();
int connectionCount = 0;
int maxConnectionCount = 0;
int tokenExpiresIn;

Expand All @@ -27,19 +26,22 @@ class TestServer {
return 'http://${server.address.host}:${server.port}';
}

int get connectionCount {
return server.connectionsInfo().total;
}

HttpConnectionsInfo connectionsInfo() {
return server.connectionsInfo();
}

Future<Response> handleSyncStream(Request request) async {
connectionCount += 1;
maxConnectionCount = max(connectionCount, maxConnectionCount);

stream() async* {
try {
var blob = "*" * 5000;
for (var i = 0; i < 50; i++) {
yield {"token_expires_in": tokenExpiresIn, "blob": blob};
await Future.delayed(Duration(microseconds: 1));
}
} finally {
connectionCount -= 1;
var blob = "*" * 5000;
for (var i = 0; i < 50; i++) {
yield {"token_expires_in": tokenExpiresIn, "blob": blob};
await Future.delayed(Duration(microseconds: 1));
}
}

Expand Down
Loading