Skip to content

Commit 723e2c4

Browse files
committed
Improve recursive lock errors.
1 parent 7cf6db9 commit 723e2c4

File tree

7 files changed

+154
-70
lines changed

7 files changed

+154
-70
lines changed

lib/src/connection_pool.dart

Lines changed: 64 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -50,58 +50,60 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
5050

5151
@override
5252
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
53-
{Duration? lockTimeout}) async {
53+
{Duration? lockTimeout, String? debugContext}) async {
5454
await _expandPool();
5555

56-
bool haveLock = false;
57-
var completer = Completer<T>();
56+
return _runZoned(() async {
57+
bool haveLock = false;
58+
var completer = Completer<T>();
59+
60+
var futures = _readConnections.sublist(0).map((connection) async {
61+
try {
62+
return await connection.readLock((ctx) async {
63+
if (haveLock) {
64+
// Already have a different lock - release this one.
65+
return false;
66+
}
67+
haveLock = true;
68+
69+
var future = callback(ctx);
70+
completer.complete(future);
71+
72+
// We have to wait for the future to complete before we can release the
73+
// lock.
74+
try {
75+
await future;
76+
} catch (_) {
77+
// Ignore
78+
}
79+
80+
return true;
81+
}, lockTimeout: lockTimeout, debugContext: debugContext);
82+
} on TimeoutException {
83+
return false;
84+
}
85+
});
86+
87+
final stream = Stream<bool>.fromFutures(futures);
88+
var gotAny = await stream.any((element) => element);
89+
90+
if (!gotAny) {
91+
// All TimeoutExceptions
92+
throw TimeoutException('Failed to get a read connection', lockTimeout);
93+
}
5894

59-
var futures = _readConnections.sublist(0).map((connection) async {
6095
try {
61-
return await connection.readLock((ctx) async {
62-
if (haveLock) {
63-
// Already have a different lock - release this one.
64-
return false;
65-
}
66-
haveLock = true;
67-
68-
var future = callback(ctx);
69-
completer.complete(future);
70-
71-
// We have to wait for the future to complete before we can release the
72-
// lock.
73-
try {
74-
await future;
75-
} catch (_) {
76-
// Ignore
77-
}
78-
79-
return true;
80-
}, lockTimeout: lockTimeout);
81-
} on TimeoutException {
82-
return false;
96+
return await completer.future;
97+
} catch (e) {
98+
// throw e;
99+
rethrow;
83100
}
84-
});
85-
86-
final stream = Stream<bool>.fromFutures(futures);
87-
var gotAny = await stream.any((element) => element);
88-
89-
if (!gotAny) {
90-
// All TimeoutExceptions
91-
throw TimeoutException('Failed to get a read connection', lockTimeout);
92-
}
93-
94-
try {
95-
return await completer.future;
96-
} catch (e) {
97-
// throw e;
98-
rethrow;
99-
}
101+
}, debugContext: debugContext ?? 'get*()');
100102
}
101103

102104
@override
103105
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
104-
{Duration? lockTimeout}) {
106+
{Duration? lockTimeout, String? debugContext}) {
105107
if (closed) {
106108
throw AssertionError('Closed');
107109
}
@@ -113,7 +115,25 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
113115
mutex: mutex,
114116
readOnly: false,
115117
openFactory: _factory);
116-
return _writeConnection!.writeLock(callback, lockTimeout: lockTimeout);
118+
return _runZoned(() {
119+
return _writeConnection!.writeLock(callback,
120+
lockTimeout: lockTimeout, debugContext: debugContext);
121+
}, debugContext: debugContext ?? 'execute()');
122+
}
123+
124+
/// The [Mutex] on individual connections do already error in recursive locks.
125+
///
126+
/// We duplicate the same check here, to:
127+
/// 1. Also error when the recursive transaction is handled by a different
128+
/// connection (with a different lock).
129+
/// 2. Give a more specific error message when it happens.
130+
T _runZoned<T>(T Function() callback, {required String debugContext}) {
131+
if (Zone.current[this] != null) {
132+
throw LockError(
133+
'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.');
134+
}
135+
var zone = Zone.current.fork(zoneValues: {this: true});
136+
return zone.run(callback);
117137
}
118138

