Skip to content

Commit 6aed836

Browse files
committed
Compare record with Variant
1 parent dbd8bb8 commit 6aed836

File tree

4 files changed

+81
-82
lines changed

4 files changed

+81
-82
lines changed

parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,9 @@ private static Converter newConverter(
395395
}
396396
return newStringConverter(schema, model, parent, validator);
397397
case RECORD:
398-
if (type.getName().equals("var") || type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
398+
if (type.getName().equals("var")
399+
|| type.getLogicalTypeAnnotation()
400+
instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
399401
return new AvroVariantConverter(parent, type.asGroupType(), schema, model);
400402
} else {
401403
return new AvroRecordConverter(parent, type.asGroupType(), schema, model, validator);

parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import java.nio.ByteBuffer;
2222
import java.util.function.Consumer;
2323
import org.apache.avro.Schema;
24+
import org.apache.avro.SchemaBuilder;
2425
import org.apache.avro.generic.GenericData;
2526
import org.apache.parquet.Preconditions;
2627
import org.apache.parquet.io.api.Converter;
2728
import org.apache.parquet.io.api.GroupConverter;
2829
import org.apache.parquet.schema.GroupType;
2930
import org.apache.parquet.variant.ImmutableMetadata;
30-
import org.apache.parquet.variant.Variant;
3131
import org.apache.parquet.variant.VariantBuilder;
3232
import org.apache.parquet.variant.VariantConverters;
3333

@@ -36,21 +36,15 @@
3636
*/
3737
class AvroVariantConverter extends GroupConverter implements VariantConverters.ParentConverter<VariantBuilder> {
3838
private final ParentValueContainer parent;
39-
// private final Schema avroSchema;
40-
// private final GenericData model;
41-
// private final int metadataPos;
42-
// private final int valuePos;
39+
private final GenericData model;
4340
private final GroupConverter wrappedConverter;
4441

4542
private VariantBuilder builder = null;
4643
private ImmutableMetadata metadata = null;
4744

4845
AvroVariantConverter(ParentValueContainer parent, GroupType variantGroup, Schema avroSchema, GenericData model) {
4946
this.parent = parent;
50-
// this.avroSchema = avroSchema;
51-
// this.metadataPos = avroSchema.getField("metadata").pos();
52-
// this.valuePos = avroSchema.getField("value").pos();
53-
// this.model = model;
47+
this.model = model;
5448
this.wrappedConverter = VariantConverters.newVariantConverter(variantGroup, this::setMetadata, this);
5549
}
5650

@@ -78,12 +72,22 @@ public void end() {
7872

7973
builder.appendNullIfEmpty();
8074

81-
Variant variant = builder.build();
82-
parent.add(variant);
83-
// Object record = model.newRecord(null, avroSchema);
84-
// model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer());
85-
// model.setField(record, "value", valuePos, builder.encodedValue());
86-
// parent.add(record);
75+
Object record = model.newRecord(
76+
null,
77+
SchemaBuilder.record("VariantRecord")
78+
.fields()
79+
.name("metadata")
80+
.type()
81+
.bytesType()
82+
.noDefault()
83+
.name("value")
84+
.type()
85+
.bytesType()
86+
.noDefault()
87+
.endRecord());
88+
model.setField(record, "metadata", 0, metadata.getEncodedBuffer());
89+
model.setField(record, "value", 1, builder.encodedValue());
90+
parent.add(record);
8791

8892
this.builder = null;
8993
}

parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,14 @@
2424
import com.fasterxml.jackson.databind.ObjectMapper;
2525
import org.apache.parquet.Preconditions;
2626

27-
2827
public class JsonUtil {
2928

3029
private JsonUtil() {}
3130

32-
private static final JsonFactory FACTORY =
33-
new JsonFactoryBuilder()
34-
.configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false)
35-
.configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false)
36-
.build();
31+
private static final JsonFactory FACTORY = new JsonFactoryBuilder()
32+
.configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false)
33+
.configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false)
34+
.build();
3735
private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY);
3836

3937
public static ObjectMapper mapper() {
@@ -72,5 +70,4 @@ public static String getStringOrNull(String property, JsonNode node) {
7270
}
7371
return getString(property, node);
7472
}
75-
7673
}

parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java

Lines changed: 55 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.parquet.variant;
2121

22-
2322
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
2423

2524
import com.fasterxml.jackson.databind.JsonNode;
@@ -33,19 +32,18 @@
3332
import java.nio.file.Paths;
3433
import java.util.List;
3534
import java.util.stream.Stream;
35+
import org.apache.avro.generic.GenericData;
36+
import org.apache.avro.generic.GenericRecord;
3637
import org.apache.parquet.Preconditions;
38+
import org.apache.parquet.avro.AvroParquetReader;
3739
import org.apache.parquet.io.InputFile;
3840
import org.apache.parquet.io.LocalInputFile;
3941
import org.assertj.core.api.Assertions;
4042
import org.junit.jupiter.params.ParameterizedTest;
4143
import org.junit.jupiter.params.provider.Arguments;
4244
import org.junit.jupiter.params.provider.MethodSource;
43-
import org.apache.avro.generic.GenericRecord;
44-
import org.apache.parquet.avro.AvroParquetReader;
45-
4645

