Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ public void setDictionary(Dictionary dictionary) {
}
}

// Hook for subclasses: transform a dictionary-decoded value before it is handed
// to the parent container (e.g. to hand out an independent view of a shared
// buffer). The default implementation returns the value unchanged.
public T prepareDictionaryValue(T value) {
return value;
}

@Override
public void addValueFromDictionary(int dictionaryId) {
// Route the cached dictionary entry through prepareDictionaryValue so that
// subclasses can copy/duplicate shared state before it escapes to the parent.
// NOTE(review): dict is assumed to be populated by setDictionary — confirm.
parent.add(prepareDictionaryValue(dict[dictionaryId]));
}
}

Expand Down Expand Up @@ -220,6 +224,11 @@ public FieldByteBufferConverter(ParentValueContainer parent) {
/**
 * Converts the Parquet {@code Binary} into a heap {@link ByteBuffer} backed by
 * the binary's byte array.
 */
public ByteBuffer convert(Binary binary) {
  final byte[] raw = binary.getBytes();
  return ByteBuffer.wrap(raw);
}

@Override
public ByteBuffer prepareDictionaryValue(ByteBuffer value) {
  // Dictionary entries are shared across rows: hand out a buffer with its own
  // independent position/limit so one consumer's reads don't affect another's.
  final ByteBuffer independentView = value.duplicate();
  return independentView;
}
}

static final class FieldStringConverter extends BinaryConverter<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand All @@ -37,6 +38,7 @@
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -87,10 +89,7 @@ public void testEmptyArray() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("array.avsc").openStream());

File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
Path file = new Path(tmp.getPath());
Path file = new Path(createTempFile().getPath());

ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
Expand All @@ -117,10 +116,7 @@ public void testEmptyMap() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map.avsc").openStream());

File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
Path file = new Path(tmp.getPath());
Path file = new Path(createTempFile().getPath());

ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
Expand All @@ -147,10 +143,7 @@ public void testMapWithNulls() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map_with_nulls.avsc").openStream());

File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
Path file = new Path(tmp.getPath());
Path file = new Path(createTempFile().getPath());

ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
Expand Down Expand Up @@ -182,10 +175,7 @@ public void testMapRequiredValueWithNull() throws Exception {
schema.setFields(Lists.newArrayList(
new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));

File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
Path file = new Path(tmp.getPath());
Path file = new Path(createTempFile().getPath());

ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
Expand All @@ -209,10 +199,7 @@ public void testMapWithUtf8Key() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map.avsc").openStream());

File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
Path file = new Path(tmp.getPath());
Path file = new Path(createTempFile().getPath());

ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
Expand Down Expand Up @@ -346,11 +333,8 @@ public void testAll() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("all.avsc").openStream());

File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
Path file = new Path(tmp.getPath());

Path file = new Path(createTempFile().getPath());

ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
.withSchema(schema)
Expand Down Expand Up @@ -429,10 +413,7 @@ public void testAll() throws Exception {

@Test
public void testAllUsingDefaultAvroSchema() throws Exception {
File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
Path file = new Path(tmp.getPath());
Path file = new Path(createTempFile().getPath());

// write file using Parquet APIs
ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<Map<String, Object>>(file,
Expand Down Expand Up @@ -654,10 +635,7 @@ public void testUnionWithSingleNonNullType() throws Exception {
Collections.singletonList(new Schema.Field("value",
Schema.createUnion(Schema.create(Schema.Type.STRING)), null, null)));

File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
Path file = new Path(tmp.getPath());
Path file = new Path(createTempFile().getPath());

// Parquet writer
ParquetWriter parquetWriter = AvroParquetWriter.builder(file).withSchema(avroSchema)
Expand All @@ -678,6 +656,46 @@ public void testUnionWithSingleNonNullType() throws Exception {
assertEquals(str("theValue"), nextRecord.get("value"));
}

@Test
public void testDuplicatedValuesWithDictionary() throws Exception {
  // Regression test: repeated byte values get dictionary-encoded by the writer;
  // reading them back must yield correctly-positioned, independent ByteBuffers
  // (exercises BinaryConverter.prepareDictionaryValue).
  Schema schema = SchemaBuilder.record("spark_schema")
      .fields().optionalBytes("value").endRecord();

  Path file = new Path(createTempFile().getPath());

  String[] records = {"one", "two", "three", "three", "two", "one", "zero"};
  try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
      .<GenericData.Record>builder(file)
      .withSchema(schema)
      .withConf(testConf)
      .build()) {
    for (String record : records) {
      // Use an explicit charset: the no-arg getBytes() depends on the platform
      // default and is not portable across JVMs/OSes.
      writer.write(new GenericRecordBuilder(schema)
          .set("value", record.getBytes(StandardCharsets.UTF_8)).build());
    }
  }

  try (ParquetReader<GenericRecord> reader = AvroParquetReader
      .<GenericRecord>builder(file)
      .withConf(testConf).build()) {
    GenericRecord rec;
    int i = 0;
    while ((rec = reader.read()) != null) {
      ByteBuffer buf = (ByteBuffer) rec.get("value");
      byte[] bytes = new byte[buf.remaining()];
      buf.get(bytes);
      assertEquals(records[i++], new String(bytes, StandardCharsets.UTF_8));
    }
    // Guard against a vacuous pass: the original loop asserted nothing if the
    // reader returned zero records. Require every written record to come back.
    assertEquals(records.length, i);
  }
}

/**
 * Creates and immediately deletes a uniquely-named temp file, returning the
 * (now nonexistent) path for a writer to create. Registered for deletion on
 * JVM exit so successful runs leave nothing behind.
 *
 * @return a fresh, nonexistent temp-file handle
 * @throws IOException if the placeholder file cannot be created
 */
private File createTempFile() throws IOException {
  final File placeholder = File.createTempFile(getClass().getSimpleName(), ".tmp");
  placeholder.deleteOnExit();
  // Remove the empty placeholder right away: ParquetWriter refuses to
  // overwrite an existing file, so only the unique name is kept.
  placeholder.delete();
  return placeholder;
}

/**
* Return a String or Utf8 depending on whether compatibility is on
*/
Expand Down