From 519d60be51d135f490e95432ceec065cd959e58e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sun, 30 Jul 2023 12:44:08 +0200 Subject: [PATCH 1/5] Improve row decoding in v3 API --- lib/postgres_v3_experimental.dart | 12 ++++++++ lib/src/binary_codec.dart | 46 ++++++++++++------------------- lib/src/v3/connection.dart | 45 ++++++++++++++++++------------ lib/src/v3/types.dart | 12 +++++++- test/v3_test.dart | 23 +++++++++++++++- 5 files changed, 91 insertions(+), 47 deletions(-) diff --git a/lib/postgres_v3_experimental.dart b/lib/postgres_v3_experimental.dart index 0b0a1def..3d469b93 100644 --- a/lib/postgres_v3_experimental.dart +++ b/lib/postgres_v3_experimental.dart @@ -40,11 +40,23 @@ abstract class PgSession { Duration? timeout, }); + /// Executes the [query] with the given [parameters]. + /// + /// [query] must either be a [String] or a [PgSql] query with types for + /// parameters. When a [PgSql] query object is used, [parameters] can be a + /// list of direct values. Otherwise, it must be a list of [PgTypedParameter]s. + /// + /// When [schemaOnly] is set to true, the implementation may internally + /// optimize the execution to ignore rows returned by the query. Whether this + /// optimization can be applied also depends on the parameters chosen, so + /// there is no guarantee that the [PgResult] from a [schemaOnly] excution has + /// no rows. Future execute( Object /* String | PgSql */ query, { Object? /* List | Map */ parameters, Duration? timeout, + bool schemaOnly = false, }); Future close(); diff --git a/lib/src/binary_codec.dart b/lib/src/binary_codec.dart index 67f76d21..26fc34be 100644 --- a/lib/src/binary_codec.dart +++ b/lib/src/binary_codec.dart @@ -46,6 +46,8 @@ class PostgresBinaryEncoder // ignore: unnecessary_cast switch (_dataType as PgDataType) { + case PgDataType.unknownType: + throw ArgumentError('Cannot encode into an unknown type'); case PgDataType.boolean: { if (input is bool) { @@ -247,7 +249,16 @@ class PostgresBinaryEncoder throw FormatException( 'Invalid type for parameter value. Expected: PgPoint Got: ${input.runtimeType}'); } + case PgDataType.regtype: + final oid = input is PgDataType ? input.oid : null; + if (oid == null) { + throw FormatException( + 'Invalid type for parameter value, expected a data type an oid, got $input'); + } + final outBuffer = Uint8List(4); + outBuffer.buffer.asByteData().setInt32(0, oid); + return outBuffer; case PgDataType.booleanArray: { if (input is List) { @@ -515,6 +526,12 @@ class PostgresBinaryDecoder extends Converter { return buf.toString() as T; } + case PgDataType.regtype: + final data = input.buffer.asByteData(input.offsetInBytes, input.length); + final oid = data.getInt32(0); + return (PgDataType.byTypeOid[oid] ?? PgDataType.unknownType) as T; + case PgDataType.voidType: + return null; case PostgreSQLDataType.point: return PgPoint(buffer.getFloat64(0), buffer.getFloat64(8)) as T; @@ -585,34 +602,7 @@ class PostgresBinaryDecoder extends Converter { } /// See: https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat - static final Map typeMap = { - 16: PostgreSQLDataType.boolean, - 17: PostgreSQLDataType.byteArray, - 19: PostgreSQLDataType.name, - 20: PostgreSQLDataType.bigInteger, - 21: PostgreSQLDataType.smallInteger, - 23: PostgreSQLDataType.integer, - 25: PostgreSQLDataType.text, - 114: PostgreSQLDataType.json, - 600: PostgreSQLDataType.point, - 700: PostgreSQLDataType.real, - 701: PostgreSQLDataType.double, - 1000: PostgreSQLDataType.booleanArray, - 1007: PostgreSQLDataType.integerArray, - 1016: PostgreSQLDataType.bigIntegerArray, - 1009: PostgreSQLDataType.textArray, - 1015: PostgreSQLDataType.varCharArray, - 1043: PostgreSQLDataType.varChar, - 1022: PostgreSQLDataType.doubleArray, - 1082: PostgreSQLDataType.date, - 1114: PostgreSQLDataType.timestampWithoutTimezone, - 1184: PostgreSQLDataType.timestampWithTimezone, - 1186: PostgreSQLDataType.interval, - 1700: PostgreSQLDataType.numeric, - 2950: PostgreSQLDataType.uuid, - 3802: PostgreSQLDataType.jsonb, - 3807: PostgreSQLDataType.jsonbArray, - }; + static final Map typeMap = PgDataType.byTypeOid; /// Decode numeric / decimal to String without loosing precision. /// See encoding: https://github.com/postgres/postgres/blob/0e39a608ed5545cc6b9d538ac937c3c1ee8cdc36/src/backend/utils/adt/numeric.c#L305 diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 1e95903f..a2637776 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -108,12 +108,16 @@ abstract class _PgSessionBase implements PgSession { } @override - Future execute(Object query, - {Object? parameters, Duration? timeout}) async { + Future execute( + Object query, { + Object? parameters, + Duration? timeout, + bool schemaOnly = false, + }) async { final description = InternalQueryDescription.wrap(query); final variables = description.bindParameters(parameters); - if (variables.isNotEmpty) { + if (!schemaOnly || variables.isNotEmpty) { // The simple query protocol does not support variables, so we have to // prepare a statement explicitly. final prepared = await prepare(description, timeout: timeout); @@ -127,7 +131,8 @@ abstract class _PgSessionBase implements PgSession { final controller = StreamController(); final items = []; - final querySubscription = _PgResultStreamSubscription.simpleQuery( + final querySubscription = + _PgResultStreamSubscription.simpleQueryAndIgnoreRows( description.transformedSql, this, controller, @@ -415,6 +420,7 @@ class _PgResultStreamSubscription final _PgSessionBase session; final StreamController _controller; final StreamSubscription _source; + final bool ignoreRows; final Completer _affectedRows = Completer(); final Completer _schema = Completer(); @@ -428,7 +434,8 @@ class _PgResultStreamSubscription _PgResultStreamSubscription( _BoundStatement statement, this._controller, this._source) - : session = statement.statement._session { + : session = statement.statement._session, + ignoreRows = false { session._operationLock.withResource(() async { connection._pending = this; @@ -450,8 +457,9 @@ class _PgResultStreamSubscription }); } - _PgResultStreamSubscription.simpleQuery( - String sql, this.session, this._controller, this._source) { + _PgResultStreamSubscription.simpleQueryAndIgnoreRows( + String sql, this.session, this._controller, this._source) + : ignoreRows = true { session._operationLock.withResource(() async { connection._pending = this; @@ -487,20 +495,23 @@ class _PgResultStreamSubscription ]); _schema.complete(schema); } else if (message is DataRowMessage) { - final schema = _resultSchema!; + if (!ignoreRows) { + final schema = _resultSchema!; - final columnValues = []; - for (var i = 0; i < message.values.length; i++) { - final field = schema.columns[i]; + final columnValues = []; + for (var i = 0; i < message.values.length; i++) { + final field = schema.columns[i]; - final type = field.type; - final codec = field.binaryEncoding ? type.binaryCodec : type.textCodec; + final type = field.type; + final codec = + field.binaryEncoding ? type.binaryCodec : type.textCodec; - columnValues.add(codec.decode(message.values[i])); - } + columnValues.add(codec.decode(message.values[i])); + } - final row = _ResultRow(schema, columnValues); - _controller.add(row); + final row = _ResultRow(schema, columnValues); + _controller.add(row); + } } else if (message is CommandCompleteMessage) { _affectedRows.complete(message.rowsAffected); } else if (message is ReadyForQueryMessage) { diff --git a/lib/src/v3/types.dart b/lib/src/v3/types.dart index bef44d29..5e2217d7 100644 --- a/lib/src/v3/types.dart +++ b/lib/src/v3/types.dart @@ -28,6 +28,9 @@ class PgPoint { /// Supported data types. enum PgDataType { + /// Used to represent a type not yet understood by this package. + unknownType(null), + /// Must be a [String]. text(25), @@ -121,7 +124,14 @@ enum PgDataType { varCharArray>(1015), /// Must be a [List] of encodable objects - jsonbArray(3807); + jsonbArray(3807), + + /// Must be a [PgDataType]. + regtype(2206), + + /// Impossible to bind to, always null when read. + voidType(2278), + ; /// The object ID of this data type. final int? oid; diff --git a/test/v3_test.dart b/test/v3_test.dart index 908cdd87..0379f20a 100644 --- a/test/v3_test.dart +++ b/test/v3_test.dart @@ -20,7 +20,9 @@ final _endpoint = PgEndpoint( // We log all packets sent to and received from the postgres server. This can be // used to debug failing tests. To view logs, something like this can be put // at the beginning of `main()`: -// Logger.root.onRecord.listen((r) => print('${r.loggerName}: ${r.message}')); +// +// Logger.root.level = Level.ALL; +// Logger.root.onRecord.listen((r) => print('${r.loggerName}: ${r.message}')); StreamChannelTransformer get _loggingTransformer { final inLogger = Logger('postgres.connection.in'); final outLogger = Logger('postgres.connection.out'); @@ -69,6 +71,20 @@ void main() { ]); }); + test('statement without rows', () async { + final result = await connection.execute( + PgSql('''SELECT pg_notify('VIRTUAL','Payload 2');'''), + schemaOnly: true, + ); + + expect(result, isEmpty); + expect(result.schema.columns, [ + isA() + .having((e) => e.columnName, 'columnName', 'pg_notify') + .having((e) => e.type, 'type', PgDataType.voidType) + ]); + }); + test('queries without a schema message', () async { final response = await connection.execute('CREATE TEMPORARY TABLE foo (bar INTEGER);'); @@ -100,6 +116,11 @@ void main() { await shouldPassthrough(PgDataType.integer, 1024); await shouldPassthrough(PgDataType.bigInteger, 999999999999); }); + + test('regtype', () async { + await shouldPassthrough( + PgDataType.regtype, PgDataType.bigInteger); + }); }); test('listen and notify', () async { From f9cd8310eeec379ddc21ae16f14e10055520bf57 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sun, 30 Jul 2023 12:45:57 +0200 Subject: [PATCH 2/5] Explain motivation --- lib/src/v3/connection.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index a2637776..9c4e9399 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -118,8 +118,8 @@ abstract class _PgSessionBase implements PgSession { final variables = description.bindParameters(parameters); if (!schemaOnly || variables.isNotEmpty) { - // The simple query protocol does not support variables, so we have to - // prepare a statement explicitly. + // The simple query protocol does not support variables and returns rows + // as text. So when we need rows or parameters, we need an explicit prepare. final prepared = await prepare(description, timeout: timeout); try { return await prepared.run(variables, timeout: timeout); From 04dd3fcc871963cce7f88ae68112520a15344538 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sun, 30 Jul 2023 14:38:33 +0200 Subject: [PATCH 3/5] Fix tests --- lib/src/binary_codec.dart | 3 ++- lib/src/query.dart | 1 + lib/src/substituter.dart | 2 ++ lib/src/v3/connection.dart | 6 ++++-- test/encoding_test.dart | 24 ++++++++++++++++++++++++ test/v3_test.dart | 11 +++++++---- 6 files changed, 40 insertions(+), 7 deletions(-) diff --git a/lib/src/binary_codec.dart b/lib/src/binary_codec.dart index 26fc34be..80cdfcbc 100644 --- a/lib/src/binary_codec.dart +++ b/lib/src/binary_codec.dart @@ -47,7 +47,8 @@ class PostgresBinaryEncoder // ignore: unnecessary_cast switch (_dataType as PgDataType) { case PgDataType.unknownType: - throw ArgumentError('Cannot encode into an unknown type'); + case PgDataType.voidType: + throw ArgumentError('Cannot encode into an unknown type or into void'); case PgDataType.boolean: { if (input is bool) { diff --git a/lib/src/query.dart b/lib/src/query.dart index 94b143a9..f7b3d55d 100644 --- a/lib/src/query.dart +++ b/lib/src/query.dart @@ -355,6 +355,7 @@ class PostgreSQLFormatIdentifier { '_text': PostgreSQLDataType.textArray, '_float8': PostgreSQLDataType.doubleArray, 'varchar': PostgreSQLDataType.varChar, + 'regtype': PostgreSQLDataType.regtype, '_varchar': PostgreSQLDataType.varCharArray, '_jsonb': PostgreSQLDataType.jsonbArray, }; diff --git a/lib/src/substituter.dart b/lib/src/substituter.dart index de48cf4a..d2b90d70 100644 --- a/lib/src/substituter.dart +++ b/lib/src/substituter.dart @@ -71,6 +71,8 @@ class PostgreSQLFormat { return '_varchar'; case PostgreSQLDataType.jsonbArray: return '_jsonb'; + case PostgreSQLDataType.regtype: + return 'regtype'; default: return null; } diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 9c4e9399..533e98f1 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -600,7 +600,8 @@ class _Channels implements PgChannels { void _subscribe(String channel, MultiStreamController firstListener) { Future(() async { - await _connection.execute(PgSql('LISTEN ${identifier(channel)}')); + await _connection.execute(PgSql('LISTEN ${identifier(channel)}'), + schemaOnly: true); }).onError((error, stackTrace) { _activeListeners[channel]?.remove(firstListener); @@ -618,7 +619,8 @@ class _Channels implements PgChannels { _activeListeners.remove(channel); // Send unlisten command - await _connection.execute(PgSql('UNLISTEN ${identifier(channel)}')); + await _connection.execute(PgSql('UNLISTEN ${identifier(channel)}'), + schemaOnly: true); } } diff --git a/test/encoding_test.dart b/test/encoding_test.dart index bd9f5c92..753a65a2 100644 --- a/test/encoding_test.dart +++ b/test/encoding_test.dart @@ -490,6 +490,30 @@ void main() { fail('unreachable'); } on JsonUnsupportedObjectError catch (_) {} }); + + test('void', () async { + final result = await conn.query('SELECT NULL::void AS r'); + expect(result.columnDescriptions, [ + isA() + .having((e) => e.typeId, 'typeId', PostgreSQLDataType.voidType.oid) + ]); + + expect(result, [ + [null] + ]); + + expect( + () => PostgresBinaryEncoder(PostgreSQLDataType.voidType).convert(1), + throwsArgumentError, + ); + }); + + test('regtype', () async { + await expectInverse( + PostgreSQLDataType.bigInteger, PostgreSQLDataType.regtype); + await expectInverse( + PostgreSQLDataType.voidType, PostgreSQLDataType.regtype); + }); }); group('Text encoders', () { diff --git a/test/v3_test.dart b/test/v3_test.dart index 0379f20a..18861ef9 100644 --- a/test/v3_test.dart +++ b/test/v3_test.dart @@ -299,9 +299,12 @@ void main() { test('A transaction does not preempt pending queries', () async { // Add a few insert queries but don't await, then do a transaction that does a fetch, // make sure that transaction sees all of the elements. - unawaited(connection.execute('INSERT INTO t (id) VALUES (1)')); - unawaited(connection.execute('INSERT INTO t (id) VALUES (2)')); - unawaited(connection.execute('INSERT INTO t (id) VALUES (3)')); + unawaited(connection.execute('INSERT INTO t (id) VALUES (1)', + schemaOnly: true)); + unawaited(connection.execute('INSERT INTO t (id) VALUES (2)', + schemaOnly: true)); + unawaited(connection.execute('INSERT INTO t (id) VALUES (3)', + schemaOnly: true)); final results = await connection.runTx((ctx) async { return await ctx.execute('SELECT id FROM t'); @@ -355,7 +358,7 @@ void main() { ); addTearDown(connection.close); - await connection.execute("SELECT 'foo'"); + await connection.execute("SELECT 'foo'", schemaOnly: true); expect(incoming, contains(isA())); expect(outgoing, contains(isA())); }); From 976b76565985fee7f52f8e154dc12c4dd1418c56 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sun, 30 Jul 2023 14:45:41 +0200 Subject: [PATCH 4/5] Fix interpolation test --- test/interpolation_test.dart | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/test/interpolation_test.dart b/test/interpolation_test.dart index 5f57756b..f8e4cdce 100644 --- a/test/interpolation_test.dart +++ b/test/interpolation_test.dart @@ -6,15 +6,24 @@ import 'package:test/test.dart'; void main() { test('Ensure all types/format type mappings are available and accurate', () { - PostgreSQLDataType.values - .where((t) => - t != PostgreSQLDataType.bigSerial && t != PostgreSQLDataType.serial) - .forEach((t) { - expect(PostgreSQLFormatIdentifier.typeStringToCodeMap.values.contains(t), - true); - final code = PostgreSQLFormat.dataTypeStringForDataType(t); - expect(PostgreSQLFormatIdentifier.typeStringToCodeMap[code], t); - }); + const withoutMapping = { + PostgreSQLDataType.unknownType, // Can't bind into unknown type + PostgreSQLDataType.voidType, // Can't assign to void + PostgreSQLDataType.bigSerial, // Can only be created from a table sequence + PostgreSQLDataType.serial, + }; + + for (final type in PostgreSQLDataType.values) { + if (withoutMapping.contains(type)) continue; + + expect( + PostgreSQLFormatIdentifier.typeStringToCodeMap.values.contains(type), + true, + reason: 'There should be a type mapping for $type', + ); + final code = PostgreSQLFormat.dataTypeStringForDataType(type); + expect(PostgreSQLFormatIdentifier.typeStringToCodeMap[code], type); + } }); test('Ensure bigserial gets translated to int8', () { From dcac97fe32cbb3ed4269d16832c125114c753361 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sun, 30 Jul 2023 22:57:55 +0200 Subject: [PATCH 5/5] Rename to ignoreRows, improve docs --- lib/postgres_v3_experimental.dart | 10 ++++++---- lib/src/v3/connection.dart | 8 ++++---- test/v3_test.dart | 10 +++++----- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/lib/postgres_v3_experimental.dart b/lib/postgres_v3_experimental.dart index 3d469b93..5d015906 100644 --- a/lib/postgres_v3_experimental.dart +++ b/lib/postgres_v3_experimental.dart @@ -44,19 +44,21 @@ abstract class PgSession { /// /// [query] must either be a [String] or a [PgSql] query with types for /// parameters. When a [PgSql] query object is used, [parameters] can be a - /// list of direct values. Otherwise, it must be a list of [PgTypedParameter]s. + /// list of direct values. Otherwise, it must be a list of + /// [PgTypedParameter]s. With [PgSql.map], values can also be provided as a + /// map from the substituted parameter keys to objects or [PgTypedParameter]s. /// - /// When [schemaOnly] is set to true, the implementation may internally + /// When [ignoreRows] is set to true, the implementation may internally /// optimize the execution to ignore rows returned by the query. Whether this /// optimization can be applied also depends on the parameters chosen, so - /// there is no guarantee that the [PgResult] from a [schemaOnly] excution has + /// there is no guarantee that the [PgResult] from a [ignoreRows] excution has /// no rows. Future execute( Object /* String | PgSql */ query, { Object? /* List | Map */ parameters, Duration? timeout, - bool schemaOnly = false, + bool ignoreRows = false, }); Future close(); diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 533e98f1..1c35d355 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -112,12 +112,12 @@ abstract class _PgSessionBase implements PgSession { Object query, { Object? parameters, Duration? timeout, - bool schemaOnly = false, + bool ignoreRows = false, }) async { final description = InternalQueryDescription.wrap(query); final variables = description.bindParameters(parameters); - if (!schemaOnly || variables.isNotEmpty) { + if (!ignoreRows || variables.isNotEmpty) { // The simple query protocol does not support variables and returns rows // as text. So when we need rows or parameters, we need an explicit prepare. final prepared = await prepare(description, timeout: timeout); @@ -601,7 +601,7 @@ class _Channels implements PgChannels { void _subscribe(String channel, MultiStreamController firstListener) { Future(() async { await _connection.execute(PgSql('LISTEN ${identifier(channel)}'), - schemaOnly: true); + ignoreRows: true); }).onError((error, stackTrace) { _activeListeners[channel]?.remove(firstListener); @@ -620,7 +620,7 @@ class _Channels implements PgChannels { // Send unlisten command await _connection.execute(PgSql('UNLISTEN ${identifier(channel)}'), - schemaOnly: true); + ignoreRows: true); } } diff --git a/test/v3_test.dart b/test/v3_test.dart index 18861ef9..edfc4a2f 100644 --- a/test/v3_test.dart +++ b/test/v3_test.dart @@ -74,7 +74,7 @@ void main() { test('statement without rows', () async { final result = await connection.execute( PgSql('''SELECT pg_notify('VIRTUAL','Payload 2');'''), - schemaOnly: true, + ignoreRows: true, ); expect(result, isEmpty); @@ -300,11 +300,11 @@ void main() { // Add a few insert queries but don't await, then do a transaction that does a fetch, // make sure that transaction sees all of the elements. unawaited(connection.execute('INSERT INTO t (id) VALUES (1)', - schemaOnly: true)); + ignoreRows: true)); unawaited(connection.execute('INSERT INTO t (id) VALUES (2)', - schemaOnly: true)); + ignoreRows: true)); unawaited(connection.execute('INSERT INTO t (id) VALUES (3)', - schemaOnly: true)); + ignoreRows: true)); final results = await connection.runTx((ctx) async { return await ctx.execute('SELECT id FROM t'); @@ -358,7 +358,7 @@ void main() { ); addTearDown(connection.close); - await connection.execute("SELECT 'foo'", schemaOnly: true); + await connection.execute("SELECT 'foo'", ignoreRows: true); expect(incoming, contains(isA())); expect(outgoing, contains(isA())); });