119139
Future<void> _expandPool() async {

lib/src/mutex.dart

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class SimpleMutex implements Mutex {
4040
@override
4141
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
4242
if (Zone.current[this] != null) {
43-
throw AssertionError('Recursive lock is not allowed');
43+
throw LockError('Recursive lock is not allowed');
4444
}
4545
var zone = Zone.current.fork(zoneValues: {this: true});
4646

@@ -132,7 +132,7 @@ class SharedMutex implements Mutex {
132132
@override
133133
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
134134
if (Zone.current[this] != null) {
135-
throw AssertionError('Recursive lock is not allowed');
135+
throw LockError('Recursive lock is not allowed');
136136
}
137137
return runZoned(() async {
138138
await _acquire(timeout: timeout);
@@ -223,3 +223,14 @@ class _AcquireMessage {
223223
class _UnlockMessage {
224224
const _UnlockMessage();
225225
}
226+
227+
class LockError extends Error {
228+
final String message;
229+
230+
LockError(this.message);
231+
232+
@override
233+
String toString() {
234+
return 'LockError: $message';
235+
}
236+
}

lib/src/sqlite_connection.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,13 @@ abstract class SqliteConnection extends SqliteWriteContext {
9494
///
9595
/// In most cases, [readTransaction] should be used instead.
9696
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
97-
{Duration? lockTimeout});
97+
{Duration? lockTimeout, String? debugContext});
9898

9999
/// Takes a global lock, without starting a transaction.
100100
///
101101
/// In most cases, [writeTransaction] should be used instead.
102102
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
103-
{Duration? lockTimeout});
103+
{Duration? lockTimeout, String? debugContext});
104104

105105
Future<void> close();
106106
}

lib/src/sqlite_connection_impl.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
7979

8080
@override
8181
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
82-
{Duration? lockTimeout}) async {
82+
{Duration? lockTimeout, String? debugContext}) async {
8383
// Private lock to synchronize this with other statements on the same connection,
8484
// to ensure that transactions aren't interleaved.
8585
return _connectionMutex.lock(() async {
@@ -94,7 +94,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
9494

9595
@override
9696
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
97-
{Duration? lockTimeout}) async {
97+
{Duration? lockTimeout, String? debugContext}) async {
9898
final stopWatch = lockTimeout == null ? null : (Stopwatch()..start());
9999
// Private lock to synchronize this with other statements on the same connection,
100100
// to ensure that transactions aren't interleaved.

lib/src/sqlite_database.dart

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,15 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
209209

210210
@override
211211
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
212-
{Duration? lockTimeout}) {
213-
return _pool.readLock(callback, lockTimeout: lockTimeout);
212+
{Duration? lockTimeout, String? debugContext}) {
213+
return _pool.readLock(callback,
214+
lockTimeout: lockTimeout, debugContext: debugContext);
214215
}
215216

216217
@override
217218
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
218-
{Duration? lockTimeout}) {
219-
return _pool.writeLock(callback, lockTimeout: lockTimeout);
219+
{Duration? lockTimeout, String? debugContext}) {
220+
return _pool.writeLock(callback,
221+
lockTimeout: lockTimeout, debugContext: debugContext);
220222
}
221223
}

lib/src/sqlite_queries.dart

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,30 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
1717
[List<Object?> parameters = const []]) async {
1818
return writeLock((ctx) async {
1919
return ctx.execute(sql, parameters);
20-
});
20+
}, debugContext: 'execute()');
2121
}
2222

2323
@override
2424
Future<sqlite.ResultSet> getAll(String sql,
2525
[List<Object?> parameters = const []]) {
2626
return readLock((ctx) async {
2727
return ctx.getAll(sql, parameters);
28-
});
28+
}, debugContext: 'getAll()');
2929
}
3030

3131
@override
3232
Future<sqlite.Row> get(String sql, [List<Object?> parameters = const []]) {
3333
return readLock((ctx) async {
3434
return ctx.get(sql, parameters);
35-
});
35+
}, debugContext: 'get()');
3636
}
3737

3838
@override
3939
Future<sqlite.Row?> getOptional(String sql,
4040
[List<Object?> parameters = const []]) {
4141
return readLock((ctx) async {
4242
return ctx.getOptional(sql, parameters);
43-
});
43+
}, debugContext: 'getOptional()');
4444
}
4545

4646
@override
@@ -103,7 +103,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
103103
{Duration? lockTimeout}) async {
104104
return readLock((ctx) async {
105105
return await internalReadTransaction(ctx, callback);
106-
}, lockTimeout: lockTimeout);
106+
}, lockTimeout: lockTimeout, debugContext: 'readTransaction()');
107107
}
108108