4746
public class TestVariantReadsFromFile {
48-
// Set this location to generated variant test cases
4947
private static final String CASE_LOCATION = null;
5048

5149
private static Stream<JsonNode> cases() throws IOException {
@@ -62,50 +60,42 @@ private static Stream<JsonNode> cases() throws IOException {
6260
}
6361

6462
private static Stream<Arguments> errorCases() throws IOException {
65-
return cases()
66-
.filter(caseNode -> caseNode.has("error_message") || !caseNode.has("parquet_file"))
67-
.map(
68-
caseNode -> {
69-
int caseNumber = JsonUtil.getInt("case_number", caseNode);
70-
String testName = JsonUtil.getStringOrNull("test", caseNode);
71-
String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
72-
String errorMessage = JsonUtil.getStringOrNull("error_message", caseNode);
73-
return Arguments.of(caseNumber, testName, parquetFile, errorMessage);
74-
});
63+
return cases().filter(caseNode -> caseNode.has("error_message") || !caseNode.has("parquet_file"))
64+
.map(caseNode -> {
65+
int caseNumber = JsonUtil.getInt("case_number", caseNode);
66+
String testName = JsonUtil.getStringOrNull("test", caseNode);
67+
String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
68+
String errorMessage = JsonUtil.getStringOrNull("error_message", caseNode);
69+
return Arguments.of(caseNumber, testName, parquetFile, errorMessage);
70+
});
7571
}
7672

7773
private static Stream<Arguments> singleVariantCases() throws IOException {
78-
return cases()
79-
.filter(caseNode -> caseNode.has("variant_file") || !caseNode.has("parquet_file"))
80-
.map(
81-
caseNode -> {
82-
int caseNumber = JsonUtil.getInt("case_number", caseNode);
83-
String testName = JsonUtil.getStringOrNull("test", caseNode);
84-
String variant = JsonUtil.getStringOrNull("variant", caseNode);
85-
String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
86-
String variantFile = JsonUtil.getStringOrNull("variant_file", caseNode);
87-
return Arguments.of(caseNumber, testName, variant, parquetFile, variantFile);
88-
});
74+
return cases().filter(caseNode -> caseNode.has("variant_file") || !caseNode.has("parquet_file"))
75+
.map(caseNode -> {
76+
int caseNumber = JsonUtil.getInt("case_number", caseNode);
77+
String testName = JsonUtil.getStringOrNull("test", caseNode);
78+
String variant = JsonUtil.getStringOrNull("variant", caseNode);
79+
String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
80+
String variantFile = JsonUtil.getStringOrNull("variant_file", caseNode);
81+
return Arguments.of(caseNumber, testName, variant, parquetFile, variantFile);
82+
});
8983
}
9084

9185
private static Stream<Arguments> multiVariantCases() throws IOException {
92-
return cases()
93-
.filter(caseNode -> caseNode.has("variant_files") || !caseNode.has("parquet_file"))
94-
.map(
95-
caseNode -> {
96-
int caseNumber = JsonUtil.getInt("case_number", caseNode);
97-
String testName = JsonUtil.getStringOrNull("test", caseNode);
98-
String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
99-
List<String> variantFiles =
100-
caseNode.has("variant_files")
101-
? Lists.newArrayList(
102-
Iterables.transform(
103-
caseNode.get("variant_files"),
104-
node -> node == null || node.isNull() ? null : node.asText()))
105-
: null;
106-
String variants = JsonUtil.getStringOrNull("variants", caseNode);
107-
return Arguments.of(caseNumber, testName, variants, parquetFile, variantFiles);
108-
});
86+
return cases().filter(caseNode -> caseNode.has("variant_files") || !caseNode.has("parquet_file"))
87+
.map(caseNode -> {
88+
int caseNumber = JsonUtil.getInt("case_number", caseNode);
89+
String testName = JsonUtil.getStringOrNull("test", caseNode);
90+
String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
91+
List<String> variantFiles = caseNode.has("variant_files")
92+
? Lists.newArrayList(Iterables.transform(
93+
caseNode.get("variant_files"),
94+
node -> node == null || node.isNull() ? null : node.asText()))
95+
: null;
96+
String variants = JsonUtil.getStringOrNull("variants", caseNode);
97+
return Arguments.of(caseNumber, testName, variants, parquetFile, variantFiles);
98+
});
10999
}
110100

111101
@ParameterizedTest
@@ -115,9 +105,8 @@ public void testError(int caseNumber, String testName, String parquetFile, Strin
115105
return;
116106
}
117107

118-
Assertions.assertThatThrownBy(() -> readParquet(parquetFile))
119-
.as("Test case %s: %s", caseNumber, testName);
120-
//.hasMessage(errorMessage);
108+
Assertions.assertThatThrownBy(() -> readParquet(parquetFile)).as("Test case %s: %s", caseNumber, testName);
109+
// .hasMessage(errorMessage);
121110
}
122111

123112
@ParameterizedTest
@@ -132,19 +121,15 @@ public void testSingleVariant(
132121
Variant expected = readVariant(variantFile);
133122

134123
GenericRecord record = readParquetRecord(parquetFile);
135-
Assertions.assertThat(record.get("var")).isInstanceOf(Variant.class);
136-
Variant actual = (Variant) record.get("var");
124+
Assertions.assertThat(record.get("var")).isInstanceOf(GenericData.Record.class);
125+
GenericData.Record actual = (GenericData.Record) record.get("var");
137126
assertEqual(expected, actual);
138127
}
139128

140129
@ParameterizedTest
141130
@MethodSource("multiVariantCases")
142131
public void testMultiVariant(
143-
int caseNumber,
144-
String testName,
145-
String variants,
146-
String parquetFile,
147-
List<String> variantFiles)
132+
int caseNumber, String testName, String variants, String parquetFile, List<String> variantFiles)
148133
throws IOException {
149134
if (parquetFile == null) {
150135
return;
@@ -158,8 +143,8 @@ public void testMultiVariant(
158143

159144
if (variantFile != null) {
160145
Variant expected = readVariant(variantFile);
161-
Assertions.assertThat(record.get("var")).isInstanceOf(Variant.class);
162-
Variant actual = (Variant) record.get("var");
146+
Assertions.assertThat(record.get("var")).isInstanceOf(GenericData.Record.class);
147+
GenericData.Record actual = (GenericData.Record) record.get("var");
163148
assertEqual(expected, actual);
164149
} else {
165150
Assertions.assertThat(record.get("var")).isNull();
@@ -187,7 +172,8 @@ private Variant readVariant(String variantFile) throws IOException {
187172
int dictSize = VariantUtil.readUnsigned(variantBuffer, 1, offsetSize);
188173
int offsetListOffset = 1 + offsetSize;
189174
int dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
190-
int endOffset = dataOffset + VariantUtil.readUnsigned(variantBuffer, offsetListOffset + (offsetSize * dictSize), offsetSize);
175+
int endOffset = dataOffset
176+
+ VariantUtil.readUnsigned(variantBuffer, offsetListOffset + (offsetSize * dictSize), offsetSize);
191177

192178
return new Variant(VariantUtil.slice(variantBuffer, endOffset), variantBuffer);
193179
}
@@ -200,7 +186,8 @@ private GenericRecord readParquetRecord(String parquetFile) throws IOException {
200186
private List<GenericRecord> readParquet(String parquetFile) throws IOException {
201187
org.apache.parquet.io.InputFile inputFile = new LocalInputFile(Paths.get(CASE_LOCATION + "/" + parquetFile));
202188
List<GenericRecord> records = Lists.newArrayList();
203-
try (org.apache.parquet.hadoop.ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile).build()) {
189+
try (org.apache.parquet.hadoop.ParquetReader<GenericRecord> reader =
190+
AvroParquetReader.<GenericRecord>builder(inputFile).build()) {
204191
GenericRecord record;
205192
while ((record = reader.read()) != null) {
206193
records.add(record);
@@ -209,9 +196,18 @@ private List<GenericRecord> readParquet(String parquetFile) throws IOException {
209196
return records;
210197
}
211198

199+
private static void assertEqual(Variant expected, GenericData.Record actualRecord) {
200+
assertThat(actualRecord).isNotNull();
201+
assertThat(expected).isNotNull();
202+
Variant actual = new Variant((ByteBuffer) actualRecord.get("value"), (ByteBuffer) actualRecord.get("metadata"));
203+
204+
assertEqual(expected, actual);
205+
}
206+
212207
private static void assertEqual(Variant expected, Variant actual) {
213208
assertThat(actual).isNotNull();
214209
assertThat(expected).isNotNull();
210+
215211
assertThat(actual.getType()).isEqualTo(expected.getType());
216212

217213
switch (expected.getType()) {
@@ -266,13 +262,13 @@ private static void assertEqual(Variant expected, Variant actual) {
266262
Variant.ObjectField actualField = actual.getFieldAtIndex(i);
267263

268264
assertThat(actualField.key).isEqualTo(expectedField.key);
269-
assertEqual(actualField.value, actualField.value);
265+
assertEqual(expectedField.value, actualField.value);
270266
}
271267
break;
272268
case ARRAY:
273269
assertThat(actual.numArrayElements()).isEqualTo(expected.numArrayElements());
274270
for (int i = 0; i < expected.numArrayElements(); ++i) {
275-
assertEqual(expected.getElementAtIndex(i),actual.getElementAtIndex(i));
271+
assertEqual(expected.getElementAtIndex(i), actual.getElementAtIndex(i));
276272
}
277273
break;
278274
default:

0 commit comments

Comments
 (0)