From 43a3b58b720c0f64c5b8dd0ac52d0df1e20b8939 Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Mon, 18 Jul 2022 11:09:11 +0200 Subject: [PATCH 1/9] HADOOP-12007. GzipCodec native CodecPool leaks memory --- .../src/main/java/org/apache/hadoop/io/compress/CodecPool.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java index 1f095c6c6736e..5b1826f9e30a8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java @@ -205,6 +205,7 @@ public static void returnCompressor(Compressor compressor) { } // if the compressor can't be reused, don't pool it. if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { + compressor.end(); return; } compressor.reset(); @@ -225,6 +226,7 @@ public static void returnDecompressor(Decompressor decompressor) { } // if the decompressor can't be reused, don't pool it. 
if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { + decompressor.end(); return; } decompressor.reset(); From 032209a606a1ba7e44072f87fd0d6a3a28660d8c Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Tue, 19 Jul 2022 13:02:10 +0200 Subject: [PATCH 2/9] Adding unit test --- .../hadoop/io/compress/TestCodecPool.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index ec99598e79cee..0ba60d4122bdb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -18,7 +18,13 @@ package org.apache.hadoop.io.compress; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -26,6 +32,9 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor; +import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -189,4 +198,54 @@ public void testDecompressorNotReturnSameInstance() { CodecPool.returnDecompressor(decompressor); } } + + @Test(timeout = 10000) + public void testDoNotPoolCompressorNotUseableAfterReturn() throws IOException { + + final GzipCodec gzipCodec = new GzipCodec(); + gzipCodec.setConf(new Configuration()); + + // BuiltInGzipCompressor is an explicit example of a Compressor with the 
@DoNotPool annotation + final Compressor compressor = new BuiltInGzipCompressor(new Configuration()); + CodecPool.returnCompressor(compressor); + + try (CompressionOutputStream outputStream = + gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor)) { + outputStream.write(1); + fail("Compressor from Codec with @DoNotPool should not be useable after returning to CodecPool"); + } catch (NullPointerException exception) { + Assert.assertEquals("Deflater has been closed", exception.getMessage()); + } + } + + @Test(timeout = 10000) + public void testDoNotPoolDecompressorNotUseableAfterReturn() throws IOException { + + final GzipCodec gzipCodec = new GzipCodec(); + gzipCodec.setConf(new Configuration()); + + final Random random = new Random(); + final byte[] bytes = new byte[1024]; + random.nextBytes(bytes); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) { + outputStream.write(bytes); + } + + final byte[] gzipBytes = baos.toByteArray(); + final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes); + + // BuiltInGzipDecompressor is an explicit example of a Decompressor with the @DoNotPool annotation + final Decompressor decompressor = new BuiltInGzipDecompressor(); + CodecPool.returnDecompressor(decompressor); + + try (CompressionInputStream inputStream = + gzipCodec.createInputStream(bais, decompressor)) { + inputStream.read(); + fail("Decompressor from Codec with @DoNotPool should not be useable after returning to CodecPool"); + } catch (NullPointerException exception) { + Assert.assertEquals("Inflater has been closed", exception.getMessage()); + } + } } From a70860806c05030116508002cd80d551e29379c7 Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Wed, 20 Jul 2022 00:26:31 +0200 Subject: [PATCH 3/9] Using LambdaTestUtils.intercept in test case --- .../hadoop/io/compress/TestCodecPool.java | 53 ++++++++----------- 1 file changed, 22 insertions(+), 31 
deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 0ba60d4122bdb..68150316852b1 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -17,29 +17,22 @@ */ package org.apache.hadoop.io.compress; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor; import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; -import org.junit.Assert; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; import java.util.HashSet; +import java.util.Random; import java.util.Set; +import java.util.concurrent.*; + +import static org.junit.Assert.assertEquals; public class TestCodecPool { private final String LEASE_COUNT_ERR = @@ -200,7 +193,7 @@ public void testDecompressorNotReturnSameInstance() { } @Test(timeout = 10000) - public void testDoNotPoolCompressorNotUseableAfterReturn() throws IOException { + public void testDoNotPoolCompressorNotUseableAfterReturn() throws Exception { final GzipCodec gzipCodec = new GzipCodec(); gzipCodec.setConf(new Configuration()); @@ -209,17 +202,16 @@ 
public void testDoNotPoolCompressorNotUseableAfterReturn() throws IOException { final Compressor compressor = new BuiltInGzipCompressor(new Configuration()); CodecPool.returnCompressor(compressor); - try (CompressionOutputStream outputStream = - gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor)) { - outputStream.write(1); - fail("Compressor from Codec with @DoNotPool should not be useable after returning to CodecPool"); - } catch (NullPointerException exception) { - Assert.assertEquals("Deflater has been closed", exception.getMessage()); - } + final CompressionOutputStream outputStream = gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor); + LambdaTestUtils.intercept( + NullPointerException.class, + "Deflater has been closed", + "Compressor from Codec with @DoNotPool should not be useable after returning to CodecPool", + () -> outputStream.write(1)); } @Test(timeout = 10000) - public void testDoNotPoolDecompressorNotUseableAfterReturn() throws IOException { + public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception { final GzipCodec gzipCodec = new GzipCodec(); gzipCodec.setConf(new Configuration()); @@ -240,12 +232,11 @@ public void testDoNotPoolDecompressorNotUseableAfterReturn() throws IOException final Decompressor decompressor = new BuiltInGzipDecompressor(); CodecPool.returnDecompressor(decompressor); - try (CompressionInputStream inputStream = - gzipCodec.createInputStream(bais, decompressor)) { - inputStream.read(); - fail("Decompressor from Codec with @DoNotPool should not be useable after returning to CodecPool"); - } catch (NullPointerException exception) { - Assert.assertEquals("Inflater has been closed", exception.getMessage()); - } + final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor); + LambdaTestUtils.intercept( + NullPointerException.class, + "Inflater has been closed", + "Decompressor from Codec with @DoNotPool should not be useable after returning to 
CodecPool", + () -> inputStream.read()); } } From b6ed11c1608e31b6821b009d953e042f138b48d9 Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Wed, 20 Jul 2022 11:25:17 +0200 Subject: [PATCH 4/9] undo import optimization --- .../hadoop/io/compress/TestCodecPool.java | 20 ++++++++----- hadoop-common-project/pom.xml | 10 +++---- pom.xml | 29 ++++++++++--------- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 68150316852b1..12df67a57beec 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -17,22 +17,26 @@ */ package org.apache.hadoop.io.compress; +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor; import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; -import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Before; import org.junit.Test; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; import java.util.HashSet; -import java.util.Random; import java.util.Set; -import java.util.concurrent.*; - -import static org.junit.Assert.assertEquals; public class TestCodecPool { private final String LEASE_COUNT_ERR = diff --git a/hadoop-common-project/pom.xml 
b/hadoop-common-project/pom.xml index f167a079a9b0c..685122004c002 100644 --- a/hadoop-common-project/pom.xml +++ b/hadoop-common-project/pom.xml @@ -31,13 +31,13 @@ hadoop-auth - hadoop-auth-examples + hadoop-common hadoop-annotations - hadoop-nfs - hadoop-minikdc - hadoop-kms - hadoop-registry + + + + diff --git a/pom.xml b/pom.xml index fca42f71630df..ec2a6d64085c1 100644 --- a/pom.xml +++ b/pom.xml @@ -125,20 +125,20 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x - hadoop-project - hadoop-project-dist - hadoop-assemblies - hadoop-maven-plugins + + + + hadoop-common-project - hadoop-hdfs-project - hadoop-yarn-project - hadoop-mapreduce-project - hadoop-tools - hadoop-dist - hadoop-minicluster - hadoop-client-modules - hadoop-build-tools - hadoop-cloud-storage-project + + + + + + + + + @@ -603,6 +603,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x maven-javadoc-plugin ${maven-javadoc-plugin.version} false + + true + aggregate From 74e2a9740bb4b2497ea895557558144ecea9be7f Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Wed, 20 Jul 2022 11:29:34 +0200 Subject: [PATCH 5/9] Revert "undo import optimization" This reverts commit b6ed11c1608e31b6821b009d953e042f138b48d9. 
--- .../hadoop/io/compress/TestCodecPool.java | 20 +++++-------- hadoop-common-project/pom.xml | 10 +++---- pom.xml | 29 +++++++++---------- 3 files changed, 26 insertions(+), 33 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 12df67a57beec..68150316852b1 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -17,26 +17,22 @@ */ package org.apache.hadoop.io.compress; -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor; import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; import java.util.HashSet; +import java.util.Random; import java.util.Set; +import java.util.concurrent.*; + +import static org.junit.Assert.assertEquals; public class TestCodecPool { private final String LEASE_COUNT_ERR = diff --git a/hadoop-common-project/pom.xml b/hadoop-common-project/pom.xml index 685122004c002..f167a079a9b0c 100644 --- a/hadoop-common-project/pom.xml +++ b/hadoop-common-project/pom.xml @@ -31,13 +31,13 @@ hadoop-auth - + hadoop-auth-examples hadoop-common hadoop-annotations - 
- - - + hadoop-nfs + hadoop-minikdc + hadoop-kms + hadoop-registry diff --git a/pom.xml b/pom.xml index ec2a6d64085c1..fca42f71630df 100644 --- a/pom.xml +++ b/pom.xml @@ -125,20 +125,20 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x - - - - + hadoop-project + hadoop-project-dist + hadoop-assemblies + hadoop-maven-plugins hadoop-common-project - - - - - - - - - + hadoop-hdfs-project + hadoop-yarn-project + hadoop-mapreduce-project + hadoop-tools + hadoop-dist + hadoop-minicluster + hadoop-client-modules + hadoop-build-tools + hadoop-cloud-storage-project @@ -603,9 +603,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x maven-javadoc-plugin ${maven-javadoc-plugin.version} false - - true - aggregate From c6e9bc2cc0ebdb8650b09de330ebba7c5f9e4c6c Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Wed, 20 Jul 2022 11:30:05 +0200 Subject: [PATCH 6/9] undo import optimization --- .../hadoop/io/compress/TestCodecPool.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 68150316852b1..12df67a57beec 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -17,22 +17,26 @@ */ package org.apache.hadoop.io.compress; +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor; import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; -import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Before; import org.junit.Test; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; import java.util.HashSet; -import java.util.Random; import java.util.Set; -import java.util.concurrent.*; - -import static org.junit.Assert.assertEquals; public class TestCodecPool { private final String LEASE_COUNT_ERR = From c44808b40a0fd5ddf900a48490a4984ddcdf5e4d Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Wed, 20 Jul 2022 11:36:55 +0200 Subject: [PATCH 7/9] adding LambdaTestUtils import --- .../test/java/org/apache/hadoop/io/compress/TestCodecPool.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 12df67a57beec..ed6fd7236e84c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor; import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Before; import org.junit.Test; From 622e5fd92839409f660f744b511c6f36ac61b9a3 Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Mon, 1 Aug 2022 16:15:19 +0200 Subject: [PATCH 8/9] Adding AlreadyClosedException. Fixing formatting. 
--- .../io/compress/AlreadyClosedException.java | 42 +++++++++++++++++++ .../compress/zlib/BuiltInGzipCompressor.java | 11 ++++- .../zlib/BuiltInGzipDecompressor.java | 6 +++ .../hadoop/io/compress/TestCodecPool.java | 20 +++++---- 4 files changed, 70 insertions(+), 9 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java new file mode 100644 index 0000000000000..a5284f10425bf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; + +/** + * An exception class for when a closed compressor/decompressor is being used + * {@link org.apache.hadoop.io.compress.Compressor} + * {@link org.apache.hadoop.io.compress.Decompressor} + */ +public class AlreadyClosedException extends IOException { + + /** + * Constructs a new exception with the specified cause and a detail + * message of (cause==null ? null : cause.toString()) (which + * typically contains the class and detail message of cause). + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is + * permitted, and indicates that the cause is nonexistent or + * unknown.) + */ + public AlreadyClosedException(Throwable cause) { + super(cause); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java index fcb431dce86ca..5525c1276d529 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java @@ -24,6 +24,7 @@ import java.util.zip.GZIPOutputStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.AlreadyClosedException; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.DoNotPool; import org.apache.hadoop.util.DataChecksum; @@ -102,7 +103,15 @@ public int compress(byte[] b, int off, int len) throws IOException { if (state == BuiltInGzipDecompressor.GzipStateLabel.INFLATE_STREAM) { // now compress it into b[] - int deflated = deflater.deflate(b, off, len); + int deflated; + try { + deflated = deflater.deflate(b, off, len); + } catch (NullPointerException npe) { + if ("Deflater has
been closed".equals(npe.getMessage())) { + throw new AlreadyClosedException(npe); + } + throw npe; + } compressedBytesWritten += deflated; off += deflated; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java index 47c21b4e3ea98..959b0ffec42e7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java @@ -23,6 +23,7 @@ import java.util.zip.DataFormatException; import java.util.zip.Inflater; +import org.apache.hadoop.io.compress.AlreadyClosedException; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.DoNotPool; import org.apache.hadoop.util.DataChecksum; @@ -211,6 +212,11 @@ public synchronized int decompress(byte[] b, int off, int len) numAvailBytes = inflater.inflate(b, off, len); } catch (DataFormatException dfe) { throw new IOException(dfe.getMessage()); + } catch (NullPointerException npe) { + if ("Inflater has been closed".equals(npe.getMessage())) { + throw new AlreadyClosedException(npe); + } + throw npe; } crc.update(b, off, numAvailBytes); // CRC-32 is on _uncompressed_ data if (inflater.finished()) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index ed6fd7236e84c..720516ad157ed 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -207,12 +207,14 @@ public void testDoNotPoolCompressorNotUseableAfterReturn() throws 
Exception { final Compressor compressor = new BuiltInGzipCompressor(new Configuration()); CodecPool.returnCompressor(compressor); - final CompressionOutputStream outputStream = gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor); + final CompressionOutputStream outputStream = + gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor); LambdaTestUtils.intercept( - NullPointerException.class, + AlreadyClosedException.class, "Deflater has been closed", - "Compressor from Codec with @DoNotPool should not be useable after returning to CodecPool", - () -> outputStream.write(1)); + "Compressor from Codec with @DoNotPool should not be " + + "useable after returning to CodecPool", + () -> outputStream.write(1)); } @Test(timeout = 10000) @@ -233,15 +235,17 @@ public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception { final byte[] gzipBytes = baos.toByteArray(); final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes); - // BuiltInGzipDecompressor is an explicit example of a Decompressor with the @DoNotPool annotation + // BuiltInGzipDecompressor is an explicit example of a Decompressor + // with the @DoNotPool annotation final Decompressor decompressor = new BuiltInGzipDecompressor(); CodecPool.returnDecompressor(decompressor); final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor); LambdaTestUtils.intercept( - NullPointerException.class, + AlreadyClosedException.class, "Inflater has been closed", - "Decompressor from Codec with @DoNotPool should not be useable after returning to CodecPool", - () -> inputStream.read()); + "Decompressor from Codec with @DoNotPool should not be " + + "useable after returning to CodecPool", + () -> inputStream.read()); } } From 5200470d1fa82f65cb50997652cdc00cf668c626 Mon Sep 17 00:00:00 2001 From: Kevin Sewell Date: Fri, 5 Aug 2022 12:55:37 +0200 Subject: [PATCH 9/9] adding ENDED state. 
fixing formatting in AlreadyClosedException --- .../io/compress/AlreadyClosedException.java | 15 +++------------ .../io/compress/zlib/BuiltInGzipCompressor.java | 16 +++++++--------- .../compress/zlib/BuiltInGzipDecompressor.java | 17 +++++++++++------ .../hadoop/io/compress/TestCodecPool.java | 4 ++-- 4 files changed, 23 insertions(+), 29 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java index a5284f10425bf..993d2678d2a10 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java @@ -27,16 +27,7 @@ */ public class AlreadyClosedException extends IOException { - /** - * Constructs a new exception with the specified cause and a detail - * message of (cause==null ? null : cause.toString()) (which - * typically contains the class and detail message of cause). - * @param cause the cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is - * permitted, and indicates that the cause is nonexistent or - * unknown.) 
- */ - public AlreadyClosedException(Throwable cause) { - super(cause); - } + public AlreadyClosedException(String message) { + super(message); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java index 5525c1276d529..d44413cc30912 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java @@ -84,6 +84,10 @@ public int compress(byte[] b, int off, int len) throws IOException { throw new IOException("compress called on finished compressor"); } + if (state == BuiltInGzipDecompressor.GzipStateLabel.ENDED) { + throw new AlreadyClosedException("compress called on closed compressor"); + } + int compressedBytesWritten = 0; // If we are not within uncompressed data yet, output the header. 
@@ -103,15 +107,7 @@ public int compress(byte[] b, int off, int len) throws IOException { if (state == BuiltInGzipDecompressor.GzipStateLabel.INFLATE_STREAM) { // now compress it into b[] - int deflated; - try { - deflated = deflater.deflate(b, off, len); - } catch (NullPointerException npe) { - if ("Deflater has been closed".equals(npe.getMessage())) { - throw new AlreadyClosedException(npe); - } - throw npe; - } + int deflated = deflater.deflate(b, off, len); compressedBytesWritten += deflated; off += deflated; @@ -148,6 +144,8 @@ public long getBytesWritten() { @Override public void end() { deflater.end(); + + state = BuiltInGzipDecompressor.GzipStateLabel.ENDED; } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java index 959b0ffec42e7..d47864a71f481 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java @@ -110,7 +110,11 @@ public enum GzipStateLabel { * Immediately after the trailer (and potentially prior to the next gzip * member/substream header), without reset() having been called. */ - FINISHED; + FINISHED, + /** + * Immediately after end() has been called. 
+ */ + ENDED; } /** @@ -187,6 +191,10 @@ public synchronized int decompress(byte[] b, int off, int len) throws IOException { int numAvailBytes = 0; + if (state == GzipStateLabel.ENDED) { + throw new AlreadyClosedException("decompress called on closed decompressor"); + } + if (state != GzipStateLabel.DEFLATE_STREAM) { executeHeaderState(); @@ -212,11 +220,6 @@ public synchronized int decompress(byte[] b, int off, int len) numAvailBytes = inflater.inflate(b, off, len); } catch (DataFormatException dfe) { throw new IOException(dfe.getMessage()); - } catch (NullPointerException npe) { - if ("Inflater has been closed".equals(npe.getMessage())) { - throw new AlreadyClosedException(npe); - } - throw npe; } crc.update(b, off, numAvailBytes); // CRC-32 is on _uncompressed_ data if (inflater.finished()) { @@ -482,6 +485,8 @@ public synchronized void reset() { @Override public synchronized void end() { inflater.end(); + + state = GzipStateLabel.ENDED; } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 720516ad157ed..4b18ee6047ba4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -211,7 +211,7 @@ public void testDoNotPoolCompressorNotUseableAfterReturn() throws Exception { gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor); LambdaTestUtils.intercept( AlreadyClosedException.class, - "Deflater has been closed", + "compress called on closed compressor", "Compressor from Codec with @DoNotPool should not be " + "useable after returning to CodecPool", () -> outputStream.write(1)); @@ -243,7 +243,7 @@ public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception { final CompressionInputStream inputStream = 
gzipCodec.createInputStream(bais, decompressor); LambdaTestUtils.intercept( AlreadyClosedException.class, - "Inflater has been closed", + "decompress called on closed decompressor", "Decompressor from Codec with @DoNotPool should not be " + "useable after returning to CodecPool", () -> inputStream.read());