Skip to content

feat: allow explicit use of Simple Query Protocol (v3) #118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions lib/postgres_v3_experimental.dart
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,17 @@ abstract class PgSession {
/// optimization can be applied also depends on the parameters chosen, so
/// there is no guarantee that the [PgResult] from a [ignoreRows] excution has
/// no rows.
///
/// [queryMode] is optional to override the default query execution mode that
/// is defined in [PgSessionSettings]. Unless necessary, always prefer using
/// [QueryMode.extended] which is the default value. For more information,
/// see [PgSessionSettings.queryMode]
Future<PgResult> execute(
Object /* String | PgSql */ query, {
Object? /* List<Object?|PgTypedParameter> | Map<String, Object?|PgTypedParameter> */
parameters,
bool ignoreRows = false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we'll receive the rows anyway and just chose to ignore them, perhaps the ignoreRows parameter doesn't make much sense anymore when we have a useSimpleQueryProtocol parameter as well.

So maybe the ignoreRows parameter should just be removed in favor of useSimpleQueryProtocol?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure what is the use of ignoreRows so I left it as is.. I will take a closer look later today and update the PR accordingly.

I will also expand on the docs for useSimpleQueryProtocol and mention some of its use cases.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, I would like to keep ignoreRows, because if the caller needs only a non-error execution, we can spare a bit of processing. I don't mind having useSimpleQueryProtocol but only if that's the only way to do it (see my main comment about having it at connection level or with a callback).

QueryMode? queryMode,
});

/// Closes this session, cleaning up resources and forbiding further calls to
Expand Down Expand Up @@ -376,13 +382,23 @@ final class PgSessionSettings {
/// [Streaming Replication Protocol]: https://www.postgresql.org/docs/current/protocol-replication.html
final ReplicationMode replicationMode;

/// The Query Execution Mode
///
/// The default value is [QueryMode.extended] which uses the Extended Query
/// Protocol. In certain cases, the Extended protocol cannot be used
/// (e.g. in replication mode or with proxies such as PGBouncer), hence the
/// the Simple one would be the only viable option. Unless necessary, always
/// prefer using [QueryMode.extended].
final QueryMode queryMode;

PgSessionSettings({
this.connectTimeout,
this.timeZone,
this.encoding,
this.onBadSslCertificate,
this.transformer,
this.replicationMode = ReplicationMode.none,
this.queryMode = QueryMode.extended,
});
}

Expand All @@ -393,3 +409,11 @@ final class PgPoolSettings {
this.maxConnectionCount,
});
}

