From 26ccaabc9d4205eaca79dd972867ef92f8963a5e Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 14 Feb 2025 23:26:23 +0100 Subject: [PATCH] Fix: Pool connection timeout implemented with Timer. --- CHANGELOG.md | 4 ++ lib/src/pool/pool_impl.dart | 17 +++---- lib/src/utils/package_pool_ext.dart | 59 ++++++++++++++++++++++++ pubspec.yaml | 2 +- test/utils/package_pool_ext_test.dart | 65 +++++++++++++++++++++++++++ 5 files changed, 136 insertions(+), 11 deletions(-) create mode 100644 lib/src/utils/package_pool_ext.dart create mode 100644 test/utils/package_pool_ext_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index fb36338..6cb9853 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 3.5.2 + +- Fix: `Pool` connection timeout does not rely on `package:pool`'s timeout, instead uses a separate `Timer`. + ## 3.5.1 - Fix: `ARRAY_AGG` (and other arrays) may return `[null]`. diff --git a/lib/src/pool/pool_impl.dart b/lib/src/pool/pool_impl.dart index d16a989..c986e34 100644 --- a/lib/src/pool/pool_impl.dart +++ b/lib/src/pool/pool_impl.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'package:pool/pool.dart' as pool; +import 'package:postgres/src/utils/package_pool_ext.dart'; import '../../postgres.dart'; import '../v3/connection.dart'; @@ -22,14 +23,8 @@ class PoolImplementation implements Pool { final _connections = <_PoolConnection>[]; late final _maxConnectionCount = _settings.maxConnectionCount; - late final _semaphore = pool.Pool( - _maxConnectionCount, - timeout: _settings.connectTimeout, - ); - late final _connectLock = pool.Pool( - 1, - timeout: _settings.connectTimeout, - ); + late final _semaphore = pool.Pool(_maxConnectionCount); + late final _connectLock = pool.Pool(1); bool _closing = false; PoolImplementation(this._selector, PoolSettings? settings) @@ -135,7 +130,8 @@ class PoolImplementation implements Pool { ConnectionSettings? settings, L? locality, }) async { - final resource = await _semaphore.request(); + final resource = + await _semaphore.requestWithTimeout(_settings.connectTimeout); _PoolConnection? connection; bool reuse = true; final sw = Stopwatch(); @@ -189,7 +185,8 @@ class PoolImplementation implements Pool { return oldc; } - return await _connectLock.withResource(() async { + return await _connectLock + .withRequestTimeout(timeout: _settings.connectTimeout, (_) async { while (_connections.length >= _maxConnectionCount) { final candidates = _connections.where((c) => c._isInUse == false).toList(); diff --git a/lib/src/utils/package_pool_ext.dart b/lib/src/utils/package_pool_ext.dart new file mode 100644 index 0000000..db47dfb --- /dev/null +++ b/lib/src/utils/package_pool_ext.dart @@ -0,0 +1,59 @@ +import 'dart:async'; + +import 'package:pool/pool.dart'; +import 'package:stack_trace/stack_trace.dart'; + +extension PackagePoolExt on Pool { + Future requestWithTimeout(Duration timeout) async { + final stack = StackTrace.current; + final completer = Completer(); + + Timer? timer; + if (timeout > Duration.zero) { + timer = Timer(timeout, () { + if (!completer.isCompleted) { + completer.completeError( + TimeoutException('Failed to acquire pool lock.'), stack); + } + }); + } + + final resourceFuture = request(); + + scheduleMicrotask(() { + resourceFuture.then( + (resource) async { + timer?.cancel(); + if (completer.isCompleted) { + resource.release(); + return; + } + completer.complete(resource); + }, + onError: (e, st) { + timer?.cancel(); + if (!completer.isCompleted) { + completer.completeError( + e, Chain([Trace.from(st), Trace.from(stack)])); + } + }, + ); + }); + + return completer.future; + } + + Future withRequestTimeout( + FutureOr Function(Duration remainingTimeout) callback, { + required Duration timeout, + }) async { + final sw = Stopwatch()..start(); + final resource = await requestWithTimeout(timeout); + final remainingTimeout = timeout - sw.elapsed; + try { + return await callback(remainingTimeout); + } finally { + resource.release(); + } + } +} diff --git a/pubspec.yaml b/pubspec.yaml index f480196..c524d59 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: postgres description: PostgreSQL database driver. Supports statement reuse and binary protocol and connection pooling. -version: 3.5.1 +version: 3.5.2 homepage: https://github.com/isoos/postgresql-dart topics: - sql diff --git a/test/utils/package_pool_ext_test.dart b/test/utils/package_pool_ext_test.dart new file mode 100644 index 0000000..f2715ed --- /dev/null +++ b/test/utils/package_pool_ext_test.dart @@ -0,0 +1,65 @@ +import 'dart:async'; + +import 'package:pool/pool.dart'; +import 'package:postgres/src/utils/package_pool_ext.dart'; +import 'package:test/test.dart'; + +void main() { + group('package:pool extensions', () { + test('acquire with timeout succeeds - no parallel use', () async { + final pool = Pool(1); + final x = await pool.withRequestTimeout( + timeout: Duration(seconds: 1), + (_) async { + return 1; + }, + ); + expect(x, 1); + final r = await pool.request(); + r.release(); + await pool.close(); + }); + + test('acquire with timeout succeeds - quick parallel use', () async { + final pool = Pool(1); + final other = await pool.request(); + Timer(Duration(seconds: 1), other.release); + var remainingMillis = 0; + final x = await pool.withRequestTimeout( + timeout: Duration(seconds: 2), + (remaining) async { + remainingMillis = remaining.inMilliseconds; + return 1; + }, + ); + expect(x, 1); + final r = await pool.request(); + r.release(); + await pool.close(); + expect(remainingMillis, greaterThan(500)); + expect(remainingMillis, lessThan(1500)); + }); + + test('acquire with timeout fails - long parallel use', () async { + final pool = Pool(1); + final other = await pool.request(); + Timer(Duration(seconds: 2), other.release); + await expectLater( + pool.withRequestTimeout( + timeout: Duration(seconds: 1), + (_) async { + return 1; + }, + ), + throwsA(isA()), + ); + final sw = Stopwatch()..start(); + final r = await pool.request(); + sw.stop(); + r.release(); + await pool.close(); + expect(sw.elapsedMilliseconds, greaterThan(500)); + expect(sw.elapsedMilliseconds, lessThan(1500)); + }); + }); +}