Skip to content

Fix: Pool connection timeout implemented with Timer. #415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 3.5.2

- Fix: `Pool` connection timeout does not rely on `package:pool`'s timeout, instead uses a separate `Timer`.

## 3.5.1

- Fix: `ARRAY_AGG` (and other arrays) may return `[null]`.
Expand Down
17 changes: 7 additions & 10 deletions lib/src/pool/pool_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';

import 'package:collection/collection.dart';
import 'package:pool/pool.dart' as pool;
import 'package:postgres/src/utils/package_pool_ext.dart';

import '../../postgres.dart';
import '../v3/connection.dart';
Expand All @@ -22,14 +23,8 @@ class PoolImplementation<L> implements Pool<L> {

final _connections = <_PoolConnection>[];
late final _maxConnectionCount = _settings.maxConnectionCount;
late final _semaphore = pool.Pool(
_maxConnectionCount,
timeout: _settings.connectTimeout,
);
late final _connectLock = pool.Pool(
1,
timeout: _settings.connectTimeout,
);
late final _semaphore = pool.Pool(_maxConnectionCount);
late final _connectLock = pool.Pool(1);
bool _closing = false;

PoolImplementation(this._selector, PoolSettings? settings)
Expand Down Expand Up @@ -135,7 +130,8 @@ class PoolImplementation<L> implements Pool<L> {
ConnectionSettings? settings,
L? locality,
}) async {
final resource = await _semaphore.request();
final resource =
await _semaphore.requestWithTimeout(_settings.connectTimeout);
_PoolConnection? connection;
bool reuse = true;
final sw = Stopwatch();
Expand Down Expand Up @@ -189,7 +185,8 @@ class PoolImplementation<L> implements Pool<L> {
return oldc;
}

return await _connectLock.withResource(() async {
return await _connectLock
.withRequestTimeout(timeout: _settings.connectTimeout, (_) async {
while (_connections.length >= _maxConnectionCount) {
final candidates =
_connections.where((c) => c._isInUse == false).toList();
Expand Down
59 changes: 59 additions & 0 deletions lib/src/utils/package_pool_ext.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import 'dart:async';

import 'package:pool/pool.dart';
import 'package:stack_trace/stack_trace.dart';

extension PackagePoolExt on Pool {
Future<PoolResource> requestWithTimeout(Duration timeout) async {
final stack = StackTrace.current;
final completer = Completer<PoolResource>();

Timer? timer;
if (timeout > Duration.zero) {
timer = Timer(timeout, () {
if (!completer.isCompleted) {
completer.completeError(
TimeoutException('Failed to acquire pool lock.'), stack);
}
});
}

final resourceFuture = request();

scheduleMicrotask(() {
resourceFuture.then(
(resource) async {
timer?.cancel();
if (completer.isCompleted) {
resource.release();
return;
}
completer.complete(resource);
},
onError: (e, st) {
timer?.cancel();
if (!completer.isCompleted) {
completer.completeError(
e, Chain([Trace.from(st), Trace.from(stack)]));
}
},
);
});

return completer.future;
}

Future<T> withRequestTimeout<T>(
FutureOr<T> Function(Duration remainingTimeout) callback, {
required Duration timeout,
}) async {
final sw = Stopwatch()..start();
final resource = await requestWithTimeout(timeout);
final remainingTimeout = timeout - sw.elapsed;
try {
return await callback(remainingTimeout);
} finally {
resource.release();
}
}
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: postgres
description: PostgreSQL database driver. Supports statement reuse and binary protocol and connection pooling.
version: 3.5.1
version: 3.5.2
homepage: https://github.com/isoos/postgresql-dart
topics:
- sql
Expand Down
65 changes: 65 additions & 0 deletions test/utils/package_pool_ext_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import 'dart:async';

import 'package:pool/pool.dart';
import 'package:postgres/src/utils/package_pool_ext.dart';
import 'package:test/test.dart';

void main() {
group('package:pool extensions', () {
test('acquire with timeout succeeds - no parallel use', () async {
final pool = Pool(1);
final x = await pool.withRequestTimeout(
timeout: Duration(seconds: 1),
(_) async {
return 1;
},
);
expect(x, 1);
final r = await pool.request();
r.release();
await pool.close();
});

test('acquire with timeout succeeds - quick parallel use', () async {
final pool = Pool(1);
final other = await pool.request();
Timer(Duration(seconds: 1), other.release);
var remainingMillis = 0;
final x = await pool.withRequestTimeout(
timeout: Duration(seconds: 2),
(remaining) async {
remainingMillis = remaining.inMilliseconds;
return 1;
},
);
expect(x, 1);
final r = await pool.request();
r.release();
await pool.close();
expect(remainingMillis, greaterThan(500));
expect(remainingMillis, lessThan(1500));
});

test('acquire with timeout fails - long parallel use', () async {
final pool = Pool(1);
final other = await pool.request();
Timer(Duration(seconds: 2), other.release);
await expectLater(
pool.withRequestTimeout(
timeout: Duration(seconds: 1),
(_) async {
return 1;
},
),
throwsA(isA<TimeoutException>()),
);
final sw = Stopwatch()..start();
final r = await pool.request();
sw.stop();
r.release();
await pool.close();
expect(sw.elapsedMilliseconds, greaterThan(500));
expect(sw.elapsedMilliseconds, lessThan(1500));
});
});
}