From 022eaada2f79a81aa577b90facb5fbddc5c1bd20 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Sun, 19 Jul 2020 22:46:26 -0700 Subject: [PATCH 1/8] Add LZO --- hadoop-common-project/hadoop-common/pom.xml | 4 ++ .../com/hadoop/compression/lzo/LzoCodec.java | 50 +++++++++++++++++++ .../com/hadoop/compression/lzo/LzopCodec.java | 50 +++++++++++++++++++ .../apache/hadoop/io/compress/LzoCodec.java | 25 ++++++++++ .../apache/hadoop/io/compress/LzopCodec.java | 25 ++++++++++ hadoop-project/pom.xml | 6 +++ 6 files changed, 160 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 9bb70ac76a06a..edb5c516e3662 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -362,6 +362,10 @@ wildfly-openssl-java provided + + io.airlift + aircompressor + diff --git a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java new file mode 100644 index 0000000000000..ed262218f6e02 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hadoop.compression.lzo; + +import java.io.IOException; +import java.io.OutputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class LzoCodec extends org.apache.hadoop.io.compress.LzoCodec { + private static final Log LOG = LogFactory.getLog(LzoCodec.class); + + static final String gplLzoCodec = LzoCodec.class.getName(); + static final String hadoopLzoCodec = org.apache.hadoop.io.compress.LzoCodec.class.getName(); + static boolean warned = false; + + static { + LOG.info("Bridging " + gplLzoCodec + " to " + hadoopLzoCodec + "."); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { + if (!warned) { + LOG.warn(gplLzoCodec + " is deprecated. 
You should use " + hadoopLzoCodec + + " instead to generate LZO compressed data."); + warned = true; + } + return super.createOutputStream(out, compressor); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java new file mode 100644 index 0000000000000..9e599a3d7aab6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hadoop.compression.lzo; + +import java.io.IOException; +import java.io.OutputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class LzopCodec extends org.apache.hadoop.io.compress.LzopCodec { + private static final Log LOG = LogFactory.getLog(LzopCodec.class); + + static final String gplLzopCodec = LzopCodec.class.getName(); + static final String hadoopLzopCodec = org.apache.hadoop.io.compress.LzopCodec.class.getName(); + static boolean warned = false; + + static { + LOG.info("Bridging " + gplLzopCodec + " to " + hadoopLzopCodec + "."); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { + if (!warned) { + LOG.warn(gplLzopCodec + " is deprecated. You should use " + hadoopLzopCodec + + " instead to generate LZO compressed data."); + warned = true; + } + return super.createOutputStream(out, compressor); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java new file mode 100644 index 0000000000000..e110bb9f1f8cc --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import org.apache.hadoop.conf.Configurable; + +public class LzoCodec extends io.airlift.compress.lzo.LzoCodec + implements Configurable, CompressionCodec { +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java new file mode 100644 index 0000000000000..01065e4a32825 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import org.apache.hadoop.conf.Configurable; + +public class LzopCodec extends io.airlift.compress.lzo.LzopCodec + implements Configurable, CompressionCodec { +} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 4e819cd896a2c..901b872587c76 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -141,6 +141,7 @@ 3.2.4 3.10.6.Final 4.1.50.Final + 0.16 0.5.1 @@ -1727,6 +1728,11 @@ jna ${jna.version} + + io.airlift + aircompressor + ${aircompressor.version} + From 592e05302821dca3e207c716f2b015feaecc2301 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 20 Jul 2020 00:59:20 -0700 Subject: [PATCH 2/8] add some test --- .../org/apache/hadoop/io/compress/TestCodec.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index 94ff7a88493c7..f0be881439116 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -176,6 +176,18 @@ public void testGzipCodecWithParam() throws IOException { codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec"); } + @Test + public void testLzoCodec() throws IOException { + codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzoCodec"); + codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzoCodec"); + } + + @Test + public void testLzopCodec() throws IOException { + codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzopCodec"); + codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzopCodec"); + } + private static void codecTest(Configuration conf, int seed, int count, String codecClass) throws IOException { From 4054c6306306da6980d0d42661b920558ecefa04 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 20 
Jul 2020 13:42:28 -0700 Subject: [PATCH 3/8] add more tests --- .../apache/hadoop/io/compress/LzopCodec.java | 83 +++++++++++++++++++ .../apache/hadoop/io/compress/TestCodec.java | 8 ++ 2 files changed, 91 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java index 01065e4a32825..09189bf0333e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java @@ -18,8 +18,91 @@ package org.apache.hadoop.io.compress; +import io.airlift.compress.lzo.LzoCodec; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; public class LzopCodec extends io.airlift.compress.lzo.LzopCodec implements Configurable, CompressionCodec { + @Override + public Class getCompressorType() + { + return LzopCodec.HadoopLzopCompressor.class; + } + + /** + * No Hadoop code seems to actually use the compressor, so just return a dummy one so the createOutputStream method + * with a compressor can function. This interface can be implemented if needed. + */ + @DoNotPool + static class HadoopLzopCompressor + implements Compressor + { + @Override + public void setInput(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public boolean needsInput() + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public void setDictionary(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public long getBytesRead() + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public long getBytesWritten() + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public void finish() + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public boolean finished() + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public int compress(byte[] b, int off, int len) + throws IOException + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public void reset() + { + } + + @Override + public void end() + { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public void reinit(Configuration conf) + { + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index f0be881439116..d35956fb2bfd4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -180,12 +180,20 @@ public void testGzipCodecWithParam() throws IOException { public void testLzoCodec() throws IOException { codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzoCodec"); codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzoCodec"); + + // Testing 
`com.hadoop.compression.lzo.LzoCodec` that is bridged to `org.apache.hadoop.io.compress.LzoCodec` + codecTest(conf, seed, 0, "com.hadoop.compression.lzo.LzoCodec"); + codecTest(conf, seed, count, "com.hadoop.compression.lzo.LzoCodec"); } @Test public void testLzopCodec() throws IOException { codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzopCodec"); codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzopCodec"); + + // Testing `com.hadoop.compression.lzo.LzopCodec` that is bridged to `org.apache.hadoop.io.compress.LzopCodec` + codecTest(conf, seed, 0, "com.hadoop.compression.lzo.LzopCodec"); + codecTest(conf, seed, count, "com.hadoop.compression.lzo.LzopCodec"); } private static void codecTest(Configuration conf, int seed, int count, From f9ffe03b8caf37a4c14bb04e55ccf065e31fdb2a Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Tue, 21 Jul 2020 16:23:52 -0700 Subject: [PATCH 4/8] address feedback --- .../com/hadoop/compression/lzo/LzoCodec.java | 39 +++-- .../com/hadoop/compression/lzo/LzopCodec.java | 39 +++-- .../apache/hadoop/io/compress/LzoCodec.java | 2 +- .../apache/hadoop/io/compress/LzopCodec.java | 148 +++++++++--------- 4 files changed, 109 insertions(+), 119 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java index ed262218f6e02..049b4e22e6c04 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java @@ -20,31 +20,30 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LzoCodec extends org.apache.hadoop.io.compress.LzoCodec { - private static final Log LOG = LogFactory.getLog(LzoCodec.class); + private static final Logger LOG = LoggerFactory.getLogger(LzoCodec.class.getName()); + private static final String gplLzoCodec = LzoCodec.class.getName(); + private static final String hadoopLzoCodec = org.apache.hadoop.io.compress.LzoCodec.class.getName(); + private static AtomicBoolean warned = new AtomicBoolean(false); - static final String gplLzoCodec = LzoCodec.class.getName(); - static final String hadoopLzoCodec = org.apache.hadoop.io.compress.LzoCodec.class.getName(); - static boolean warned = false; - - static { - LOG.info("Bridging " + gplLzoCodec + " to " + hadoopLzoCodec + "."); - } + static { + LOG.info("Bridging " + gplLzoCodec + " to " + hadoopLzoCodec + "."); + } - @Override - public CompressionOutputStream createOutputStream(OutputStream out, - Compressor compressor) throws IOException { - if (!warned) { - LOG.warn(gplLzoCodec + " is deprecated. You should use " + hadoopLzoCodec - + " instead to generate LZO compressed data."); - warned = true; - } - return super.createOutputStream(out, compressor); + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { + if (warned.compareAndSet(false, true)) { + LOG.warn("{} is deprecated. 
You should use {} instead to generate LZO compressed data.", + gplLzoCodec, hadoopLzoCodec); } + return super.createOutputStream(out, compressor); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java index 9e599a3d7aab6..f61f77449b9f7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java @@ -20,31 +20,30 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LzopCodec extends org.apache.hadoop.io.compress.LzopCodec { - private static final Log LOG = LogFactory.getLog(LzopCodec.class); + private static final Logger LOG = LoggerFactory.getLogger(LzopCodec.class.getName()); + private static final String gplLzopCodec = LzopCodec.class.getName(); + private static final String hadoopLzopCodec = org.apache.hadoop.io.compress.LzopCodec.class.getName(); + private static AtomicBoolean warned = new AtomicBoolean(false); - static final String gplLzopCodec = LzopCodec.class.getName(); - static final String hadoopLzopCodec = org.apache.hadoop.io.compress.LzopCodec.class.getName(); - static boolean warned = false; - - static { - LOG.info("Bridging " + gplLzopCodec + " to " + hadoopLzopCodec + "."); - } + static { + LOG.info("Bridging " + gplLzopCodec + " to " + hadoopLzopCodec + "."); + } - @Override - public CompressionOutputStream createOutputStream(OutputStream out, - Compressor compressor) throws IOException { - if (!warned) { - LOG.warn(gplLzopCodec + " is deprecated. You should use " + hadoopLzopCodec - + " instead to generate LZO compressed data."); - warned = true; - } - return super.createOutputStream(out, compressor); + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { + if (warned.compareAndSet(false, true)) { + LOG.warn("{} is deprecated. 
You should use {} instead to generate LZOP compressed data.", + gplLzopCodec, hadoopLzopCodec); } + return super.createOutputStream(out, compressor); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java index e110bb9f1f8cc..67017378d6f20 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java @@ -21,5 +21,5 @@ import org.apache.hadoop.conf.Configurable; public class LzoCodec extends io.airlift.compress.lzo.LzoCodec - implements Configurable, CompressionCodec { + implements Configurable, CompressionCodec { } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java index 09189bf0333e6..d9f788ce6b9cf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java @@ -18,91 +18,83 @@ package org.apache.hadoop.io.compress; -import io.airlift.compress.lzo.LzoCodec; +import java.io.IOException; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import java.io.IOException; - public class LzopCodec extends io.airlift.compress.lzo.LzopCodec - implements Configurable, CompressionCodec { - @Override - public Class getCompressorType() + implements Configurable, CompressionCodec { + @Override + public Class getCompressorType() { - return LzopCodec.HadoopLzopCompressor.class; + return HadoopLzopCompressor.class; } - /** - * No Hadoop code seems to actually use the compressor, so just return a dummy one so the createOutputStream method - * with a compressor can function. This interface can be implemented if needed. 
- */ - @DoNotPool - static class HadoopLzopCompressor - implements Compressor + @Override + public Compressor createCompressor() { - @Override - public void setInput(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public boolean needsInput() - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public void setDictionary(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public long getBytesRead() - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public long getBytesWritten() - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public void finish() - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public boolean finished() - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public int compress(byte[] b, int off, int len) - throws IOException - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public void reset() - { - } - - @Override - public void end() - { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public void reinit(Configuration conf) - { - } + return new HadoopLzopCompressor(); + } + + /** + * No Hadoop code seems to actually use the compressor, so just return a dummy one so the createOutputStream method + * with a compressor can function. This interface can be implemented if needed. 
+ */ + @DoNotPool + static class HadoopLzopCompressor + implements Compressor { + @Override + public void setInput(byte[] b, int off, int len) { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public boolean needsInput() { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public long getBytesRead() { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public long getBytesWritten() { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public void finish() { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public boolean finished() { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public int compress(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public void reset() { + } + + @Override + public void end() { + throw new UnsupportedOperationException("LZOP block compressor is not supported"); + } + + @Override + public void reinit(Configuration conf) { } + } } From 08e4637386040cac5a7f8d98b8b504cf91c75975 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Tue, 21 Jul 2020 16:38:18 -0700 Subject: [PATCH 5/8] add some java doc --- .../src/main/java/com/hadoop/compression/lzo/LzoCodec.java | 4 ++++ .../src/main/java/com/hadoop/compression/lzo/LzopCodec.java | 4 ++++ .../src/main/java/org/apache/hadoop/io/compress/LzoCodec.java | 3 +++ .../main/java/org/apache/hadoop/io/compress/LzopCodec.java | 3 +++ 4 files changed, 14 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java index 049b4e22e6c04..c47b0e24fde2c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java @@ -27,6 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * This class creates lzo compressors/decompressors that are bridged + * to `org.apache.hadoop.io.compress.LzoCodec` from `com.hadoop.compression.lzo.LzoCodec` + */ public class LzoCodec extends org.apache.hadoop.io.compress.LzoCodec { private static final Logger LOG = LoggerFactory.getLogger(LzoCodec.class.getName()); private static final String gplLzoCodec = LzoCodec.class.getName(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java index f61f77449b9f7..12581d5637733 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java @@ -27,6 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * This class creates lzop compressors/decompressors that are bridged + * to `org.apache.hadoop.io.compress.LzopCodec` from `com.hadoop.compression.lzo.LzopCodec` + */ public class 
LzopCodec extends org.apache.hadoop.io.compress.LzopCodec { private static final Logger LOG = LoggerFactory.getLogger(LzopCodec.class.getName()); private static final String gplLzopCodec = LzopCodec.class.getName(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java index 67017378d6f20..9064d5ab096f3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java @@ -20,6 +20,9 @@ import org.apache.hadoop.conf.Configurable; +/** + * This class creates lzo compressors/decompressors. + */ public class LzoCodec extends io.airlift.compress.lzo.LzoCodec implements Configurable, CompressionCodec { } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java index d9f788ce6b9cf..940ee93ef8385 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java @@ -23,6 +23,9 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +/** + * This class creates lzop compressors/decompressors. + */ public class LzopCodec extends io.airlift.compress.lzo.LzopCodec implements Configurable, CompressionCodec { @Override From 80d3c0100020a973c76b2b4ca8545b3ba1dd57ab Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 22 Jul 2020 15:10:51 -0700 Subject: [PATCH 6/8] implement input stream and output stream --- .../hadoop/fs/CommonConfigurationKeys.java | 2 +- .../hadoop/io/compress/CodecConstants.java | 10 + .../apache/hadoop/io/compress/LzoCodec.java | 156 ++++++++- .../apache/hadoop/io/compress/LzopCodec.java | 192 +++++++---- .../hadoop/io/compress/lzo/LzoCompressor.java | 101 ++++++ .../io/compress/lzo/LzoDecompressor.java | 89 +++++ .../io/compress/lzo/LzoInputStream.java | 138 ++++++++ .../io/compress/lzo/LzoOutputStream.java | 106 ++++++ .../io/compress/lzo/LzopCompressor.java | 103 ++++++ .../io/compress/lzo/LzopDecompressor.java | 90 +++++ .../io/compress/lzo/LzopInputStream.java | 307 ++++++++++++++++++ .../io/compress/lzo/LzopOutputStream.java | 156 +++++++++ .../hadoop/io/compress/lzo/package-info.java | 22 ++ 13 files changed, 1410 insertions(+), 62 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoInputStream.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoOutputStream.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopInputStream.java create mode 100644 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopOutputStream.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index c08af395ad2f9..6a7c500bcdcd8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -142,7 +142,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Default value for IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY */ public static final int IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT = - 64*1024; + 256 * 1024; /** Internal buffer size for Snappy compressor/decompressors */ public static final String IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java index 96410a18ebcb5..b29d79c175fc6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java @@ -65,4 +65,14 @@ private CodecConstants() { * Default extension for {@link org.apache.hadoop.io.compress.ZStandardCodec}. */ public static final String ZSTANDARD_CODEC_EXTENSION = ".zst"; + + /** + * Default extension for {@link org.apache.hadoop.io.compress.LzoCodec}. + */ + public static final String LZO_CODEC_EXTENSION = ".lzo_deflate"; + + /** + * Default extension for {@link org.apache.hadoop.io.compress.LzopCodec}. + */ + public static final String LZOP_CODEC_EXTENSION = ".lzo"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java index 9064d5ab096f3..61d59cf63fb29 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java @@ -18,11 +18,163 @@ package org.apache.hadoop.io.compress; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.compress.lzo.LzoCompressor; +import org.apache.hadoop.io.compress.lzo.LzoDecompressor; +import org.apache.hadoop.io.compress.lzo.LzoInputStream; +import org.apache.hadoop.io.compress.lzo.LzoOutputStream; /** * This class creates lzo compressors/decompressors. */ -public class LzoCodec extends io.airlift.compress.lzo.LzoCodec - implements Configurable, CompressionCodec { +public class LzoCodec implements Configurable, CompressionCodec { + Configuration conf; + + /** + * Set the configuration to be used by this object. + * + * @param conf the configuration object. + */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Return the configuration used by this object. 
+ * + * @return the configuration object used by this object. + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. + * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzoOutputStream(out, bufferSize); + } + + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class getCompressorType() { + return LzoCompressor.class; + } + + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzoCompressor(bufferSize); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * input stream. + * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. + * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) throws IOException { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzoInputStream(in, bufferSize); + } + + /** + * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + @Override + public Class getDecompressorType() { + return LzoDecompressor.class; + } + + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+ * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzoDecompressor(bufferSize); + } + + /** + * Get the default filename extension for this kind of compression. + * + * @return .lzo_deflate. + */ + @Override + public String getDefaultExtension() { + return CodecConstants.LZO_CODEC_EXTENSION; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java index 940ee93ef8385..5a865958abb4d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java @@ -20,84 +20,158 @@ import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.compress.lzo.*; /** * This class creates lzop compressors/decompressors. */ -public class LzopCodec extends io.airlift.compress.lzo.LzopCodec - implements Configurable, CompressionCodec { - @Override - public Class getCompressorType() - { - return HadoopLzopCompressor.class; - } +public class LzopCodec implements Configurable, CompressionCodec { + private Configuration conf; + /** + * Set the configuration to be used by this object. + * + * @param conf the configuration object. + */ @Override - public Compressor createCompressor() - { - return new HadoopLzopCompressor(); - } + public void setConf(Configuration conf) { + this.conf = conf; + } /** - * No Hadoop code seems to actually use the compressor, so just return a dummy one so the createOutputStream method - * with a compressor can function. This interface can be implemented if needed. + * Return the configuration used by this object. + * + * @return the configuration object used by this object. */ - @DoNotPool - static class HadoopLzopCompressor - implements Compressor { - @Override - public void setInput(byte[] b, int off, int len) { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } - - @Override - public boolean needsInput() { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } + @Override + public Configuration getConf() { + return conf; + } - @Override - public void setDictionary(byte[] b, int off, int len) { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. + * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out); + } - @Override - public long getBytesRead() { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzopOutputStream(out, bufferSize); + } - @Override - public long getBytesWritten() { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class getCompressorType() { + return LzopCompressor.class; + } - @Override - public void finish() { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzopCompressor(bufferSize); + } - @Override - public boolean finished() { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } + /** + * Create a {@link CompressionInputStream} that will read from the given + * input stream. + * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); + } - @Override - public int compress(byte[] b, int off, int len) throws IOException { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. + * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) throws IOException { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzopInputStream(in, bufferSize); + } - @Override - public void reset() { - } + /** + * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. 
+ */ + @Override + public Class getDecompressorType() { + return LzopDecompressor.class; + } - @Override - public void end() { - throw new UnsupportedOperationException("LZOP block compressor is not supported"); - } + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzopDecompressor(bufferSize); + } - @Override - public void reinit(Configuration conf) { - } + /** + * Get the default filename extension for this kind of compression. + * + * @return .lzo. + */ + @Override + public String getDefaultExtension() { + return CodecConstants.LZOP_CODEC_EXTENSION; } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java new file mode 100644 index 0000000000000..4155a1833fdab --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.Compressor; + +public class LzoCompressor implements Compressor { + private int directBufferSize; + + /** + * Creates a new compressor. + * + * @param directBufferSize size of the direct buffer to be used.
+ */ + public LzoCompressor(int directBufferSize) { + this.directBufferSize = directBufferSize; + } + + @Override + public void setInput(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public boolean needsInput() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public void setDictionary(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public long getBytesRead() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public long getBytesWritten() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public void finish() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public boolean finished() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public int compress(byte[] b, int off, int len) + throws IOException + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public void reset() + { + } + + @Override + public void end() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public void reinit(Configuration conf) + { + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java new file mode 100644 index 0000000000000..a1c8d1dff35ee --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; +import org.apache.hadoop.io.compress.Decompressor; + +public class LzoDecompressor implements Decompressor { + private int directBufferSize; + + /** + * Creates a new decompressor. + * + * @param directBufferSize size of the direct buffer to be used.
+ */ + public LzoDecompressor(int directBufferSize) { + this.directBufferSize = directBufferSize; + } + + @Override + public void setInput(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public boolean needsInput() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public void setDictionary(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public boolean needsDictionary() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public boolean finished() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public int decompress(byte[] b, int off, int len) + throws IOException + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public void reset() + { + } + + @Override + public int getRemaining() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public void end() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoInputStream.java new file mode 100644 index 0000000000000..a5550dbeabc2c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoInputStream.java @@ -0,0 +1,138 @@ +package org.apache.hadoop.io.compress.lzo; + +import io.airlift.compress.lzo.LzoDecompressor; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import org.apache.hadoop.io.compress.CompressionInputStream; + + +public class LzoInputStream extends CompressionInputStream +{ + public static final int SIZE_OF_LONG = 8; + + private final io.airlift.compress.lzo.LzoDecompressor decompressor = new LzoDecompressor(); + private final InputStream in; + private final byte[] uncompressedChunk; + + private int uncompressedBlockLength; + private int uncompressedChunkOffset; + private int uncompressedChunkLength; + + private byte[] compressed = new byte[0]; + + public LzoInputStream(InputStream in, int maxUncompressedLength) + throws IOException + { + super(in); + this.in = in; + // over allocate buffer which makes decompression easier + uncompressedChunk = new byte[maxUncompressedLength + SIZE_OF_LONG]; + } + + @Override + public int read() + throws IOException + { + while (uncompressedChunkOffset >= uncompressedChunkLength) { + int compressedChunkLength = bufferCompressedData(); + if (compressedChunkLength < 0) { + return -1; + } + uncompressedChunkLength = decompressor.decompress(compressed, 0, compressedChunkLength, uncompressedChunk, 0, uncompressedChunk.length); + } + return uncompressedChunk[uncompressedChunkOffset++] & 0xFF; + } + + @Override + public int read(byte[] output, int offset, int length) + throws IOException + { + while (uncompressedChunkOffset >= uncompressedChunkLength) { + int compressedChunkLength = bufferCompressedData(); + if (compressedChunkLength < 0) { + return -1; + } + + // favor writing directly to user buffer to avoid extra copy + if (length >= uncompressedBlockLength) { + uncompressedChunkLength = 
decompressor.decompress(compressed, 0, compressedChunkLength, output, offset, length); + uncompressedChunkOffset = uncompressedChunkLength; + return uncompressedChunkLength; + } + + uncompressedChunkLength = decompressor.decompress(compressed, 0, compressedChunkLength, uncompressedChunk, 0, uncompressedChunk.length); + } + int size = Math.min(length, uncompressedChunkLength - uncompressedChunkOffset); + System.arraycopy(uncompressedChunk, uncompressedChunkOffset, output, offset, size); + uncompressedChunkOffset += size; + return size; + } + + @Override + public void resetState() + throws IOException + { + uncompressedBlockLength = 0; + uncompressedChunkOffset = 0; + uncompressedChunkLength = 0; + } + + private int bufferCompressedData() + throws IOException + { + uncompressedBlockLength -= uncompressedChunkOffset; + uncompressedChunkOffset = 0; + uncompressedChunkLength = 0; + while (uncompressedBlockLength == 0) { + uncompressedBlockLength = readBigEndianInt(); + if (uncompressedBlockLength == -1) { + uncompressedBlockLength = 0; + return -1; + } + } + + int compressedChunkLength = readBigEndianInt(); + if (compressedChunkLength == -1) { + return -1; + } + + if (compressed.length < compressedChunkLength) { + // over allocate buffer which makes decompression easier + compressed = new byte[compressedChunkLength + SIZE_OF_LONG]; + } + readInput(compressedChunkLength, compressed); + return compressedChunkLength; + } + + private void readInput(int length, byte[] buffer) + throws IOException + { + int offset = 0; + while (offset < length) { + int size = in.read(buffer, offset, length - offset); + if (size == -1) { + throw new EOFException("encountered EOF while reading block data"); + } + offset += size; + } + } + + private int readBigEndianInt() + throws IOException + { + int b1 = in.read(); + if (b1 < 0) { + return -1; + } + int b2 = in.read(); + int b3 = in.read(); + int b4 = in.read(); + + // If any of the other bytes are negative, the stream is truncated + if ((b2 | b3 | b4) < 0) { + throw new IOException("Stream is truncated"); + } + return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4)); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoOutputStream.java new file mode 100644 index 0000000000000..9c528f56d3908 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoOutputStream.java @@ -0,0 +1,106 @@ +package org.apache.hadoop.io.compress.lzo; + +import io.airlift.compress.lzo.LzoCompressor; +import java.io.IOException; +import java.io.OutputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; + +public class LzoOutputStream + extends CompressionOutputStream +{ + public static final int SIZE_OF_SHORT = 2; + public static final int SIZE_OF_INT = 4; + public static final int SIZE_OF_LONG = 8; + + private final io.airlift.compress.lzo.LzoCompressor compressor = new LzoCompressor(); + + private final byte[] inputBuffer; + private final int inputMaxSize; + private int inputOffset; + + private final byte[] outputBuffer; + + public LzoOutputStream(OutputStream out, int bufferSize) + { + super(out); + inputBuffer = new byte[bufferSize]; + // leave extra space free at end of buffers to make compression (slightly) faster + inputMaxSize = inputBuffer.length - compressionOverhead(bufferSize); + outputBuffer = new byte[compressor.maxCompressedLength(inputMaxSize) +
SIZE_OF_LONG]; + } + + @Override + public void write(int b) + throws IOException + { + inputBuffer[inputOffset++] = (byte) b; + if (inputOffset >= inputMaxSize) { + writeNextChunk(inputBuffer, 0, this.inputOffset); + } + } + + @Override + public void write(byte[] buffer, int offset, int length) + throws IOException + { + while (length > 0) { + int chunkSize = Math.min(length, inputMaxSize - inputOffset); + // favor writing directly from the user buffer to avoid the extra copy + if (inputOffset == 0 && length > inputMaxSize) { + writeNextChunk(buffer, offset, chunkSize); + } + else { + System.arraycopy(buffer, offset, inputBuffer, inputOffset, chunkSize); + inputOffset += chunkSize; + + if (inputOffset >= inputMaxSize) { + writeNextChunk(inputBuffer, 0, inputOffset); + } + } + length -= chunkSize; + offset += chunkSize; + } + } + + @Override + public void finish() + throws IOException + { + if (inputOffset > 0) { + writeNextChunk(inputBuffer, 0, this.inputOffset); + } + } + + @Override + public void resetState() + throws IOException + { + finish(); + } + + private void writeNextChunk(byte[] input, int inputOffset, int inputLength) + throws IOException + { + int compressedSize = compressor.compress(input, inputOffset, inputLength, outputBuffer, 0, outputBuffer.length); + + writeBigEndianInt(inputLength); + writeBigEndianInt(compressedSize); + out.write(outputBuffer, 0, compressedSize); + + this.inputOffset = 0; + } + + private void writeBigEndianInt(int value) + throws IOException + { + out.write(value >>> 24); + out.write(value >>> 16); + out.write(value >>> 8); + out.write(value); + } + + private static int compressionOverhead(int size) + { + return (size / 16) + 64 + 3; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java new file mode 100644 index 0000000000000..975ec9391c346 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; + +import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.Compressor; + +public class LzopCompressor implements Compressor { + private int directBufferSize; + + /** + * Creates a new compressor. + * + * @param directBufferSize size of the direct buffer to be used. 
+ */ + public LzopCompressor(int directBufferSize) { + this.directBufferSize = directBufferSize; + } + + @Override + public void setInput(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public boolean needsInput() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public void setDictionary(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public long getBytesRead() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public long getBytesWritten() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public void finish() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public boolean finished() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public int compress(byte[] b, int off, int len) + throws IOException + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public void reset() + { + } + + @Override + public void end() + { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + @Override + public void reinit(Configuration conf) + { + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java new file mode 100644 index 0000000000000..b8be5aaaac6a9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; + +import org.apache.hadoop.io.compress.Decompressor; + +public class LzopDecompressor implements Decompressor { + private int directBufferSize; + + /** + * Creates a new compressor. + * + * @param directBufferSize size of the direct buffer to be used. 
+ */ + public LzopDecompressor(int directBufferSize) { + this.directBufferSize = directBufferSize; + } + + @Override + public void setInput(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public boolean needsInput() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public void setDictionary(byte[] b, int off, int len) + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public boolean needsDictionary() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public boolean finished() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public int decompress(byte[] b, int off, int len) + throws IOException + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public void reset() + { + } + + @Override + public int getRemaining() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + @Override + public void end() + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopInputStream.java new file mode 100644 index 0000000000000..71c10b9a88705 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopInputStream.java @@ -0,0 +1,307 @@ +package org.apache.hadoop.io.compress.lzo; + +import io.airlift.compress.lzo.LzoDecompressor; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.zip.Adler32; +import java.util.zip.CRC32; +import java.util.zip.Checksum; +import org.apache.hadoop.io.compress.CompressionInputStream; + +import static java.lang.String.format; + +public class LzopInputStream extends CompressionInputStream { + static final int SIZE_OF_LONG = 8; + private static final int LZO_VERSION_MAX = 0x20A0; + private static final int LZOP_FILE_VERSION_MIN = 0x0940; + private static final int LZOP_FORMAT_VERSION_MAX = 0x1010; + + private static final int LZOP_FLAG_ADLER32_DECOMPRESSED = 0x0000_0001; + private static final int LZOP_FLAG_ADLER32_COMPRESSED = 0x0000_0002; + private static final int LZOP_FLAG_CRC32_DECOMPRESSED = 0x0000_0100; + private static final int LZOP_FLAG_CRC32_COMPRESSED = 0x0000_0200; + private static final int LZOP_FLAG_CRC32_HEADER = 0x0000_1000; + private static final int LZOP_FLAG_IO_MASK = 0x0000_000c; + private static final int LZOP_FLAG_OPERATING_SYSTEM_MASK = 0xff00_0000; + private static final int LZOP_FLAG_CHARACTER_SET_MASK = 0x00f0_0000; + + private final io.airlift.compress.lzo.LzoDecompressor decompressor = new LzoDecompressor(); + private final InputStream in; + private final byte[] uncompressedChunk; + + private int uncompressedLength; + private int uncompressedOffset; + + private boolean finished; + + private byte[] compressed = new byte[0]; + private final boolean adler32Decompressed; + private final boolean adler32Compressed; + private final boolean crc32Decompressed; + private final boolean crc32Compressed; + static final byte[] LZOP_MAGIC = 
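+            // the standard lzop file magic: 0x89, 'L', 'Z', 'O', NUL, CR, LF,
+            // SUB (0x1a), LF - the same self-checking signature pattern PNG uses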
new byte[] {(byte) 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a}; + static final byte LZO_1X_VARIANT = 1; + + public LzopInputStream(InputStream in, int maxUncompressedLength) + throws IOException + { + super(in); + this.in = in; + // over allocate buffer which makes decompression easier + uncompressedChunk = new byte[maxUncompressedLength + SIZE_OF_LONG]; + + byte[] magic = new byte[LZOP_MAGIC.length]; + readInput(magic, 0, magic.length); + if (!Arrays.equals(magic, LZOP_MAGIC)) { + throw new IOException("Not an LZOP file"); + } + + byte[] header = new byte[25]; + readInput(header, 0, header.length); + ByteArrayInputStream headerStream = new ByteArrayInputStream(header); + + // lzop version: ignored + int lzopFileVersion = readBigEndianShort(headerStream); + if (lzopFileVersion < LZOP_FILE_VERSION_MIN) { + throw new IOException(format("Unsupported LZOP file version 0x%08X", lzopFileVersion)); + } + + // lzo version + int lzoVersion = readBigEndianShort(headerStream); + if (lzoVersion > LZO_VERSION_MAX) { + throw new IOException(format("Unsupported LZO version 0x%08X", lzoVersion)); + } + + // lzop version of the format + int lzopFormatVersion = readBigEndianShort(headerStream); + if (lzopFormatVersion > LZOP_FORMAT_VERSION_MAX) { + throw new IOException(format("Unsupported LZOP format version 0x%08X", lzopFormatVersion)); + } + + // variant: must be LZO 1X + int variant = headerStream.read(); + if (variant != LZO_1X_VARIANT) { + throw new IOException(format("Unsupported LZO variant %s", variant)); + } + + // level: ignored + headerStream.read(); + + // flags + int flags = readBigEndianInt(headerStream); + + // ignore flags about the compression environment + flags &= ~LZOP_FLAG_IO_MASK; + flags &= ~LZOP_FLAG_OPERATING_SYSTEM_MASK; + flags &= ~LZOP_FLAG_CHARACTER_SET_MASK; + + // checksum flags + adler32Decompressed = (flags & LZOP_FLAG_ADLER32_DECOMPRESSED) != 0; + adler32Compressed = (flags & LZOP_FLAG_ADLER32_COMPRESSED) != 0; + crc32Decompressed = (flags & LZOP_FLAG_CRC32_DECOMPRESSED) != 0; + crc32Compressed = (flags & LZOP_FLAG_CRC32_COMPRESSED) != 0; + boolean crc32Header = (flags & LZOP_FLAG_CRC32_HEADER) != 0; + + flags &= ~LZOP_FLAG_ADLER32_DECOMPRESSED; + flags &= ~LZOP_FLAG_ADLER32_COMPRESSED; + flags &= ~LZOP_FLAG_CRC32_DECOMPRESSED; + flags &= ~LZOP_FLAG_CRC32_COMPRESSED; + flags &= ~LZOP_FLAG_CRC32_HEADER; + + // no other flags are supported + if (flags != 0) { + throw new IOException(format("Unsupported LZO flags 0x%08X", flags)); + } + + // output file mode: ignored + readBigEndianInt(headerStream); + + // output file modified time: ignored + readBigEndianInt(headerStream); + + // output file time zone offset: ignored + readBigEndianInt(headerStream); + + // output file name: ignored + int fileNameLength = headerStream.read(); + byte[] fileName = new byte[fileNameLength]; + readInput(fileName, 0, fileName.length); + + // verify header checksum + int headerChecksumValue = readBigEndianInt(in); + + Checksum headerChecksum = crc32Header ? 
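+                // the header declares its own checksum algorithm: CRC32 when
+                // LZOP_FLAG_CRC32_HEADER is set, Adler32 otherwise; the value
+                // covers the fixed header fields plus the file name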
new CRC32() : new Adler32(); + headerChecksum.update(header, 0, header.length); + headerChecksum.update(fileName, 0, fileName.length); + if (headerChecksumValue != (int) headerChecksum.getValue()) { + throw new IOException("Invalid header checksum"); + } + } + + @Override + public int read() + throws IOException + { + if (finished) { + return -1; + } + + while (uncompressedOffset >= uncompressedLength) { + int compressedLength = bufferCompressedData(); + if (finished) { + return -1; + } + + decompress(compressedLength, uncompressedChunk, 0, uncompressedChunk.length); + } + return uncompressedChunk[uncompressedOffset++] & 0xFF; + } + + @Override + public int read(byte[] output, int offset, int length) + throws IOException + { + if (finished) { + return -1; + } + + while (uncompressedOffset >= uncompressedLength) { + int compressedLength = bufferCompressedData(); + if (finished) { + return -1; + } + + // favor writing directly to user buffer to avoid extra copy + if (length >= uncompressedLength) { + decompress(compressedLength, output, offset, length); + uncompressedOffset = uncompressedLength; + return uncompressedLength; + } + + decompress(compressedLength, uncompressedChunk, 0, uncompressedChunk.length); + } + int size = Math.min(length, uncompressedLength - uncompressedOffset); + System.arraycopy(uncompressedChunk, uncompressedOffset, output, offset, size); + uncompressedOffset += size; + return size; + } + + @Override + public void resetState() + throws IOException + { + uncompressedLength = 0; + uncompressedOffset = 0; + finished = false; + } + + private int bufferCompressedData() + throws IOException + { + uncompressedOffset = 0; + uncompressedLength = readBigEndianInt(in); + if (uncompressedLength == -1) { + // LZOP file MUST end with uncompressedLength == 0 + throw new EOFException("encountered EOF while reading block data"); + } + if (uncompressedLength == 0) { + finished = true; + return -1; + } + + int compressedLength = readBigEndianInt(in); + if (compressedLength == -1) { + throw new EOFException("encountered EOF while reading block data"); + } + + skipChecksums(compressedLength < uncompressedLength); + + return compressedLength; + } + + private void skipChecksums(boolean compressed) + throws IOException + { + if (adler32Decompressed) { + readBigEndianInt(in); + } + if (crc32Decompressed) { + readBigEndianInt(in); + } + if (compressed && adler32Compressed) { + readBigEndianInt(in); + } + if (compressed && crc32Compressed) { + readBigEndianInt(in); + } + } + + private void decompress(int compressedLength, byte[] output, int outputOffset, int outputLength) + throws IOException + { + if (uncompressedLength == compressedLength) { + readInput(output, outputOffset, compressedLength); + } + else { + if (compressed.length < compressedLength) { + // over allocate buffer which makes decompression easier + compressed = new byte[compressedLength + SIZE_OF_LONG]; + } + readInput(compressed, 0, compressedLength); + int actualUncompressedLength = decompressor.decompress(compressed, 0, compressedLength, output, outputOffset, outputLength); + if (actualUncompressedLength != uncompressedLength) { + throw new IOException("Decompressor did not decompress the entire block"); + } + } + } + + private void readInput(byte[] buffer, int offset, int length) + throws IOException + { + while (length > 0) { + int size = in.read(buffer, offset, length); + if (size == -1) { + throw new EOFException("encountered EOF while reading block data"); + } + offset += size; + length -= size; + } + } + + private static 
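+    // big-endian helpers: every multi-byte integer in the lzop format is
+    // stored big-endian, so the two readers below assemble values byte by byte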
int readBigEndianShort(InputStream in)
+            throws IOException
+    {
+        int b1 = in.read();
+        if (b1 < 0) {
+            return -1;
+        }
+
+        int b2 = in.read();
+        // If the second byte is negative, the stream is truncated
+        if ((b2) < 0) {
+            throw new IOException("Stream is truncated");
+        }
+        return (b1 << 8) + (b2);
+    }
+
+    private static int readBigEndianInt(InputStream in)
+            throws IOException
+    {
+        int b1 = in.read();
+        if (b1 < 0) {
+            return -1;
+        }
+        int b2 = in.read();
+        int b3 = in.read();
+        int b4 = in.read();
+
+        // If any of the other bytes are negative, the stream is truncated
+        if ((b2 | b3 | b4) < 0) {
+            throw new IOException("Stream is truncated");
+        }
+        return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4));
+    }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopOutputStream.java
new file mode 100644
index 0000000000000..2ca361b743737
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopOutputStream.java
@@ -0,0 +1,156 @@
+package org.apache.hadoop.io.compress.lzo;
+
+import io.airlift.compress.lzo.LzoCompressor;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Adler32;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+
+public class LzopOutputStream extends CompressionOutputStream
+{
+    static final int SIZE_OF_LONG = 8;
+    private static final int LZOP_FILE_VERSION = 0x1010;
+    private static final int LZOP_FORMAT_VERSION = 0x0940;
+    private static final int LZO_FORMAT_VERSION = 0x2050;
+    private static final int LEVEL = 5;
+    static final byte[] LZOP_MAGIC = new byte[] {(byte) 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a};
+    static final byte LZO_1X_VARIANT = 1;
+
+    private final io.airlift.compress.lzo.LzoCompressor compressor = new LzoCompressor();
+
+    private final byte[] inputBuffer;
+    private final int inputMaxSize;
+    private int inputOffset;
+
+    private final byte[] outputBuffer;
+
+    public LzopOutputStream(OutputStream out, int bufferSize)
+            throws IOException
+    {
+        super(out);
+        inputBuffer = new byte[bufferSize];
+        // leave extra space free at end of buffers to make compression (slightly) faster
+        inputMaxSize = inputBuffer.length - compressionOverhead(bufferSize);
+        outputBuffer = new byte[compressor.maxCompressedLength(inputMaxSize) + SIZE_OF_LONG];
+
+        out.write(LZOP_MAGIC);
+
+        ByteArrayOutputStream headerOut = new ByteArrayOutputStream(25);
+        DataOutputStream headerDataOut = new DataOutputStream(headerOut);
+        headerDataOut.writeShort(LZOP_FILE_VERSION);
+        headerDataOut.writeShort(LZO_FORMAT_VERSION);
+        headerDataOut.writeShort(LZOP_FORMAT_VERSION);
+        headerDataOut.writeByte(LZO_1X_VARIANT);
+        headerDataOut.writeByte(LEVEL);
+
+        // flags (none)
+        headerDataOut.writeInt(0);
+        // file mode (non-executable regular file)
+        headerDataOut.writeInt(0x81a4);
+        // modified time (in seconds from epoch)
+        headerDataOut.writeInt((int) (System.currentTimeMillis() / 1000));
+        // time zone modifier for above, which is UTC so 0
+        headerDataOut.writeInt(0);
+        // file name length (none)
+        headerDataOut.writeByte(0);
+
+        byte[] header = headerOut.toByteArray();
+        out.write(header);
+
+        Adler32 headerChecksum = new Adler32();
+        headerChecksum.update(header);
+        writeBigEndianInt((int) headerChecksum.getValue());
+    }
+
+    @Override
+    public void write(int b)
+            throws 
IOException + { + inputBuffer[inputOffset++] = (byte) b; + if (inputOffset >= inputMaxSize) { + writeNextChunk(inputBuffer, 0, this.inputOffset); + } + } + + @Override + public void write(byte[] buffer, int offset, int length) + throws IOException + { + while (length > 0) { + int chunkSize = Math.min(length, inputMaxSize - inputOffset); + // favor writing directly from the user buffer to avoid the extra copy + if (inputOffset == 0 && length > inputMaxSize) { + writeNextChunk(buffer, offset, chunkSize); + } + else { + System.arraycopy(buffer, offset, inputBuffer, inputOffset, chunkSize); + inputOffset += chunkSize; + + if (inputOffset >= inputMaxSize) { + writeNextChunk(inputBuffer, 0, inputOffset); + } + } + length -= chunkSize; + offset += chunkSize; + } + } + + @Override + public void finish() + throws IOException + { + if (inputOffset > 0) { + writeNextChunk(inputBuffer, 0, this.inputOffset); + } + } + + @Override + public void close() + throws IOException + { + finish(); + writeBigEndianInt(0); + super.close(); + } + + @Override + public void resetState() + throws IOException + { + finish(); + } + + private void writeNextChunk(byte[] input, int inputOffset, int inputLength) + throws IOException + { + int compressedSize = compressor.compress(input, inputOffset, inputLength, outputBuffer, 0, outputBuffer.length); + + writeBigEndianInt(inputLength); + if (compressedSize < inputLength) { + writeBigEndianInt(compressedSize); + out.write(outputBuffer, 0, compressedSize); + } + else { + writeBigEndianInt(inputLength); + out.write(input, inputOffset, inputLength); + } + + this.inputOffset = 0; + } + + private void writeBigEndianInt(int value) + throws IOException + { + out.write(value >>> 24); + out.write(value >>> 16); + out.write(value >>> 8); + out.write(value); + } + + private static int compressionOverhead(int size) + { + return (size / 16) + 64 + 3; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java new file mode 100644 index 0000000000000..19effdd39c11a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.io.compress.lzo; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; From 75863aaf1c7b09d4eb048abeed6db12f3f2c21d3 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 22 Jul 2020 15:43:04 -0700 Subject: [PATCH 7/8] implement compressor and decompressor --- .../hadoop/io/compress/lzo/LzoCompressor.java | 187 +++++++++++++++--- .../io/compress/lzo/LzoDecompressor.java | 179 ++++++++++++++--- .../io/compress/lzo/LzopCompressor.java | 186 ++++++++++++++--- .../io/compress/lzo/LzopDecompressor.java | 182 ++++++++++++++--- 4 files changed, 622 insertions(+), 112 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java index 4155a1833fdab..43db878508871 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java @@ -19,11 +19,29 @@ package org.apache.hadoop.io.compress.lzo; import java.io.IOException; + +import java.nio.Buffer; +import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.Compressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LzoCompressor implements Compressor { + private static final Logger LOG = + LoggerFactory.getLogger(LzoCompressor.class.getName()); + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int uncompressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finish, finished; + + private long bytesRead = 0L; + private long bytesWritten = 0L; /** * Creates a new compressor. @@ -32,70 +50,177 @@ public class LzoCompressor implements Compressor { */ public LzoCompressor(int directBufferSize) { this.directBufferSize = directBufferSize; + + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf.position(directBufferSize); } + /** + * Creates a new compressor with the default buffer size. + */ + public LzoCompressor() { + this(DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Sets input data for compression. + * This should be called whenever #needsInput() returns + * true indicating that more input data is required. 
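+   * When the data does not fit in the internal direct buffer, only a
+   * reference to the caller's array is saved, so the array must remain
+   * unmodified until {@link #needsInput()} next returns true.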
+ * + * @param b Input data + * @param off Start offset + * @param len Length + */ @Override - public void setInput(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + finished = false; + + if (len > uncompressedDirectBuf.remaining()) { + // save data; now !needsInput + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + } else { + ((ByteBuffer) uncompressedDirectBuf).put(b, off, len); + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + bytesRead += len; + } + + /** + * If a write would exceed the capacity of the direct buffers, it is set + * aside to be loaded by this function while the compressed data are + * consumed. + */ + synchronized void setInputFromSavedData() { + if (0 >= userBufLen) { + return; + } + finished = false; + + uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize); + ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff, + uncompressedDirectBufLen); + + // Note how much data is being fed to lz4 + userBufOff += uncompressedDirectBufLen; + userBufLen -= uncompressedDirectBufLen; } + /** + * Does nothing. + */ @Override - public boolean needsInput() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing } + /** + * Returns true if the input data buffer is empty and + * #setInput() should be called to provide more input. + * + * @return true if the input data buffer is empty and + * #setInput() should be called in order to provide more input. + */ @Override - public void setDictionary(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized boolean needsInput() { + return !(compressedDirectBuf.remaining() > 0 + || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0); } + /** + * When called, indicates that compression should end + * with the current contents of the input buffer. + */ @Override - public long getBytesRead() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void finish() { + finish = true; } + /** + * Returns true if the end of the compressed + * data output stream has been reached. + * + * @return true if the end of the compressed + * data output stream has been reached. + */ @Override - public long getBytesWritten() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized boolean finished() { + // Check if all uncompressed data has been consumed + return (finish && finished && compressedDirectBuf.remaining() == 0); } + /** + * Fills specified buffer with compressed data. Returns actual number + * of bytes of compressed data. A return value of 0 indicates that + * needsInput() should be called in order to determine if more input + * data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. 
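+   * @throws UnsupportedOperationException always; this codec supports only
+   *         the stream interface, not the block compressor API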
+ */ @Override - public void finish() - { + public synchronized int compress(byte[] b, int off, int len) + throws IOException { throw new UnsupportedOperationException("LZO block compressor is not supported"); } + /** + * Resets compressor so that a new set of input data can be processed. + */ @Override - public boolean finished() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void reset() { + finish = false; + finished = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufLen = 0; + compressedDirectBuf.clear(); + compressedDirectBuf.limit(0); + userBufOff = userBufLen = 0; + bytesRead = bytesWritten = 0L; } + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration + * + * @param conf Configuration from which new setting are fetched + */ @Override - public int compress(byte[] b, int off, int len) - throws IOException - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void reinit(Configuration conf) { + reset(); } + /** + * Return number of bytes given to this compressor since last reset. + */ @Override - public void reset() - { + public synchronized long getBytesRead() { + return bytesRead; } + /** + * Return number of bytes consumed by callers of compress since last reset. + */ @Override - public void end() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized long getBytesWritten() { + return bytesWritten; } + /** + * Closes the compressor and discards any unprocessed input. + */ @Override - public void reinit(Configuration conf) - { + public synchronized void end() { } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java index a1c8d1dff35ee..60310418feccb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java @@ -19,10 +19,25 @@ package org.apache.hadoop.io.compress.lzo; import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + import org.apache.hadoop.io.compress.Decompressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LzoDecompressor implements Decompressor { + private static final Logger LOG = + LoggerFactory.getLogger(LzoDecompressor.class.getName()); + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int compressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finished; /** * Creates a new compressor. @@ -31,59 +46,175 @@ public class LzoDecompressor implements Decompressor { */ public LzoDecompressor(int directBufferSize) { this.directBufferSize = directBufferSize; + + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + /** + * Creates a new decompressor with the default buffer size. 
+ */ + public LzoDecompressor() { + this(DEFAULT_DIRECT_BUFFER_SIZE); } + /** + * Sets input data for decompression. + * This should be called if and only if {@link #needsInput()} returns + * true indicating that more input data is required. + * (Both native and non-native versions of various Decompressors require + * that the data passed in via b[] remain unmodified until + * the caller is explicitly notified--via {@link #needsInput()}--that the + * buffer may be safely modified. With this requirement, an extra + * buffer-copy can be avoided.) + * + * @param b Input data + * @param off Start offset + * @param len Length + */ @Override - public void setInput(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + + setInputFromSavedData(); + + // Reinitialize lz4's output direct-buffer + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + /** + * If a write would exceed the capacity of the direct buffers, it is set + * aside to be loaded by this function while the compressed data are + * consumed. + */ + synchronized void setInputFromSavedData() { + compressedDirectBufLen = Math.min(userBufLen, directBufferSize); + + // Reinitialize lz4's input direct buffer + compressedDirectBuf.rewind(); + ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff, + compressedDirectBufLen); + + // Note how much data is being fed to lz4 + userBufOff += compressedDirectBufLen; + userBufLen -= compressedDirectBufLen; } + /** + * Does nothing. + */ @Override - public boolean needsInput() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing } + /** + * Returns true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called to + * provide more input. + * + * @return true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called in + * order to provide more input. + */ @Override - public void setDictionary(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized boolean needsInput() { + // Consume remaining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if lz4 has consumed all input + if (compressedDirectBufLen <= 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + + return false; } + /** + * Returns false. + * + * @return false. + */ @Override - public boolean needsDictionary() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized boolean needsDictionary() { + return false; } + /** + * Returns true if the end of the decompressed + * data output stream has been reached. + * + * @return true if the end of the decompressed + * data output stream has been reached. 
+ */ @Override - public boolean finished() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized boolean finished() { + return (finished && uncompressedDirectBuf.remaining() == 0); } + /** + * Fills specified buffer with uncompressed data. Returns actual number + * of bytes of uncompressed data. A return value of 0 indicates that + * {@link #needsInput()} should be called in order to determine if more + * input data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + * @throws IOException + */ @Override - public int decompress(byte[] b, int off, int len) + public synchronized int decompress(byte[] b, int off, int len) throws IOException { throw new UnsupportedOperationException("LZO block decompressor is not supported"); } + /** + * Returns 0. + * + * @return 0. + */ @Override - public void reset() - { + public synchronized int getRemaining() { + // Never use this function in BlockDecompressorStream. + return 0; } @Override - public int getRemaining() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized void reset() { + finished = false; + compressedDirectBufLen = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = userBufLen = 0; } + /** + * Resets decompressor and input and output buffers so that a new set of + * input data can be processed. + */ @Override - public void end() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized void end() { + // do nothing } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java index 975ec9391c346..d13556143c180 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java @@ -20,12 +20,29 @@ import java.io.IOException; +import java.nio.Buffer; import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.lz4.Lz4Compressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LzopCompressor implements Compressor { + private static final Logger LOG = + LoggerFactory.getLogger(LzopCompressor.class.getName()); + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int uncompressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finish, finished; + + private long bytesRead = 0L; + private long bytesWritten = 0L; /** * Creates a new compressor. 
@@ -34,70 +51,177 @@ public class LzopCompressor implements Compressor { */ public LzopCompressor(int directBufferSize) { this.directBufferSize = directBufferSize; + + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf.position(directBufferSize); + } + + /** + * Creates a new compressor with the default buffer size. + */ + public LzopCompressor() { + this(DEFAULT_DIRECT_BUFFER_SIZE); } + /** + * Sets input data for compression. + * This should be called whenever #needsInput() returns + * true indicating that more input data is required. + * + * @param b Input data + * @param off Start offset + * @param len Length + */ @Override - public void setInput(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + finished = false; + + if (len > uncompressedDirectBuf.remaining()) { + // save data; now !needsInput + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + } else { + ((ByteBuffer) uncompressedDirectBuf).put(b, off, len); + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + bytesRead += len; } + /** + * If a write would exceed the capacity of the direct buffers, it is set + * aside to be loaded by this function while the compressed data are + * consumed. + */ + synchronized void setInputFromSavedData() { + if (0 >= userBufLen) { + return; + } + finished = false; + + uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize); + ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff, + uncompressedDirectBufLen); + + // Note how much data is being fed to lz4 + userBufOff += uncompressedDirectBufLen; + userBufLen -= uncompressedDirectBufLen; + } + + /** + * Does nothing. + */ @Override - public boolean needsInput() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing } + /** + * Returns true if the input data buffer is empty and + * #setInput() should be called to provide more input. + * + * @return true if the input data buffer is empty and + * #setInput() should be called in order to provide more input. + */ @Override - public void setDictionary(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized boolean needsInput() { + return !(compressedDirectBuf.remaining() > 0 + || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0); } + /** + * When called, indicates that compression should end + * with the current contents of the input buffer. + */ @Override - public long getBytesRead() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void finish() { + finish = true; } + /** + * Returns true if the end of the compressed + * data output stream has been reached. + * + * @return true if the end of the compressed + * data output stream has been reached. 
+ */ @Override - public long getBytesWritten() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized boolean finished() { + // Check if all uncompressed data has been consumed + return (finish && finished && compressedDirectBuf.remaining() == 0); } + /** + * Fills specified buffer with compressed data. Returns actual number + * of bytes of compressed data. A return value of 0 indicates that + * needsInput() should be called in order to determine if more input + * data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + */ @Override - public void finish() - { + public synchronized int compress(byte[] b, int off, int len) + throws IOException { throw new UnsupportedOperationException("LZO block compressor is not supported"); } + /** + * Resets compressor so that a new set of input data can be processed. + */ @Override - public boolean finished() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void reset() { + finish = false; + finished = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufLen = 0; + compressedDirectBuf.clear(); + compressedDirectBuf.limit(0); + userBufOff = userBufLen = 0; + bytesRead = bytesWritten = 0L; } + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration + * + * @param conf Configuration from which new setting are fetched + */ @Override - public int compress(byte[] b, int off, int len) - throws IOException - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized void reinit(Configuration conf) { + reset(); } + /** + * Return number of bytes given to this compressor since last reset. + */ @Override - public void reset() - { + public synchronized long getBytesRead() { + return bytesRead; } + /** + * Return number of bytes consumed by callers of compress since last reset. + */ @Override - public void end() - { - throw new UnsupportedOperationException("LZO block compressor is not supported"); + public synchronized long getBytesWritten() { + return bytesWritten; } + /** + * Closes the compressor and discards any unprocessed input. 
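+   * This implementation holds no native resources, so the call is a no-op.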
+   */
   @Override
-  public void reinit(Configuration conf)
-  {
+  public synchronized void end() {
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java
index b8be5aaaac6a9..93a428a451e35 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java
@@ -20,10 +20,24 @@
 
 import java.io.IOException;
 
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LzopDecompressor implements Decompressor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LzopDecompressor.class.getName());
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int compressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finished;
 
   /**
    * Creates a new compressor.
@@ -32,59 +46,174 @@
    */
   public LzopDecompressor(int directBufferSize) {
     this.directBufferSize = directBufferSize;
+
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * Creates a new decompressor with the default buffer size.
+   */
+  public LzopDecompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
   }
 
+  /**
+   * Sets input data for decompression.
+   * This should be called if and only if {@link #needsInput()} returns
+   * true indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via b[] remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified. With this requirement, an extra
+   * buffer-copy can be avoided.)
+   *
+   * @param b Input data
+   * @param off Start offset
+   * @param len Length
+   */
   @Override
-  public void setInput(byte[] b, int off, int len)
-  {
-    throw new UnsupportedOperationException("LZO block decompressor is not supported");
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+
+    setInputFromSavedData();
+
+    // Reinitialize lzo's output direct-buffer
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
   }
 
+  /**
+   * If a write would exceed the capacity of the direct buffers, it is set
+   * aside to be loaded by this function while the compressed data are
+   * consumed.
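+   * At most {@code directBufferSize} bytes are staged per call; any
+   * remainder stays in the user buffer for later rounds.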
+ */ + synchronized void setInputFromSavedData() { + compressedDirectBufLen = Math.min(userBufLen, directBufferSize); + + // Reinitialize lz4's input direct buffer + compressedDirectBuf.rewind(); + ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff, + compressedDirectBufLen); + + // Note how much data is being fed to lz4 + userBufOff += compressedDirectBufLen; + userBufLen -= compressedDirectBufLen; + } + + /** + * Does nothing. + */ @Override - public boolean needsInput() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing } + /** + * Returns true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called to + * provide more input. + * + * @return true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called in + * order to provide more input. + */ @Override - public void setDictionary(byte[] b, int off, int len) - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized boolean needsInput() { + // Consume remaining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if lz4 has consumed all input + if (compressedDirectBufLen <= 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + + return false; } + /** + * Returns false. + * + * @return false. + */ @Override - public boolean needsDictionary() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized boolean needsDictionary() { + return false; } + /** + * Returns true if the end of the decompressed + * data output stream has been reached. + * + * @return true if the end of the decompressed + * data output stream has been reached. + */ @Override - public boolean finished() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized boolean finished() { + return (finished && uncompressedDirectBuf.remaining() == 0); } + /** + * Fills specified buffer with uncompressed data. Returns actual number + * of bytes of uncompressed data. A return value of 0 indicates that + * {@link #needsInput()} should be called in order to determine if more + * input data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + * @throws IOException + */ @Override - public int decompress(byte[] b, int off, int len) - throws IOException - { + public synchronized int decompress(byte[] b, int off, int len) + throws IOException { throw new UnsupportedOperationException("LZO block decompressor is not supported"); } + /** + * Returns 0. + * + * @return 0. + */ @Override - public void reset() - { + public synchronized int getRemaining() { + // Never use this function in BlockDecompressorStream. 
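+    // (the assumption behind returning 0: callers drive this decompressor
+    // through needsInput()/setInput(), so no internal remainder is exposed)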
+ return 0; } @Override - public int getRemaining() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized void reset() { + finished = false; + compressedDirectBufLen = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = userBufLen = 0; } + /** + * Resets decompressor and input and output buffers so that a new set of + * input data can be processed. + */ @Override - public void end() - { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); + public synchronized void end() { + // do nothing } } From 957f071746c123df29461106cabfd00d70c0aceb Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Tue, 4 Aug 2020 13:43:57 -0700 Subject: [PATCH 8/8] remove LzoInputStream.java and LzopOutputStream --- .../apache/hadoop/io/compress/LzoCodec.java | 14 +- .../apache/hadoop/io/compress/LzopCodec.java | 16 +- .../io/compress/lzo/LzoInputStream.java | 138 -------- .../io/compress/lzo/LzoOutputStream.java | 106 ------ .../io/compress/lzo/LzopInputStream.java | 307 ------------------ .../io/compress/lzo/LzopOutputStream.java | 156 --------- 6 files changed, 22 insertions(+), 715 deletions(-) delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoInputStream.java delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoOutputStream.java delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopInputStream.java delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopOutputStream.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java index 61d59cf63fb29..6f4c696d169ae 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java @@ -27,8 +27,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.compress.lzo.LzoCompressor; import org.apache.hadoop.io.compress.lzo.LzoDecompressor; -import org.apache.hadoop.io.compress.lzo.LzoInputStream; -import org.apache.hadoop.io.compress.lzo.LzoOutputStream; /** * This class creates lzo compressors/decompressors. 
@@ -86,7 +84,11 @@ public CompressionOutputStream createOutputStream(OutputStream out, int bufferSize = conf.getInt( CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); - return new LzoOutputStream(out, bufferSize); + io.airlift.compress.lzo.LzoCodec codec = new io.airlift.compress.lzo.LzoCodec(); + Configuration lzoConf = new Configuration(this.conf); + lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); + codec.setConf(lzoConf); + return codec.createOutputStream(out); } /** @@ -142,7 +144,11 @@ public CompressionInputStream createInputStream(InputStream in, int bufferSize = conf.getInt( CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); - return new LzoInputStream(in, bufferSize); + io.airlift.compress.lzo.LzoCodec codec = new io.airlift.compress.lzo.LzoCodec(); + Configuration lzoConf = new Configuration(this.conf); + lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); + codec.setConf(lzoConf); + return codec.createInputStream(in); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java index 5a865958abb4d..a61df517ed2dc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java @@ -79,11 +79,15 @@ public CompressionOutputStream createOutputStream(OutputStream out) */ @Override public CompressionOutputStream createOutputStream(OutputStream out, - Compressor compressor) throws IOException { + Compressor compressor) throws IOException { int bufferSize = conf.getInt( CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); - return new LzopOutputStream(out, bufferSize); + io.airlift.compress.lzo.LzopCodec codec = new io.airlift.compress.lzo.LzopCodec(); + Configuration lzoConf = new Configuration(this.conf); + lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); + codec.setConf(lzoConf); + return codec.createOutputStream(out); } /** @@ -135,11 +139,15 @@ public CompressionInputStream createInputStream(InputStream in) */ @Override public CompressionInputStream createInputStream(InputStream in, - Decompressor decompressor) throws IOException { + Decompressor decompressor) throws IOException { int bufferSize = conf.getInt( CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); - return new LzopInputStream(in, bufferSize); + io.airlift.compress.lzo.LzopCodec codec = new io.airlift.compress.lzo.LzopCodec(); + Configuration lzoConf = new Configuration(this.conf); + lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); + codec.setConf(lzoConf); + return codec.createInputStream(in); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoInputStream.java deleted file mode 100644 index a5550dbeabc2c..0000000000000 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoInputStream.java +++ /dev/null @@ -1,138 +0,0 @@ -package org.apache.hadoop.io.compress.lzo; - -import io.airlift.compress.lzo.LzoDecompressor; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import org.apache.hadoop.io.compress.CompressionInputStream; - - -public class LzoInputStream extends CompressionInputStream -{ - public static final int SIZE_OF_LONG = 8; - - private final io.airlift.compress.lzo.LzoDecompressor decompressor = new LzoDecompressor(); - private final InputStream in; - private final byte[] uncompressedChunk; - - private int uncompressedBlockLength; - private int uncompressedChunkOffset; - private int uncompressedChunkLength; - - private byte[] compressed = new byte[0]; - - public LzoInputStream(InputStream in, int maxUncompressedLength) - throws IOException - { - super(in); - this.in = in; - // over allocate buffer which makes decompression easier - uncompressedChunk = new byte[maxUncompressedLength + SIZE_OF_LONG]; - } - - @Override - public int read() - throws IOException - { - while (uncompressedChunkOffset >= uncompressedChunkLength) { - int compressedChunkLength = bufferCompressedData(); - if (compressedChunkLength < 0) { - return -1; - } - uncompressedChunkLength = decompressor.decompress(compressed, 0, compressedChunkLength, uncompressedChunk, 0, uncompressedChunk.length); - } - return uncompressedChunk[uncompressedChunkOffset++] & 0xFF; - } - - @Override - public int read(byte[] output, int offset, int length) - throws IOException - { - while (uncompressedChunkOffset >= uncompressedChunkLength) { - int compressedChunkLength = bufferCompressedData(); - if (compressedChunkLength < 0) { - return -1; - } - - // favor writing directly to user buffer to avoid extra copy - if (length >= uncompressedBlockLength) { - uncompressedChunkLength = decompressor.decompress(compressed, 0, compressedChunkLength, output, offset, length); - uncompressedChunkOffset = uncompressedChunkLength; - return uncompressedChunkLength; - } - - uncompressedChunkLength = decompressor.decompress(compressed, 0, compressedChunkLength, uncompressedChunk, 0, uncompressedChunk.length); - } - int size = Math.min(length, uncompressedChunkLength - uncompressedChunkOffset); - System.arraycopy(uncompressedChunk, uncompressedChunkOffset, output, offset, size); - uncompressedChunkOffset += size; - return size; - } - - @Override - public void resetState() - throws IOException - { - uncompressedBlockLength = 0; - uncompressedChunkOffset = 0; - uncompressedChunkLength = 0; - } - - private int bufferCompressedData() - throws IOException - { - uncompressedBlockLength -= uncompressedChunkOffset; - uncompressedChunkOffset = 0; - uncompressedChunkLength = 0; - while (uncompressedBlockLength == 0) { - uncompressedBlockLength = readBigEndianInt(); - if (uncompressedBlockLength == -1) { - uncompressedBlockLength = 0; - return -1; - } - } - - int compressedChunkLength = readBigEndianInt(); - if (compressedChunkLength == -1) { - return -1; - } - - if (compressed.length < compressedChunkLength) { - // over allocate buffer which makes decompression easier - compressed = new byte[compressedChunkLength + SIZE_OF_LONG]; - } - readInput(compressedChunkLength, compressed); - return compressedChunkLength; - } - - private void readInput(int length, byte[] buffer) - throws IOException - { - int offset = 0; - while (offset < length) { - int size = in.read(buffer, offset, length - offset); - if (size == 
-1) {
-                throw new EOFException("encountered EOF while reading block data");
-            }
-            offset += size;
-        }
-    }
-
-    private int readBigEndianInt()
-            throws IOException
-    {
-        int b1 = in.read();
-        if (b1 < 0) {
-            return -1;
-        }
-        int b2 = in.read();
-        int b3 = in.read();
-        int b4 = in.read();
-
-        // If any of the other bytes are negative, the stream is truncated
-        if ((b2 | b3 | b4) < 0) {
-            throw new IOException("Stream is truncated");
-        }
-        return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4));
-    }
-}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoOutputStream.java
deleted file mode 100644
index 9c528f56d3908..0000000000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoOutputStream.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package org.apache.hadoop.io.compress.lzo;
-
-import io.airlift.compress.lzo.LzoCompressor;
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-
-public class LzoOutputStream
-        extends CompressionOutputStream
-{
-    public static final int SIZE_OF_SHORT = 2;
-    public static final int SIZE_OF_INT = 4;
-    public static final int SIZE_OF_LONG = 8;
-
-    private final io.airlift.compress.lzo.LzoCompressor compressor = new LzoCompressor();
-
-    private final byte[] inputBuffer;
-    private final int inputMaxSize;
-    private int inputOffset;
-
-    private final byte[] outputBuffer;
-
-    public LzoOutputStream(OutputStream out, int bufferSize)
-    {
-        super(out);
-        inputBuffer = new byte[bufferSize];
-        // leave extra space free at end of buffers to make compression (slightly) faster
-        inputMaxSize = inputBuffer.length - compressionOverhead(bufferSize);
-        outputBuffer = new byte[compressor.maxCompressedLength(inputMaxSize) + SIZE_OF_LONG];
-    }
-
-    @Override
-    public void write(int b)
-            throws IOException
-    {
-        inputBuffer[inputOffset++] = (byte) b;
-        if (inputOffset >= inputMaxSize) {
-            writeNextChunk(inputBuffer, 0, this.inputOffset);
-        }
-    }
-
-    @Override
-    public void write(byte[] buffer, int offset, int length)
-            throws IOException
-    {
-        while (length > 0) {
-            int chunkSize = Math.min(length, inputMaxSize - inputOffset);
-            // favor writing directly from the user buffer to avoid the extra copy
-            if (inputOffset == 0 && length > inputMaxSize) {
-                writeNextChunk(buffer, offset, chunkSize);
-            }
-            else {
-                System.arraycopy(buffer, offset, inputBuffer, inputOffset, chunkSize);
-                inputOffset += chunkSize;
-
-                if (inputOffset >= inputMaxSize) {
-                    writeNextChunk(inputBuffer, 0, inputOffset);
-                }
-            }
-            length -= chunkSize;
-            offset += chunkSize;
-        }
-    }
-
-    @Override
-    public void finish()
-            throws IOException
-    {
-        if (inputOffset > 0) {
-            writeNextChunk(inputBuffer, 0, this.inputOffset);
-        }
-    }
-
-    @Override
-    public void resetState()
-            throws IOException
-    {
-        finish();
-    }
-
-    private void writeNextChunk(byte[] input, int inputOffset, int inputLength)
-            throws IOException
-    {
-        int compressedSize = compressor.compress(input, inputOffset, inputLength, outputBuffer, 0, outputBuffer.length);
-
-        writeBigEndianInt(inputLength);
-        writeBigEndianInt(compressedSize);
-        out.write(outputBuffer, 0, compressedSize);
-
-        this.inputOffset = 0;
-    }
-
-    private void writeBigEndianInt(int value)
-            throws IOException
-    {
-        out.write(value >>> 24);
-        out.write(value >>> 16);
-        out.write(value >>> 8);
-        out.write(value);
-    }
-
-    private static int 
compressionOverhead(int size) - { - return (size / 16) + 64 + 3; - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopInputStream.java deleted file mode 100644 index 71c10b9a88705..0000000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopInputStream.java +++ /dev/null @@ -1,307 +0,0 @@ -package org.apache.hadoop.io.compress.lzo; - -import io.airlift.compress.lzo.LzoDecompressor; -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.zip.Adler32; -import java.util.zip.CRC32; -import java.util.zip.Checksum; -import org.apache.hadoop.io.compress.CompressionInputStream; - -import static java.lang.String.format; - -public class LzopInputStream extends CompressionInputStream { - static final int SIZE_OF_LONG = 8; - private static final int LZO_VERSION_MAX = 0x20A0; - private static final int LZOP_FILE_VERSION_MIN = 0x0940; - private static final int LZOP_FORMAT_VERSION_MAX = 0x1010; - - private static final int LZOP_FLAG_ADLER32_DECOMPRESSED = 0x0000_0001; - private static final int LZOP_FLAG_ADLER32_COMPRESSED = 0x0000_0002; - private static final int LZOP_FLAG_CRC32_DECOMPRESSED = 0x0000_0100; - private static final int LZOP_FLAG_CRC32_COMPRESSED = 0x0000_0200; - private static final int LZOP_FLAG_CRC32_HEADER = 0x0000_1000; - private static final int LZOP_FLAG_IO_MASK = 0x0000_000c; - private static final int LZOP_FLAG_OPERATING_SYSTEM_MASK = 0xff00_0000; - private static final int LZOP_FLAG_CHARACTER_SET_MASK = 0x00f0_0000; - - private final io.airlift.compress.lzo.LzoDecompressor decompressor = new LzoDecompressor(); - private final InputStream in; - private final byte[] uncompressedChunk; - - private int uncompressedLength; - private int uncompressedOffset; - - private boolean finished; - - private byte[] compressed = new byte[0]; - private final boolean adler32Decompressed; - private final boolean adler32Compressed; - private final boolean crc32Decompressed; - private final boolean crc32Compressed; - static final byte[] LZOP_MAGIC = new byte[] {(byte) 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a}; - static final byte LZO_1X_VARIANT = 1; - - public LzopInputStream(InputStream in, int maxUncompressedLength) - throws IOException - { - super(in); - this.in = in; - // over allocate buffer which makes decompression easier - uncompressedChunk = new byte[maxUncompressedLength + SIZE_OF_LONG]; - - byte[] magic = new byte[LZOP_MAGIC.length]; - readInput(magic, 0, magic.length); - if (!Arrays.equals(magic, LZOP_MAGIC)) { - throw new IOException("Not an LZOP file"); - } - - byte[] header = new byte[25]; - readInput(header, 0, header.length); - ByteArrayInputStream headerStream = new ByteArrayInputStream(header); - - // lzop version: ignored - int lzopFileVersion = readBigEndianShort(headerStream); - if (lzopFileVersion < LZOP_FILE_VERSION_MIN) { - throw new IOException(format("Unsupported LZOP file version 0x%08X", lzopFileVersion)); - } - - // lzo version - int lzoVersion = readBigEndianShort(headerStream); - if (lzoVersion > LZO_VERSION_MAX) { - throw new IOException(format("Unsupported LZO version 0x%08X", lzoVersion)); - } - - // lzop version of the format - int lzopFormatVersion = readBigEndianShort(headerStream); - if (lzopFormatVersion > LZOP_FORMAT_VERSION_MAX) { 
- throw new IOException(format("Unsupported LZOP format version 0x%08X", lzopFormatVersion)); - } - - // variant: must be LZO 1X - int variant = headerStream.read(); - if (variant != LZO_1X_VARIANT) { - throw new IOException(format("Unsupported LZO variant %s", variant)); - } - - // level: ignored - headerStream.read(); - - // flags - int flags = readBigEndianInt(headerStream); - - // ignore flags about the compression environment - flags &= ~LZOP_FLAG_IO_MASK; - flags &= ~LZOP_FLAG_OPERATING_SYSTEM_MASK; - flags &= ~LZOP_FLAG_CHARACTER_SET_MASK; - - // checksum flags - adler32Decompressed = (flags & LZOP_FLAG_ADLER32_DECOMPRESSED) != 0; - adler32Compressed = (flags & LZOP_FLAG_ADLER32_COMPRESSED) != 0; - crc32Decompressed = (flags & LZOP_FLAG_CRC32_DECOMPRESSED) != 0; - crc32Compressed = (flags & LZOP_FLAG_CRC32_COMPRESSED) != 0; - boolean crc32Header = (flags & LZOP_FLAG_CRC32_HEADER) != 0; - - flags &= ~LZOP_FLAG_ADLER32_DECOMPRESSED; - flags &= ~LZOP_FLAG_ADLER32_COMPRESSED; - flags &= ~LZOP_FLAG_CRC32_DECOMPRESSED; - flags &= ~LZOP_FLAG_CRC32_COMPRESSED; - flags &= ~LZOP_FLAG_CRC32_HEADER; - - // no other flags are supported - if (flags != 0) { - throw new IOException(format("Unsupported LZO flags 0x%08X", flags)); - } - - // output file mode: ignored - readBigEndianInt(headerStream); - - // output file modified time: ignored - readBigEndianInt(headerStream); - - // output file time zone offset: ignored - readBigEndianInt(headerStream); - - // output file name: ignored - int fileNameLength = headerStream.read(); - byte[] fileName = new byte[fileNameLength]; - readInput(fileName, 0, fileName.length); - - // verify header checksum - int headerChecksumValue = readBigEndianInt(in); - - Checksum headerChecksum = crc32Header ? new CRC32() : new Adler32(); - headerChecksum.update(header, 0, header.length); - headerChecksum.update(fileName, 0, fileName.length); - if (headerChecksumValue != (int) headerChecksum.getValue()) { - throw new IOException("Invalid header checksum"); - } - } - - @Override - public int read() - throws IOException - { - if (finished) { - return -1; - } - - while (uncompressedOffset >= uncompressedLength) { - int compressedLength = bufferCompressedData(); - if (finished) { - return -1; - } - - decompress(compressedLength, uncompressedChunk, 0, uncompressedChunk.length); - } - return uncompressedChunk[uncompressedOffset++] & 0xFF; - } - - @Override - public int read(byte[] output, int offset, int length) - throws IOException - { - if (finished) { - return -1; - } - - while (uncompressedOffset >= uncompressedLength) { - int compressedLength = bufferCompressedData(); - if (finished) { - return -1; - } - - // favor writing directly to user buffer to avoid extra copy - if (length >= uncompressedLength) { - decompress(compressedLength, output, offset, length); - uncompressedOffset = uncompressedLength; - return uncompressedLength; - } - - decompress(compressedLength, uncompressedChunk, 0, uncompressedChunk.length); - } - int size = Math.min(length, uncompressedLength - uncompressedOffset); - System.arraycopy(uncompressedChunk, uncompressedOffset, output, offset, size); - uncompressedOffset += size; - return size; - } - - @Override - public void resetState() - throws IOException - { - uncompressedLength = 0; - uncompressedOffset = 0; - finished = false; - } - - private int bufferCompressedData() - throws IOException - { - uncompressedOffset = 0; - uncompressedLength = readBigEndianInt(in); - if (uncompressedLength == -1) { - // LZOP file MUST end with uncompressedLength == 0 - 
throw new EOFException("encountered EOF while reading block data");
-        }
-        if (uncompressedLength == 0) {
-            finished = true;
-            return -1;
-        }
-
-        int compressedLength = readBigEndianInt(in);
-        if (compressedLength == -1) {
-            throw new EOFException("encountered EOF while reading block data");
-        }
-
-        skipChecksums(compressedLength < uncompressedLength);
-
-        return compressedLength;
-    }
-
-    private void skipChecksums(boolean compressed)
-            throws IOException
-    {
-        if (adler32Decompressed) {
-            readBigEndianInt(in);
-        }
-        if (crc32Decompressed) {
-            readBigEndianInt(in);
-        }
-        if (compressed && adler32Compressed) {
-            readBigEndianInt(in);
-        }
-        if (compressed && crc32Compressed) {
-            readBigEndianInt(in);
-        }
-    }
-
-    private void decompress(int compressedLength, byte[] output, int outputOffset, int outputLength)
-            throws IOException
-    {
-        if (uncompressedLength == compressedLength) {
-            readInput(output, outputOffset, compressedLength);
-        }
-        else {
-            if (compressed.length < compressedLength) {
-                // over allocate buffer which makes decompression easier
-                compressed = new byte[compressedLength + SIZE_OF_LONG];
-            }
-            readInput(compressed, 0, compressedLength);
-            int actualUncompressedLength = decompressor.decompress(compressed, 0, compressedLength, output, outputOffset, outputLength);
-            if (actualUncompressedLength != uncompressedLength) {
-                throw new IOException("Decompressor did not decompress the entire block");
-            }
-        }
-    }
-
-    private void readInput(byte[] buffer, int offset, int length)
-            throws IOException
-    {
-        while (length > 0) {
-            int size = in.read(buffer, offset, length);
-            if (size == -1) {
-                throw new EOFException("encountered EOF while reading block data");
-            }
-            offset += size;
-            length -= size;
-        }
-    }
-
-    private static int readBigEndianShort(InputStream in)
-            throws IOException
-    {
-        int b1 = in.read();
-        if (b1 < 0) {
-            return -1;
-        }
-
-        int b2 = in.read();
-        // If the second byte is negative, the stream is truncated
-        if ((b2) < 0) {
-            throw new IOException("Stream is truncated");
-        }
-        return (b1 << 8) + (b2);
-    }
-
-    private static int readBigEndianInt(InputStream in)
-            throws IOException
-    {
-        int b1 = in.read();
-        if (b1 < 0) {
-            return -1;
-        }
-        int b2 = in.read();
-        int b3 = in.read();
-        int b4 = in.read();
-
-        // If any of the other bytes are negative, the stream is truncated
-        if ((b2 | b3 | b4) < 0) {
-            throw new IOException("Stream is truncated");
-        }
-        return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4));
-    }
-}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopOutputStream.java
deleted file mode 100644
index 2ca361b743737..0000000000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopOutputStream.java
+++ /dev/null
@@ -1,156 +0,0 @@
-package org.apache.hadoop.io.compress.lzo;
-
-import io.airlift.compress.lzo.LzoCompressor;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.zip.Adler32;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-
-public class LzopOutputStream extends CompressionOutputStream
-{
-    static final int SIZE_OF_LONG = 8;
-    private static final int LZOP_FILE_VERSION = 0x1010;
-    private static final int LZOP_FORMAT_VERSION = 0x0940;
-    private static final int LZO_FORMAT_VERSION = 0x2050;
-    private static final int LEVEL = 5;
-    static final byte[] 
LZOP_MAGIC = new byte[] {(byte) 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a}; - static final byte LZO_1X_VARIANT = 1; - - private final io.airlift.compress.lzo.LzoCompressor compressor = new LzoCompressor(); - - private final byte[] inputBuffer; - private final int inputMaxSize; - private int inputOffset; - - private final byte[] outputBuffer; - - public LzopOutputStream(OutputStream out, int bufferSize) - throws IOException - { - super(out); - inputBuffer = new byte[bufferSize]; - // leave extra space free at end of buffers to make compression (slightly) faster - inputMaxSize = inputBuffer.length - compressionOverhead(bufferSize); - outputBuffer = new byte[compressor.maxCompressedLength(inputMaxSize) + SIZE_OF_LONG]; - - out.write(LZOP_MAGIC); - - ByteArrayOutputStream headerOut = new ByteArrayOutputStream(25); - DataOutputStream headerDataOut = new DataOutputStream(headerOut); - headerDataOut.writeShort(LZOP_FILE_VERSION); - headerDataOut.writeShort(LZO_FORMAT_VERSION); - headerDataOut.writeShort(LZOP_FORMAT_VERSION); - headerDataOut.writeByte(LZO_1X_VARIANT); - headerDataOut.writeByte(LEVEL); - - // flags (none) - headerDataOut.writeInt(0); - // file mode (non-executable regular file) - headerDataOut.writeInt(0x81a4); - // modified time (in seconds from epoch) - headerDataOut.writeInt((int) (System.currentTimeMillis() / 1000)); - // time zone modifier for above, which is UTC so 0 - headerDataOut.writeInt(0); - // file name length (none) - headerDataOut.writeByte(0); - - byte[] header = headerOut.toByteArray(); - out.write(header); - - Adler32 headerChecksum = new Adler32(); - headerChecksum.update(header); - writeBigEndianInt((int) headerChecksum.getValue()); - } - - @Override - public void write(int b) - throws IOException - { - inputBuffer[inputOffset++] = (byte) b; - if (inputOffset >= inputMaxSize) { - writeNextChunk(inputBuffer, 0, this.inputOffset); - } - } - - @Override - public void write(byte[] buffer, int offset, int length) - throws IOException - { - while (length > 0) { - int chunkSize = Math.min(length, inputMaxSize - inputOffset); - // favor writing directly from the user buffer to avoid the extra copy - if (inputOffset == 0 && length > inputMaxSize) { - writeNextChunk(buffer, offset, chunkSize); - } - else { - System.arraycopy(buffer, offset, inputBuffer, inputOffset, chunkSize); - inputOffset += chunkSize; - - if (inputOffset >= inputMaxSize) { - writeNextChunk(inputBuffer, 0, inputOffset); - } - } - length -= chunkSize; - offset += chunkSize; - } - } - - @Override - public void finish() - throws IOException - { - if (inputOffset > 0) { - writeNextChunk(inputBuffer, 0, this.inputOffset); - } - } - - @Override - public void close() - throws IOException - { - finish(); - writeBigEndianInt(0); - super.close(); - } - - @Override - public void resetState() - throws IOException - { - finish(); - } - - private void writeNextChunk(byte[] input, int inputOffset, int inputLength) - throws IOException - { - int compressedSize = compressor.compress(input, inputOffset, inputLength, outputBuffer, 0, outputBuffer.length); - - writeBigEndianInt(inputLength); - if (compressedSize < inputLength) { - writeBigEndianInt(compressedSize); - out.write(outputBuffer, 0, compressedSize); - } - else { - writeBigEndianInt(inputLength); - out.write(input, inputOffset, inputLength); - } - - this.inputOffset = 0; - } - - private void writeBigEndianInt(int value) - throws IOException - { - out.write(value >>> 24); - out.write(value >>> 16); - out.write(value >>> 8); - out.write(value); - } - - 
private static int compressionOverhead(int size) - { - return (size / 16) + 64 + 3; - } -}
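
Reviewer note: with the stream classes gone, the codecs above delegate entirely to aircompressor, so an end-to-end round trip is the simplest sanity check for this series. The following is a minimal sketch, not part of the patch; it assumes the series is applied, uses only the public Hadoop codec API visible in the hunks above (createOutputStream(OutputStream), createInputStream(InputStream), the CommonConfigurationKeys buffer-size constant), and assumes LzopCodec is Configurable as the conf usage in the patch implies.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.LzopCodec;

public class LzopRoundTrip {
  public static void main(String[] args) throws Exception {
    LzopCodec codec = new LzopCodec();
    Configuration conf = new Configuration();
    // same buffer-size key the codec reads in createOutputStream/createInputStream
    conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, 64 * 1024);
    codec.setConf(conf);

    byte[] original = "hello, lzop".getBytes(StandardCharsets.UTF_8);

    // compress into an in-memory lzop container; close() finishes the stream
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (CompressionOutputStream out = codec.createOutputStream(compressed)) {
      out.write(original);
    }

    // decompress and compare against the input
    byte[] restored = new byte[original.length];
    try (CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
      int off = 0;
      while (off < restored.length) {
        int n = in.read(restored, off, restored.length - off);
        if (n < 0) {
          break;
        }
        off += n;
      }
    }
    System.out.println(Arrays.equals(original, restored)); // expected: true
  }
}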
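
Existing .lzo files can then be picked up through Hadoop's normal extension-based codec discovery. A hypothetical lookup follows, assuming LzopCodec keeps the conventional .lzo default extension for the lzop container format (the path and codec list here are illustrative, not from the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class LzoCodecLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // register the built-in LZO codecs alongside the default codec
    conf.set("io.compression.codecs",
        "org.apache.hadoop.io.compress.DefaultCodec,"
            + "org.apache.hadoop.io.compress.LzoCodec,"
            + "org.apache.hadoop.io.compress.LzopCodec");
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    // extension-based lookup; /tmp/events.lzo is a placeholder path
    CompressionCodec codec = factory.getCodec(new Path("/tmp/events.lzo"));
    System.out.println(codec == null ? "no codec found" : codec.getClass().getName());
  }
}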