Skip to content

Commit 5ab8093

Browse files
authored
Improve row decoding in v3 API (#102)
* Improve row decoding in v3 API * Explain motivation * Fix tests * Fix interpolation test * Rename to ignoreRows, improve docs
1 parent 001d9bf commit 5ab8093

File tree

9 files changed

+152
-64
lines changed

9 files changed

+152
-64
lines changed

lib/postgres_v3_experimental.dart

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,25 @@ abstract class PgSession {
4040
Duration? timeout,
4141
});
4242

43+
/// Executes the [query] with the given [parameters].
44+
///
45+
/// [query] must either be a [String] or a [PgSql] query with types for
46+
/// parameters. When a [PgSql] query object is used, [parameters] can be a
47+
/// list of direct values. Otherwise, it must be a list of
48+
/// [PgTypedParameter]s. With [PgSql.map], values can also be provided as a
49+
/// map from the substituted parameter keys to objects or [PgTypedParameter]s.
50+
///
51+
/// When [ignoreRows] is set to true, the implementation may internally
52+
/// optimize the execution to ignore rows returned by the query. Whether this
53+
/// optimization can be applied also depends on the parameters chosen, so
54+
/// there is no guarantee that the [PgResult] from a [ignoreRows] excution has
55+
/// no rows.
4356
Future<PgResult> execute(
4457
Object /* String | PgSql */ query, {
4558
Object? /* List<Object?|PgTypedParameter> | Map<String, Object?|PgTypedParameter> */
4659
parameters,
4760
Duration? timeout,
61+
bool ignoreRows = false,
4862
});
4963

5064
Future<void> close();

lib/src/binary_codec.dart

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ class PostgresBinaryEncoder<T extends Object>
4646

