
Commit 34aa613

HADOOP-17292. Using lz4-java in Lz4Codec (#2350)
Contributed by Liang-Chi Hsieh.
1 parent 0d3155a commit 34aa613

20 files changed: +132, -2319 lines


hadoop-common-project/hadoop-common/pom.xml

Lines changed: 5 additions & 5 deletions
@@ -371,6 +371,11 @@
       <artifactId>snappy-java</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -577,11 +582,6 @@
             <exclude>src/main/native/m4/*</exclude>
             <exclude>src/test/empty-file</exclude>
             <exclude>src/test/all-tests</exclude>
-            <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.h</exclude>
-            <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c</exclude>
-            <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.h</exclude>
-            <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.c</exclude>
-            <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc_encoder.h</exclude>
             <exclude>src/main/native/gtest/**/*</exclude>
             <exclude>src/test/resources/test-untar.tgz</exclude>
             <exclude>src/test/resources/test.har/_SUCCESS</exclude>
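
Since lz4-java is declared with provided scope, hadoop-common compiles against it but does not pull it in transitively; as the compressor and decompressor changes below show, the codec now throws a RuntimeException at construction time if the jar is missing. As a minimal illustrative sketch (not part of this commit; the class Lz4Availability is invented for the example), a deployment could probe for the library before requesting the codec. net.jpountz.lz4.LZ4Factory is the entry class lz4-java actually ships:

// Hypothetical helper, not part of this commit: checks whether lz4-java is
// deployed before Hadoop's Lz4Compressor/Lz4Decompressor constructors need it.
public final class Lz4Availability {
  private Lz4Availability() {
  }

  public static boolean isLz4JavaAvailable() {
    try {
      // LZ4Factory is lz4-java's entry point; it is only resolvable when
      // lz4-java.jar is on the runtime classpath.
      Class.forName("net.jpountz.lz4.LZ4Factory");
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}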

hadoop-common-project/hadoop-common/src/CMakeLists.txt

Lines changed: 0 additions & 4 deletions
@@ -236,10 +236,6 @@ configure_file(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
 set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
 hadoop_add_dual_library(hadoop
     main/native/src/exception.c
-    ${SRC}/io/compress/lz4/Lz4Compressor.c
-    ${SRC}/io/compress/lz4/Lz4Decompressor.c
-    ${SRC}/io/compress/lz4/lz4.c
-    ${SRC}/io/compress/lz4/lz4hc.c
     ${ISAL_SOURCE_FILES}
     ${ZSTD_SOURCE_FILES}
     ${OPENSSL_SOURCE_FILES}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java

Lines changed: 0 additions & 39 deletions
@@ -27,17 +27,12 @@
 import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
 import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
  * This class creates lz4 compressors/decompressors.
  */
 public class Lz4Codec implements Configurable, CompressionCodec {
 
-  static {
-    NativeCodeLoader.isNativeCodeLoaded();
-  }
-
   Configuration conf;
 
   /**
@@ -60,19 +55,6 @@ public Configuration getConf() {
     return conf;
   }
 
-  /**
-   * Are the native lz4 libraries loaded &amp; initialized?
-   *
-   * @return true if loaded &amp; initialized, otherwise false
-   */
-  public static boolean isNativeCodeLoaded() {
-    return NativeCodeLoader.isNativeCodeLoaded();
-  }
-
-  public static String getLibraryName() {
-    return Lz4Compressor.getLibraryName();
-  }
-
   /**
    * Create a {@link CompressionOutputStream} that will write to the given
    * {@link OutputStream}.
@@ -101,9 +83,6 @@ public CompressionOutputStream createOutputStream(OutputStream out)
   public CompressionOutputStream createOutputStream(OutputStream out,
                                                     Compressor compressor)
       throws IOException {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
     int bufferSize = conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
@@ -121,10 +100,6 @@ public CompressionOutputStream createOutputStream(OutputStream out,
    */
   @Override
   public Class<? extends Compressor> getCompressorType() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
-
     return Lz4Compressor.class;
   }
 
@@ -135,9 +110,6 @@ public Class<? extends Compressor> getCompressorType() {
    */
   @Override
   public Compressor createCompressor() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
     int bufferSize = conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
@@ -175,10 +147,6 @@ public CompressionInputStream createInputStream(InputStream in)
   public CompressionInputStream createInputStream(InputStream in,
                                                   Decompressor decompressor)
       throws IOException {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
-
     return new BlockDecompressorStream(in, decompressor, conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
@@ -191,10 +159,6 @@ public CompressionInputStream createInputStream(InputStream in,
    */
   @Override
   public Class<? extends Decompressor> getDecompressorType() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
-
     return Lz4Decompressor.class;
   }
 
@@ -205,9 +169,6 @@ public Class<? extends Decompressor> getDecompressorType() {
    */
   @Override
   public Decompressor createDecompressor() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
     int bufferSize = conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
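
With the native-library checks gone, Lz4Codec works anywhere lz4-java is on the classpath. A hedged usage sketch, not part of the commit (the class Lz4CodecRoundTrip and the sample payload are invented for illustration), of a compress/decompress round trip through the codec's public API:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Lz4Codec;

public class Lz4CodecRoundTrip {
  public static void main(String[] args) throws Exception {
    Lz4Codec codec = new Lz4Codec();
    codec.setConf(new Configuration());

    byte[] input = "hello, lz4-java backed codec".getBytes(StandardCharsets.UTF_8);

    // Compress through the block-compressed stream the codec creates.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (CompressionOutputStream out = codec.createOutputStream(compressed)) {
      out.write(input);
    }

    // Decompress the block stream back into the original bytes.
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    try (CompressionInputStream in = codec.createInputStream(
        new ByteArrayInputStream(compressed.toByteArray()))) {
      IOUtils.copyBytes(in, restored, 4096, false);
    }

    System.out.println("restored " + restored.size() + " of " + input.length + " bytes");
  }
}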

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java

Lines changed: 34 additions & 26 deletions
@@ -22,9 +22,11 @@
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4Compressor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.util.NativeCodeLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,22 +51,7 @@ public class Lz4Compressor implements Compressor {
   private long bytesRead = 0L;
   private long bytesWritten = 0L;
 
-  private final boolean useLz4HC;
-
-  static {
-    if (NativeCodeLoader.isNativeCodeLoaded()) {
-      // Initialize the native library
-      try {
-        initIDs();
-      } catch (Throwable t) {
-        // Ignore failure to load/initialize lz4
-        LOG.warn(t.toString());
-      }
-    } else {
-      LOG.error("Cannot load " + Lz4Compressor.class.getName() +
-          " without native hadoop library!");
-    }
-  }
+  private final LZ4Compressor lz4Compressor;
 
   /**
    * Creates a new compressor.
@@ -74,9 +61,21 @@ public class Lz4Compressor implements Compressor {
    * which trades CPU for compression ratio.
    */
   public Lz4Compressor(int directBufferSize, boolean useLz4HC) {
-    this.useLz4HC = useLz4HC;
     this.directBufferSize = directBufferSize;
 
+    try {
+      LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
+      if (useLz4HC) {
+        lz4Compressor = lz4Factory.highCompressor();
+      } else {
+        lz4Compressor = lz4Factory.fastCompressor();
+      }
+    } catch (AssertionError t) {
+      throw new RuntimeException("lz4-java library is not available: " +
+          "Lz4Compressor has not been loaded. You need to add " +
+          "lz4-java.jar to your CLASSPATH. " + t, t);
+    }
+
     uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
 
     // Compression is guaranteed to succeed if 'dstCapacity' >=
@@ -243,7 +242,7 @@ public synchronized int compress(byte[] b, int off, int len)
     }
 
     // Compress data
-    n = useLz4HC ? compressBytesDirectHC() : compressBytesDirect();
+    n = compressDirectBuf();
     compressedDirectBuf.limit(n);
     uncompressedDirectBuf.clear(); // lz4 consumes all buffer input
 
@@ -309,11 +308,20 @@ public synchronized long getBytesWritten() {
   public synchronized void end() {
   }
 
-  private native static void initIDs();
-
-  private native int compressBytesDirect();
-
-  private native int compressBytesDirectHC();
-
-  public native static String getLibraryName();
+  private int compressDirectBuf() {
+    if (uncompressedDirectBufLen == 0) {
+      return 0;
+    } else {
+      // Set the position and limit of `uncompressedDirectBuf` for reading
+      uncompressedDirectBuf.limit(uncompressedDirectBufLen).position(0);
+      compressedDirectBuf.clear();
+      lz4Compressor.compress((ByteBuffer) uncompressedDirectBuf,
+          (ByteBuffer) compressedDirectBuf);
+      uncompressedDirectBufLen = 0;
+      uncompressedDirectBuf.limit(directBufferSize).position(0);
+      int size = compressedDirectBuf.position();
+      compressedDirectBuf.position(0);
+      return size;
+    }
+  }
 }
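
The new compressDirectBuf() delegates to lz4-java's ByteBuffer overload of LZ4Compressor.compress and uses the destination buffer's position as the compressed length. A standalone sketch of those calls, with illustrative buffer names rather than the codec's fields:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;

public class Lz4JavaCompressSketch {
  public static void main(String[] args) {
    LZ4Factory factory = LZ4Factory.fastestInstance();
    // fastCompressor() matches useLz4HC == false; highCompressor() is the LZ4HC path.
    LZ4Compressor compressor = factory.fastCompressor();

    byte[] data = "some data worth compressing".getBytes(StandardCharsets.UTF_8);
    ByteBuffer src = ByteBuffer.allocateDirect(data.length);
    src.put(data);
    src.flip(); // position = 0, limit = data.length, ready to be read

    // The destination needs at least maxCompressedLength(srcLen) bytes of room.
    ByteBuffer dst =
        ByteBuffer.allocateDirect(compressor.maxCompressedLength(data.length));

    // compress(src, dst) reads src up to its limit and advances both positions;
    // dst.position() afterwards is the compressed size, which is exactly how
    // compressDirectBuf() derives its return value.
    compressor.compress(src, dst);
    System.out.println(data.length + " bytes -> " + dst.position() + " bytes");
  }
}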

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java

Lines changed: 29 additions & 20 deletions
@@ -22,8 +22,10 @@
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.util.NativeCodeLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,20 +46,7 @@ public class Lz4Decompressor implements Decompressor {
   private int userBufOff = 0, userBufLen = 0;
   private boolean finished;
 
-  static {
-    if (NativeCodeLoader.isNativeCodeLoaded()) {
-      // Initialize the native library
-      try {
-        initIDs();
-      } catch (Throwable t) {
-        // Ignore failure to load/initialize lz4
-        LOG.warn(t.toString());
-      }
-    } else {
-      LOG.error("Cannot load " + Lz4Compressor.class.getName() +
-          " without native hadoop library!");
-    }
-  }
+  private LZ4SafeDecompressor lz4Decompressor;
 
   /**
    * Creates a new compressor.
@@ -67,6 +56,15 @@ public class Lz4Decompressor implements Decompressor {
   public Lz4Decompressor(int directBufferSize) {
     this.directBufferSize = directBufferSize;
 
+    try {
+      LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
+      lz4Decompressor = lz4Factory.safeDecompressor();
+    } catch (AssertionError t) {
+      throw new RuntimeException("lz4-java library is not available: " +
+          "Lz4Decompressor has not been loaded. You need to add " +
+          "lz4-java.jar to your CLASSPATH. " + t, t);
+    }
+
     compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     uncompressedDirectBuf.position(directBufferSize);
@@ -200,7 +198,7 @@ public synchronized boolean finished() {
    * @param b Buffer for the compressed data
    * @param off Start offset of the data
    * @param len Size of the buffer
-   * @return The actual number of bytes of compressed data.
+   * @return The actual number of bytes of uncompressed data.
    * @throws IOException
    */
   @Override
@@ -228,7 +226,7 @@ public synchronized int decompress(byte[] b, int off, int len)
       uncompressedDirectBuf.limit(directBufferSize);
 
       // Decompress data
-      n = decompressBytesDirect();
+      n = decompressDirectBuf();
       uncompressedDirectBuf.limit(n);
 
       if (userBufLen <= 0) {
@@ -272,7 +270,18 @@ public synchronized void end() {
     // do nothing
   }
 
-  private native static void initIDs();
-
-  private native int decompressBytesDirect();
+  private int decompressDirectBuf() {
+    if (compressedDirectBufLen == 0) {
+      return 0;
+    } else {
+      compressedDirectBuf.limit(compressedDirectBufLen).position(0);
+      lz4Decompressor.decompress((ByteBuffer) compressedDirectBuf,
+          (ByteBuffer) uncompressedDirectBuf);
+      compressedDirectBufLen = 0;
+      compressedDirectBuf.clear();
+      int size = uncompressedDirectBuf.position();
+      uncompressedDirectBuf.position(0);
+      return size;
+    }
+  }
 }
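
decompressDirectBuf() mirrors the compressor: LZ4SafeDecompressor.decompress(ByteBuffer, ByteBuffer) fills the destination up to its limit, and the resulting position is the uncompressed length, which is why the @return javadoc above now says "uncompressed data". A companion round-trip sketch (class and buffer names are illustrative assumptions, not part of the commit):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;

public class Lz4JavaRoundTripSketch {
  public static void main(String[] args) {
    LZ4Factory factory = LZ4Factory.fastestInstance();
    LZ4Compressor compressor = factory.fastCompressor();
    LZ4SafeDecompressor decompressor = factory.safeDecompressor();

    byte[] data = "round trip through lz4-java".getBytes(StandardCharsets.UTF_8);

    // Compress into a direct buffer, then flip it so it can be read back.
    ByteBuffer compressed =
        ByteBuffer.allocateDirect(compressor.maxCompressedLength(data.length));
    compressor.compress(ByteBuffer.wrap(data), compressed);
    compressed.flip();

    // The safe decompressor needs no original-length hint; it writes as much as
    // the destination buffer allows, which is what decompressDirectBuf() relies on.
    ByteBuffer restored = ByteBuffer.allocateDirect(data.length);
    decompressor.decompress(compressed, restored);
    System.out.println("restored " + restored.position() + " bytes");
  }
}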

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java

Lines changed: 2 additions & 10 deletions
@@ -22,7 +22,6 @@
 import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.OpensslCipher;
-import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,8 +68,6 @@ public static void main(String[] args) {
     boolean isalLoaded = false;
     boolean zStdLoaded = false;
     boolean pmdkLoaded = false;
-    // lz4 is linked within libhadoop
-    boolean lz4Loaded = nativeHadoopLoaded;
     boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
     boolean openSslLoaded = false;
     boolean winutilsExists = false;
@@ -81,7 +78,6 @@ public static void main(String[] args) {
     String isalDetail = "";
     String pmdkDetail = "";
     String zstdLibraryName = "";
-    String lz4LibraryName = "";
     String bzip2LibraryName = "";
     String winutilsPath = null;
 
@@ -119,9 +115,6 @@ public static void main(String[] args) {
         openSslLoaded = true;
       }
 
-      if (lz4Loaded) {
-        lz4LibraryName = Lz4Codec.getLibraryName();
-      }
       if (bzip2Loaded) {
         bzip2LibraryName = Bzip2Factory.getLibraryName(conf);
       }
@@ -144,7 +137,6 @@ public static void main(String[] args) {
     System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
     System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName);
     System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName);
-    System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName);
     System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName);
     System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
     System.out.printf("ISA-L: %b %s%n", isalLoaded, isalDetail);
@@ -155,8 +147,8 @@ public static void main(String[] args) {
     }
 
     if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
-        (checkAll && !(zlibLoaded && lz4Loaded
-        && bzip2Loaded && isalLoaded && zStdLoaded))) {
+        (checkAll && !(zlibLoaded && bzip2Loaded
+        && isalLoaded && zStdLoaded))) {
       // return 1 to indicated check failed
      ExitUtil.terminate(1);
     }
