@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.proto;

/**
* Constants used in parquet-proto conversion (enum metadata keys and reader configuration).
*/
public final class ProtoConstants {

public static final String METADATA_ENUM_PREFIX = "parquet.proto.enum.";
public static final String METADATA_ENUM_KEY_VALUE_SEPARATOR = ":";
public static final String METADATA_ENUM_ITEM_SEPARATOR = ",";
/**
* Configuration flag that lets the reader accept an enum label that is neither defined in its own proto schema nor
* conforms to the "UNKNOWN_ENUM_*" pattern from which the enum number can be recovered. Such a value is treated as an
* unknown enum with number -1. <br>
* Enabling it avoids a job failure, but an up-to-date schema should be used instead whenever possible.
*/
public static final String CONFIG_ACCEPT_UNKNOWN_ENUM = "parquet.proto.accept.unknown.enum";

private ProtoConstants() {
// Do not instantiate.
}
}
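
As an illustration of how these constants fit together, here is a minimal sketch (not part of the change): the footer layout with one "parquet.proto.enum.<enum full name>" entry per enum type, holding "label:number" pairs, is inferred from the reader-side parsing in ProtoMessageConverter below, and "org.example.Shape" with its values is a made-up enum.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.proto.ProtoConstants;

public class ProtoEnumMetadataSketch {
  public static void main(String[] args) {
    // Writer side: one footer entry per enum type; the value lists "label:number" pairs.
    Map<String, String> extraMetadata = new HashMap<>();
    extraMetadata.put(
        ProtoConstants.METADATA_ENUM_PREFIX + "org.example.Shape",               // "parquet.proto.enum.org.example.Shape"
        "CIRCLE" + ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR + "0"
            + ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR
            + "SQUARE" + ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR + "1"); // "CIRCLE:0,SQUARE:1"

    // Reader side: opt in to mapping unresolvable labels to the -1 "unknown" value instead of failing the job.
    Configuration conf = new Configuration();
    conf.setBoolean(ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM, true);

    System.out.println(extraMetadata + " " + conf.getBoolean(ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM, false));
  }
}
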
@@ -22,7 +22,9 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.hadoop.BadConfigurationException;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
@@ -33,6 +35,8 @@
import org.apache.parquet.schema.IncompatibleSchemaModificationException;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
@@ -42,30 +46,46 @@

import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import static java.util.Optional.of;
import static org.apache.parquet.proto.ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM;
import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR;
import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR;
import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;

/**
* Converts a Protocol Buffer message (both top level and inner) to parquet.
* This is an internal class; use {@link ProtoRecordConverter}.
*/
class ProtoMessageConverter extends GroupConverter {
private static final Logger LOG = LoggerFactory.getLogger(ProtoMessageConverter.class);

private final Converter[] converters;
private final ParentValueContainer parent;
private final Message.Builder myBuilder;
protected final Configuration conf;
protected final Converter[] converters;
protected final ParentValueContainer parent;
protected final Message.Builder myBuilder;
protected final Map<String, String> extraMetadata;

// used in record converter
ProtoMessageConverter(ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema) {
this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema);
/**
* Used in record converter.
*
* @param conf Configuration for customizable behavior,
* e.g. "parquet.proto.accept.unknown.enum" - whether to accept an enum value that cannot be resolved (after trying both the proto enum label and the number) as an unknown enum with number -1 (the value protobuf generates automatically for each proto enum)
* @param pvc The parent value container receiving the converted proto
* @param protoClass The class of the converted proto
* @param parquetSchema The (part of the) parquet schema that should match the expected proto
* @param extraMetadata Metadata from the parquet footer, containing useful information about parquet-proto conversion behavior
*/
ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema, Map<String, String> extraMetadata) {
this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata);
}


// For usage in message arrays
ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) {
ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map<String, String> extraMetadata) {

int schemaSize = parquetSchema.getFieldCount();
converters = new Converter[schemaSize];

this.conf = conf;
this.parent = pvc;
this.extraMetadata = extraMetadata;
int parquetFieldIndex = 1;

if (pvc == null) {
@@ -108,7 +128,7 @@ public void end() {
myBuilder.clear();
}

private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
protected Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {

boolean isRepeated = fieldDescriptor.isRepeated();

@@ -148,7 +168,7 @@ public Optional<Converter> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation
}).orElseGet(() -> newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType));
}

private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
protected Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {

JavaType javaType = fieldDescriptor.getJavaType();

@@ -163,7 +183,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p
case LONG: return new ProtoLongConverter(pvc);
case MESSAGE: {
Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor);
return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType());
return new ProtoMessageConverter(conf, pvc, subBuilder, parquetType.asGroupType(), extraMetadata);
}
}

@@ -190,25 +210,45 @@ final class ProtoEnumConverter extends PrimitiveConverter {
private final Map<Binary, Descriptors.EnumValueDescriptor> enumLookup;
private Descriptors.EnumValueDescriptor[] dict;
private final ParentValueContainer parent;
private final Descriptors.EnumDescriptor enumType;
private final String unknownEnumPrefix;
private final boolean acceptUnknownEnum;

public ProtoEnumConverter(ParentValueContainer parent, Descriptors.FieldDescriptor fieldType) {
this.parent = parent;
this.fieldType = fieldType;
this.enumLookup = makeLookupStructure(fieldType);
this.enumType = fieldType.getEnumType();
this.enumLookup = makeLookupStructure(enumType);
unknownEnumPrefix = "UNKNOWN_ENUM_VALUE_" + enumType.getName() + "_";
acceptUnknownEnum = conf.getBoolean(CONFIG_ACCEPT_UNKNOWN_ENUM, false);
}

/**
* Fills lookup structure for translating between parquet enum values and Protocol buffer enum values.
* */
private Map<Binary, Descriptors.EnumValueDescriptor> makeLookupStructure(Descriptors.FieldDescriptor enumFieldType) {
Descriptors.EnumDescriptor enumType = enumFieldType.getEnumType();
private Map<Binary, Descriptors.EnumValueDescriptor> makeLookupStructure(Descriptors.EnumDescriptor enumType) {
Map<Binary, Descriptors.EnumValueDescriptor> lookupStructure = new HashMap<Binary, Descriptors.EnumValueDescriptor>();

List<Descriptors.EnumValueDescriptor> enumValues = enumType.getValues();
if (extraMetadata.containsKey(METADATA_ENUM_PREFIX + enumType.getFullName())) {
String enumNameNumberPairs = extraMetadata.get(METADATA_ENUM_PREFIX + enumType.getFullName());
if (enumNameNumberPairs == null || enumNameNumberPairs.trim().isEmpty()) {
LOG.debug("No enum is written for " + enumType.getFullName());
return lookupStructure;
}
for (String enumItem : enumNameNumberPairs.split(METADATA_ENUM_ITEM_SEPARATOR)) {
String[] nameAndNumber = enumItem.split(METADATA_ENUM_KEY_VALUE_SEPARATOR);
if (nameAndNumber.length != 2) {
throw new BadConfigurationException("Invalid enum bookkeeper from the metadata: " + enumNameNumberPairs);
}
lookupStructure.put(Binary.fromString(nameAndNumber[0]), enumType.findValueByNumberCreatingIfUnknown(Integer.parseInt(nameAndNumber[1])));
}
} else {
List<Descriptors.EnumValueDescriptor> enumValues = enumType.getValues();

for (Descriptors.EnumValueDescriptor value : enumValues) {
String name = value.getName();
lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name));
for (Descriptors.EnumValueDescriptor value : enumValues) {
String name = value.getName();
lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name));
}
}

return lookupStructure;
@@ -222,11 +262,37 @@ private Descriptors.EnumValueDescriptor translateEnumValue(Binary binaryValue) {
Descriptors.EnumValueDescriptor protoValue = enumLookup.get(binaryValue);

if (protoValue == null) {
Set<Binary> knownValues = enumLookup.keySet();
String msg = "Illegal enum value \"" + binaryValue + "\""
+ " in protocol buffer \"" + fieldType.getFullName() + "\""
+ " legal values are: \"" + knownValues + "\"";
throw new InvalidRecordException(msg);
// In case of an unknown enum value, protobuf creates a new EnumValueDescriptor with the unknown number
// and a name of the form "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number,
// so the idea is to parse the name for data created by parquet-proto before this patch.
String unknownLabel = binaryValue.toStringUsingUTF8();
if (unknownLabel.startsWith(unknownEnumPrefix)) {
try {
int i = Integer.parseInt(unknownLabel.substring(unknownEnumPrefix.length()));
Descriptors.EnumValueDescriptor unknownEnumValue = enumType.findValueByNumberCreatingIfUnknown(i);
// build new EnumValueDescriptor and put it in the value cache
enumLookup.put(binaryValue, unknownEnumValue);
return unknownEnumValue;
} catch (NumberFormatException e) {
// The value does not follow the "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number pattern;
// fall through and treat it as an unrecognized label below.
}
}
if (!acceptUnknownEnum) {
// Safe mode: when an enum does not have its number in the metadata (data written before this fix) and its label
// is unrecognizable (neither defined in the schema nor parsable with the "UNKNOWN_ENUM_*" pattern, which probably
// means the reader schema is not up to date), we reject with an error.
Set<Binary> knownValues = enumLookup.keySet();
String msg = "Illegal enum value \"" + binaryValue + "\""
+ " in protocol buffer \"" + fieldType.getFullName() + "\""
+ " legal values are: \"" + knownValues + "\"";
throw new InvalidRecordException(msg);
}
LOG.error("Found unknown value " + unknownLabel + " for field " + fieldType.getFullName() +
" probably because your proto schema is outdated, accept it as unknown enum with number -1");
Descriptors.EnumValueDescriptor unrecognized = enumType.findValueByNumberCreatingIfUnknown(-1);
enumLookup.put(binaryValue, unrecognized);
return unrecognized;
}
return protoValue;
}
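
The lookup order implemented in translateEnumValue above can be summarized as: footer metadata or proto schema first, then the "UNKNOWN_ENUM_VALUE_<Enum>_<number>" naming convention, and finally the -1 fallback guarded by parquet.proto.accept.unknown.enum. The standalone helper below is only a condensed sketch of that order, simplified to plain Strings and ints instead of Binary and EnumValueDescriptor.

import java.util.Map;

final class EnumResolutionSketch {
  static int resolveEnumNumber(String label, Map<String, Integer> lookup,
                               String enumName, boolean acceptUnknownEnum) {
    Integer number = lookup.get(label);               // 1. footer metadata or proto schema
    if (number != null) {
      return number;
    }
    String unknownPrefix = "UNKNOWN_ENUM_VALUE_" + enumName + "_";
    if (label.startsWith(unknownPrefix)) {            // 2. label written for an unknown value by older parquet-proto
      try {
        return Integer.parseInt(label.substring(unknownPrefix.length()));
      } catch (NumberFormatException e) {
        // suffix is not a number; fall through
      }
    }
    if (!acceptUnknownEnum) {                         // 3. strict mode: reject the record
      throw new IllegalArgumentException("Illegal enum value \"" + label + "\"");
    }
    return -1;                                        // 4. lenient mode: unknown enum, number -1
  }
}
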
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -90,7 +90,7 @@ public RecordMaterializer<T> prepareForRead(Configuration configuration, Map<Str

MessageType requestedSchema = readContext.getRequestedSchema();
Class<? extends Message> protobufClass = Protobufs.getProtobufClass(headerProtoClass);
return new ProtoRecordMaterializer(requestedSchema, protobufClass);
return new ProtoRecordMaterializer(configuration, requestedSchema, protobufClass, keyValueMetaData);
}


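
For context, a hedged end-to-end sketch of how a reading application would opt into the lenient behavior wired through prepareForRead above; it assumes the existing ProtoParquetReader/ParquetReader builder API of parquet-protobuf and a hypothetical file path, and is not part of this change.

import com.google.protobuf.MessageOrBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.proto.ProtoConstants;
import org.apache.parquet.proto.ProtoParquetReader;

public class AcceptUnknownEnumReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Tolerate enum labels that cannot be resolved via footer metadata, the proto schema,
    // or the UNKNOWN_ENUM_VALUE_* naming convention; map them to number -1 instead of failing.
    conf.setBoolean(ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM, true);

    try (ParquetReader<MessageOrBuilder> reader =
             ProtoParquetReader.<MessageOrBuilder>builder(new Path("/tmp/data.parquet")) // hypothetical path
                 .withConf(conf)
                 .build()) {
      for (MessageOrBuilder record = reader.read(); record != null; record = reader.read()) {
        // process record
      }
    }
  }
}
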
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,8 +21,12 @@

import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;

import java.util.Collections;
import java.util.Map;

/**
* Converts data content of root message from Protocol Buffer message to parquet message.
* It delegates conversion of inner fields to {@link ProtoMessageConverter} class using inheritance.
@@ -45,15 +49,26 @@ public void add(Object a) {
}
}

public ProtoRecordConverter(Configuration conf, Class<? extends Message> protoclass, MessageType parquetSchema, Map<String, String> extraMetadata) {
super(conf, new SkipParentValueContainer(), protoclass, parquetSchema, extraMetadata);
reusedBuilder = getBuilder();
}

public ProtoRecordConverter(Configuration conf, Message.Builder builder, MessageType parquetSchema, Map<String, String> extraMetadata) {
super(conf, new SkipParentValueContainer(), builder, parquetSchema, extraMetadata);
reusedBuilder = getBuilder();
}

// Old constructors, kept for backward compatibility.
// Instances created this way will not be able to handle unknown enum values written by parquet-proto (the behavior before PARQUET-1455).
@Deprecated
public ProtoRecordConverter(Class<? extends Message> protoclass, MessageType parquetSchema) {
super(new SkipParentValueContainer(), protoclass, parquetSchema);
reusedBuilder = getBuilder();
this(new Configuration(), protoclass, parquetSchema, Collections.emptyMap());
}

@Deprecated
public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) {
super(new SkipParentValueContainer(), builder, parquetSchema);
reusedBuilder = getBuilder();
this(new Configuration(), builder, parquetSchema, Collections.emptyMap());
}

@Override
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,16 +20,19 @@

import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;

import java.util.Map;

class ProtoRecordMaterializer<T extends MessageOrBuilder> extends RecordMaterializer<T> {

private final ProtoRecordConverter<T> root;

public ProtoRecordMaterializer(MessageType requestedSchema, Class<? extends Message> protobufClass) {
this.root = new ProtoRecordConverter<T>(protobufClass, requestedSchema);
public ProtoRecordMaterializer(Configuration conf, MessageType requestedSchema, Class<? extends Message> protobufClass, Map<String, String> metadata) {
this.root = new ProtoRecordConverter<T>(conf, protobufClass, requestedSchema, metadata);
}

@Override