Skip to content

Commit f8b06df

Browse files
tsdeng authored and dvryaboy committed
Do ProtocolEvents fixing only when there are required fields missing in the requested schema
https://issues.apache.org/jira/browse/PARQUET-61 This PR redoes https://github.com/apache/incubator-parquet-mr/pull/7. It applies the protocol events fix under a more precise condition: only when the requested schema is missing some required fields that are present in the full schema. So even if there is a projection, as long as the projection does not drop a required field, the protocol events amender will not be called. Could you take a look at this? @dvryaboy @yan-qi Author: Tianshuo Deng <[email protected]> Closes apache#28 from tsdeng/fix_protocol_when_required_field_missing and squashes the following commits: ba778b9 [Tianshuo Deng] add continue for readability d5639df [Tianshuo Deng] fix unused import 090e894 [Tianshuo Deng] format 13a609d [Tianshuo Deng] comment format ef1fe58 [Tianshuo Deng] little refactor, remove the hasMissingRequiredFieldFromProjection method 7c2c158 [Tianshuo Deng] format 83a5655 [Tianshuo Deng] do ProtocolEvents fixing only when there is required fields missing in the requested schema
1 parent 7a10506 commit f8b06df

File tree

1 file changed

+35
-2
lines changed

1 file changed

+35
-2
lines changed

parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,7 @@ public void end() {
778778
private final ParquetReadProtocol protocol;
779779
private final GroupConverter structConverter;
780780
private List<TProtocol> rootEvents = new ArrayList<TProtocol>();
781+
private boolean missingRequiredFieldsInProjection = false;
781782

782783
/**
783784
*
@@ -791,9 +792,36 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
791792
this.thriftReader = thriftReader;
792793
this.protocol = new ParquetReadProtocol();
793794
this.thriftType = thriftType;
795+
MessageType fullSchema = new ThriftSchemaConverter().convert(thriftType);
796+
missingRequiredFieldsInProjection = hasMissingRequiredFieldInGroupType(requestedParquetSchema, fullSchema);
794797
this.structConverter = new StructConverter(rootEvents, requestedParquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
795798
}
796799

800+
/**
 * Reports whether {@code requested} lacks a field that {@code fullSchema} marks as REQUIRED,
 * looking into nested groups as well.
 *
 * <p>Each field of {@code fullSchema} is looked up in {@code requested} by name:
 * <ul>
 *   <li>absent from {@code requested}: the result is {@code true} if the field's repetition is
 *       {@code REQUIRED}; otherwise the next field is checked;</li>
 *   <li>present and non-primitive in {@code fullSchema}: both sides are converted with
 *       {@code asGroupType()} and compared recursively;</li>
 *   <li>present and primitive: nothing further is checked for that field.</li>
 * </ul>
 *
 * <p>NOTE(review): the recursive branch calls {@code requestedType.asGroupType()} without
 * checking that the requested field is itself a group; this presumably relies on the requested
 * schema being a projection of the full schema - confirm against callers.
 *
 * @param requested the (possibly projected) schema being read
 * @param fullSchema the schema to compare against, field by field
 * @return {@code true} if a REQUIRED field of {@code fullSchema}, at any nesting level reached
 *         by the recursion, is absent from {@code requested}; {@code false} otherwise
 */
private boolean hasMissingRequiredFieldInGroupType(GroupType requested, GroupType fullSchema) {
801+
for (Type field : fullSchema.getFields()) {
802+
803+
if (requested.containsField(field.getName())) {
804+
Type requestedType = requested.getType(field.getName());
805+
// if a field is in requested schema and the type of it is a group type, then do recursive check
806+
if (!field.isPrimitive()) {
807+
if (hasMissingRequiredFieldInGroupType(requestedType.asGroupType(), field.asGroupType())) {
808+
return true;
809+
} else {
810+
continue;// check next field
811+
}
812+
}
813+
} else {
814+
if (field.getRepetition() == Type.Repetition.REQUIRED) {
815+
return true; // if a field is missing in requested schema and it's required
816+
} else {
817+
continue; // the missing field is not required, then continue checking next field
818+
}
819+
}
820+
}
821+
822+
return false;
823+
}
824+
797825
/**
798826
*
799827
* {@inheritDoc}
@@ -802,8 +830,13 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
802830
@Override
803831
public T getCurrentRecord() {
804832
try {
805-
List<TProtocol> fixedEvents = new ProtocolEventsAmender(rootEvents).amendMissingRequiredFields(thriftType);
806-
protocol.addAll(fixedEvents);
833+
if (missingRequiredFieldsInProjection) {
834+
List<TProtocol> fixedEvents = new ProtocolEventsAmender(rootEvents).amendMissingRequiredFields(thriftType);
835+
protocol.addAll(fixedEvents);
836+
} else {
837+
protocol.addAll(rootEvents);
838+
}
839+
807840
rootEvents.clear();
808841
return thriftReader.readOneRecord(protocol);
809842
} catch (TException e) {

0 commit comments

Comments
 (0)