From abf74a9225fd2de5a919cbc609138cb62b2ca314 Mon Sep 17 00:00:00 2001 From: Nandor Kollar Date: Fri, 14 Dec 2018 11:22:43 +0100 Subject: [PATCH 1/4] PARQUET-1478: Can't read spec compliant, 3-level lists via parquet-proto Test to demonstrate the problem. --- .../proto/ProtoInputOutputFormatTest.java | 2 +- .../parquet/proto/utils/ReadUsingMR.java | 19 +++++++++++++------ .../parquet/proto/utils/WriteUsingMR.java | 9 ++++----- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 5544dc6887..07813a3841 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -303,7 +303,7 @@ public void testRepeatedInnerMessageClassSchemaCompliant() throws Exception { ProtoWriteSupport.setWriteSpecsCompliant(conf, true); Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); - ReadUsingMR readUsingMR = new ReadUsingMR(); + ReadUsingMR readUsingMR = new ReadUsingMR(conf); String customClass = TestProtobuf.RepeatedInnerMessage.class.getName(); ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); List result = readUsingMR.read(outputPath); diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java index 8905968b01..1171a1566d 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/ReadUsingMR.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,7 +28,6 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.parquet.proto.ProtoParquetInputFormat; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,19 +40,27 @@ public class ReadUsingMR { private static List outputMessages; - Configuration conf = new Configuration(); + Configuration conf; private String projection; public void setRequestedProjection(String projection) { this.projection = projection; } + public ReadUsingMR() { + this(new Configuration()); + } + + public ReadUsingMR(Configuration conf) { + this.conf = conf; + } + public Configuration getConfiguration() { return conf; } public static class ReadingMapper extends Mapper { - protected void map(Void key, MessageOrBuilder value, Context context) throws IOException, InterruptedException { + protected void map(Void key, MessageOrBuilder value, Context context) { Message clone = ((Message.Builder) value).build(); outputMessages.add(clone); } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java index 55f9237ec5..90bb3fd756 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -55,7 +54,7 @@ public WriteUsingMR() { } public WriteUsingMR(Configuration conf) { - this.conf = new Configuration(); + this.conf = conf; } public Configuration getConfiguration() { From f22296edfe54c7f7c3dee054e3104d11a7b29d15 Mon Sep 17 00:00:00 2001 From: Nandor Kollar Date: Fri, 14 Dec 2018 17:29:45 +0100 Subject: [PATCH 2/4] Fix list/map read path for parquet-proto --- .../parquet/proto/ProtoMessageConverter.java | 37 +++++++++++-------- .../parquet/proto/ProtoReadSupport.java | 23 +++++++++--- .../parquet/proto/ProtoRecordConverter.java | 21 +++++++---- .../proto/ProtoRecordMaterializer.java | 10 +++-- .../proto/ProtoInputOutputFormatTest.java | 2 +- 5 files changed, 62 insertions(+), 31 deletions(-) diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index 92d8b624d9..baa072ae5a 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -54,13 +54,13 @@ class ProtoMessageConverter extends GroupConverter { private final Message.Builder myBuilder; // used in record converter - ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema) { - this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema); + ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, boolean specCompliant) { + this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, specCompliant); } // For usage in message arrays - ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) { + ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, boolean specCompliant) { int schemaSize = parquetSchema.getFieldCount(); converters = new Converter[schemaSize]; @@ -80,12 +80,18 @@ class ProtoMessageConverter extends GroupConverter { Descriptors.FieldDescriptor protoField = protoDescriptor.findFieldByName(parquetField.getName()); if (protoField == null) { + // Skip extra list/key_value layer for spec-compliant, 3-level structures, + // when the proto schema doesn't have a field with this name. + if (specCompliant && + ("key_value".equals(parquetField.getName()) || "list".equals(parquetField.getName()))) { + continue; + } String description = "Scheme mismatch \n\"" + parquetField + "\"" + "\n proto descriptor:\n" + protoDescriptor.toProto(); throw new IncompatibleSchemaModificationException("Cant find \"" + parquetField.getName() + "\" " + description); } - converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder, protoField, parquetField); + converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder, protoField, parquetField, specCompliant); parquetFieldIndex++; } @@ -108,7 +114,7 @@ public void end() { myBuilder.clear(); } - private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType, boolean specCompliant) { boolean isRepeated = fieldDescriptor.isRepeated(); @@ -132,23 +138,24 @@ public void add(Object value) { LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation(); if (logicalTypeAnnotation == null) { - return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); + return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType, specCompliant); } return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { @Override public Optional visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) { - return of(new ListConverter(parentBuilder, fieldDescriptor, parquetType)); + return of(new ListConverter(parentBuilder, fieldDescriptor, parquetType, specCompliant)); } @Override public Optional visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) { - return of(new MapConverter(parentBuilder, fieldDescriptor, parquetType)); + return of(new MapConverter(parentBuilder, fieldDescriptor, parquetType, specCompliant)); } - }).orElse(newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType)); + }).orElse(newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType, specCompliant)); } - private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, + Descriptors.FieldDescriptor fieldDescriptor, Type parquetType, boolean specCompliant) { JavaType javaType = fieldDescriptor.getJavaType(); @@ -163,7 +170,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p case LONG: return new ProtoLongConverter(pvc); case MESSAGE: { Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); - return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType()); + return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType(), specCompliant); } } @@ -386,7 +393,7 @@ public void addBinary(Binary binary) { final class ListConverter extends GroupConverter { private final Converter converter; - public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType, boolean specCompliant) { LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation(); if (!(logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) || parquetType.isPrimitive()) { throw new ParquetDecodingException("Expected LIST wrapper. Found: " + logicalTypeAnnotation + " instead."); @@ -403,7 +410,7 @@ public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor } Type elementType = listType.getType("element"); - converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType); + converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType, specCompliant); } @Override @@ -445,7 +452,7 @@ public void end() { final class MapConverter extends GroupConverter { private final Converter converter; - public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType, boolean specCompliant) { LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation(); if (!(logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation)) { throw new ParquetDecodingException("Expected MAP wrapper. Found: " + logicalTypeAnnotation + " instead."); @@ -458,7 +465,7 @@ public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor f throw new ParquetDecodingException("Expected map but got: " + parquetType); } - converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema); + converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema, specCompliant); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 0d79d019c1..2e52096c29 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,6 +30,8 @@ import java.util.Map; +import static org.apache.parquet.proto.ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE; + public class ProtoReadSupport extends ReadSupport { @@ -40,6 +42,8 @@ public class ProtoReadSupport extends ReadSupport { public static final String PB_CLASS = "parquet.proto.class"; public static final String PB_DESCRIPTOR = "parquet.proto.descriptor"; + private boolean readSpecsCompliant = false; + public static void setRequestedProjection(Configuration configuration, String requestedProjection) { configuration.set(PB_REQUESTED_PROJECTION, requestedProjection); } @@ -76,6 +80,7 @@ public ReadContext init(InitContext context) { public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { String headerProtoClass = keyValueMetaData.get(PB_CLASS); String configuredProtoClass = configuration.get(PB_CLASS); + readSpecsCompliant = configuration.getBoolean(PB_SPECS_COMPLIANT_WRITE, readSpecsCompliant); if (configuredProtoClass != null) { LOG.debug("Replacing class " + headerProtoClass + " by " + configuredProtoClass); @@ -90,8 +95,16 @@ public RecordMaterializer prepareForRead(Configuration configuration, Map protobufClass = Protobufs.getProtobufClass(headerProtoClass); - return new ProtoRecordMaterializer(requestedSchema, protobufClass); + return new ProtoRecordMaterializer(requestedSchema, protobufClass, readSpecsCompliant); } - + /** + * Make parquet-protobuf use the LIST and MAP wrappers for collections. Set to false if you need backward + * compatibility with parquet before PARQUET-968 (1.9.0 and older). + * @param configuration The hadoop configuration + * @param readSpecsCompliant If set to true, the old schema style will be used (without wrappers). + */ + public static void setReadSpecsCompliant(Configuration configuration, boolean readSpecsCompliant) { + configuration.setBoolean(PB_SPECS_COMPLIANT_WRITE, readSpecsCompliant); + } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index e161819af2..7ce20a2d96 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,15 +45,22 @@ public void add(Object a) { } } + public ProtoRecordConverter(Class protoclass, MessageType parquetSchema, boolean specCompliant) { + super(new SkipParentValueContainer(), protoclass, parquetSchema, specCompliant); + reusedBuilder = getBuilder(); + } - public ProtoRecordConverter(Class protoclass, MessageType parquetSchema) { - super(new SkipParentValueContainer(), protoclass, parquetSchema); + public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema, boolean specCompliant) { + super(new SkipParentValueContainer(), builder, parquetSchema, specCompliant); reusedBuilder = getBuilder(); } + public ProtoRecordConverter(Class protoclass, MessageType parquetSchema) { + this(protoclass, parquetSchema, false); + } + public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) { - super(new SkipParentValueContainer(), builder, parquetSchema); - reusedBuilder = getBuilder(); + this(builder, parquetSchema, false); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index 039a571137..33a79b4df9 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,6 +32,10 @@ public ProtoRecordMaterializer(MessageType requestedSchema, Class(protobufClass, requestedSchema); } + public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass, boolean readSpecsCompliant) { + this.root = new ProtoRecordConverter(protobufClass, requestedSchema, readSpecsCompliant); + } + @Override public T getCurrentRecord() { return root.getCurrentRecord(); diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 07813a3841..f40b202905 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -262,7 +262,7 @@ public void testMapIntMessageClassSchemaCompliant() throws Exception { ProtoWriteSupport.setWriteSpecsCompliant(conf, true); Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); - ReadUsingMR readUsingMR = new ReadUsingMR(); + ReadUsingMR readUsingMR = new ReadUsingMR(conf); String customClass = TestProtobuf.MapIntMessage.class.getName(); ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); List result = readUsingMR.read(outputPath); From 0d5153618c8a3d627ab5608f5a56350215f6e7ef Mon Sep 17 00:00:00 2001 From: Nandor Kollar Date: Sat, 29 Dec 2018 10:48:52 +0100 Subject: [PATCH 3/4] Revert "Fix list/map read path for parquet-proto" This reverts commit f22296edfe54c7f7c3dee054e3104d11a7b29d15. --- .../parquet/proto/ProtoMessageConverter.java | 37 ++++++++----------- .../parquet/proto/ProtoReadSupport.java | 23 +++--------- .../parquet/proto/ProtoRecordConverter.java | 21 ++++------- .../proto/ProtoRecordMaterializer.java | 10 ++--- .../proto/ProtoInputOutputFormatTest.java | 2 +- 5 files changed, 31 insertions(+), 62 deletions(-) diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index baa072ae5a..92d8b624d9 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -54,13 +54,13 @@ class ProtoMessageConverter extends GroupConverter { private final Message.Builder myBuilder; // used in record converter - ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, boolean specCompliant) { - this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, specCompliant); + ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema) { + this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema); } // For usage in message arrays - ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, boolean specCompliant) { + ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) { int schemaSize = parquetSchema.getFieldCount(); converters = new Converter[schemaSize]; @@ -80,18 +80,12 @@ class ProtoMessageConverter extends GroupConverter { Descriptors.FieldDescriptor protoField = protoDescriptor.findFieldByName(parquetField.getName()); if (protoField == null) { - // Skip extra list/key_value layer for spec-compliant, 3-level structures, - // when the proto schema doesn't have a field with this name. - if (specCompliant && - ("key_value".equals(parquetField.getName()) || "list".equals(parquetField.getName()))) { - continue; - } String description = "Scheme mismatch \n\"" + parquetField + "\"" + "\n proto descriptor:\n" + protoDescriptor.toProto(); throw new IncompatibleSchemaModificationException("Cant find \"" + parquetField.getName() + "\" " + description); } - converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder, protoField, parquetField, specCompliant); + converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder, protoField, parquetField); parquetFieldIndex++; } @@ -114,7 +108,7 @@ public void end() { myBuilder.clear(); } - private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType, boolean specCompliant) { + private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { boolean isRepeated = fieldDescriptor.isRepeated(); @@ -138,24 +132,23 @@ public void add(Object value) { LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation(); if (logicalTypeAnnotation == null) { - return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType, specCompliant); + return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); } return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { @Override public Optional visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) { - return of(new ListConverter(parentBuilder, fieldDescriptor, parquetType, specCompliant)); + return of(new ListConverter(parentBuilder, fieldDescriptor, parquetType)); } @Override public Optional visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) { - return of(new MapConverter(parentBuilder, fieldDescriptor, parquetType, specCompliant)); + return of(new MapConverter(parentBuilder, fieldDescriptor, parquetType)); } - }).orElse(newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType, specCompliant)); + }).orElse(newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType)); } - private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, - Descriptors.FieldDescriptor fieldDescriptor, Type parquetType, boolean specCompliant) { + private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { JavaType javaType = fieldDescriptor.getJavaType(); @@ -170,7 +163,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p case LONG: return new ProtoLongConverter(pvc); case MESSAGE: { Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); - return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType(), specCompliant); + return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType()); } } @@ -393,7 +386,7 @@ public void addBinary(Binary binary) { final class ListConverter extends GroupConverter { private final Converter converter; - public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType, boolean specCompliant) { + public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation(); if (!(logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) || parquetType.isPrimitive()) { throw new ParquetDecodingException("Expected LIST wrapper. Found: " + logicalTypeAnnotation + " instead."); @@ -410,7 +403,7 @@ public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor } Type elementType = listType.getType("element"); - converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType, specCompliant); + converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType); } @Override @@ -452,7 +445,7 @@ public void end() { final class MapConverter extends GroupConverter { private final Converter converter; - public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType, boolean specCompliant) { + public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation(); if (!(logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation)) { throw new ParquetDecodingException("Expected MAP wrapper. Found: " + logicalTypeAnnotation + " instead."); @@ -465,7 +458,7 @@ public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor f throw new ParquetDecodingException("Expected map but got: " + parquetType); } - converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema, specCompliant); + converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 2e52096c29..0d79d019c1 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,8 +30,6 @@ import java.util.Map; -import static org.apache.parquet.proto.ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE; - public class ProtoReadSupport extends ReadSupport { @@ -42,8 +40,6 @@ public class ProtoReadSupport extends ReadSupport { public static final String PB_CLASS = "parquet.proto.class"; public static final String PB_DESCRIPTOR = "parquet.proto.descriptor"; - private boolean readSpecsCompliant = false; - public static void setRequestedProjection(Configuration configuration, String requestedProjection) { configuration.set(PB_REQUESTED_PROJECTION, requestedProjection); } @@ -80,7 +76,6 @@ public ReadContext init(InitContext context) { public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { String headerProtoClass = keyValueMetaData.get(PB_CLASS); String configuredProtoClass = configuration.get(PB_CLASS); - readSpecsCompliant = configuration.getBoolean(PB_SPECS_COMPLIANT_WRITE, readSpecsCompliant); if (configuredProtoClass != null) { LOG.debug("Replacing class " + headerProtoClass + " by " + configuredProtoClass); @@ -95,16 +90,8 @@ public RecordMaterializer prepareForRead(Configuration configuration, Map protobufClass = Protobufs.getProtobufClass(headerProtoClass); - return new ProtoRecordMaterializer(requestedSchema, protobufClass, readSpecsCompliant); + return new ProtoRecordMaterializer(requestedSchema, protobufClass); } - /** - * Make parquet-protobuf use the LIST and MAP wrappers for collections. Set to false if you need backward - * compatibility with parquet before PARQUET-968 (1.9.0 and older). - * @param configuration The hadoop configuration - * @param readSpecsCompliant If set to true, the old schema style will be used (without wrappers). - */ - public static void setReadSpecsCompliant(Configuration configuration, boolean readSpecsCompliant) { - configuration.setBoolean(PB_SPECS_COMPLIANT_WRITE, readSpecsCompliant); - } + } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index 7ce20a2d96..e161819af2 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,22 +45,15 @@ public void add(Object a) { } } - public ProtoRecordConverter(Class protoclass, MessageType parquetSchema, boolean specCompliant) { - super(new SkipParentValueContainer(), protoclass, parquetSchema, specCompliant); - reusedBuilder = getBuilder(); - } - - public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema, boolean specCompliant) { - super(new SkipParentValueContainer(), builder, parquetSchema, specCompliant); - reusedBuilder = getBuilder(); - } public ProtoRecordConverter(Class protoclass, MessageType parquetSchema) { - this(protoclass, parquetSchema, false); + super(new SkipParentValueContainer(), protoclass, parquetSchema); + reusedBuilder = getBuilder(); } public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) { - this(builder, parquetSchema, false); + super(new SkipParentValueContainer(), builder, parquetSchema); + reusedBuilder = getBuilder(); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index 33a79b4df9..039a571137 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,10 +32,6 @@ public ProtoRecordMaterializer(MessageType requestedSchema, Class(protobufClass, requestedSchema); } - public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass, boolean readSpecsCompliant) { - this.root = new ProtoRecordConverter(protobufClass, requestedSchema, readSpecsCompliant); - } - @Override public T getCurrentRecord() { return root.getCurrentRecord(); diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index f40b202905..07813a3841 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -262,7 +262,7 @@ public void testMapIntMessageClassSchemaCompliant() throws Exception { ProtoWriteSupport.setWriteSpecsCompliant(conf, true); Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); - ReadUsingMR readUsingMR = new ReadUsingMR(conf); + ReadUsingMR readUsingMR = new ReadUsingMR(); String customClass = TestProtobuf.MapIntMessage.class.getName(); ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); List result = readUsingMR.read(outputPath); From 90be23b3e12c1b653d89bb48e22db2a359a1a24e Mon Sep 17 00:00:00 2001 From: Nandor Kollar Date: Sat, 29 Dec 2018 11:36:17 +0100 Subject: [PATCH 4/4] Use suggested way from review to fix the failure --- .../java/org/apache/parquet/proto/ProtoMessageConverter.java | 2 +- .../org/apache/parquet/proto/ProtoInputOutputFormatTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index 92d8b624d9..173fa7799b 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -145,7 +145,7 @@ public Optional visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation public Optional visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) { return of(new MapConverter(parentBuilder, fieldDescriptor, parquetType)); } - }).orElse(newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType)); + }).orElseGet(() -> newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType)); } private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java index 07813a3841..f40b202905 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java @@ -262,7 +262,7 @@ public void testMapIntMessageClassSchemaCompliant() throws Exception { ProtoWriteSupport.setWriteSpecsCompliant(conf, true); Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty); - ReadUsingMR readUsingMR = new ReadUsingMR(); + ReadUsingMR readUsingMR = new ReadUsingMR(conf); String customClass = TestProtobuf.MapIntMessage.class.getName(); ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass); List result = readUsingMR.read(outputPath);