4747
// ignore: unnecessary_cast
4848
switch (_dataType as PgDataType<Object>) {
49+
case PgDataType.unknownType:
50+
case PgDataType.voidType:
51+
throw ArgumentError('Cannot encode into an unknown type or into void');
4952
case PgDataType.boolean:
5053
{
5154
if (input is bool) {
@@ -247,7 +250,16 @@ class PostgresBinaryEncoder<T extends Object>
247250
throw FormatException(
248251
'Invalid type for parameter value. Expected: PgPoint Got: ${input.runtimeType}');
249252
}
253+
case PgDataType.regtype:
254+
final oid = input is PgDataType ? input.oid : null;
255+
if (oid == null) {
256+
throw FormatException(
257+
'Invalid type for parameter value, expected a data type an oid, got $input');
258+
}
250259

260+
final outBuffer = Uint8List(4);
261+
outBuffer.buffer.asByteData().setInt32(0, oid);
262+
return outBuffer;
251263
case PgDataType.booleanArray:
252264
{
253265
if (input is List<bool>) {
@@ -515,6 +527,12 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
515527

516528
return buf.toString() as T;
517529
}
530+
case PgDataType.regtype:
531+
final data = input.buffer.asByteData(input.offsetInBytes, input.length);
532+
final oid = data.getInt32(0);
533+
return (PgDataType.byTypeOid[oid] ?? PgDataType.unknownType) as T;
534+
case PgDataType.voidType:
535+
return null;
518536

519537
case PostgreSQLDataType.point:
520538
return PgPoint(buffer.getFloat64(0), buffer.getFloat64(8)) as T;
@@ -585,34 +603,7 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
585603
}
586604

587605
/// See: https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat
588-
static final Map<int, PostgreSQLDataType> typeMap = {
589-
16: PostgreSQLDataType.boolean,
590-
17: PostgreSQLDataType.byteArray,
591-
19: PostgreSQLDataType.name,
592-
20: PostgreSQLDataType.bigInteger,
593-
21: PostgreSQLDataType.smallInteger,
594-
23: PostgreSQLDataType.integer,
595-
25: PostgreSQLDataType.text,
596-
114: PostgreSQLDataType.json,
597-
600: PostgreSQLDataType.point,
598-
700: PostgreSQLDataType.real,
599-
701: PostgreSQLDataType.double,
600-
1000: PostgreSQLDataType.booleanArray,
601-
1007: PostgreSQLDataType.integerArray,
602-
1016: PostgreSQLDataType.bigIntegerArray,
603-
1009: PostgreSQLDataType.textArray,
604-
1015: PostgreSQLDataType.varCharArray,
605-
1043: PostgreSQLDataType.varChar,
606-
1022: PostgreSQLDataType.doubleArray,
607-
1082: PostgreSQLDataType.date,
608-
1114: PostgreSQLDataType.timestampWithoutTimezone,
609-
1184: PostgreSQLDataType.timestampWithTimezone,
610-
1186: PostgreSQLDataType.interval,
611-
1700: PostgreSQLDataType.numeric,
612-
2950: PostgreSQLDataType.uuid,
613-
3802: PostgreSQLDataType.jsonb,
614-
3807: PostgreSQLDataType.jsonbArray,
615-
};
606+
static final Map<int, PostgreSQLDataType> typeMap = PgDataType.byTypeOid;
616607

617608
/// Decode numeric / decimal to String without loosing precision.
618609
/// See encoding: https://github.com/postgres/postgres/blob/0e39a608ed5545cc6b9d538ac937c3c1ee8cdc36/src/backend/utils/adt/numeric.c#L305

lib/src/query.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ class PostgreSQLFormatIdentifier {
355355
'_text': PostgreSQLDataType.textArray,
356356
'_float8': PostgreSQLDataType.doubleArray,
357357
'varchar': PostgreSQLDataType.varChar,
358+
'regtype': PostgreSQLDataType.regtype,
358359
'_varchar': PostgreSQLDataType.varCharArray,
359360
'_jsonb': PostgreSQLDataType.jsonbArray,
360361
};

lib/src/substituter.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ class PostgreSQLFormat {
7171
return '_varchar';
7272
case PostgreSQLDataType.jsonbArray:
7373
return '_jsonb';
74+
case PostgreSQLDataType.regtype:
75+
return 'regtype';
7476
default:
7577
return null;
7678
}

lib/src/v3/connection.dart

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -108,14 +108,18 @@ abstract class _PgSessionBase implements PgSession {
108108
}
109109

110110
@override
111-
Future<PgResult> execute(Object query,
112-
{Object? parameters, Duration? timeout}) async {
111+
Future<PgResult> execute(
112+
Object query, {
113+
Object? parameters,
114+
Duration? timeout,
115+
bool ignoreRows = false,
116+
}) async {
113117
final description = InternalQueryDescription.wrap(query);
114118
final variables = description.bindParameters(parameters);
115119

116-
if (variables.isNotEmpty) {
117-
// The simple query protocol does not support variables, so we have to
118-
// prepare a statement explicitly.
120+
if (!ignoreRows || variables.isNotEmpty) {
121+
// The simple query protocol does not support variables and returns rows
122+
// as text. So when we need rows or parameters, we need an explicit prepare.
119123
final prepared = await prepare(description, timeout: timeout);
120124
try {
121125
return await prepared.run(variables, timeout: timeout);
@@ -127,7 +131,8 @@ abstract class _PgSessionBase implements PgSession {
127131
final controller = StreamController<PgResultRow>();
128132
final items = <PgResultRow>[];
129133

130-
final querySubscription = _PgResultStreamSubscription.simpleQuery(
134+
final querySubscription =
135+
_PgResultStreamSubscription.simpleQueryAndIgnoreRows(
131136
description.transformedSql,
132137
this,
133138
controller,
@@ -415,6 +420,7 @@ class _PgResultStreamSubscription
415420
final _PgSessionBase session;
416421
final StreamController<PgResultRow> _controller;
417422
final StreamSubscription<PgResultRow> _source;
423+
final bool ignoreRows;
418424

419425
final Completer<int> _affectedRows = Completer();
420426
final Completer<PgResultSchema> _schema = Completer();
@@ -428,7 +434,8 @@ class _PgResultStreamSubscription
428434

429435
_PgResultStreamSubscription(
430436
_BoundStatement statement, this._controller, this._source)
431-
: session = statement.statement._session {
437+
: session = statement.statement._session,
438+
ignoreRows = false {
432439
session._operationLock.withResource(() async {
433440
connection._pending = this;
434441

@@ -450,8 +457,9 @@ class _PgResultStreamSubscription
450457
});
451458
}
452459

453-
_PgResultStreamSubscription.simpleQuery(
454-
String sql, this.session, this._controller, this._source) {
460+
_PgResultStreamSubscription.simpleQueryAndIgnoreRows(
461+
String sql, this.session, this._controller, this._source)
462+
: ignoreRows = true {
455463
session._operationLock.withResource(() async {
456464
connection._pending = this;
457465

@@ -487,20 +495,23 @@ class _PgResultStreamSubscription
487495
]);
488496
_schema.complete(schema);
489497
} else if (message is DataRowMessage) {
490-
final schema = _resultSchema!;
498+
if (!ignoreRows) {
499+
final schema = _resultSchema!;
491500

492-
final columnValues = <Object?>[];
493-
for (var i = 0; i < message.values.length; i++) {
494-
final field = schema.columns[i];
501+
final columnValues = <Object?>[];
502+
for (var i = 0; i < message.values.length; i++) {
503+
final field = schema.columns[i];
495504

496-
final type = field.type;
497-
final codec = field.binaryEncoding ? type.binaryCodec : type.textCodec;
505+
final type = field.type;
506+
final codec =
507+
field.binaryEncoding ? type.binaryCodec : type.textCodec;
498508

499-
columnValues.add(codec.decode(message.values[i]));
500-
}
509+
columnValues.add(codec.decode(message.values[i]));
510+
}
501511

502-
final row = _ResultRow(schema, columnValues);
503-
_controller.add(row);
512+
final row = _ResultRow(schema, columnValues);
513+
_controller.add(row);
514+
}
504515
} else if (message is CommandCompleteMessage) {
505516
_affectedRows.complete(message.rowsAffected);
506517
} else if (message is ReadyForQueryMessage) {
@@ -589,7 +600,8 @@ class _Channels implements PgChannels {
589600

590601
void _subscribe(String channel, MultiStreamController firstListener) {
591602
Future(() async {
592-
await _connection.execute(PgSql('LISTEN ${identifier(channel)}'));
603+
await _connection.execute(PgSql('LISTEN ${identifier(channel)}'),
604+
ignoreRows: true);
593605
}).onError<Object>((error, stackTrace) {
594606
_activeListeners[channel]?.remove(firstListener);
595607

@@ -607,7 +619,8 @@ class _Channels implements PgChannels {
607619
_activeListeners.remove(channel);
608620

609621
// Send unlisten command
610-
await _connection.execute(PgSql('UNLISTEN ${identifier(channel)}'));
622+
await _connection.execute(PgSql('UNLISTEN ${identifier(channel)}'),
623+
ignoreRows: true);
611624
}
612625
}
613626

lib/src/v3/types.dart

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ class PgPoint {
2828

2929
/// Supported data types.
3030
enum PgDataType<Dart extends Object> {
31+
/// Used to represent a type not yet understood by this package.
32+
unknownType<Object>(null),
33+
3134
/// Must be a [String].
3235
text<String>(25),
3336

@@ -121,7 +124,14 @@ enum PgDataType<Dart extends Object> {
121124
varCharArray<List<String>>(1015),
122125

123126
/// Must be a [List] of encodable objects
124-
jsonbArray<List>(3807);
127+
jsonbArray<List>(3807),
128+
129+
/// Must be a [PgDataType].
130+
regtype<PgDataType>(2206),
131+
132+
/// Impossible to bind to, always null when read.
133+
voidType<Object>(2278),
134+
;
125135

126136
/// The object ID of this data type.
127137
final int? oid;

test/encoding_test.dart

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,30 @@ void main() {
490490
fail('unreachable');
491491
} on JsonUnsupportedObjectError catch (_) {}
492492
});
493+
494+
test('void', () async {
495+
final result = await conn.query('SELECT NULL::void AS r');
496+
expect(result.columnDescriptions, [
497+
isA<ColumnDescription>()
498+
.having((e) => e.typeId, 'typeId', PostgreSQLDataType.voidType.oid)
499+
]);
500+
501+
expect(result, [
502+
[null]
503+
]);
504+
505+
expect(
506+
() => PostgresBinaryEncoder(PostgreSQLDataType.voidType).convert(1),
507+
throwsArgumentError,
508+
);
509+
});
510+
511+
test('regtype', () async {
512+
await expectInverse(
513+
PostgreSQLDataType.bigInteger, PostgreSQLDataType.regtype);
514+
await expectInverse(
515+
PostgreSQLDataType.voidType, PostgreSQLDataType.regtype);
516+
});
493517
});
494518

495519
group('Text encoders', () {

test/interpolation_test.dart

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,24 @@ import 'package:test/test.dart';
66

77
void main() {
88
test('Ensure all types/format type mappings are available and accurate', () {
9-
PostgreSQLDataType.values
10-
.where((t) =>
11-
t != PostgreSQLDataType.bigSerial && t != PostgreSQLDataType.serial)
12-
.forEach((t) {
13-
expect(PostgreSQLFormatIdentifier.typeStringToCodeMap.values.contains(t),
14-
true);
15-
final code = PostgreSQLFormat.dataTypeStringForDataType(t);
16-
expect(PostgreSQLFormatIdentifier.typeStringToCodeMap[code], t);
17-
});
9+
const withoutMapping = {
10+
PostgreSQLDataType.unknownType, // Can't bind into unknown type
11+
PostgreSQLDataType.voidType, // Can't assign to void
12+
PostgreSQLDataType.bigSerial, // Can only be created from a table sequence
13+
PostgreSQLDataType.serial,
14+
};
15+
16+
for (final type in PostgreSQLDataType.values) {
17+
if (withoutMapping.contains(type)) continue;
18+
19+
expect(
20+
PostgreSQLFormatIdentifier.typeStringToCodeMap.values.contains(type),
21+
true,
22+
reason: 'There should be a type mapping for $type',
23+
);
24+
final code = PostgreSQLFormat.dataTypeStringForDataType(type);
25+
expect(PostgreSQLFormatIdentifier.typeStringToCodeMap[code], type);
26+
}
1827
});
1928

2029
test('Ensure bigserial gets translated to int8', () {

test/v3_test.dart

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ final _endpoint = PgEndpoint(
2020
// We log all packets sent to and received from the postgres server. This can be
2121
// used to debug failing tests. To view logs, something like this can be put
2222
// at the beginning of `main()`:
23-
// Logger.root.onRecord.listen((r) => print('${r.loggerName}: ${r.message}'));
23+
//
24+
// Logger.root.level = Level.ALL;
25+
// Logger.root.onRecord.listen((r) => print('${r.loggerName}: ${r.message}'));
2426
StreamChannelTransformer<BaseMessage, BaseMessage> get _loggingTransformer {
2527
final inLogger = Logger('postgres.connection.in');
2628
final outLogger = Logger('postgres.connection.out');
@@ -69,6 +71,20 @@ void main() {
6971
]);
7072
});
7173

74+
test('statement without rows', () async {
75+
final result = await connection.execute(
76+
PgSql('''SELECT pg_notify('VIRTUAL','Payload 2');'''),
77+
ignoreRows: true,
78+
);
79+
80+
expect(result, isEmpty);
81+
expect(result.schema.columns, [
82+
isA<PgResultColumn>()
83+
.having((e) => e.columnName, 'columnName', 'pg_notify')
84+
.having((e) => e.type, 'type', PgDataType.voidType)
85+
]);
86+
});
87+
7288
test('queries without a schema message', () async {
7389
final response =
7490
await connection.execute('CREATE TEMPORARY TABLE foo (bar INTEGER);');
@@ -100,6 +116,11 @@ void main() {
100116
await shouldPassthrough<int>(PgDataType.integer, 1024);
101117
await shouldPassthrough<int>(PgDataType.bigInteger, 999999999999);
102118
});
119+
120+
test('regtype', () async {
121+
await shouldPassthrough<PgDataType>(
122+
PgDataType.regtype, PgDataType.bigInteger);
123+
});
103124
});
104125

105126
test('listen and notify', () async {
@@ -278,9 +299,12 @@ void main() {
278299
test('A transaction does not preempt pending queries', () async {
279300
// Add a few insert queries but don't await, then do a transaction that does a fetch,
280301
// make sure that transaction sees all of the elements.
281-
unawaited(connection.execute('INSERT INTO t (id) VALUES (1)'));
282-
unawaited(connection.execute('INSERT INTO t (id) VALUES (2)'));
283-
unawaited(connection.execute('INSERT INTO t (id) VALUES (3)'));
302+
unawaited(connection.execute('INSERT INTO t (id) VALUES (1)',
303+
ignoreRows: true));
304+
unawaited(connection.execute('INSERT INTO t (id) VALUES (2)',
305+
ignoreRows: true));
306+
unawaited(connection.execute('INSERT INTO t (id) VALUES (3)',
307+
ignoreRows: true));
284308

285309
final results = await connection.runTx((ctx) async {
286310
return await ctx.execute('SELECT id FROM t');
@@ -334,7 +358,7 @@ void main() {
334358
);
335359
addTearDown(connection.close);
336360

337-
await connection.execute("SELECT 'foo'");
361+
await connection.execute("SELECT 'foo'", ignoreRows: true);
338362
expect(incoming, contains(isA<DataRowMessage>()));
339363
expect(outgoing, contains(isA<QueryMessage>()));
340364
});

0 commit comments

Comments
 (0)