Skip to content

Commit 7563713

Browse files
authored
Fix: Pool connection timeout implemented with Timer. (#415)
1 parent c82623b commit 7563713

File tree

5 files changed

+136
-11
lines changed

5 files changed

+136
-11
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 3.5.2
4+
5+
- Fix: `Pool` connection timeout does not rely on `package:pool`'s timeout, instead uses a separate `Timer`.
6+
37
## 3.5.1
48

59
- Fix: `ARRAY_AGG` (and other arrays) may return `[null]`.

lib/src/pool/pool_impl.dart

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:async';
22

33
import 'package:collection/collection.dart';
44
import 'package:pool/pool.dart' as pool;
5+
import 'package:postgres/src/utils/package_pool_ext.dart';
56

67
import '../../postgres.dart';
78
import '../v3/connection.dart';
@@ -22,14 +23,8 @@ class PoolImplementation<L> implements Pool<L> {
2223

2324
final _connections = <_PoolConnection>[];
2425
late final _maxConnectionCount = _settings.maxConnectionCount;
25-
late final _semaphore = pool.Pool(
26-
_maxConnectionCount,
27-
timeout: _settings.connectTimeout,
28-
);
29-
late final _connectLock = pool.Pool(
30-
1,
31-
timeout: _settings.connectTimeout,
32-
);
26+
late final _semaphore = pool.Pool(_maxConnectionCount);
27+
late final _connectLock = pool.Pool(1);
3328
bool _closing = false;
3429

3530
PoolImplementation(this._selector, PoolSettings? settings)
@@ -135,7 +130,8 @@ class PoolImplementation<L> implements Pool<L> {
135130
ConnectionSettings? settings,
136131
L? locality,
137132
}) async {
138-
final resource = await _semaphore.request();
133+
final resource =
134+
await _semaphore.requestWithTimeout(_settings.connectTimeout);
139135
_PoolConnection? connection;
140136
bool reuse = true;
141137
final sw = Stopwatch();
@@ -189,7 +185,8 @@ class PoolImplementation<L> implements Pool<L> {
189185
return oldc;
190186
}
191187

192-
return await _connectLock.withResource(() async {
188+
return await _connectLock
189+
.withRequestTimeout(timeout: _settings.connectTimeout, (_) async {
193190
while (_connections.length >= _maxConnectionCount) {
194191
final candidates =
195192
_connections.where((c) => c._isInUse == false).toList();

lib/src/utils/package_pool_ext.dart

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import 'dart:async';
2+
3+
import 'package:pool/pool.dart';
4+
import 'package:stack_trace/stack_trace.dart';
5+
6+
extension PackagePoolExt on Pool {
7+
Future<PoolResource> requestWithTimeout(Duration timeout) async {
8+
final stack = StackTrace.current;
9+
final completer = Completer<PoolResource>();
10+
11+
Timer? timer;
12+
if (timeout > Duration.zero) {
13+
timer = Timer(timeout, () {
14+
if (!completer.isCompleted) {
15+
completer.completeError(
16+
TimeoutException('Failed to acquire pool lock.'), stack);
17+
}
18+
});
19+
}
20+
21+
final resourceFuture = request();
22+
23+
scheduleMicrotask(() {
24+
resourceFuture.then(
25+
(resource) async {
26+
timer?.cancel();
27+
if (completer.isCompleted) {
28+
resource.release();
29+
return;
30+
}
31+
completer.complete(resource);
32+
},
33+
onError: (e, st) {
34+
timer?.cancel();
35+
if (!completer.isCompleted) {
36+
completer.completeError(
37+
e, Chain([Trace.from(st), Trace.from(stack)]));
38+
}
39+
},
40+
);
41+
});
42+
43+
return completer.future;
44+
}
45+
46+
Future<T> withRequestTimeout<T>(
47+
FutureOr<T> Function(Duration remainingTimeout) callback, {
48+
required Duration timeout,
49+
}) async {
50+
final sw = Stopwatch()..start();
51+
final resource = await requestWithTimeout(timeout);
52+
final remainingTimeout = timeout - sw.elapsed;
53+
try {
54+
return await callback(remainingTimeout);
55+
} finally {
56+
resource.release();
57+
}
58+
}
59+
}

pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: postgres
22
description: PostgreSQL database driver. Supports statement reuse and binary protocol and connection pooling.
3-
version: 3.5.1
3+
version: 3.5.2
44
homepage: https://github.com/isoos/postgresql-dart
55
topics:
66
- sql

test/utils/package_pool_ext_test.dart

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import 'dart:async';
2+
3+
import 'package:pool/pool.dart';
4+
import 'package:postgres/src/utils/package_pool_ext.dart';
5+
import 'package:test/test.dart';
6+
7+
void main() {
8+
group('package:pool extensions', () {
9+
test('acquire with timeout succeeds - no parallel use', () async {
10+
final pool = Pool(1);
11+
final x = await pool.withRequestTimeout(
12+
timeout: Duration(seconds: 1),
13+
(_) async {
14+
return 1;
15+
},
16+
);
17+
expect(x, 1);
18+
final r = await pool.request();
19+
r.release();
20+
await pool.close();
21+
});
22+
23+
test('acquire with timeout succeeds - quick parallel use', () async {
24+
final pool = Pool(1);
25+
final other = await pool.request();
26+
Timer(Duration(seconds: 1), other.release);
27+
var remainingMillis = 0;
28+
final x = await pool.withRequestTimeout(
29+
timeout: Duration(seconds: 2),
30+
(remaining) async {
31+
remainingMillis = remaining.inMilliseconds;
32+
return 1;
33+
},
34+
);
35+
expect(x, 1);
36+
final r = await pool.request();
37+
r.release();
38+
await pool.close();
39+
expect(remainingMillis, greaterThan(500));
40+
expect(remainingMillis, lessThan(1500));
41+
});
42+
43+
test('acquire with timeout fails - long parallel use', () async {
44+
final pool = Pool(1);
45+
final other = await pool.request();
46+
Timer(Duration(seconds: 2), other.release);
47+
await expectLater(
48+
pool.withRequestTimeout(
49+
timeout: Duration(seconds: 1),
50+
(_) async {
51+
return 1;
52+
},
53+
),
54+
throwsA(isA<TimeoutException>()),
55+
);
56+
final sw = Stopwatch()..start();
57+
final r = await pool.request();
58+
sw.stop();
59+
r.release();
60+
await pool.close();
61+
expect(sw.elapsedMilliseconds, greaterThan(500));
62+
expect(sw.elapsedMilliseconds, lessThan(1500));
63+
});
64+
});
65+
}

0 commit comments

Comments
 (0)