Improve row decoding in v3 API #102

Merged 5 commits on Jul 30, 2023
14 changes: 14 additions & 0 deletions lib/postgres_v3_experimental.dart
@@ -40,11 +40,25 @@ abstract class PgSession {
Duration? timeout,
});

/// Executes the [query] with the given [parameters].
///
/// [query] must either be a [String] or a [PgSql] query with types for
/// parameters. When a [PgSql] query object is used, [parameters] can be a
/// list of direct values. Otherwise, it must be a list of
/// [PgTypedParameter]s. With [PgSql.map], values can also be provided as a
/// map from the substituted parameter keys to objects or [PgTypedParameter]s.
///
/// When [ignoreRows] is set to true, the implementation may internally
/// optimize the execution to ignore rows returned by the query. Whether this
/// optimization can be applied also depends on the parameters chosen, so
/// there is no guarantee that the [PgResult] from an [ignoreRows] execution has
/// no rows.
Future<PgResult> execute(
Object /* String | PgSql */ query, {
Object? /* List<Object?|PgTypedParameter> | Map<String, Object?|PgTypedParameter> */
parameters,
Duration? timeout,
bool ignoreRows = false,
});

Future<void> close();
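
A minimal usage sketch of the forms described in the doc comment above (the session variable and the exact PgSql/PgSql.map parameter names are assumptions, not part of this diff):

// Plain string, no parameters; rows are not needed, so let the driver skip them.
await session.execute('CREATE TABLE IF NOT EXISTS t (id INTEGER);', ignoreRows: true);

// A PgSql query with declared types takes a plain list of values.
await session.execute(
  PgSql(r'SELECT $1;', types: [PgDataType.integer]),
  parameters: [42],
);

// PgSql.map takes values (or PgTypedParameters) keyed by the substituted names.
await session.execute(
  PgSql.map('SELECT @id;', types: {'id': PgDataType.integer}),
  parameters: {'id': 42},
);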
47 changes: 19 additions & 28 deletions lib/src/binary_codec.dart
@@ -46,6 +46,9 @@ class PostgresBinaryEncoder<T extends Object>

// ignore: unnecessary_cast
switch (_dataType as PgDataType<Object>) {
case PgDataType.unknownType:
case PgDataType.voidType:
throw ArgumentError('Cannot encode into an unknown type or into void');
case PgDataType.boolean:
{
if (input is bool) {
@@ -247,7 +250,16 @@
throw FormatException(
'Invalid type for parameter value. Expected: PgPoint Got: ${input.runtimeType}');
}
case PgDataType.regtype:
final oid = input is PgDataType ? input.oid : null;
if (oid == null) {
throw FormatException(
'Invalid type for parameter value, expected a data type with an oid, got $input');
}

final outBuffer = Uint8List(4);
outBuffer.buffer.asByteData().setInt32(0, oid);
return outBuffer;
case PgDataType.booleanArray:
{
if (input is List<bool>) {
@@ -515,6 +527,12 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {

return buf.toString() as T;
}
case PgDataType.regtype:
final data = input.buffer.asByteData(input.offsetInBytes, input.length);
final oid = data.getInt32(0);
return (PgDataType.byTypeOid[oid] ?? PgDataType.unknownType) as T;
case PgDataType.voidType:
return null;

case PostgreSQLDataType.point:
return PgPoint(buffer.getFloat64(0), buffer.getFloat64(8)) as T;
@@ -585,34 +603,7 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
}

/// See: https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat
static final Map<int, PostgreSQLDataType> typeMap = {
16: PostgreSQLDataType.boolean,
17: PostgreSQLDataType.byteArray,
19: PostgreSQLDataType.name,
20: PostgreSQLDataType.bigInteger,
21: PostgreSQLDataType.smallInteger,
23: PostgreSQLDataType.integer,
25: PostgreSQLDataType.text,
114: PostgreSQLDataType.json,
600: PostgreSQLDataType.point,
700: PostgreSQLDataType.real,
701: PostgreSQLDataType.double,
1000: PostgreSQLDataType.booleanArray,
1007: PostgreSQLDataType.integerArray,
1016: PostgreSQLDataType.bigIntegerArray,
1009: PostgreSQLDataType.textArray,
1015: PostgreSQLDataType.varCharArray,
1043: PostgreSQLDataType.varChar,
1022: PostgreSQLDataType.doubleArray,
1082: PostgreSQLDataType.date,
1114: PostgreSQLDataType.timestampWithoutTimezone,
1184: PostgreSQLDataType.timestampWithTimezone,
1186: PostgreSQLDataType.interval,
1700: PostgreSQLDataType.numeric,
2950: PostgreSQLDataType.uuid,
3802: PostgreSQLDataType.jsonb,
3807: PostgreSQLDataType.jsonbArray,
};
static final Map<int, PostgreSQLDataType> typeMap = PgDataType.byTypeOid;

/// Decode numeric / decimal to String without losing precision.
/// See encoding: https://github.com/postgres/postgres/blob/0e39a608ed5545cc6b9d538ac937c3c1ee8cdc36/src/backend/utils/adt/numeric.c#L305
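As a rough illustration of the regtype handling added above (a sketch; it assumes package:postgres/postgres_v3_experimental.dart exports PgDataType and is not part of this diff): on the wire, a regtype value is nothing but the 4-byte big-endian OID of a type, which is what the encoder writes and the decoder resolves again.

import 'dart:typed_data';
import 'package:postgres/postgres_v3_experimental.dart';

void main() {
  // Encode: write the type's OID into four big-endian bytes.
  final bytes = Uint8List(4)
    ..buffer.asByteData().setInt32(0, PgDataType.bigInteger.oid!);

  // Decode: read the OID back and resolve it, falling back to unknownType.
  final oid = bytes.buffer.asByteData().getInt32(0);
  final type = PgDataType.byTypeOid[oid] ?? PgDataType.unknownType;
  print(type); // PgDataType.bigInteger
}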
1 change: 1 addition & 0 deletions lib/src/query.dart
@@ -355,6 +355,7 @@ class PostgreSQLFormatIdentifier {
'_text': PostgreSQLDataType.textArray,
'_float8': PostgreSQLDataType.doubleArray,
'varchar': PostgreSQLDataType.varChar,
'regtype': PostgreSQLDataType.regtype,
'_varchar': PostgreSQLDataType.varCharArray,
'_jsonb': PostgreSQLDataType.jsonbArray,
};
2 changes: 2 additions & 0 deletions lib/src/substituter.dart
@@ -71,6 +71,8 @@ class PostgreSQLFormat {
return '_varchar';
case PostgreSQLDataType.jsonbArray:
return '_jsonb';
case PostgreSQLDataType.regtype:
return 'regtype';
default:
return null;
}
55 changes: 34 additions & 21 deletions lib/src/v3/connection.dart
@@ -108,14 +108,18 @@ abstract class _PgSessionBase implements PgSession {
}

@override
Future<PgResult> execute(Object query,
{Object? parameters, Duration? timeout}) async {
Future<PgResult> execute(
Object query, {
Object? parameters,
Duration? timeout,
bool ignoreRows = false,
}) async {
final description = InternalQueryDescription.wrap(query);
final variables = description.bindParameters(parameters);

if (variables.isNotEmpty) {
// The simple query protocol does not support variables, so we have to
// prepare a statement explicitly.
if (!ignoreRows || variables.isNotEmpty) {
// The simple query protocol does not support variables and returns rows
// as text. So when we need rows or parameters, we need an explicit prepare.
final prepared = await prepare(description, timeout: timeout);
try {
return await prepared.run(variables, timeout: timeout);
@@ -127,7 +131,8 @@
final controller = StreamController<PgResultRow>();
final items = <PgResultRow>[];

final querySubscription = _PgResultStreamSubscription.simpleQuery(
final querySubscription =
_PgResultStreamSubscription.simpleQueryAndIgnoreRows(
description.transformedSql,
this,
controller,
@@ -415,6 +420,7 @@ class _PgResultStreamSubscription
final _PgSessionBase session;
final StreamController<PgResultRow> _controller;
final StreamSubscription<PgResultRow> _source;
final bool ignoreRows;

final Completer<int> _affectedRows = Completer();
final Completer<PgResultSchema> _schema = Completer();
@@ -428,7 +434,8 @@

_PgResultStreamSubscription(
_BoundStatement statement, this._controller, this._source)
: session = statement.statement._session {
: session = statement.statement._session,
ignoreRows = false {
session._operationLock.withResource(() async {
connection._pending = this;

@@ -450,8 +457,9 @@
});
}

_PgResultStreamSubscription.simpleQuery(
String sql, this.session, this._controller, this._source) {
_PgResultStreamSubscription.simpleQueryAndIgnoreRows(
String sql, this.session, this._controller, this._source)
: ignoreRows = true {
session._operationLock.withResource(() async {
connection._pending = this;

@@ -487,20 +495,23 @@
]);
_schema.complete(schema);
} else if (message is DataRowMessage) {
final schema = _resultSchema!;
if (!ignoreRows) {
final schema = _resultSchema!;

final columnValues = <Object?>[];
for (var i = 0; i < message.values.length; i++) {
final field = schema.columns[i];
final columnValues = <Object?>[];
for (var i = 0; i < message.values.length; i++) {
final field = schema.columns[i];

final type = field.type;
final codec = field.binaryEncoding ? type.binaryCodec : type.textCodec;
final type = field.type;
final codec =
field.binaryEncoding ? type.binaryCodec : type.textCodec;

columnValues.add(codec.decode(message.values[i]));
}
columnValues.add(codec.decode(message.values[i]));
}

final row = _ResultRow(schema, columnValues);
_controller.add(row);
final row = _ResultRow(schema, columnValues);
_controller.add(row);
}
} else if (message is CommandCompleteMessage) {
_affectedRows.complete(message.rowsAffected);
} else if (message is ReadyForQueryMessage) {
@@ -589,7 +600,8 @@ class _Channels implements PgChannels {

void _subscribe(String channel, MultiStreamController firstListener) {
Future(() async {
await _connection.execute(PgSql('LISTEN ${identifier(channel)}'));
await _connection.execute(PgSql('LISTEN ${identifier(channel)}'),
ignoreRows: true);
}).onError<Object>((error, stackTrace) {
_activeListeners[channel]?.remove(firstListener);

@@ -607,7 +619,8 @@
_activeListeners.remove(channel);

// Send unlisten command
await _connection.execute(PgSql('UNLISTEN ${identifier(channel)}'));
await _connection.execute(PgSql('UNLISTEN ${identifier(channel)}'),
ignoreRows: true);
}
}

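The comment in _PgSessionBase.execute above sums up the protocol choice this change introduces; a compact restatement (usesSimpleProtocol is a hypothetical helper for illustration, not in the diff):

// The simple query protocol cannot bind variables and only returns rows as text,
// so it is only taken when the caller ignores rows and passes no variables;
// everything else goes through an explicit prepare (extended protocol), where the
// DataRowMessage handler above decodes each column with its binary or text codec.
bool usesSimpleProtocol({required bool ignoreRows, required int variableCount}) {
  return ignoreRows && variableCount == 0;
}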
12 changes: 11 additions & 1 deletion lib/src/v3/types.dart
@@ -28,6 +28,9 @@ class PgPoint {

/// Supported data types.
enum PgDataType<Dart extends Object> {
/// Used to represent a type not yet understood by this package.
unknownType<Object>(null),

/// Must be a [String].
text<String>(25),

@@ -121,7 +124,14 @@ enum PgDataType<Dart extends Object> {
varCharArray<List<String>>(1015),

/// Must be a [List] of encodable objects
jsonbArray<List>(3807);
jsonbArray<List>(3807),

/// Must be a [PgDataType].
regtype<PgDataType>(2206),

/// Impossible to bind to, always null when read.
voidType<Object>(2278),
;

/// The object ID of this data type.
final int? oid;
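A short usage sketch for the two new members (the connection variable and exact PgSql signature are assumptions; it mirrors the regtype and void tests later in this diff): regtype round-trips a PgDataType through the server, while void columns always decode to null.

final result = await connection.execute(
  PgSql(r'SELECT $1;', types: [PgDataType.regtype]),
  parameters: [PgDataType.bigInteger],
);
// result.first.first == PgDataType.bigInteger

// A void-returning call decodes to null and reports PgDataType.voidType in its schema.
final voidResult = await connection.execute("SELECT pg_notify('channel', 'payload');");
// voidResult.first.first == null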
24 changes: 24 additions & 0 deletions test/encoding_test.dart
@@ -490,6 +490,30 @@ void main() {
fail('unreachable');
} on JsonUnsupportedObjectError catch (_) {}
});

test('void', () async {
final result = await conn.query('SELECT NULL::void AS r');
expect(result.columnDescriptions, [
isA<ColumnDescription>()
.having((e) => e.typeId, 'typeId', PostgreSQLDataType.voidType.oid)
]);

expect(result, [
[null]
]);

expect(
() => PostgresBinaryEncoder(PostgreSQLDataType.voidType).convert(1),
throwsArgumentError,
);
});

test('regtype', () async {
await expectInverse(
PostgreSQLDataType.bigInteger, PostgreSQLDataType.regtype);
await expectInverse(
PostgreSQLDataType.voidType, PostgreSQLDataType.regtype);
});
});

group('Text encoders', () {
27 changes: 18 additions & 9 deletions test/interpolation_test.dart
@@ -6,15 +6,24 @@ import 'package:test/test.dart';

void main() {
test('Ensure all types/format type mappings are available and accurate', () {
PostgreSQLDataType.values
.where((t) =>
t != PostgreSQLDataType.bigSerial && t != PostgreSQLDataType.serial)
.forEach((t) {
expect(PostgreSQLFormatIdentifier.typeStringToCodeMap.values.contains(t),
true);
final code = PostgreSQLFormat.dataTypeStringForDataType(t);
expect(PostgreSQLFormatIdentifier.typeStringToCodeMap[code], t);
});
const withoutMapping = {
PostgreSQLDataType.unknownType, // Can't bind into unknown type
PostgreSQLDataType.voidType, // Can't assign to void
PostgreSQLDataType.bigSerial, // Can only be created from a table sequence
PostgreSQLDataType.serial,
};

for (final type in PostgreSQLDataType.values) {
if (withoutMapping.contains(type)) continue;

expect(
PostgreSQLFormatIdentifier.typeStringToCodeMap.values.contains(type),
true,
reason: 'There should be a type mapping for $type',
);
final code = PostgreSQLFormat.dataTypeStringForDataType(type);
expect(PostgreSQLFormatIdentifier.typeStringToCodeMap[code], type);
}
});

test('Ensure bigserial gets translated to int8', () {
34 changes: 29 additions & 5 deletions test/v3_test.dart
@@ -20,7 +20,9 @@ final _endpoint = PgEndpoint(
// We log all packets sent to and received from the postgres server. This can be
// used to debug failing tests. To view logs, something like this can be put
// at the beginning of `main()`:
// Logger.root.onRecord.listen((r) => print('${r.loggerName}: ${r.message}'));
//
// Logger.root.level = Level.ALL;
// Logger.root.onRecord.listen((r) => print('${r.loggerName}: ${r.message}'));
StreamChannelTransformer<BaseMessage, BaseMessage> get _loggingTransformer {
final inLogger = Logger('postgres.connection.in');
final outLogger = Logger('postgres.connection.out');
@@ -69,6 +71,20 @@ void main() {
]);
});

test('statement without rows', () async {
final result = await connection.execute(
PgSql('''SELECT pg_notify('VIRTUAL','Payload 2');'''),
ignoreRows: true,
);

expect(result, isEmpty);
expect(result.schema.columns, [
isA<PgResultColumn>()
.having((e) => e.columnName, 'columnName', 'pg_notify')
.having((e) => e.type, 'type', PgDataType.voidType)
]);
});

test('queries without a schema message', () async {
final response =
await connection.execute('CREATE TEMPORARY TABLE foo (bar INTEGER);');
@@ -100,6 +116,11 @@ void main() {
await shouldPassthrough<int>(PgDataType.integer, 1024);
await shouldPassthrough<int>(PgDataType.bigInteger, 999999999999);
});

test('regtype', () async {
await shouldPassthrough<PgDataType>(
PgDataType.regtype, PgDataType.bigInteger);
});
});

test('listen and notify', () async {
@@ -278,9 +299,12 @@ void main() {
test('A transaction does not preempt pending queries', () async {
// Add a few insert queries but don't await, then do a transaction that does a fetch,
// make sure that transaction sees all of the elements.
unawaited(connection.execute('INSERT INTO t (id) VALUES (1)'));
unawaited(connection.execute('INSERT INTO t (id) VALUES (2)'));
unawaited(connection.execute('INSERT INTO t (id) VALUES (3)'));
unawaited(connection.execute('INSERT INTO t (id) VALUES (1)',
ignoreRows: true));
unawaited(connection.execute('INSERT INTO t (id) VALUES (2)',
ignoreRows: true));
unawaited(connection.execute('INSERT INTO t (id) VALUES (3)',
ignoreRows: true));

final results = await connection.runTx((ctx) async {
return await ctx.execute('SELECT id FROM t');
@@ -334,7 +358,7 @@ void main() {
);
addTearDown(connection.close);

await connection.execute("SELECT 'foo'");
await connection.execute("SELECT 'foo'", ignoreRows: true);
expect(incoming, contains(isA<DataRowMessage>()));
expect(outgoing, contains(isA<QueryMessage>()));
});