From 8dcd0ea6ffa7f27e417f74a4536e0609bd802857 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Wed, 2 Jan 2019 14:47:37 +0800 Subject: [PATCH 1/8] Fix direct memory leak --- .../hadoop/codec/SnappyCompressor.java | 19 +++++++++++++++ .../hadoop/codec/SnappyDecompressor.java | 23 +++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java index d0270ca7c1..864529a3fb 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java @@ -26,6 +26,7 @@ import org.xerial.snappy.Snappy; import org.apache.parquet.Preconditions; +import sun.nio.ch.DirectBuffer; /** * This class is a wrapper around the snappy compressor. It always consumes the @@ -38,6 +39,8 @@ public class SnappyCompressor implements Compressor { // Buffer for uncompressed input. This buffer grows as necessary. private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); + private int maxBufferSize = 64 * 1024 * 1024; + private long bytesRead = 0L; private long bytesWritten = 0L; private boolean finishCalled = false; @@ -66,7 +69,9 @@ public synchronized int compress(byte[] buffer, int off, int len) throws IOExcep // There is uncompressed input, compress it now int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position()); if (maxOutputSize > outputBuffer.capacity()) { + ByteBuffer oldBuffer = outputBuffer; outputBuffer = ByteBuffer.allocateDirect(maxOutputSize); + ((DirectBuffer)oldBuffer).cleaner().clean(); } // Reset the previous outputBuffer outputBuffer.clear(); @@ -97,7 +102,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) { ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len); inputBuffer.rewind(); tmp.put(inputBuffer); + ByteBuffer oldBuffer = inputBuffer; inputBuffer = tmp; + ((DirectBuffer)(oldBuffer)).cleaner().clean(); } else { inputBuffer.limit(inputBuffer.position() + len); } @@ -146,6 +153,18 @@ public void reinit(Configuration c) { @Override public synchronized void reset() { + if (inputBuffer.capacity() > maxBufferSize) { + ByteBuffer oldBuffer = inputBuffer; + inputBuffer = ByteBuffer.allocateDirect(maxBufferSize); + ((DirectBuffer)oldBuffer).cleaner().clean(); + } + + if (outputBuffer.capacity() > maxBufferSize) { + ByteBuffer oldBuffer = outputBuffer; + outputBuffer = ByteBuffer.allocateDirect(maxBufferSize); + ((DirectBuffer)oldBuffer).cleaner().clean(); + } + finishCalled = false; bytesRead = bytesWritten = 0; inputBuffer.rewind(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index 190f8d5318..2dc60febdf 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -25,6 +25,7 @@ import org.xerial.snappy.Snappy; import org.apache.parquet.Preconditions; +import sun.nio.ch.DirectBuffer; public class SnappyDecompressor implements Decompressor { // Buffer for uncompressed output. This buffer grows as necessary. @@ -34,7 +35,9 @@ public class SnappyDecompressor implements Decompressor { private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); private boolean finished; - + + private int maxBufferSize = 64 * 1024 * 1024; + /** * Fills specified buffer with uncompressed data. Returns actual number * of bytes of uncompressed data. A return value of 0 indicates that @@ -61,7 +64,9 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc // There is compressed input, decompress it now. int decompressedSize = Snappy.uncompressedLength(inputBuffer); if (decompressedSize > outputBuffer.capacity()) { + ByteBuffer oldBuffer = outputBuffer; outputBuffer = ByteBuffer.allocateDirect(decompressedSize); + ((DirectBuffer)oldBuffer).cleaner().clean(); } // Reset the previous outputBuffer (i.e. set position to 0) @@ -102,7 +107,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) { ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len); inputBuffer.rewind(); newBuffer.put(inputBuffer); - inputBuffer = newBuffer; + ByteBuffer oldBuffer = inputBuffer; + inputBuffer = newBuffer; + ((DirectBuffer)(oldBuffer)).cleaner().clean(); } else { inputBuffer.limit(inputBuffer.position() + len); } @@ -131,6 +138,18 @@ public synchronized boolean needsInput() { @Override public synchronized void reset() { + if (inputBuffer.capacity() > maxBufferSize) { + ByteBuffer oldBuffer = inputBuffer; + inputBuffer = ByteBuffer.allocateDirect(maxBufferSize); + ((DirectBuffer)oldBuffer).cleaner().clean(); + } + + if (outputBuffer.capacity() > maxBufferSize) { + ByteBuffer oldBuffer = outputBuffer; + outputBuffer = ByteBuffer.allocateDirect(maxBufferSize); + ((DirectBuffer)oldBuffer).cleaner().clean(); + } + finished = false; inputBuffer.rewind(); outputBuffer.rewind(); From d75c8e4016289632c8ed4650fbc014446914fed7 Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Sat, 9 Feb 2019 22:36:19 +0800 Subject: [PATCH 2/8] Use reflection for cleaning DirectBuffer to fix java9+ incompatibility --- .../parquet/hadoop/codec/SnappyCompressor.java | 13 ++++++------- .../parquet/hadoop/codec/SnappyDecompressor.java | 13 ++++++------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java index 864529a3fb..3e287e29e4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java @@ -26,21 +26,20 @@ import org.xerial.snappy.Snappy; import org.apache.parquet.Preconditions; -import sun.nio.ch.DirectBuffer; /** * This class is a wrapper around the snappy compressor. It always consumes the * entire input in setInput and compresses it as one compressed block. */ public class SnappyCompressor implements Compressor { + private static final int maxBufferSize = 64 * 1024 * 1024; + // Buffer for compressed output. This buffer grows as necessary. private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); // Buffer for uncompressed input. This buffer grows as necessary. private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); - private int maxBufferSize = 64 * 1024 * 1024; - private long bytesRead = 0L; private long bytesWritten = 0L; private boolean finishCalled = false; @@ -71,7 +70,7 @@ public synchronized int compress(byte[] buffer, int off, int len) throws IOExcep if (maxOutputSize > outputBuffer.capacity()) { ByteBuffer oldBuffer = outputBuffer; outputBuffer = ByteBuffer.allocateDirect(maxOutputSize); - ((DirectBuffer)oldBuffer).cleaner().clean(); + CleanUtil.clean(oldBuffer); } // Reset the previous outputBuffer outputBuffer.clear(); @@ -104,7 +103,7 @@ public synchronized void setInput(byte[] buffer, int off, int len) { tmp.put(inputBuffer); ByteBuffer oldBuffer = inputBuffer; inputBuffer = tmp; - ((DirectBuffer)(oldBuffer)).cleaner().clean(); + CleanUtil.clean(oldBuffer); } else { inputBuffer.limit(inputBuffer.position() + len); } @@ -156,13 +155,13 @@ public synchronized void reset() { if (inputBuffer.capacity() > maxBufferSize) { ByteBuffer oldBuffer = inputBuffer; inputBuffer = ByteBuffer.allocateDirect(maxBufferSize); - ((DirectBuffer)oldBuffer).cleaner().clean(); + CleanUtil.clean(oldBuffer); } if (outputBuffer.capacity() > maxBufferSize) { ByteBuffer oldBuffer = outputBuffer; outputBuffer = ByteBuffer.allocateDirect(maxBufferSize); - ((DirectBuffer)oldBuffer).cleaner().clean(); + CleanUtil.clean(oldBuffer); } finishCalled = false; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index 2dc60febdf..b82c891745 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -25,9 +25,10 @@ import org.xerial.snappy.Snappy; import org.apache.parquet.Preconditions; -import sun.nio.ch.DirectBuffer; public class SnappyDecompressor implements Decompressor { + private static final int maxBufferSize = 64 * 1024 * 1024; + // Buffer for uncompressed output. This buffer grows as necessary. private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); @@ -36,8 +37,6 @@ public class SnappyDecompressor implements Decompressor { private boolean finished; - private int maxBufferSize = 64 * 1024 * 1024; - /** * Fills specified buffer with uncompressed data. Returns actual number * of bytes of uncompressed data. A return value of 0 indicates that @@ -66,7 +65,7 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc if (decompressedSize > outputBuffer.capacity()) { ByteBuffer oldBuffer = outputBuffer; outputBuffer = ByteBuffer.allocateDirect(decompressedSize); - ((DirectBuffer)oldBuffer).cleaner().clean(); + CleanUtil.clean(oldBuffer); } // Reset the previous outputBuffer (i.e. set position to 0) @@ -109,7 +108,7 @@ public synchronized void setInput(byte[] buffer, int off, int len) { newBuffer.put(inputBuffer); ByteBuffer oldBuffer = inputBuffer; inputBuffer = newBuffer; - ((DirectBuffer)(oldBuffer)).cleaner().clean(); + CleanUtil.clean(oldBuffer); } else { inputBuffer.limit(inputBuffer.position() + len); } @@ -141,13 +140,13 @@ public synchronized void reset() { if (inputBuffer.capacity() > maxBufferSize) { ByteBuffer oldBuffer = inputBuffer; inputBuffer = ByteBuffer.allocateDirect(maxBufferSize); - ((DirectBuffer)oldBuffer).cleaner().clean(); + CleanUtil.clean(oldBuffer); } if (outputBuffer.capacity() > maxBufferSize) { ByteBuffer oldBuffer = outputBuffer; outputBuffer = ByteBuffer.allocateDirect(maxBufferSize); - ((DirectBuffer)oldBuffer).cleaner().clean(); + CleanUtil.clean(oldBuffer); } finished = false; From 238b149dfafa8b6c823c5745d30b724e74fcc21e Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Sat, 9 Feb 2019 22:37:30 +0800 Subject: [PATCH 3/8] Add CleanUtil --- .../parquet/hadoop/codec/CleanUtil.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java new file mode 100644 index 0000000000..6bb1ba8ed7 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop.codec; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CleanUtil { + private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class); + private static final Field CLEANER_FIELD; + private static final Method CLEAN_METHOD; + + static { + ByteBuffer buf = null; + try { + buf = ByteBuffer.allocateDirect(1); + CLEANER_FIELD = buf.getClass().getDeclaredField("cleaner"); + CLEANER_FIELD.setAccessible(true); + Object cleaner = CLEANER_FIELD.get(buf); + CLEAN_METHOD = cleaner.getClass().getDeclaredMethod("clean"); + } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) { + clean(buf); + throw new IllegalStateException("No available cleaner found.", e); + } + } + + public static void clean(ByteBuffer buffer) { + try { + Object cleaner = CLEANER_FIELD.get(buffer); + CLEAN_METHOD.invoke(cleaner); + } catch (IllegalAccessException | InvocationTargetException e) { + // Ignore clean failure + } + } +} From b4a249f3bc2e9417dd704e91d9b3b16002bfb585 Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Sat, 9 Feb 2019 22:44:52 +0800 Subject: [PATCH 4/8] Update log and Refine initial buffer size --- .../org/apache/parquet/hadoop/codec/CleanUtil.java | 1 + .../parquet/hadoop/codec/SnappyCompressor.java | 14 +++++++------- .../parquet/hadoop/codec/SnappyDecompressor.java | 14 +++++++------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java index 6bb1ba8ed7..f94eaf0a61 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java @@ -51,6 +51,7 @@ public static void clean(ByteBuffer buffer) { CLEAN_METHOD.invoke(cleaner); } catch (IllegalAccessException | InvocationTargetException e) { // Ignore clean failure + logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java index 3e287e29e4..501a7f33e7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java @@ -32,13 +32,13 @@ * entire input in setInput and compresses it as one compressed block. */ public class SnappyCompressor implements Compressor { - private static final int maxBufferSize = 64 * 1024 * 1024; + private static final int initialBufferSize = 64 * 1024 * 1024; // Buffer for compressed output. This buffer grows as necessary. - private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); // Buffer for uncompressed input. This buffer grows as necessary. - private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); private long bytesRead = 0L; private long bytesWritten = 0L; @@ -152,15 +152,15 @@ public void reinit(Configuration c) { @Override public synchronized void reset() { - if (inputBuffer.capacity() > maxBufferSize) { + if (inputBuffer.capacity() > initialBufferSize) { ByteBuffer oldBuffer = inputBuffer; - inputBuffer = ByteBuffer.allocateDirect(maxBufferSize); + inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); CleanUtil.clean(oldBuffer); } - if (outputBuffer.capacity() > maxBufferSize) { + if (outputBuffer.capacity() > initialBufferSize) { ByteBuffer oldBuffer = outputBuffer; - outputBuffer = ByteBuffer.allocateDirect(maxBufferSize); + outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); CleanUtil.clean(oldBuffer); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index b82c891745..3e9c4cf5d8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -27,13 +27,13 @@ import org.apache.parquet.Preconditions; public class SnappyDecompressor implements Decompressor { - private static final int maxBufferSize = 64 * 1024 * 1024; + private static final int initialBufferSize = 64 * 1024 * 1024; // Buffer for uncompressed output. This buffer grows as necessary. - private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); // Buffer for compressed input. This buffer grows as necessary. - private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); private boolean finished; @@ -137,15 +137,15 @@ public synchronized boolean needsInput() { @Override public synchronized void reset() { - if (inputBuffer.capacity() > maxBufferSize) { + if (inputBuffer.capacity() > initialBufferSize) { ByteBuffer oldBuffer = inputBuffer; - inputBuffer = ByteBuffer.allocateDirect(maxBufferSize); + inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); CleanUtil.clean(oldBuffer); } - if (outputBuffer.capacity() > maxBufferSize) { + if (outputBuffer.capacity() > initialBufferSize) { ByteBuffer oldBuffer = outputBuffer; - outputBuffer = ByteBuffer.allocateDirect(maxBufferSize); + outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); CleanUtil.clean(oldBuffer); } From b26a9f4bfdfa7931489976042de0abfe96426d4c Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Sun, 10 Feb 2019 19:56:24 +0800 Subject: [PATCH 5/8] Fix OutputBuffer should be empty check --- .../org/apache/parquet/hadoop/codec/SnappyCompressor.java | 5 +++++ .../org/apache/parquet/hadoop/codec/SnappyDecompressor.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java index 501a7f33e7..b2a8e7f10e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java @@ -44,6 +44,11 @@ public class SnappyCompressor implements Compressor { private long bytesWritten = 0L; private boolean finishCalled = false; + public SnappyCompressor() { + inputBuffer.limit(0); + outputBuffer.limit(0); + } + /** * Fills specified buffer with compressed data. Returns actual number * of bytes of compressed data. A return value of 0 indicates that diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index 3e9c4cf5d8..8a7f86d5ae 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -37,6 +37,11 @@ public class SnappyDecompressor implements Decompressor { private boolean finished; + public SnappyDecompressor() { + inputBuffer.limit(0); + outputBuffer.limit(0); + } + /** * Fills specified buffer with uncompressed data. Returns actual number * of bytes of uncompressed data. A return value of 0 indicates that From 03467ddc42e203c50b370902c3681f1b8fa6a8bd Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Mon, 11 Feb 2019 15:39:25 +0800 Subject: [PATCH 6/8] Updated as comment --- .../parquet/hadoop/codec/CleanUtil.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java index f94eaf0a61..4ea07563d5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java @@ -26,6 +26,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A Helper class which use reflections to clean up DirectBuffer. It's implemented for + * better compatibility with both java8 and java9+, because the Cleaner class is moved to + * another place since java9+. + */ public class CleanUtil { private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class); private static final Field CLEANER_FIELD; @@ -33,23 +38,27 @@ public class CleanUtil { static { ByteBuffer buf = null; + Field cleanerField = null; + Method cleanMethod = null; try { buf = ByteBuffer.allocateDirect(1); - CLEANER_FIELD = buf.getClass().getDeclaredField("cleaner"); - CLEANER_FIELD.setAccessible(true); - Object cleaner = CLEANER_FIELD.get(buf); - CLEAN_METHOD = cleaner.getClass().getDeclaredMethod("clean"); - } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) { + cleanerField = buf.getClass().getDeclaredField("cleaner"); + cleanerField.setAccessible(true); + Object cleaner = cleanerField.get(buf); + cleanMethod = cleaner.getClass().getDeclaredMethod("clean"); + } catch (Throwable e) { clean(buf); - throw new IllegalStateException("No available cleaner found.", e); + logger.warn("Initialization failed for cleanerField or cleanMethod", e); } + CLEANER_FIELD = cleanerField; + CLEAN_METHOD = cleanMethod; } public static void clean(ByteBuffer buffer) { try { Object cleaner = CLEANER_FIELD.get(buffer); CLEAN_METHOD.invoke(cleaner); - } catch (IllegalAccessException | InvocationTargetException e) { + } catch (Throwable e) { // Ignore clean failure logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e); } From 14e301cc0562113de7aadcca34afb02d4785328e Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Mon, 11 Feb 2019 18:14:32 +0800 Subject: [PATCH 7/8] Minor fix --- .../java/org/apache/parquet/hadoop/codec/CleanUtil.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java index 4ea07563d5..cd6c63fefe 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java @@ -46,9 +46,10 @@ public class CleanUtil { cleanerField.setAccessible(true); Object cleaner = cleanerField.get(buf); cleanMethod = cleaner.getClass().getDeclaredMethod("clean"); - } catch (Throwable e) { - clean(buf); + } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) { logger.warn("Initialization failed for cleanerField or cleanMethod", e); + } finally { + clean(buf); } CLEANER_FIELD = cleanerField; CLEAN_METHOD = cleanMethod; @@ -58,7 +59,7 @@ public static void clean(ByteBuffer buffer) { try { Object cleaner = CLEANER_FIELD.get(buffer); CLEAN_METHOD.invoke(cleaner); - } catch (Throwable e) { + } catch (IllegalAccessException | InvocationTargetException | NullPointerException e) { // Ignore clean failure logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e); } From b553464bf0c11ecfa7c44623d94e7b6c0173004a Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Mon, 11 Feb 2019 18:25:21 +0800 Subject: [PATCH 8/8] Return immediately when cleanerField or cleanMethod is null --- .../main/java/org/apache/parquet/hadoop/codec/CleanUtil.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java index cd6c63fefe..8bf24b2ca5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java @@ -56,6 +56,9 @@ public class CleanUtil { } public static void clean(ByteBuffer buffer) { + if (CLEANER_FIELD == null || CLEAN_METHOD == null) { + return; + } try { Object cleaner = CLEANER_FIELD.get(buffer); CLEAN_METHOD.invoke(cleaner);