Skip to content

Commit efa2aa7

Browse files
authored
Add isOpen and closed (#222)
1 parent 7d98e40 commit efa2aa7

File tree

6 files changed

+100
-3
lines changed

6 files changed

+100
-3
lines changed

lib/postgres.dart

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,20 @@ class Sql {
8787
}
8888

8989
abstract class Session {
90+
/// Whether this connection is currently open.
91+
///
92+
/// A [Connection] is open until it's closed (either by an explicit
93+
/// [Connection.close] call or due to an unrecoverable error from the server).
94+
/// Other sessions, such as transactions or connections borrowed from a pool,
95+
/// may have a shorter lifetime.
96+
///
97+
/// The [closed] future can be awaited to get notified when this session is
98+
/// closing.
99+
bool get isOpen;
100+
101+
/// A future that completes when [isOpen] turns false.
102+
Future<void> get closed;
103+
90104
/// Prepares a reusable statement from a [query].
91105
///
92106
/// [query] must either be a [String] or a [Sql] object with types for

lib/src/pool/pool_impl.dart

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ class PoolImplementation<L> implements Pool<L> {
3434
PoolImplementation(this._selector, PoolSettings? settings)
3535
: _settings = ResolvedPoolSettings(settings);
3636

37+
@override
38+
bool get isOpen => !_semaphore.isClosed;
39+
40+
@override
41+
Future<void> get closed => _semaphore.done;
42+
3743
@override
3844
Future<void> close() async {
3945
await _semaphore.close();
@@ -251,6 +257,12 @@ class _PoolConnection implements Connection {
251257
await _connection.close();
252258
}
253259

260+
@override
261+
bool get isOpen => _connection.isOpen;
262+
263+
@override
264+
Future<void> get closed => _connection.closed;
265+
254266
@override
255267
Channels get channels {
256268
throw UnsupportedError(

lib/src/v3/connection.dart

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,20 @@ abstract class _PgSessionBase implements Session {
3838
/// connection in the meantime.
3939
final _operationLock = pool.Pool(1);
4040

41-
bool _sessionClosed = false;
41+
final Completer<void> _sessionClosedCompleter = Completer();
42+
43+
bool get _sessionClosed => _sessionClosedCompleter.isCompleted;
4244

4345
PgConnectionImplementation get _connection;
4446
ResolvedSessionSettings get _settings;
4547
Encoding get encoding => _connection._settings.encoding;
4648

49+
void _closeSession() {
50+
if (!_sessionClosed) {
51+
_sessionClosedCompleter.complete();
52+
}
53+
}
54+
4755
void _checkActive() {
4856
if (_sessionClosed) {
4957
throw PgException(
@@ -88,6 +96,12 @@ abstract class _PgSessionBase implements Session {
8896
});
8997
}
9098

99+
@override
100+
bool get isOpen => !_sessionClosed && !_connection._isClosing;
101+
102+
@override
103+
Future<void> get closed => _sessionClosedCompleter.future;
104+
91105
@override
92106
Future<Result> execute(
93107
Object query, {
@@ -360,6 +374,9 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
360374
}
361375
}
362376

377+
@override
378+
Future<void> get closed => _channel.sink.done;
379+
363380
@override
364381
Future<R> run<R>(
365382
Future<R> Function(Session session) fn, {
@@ -370,7 +387,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
370387
// Unlike runTx, this doesn't need any locks. An active transaction changes
371388
// the state of the connection, this method does not. If methods requiring
372389
// locks are called by [fn], these methods will aquire locks as needed.
373-
return Future<R>(() => fn(session));
390+
return Future<R>(() => fn(session)).whenComplete(session._closeSession);
374391
}
375392

376393
@override
@@ -866,7 +883,7 @@ class _TransactionSession extends _PgSessionBase {
866883
controller.stream.listen(items.add),
867884
true,
868885
cleanup: () {
869-
_sessionClosed = true;
886+
_closeSession();
870887
_connection._activeTransaction = null;
871888
},
872889
);

test/pool_test.dart

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,22 @@ void main() {
7474
expect(await stmt.run([]), hasLength(3));
7575
await stmt.dispose();
7676
});
77+
78+
test('disables close()', () async {
79+
late Connection leakedConnection;
80+
81+
await pool.withConnection((connection) async {
82+
expect(connection.isOpen, isTrue);
83+
await connection.close();
84+
expect(connection.isOpen, isTrue);
85+
86+
leakedConnection = connection;
87+
});
88+
89+
await pool.close();
90+
expect(pool.isOpen, isFalse);
91+
expect(leakedConnection.isOpen, isFalse);
92+
});
7793
});
7894

7995
withPostgresServer('limit pool connections', (server) {

test/transaction_test.dart

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,23 @@ void main() {
308308
final total = await conn.execute('SELECT id FROM t');
309309
expect(total, []);
310310
});
311+
312+
test('isOpen and closed', () async {
313+
late Session leakedTransaction;
314+
var afterTransaction = false;
315+
316+
await conn.runTx(expectAsync1((session) async {
317+
leakedTransaction = session;
318+
expect(session.isOpen, isTrue);
319+
320+
// .closed should complete before the runTx future
321+
expectLater(
322+
session.closed.then((_) => afterTransaction), completion(isFalse));
323+
}));
324+
afterTransaction = true;
325+
326+
expect(leakedTransaction.isOpen, isFalse);
327+
});
311328
});
312329

313330
// A transaction can fail for three reasons: query error, exception in code, or a rollback.

test/v3_test.dart

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,13 @@ void main() {
224224

225225
test('run', () async {
226226
const returnValue = 'returned from run()';
227+
late Session leakedSession;
227228

228229
await expectLater(
229230
connection.run(expectAsync1((session) async {
231+
expect(session.isOpen, isTrue);
232+
leakedSession = session;
233+
230234
await session
231235
.execute('CREATE TEMPORARY TABLE foo (id INTEGER PRIMARY KEY);');
232236
await session.execute('INSERT INTO foo VALUES (3);');
@@ -236,6 +240,8 @@ void main() {
236240
completion(returnValue),
237241
);
238242

243+
expect(leakedSession.isOpen, isFalse);
244+
239245
final rows = await connection.execute('SELECT * FROM foo');
240246
expect(rows, [
241247
[3]
@@ -448,6 +454,21 @@ void main() {
448454
expect(incoming, contains(isA<DataRowMessage>()));
449455
expect(outgoing, contains(isA<QueryMessage>()));
450456
});
457+
458+
test('can close connection', () async {
459+
expect(connection.isOpen, isTrue);
460+
await connection.execute('SELECT 1');
461+
462+
var didFinishClose = false;
463+
expect(
464+
connection.closed.then((_) => didFinishClose), completion(isFalse));
465+
await connection.close();
466+
didFinishClose = true;
467+
468+
expect(connection.isOpen, isFalse);
469+
expect(
470+
() => connection.execute('SELECT 1'), throwsA(_isPostgresException));
471+
});
451472
});
452473

453474
withPostgresServer('can close connection after error conditions', (server) {

0 commit comments

Comments
 (0)