From a5af072aee8f94086d9750557842c1936c90ecc8 Mon Sep 17 00:00:00 2001 From: Qinghui Xu Date: Sun, 9 Aug 2020 19:55:10 +0200 Subject: [PATCH 1/2] PARQUET-1455: Handle "unknown enum" for protobuf Protobuf allows an enum field to be set by number. When the number does not match any enum value defined in the schema, it is still accepted, and a label "UNKNOWN_ENUM_VALUE_<EnumName>_<number>" is generated when we use the protobuf reflection API (proto descriptors) to access it. In parquet-protobuf, we rely on the protobuf reflection API to convert back and forth between the two worlds. There are two cases of unknown enum when using parquet-protobuf: 1. The protobuf message already contains an unknown enum when we write it to parquet (e.g. 1: sometimes people set enum fields using numbers; e.g. 2: the writer deserializes data from the wire and the sender has a newer version of the proto schema with new enum values). The behavior of the parquet-protobuf writer before this patch was to write the label "UNKNOWN_ENUM_VALUE_<EnumName>_<number>" as a string in the enum column of parquet. When we read it back as protobuf, we find this unknown label, which does not match any enum definition (even with the same schema as the sender in e.g. 2). 2. The protobuf message contains a valid value when written to parquet, but the reader uses an outdated proto schema which is missing some enum values. So the enum values that are not in the old schema are "unknown" to the reader. The previous behavior of the parquet-proto reader was to reject both cases with a runtime exception. To handle these problems, we keep the enum (name -> number) mapping in the parquet metadata, so that at read time the reader can discover the number and use the protobuf reflection API to set the enum number. Keep in mind, though, that when reading an enum with an outdated schema (case 2), the enum read back will have the right number, but its label is set to "UNKNOWN_ENUM_VALUE_<EnumName>_<number>". So this feature is helpful only if the user manipulates enum data by number.
For old data containing a "true" unknown value (case 1) created before this patch (so the name -> number mapping is not available), we now try to parse the string according to the "UNKNOWN_ENUM_VALUE_<EnumName>_<number>" pattern. If we read old data created before this patch (so the name -> number mapping is not available) with an outdated schema, and we find an enum value that is neither defined in the schema nor follows the "UNKNOWN_ENUM_*" pattern, we either fail the job by raising an exception or treat the value as an unknown enum with number -1, depending on a flag in the configuration ("parquet.proto.accept.unknown.enum", false by default). The name -> number mapping is new metadata under the "parquet.proto.enum" namespace. The metadata for the protobuf enum (label:number) mapping must follow a specific pattern ("label:number" items separated by ","); a BadConfigurationException is thrown at read time if it does not. Tests for enum schema evolution (read/write with different protobuf schemas) are added. --- .../apache/parquet/proto/ProtoConstants.java | 40 ++++++ .../parquet/proto/ProtoMessageConverter.java | 115 ++++++++++++++---- .../parquet/proto/ProtoReadSupport.java | 8 +- .../parquet/proto/ProtoRecordConverter.java | 29 +++-- .../proto/ProtoRecordMaterializer.java | 13 +- .../parquet/proto/ProtoWriteSupport.java | 54 +++++++- .../proto/ProtoRecordConverterTest.java | 23 +++- .../proto/ProtoSchemaEvolutionTest.java | 68 +++++++++++ .../org/apache/parquet/proto/TestUtils.java | 46 ++++--- .../test/resources/TestProto3SchemaV1.proto | 38 ++++++ .../test/resources/TestProto3SchemaV2.proto | 40 ++++++ pom.xml | 6 +- 12 files changed, 415 insertions(+), 65 deletions(-) create mode 100644 parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java create mode 100644 parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java create mode 100644 parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto create mode 100644 parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java new file mode 100644 index 0000000000..7458f8913b --- /dev/null +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.proto; + +/** + * Constants. + */ +public final class ProtoConstants { + + public static final String METADATA_ENUM_PREFIX = "parquet.proto.enum."; + public static final String METADATA_ENUM_KEY_VALUE_SEPARATOR = ":"; + public static final String METADATA_ENUM_ITEM_SEPARATOR = ","; + /** + * Configuration flag to enable reader to accept enum label that's neither defined in its own proto schema nor conform + * to the "UNKNOWN_ENUM_*" pattern with which we can get the enum number. The enum value will be treated as an unknown + * enum with number -1.
+ * Enabling it will avoid a job failure, but you should perhaps use an up-to-date schema instead. + */ + public static final String CONFIG_ACCEPT_UNKNOWN_ENUM = "parquet.proto.accept.unknown.enum"; + + private ProtoConstants() { + // Do not instantiate. + } +} diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index 173fa7799b..b57a1da12a 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -22,7 +22,10 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; @@ -33,6 +36,8 @@ import org.apache.parquet.schema.IncompatibleSchemaModificationException; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -42,30 +47,46 @@ import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType; import static java.util.Optional.of; +import static org.apache.parquet.proto.ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX; /** * Converts Protocol Buffer message (both top level and inner) to parquet. 
* This is internal class, use {@link ProtoRecordConverter}. */ class ProtoMessageConverter extends GroupConverter { + private static final Logger LOG = LoggerFactory.getLogger(ProtoMessageConverter.class); - private final Converter[] converters; - private final ParentValueContainer parent; - private final Message.Builder myBuilder; + protected final Configuration conf; + protected final Converter[] converters; + protected final ParentValueContainer parent; + protected final Message.Builder myBuilder; + protected final Map extraMetadata; - // used in record converter - ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema) { - this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema); + /** + * Used in record converter. + * + * @param conf Configuration for some customizable behavior, + * eg. "parquet.proto.accept.unknown.enum" - whether to accept an unparsable (after trying with proto enum label and number) enum as `UNKNOWN` with a number -1 (the one generated automatically for each proto enum) + * @param pvc The parent value containing the converted proto + * @param protoClass The class of the converted proto + * @param parquetSchema The (part of) parquet schema that should match to the expected proto + * @param extraMetadata Metadata from parquet footer, containing useful information about parquet-proto convertion behavior + */ + ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, Map extraMetadata) { + this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata); } - // For usage in message arrays - ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) { + ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map extraMetadata) { int schemaSize = parquetSchema.getFieldCount(); converters = new Converter[schemaSize]; - + this.conf 
= conf; this.parent = pvc; + this.extraMetadata = extraMetadata; int parquetFieldIndex = 1; if (pvc == null) { @@ -108,7 +129,7 @@ public void end() { myBuilder.clear(); } - private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + protected Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { boolean isRepeated = fieldDescriptor.isRepeated(); @@ -148,7 +169,7 @@ public Optional visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation }).orElseGet(() -> newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType)); } - private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + protected Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { JavaType javaType = fieldDescriptor.getJavaType(); @@ -163,7 +184,7 @@ private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder p case LONG: return new ProtoLongConverter(pvc); case MESSAGE: { Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); - return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType()); + return new ProtoMessageConverter(conf, pvc, subBuilder, parquetType.asGroupType(), extraMetadata); } } @@ -190,25 +211,45 @@ final class ProtoEnumConverter extends PrimitiveConverter { private final Map enumLookup; private Descriptors.EnumValueDescriptor[] dict; private final ParentValueContainer parent; + private final Descriptors.EnumDescriptor enumType; + private final String unknownEnumPrefix; + private final boolean acceptUnknownEnum; public ProtoEnumConverter(ParentValueContainer parent, Descriptors.FieldDescriptor fieldType) { this.parent = parent; this.fieldType = fieldType; - 
this.enumLookup = makeLookupStructure(fieldType); + this.enumType = fieldType.getEnumType(); + this.enumLookup = makeLookupStructure(enumType); + unknownEnumPrefix = "UNKNOWN_ENUM_VALUE_" + enumType.getName() + "_"; + acceptUnknownEnum = conf.getBoolean(CONFIG_ACCEPT_UNKNOWN_ENUM, false); } /** * Fills lookup structure for translating between parquet enum values and Protocol buffer enum values. * */ - private Map makeLookupStructure(Descriptors.FieldDescriptor enumFieldType) { - Descriptors.EnumDescriptor enumType = enumFieldType.getEnumType(); + private Map makeLookupStructure(Descriptors.EnumDescriptor enumType) { Map lookupStructure = new HashMap(); - List enumValues = enumType.getValues(); + if (extraMetadata.containsKey(METADATA_ENUM_PREFIX + enumType.getFullName())) { + String enumNameNumberPairs = extraMetadata.get(METADATA_ENUM_PREFIX + enumType.getFullName()); + if (StringUtils.isBlank(enumNameNumberPairs)) { + LOG.info("No enum is written for " + enumType.getFullName()); + return lookupStructure; + } + for (String enumItem : enumNameNumberPairs.split(METADATA_ENUM_ITEM_SEPARATOR)) { + String[] nameAndNumber = enumItem.split(METADATA_ENUM_KEY_VALUE_SEPARATOR); + if (nameAndNumber.length != 2) { + throw new BadConfigurationException("Invalid enum bookkeeper from the metadata: " + enumNameNumberPairs); + } + lookupStructure.put(Binary.fromString(nameAndNumber[0]), enumType.findValueByNumberCreatingIfUnknown(Integer.parseInt(nameAndNumber[1]))); + } + } else { + List enumValues = enumType.getValues(); - for (Descriptors.EnumValueDescriptor value : enumValues) { - String name = value.getName(); - lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name)); + for (Descriptors.EnumValueDescriptor value : enumValues) { + String name = value.getName(); + lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name)); + } } return lookupStructure; @@ -222,11 +263,37 @@ private Descriptors.EnumValueDescriptor translateEnumValue(Binary 
binaryValue) { Descriptors.EnumValueDescriptor protoValue = enumLookup.get(binaryValue); if (protoValue == null) { - Set knownValues = enumLookup.keySet(); - String msg = "Illegal enum value \"" + binaryValue + "\"" - + " in protocol buffer \"" + fieldType.getFullName() + "\"" - + " legal values are: \"" + knownValues + "\""; - throw new InvalidRecordException(msg); + // in case of unknown enum value, protobuf is creating new EnumValueDescriptor with the unknown number + // and name as following "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number + // so the idea is to parse the name for data created by parquet-proto before this patch + String unknownLabel = new String(binaryValue.getBytes()); + if (unknownLabel.startsWith(unknownEnumPrefix)) { + try { + int i = Integer.parseInt(unknownLabel.substring(unknownEnumPrefix.length())); + Descriptors.EnumValueDescriptor unknownEnumValue = enumType.findValueByNumberCreatingIfUnknown(i); + // build new EnumValueDescriptor and put it in the value cache + enumLookup.put(binaryValue, unknownEnumValue); + return unknownEnumValue; + } catch (NumberFormatException e) { + // The value does not respect "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number pattern + // We accept it as unknown enum with number -1. + } + } + if (!acceptUnknownEnum) { + // Safe mode, when an enum does not have its number in metadata (data written before this fix), and its label + // is unrecognizable (neither defined in the schema, nor parsable with "UNKNOWN_ENUM_*" pattern, which means + // probably the reader schema is not up-to-date), we reject with an error. 
+ Set knownValues = enumLookup.keySet(); + String msg = "Illegal enum value \"" + binaryValue + "\"" + + " in protocol buffer \"" + fieldType.getFullName() + "\"" + + " legal values are: \"" + knownValues + "\""; + throw new InvalidRecordException(msg); + } + LOG.error("Found unknown value " + unknownLabel + " for field " + fieldType.getFullName() + + " probably because your proto schema is outdated, accept it as unknown enum with number -1"); + Descriptors.EnumValueDescriptor unrecognized = enumType.findValueByNumberCreatingIfUnknown(-1); + enumLookup.put(binaryValue, unrecognized); + return unrecognized; } return protoValue; } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 0d79d019c1..78edf70d2e 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -90,7 +90,7 @@ public RecordMaterializer prepareForRead(Configuration configuration, Map protobufClass = Protobufs.getProtobufClass(headerProtoClass); - return new ProtoRecordMaterializer(requestedSchema, protobufClass); + return new ProtoRecordMaterializer(configuration, requestedSchema, protobufClass, keyValueMetaData); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index e161819af2..75a67f12cf 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,8 +21,12 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.schema.MessageType; +import java.util.Collections; +import java.util.Map; + /** * Converts data content of root message from Protocol Buffer message to parquet message. 
* It delegates conversion of inner fields to {@link ProtoMessageConverter} class using inheritance. @@ -45,15 +49,26 @@ public void add(Object a) { } } + public ProtoRecordConverter(Configuration conf, Class protoclass, MessageType parquetSchema, Map extraMetadata) { + super(conf, new SkipParentValueContainer(), protoclass, parquetSchema, extraMetadata); + reusedBuilder = getBuilder(); + } + + public ProtoRecordConverter(Configuration conf, Message.Builder builder, MessageType parquetSchema, Map extraMetadata) { + super(conf, new SkipParentValueContainer(), builder, parquetSchema, extraMetadata); + reusedBuilder = getBuilder(); + } + // Old version constructors, kept for code backward compatibility. + // The instance will not be able to handle unknowned enum values written by parquet-proto (the behavior before PARQUET-1455) + @Deprecated public ProtoRecordConverter(Class protoclass, MessageType parquetSchema) { - super(new SkipParentValueContainer(), protoclass, parquetSchema); - reusedBuilder = getBuilder(); + this(new Configuration(), protoclass, parquetSchema, Collections.emptyMap()); } + @Deprecated public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) { - super(new SkipParentValueContainer(), builder, parquetSchema); - reusedBuilder = getBuilder(); + this(new Configuration(), builder, parquetSchema, Collections.emptyMap()); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index 039a571137..dd77ca6b61 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,19 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; +import java.util.Map; + class ProtoRecordMaterializer extends RecordMaterializer { private final ProtoRecordConverter root; - public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass) { - this.root = new ProtoRecordConverter(protobufClass, requestedSchema); + public ProtoRecordMaterializer(Configuration conf, MessageType requestedSchema, Class protobufClass, Map metadata) { + this.root = new ProtoRecordConverter(conf, protobufClass, requestedSchema, metadata); } @Override diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index a8038020a8..2322667651 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -38,6 +38,10 @@ import static java.util.Optional.ofNullable; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR; +import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX; + 
/** * Implementation of {@link WriteSupport} for writing Protocol Buffers. */ @@ -55,6 +59,9 @@ public class ProtoWriteSupport extends WriteSupport< private RecordConsumer recordConsumer; private Class protoMessage; private MessageWriter messageWriter; + // Keep protobuf enum value with number in the metadata, so that in read time, a reader can read at least + // the number back even with an outdated schema which might not contain all enum values. + private Map> protoEnumBookKeeper = new HashMap<>(); public ProtoWriteSupport() { } @@ -126,13 +133,41 @@ public WriteContext init(Configuration configuration) { this.messageWriter = new MessageWriter(messageDescriptor, rootSchema); - Map extraMetaData = new HashMap(); + Map extraMetaData = new HashMap<>(); extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName()); extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage)); extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant)); return new WriteContext(rootSchema, extraMetaData); } + @Override + public FinalizedWriteContext finalizeWrite() { + Map protoMetadata = enumMetadata(); + return new FinalizedWriteContext(protoMetadata); + } + + private Map enumMetadata() { + Map enumMetadata = new HashMap<>(); + for (Map.Entry> enumNameNumberMapping : protoEnumBookKeeper.entrySet()) { + StringBuilder nameNumberPairs = new StringBuilder(); + if (enumNameNumberMapping.getValue().isEmpty()) { + // No enum is ever written to any column of this file, put an empty string as the value in the metadata + LOG.info("No enum is written for " + enumNameNumberMapping.getKey()); + } + int idx = 0; + for (Map.Entry nameNumberPair : enumNameNumberMapping.getValue().entrySet()) { + nameNumberPairs.append(nameNumberPair.getKey()) + .append(METADATA_ENUM_KEY_VALUE_SEPARATOR) + .append(nameNumberPair.getValue()); + idx ++; + if (idx < enumNameNumberMapping.getValue().size()) { + nameNumberPairs.append(METADATA_ENUM_ITEM_SEPARATOR); + } + 
} + enumMetadata.put(METADATA_ENUM_PREFIX + enumNameNumberMapping.getKey(), nameNumberPairs.toString()); + } + return enumMetadata; + } class FieldWriter { String fieldName; @@ -202,7 +237,7 @@ private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type type) { case LONG: return new LongWriter(); case FLOAT: return new FloatWriter(); case DOUBLE: return new DoubleWriter(); - case ENUM: return new EnumWriter(); + case ENUM: return new EnumWriter(fieldDescriptor.getEnumType()); case BOOLEAN: return new BooleanWriter(); case BYTE_STRING: return new BinaryWriter(); } @@ -480,10 +515,23 @@ final void writeRawValue(Object value) { } class EnumWriter extends FieldWriter { + Map enumNameNumberPairs; + + public EnumWriter(Descriptors.EnumDescriptor enumType) { + if (protoEnumBookKeeper.containsKey(enumType.getFullName())) { + enumNameNumberPairs = protoEnumBookKeeper.get(enumType.getFullName()); + } else { + enumNameNumberPairs = new HashMap<>(); + protoEnumBookKeeper.put(enumType.getFullName(), enumNameNumberPairs); + } + } + @Override final void writeRawValue(Object value) { - Binary binary = Binary.fromString(((Descriptors.EnumValueDescriptor) value).getName()); + Descriptors.EnumValueDescriptor enumValueDesc = (Descriptors.EnumValueDescriptor) value; + Binary binary = Binary.fromString(enumValueDesc.getName()); recordConsumer.addBinary(binary); + enumNameNumberPairs.putIfAbsent(enumValueDesc.getName(), enumValueDesc.getNumber()); } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java index e042f96821..74cddeb58a 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java @@ -25,11 +25,12 @@ import java.util.List; +import static org.apache.parquet.proto.TestUtils.testData; +import static 
org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.apache.parquet.proto.TestUtils.testData; -import static org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes; public class ProtoRecordConverterTest { @@ -341,4 +342,22 @@ public void testProto3LargeProtobufferFieldId() throws Exception { testData(builder.build()); } + + @Test + public void testUnknownEnum() throws Exception { + TestProto3.SchemaConverterAllDatatypes.Builder data; + data = TestProto3.SchemaConverterAllDatatypes.newBuilder(); + data.setOptionalEnumValue(42); + + TestProto3.SchemaConverterAllDatatypes dataBuilt = data.build(); + data.clear(); + + List result; + result = testData(dataBuilt); + + //data are fully checked in testData function. Lets do one more check. + TestProto3.SchemaConverterAllDatatypes o = result.get(0); + assertSame(o.getOptionalEnum(), TestProto3.SchemaConverterAllDatatypes.TestEnum.UNRECOGNIZED); + assertEquals(o.getOptionalEnumValue(), 42); + } } diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java new file mode 100644 index 0000000000..db7f6ced0a --- /dev/null +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.proto; + +import org.apache.hadoop.fs.Path; +import org.apache.parquet.proto.test.TestProto3SchemaV1; +import org.apache.parquet.proto.test.TestProto3SchemaV2; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.apache.parquet.proto.TestUtils.readMessages; +import static org.apache.parquet.proto.TestUtils.writeMessages; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** + * Tests for backward/forward compatibility while write and read parquet using different versions of protobuf schema. + */ +public class ProtoSchemaEvolutionTest { + + /** + * Test we can read enum value (number) with an old schema even the value is missing in the old schema. + */ + @Test + public void testEnumSchemaWriteV2ReadV1() throws IOException { + TestProto3SchemaV2.MessageSchema dataV2 = TestProto3SchemaV2.MessageSchema.newBuilder() + .setOptionalLabelNumberPair(TestProto3SchemaV2.MessageSchema.LabelNumberPair.SECOND) + .setOptionalString("string value") + .build(); + Path file = writeMessages(dataV2); + List messagesV1 = readMessages(file, TestProto3SchemaV1.MessageSchema.class); + assertEquals(messagesV1.size(), 1); + assertEquals(messagesV1.get(0).getOptionalLabelNumberPairValue(), 2); + } + + /** + * Write enum value unknown in V1 (thus "UNKNOWN_ENUM_VALUE_*"), and we can read it back with schema V2 that contains + * the enum definition. 
+ */ + @Test + public void testEnumSchemaWriteV1ReadV2() throws IOException { + TestProto3SchemaV1.MessageSchema dataV1WithEnumValueFromV2 = TestProto3SchemaV1.MessageSchema.newBuilder() + .setOptionalLabelNumberPairValue(2) // "2" is not defined in V1 enum, but the number is still accepted by protobuf + .build(); + Path file = writeMessages(dataV1WithEnumValueFromV2); + List messagesV2 = readMessages(file, TestProto3SchemaV2.MessageSchema.class); + assertEquals(messagesV2.size(), 1); + assertSame(messagesV2.get(0).getOptionalLabelNumberPair(), TestProto3SchemaV2.MessageSchema.LabelNumberPair.SECOND); + } +} diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java index 2c8b41f395..2fbd2a4748 100644 --- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java +++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java @@ -23,13 +23,13 @@ import com.google.protobuf.MessageOrBuilder; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetReader; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; public class TestUtils { @@ -78,8 +78,7 @@ public static List testData(T... 
messages) thro checkSameBuilderInstance(messages); - List output = (List) writeAndRead(messages); - + List output = writeAndRead(messages); List outputAsMessages = asMessages(output); Descriptors.Descriptor messageDescriptor = Protobufs.getMessageDescriptor(asMessage(messages[0]).getClass()); Descriptors.FileDescriptor.Syntax syntax = messageDescriptor.getFile().getSyntax(); @@ -117,7 +116,6 @@ public static List asMessages(List mobs) { for (MessageOrBuilder messageOrBuilder : mobs) { result.add(asMessage(messageOrBuilder)); } - return result; } @@ -161,22 +159,34 @@ private static void checkSameBuilderInstance(MessageOrBuilder[] messages) { * Reads messages from given file. The file could/should be created by method writeMessages */ public static List readMessages(Path file) throws IOException { - ProtoParquetReader reader = new ProtoParquetReader(file); - - List result = new ArrayList(); - boolean hasNext = true; - while (hasNext) { - T item = reader.read(); - if (item == null) { - hasNext = false; - } else { - assertNotNull(item); - // It makes sense to return message but production code wont work with messages - result.add((T) asMessage(item).toBuilder()); + return readMessages(file, null); + } + + /** + * Read messages from given file into the expected proto class. + * @param file + * @param messageClass + * @param + * @return List of protobuf messages for the given type. 
+ */ + public static List readMessages(Path file, Class messageClass) throws IOException { + ParquetReader.Builder readerBuilder = ProtoParquetReader.builder(file); + if (messageClass != null) { + readerBuilder.set(ProtoReadSupport.PB_CLASS, messageClass.getName()); // configure only; the single reader is built and closed by the try-with-resources below + } + try (ParquetReader reader = readerBuilder.build()) { + List result = new ArrayList(); + boolean hasNext = true; + while (hasNext) { + T item = (T) reader.read(); + if (item == null) { + hasNext = false; + } else { + result.add((T) asMessage(item)); + } } + return result; } - reader.close(); - return result; } /** diff --git a/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto b/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto new file mode 100644 index 0000000000..6c0c8cc8bc --- /dev/null +++ b/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// +package TestProto3.Schema; + +option java_package = "org.apache.parquet.proto.test"; + +// For the test of schema evolution +// This is the "V1" schema, the "V2" (its evolution) is in TestProto3SchemaV2.proto +message MessageSchema { + + enum LabelNumberPair { + UNKNOWN_VALUE = 0; + FIRST = 1; + } + + LabelNumberPair optionalLabelNumberPair = 1; + string optionalString = 2; + int32 optionalInt32 = 3; + +} + diff --git a/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto b/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto new file mode 100644 index 0000000000..846f1a37a7 --- /dev/null +++ b/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// +package TestProto3.Schema; + +option java_package = "org.apache.parquet.proto.test"; + +// For the test of schema evolution +// This is the "V2" schema, which is supposed to be an evolution from the "V1" (TestProto3SchemaV1.proto) +message MessageSchema { + + enum LabelNumberPair { + UNKNOWN_VALUE = 0; + FIRST = 1; + // V2 adds one more value compared to V1 + SECOND = 2; + } + + LabelNumberPair optionalLabelNumberPair = 1; + string optionalString = 2; + int32 optionalInt32 = 3; + +} + diff --git a/pom.xml b/pom.xml index 52e1fdbfc9..a84af6e76f 100644 --- a/pom.xml +++ b/pom.xml @@ -391,7 +391,6 @@ - org.apache.maven.plugins @@ -402,6 +401,7 @@ ${maven.compiler.target} + org.apache.maven.plugins maven-failsafe-plugin @@ -415,6 +415,7 @@ + org.apache.maven.plugins maven-surefire-plugin @@ -439,6 +440,7 @@ + org.codehaus.mojo buildnumber-maven-plugin @@ -452,6 +454,7 @@ + org.apache.rat apache-rat-plugin @@ -542,7 +545,6 @@ - From 469966fcbbe5a8fd4b22959a2a1f3b27dc14c56e Mon Sep 17 00:00:00 2001 From: Qinghui Xu Date: Thu, 20 Aug 2020 16:16:12 +0200 Subject: [PATCH 2/2] Address review comments --- .../org/apache/parquet/proto/ProtoMessageConverter.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index b57a1da12a..77f5d529e6 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -22,7 +22,6 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.Dictionary; import org.apache.parquet.hadoop.BadConfigurationException; @@ -232,8 +231,8 @@ 
private Map makeLookupStructure(Descrip if (extraMetadata.containsKey(METADATA_ENUM_PREFIX + enumType.getFullName())) { String enumNameNumberPairs = extraMetadata.get(METADATA_ENUM_PREFIX + enumType.getFullName()); - if (StringUtils.isBlank(enumNameNumberPairs)) { - LOG.info("No enum is written for " + enumType.getFullName()); + if (enumNameNumberPairs == null || enumNameNumberPairs.trim().isEmpty()) { + LOG.debug("No enum is written for " + enumType.getFullName()); return lookupStructure; } for (String enumItem : enumNameNumberPairs.split(METADATA_ENUM_ITEM_SEPARATOR)) { @@ -266,7 +265,7 @@ private Descriptors.EnumValueDescriptor translateEnumValue(Binary binaryValue) { // in case of unknown enum value, protobuf is creating new EnumValueDescriptor with the unknown number // and name as following "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number // so the idea is to parse the name for data created by parquet-proto before this patch - String unknownLabel = new String(binaryValue.getBytes()); + String unknownLabel = binaryValue.toStringUsingUTF8(); if (unknownLabel.startsWith(unknownEnumPrefix)) { try { int i = Integer.parseInt(unknownLabel.substring(unknownEnumPrefix.length()));