diff --git a/CHANGELOG.md b/CHANGELOG.md index 142629a..0391759 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.5.2 + +- Fix releasing of locks when closing `SharedMutex``. + ## 0.5.1 - Fix `watch` when called with query parameters. diff --git a/lib/src/mutex.dart b/lib/src/mutex.dart index 578e1d4..5725554 100644 --- a/lib/src/mutex.dart +++ b/lib/src/mutex.dart @@ -126,6 +126,7 @@ class SerializedMutex { /// Uses a [SendPort] to communicate with the source mutex. class SharedMutex implements Mutex { final ChildPortClient client; + bool closed = false; SharedMutex._(this.client); @@ -135,6 +136,9 @@ class SharedMutex implements Mutex { throw LockError('Recursive lock is not allowed'); } return runZoned(() async { + if (closed) { + throw const ClosedException(); + } await _acquire(timeout: timeout); try { final T result = await callback(); @@ -174,7 +178,20 @@ class SharedMutex implements Mutex { } @override + + /// Wait for existing locks to be released, then close this SharedMutex + /// and prevent further locks from being taken out. Future close() async { + if (closed) { + return; + } + closed = true; + // Wait for any existing locks to complete, then prevent any further locks from being taken out. + await _acquire(); + client.fire(const _CloseMessage()); + // Close client immediately after _unlock(), + // so that we're sure no further locks are acquired. + // This also cancels any lock request in process. client.close(); } } @@ -184,6 +201,7 @@ class _SharedMutexServer { Completer? unlock; late final SerializedMutex serialized; final Mutex mutex; + bool closed = false; late final PortServer server; @@ -198,6 +216,11 @@ class _SharedMutexServer { if (arg is _AcquireMessage) { var lock = Completer.sync(); mutex.lock(() async { + if (closed) { + // The client will error already - we just need to ensure + // we don't take out another lock. + return; + } assert(unlock == null); unlock = Completer.sync(); lock.complete(); @@ -208,6 +231,10 @@ class _SharedMutexServer { } else if (arg is _UnlockMessage) { assert(unlock != null); unlock!.complete(); + } else if (arg is _CloseMessage) { + // Unlock and close (from client side) + closed = true; + unlock?.complete(); } } @@ -224,6 +251,11 @@ class _UnlockMessage { const _UnlockMessage(); } +/// Unlock and close +class _CloseMessage { + const _CloseMessage(); +} + class LockError extends Error { final String message; diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index 717c3d0..3d1ac42 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -125,6 +125,7 @@ class ChildPortClient implements PortClient { final SendPort sendPort; final ReceivePort receivePort = ReceivePort(); int _nextId = 1; + bool closed = false; final Map> handlers = HashMap(); @@ -144,6 +145,9 @@ class ChildPortClient implements PortClient { @override Future post(Object message) async { + if (closed) { + throw const ClosedException(); + } var completer = Completer.sync(); var id = _nextId++; handlers[id] = completer; @@ -153,11 +157,14 @@ class ChildPortClient implements PortClient { @override void fire(Object message) { + if (closed) { + throw ClosedException(); + } sendPort.send(_FireMessage(message)); } void _cancelAll(Object error) { - var handlers = this.handlers; + var handlers = HashMap>.from(this.handlers); this.handlers.clear(); for (var message in handlers.values) { message.completeError(error); @@ -165,6 +172,7 @@ class ChildPortClient implements PortClient { } void close() { + closed = true; _cancelAll(const ClosedException()); receivePort.close(); } diff --git a/pubspec.yaml b/pubspec.yaml index 6d81a86..1c9d84f 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.5.1 +version: 0.5.2 repository: https://github.com/journeyapps/sqlite_async.dart environment: sdk: '>=2.19.1 <4.0.0' diff --git a/test/mutex_test.dart b/test/mutex_test.dart new file mode 100644 index 0000000..0eb90e1 --- /dev/null +++ b/test/mutex_test.dart @@ -0,0 +1,47 @@ +import 'dart:isolate'; + +import 'package:sqlite_async/mutex.dart'; +import 'package:test/test.dart'; + +void main() { + group('Mutex Tests', () { + test('Closing', () async { + // Test that locks are properly released when calling SharedMutex.close() + // in in Isolate. + // A timeout in this test indicates a likely error. + for (var i = 0; i < 50; i++) { + final mutex = SimpleMutex(); + final serialized = mutex.shared; + + final result = await Isolate.run(() async { + return _lockInIsolate(serialized); + }); + + await mutex.lock(() async {}); + + expect(result, equals(5)); + } + }); + }, timeout: const Timeout(Duration(milliseconds: 5000))); +} + +Future _lockInIsolate( + SerializedMutex smutex, +) async { + final mutex = smutex.open(); + // Start a "thread" that repeatedly takes a lock + _infiniteLock(mutex).ignore(); + await Future.delayed(const Duration(milliseconds: 10)); + // Then close the mutex while the above loop is running. + await mutex.close(); + + return 5; +} + +Future _infiniteLock(SharedMutex mutex) async { + while (true) { + await mutex.lock(() async { + await Future.delayed(const Duration(milliseconds: 1)); + }); + } +}