Skip to content

Query count + proper pool check. #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions lib/src/pool/pool_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ class _PoolConnection implements Connection {
Duration _elapsedInUse = Duration.zero;
DateTime _lastReturned = DateTime.now();
bool _isInUse = false;
int _queryCount = 0;

_PoolConnection(
this._pool, this._endpoint, this._connectionSettings, this._connection);
Expand All @@ -235,7 +234,7 @@ class _PoolConnection implements Connection {
if (_elapsedInUse >= _pool._settings.maxSessionUse) {
return true;
}
if (_queryCount >= _pool._settings.maxQueryCount) {
if (_connection.queryCount >= _pool._settings.maxQueryCount) {
return true;
}
return false;
Expand Down Expand Up @@ -268,7 +267,6 @@ class _PoolConnection implements Connection {
QueryMode? queryMode,
Duration? timeout,
}) {
_queryCount++;
return _connection.execute(
query,
parameters: parameters,
Expand All @@ -280,7 +278,6 @@ class _PoolConnection implements Connection {

@override
Future<Statement> prepare(Object query) {
// TODO: increment query count on statement runs
return _connection.prepare(query);
}

Expand All @@ -289,7 +286,6 @@ class _PoolConnection implements Connection {
Future<R> Function(Session session) fn, {
SessionSettings? settings,
}) {
// TODO: increment query count on session callbacks
return _connection.run(fn, settings: settings);
}

Expand All @@ -298,7 +294,6 @@ class _PoolConnection implements Connection {
Future<R> Function(Session session) fn, {
TransactionSettings? settings,
}) {
// TODO: increment query count on session callbacks
return _connection.runTx(
fn,
settings: settings,
Expand Down
15 changes: 11 additions & 4 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import 'dart:typed_data';

import 'package:async/async.dart' as async;
import 'package:charcode/ascii.dart';
import 'package:meta/meta.dart';
import 'package:pool/pool.dart' as pool;
import 'package:postgres/src/v3/resolved_settings.dart';
import 'package:stream_channel/stream_channel.dart';

import '../../postgres.dart';
Expand All @@ -15,10 +15,11 @@ import '../binary_codec.dart';
import '../text_codec.dart';
import 'protocol.dart';
import 'query_description.dart';
import 'resolved_settings.dart';

const _debugLog = false;

String identifier(String source) {
String _identifier(String source) {
// To avoid complex ambiguity rules, we always wrap identifier in double
// quotes. That means the only character we need to escape are double quotes
// in the source.
Expand Down Expand Up @@ -110,6 +111,7 @@ abstract class _PgSessionBase implements Session {
}

if (isSimple || (ignoreRows && variables.isEmpty)) {
_connection._queryCount++;
// Great, we can just run a simple query.
final controller = StreamController<ResultRow>();
final items = <ResultRow>[];
Expand Down Expand Up @@ -294,9 +296,13 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {

var _statementCounter = 0;
var _portalCounter = 0;
var _queryCount = 0;

late final _channels = _Channels(this);

@internal
int get queryCount => _queryCount;

@override
Channels get channels => _channels;

Expand Down Expand Up @@ -472,6 +478,7 @@ class _PreparedStatement extends Statement {
Object? parameters, {
Duration? timeout,
}) async {
_session._connection._queryCount++;
timeout ??= _session._settings.queryTimeout;
final items = <ResultRow>[];
final subscription = bind(parameters).listen(items.add);
Expand Down Expand Up @@ -763,7 +770,7 @@ class _Channels implements Channels {

void _subscribe(String channel, MultiStreamController firstListener) {
Future(() async {
await _connection.execute(Sql('LISTEN ${identifier(channel)}'),
await _connection.execute(Sql('LISTEN ${_identifier(channel)}'),
ignoreRows: true);
}).onError<Object>((error, stackTrace) {
_activeListeners[channel]?.remove(firstListener);
Expand All @@ -782,7 +789,7 @@ class _Channels implements Channels {
_activeListeners.remove(channel);

// Send unlisten command
await _connection.execute(Sql('UNLISTEN ${identifier(channel)}'),
await _connection.execute(Sql('UNLISTEN ${_identifier(channel)}'),
ignoreRows: true);
}
}
Expand Down