/// Options for the Query Execution Mode
enum QueryMode {
/// Extended Query Protocol
extended,
/// Simple Query Protocol
simple,
}
7 changes: 6 additions & 1 deletion lib/src/text_codec.dart
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,12 @@ class PostgresTextDecoder<T extends Object> {
case PgDataType.double:
return num.parse(asText) as T;
case PgDataType.boolean:
return (asText == 'true') as T;
// In text data format when using simple query protocol, "true" & "false"
// are represented as `t` and `f`, respectively.
// we will check for both just in case
// TODO: should we check for other representations (e.g. `1`, `on`, `y`,
// and `yes`)?
return (asText == 't' || asText == 'true') as T;

// We could list out all cases, but it's about 20 lines of code.
// ignore: no_default_cases
Expand Down
44 changes: 29 additions & 15 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class _ResolvedSettings {
final Encoding encoding;

final ReplicationMode replicationMode;
final QueryMode queryMode;

final StreamChannelTransformer<BaseMessage, BaseMessage>? transformer;

Expand All @@ -56,7 +57,8 @@ class _ResolvedSettings {
timeZone = settings?.timeZone ?? 'UTC',
encoding = settings?.encoding ?? utf8,
transformer = settings?.transformer,
replicationMode = settings?.replicationMode ?? ReplicationMode.none;
replicationMode = settings?.replicationMode ?? ReplicationMode.none,
queryMode = settings?.queryMode ?? QueryMode.extended;

bool onBadSslCertificate(X509Certificate certificate) {
return settings?.onBadSslCertificate?.call(certificate) ?? false;
Expand Down Expand Up @@ -120,36 +122,49 @@ abstract class _PgSessionBase implements PgSession {
Object query, {
Object? parameters,
bool ignoreRows = false,
QueryMode? queryMode,
}) async {
final description = InternalQueryDescription.wrap(query);
final variables = description.bindParameters(parameters);

if (!ignoreRows || variables.isNotEmpty) {
// The simple query protocol does not support variables and returns rows
// as text. So when we need rows or parameters, we need an explicit prepare.
final prepared = await prepare(description);
try {
return await prepared.run(variables);
} finally {
await prepared.dispose();
}
late final bool isSimple;
if (queryMode != null) {
isSimple = queryMode == QueryMode.simple;
} else {
isSimple = _connection._settings.queryMode == QueryMode.simple;
}

if (isSimple && variables.isNotEmpty) {
throw PostgreSQLException('Parameterized queries are not supported when '
'using the Simple Query Protocol');
}

if (isSimple || (ignoreRows && variables.isEmpty)) {
// Great, we can just run a simple query.
final controller = StreamController<PgResultRow>();
final items = <PgResultRow>[];

final querySubscription =
_PgResultStreamSubscription.simpleQueryAndIgnoreRows(
final querySubscription = _PgResultStreamSubscription.simpleQueryProtocol(
description.transformedSql,
this,
controller,
controller.stream.listen(items.add),
ignoreRows,
);
await querySubscription.asFuture();
await querySubscription.cancel();

return PgResult(items, await querySubscription.affectedRows,
await querySubscription.schema);
} else {
// The simple query protocol does not support variables. So when we have
// parameters, we need an explicit prepare.
final prepared = await prepare(description);
try {
return await prepared.run(variables);
} finally {
await prepared.dispose();
}
}
}

Expand Down Expand Up @@ -496,9 +511,8 @@ class _PgResultStreamSubscription
});
}

_PgResultStreamSubscription.simpleQueryAndIgnoreRows(
String sql, this.session, this._controller, this._source)
: ignoreRows = true {
_PgResultStreamSubscription.simpleQueryProtocol(String sql, this.session,
this._controller, this._source, this.ignoreRows) {
session._withResource(() async {
connection._pending = this;

Expand Down
18 changes: 14 additions & 4 deletions lib/src/v3/pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,17 @@ class PoolImplementation implements PgPool {
}

@override
Future<PgResult> execute(Object query,
{Object? parameters, bool ignoreRows = false}) {
Future<PgResult> execute(
Object query, {
Object? parameters,
bool ignoreRows = false,
QueryMode? queryMode,
}) {
return withConnection((connection) => connection.execute(
query,
parameters: parameters,
ignoreRows: ignoreRows,
queryMode: queryMode,
));
}

Expand Down Expand Up @@ -161,12 +166,17 @@ class _PoolConnection implements PgConnection {
}

@override
Future<PgResult> execute(Object query,
{Object? parameters, bool ignoreRows = false}) {
Future<PgResult> execute(
Object query, {
Object? parameters,
bool ignoreRows = false,
QueryMode? queryMode,
}) {
return _connection.execute(
query,
parameters: parameters,
ignoreRows: ignoreRows,
queryMode: queryMode,
);
}

Expand Down
23 changes: 23 additions & 0 deletions test/v3_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,29 @@ void main() {
expect(await connection.execute('SELECT id FROM t'), isEmpty);
});
});

group('Simple Query Protocol', () {
test('single simple query', () async {
final res = await connection.execute(
"SELECT 'dart', 42, true, false, NULL",
queryMode: QueryMode.simple,
);
expect(res, [
['dart', 42, true, false, null]
]);
});

test('parameterized query throws', () async {
await expectLater(
() => connection.execute(
r'SELECT 1',
parameters: [PgTypedParameter(PgDataType.integer, 1)],
queryMode: QueryMode.simple
),
_throwsPostgresException,
);
});
});
});

test('can inject transformer into connection', () async {
Expand Down