109109
@override
@@ -112,7 +112,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
112112
{Duration? lockTimeout}) async {
113113
return writeLock((ctx) async {
114114
return await internalWriteTransaction(ctx, callback);
115-
}, lockTimeout: lockTimeout);
115+
}, lockTimeout: lockTimeout, debugContext: 'writeTransaction()');
116116
}
117117

118118
/// See [SqliteReadContext.computeWithDatabase].

test/basic_test.dart

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
import 'dart:async';
12
import 'dart:math';
23

4+
import 'package:sqlite3/sqlite3.dart' as sqlite;
5+
import 'package:sqlite_async/mutex.dart';
36
import 'package:sqlite_async/sqlite_async.dart';
47
import 'package:test/test.dart';
5-
import 'package:sqlite3/sqlite3.dart' as sqlite;
8+
69
import 'util.dart';
710

811
void main() {
@@ -108,18 +111,22 @@ void main() {
108111
await expectLater(() async {
109112
await db.execute(
110113
'INSERT INTO test_data(description) VALUES(?)', ['test']);
111-
}, throwsA((e) => e is AssertionError));
114+
}, throwsA((e) => e is LockError && e.message.contains('tx.execute')));
112115
});
113116
});
114117

115-
test('does allow read-only db calls within transaction callback', () async {
118+
test('should not allow read-only db calls within transaction callback',
119+
() async {
116120
final db = await setupDatabase(path: path);
117121
await createTables(db);
118122

119123
await db.writeTransaction((tx) async {
120-
// This uses a different connection, so it's fine.
121-
// Perhaps we should warn on this, since it's likely unintentional?
122-
await db.getAll('SELECT * FROM test_data');
124+
// This uses a different connection, so it _could_ work.
125+
// But it's likely unintentional and could cause weird bugs, so we don't
126+
// allow it by default.
127+
await expectLater(() async {
128+
await db.getAll('SELECT * FROM test_data');
129+
}, throwsA((e) => e is LockError && e.message.contains('tx.getAll')));
123130
});
124131

125132
await db.readTransaction((tx) async {
@@ -129,24 +136,68 @@ void main() {
129136
// opens another connection, but doesn't use it.
130137
await expectLater(() async {
131138
await db.getAll('SELECT * FROM test_data');
132-
}, throwsA((e) => e is AssertionError));
139+
}, throwsA((e) => e is LockError && e.message.contains('tx.getAll')));
133140
});
134141
});
135142

136-
test('does allow read-only db calls within lock callback', () async {
143+
test('should not allow read-only db calls within lock callback', () async {
137144
final db = await setupDatabase(path: path);
138145
await createTables(db);
139146
// Locks - should behave the same as transactions above
140147

141148
await db.writeLock((tx) async {
142-
await db.getAll('SELECT * FROM test_data');
149+
await expectLater(() async {
150+
await db.getOptional('SELECT * FROM test_data');
151+
},
152+
throwsA(
153+
(e) => e is LockError && e.message.contains('tx.getOptional')));
143154
});
144155

145156
await db.readLock((tx) async {
146157
await expectLater(() async {
158+
await db.getOptional('SELECT * FROM test_data');
159+
},
160+
throwsA(
161+
(e) => e is LockError && e.message.contains('tx.getOptional')));
162+
});
163+
});
164+
165+
test(
166+
'should allow read-only db calls within transaction callback in separate zone',
167+
() async {
168+
final db = await setupDatabase(path: path);
169+
await createTables(db);
170+
171+
// Get a reference to the parent zone (outside the transaction).
172+
final zone = Zone.current;
173+
174+
// Each of these are fine, since it could use a separate connection.
175+
// Note: In highly concurrent cases, it could exhaust the connection pool and cause a deadlock.
176+
177+
await db.writeTransaction((tx) async {
178+
await zone.fork().run(() async {
179+
await db.getAll('SELECT * FROM test_data');
180+
});
181+
});
182+
183+
await db.readTransaction((tx) async {
184+
await zone.fork().run(() async {
147185
await db.getAll('SELECT * FROM test_data');
148-
}, throwsA((e) => e is AssertionError));
186+
});
187+
});
188+
189+
await db.readTransaction((tx) async {
190+
await zone.fork().run(() async {
191+
await db.execute('SELECT * FROM test_data');
192+
});
149193
});
194+
195+
// This would deadlock, since it shares a global write lock.
196+
// await db.writeTransaction((tx) async {
197+
// await zone.fork().run(() async {
198+
// await db.execute('SELECT * FROM test_data');
199+
// });
200+
// });
150201
});
151202

152203
test('should allow PRAMGAs', () async {

0 commit comments

Comments
 (0)