diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 9bb70ac76a06a..edb5c516e3662 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -362,6 +362,10 @@
<artifactId>wildfly-openssl-java</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ </dependency>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java
new file mode 100644
index 0000000000000..c47b0e24fde2c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzoCodec.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hadoop.compression.lzo;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class bridges the legacy GPL codec name `com.hadoop.compression.lzo.LzoCodec`
+ * to `org.apache.hadoop.io.compress.LzoCodec`, so jobs configured with the old name keep working.
+ */
+public class LzoCodec extends org.apache.hadoop.io.compress.LzoCodec {
+ private static final Logger LOG = LoggerFactory.getLogger(LzoCodec.class.getName());
+ private static final String gplLzoCodec = LzoCodec.class.getName();
+ private static final String hadoopLzoCodec = org.apache.hadoop.io.compress.LzoCodec.class.getName();
+ private static final AtomicBoolean warned = new AtomicBoolean(false);
+
+ static {
+ LOG.info("Bridging " + gplLzoCodec + " to " + hadoopLzoCodec + ".");
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor) throws IOException {
+ if (warned.compareAndSet(false, true)) {
+ LOG.warn("{} is deprecated. You should use {} instead to generate LZO compressed data.",
+ gplLzoCodec, hadoopLzoCodec);
+ }
+ return super.createOutputStream(out, compressor);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java
new file mode 100644
index 0000000000000..12581d5637733
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/com/hadoop/compression/lzo/LzopCodec.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hadoop.compression.lzo;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class bridges the legacy GPL codec name `com.hadoop.compression.lzo.LzopCodec`
+ * to `org.apache.hadoop.io.compress.LzopCodec`, so jobs configured with the old name keep working.
+ */
+public class LzopCodec extends org.apache.hadoop.io.compress.LzopCodec {
+ private static final Logger LOG = LoggerFactory.getLogger(LzopCodec.class.getName());
+ private static final String gplLzopCodec = LzopCodec.class.getName();
+ private static final String hadoopLzopCodec = org.apache.hadoop.io.compress.LzopCodec.class.getName();
+ private static final AtomicBoolean warned = new AtomicBoolean(false);
+
+ static {
+ LOG.info("Bridging " + gplLzopCodec + " to " + hadoopLzopCodec + ".");
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor) throws IOException {
+ if (warned.compareAndSet(false, true)) {
+ LOG.warn("{} is deprecated. You should use {} instead to generate LZOP compressed data.",
+ gplLzopCodec, hadoopLzopCodec);
+ }
+ return super.createOutputStream(out, compressor);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index c08af395ad2f9..6a7c500bcdcd8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -142,7 +142,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY */
public static final int IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT =
- 64*1024;
+ 256 * 1024;
/** Internal buffer size for Snappy compressor/decompressors */
public static final String IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY =
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
index 96410a18ebcb5..b29d79c175fc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
@@ -65,4 +65,14 @@ private CodecConstants() {
* Default extension for {@link org.apache.hadoop.io.compress.ZStandardCodec}.
*/
public static final String ZSTANDARD_CODEC_EXTENSION = ".zst";
+
+ /**
+ * Default extension for {@link org.apache.hadoop.io.compress.LzoCodec}.
+ */
+ public static final String LZO_CODEC_EXTENSION = ".lzo_deflate";
+
+ /**
+ * Default extension for {@link org.apache.hadoop.io.compress.LzopCodec}.
+ */
+ public static final String LZOP_CODEC_EXTENSION = ".lzo";
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java
new file mode 100644
index 0000000000000..6f4c696d169ae
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.lzo.LzoCompressor;
+import org.apache.hadoop.io.compress.lzo.LzoDecompressor;
+
+/**
+ * This class creates lzo compressors/decompressors.
+ */
+public class LzoCodec implements Configurable, CompressionCodec {
+ private Configuration conf;
+
+ /**
+ * Set the configuration to be used by this object.
+ *
+ * @param conf the configuration object.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Return the configuration used by this object.
+ *
+ * @return the configuration object used by this object.
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream}.
+ *
+ * @param out the location for the final output stream
+ * @return a stream the user can write uncompressed data to have it compressed
+ * @throws IOException
+ */
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out);
+ }
+
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream} with the given {@link Compressor}.
+ *
+ * @param out the location for the final output stream
+ * @param compressor compressor to use
+ * @return a stream the user can write uncompressed data to have it compressed
+ * @throws IOException
+ */
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor) throws IOException {
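+ // The supplied Compressor is ignored: compression is delegated to
+ // aircompressor's stream-based LzoCodec configured below.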
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ io.airlift.compress.lzo.LzoCodec codec = new io.airlift.compress.lzo.LzoCodec();
+ Configuration lzoConf = new Configuration(this.conf);
+ lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize);
+ codec.setConf(lzoConf);
+ return codec.createOutputStream(out);
+ }
+
+ /**
+ * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of compressor needed by this codec.
+ */
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ return LzoCompressor.class;
+ }
+
+ /**
+ * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new compressor for use by this codec
+ */
+ @Override
+ public Compressor createCompressor() {
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ return new LzoCompressor(bufferSize);
+ }
+
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * input stream.
+ *
+ * @param in the stream to read compressed bytes from
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ @Override
+ public CompressionInputStream createInputStream(InputStream in)
+ throws IOException {
+ return CompressionCodec.Util.
+ createInputStreamWithCodecPool(this, conf, in);
+ }
+
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * {@link InputStream} with the given {@link Decompressor}.
+ *
+ * @param in the stream to read compressed bytes from
+ * @param decompressor decompressor to use
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ @Override
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor) throws IOException {
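+ // The supplied Decompressor is ignored: decompression is delegated to
+ // aircompressor's stream-based LzoCodec configured below.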
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ io.airlift.compress.lzo.LzoCodec codec = new io.airlift.compress.lzo.LzoCodec();
+ Configuration lzoConf = new Configuration(this.conf);
+ lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize);
+ codec.setConf(lzoConf);
+ return codec.createInputStream(in);
+ }
+
+ /**
+ * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of decompressor needed by this codec.
+ */
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ return LzoDecompressor.class;
+ }
+
+ /**
+ * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new decompressor for use by this codec
+ */
+ @Override
+ public Decompressor createDecompressor() {
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ return new LzoDecompressor(bufferSize);
+ }
+
+ /**
+ * Get the default filename extension for this kind of compression.
+ *
+ * @return .lzo_deflate.
+ */
+ @Override
+ public String getDefaultExtension() {
+ return CodecConstants.LZO_CODEC_EXTENSION;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java
new file mode 100644
index 0000000000000..a61df517ed2dc
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.lzo.LzopCompressor;
+import org.apache.hadoop.io.compress.lzo.LzopDecompressor;
+
+/**
+ * This class creates lzop compressors/decompressors.
+ */
+public class LzopCodec implements Configurable, CompressionCodec {
+ private Configuration conf;
+
+ /**
+ * Set the configuration to be used by this object.
+ *
+ * @param conf the configuration object.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Return the configuration used by this object.
+ *
+ * @return the configuration object used by this object.
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream}.
+ *
+ * @param out the location for the final output stream
+ * @return a stream the user can write uncompressed data to have it compressed
+ * @throws IOException
+ */
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out);
+ }
+
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream} with the given {@link Compressor}.
+ *
+ * @param out the location for the final output stream
+ * @param compressor compressor to use
+ * @return a stream the user can write uncompressed data to have it compressed
+ * @throws IOException
+ */
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor) throws IOException {
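+ // The supplied Compressor is ignored: compression is delegated to
+ // aircompressor's stream-based LzopCodec configured below.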
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ io.airlift.compress.lzo.LzopCodec codec = new io.airlift.compress.lzo.LzopCodec();
+ Configuration lzoConf = new Configuration(this.conf);
+ lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize);
+ codec.setConf(lzoConf);
+ return codec.createOutputStream(out);
+ }
+
+ /**
+ * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of compressor needed by this codec.
+ */
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ return LzopCompressor.class;
+ }
+
+ /**
+ * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new compressor for use by this codec
+ */
+ @Override
+ public Compressor createCompressor() {
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ return new LzopCompressor(bufferSize);
+ }
+
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * input stream.
+ *
+ * @param in the stream to read compressed bytes from
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ @Override
+ public CompressionInputStream createInputStream(InputStream in)
+ throws IOException {
+ return CompressionCodec.Util.
+ createInputStreamWithCodecPool(this, conf, in);
+ }
+
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * {@link InputStream} with the given {@link Decompressor}.
+ *
+ * @param in the stream to read compressed bytes from
+ * @param decompressor decompressor to use
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ @Override
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor) throws IOException {
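+ // The supplied Decompressor is ignored: decompression is delegated to
+ // aircompressor's stream-based LzopCodec configured below.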
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ io.airlift.compress.lzo.LzopCodec codec = new io.airlift.compress.lzo.LzopCodec();
+ Configuration lzoConf = new Configuration(this.conf);
+ lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize);
+ codec.setConf(lzoConf);
+ return codec.createInputStream(in);
+ }
+
+ /**
+ * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of decompressor needed by this codec.
+ */
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ return LzopDecompressor.class;
+ }
+
+ /**
+ * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new decompressor for use by this codec
+ */
+ @Override
+ public Decompressor createDecompressor() {
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ return new LzopDecompressor(bufferSize);
+ }
+
+ /**
+ * Get the default filename extension for this kind of compression.
+ *
+ * @return .lzo.
+ */
+ @Override
+ public String getDefaultExtension() {
+ return CodecConstants.LZOP_CODEC_EXTENSION;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
new file mode 100644
index 0000000000000..43db878508871
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LzoCompressor implements Compressor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(LzoCompressor.class.getName());
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+ private int directBufferSize;
+ private Buffer compressedDirectBuf = null;
+ private int uncompressedDirectBufLen;
+ private Buffer uncompressedDirectBuf = null;
+ private byte[] userBuf = null;
+ private int userBufOff = 0, userBufLen = 0;
+ private boolean finish, finished;
+
+ private long bytesRead = 0L;
+ private long bytesWritten = 0L;
+
+ /**
+ * Creates a new compressor.
+ *
+ * @param directBufferSize size of the direct buffer to be used.
+ */
+ public LzoCompressor(int directBufferSize) {
+ this.directBufferSize = directBufferSize;
+
+ uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
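+ // Start the compressed buffer at its limit so remaining() == 0 and the
+ // buffer reads as drained until compression produces output.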
+ compressedDirectBuf.position(directBufferSize);
+ }
+
+ /**
+ * Creates a new compressor with the default buffer size.
+ */
+ public LzoCompressor() {
+ this(DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Sets input data for compression.
+ * This should be called whenever #needsInput() returns
+ * true indicating that more input data is required.
+ *
+ * @param b Input data
+ * @param off Start offset
+ * @param len Length
+ */
+ @Override
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ finished = false;
+
+ if (len > uncompressedDirectBuf.remaining()) {
+ // save data; now !needsInput
+ this.userBuf = b;
+ this.userBufOff = off;
+ this.userBufLen = len;
+ } else {
+ ((ByteBuffer) uncompressedDirectBuf).put(b, off, len);
+ uncompressedDirectBufLen = uncompressedDirectBuf.position();
+ }
+
+ bytesRead += len;
+ }
+
+ /**
+ * If a write would exceed the capacity of the direct buffers, it is set
+ * aside to be loaded by this function while the compressed data are
+ * consumed.
+ */
+ synchronized void setInputFromSavedData() {
+ if (0 >= userBufLen) {
+ return;
+ }
+ finished = false;
+
+ uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+ ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff,
+ uncompressedDirectBufLen);
+
+ // Note how much of the user's input has been consumed
+ userBufOff += uncompressedDirectBufLen;
+ userBufLen -= uncompressedDirectBufLen;
+ }
+
+ /**
+ * Does nothing.
+ */
+ @Override
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ // do nothing
+ }
+
+ /**
+ * Returns true if the input data buffer is empty and
+ * #setInput() should be called to provide more input.
+ *
+ * @return true if the input data buffer is empty and
+ * #setInput() should be called in order to provide more input.
+ */
+ @Override
+ public synchronized boolean needsInput() {
+ return !(compressedDirectBuf.remaining() > 0
+ || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
+ }
+
+ /**
+ * When called, indicates that compression should end
+ * with the current contents of the input buffer.
+ */
+ @Override
+ public synchronized void finish() {
+ finish = true;
+ }
+
+ /**
+ * Returns true if the end of the compressed
+ * data output stream has been reached.
+ *
+ * @return true if the end of the compressed
+ * data output stream has been reached.
+ */
+ @Override
+ public synchronized boolean finished() {
+ // Check if all uncompressed data has been consumed
+ return (finish && finished && compressedDirectBuf.remaining() == 0);
+ }
+
+ /**
+ * Fills specified buffer with compressed data. Returns actual number
+ * of bytes of compressed data. A return value of 0 indicates that
+ * needsInput() should be called in order to determine if more input
+ * data is required.
+ *
+ * @param b Buffer for the compressed data
+ * @param off Start offset of the data
+ * @param len Size of the buffer
+ * @return The actual number of bytes of compressed data.
+ */
+ @Override
+ public synchronized int compress(byte[] b, int off, int len)
+ throws IOException {
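+ // Compression is handled entirely by aircompressor's output stream, so
+ // the block-level Compressor API is never exercised by this codec.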
+ throw new UnsupportedOperationException("LZO block compressor is not supported");
+ }
+
+ /**
+ * Resets compressor so that a new set of input data can be processed.
+ */
+ @Override
+ public synchronized void reset() {
+ finish = false;
+ finished = false;
+ uncompressedDirectBuf.clear();
+ uncompressedDirectBufLen = 0;
+ compressedDirectBuf.clear();
+ compressedDirectBuf.limit(0);
+ userBufOff = userBufLen = 0;
+ bytesRead = bytesWritten = 0L;
+ }
+
+ /**
+ * Prepare the compressor to be used in a new stream with settings defined in
+ * the given Configuration
+ *
+ * @param conf Configuration from which new setting are fetched
+ */
+ @Override
+ public synchronized void reinit(Configuration conf) {
+ reset();
+ }
+
+ /**
+ * Return number of bytes given to this compressor since last reset.
+ */
+ @Override
+ public synchronized long getBytesRead() {
+ return bytesRead;
+ }
+
+ /**
+ * Return number of bytes consumed by callers of compress since last reset.
+ */
+ @Override
+ public synchronized long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ /**
+ * Closes the compressor and discards any unprocessed input.
+ */
+ @Override
+ public synchronized void end() {
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
new file mode 100644
index 0000000000000..60310418feccb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LzoDecompressor implements Decompressor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(LzoDecompressor.class.getName());
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+ private int directBufferSize;
+ private Buffer compressedDirectBuf = null;
+ private int compressedDirectBufLen;
+ private Buffer uncompressedDirectBuf = null;
+ private byte[] userBuf = null;
+ private int userBufOff = 0, userBufLen = 0;
+ private boolean finished;
+
+ /**
+ * Creates a new decompressor.
+ *
+ * @param directBufferSize size of the direct buffer to be used.
+ */
+ public LzoDecompressor(int directBufferSize) {
+ this.directBufferSize = directBufferSize;
+
+ compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
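+ // Start the uncompressed buffer at its limit so remaining() == 0 and the
+ // buffer reads as drained until decompression produces output.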
+ uncompressedDirectBuf.position(directBufferSize);
+ }
+
+ /**
+ * Creates a new decompressor with the default buffer size.
+ */
+ public LzoDecompressor() {
+ this(DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Sets input data for decompression.
+ * This should be called if and only if {@link #needsInput()} returns
+ * true indicating that more input data is required.
+ * (Both native and non-native versions of various Decompressors require
+ * that the data passed in via b[] remain unmodified until
+ * the caller is explicitly notified--via {@link #needsInput()}--that the
+ * buffer may be safely modified. With this requirement, an extra
+ * buffer-copy can be avoided.)
+ *
+ * @param b Input data
+ * @param off Start offset
+ * @param len Length
+ */
+ @Override
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ this.userBuf = b;
+ this.userBufOff = off;
+ this.userBufLen = len;
+
+ setInputFromSavedData();
+
+ // Reinitialize the output direct buffer
+ uncompressedDirectBuf.limit(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ }
+
+ /**
+ * If a write would exceed the capacity of the direct buffers, it is set
+ * aside to be loaded by this function while the compressed data are
+ * consumed.
+ */
+ synchronized void setInputFromSavedData() {
+ compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+
+ // Reinitialize the input direct buffer
+ compressedDirectBuf.rewind();
+ ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff,
+ compressedDirectBufLen);
+
+ // Note how much of the user's input has been consumed
+ userBufOff += compressedDirectBufLen;
+ userBufLen -= compressedDirectBufLen;
+ }
+
+ /**
+ * Does nothing.
+ */
+ @Override
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ // do nothing
+ }
+
+ /**
+ * Returns true if the input data buffer is empty and
+ * {@link #setInput(byte[], int, int)} should be called to
+ * provide more input.
+ *
+ * @return true if the input data buffer is empty and
+ * {@link #setInput(byte[], int, int)} should be called in
+ * order to provide more input.
+ */
+ @Override
+ public synchronized boolean needsInput() {
+ // Consume remaining compressed data?
+ if (uncompressedDirectBuf.remaining() > 0) {
+ return false;
+ }
+
+ // Check if all buffered compressed data has been consumed
+ if (compressedDirectBufLen <= 0) {
+ // Check if we have consumed all user-input
+ if (userBufLen <= 0) {
+ return true;
+ } else {
+ setInputFromSavedData();
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns false.
+ *
+ * @return false.
+ */
+ @Override
+ public synchronized boolean needsDictionary() {
+ return false;
+ }
+
+ /**
+ * Returns true if the end of the decompressed
+ * data output stream has been reached.
+ *
+ * @return true if the end of the decompressed
+ * data output stream has been reached.
+ */
+ @Override
+ public synchronized boolean finished() {
+ return (finished && uncompressedDirectBuf.remaining() == 0);
+ }
+
+ /**
+ * Fills specified buffer with uncompressed data. Returns actual number
+ * of bytes of uncompressed data. A return value of 0 indicates that
+ * {@link #needsInput()} should be called in order to determine if more
+ * input data is required.
+ *
+ * @param b Buffer for the uncompressed data
+ * @param off Start offset of the data
+ * @param len Size of the buffer
+ * @return The actual number of bytes of uncompressed data.
+ * @throws IOException
+ */
+ @Override
+ public synchronized int decompress(byte[] b, int off, int len)
+ throws IOException {
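+ // Decompression is handled entirely by aircompressor's input stream, so
+ // the block-level Decompressor API is never exercised by this codec.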
+ throw new UnsupportedOperationException("LZO block decompressor is not supported");
+ }
+
+ /**
+ * Returns 0.
+ *
+ * @return 0.
+ */
+ @Override
+ public synchronized int getRemaining() {
+ // Never use this function in BlockDecompressorStream.
+ return 0;
+ }
+
+ @Override
+ public synchronized void reset() {
+ finished = false;
+ compressedDirectBufLen = 0;
+ uncompressedDirectBuf.limit(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ userBufOff = userBufLen = 0;
+ }
+
+ /**
+ * Resets decompressor and input and output buffers so that a new set of
+ * input data can be processed.
+ */
+ @Override
+ public synchronized void end() {
+ // do nothing
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java
new file mode 100644
index 0000000000000..d13556143c180
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LzopCompressor implements Compressor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(LzopCompressor.class.getName());
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+ private int directBufferSize;
+ private Buffer compressedDirectBuf = null;
+ private int uncompressedDirectBufLen;
+ private Buffer uncompressedDirectBuf = null;
+ private byte[] userBuf = null;
+ private int userBufOff = 0, userBufLen = 0;
+ private boolean finish, finished;
+
+ private long bytesRead = 0L;
+ private long bytesWritten = 0L;
+
+ /**
+ * Creates a new compressor.
+ *
+ * @param directBufferSize size of the direct buffer to be used.
+ */
+ public LzopCompressor(int directBufferSize) {
+ this.directBufferSize = directBufferSize;
+
+ uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ compressedDirectBuf.position(directBufferSize);
+ }
+
+ /**
+ * Creates a new compressor with the default buffer size.
+ */
+ public LzopCompressor() {
+ this(DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Sets input data for compression.
+ * This should be called whenever #needsInput() returns
+ * true indicating that more input data is required.
+ *
+ * @param b Input data
+ * @param off Start offset
+ * @param len Length
+ */
+ @Override
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ finished = false;
+
+ if (len > uncompressedDirectBuf.remaining()) {
+ // save data; now !needsInput
+ this.userBuf = b;
+ this.userBufOff = off;
+ this.userBufLen = len;
+ } else {
+ ((ByteBuffer) uncompressedDirectBuf).put(b, off, len);
+ uncompressedDirectBufLen = uncompressedDirectBuf.position();
+ }
+
+ bytesRead += len;
+ }
+
+ /**
+ * If a write would exceed the capacity of the direct buffers, it is set
+ * aside to be loaded by this function while the compressed data are
+ * consumed.
+ */
+ synchronized void setInputFromSavedData() {
+ if (0 >= userBufLen) {
+ return;
+ }
+ finished = false;
+
+ uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+ ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff,
+ uncompressedDirectBufLen);
+
+ // Note how much of the user's input has been consumed
+ userBufOff += uncompressedDirectBufLen;
+ userBufLen -= uncompressedDirectBufLen;
+ }
+
+ /**
+ * Does nothing.
+ */
+ @Override
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ // do nothing
+ }
+
+ /**
+ * Returns true if the input data buffer is empty and
+ * #setInput() should be called to provide more input.
+ *
+ * @return true if the input data buffer is empty and
+ * #setInput() should be called in order to provide more input.
+ */
+ @Override
+ public synchronized boolean needsInput() {
+ return !(compressedDirectBuf.remaining() > 0
+ || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
+ }
+
+ /**
+ * When called, indicates that compression should end
+ * with the current contents of the input buffer.
+ */
+ @Override
+ public synchronized void finish() {
+ finish = true;
+ }
+
+ /**
+ * Returns true if the end of the compressed
+ * data output stream has been reached.
+ *
+ * @return true if the end of the compressed
+ * data output stream has been reached.
+ */
+ @Override
+ public synchronized boolean finished() {
+ // Check if all uncompressed data has been consumed
+ return (finish && finished && compressedDirectBuf.remaining() == 0);
+ }
+
+ /**
+ * Fills specified buffer with compressed data. Returns actual number
+ * of bytes of compressed data. A return value of 0 indicates that
+ * needsInput() should be called in order to determine if more input
+ * data is required.
+ *
+ * @param b Buffer for the compressed data
+ * @param off Start offset of the data
+ * @param len Size of the buffer
+ * @return The actual number of bytes of compressed data.
+ */
+ @Override
+ public synchronized int compress(byte[] b, int off, int len)
+ throws IOException {
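+ // Compression is handled entirely by aircompressor's output stream, so
+ // the block-level Compressor API is never exercised by this codec.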
+ throw new UnsupportedOperationException("LZO block compressor is not supported");
+ }
+
+ /**
+ * Resets compressor so that a new set of input data can be processed.
+ */
+ @Override
+ public synchronized void reset() {
+ finish = false;
+ finished = false;
+ uncompressedDirectBuf.clear();
+ uncompressedDirectBufLen = 0;
+ compressedDirectBuf.clear();
+ compressedDirectBuf.limit(0);
+ userBufOff = userBufLen = 0;
+ bytesRead = bytesWritten = 0L;
+ }
+
+ /**
+ * Prepare the compressor to be used in a new stream with settings defined in
+ * the given Configuration
+ *
+ * @param conf Configuration from which new setting are fetched
+ */
+ @Override
+ public synchronized void reinit(Configuration conf) {
+ reset();
+ }
+
+ /**
+ * Return number of bytes given to this compressor since last reset.
+ */
+ @Override
+ public synchronized long getBytesRead() {
+ return bytesRead;
+ }
+
+ /**
+ * Return number of bytes consumed by callers of compress since last reset.
+ */
+ @Override
+ public synchronized long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ /**
+ * Closes the compressor and discards any unprocessed input.
+ */
+ @Override
+ public synchronized void end() {
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java
new file mode 100644
index 0000000000000..93a428a451e35
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LzopDecompressor implements Decompressor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(LzopDecompressor.class.getName());
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+ private int directBufferSize;
+ private Buffer compressedDirectBuf = null;
+ private int compressedDirectBufLen;
+ private Buffer uncompressedDirectBuf = null;
+ private byte[] userBuf = null;
+ private int userBufOff = 0, userBufLen = 0;
+ private boolean finished;
+
+ /**
+ * Creates a new decompressor.
+ *
+ * @param directBufferSize size of the direct buffer to be used.
+ */
+ public LzopDecompressor(int directBufferSize) {
+ this.directBufferSize = directBufferSize;
+
+ compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ }
+
+ /**
+ * Creates a new decompressor with the default buffer size.
+ */
+ public LzopDecompressor() {
+ this(DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Sets input data for decompression.
+ * This should be called if and only if {@link #needsInput()} returns
+ * true indicating that more input data is required.
+ * (Both native and non-native versions of various Decompressors require
+ * that the data passed in via b[] remain unmodified until
+ * the caller is explicitly notified--via {@link #needsInput()}--that the
+ * buffer may be safely modified. With this requirement, an extra
+ * buffer-copy can be avoided.)
+ *
+ * @param b Input data
+ * @param off Start offset
+ * @param len Length
+ */
+ @Override
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ this.userBuf = b;
+ this.userBufOff = off;
+ this.userBufLen = len;
+
+ setInputFromSavedData();
+
+ // Reinitialize the output direct buffer
+ uncompressedDirectBuf.limit(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ }
+
+ /**
+ * If a write would exceed the capacity of the direct buffers, it is set
+ * aside to be loaded by this function while the compressed data are
+ * consumed.
+ */
+ synchronized void setInputFromSavedData() {
+ compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+
+ // Reinitialize the input direct buffer
+ compressedDirectBuf.rewind();
+ ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff,
+ compressedDirectBufLen);
+
+ // Note how much of the user's input has been consumed
+ userBufOff += compressedDirectBufLen;
+ userBufLen -= compressedDirectBufLen;
+ }
+
+ /**
+ * Does nothing.
+ */
+ @Override
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ // do nothing
+ }
+
+ /**
+ * Returns true if the input data buffer is empty and
+ * {@link #setInput(byte[], int, int)} should be called to
+ * provide more input.
+ *
+ * @return true if the input data buffer is empty and
+ * {@link #setInput(byte[], int, int)} should be called in
+ * order to provide more input.
+ */
+ @Override
+ public synchronized boolean needsInput() {
+ // Consume remaining compressed data?
+ if (uncompressedDirectBuf.remaining() > 0) {
+ return false;
+ }
+
+ // Check if all buffered compressed data has been consumed
+ if (compressedDirectBufLen <= 0) {
+ // Check if we have consumed all user-input
+ if (userBufLen <= 0) {
+ return true;
+ } else {
+ setInputFromSavedData();
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns false.
+ *
+ * @return false.
+ */
+ @Override
+ public synchronized boolean needsDictionary() {
+ return false;
+ }
+
+ /**
+ * Returns true if the end of the decompressed
+ * data output stream has been reached.
+ *
+ * @return true if the end of the decompressed
+ * data output stream has been reached.
+ */
+ @Override
+ public synchronized boolean finished() {
+ return (finished && uncompressedDirectBuf.remaining() == 0);
+ }
+
+ /**
+ * Fills specified buffer with uncompressed data. Returns actual number
+ * of bytes of uncompressed data. A return value of 0 indicates that
+ * {@link #needsInput()} should be called in order to determine if more
+ * input data is required.
+ *
+ * @param b Buffer for the uncompressed data
+ * @param off Start offset of the data
+ * @param len Size of the buffer
+ * @return The actual number of bytes of uncompressed data.
+ * @throws IOException
+ */
+ @Override
+ public synchronized int decompress(byte[] b, int off, int len)
+ throws IOException {
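+ // Decompression is handled entirely by aircompressor's input stream, so
+ // the block-level Decompressor API is never exercised by this codec.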
+ throw new UnsupportedOperationException("LZO block decompressor is not supported");
+ }
+
+ /**
+ * Returns 0.
+ *
+ * @return 0.
+ */
+ @Override
+ public synchronized int getRemaining() {
+ // Never use this function in BlockDecompressorStream.
+ return 0;
+ }
+
+ @Override
+ public synchronized void reset() {
+ finished = false;
+ compressedDirectBufLen = 0;
+ uncompressedDirectBuf.limit(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ userBufOff = userBufLen = 0;
+ }
+
+ /**
+ * Resets decompressor and input and output buffers so that a new set of
+ * input data can be processed.
+ */
+ @Override
+ public synchronized void end() {
+ // do nothing
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java
new file mode 100644
index 0000000000000..19effdd39c11a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.io.compress.lzo;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 94ff7a88493c7..d35956fb2bfd4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -176,6 +176,26 @@ public void testGzipCodecWithParam() throws IOException {
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
}
+ @Test
+ public void testLzoCodec() throws IOException {
+ codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzoCodec");
+ codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzoCodec");
+
+ // Test `com.hadoop.compression.lzo.LzoCodec`, which is bridged to `org.apache.hadoop.io.compress.LzoCodec`
+ codecTest(conf, seed, 0, "com.hadoop.compression.lzo.LzoCodec");
+ codecTest(conf, seed, count, "com.hadoop.compression.lzo.LzoCodec");
+ }
+
+ @Test
+ public void testLzopCodec() throws IOException {
+ codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzopCodec");
+ codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzopCodec");
+
+ // Test `com.hadoop.compression.lzo.LzopCodec`, which is bridged to `org.apache.hadoop.io.compress.LzopCodec`
+ codecTest(conf, seed, 0, "com.hadoop.compression.lzo.LzopCodec");
+ codecTest(conf, seed, count, "com.hadoop.compression.lzo.LzopCodec");
+ }
+
private static void codecTest(Configuration conf, int seed, int count,
String codecClass)
throws IOException {
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 4e819cd896a2c..901b872587c76 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -141,6 +141,7 @@
3.2.4
3.10.6.Final
4.1.50.Final
+ <aircompressor.version>0.16</aircompressor.version>
0.5.1
@@ -1727,6 +1728,11 @@
<artifactId>jna</artifactId>
<version>${jna.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ <version>${aircompressor.version}</version>
+ </dependency>