diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java new file mode 100644 index 0000000000..7458f8913b --- /dev/null +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.proto; + +/** + * Constants. + */ +public final class ProtoConstants { + + public static final String METADATA_ENUM_PREFIX = "parquet.proto.enum."; + public static final String METADATA_ENUM_KEY_VALUE_SEPARATOR = ":"; + public static final String METADATA_ENUM_ITEM_SEPARATOR = ","; + /** + * Configuration flag to enable reader to accept enum label that's neither defined in its own proto schema nor conform + * to the "UNKNOWN_ENUM_*" pattern with which we can get the enum number. The enum value will be treated as an unknown + * enum with number -1.
+ * Enabling it will avoid a job failure, but you should perhaps use an up-to-date schema instead. + */ + public static final String CONFIG_ACCEPT_UNKNOWN_ENUM = "parquet.proto.accept.unknown.enum"; + + private ProtoConstants() { + // Do not instantiate. + } +} diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index 173fa7799b..77f5d529e6 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -22,7 +22,9 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; @@ -33,6 +35,8 @@ import org.apache.parquet.schema.IncompatibleSchemaModificationException; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -42,30 +46,46 @@ import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType; import static java.util.Optional.of; +import static org.apache.parquet.proto.ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX; /** * Converts Protocol Buffer message (both top level and inner) to parquet. * This is internal class, use {@link ProtoRecordConverter}. */ class ProtoMessageConverter extends GroupConverter { + private static final Logger LOG = LoggerFactory.getLogger(ProtoMessageConverter.class); - private final Converter[] converters; - private final ParentValueContainer parent; - private final Message.Builder myBuilder; + protected final Configuration conf; + protected final Converter[] converters; + protected final ParentValueContainer parent; + protected final Message.Builder myBuilder; + protected final Map extraMetadata; - // used in record converter - ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema) { - this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema); + /** + * Used in record converter. + * + * @param conf Configuration for some customizable behavior, + * eg. "parquet.proto.accept.unknown.enum" - whether to accept an unparsable (after trying with proto enum label and number) enum as `UNKNOWN` with a number -1 (the one generated automatically for each proto enum) + * @param pvc The parent value containing the converted proto + * @param protoClass The class of the converted proto + * @param parquetSchema The (part of) parquet schema that should match to the expected proto + * @param extraMetadata Metadata from parquet footer, containing useful information about parquet-proto convertion behavior + */ + ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, Map extraMetadata) { + this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata); } - // For usage in message arrays - ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) { + ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map extraMetadata) { int schemaSize = parquetSchema.getFieldCount(); converters = new Converter[schemaSize]; - + this.conf = conf; this.parent = pvc; + this.extraMetadata = extraMetadata; int parquetFieldIndex = 1; if (pvc == null) { @@ -108,7 +128,7 @@ public void end() { myBuilder.clear(); } - private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + protected Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { boolean isRepeated = fieldDescriptor.isRepeated(); @@ -148,7 +168,7 @@ public Optional visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation }).orElseGet(() -> newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType)); } - private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + protected Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { JavaType javaType = fieldDescriptor.getJavaType(); @@ -163,7 +183,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p case LONG: return new ProtoLongConverter(pvc); case MESSAGE: { Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); - return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType()); + return new ProtoMessageConverter(conf, pvc, subBuilder, parquetType.asGroupType(), extraMetadata); } } @@ -190,25 +210,45 @@ final class ProtoEnumConverter extends PrimitiveConverter { private final Map enumLookup; private Descriptors.EnumValueDescriptor[] dict; private final ParentValueContainer parent; + private final Descriptors.EnumDescriptor enumType; + private final String unknownEnumPrefix; + private final boolean acceptUnknownEnum; public ProtoEnumConverter(ParentValueContainer parent, Descriptors.FieldDescriptor fieldType) { this.parent = parent; this.fieldType = fieldType; - this.enumLookup = makeLookupStructure(fieldType); + this.enumType = fieldType.getEnumType(); + this.enumLookup = makeLookupStructure(enumType); + unknownEnumPrefix = "UNKNOWN_ENUM_VALUE_" + enumType.getName() + "_"; + acceptUnknownEnum = conf.getBoolean(CONFIG_ACCEPT_UNKNOWN_ENUM, false); } /** * Fills lookup structure for translating between parquet enum values and Protocol buffer enum values. * */ - private Map makeLookupStructure(Descriptors.FieldDescriptor enumFieldType) { - Descriptors.EnumDescriptor enumType = enumFieldType.getEnumType(); + private Map makeLookupStructure(Descriptors.EnumDescriptor enumType) { Map lookupStructure = new HashMap(); - List enumValues = enumType.getValues(); + if (extraMetadata.containsKey(METADATA_ENUM_PREFIX + enumType.getFullName())) { + String enumNameNumberPairs = extraMetadata.get(METADATA_ENUM_PREFIX + enumType.getFullName()); + if (enumNameNumberPairs == null || enumNameNumberPairs.trim().isEmpty()) { + LOG.debug("No enum is written for " + enumType.getFullName()); + return lookupStructure; + } + for (String enumItem : enumNameNumberPairs.split(METADATA_ENUM_ITEM_SEPARATOR)) { + String[] nameAndNumber = enumItem.split(METADATA_ENUM_KEY_VALUE_SEPARATOR); + if (nameAndNumber.length != 2) { + throw new BadConfigurationException("Invalid enum bookkeeper from the metadata: " + enumNameNumberPairs); + } + lookupStructure.put(Binary.fromString(nameAndNumber[0]), enumType.findValueByNumberCreatingIfUnknown(Integer.parseInt(nameAndNumber[1]))); + } + } else { + List enumValues = enumType.getValues(); - for (Descriptors.EnumValueDescriptor value : enumValues) { - String name = value.getName(); - lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name)); + for (Descriptors.EnumValueDescriptor value : enumValues) { + String name = value.getName(); + lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name)); + } } return lookupStructure; @@ -222,11 +262,37 @@ private Descriptors.EnumValueDescriptor translateEnumValue(Binary binaryValue) { Descriptors.EnumValueDescriptor protoValue = enumLookup.get(binaryValue); if (protoValue == null) { - Set knownValues = enumLookup.keySet(); - String msg = "Illegal enum value \"" + binaryValue + "\"" - + " in protocol buffer \"" + fieldType.getFullName() + "\"" - + " legal values are: \"" + knownValues + "\""; - throw new InvalidRecordException(msg); + // in case of unknown enum value, protobuf is creating new EnumValueDescriptor with the unknown number + // and name as following "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number + // so the idea is to parse the name for data created by parquet-proto before this patch + String unknownLabel = binaryValue.toStringUsingUTF8(); + if (unknownLabel.startsWith(unknownEnumPrefix)) { + try { + int i = Integer.parseInt(unknownLabel.substring(unknownEnumPrefix.length())); + Descriptors.EnumValueDescriptor unknownEnumValue = enumType.findValueByNumberCreatingIfUnknown(i); + // build new EnumValueDescriptor and put it in the value cache + enumLookup.put(binaryValue, unknownEnumValue); + return unknownEnumValue; + } catch (NumberFormatException e) { + // The value does not respect "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number pattern + // We accept it as unknown enum with number -1. + } + } + if (!acceptUnknownEnum) { + // Safe mode, when an enum does not have its number in metadata (data written before this fix), and its label + // is unrecognizable (neither defined in the schema, nor parsable with "UNKNOWN_ENUM_*" pattern, which means + // probably the reader schema is not up-to-date), we reject with an error. + Set knownValues = enumLookup.keySet(); + String msg = "Illegal enum value \"" + binaryValue + "\"" + + " in protocol buffer \"" + fieldType.getFullName() + "\"" + + " legal values are: \"" + knownValues + "\""; + throw new InvalidRecordException(msg); + } + LOG.error("Found unknown value " + unknownLabel + " for field " + fieldType.getFullName() + + " probably because your proto schema is outdated, accept it as unknown enum with number -1"); + Descriptors.EnumValueDescriptor unrecognized = enumType.findValueByNumberCreatingIfUnknown(-1); + enumLookup.put(binaryValue, unrecognized); + return unrecognized; } return protoValue; } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 0d79d019c1..78edf70d2e 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -90,7 +90,7 @@ public RecordMaterializer prepareForRead(Configuration configuration, Map protobufClass = Protobufs.getProtobufClass(headerProtoClass); - return new ProtoRecordMaterializer(requestedSchema, protobufClass); + return new ProtoRecordMaterializer(configuration, requestedSchema, protobufClass, keyValueMetaData); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index e161819af2..75a67f12cf 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,8 +21,12 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.schema.MessageType; +import java.util.Collections; +import java.util.Map; + /** * Converts data content of root message from Protocol Buffer message to parquet message. * It delegates conversion of inner fields to {@link ProtoMessageConverter} class using inheritance. @@ -45,15 +49,26 @@ public void add(Object a) { } } + public ProtoRecordConverter(Configuration conf, Class protoclass, MessageType parquetSchema, Map extraMetadata) { + super(conf, new SkipParentValueContainer(), protoclass, parquetSchema, extraMetadata); + reusedBuilder = getBuilder(); + } + + public ProtoRecordConverter(Configuration conf, Message.Builder builder, MessageType parquetSchema, Map extraMetadata) { + super(conf, new SkipParentValueContainer(), builder, parquetSchema, extraMetadata); + reusedBuilder = getBuilder(); + } + // Old version constructors, kept for code backward compatibility. + // The instance will not be able to handle unknowned enum values written by parquet-proto (the behavior before PARQUET-1455) + @Deprecated public ProtoRecordConverter(Class protoclass, MessageType parquetSchema) { - super(new SkipParentValueContainer(), protoclass, parquetSchema); - reusedBuilder = getBuilder(); + this(new Configuration(), protoclass, parquetSchema, Collections.emptyMap()); } + @Deprecated public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) { - super(new SkipParentValueContainer(), builder, parquetSchema); - reusedBuilder = getBuilder(); + this(new Configuration(), builder, parquetSchema, Collections.emptyMap()); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index 039a571137..dd77ca6b61 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,19 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; +import java.util.Map; + class ProtoRecordMaterializer extends RecordMaterializer { private final ProtoRecordConverter root; - public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass) { - this.root = new ProtoRecordConverter(protobufClass, requestedSchema); + public ProtoRecordMaterializer(Configuration conf, MessageType requestedSchema, Class protobufClass, Map metadata) { + this.root = new ProtoRecordConverter(conf, protobufClass, requestedSchema, metadata); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index a8038020a8..2322667651 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -38,6 +38,10 @@ import static java.util.Optional.ofNullable; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX; + /** * Implementation of {@link WriteSupport} for writing Protocol Buffers. */ @@ -55,6 +59,9 @@ public class ProtoWriteSupport extends WriteSupport< private RecordConsumer recordConsumer; private Class protoMessage; private MessageWriter messageWriter; + // Keep protobuf enum value with number in the metadata, so that in read time, a reader can read at least + // the number back even with an outdated schema which might not contain all enum values. + private Map> protoEnumBookKeeper = new HashMap<>(); public ProtoWriteSupport() { } @@ -126,13 +133,41 @@ public WriteContext init(Configuration configuration) { this.messageWriter = new MessageWriter(messageDescriptor, rootSchema); - Map extraMetaData = new HashMap(); + Map extraMetaData = new HashMap<>(); extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName()); extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage)); extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant)); return new WriteContext(rootSchema, extraMetaData); } + @Override + public FinalizedWriteContext finalizeWrite() { + Map protoMetadata = enumMetadata(); + return new FinalizedWriteContext(protoMetadata); + } + + private Map enumMetadata() { + Map enumMetadata = new HashMap<>(); + for (Map.Entry> enumNameNumberMapping : protoEnumBookKeeper.entrySet()) { + StringBuilder nameNumberPairs = new StringBuilder(); + if (enumNameNumberMapping.getValue().isEmpty()) { + // No enum is ever written to any column of this file, put an empty string as the value in the metadata + LOG.info("No enum is written for " + enumNameNumberMapping.getKey()); + } + int idx = 0; + for (Map.Entry nameNumberPair : enumNameNumberMapping.getValue().entrySet()) { + nameNumberPairs.append(nameNumberPair.getKey()) + .append(METADATA_ENUM_KEY_VALUE_SEPARATOR) + .append(nameNumberPair.getValue()); + idx ++; + if (idx < enumNameNumberMapping.getValue().size()) { + nameNumberPairs.append(METADATA_ENUM_ITEM_SEPARATOR); + } + } + enumMetadata.put(METADATA_ENUM_PREFIX + enumNameNumberMapping.getKey(), nameNumberPairs.toString()); + } + return enumMetadata; + } class FieldWriter { String fieldName; @@ -202,7 +237,7 @@ private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type type) { case LONG: return new LongWriter(); case FLOAT: return new FloatWriter(); case DOUBLE: return new DoubleWriter(); - case ENUM: return new EnumWriter(); + case ENUM: return new EnumWriter(fieldDescriptor.getEnumType()); case BOOLEAN: return new BooleanWriter(); case BYTE_STRING: return new BinaryWriter(); } @@ -480,10 +515,23 @@ final void writeRawValue(Object value) { } class EnumWriter extends FieldWriter { + Map enumNameNumberPairs; + + public EnumWriter(Descriptors.EnumDescriptor enumType) { + if (protoEnumBookKeeper.containsKey(enumType.getFullName())) { + enumNameNumberPairs = protoEnumBookKeeper.get(enumType.getFullName()); + } else { + enumNameNumberPairs = new HashMap<>(); + protoEnumBookKeeper.put(enumType.getFullName(), enumNameNumberPairs); + } + } + @Override final void writeRawValue(Object value) { - Binary binary = Binary.fromString(((Descriptors.EnumValueDescriptor) value).getName()); + Descriptors.EnumValueDescriptor enumValueDesc = (Descriptors.EnumValueDescriptor) value; + Binary binary = Binary.fromString(enumValueDesc.getName()); recordConsumer.addBinary(binary); + enumNameNumberPairs.putIfAbsent(enumValueDesc.getName(), enumValueDesc.getNumber()); } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java index e042f96821..74cddeb58a 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java @@ -25,11 +25,12 @@ import java.util.List; +import static org.apache.parquet.proto.TestUtils.testData; +import static org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.apache.parquet.proto.TestUtils.testData; -import static org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes; public class ProtoRecordConverterTest { @@ -341,4 +342,22 @@ public void testProto3LargeProtobufferFieldId() throws Exception { testData(builder.build()); } + + @Test + public void testUnknownEnum() throws Exception { + TestProto3.SchemaConverterAllDatatypes.Builder data; + data = TestProto3.SchemaConverterAllDatatypes.newBuilder(); + data.setOptionalEnumValue(42); + + TestProto3.SchemaConverterAllDatatypes dataBuilt = data.build(); + data.clear(); + + List result; + result = testData(dataBuilt); + + //data are fully checked in testData function. Lets do one more check. + TestProto3.SchemaConverterAllDatatypes o = result.get(0); + assertSame(o.getOptionalEnum(), TestProto3.SchemaConverterAllDatatypes.TestEnum.UNRECOGNIZED); + assertEquals(o.getOptionalEnumValue(), 42); + } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java new file mode 100644 index 0000000000..db7f6ced0a --- /dev/null +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.proto; + +import org.apache.hadoop.fs.Path; +import org.apache.parquet.proto.test.TestProto3SchemaV1; +import org.apache.parquet.proto.test.TestProto3SchemaV2; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.apache.parquet.proto.TestUtils.readMessages; +import static org.apache.parquet.proto.TestUtils.writeMessages; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** + * Tests for backward/forward compatibility while write and read parquet using different versions of protobuf schema. + */ +public class ProtoSchemaEvolutionTest { + + /** + * Test we can read enum value (number) with an old schema even the value is missing in the old schema. + */ + @Test + public void testEnumSchemaWriteV2ReadV1() throws IOException { + TestProto3SchemaV2.MessageSchema dataV2 = TestProto3SchemaV2.MessageSchema.newBuilder() + .setOptionalLabelNumberPair(TestProto3SchemaV2.MessageSchema.LabelNumberPair.SECOND) + .setOptionalString("string value") + .build(); + Path file = writeMessages(dataV2); + List messagesV1 = readMessages(file, TestProto3SchemaV1.MessageSchema.class); + assertEquals(messagesV1.size(), 1); + assertEquals(messagesV1.get(0).getOptionalLabelNumberPairValue(), 2); + } + + /** + * Write enum value unknown in V1 (thus "UNKNOWN_ENUM_VALUE_*"), and we can read it back with schema V2 that contains + * the enum definition. + */ + @Test + public void testEnumSchemaWriteV1ReadV2() throws IOException { + TestProto3SchemaV1.MessageSchema dataV1WithEnumValueFromV2 = TestProto3SchemaV1.MessageSchema.newBuilder() + .setOptionalLabelNumberPairValue(2) // "2" is not defined in V1 enum, but the number is still accepted by protobuf + .build(); + Path file = writeMessages(dataV1WithEnumValueFromV2); + List messagesV2 = readMessages(file, TestProto3SchemaV2.MessageSchema.class); + assertEquals(messagesV2.size(), 1); + assertSame(messagesV2.get(0).getOptionalLabelNumberPair(), TestProto3SchemaV2.MessageSchema.LabelNumberPair.SECOND); + } +} diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java index 2c8b41f395..2fbd2a4748 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java @@ -23,13 +23,13 @@ import com.google.protobuf.MessageOrBuilder; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetReader; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; public class TestUtils { @@ -78,8 +78,7 @@ public static List testData(T... messages) thro checkSameBuilderInstance(messages); - List output = (List) writeAndRead(messages); - + List output = writeAndRead(messages); List outputAsMessages = asMessages(output); Descriptors.Descriptor messageDescriptor = Protobufs.getMessageDescriptor(asMessage(messages[0]).getClass()); Descriptors.FileDescriptor.Syntax syntax = messageDescriptor.getFile().getSyntax(); @@ -117,7 +116,6 @@ public static List asMessages(List mobs) { for (MessageOrBuilder messageOrBuilder : mobs) { result.add(asMessage(messageOrBuilder)); } - return result; } @@ -161,22 +159,34 @@ private static void checkSameBuilderInstance(MessageOrBuilder[] messages) { * Reads messages from given file. The file could/should be created by method writeMessages */ public static List readMessages(Path file) throws IOException { - ProtoParquetReader reader = new ProtoParquetReader(file); - - List result = new ArrayList(); - boolean hasNext = true; - while (hasNext) { - T item = reader.read(); - if (item == null) { - hasNext = false; - } else { - assertNotNull(item); - // It makes sense to return message but production code wont work with messages - result.add((T) asMessage(item).toBuilder()); + return readMessages(file, null); + } + + /** + * Read messages from given file into the expected proto class. + * @param file + * @param messageClass + * @param + * @return List of protobuf messages for the given type. + */ + public static List readMessages(Path file, Class messageClass) throws IOException { + ParquetReader.Builder readerBuilder = ProtoParquetReader.builder(file); + if (messageClass != null) { + readerBuilder.set(ProtoReadSupport.PB_CLASS, messageClass.getName()).build(); + } + try (ParquetReader reader = readerBuilder.build()) { + List result = new ArrayList(); + boolean hasNext = true; + while (hasNext) { + T item = (T) reader.read(); + if (item == null) { + hasNext = false; + } else { + result.add((T) asMessage(item)); + } } + return result; } - reader.close(); - return result; } /** diff --git a/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto b/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto new file mode 100644 index 0000000000..6c0c8cc8bc --- /dev/null +++ b/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package TestProto3.Schema; + +option java_package = "org.apache.parquet.proto.test"; + +// For the test of schema evolution +// This is the "V1" schema, the "V2" (its evolution) is in TestProto3SchemaV2.proto +message MessageSchema { + + enum LabelNumberPair { + UNKNOWN_VALUE = 0; + FIRST = 1; + } + + LabelNumberPair optionalLabelNumberPair = 1; + string optionalString = 2; + int32 optionalInt32 = 3; + +} + diff --git a/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto b/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto new file mode 100644 index 0000000000..846f1a37a7 --- /dev/null +++ b/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package TestProto3.Schema; + +option java_package = "org.apache.parquet.proto.test"; + +// For the test of schema evolution +// This is the "V2" schema, which is supposed to be an evolution from the "V1" (TestProto3SchemaV1.proto) +message MessageSchema { + + enum LabelNumberPair { + UNKNOWN_VALUE = 0; + FIRST = 1; + // We added one more value in V2 comparing to V1 + SECOND = 2; + } + + LabelNumberPair optionalLabelNumberPair = 1; + string optionalString = 2; + int32 optionalInt32 = 3; + +} + diff --git a/pom.xml b/pom.xml index 52e1fdbfc9..a84af6e76f 100644 --- a/pom.xml +++ b/pom.xml @@ -391,7 +391,6 @@ - org.apache.maven.plugins @@ -402,6 +401,7 @@ ${maven.compiler.target} + org.apache.maven.plugins maven-failsafe-plugin @@ -415,6 +415,7 @@ + org.apache.maven.plugins maven-surefire-plugin @@ -439,6 +440,7 @@ + org.codehaus.mojo buildnumber-maven-plugin @@ -452,6 +454,7 @@ + org.apache.rat apache-rat-plugin @@ -542,7 +545,6 @@ -