Lz4TransportDecompressor.java
@@ -38,7 +38,6 @@
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.zip.Checksum;

/**
* This file is forked from the https://netty.io project. In particular it forks the following file
@@ -100,11 +99,6 @@ private enum State {
*/
private LZ4FastDecompressor decompressor;

/**
* Underlying checksum calculator in use.
*/
private Checksum checksum;

/**
* Type of current block.
*/
@@ -120,11 +114,6 @@ private enum State {
*/
private int decompressedLength;

/**
* Checksum value of current incoming block.
*/
private int currentChecksum;

private final PageCacheRecycler recycler;
private final ArrayDeque<Recycler.V<byte[]>> pages;
private int pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE;
@@ -134,7 +123,6 @@ public Lz4TransportDecompressor(PageCacheRecycler recycler) {
this.decompressor = LZ4Factory.safeInstance().fastDecompressor();
this.recycler = recycler;
this.pages = new ArrayDeque<>(4);
this.checksum = null;
}

@Override
@@ -232,23 +220,19 @@ private int decodeBlock(BytesReference reference) throws IOException {
compressedLength, decompressedLength));
}

int currentChecksum = Integer.reverseBytes(in.readInt());
// Read int where checksum would normally be written
in.readInt();
bytesConsumed += HEADER_LENGTH;

if (decompressedLength == 0) {
if (currentChecksum != 0) {
throw new IllegalStateException("stream corrupted: checksum error");
}
currentState = State.FINISHED;
decompressor = null;
checksum = null;
break;
}

this.blockType = blockType;
this.compressedLength = compressedLength;
this.decompressedLength = decompressedLength;
this.currentChecksum = currentChecksum;
}

currentState = State.DECOMPRESS_DATA;
@@ -258,7 +242,6 @@ private int decodeBlock(BytesReference reference) throws IOException {
break;
}

final Checksum checksum = this.checksum;
byte[] decompressed = getThreadLocalBuffer(DECOMPRESSED, decompressedLength);

try {
@@ -292,17 +275,6 @@ private int decodeBlock(BytesReference reference) throws IOException {
// Skip inbound bytes after we processed them.
bytesConsumed += compressedLength;

if (checksum != null) {
checksum.reset();
checksum.update(decompressed, 0, decompressedLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new IllegalStateException(String.format(Locale.ROOT,
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
}

int bytesToCopy = decompressedLength;
int uncompressedOffset = 0;
while (bytesToCopy > 0) {
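For reference, the block header the decoder parses after this change is: 8 magic bytes, a token byte, two little-endian ints for compressed and decompressed lengths, and a 4-byte checksum slot that is still consumed (so the header length and wire format stay unchanged) but no longer verified. A minimal standalone sketch of that parsing, with a hypothetical helper class that is not part of this PR:

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper, not part of this PR: parses one block header the way
// the decoder now does, consuming the checksum slot without verifying it.
final class Lz4BlockHeaderSketch {

    static final int MAGIC_LENGTH = 8;                    // "LZ4Block"
    static final int HEADER_LENGTH = MAGIC_LENGTH + 13;   // token byte + 3 little-endian ints

    int token;
    int compressedLength;
    int decompressedLength;

    static Lz4BlockHeaderSketch read(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] magic = new byte[MAGIC_LENGTH];
        data.readFully(magic);                            // the real decoder validates the magic bytes here
        Lz4BlockHeaderSketch header = new Lz4BlockHeaderSketch();
        header.token = data.readUnsignedByte();           // compressMethod | compressionLevel
        // DataInputStream reads big-endian ints; the block format is little-endian,
        // hence the Integer.reverseBytes calls in the decoder above.
        header.compressedLength = Integer.reverseBytes(data.readInt());
        header.decompressedLength = Integer.reverseBytes(data.readInt());
        data.readInt();                                   // checksum slot: consumed, no longer verified
        return header;
    }
}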
ReuseBuffersLZ4BlockOutputStream.java
@@ -23,21 +23,17 @@

package org.elasticsearch.transport;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Checksum;

import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FrameOutputStream;
import net.jpountz.util.SafeUtils;
import net.jpountz.xxhash.StreamingXXHash32;
import net.jpountz.xxhash.XXHashFactory;

import org.apache.lucene.util.BytesRef;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
* This file is forked from https://github.com/lz4/lz4-java. In particular it forks the following file
* net.jpountz.lz4.LZ4BlockOutputStream.
@@ -46,6 +42,8 @@
* the need to allocate two new byte arrays every time a new stream is created. For the Elasticsearch use case,
* a single thread should fully compress the stream in one go to avoid memory corruption.
*
* Additionally, it does not compute or write a checksum for the compressed data, as Elasticsearch does not
* read the checksum when decompressing.
*
* Streaming LZ4 (not compatible with the LZ4 Frame format).
* This class compresses data into fixed-size blocks of compressed data.
@@ -116,7 +114,6 @@ private static int compressionLevel(int blockSize) {
private final int blockSize;
private final int compressionLevel;
private final LZ4Compressor compressor;
private final Checksum checksum;
private final ArrayBox arrayBox;
private final byte[] buffer;
private final byte[] compressedBuffer;
@@ -134,71 +131,23 @@ private static int compressionLevel(int blockSize) {
* must be &gt;= 64 and &lt;= 32 M
* @param compressor the {@link LZ4Compressor} instance to use to compress
* data
* @param checksum the {@link Checksum} instance to use to check data for
* integrity.
* @param syncFlush true if pending data should also be flushed on {@link #flush()}
*/
public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor, Checksum checksum,
boolean syncFlush) {
public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor) {
super(out);
this.blockSize = blockSize;
this.compressor = compressor;
this.checksum = checksum;
this.compressionLevel = compressionLevel(blockSize);
final int compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize);
this.arrayBox = ARRAY_BOX.get();
arrayBox.markOwnership(blockSize, compressedBlockSize);
this.buffer = arrayBox.uncompressed;
this.compressedBuffer = arrayBox.compressed;
this.syncFlush = syncFlush;
this.syncFlush = false;
o = 0;
finished = false;
System.arraycopy(MAGIC, 0, compressedBuffer, 0, MAGIC_LENGTH);
}
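Because the stream borrows thread-local scratch buffers through ArrayBox, the class javadoc above requires a single thread to compress the whole stream in one go. A hypothetical caller illustrating the constructor that survives this change (the payload, sink, and block size are illustrative, and the sketch assumes it sits where ReuseBuffersLZ4BlockOutputStream is visible, i.e. in org.elasticsearch.transport):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import net.jpountz.lz4.LZ4Factory;

public class CompressOneShot {
    public static void main(String[] args) throws IOException {
        byte[] payload = "hello lz4".getBytes(StandardCharsets.UTF_8);   // illustrative payload
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Single thread, one go: the thread-local buffers the stream borrows
        // are only safe under that usage pattern.
        try (ReuseBuffersLZ4BlockOutputStream out = new ReuseBuffersLZ4BlockOutputStream(
                sink, 1 << 16, LZ4Factory.safeInstance().fastCompressor())) {
            out.write(payload);
        }
        System.out.println("framed bytes written: " + sink.size());
    }
}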

/**
* Creates a new instance which checks stream integrity using
* {@link StreamingXXHash32} and doesn't sync flush.
*
* @param out the {@link OutputStream} to feed
* @param blockSize the maximum number of bytes to try to compress at once,
* must be &gt;= 64 and &lt;= 32 M
* @param compressor the {@link LZ4Compressor} instance to use to compress
* data
*
* @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int, LZ4Compressor, Checksum, boolean)
* @see StreamingXXHash32#asChecksum()
*/
public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor) {
this(out, blockSize, compressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), false);
}

/**
* Creates a new instance which compresses with the standard LZ4 compression
* algorithm.
*
* @param out the {@link OutputStream} to feed
* @param blockSize the maximum number of bytes to try to compress at once,
* must be &gt;= 64 and &lt;= 32 M
*
* @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int, LZ4Compressor)
* @see LZ4Factory#fastCompressor()
*/
public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize) {
this(out, blockSize, LZ4Factory.fastestInstance().fastCompressor());
}

/**
* Creates a new instance which compresses into blocks of 64 KB.
*
* @param out the {@link OutputStream} to feed
*
* @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int)
*/
public ReuseBuffersLZ4BlockOutputStream(OutputStream out) {
this(out, 1 << 16);
}

private void ensureNotFinished() {
if (finished) {
throw new IllegalStateException("This stream is already closed");
@@ -256,9 +205,6 @@ private void flushBufferedData() throws IOException {
if (o == 0) {
return;
}
checksum.reset();
checksum.update(buffer, 0, o);
final int check = (int) checksum.getValue();
int compressedLength = compressor.compress(buffer, 0, o, compressedBuffer, HEADER_LENGTH);
final int compressMethod;
if (compressedLength >= o) {
@@ -272,7 +218,8 @@ private void flushBufferedData() throws IOException {
compressedBuffer[MAGIC_LENGTH] = (byte) (compressMethod | compressionLevel);
writeIntLE(compressedLength, compressedBuffer, MAGIC_LENGTH + 1);
writeIntLE(o, compressedBuffer, MAGIC_LENGTH + 5);
writeIntLE(check, compressedBuffer, MAGIC_LENGTH + 9);
// Write 0 for checksum. We do not read it on decompress.
writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 9);
assert MAGIC_LENGTH + 13 == HEADER_LENGTH;
out.write(compressedBuffer, 0, HEADER_LENGTH + compressedLength);
o = 0;
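The header that flushBufferedData() emits is therefore MAGIC, a token byte, the compressed and original lengths as little-endian ints, and a checksum slot hard-coded to zero. A self-contained sketch of that layout; MAGIC and the offsets mirror upstream lz4-java's LZ4BlockOutputStream and are assumed unchanged in this fork:

// Standalone sketch, not part of this PR: the block header layout with the
// checksum slot zeroed, consistent with the assert MAGIC_LENGTH + 13 == HEADER_LENGTH above.
final class BlockHeaderLayout {

    static final byte[] MAGIC = { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
    static final int MAGIC_LENGTH = MAGIC.length;        // 8
    static final int HEADER_LENGTH = MAGIC_LENGTH + 13;  // token byte + 3 ints

    static void writeHeader(byte[] buf, int token, int compressedLength, int originalLength) {
        System.arraycopy(MAGIC, 0, buf, 0, MAGIC_LENGTH);
        buf[MAGIC_LENGTH] = (byte) token;                // compressMethod | compressionLevel
        writeIntLE(compressedLength, buf, MAGIC_LENGTH + 1);
        writeIntLE(originalLength, buf, MAGIC_LENGTH + 5);
        writeIntLE(0, buf, MAGIC_LENGTH + 9);            // checksum slot: always zero after this change
    }

    // Same little-endian helper the stream uses.
    static void writeIntLE(int i, byte[] buf, int off) {
        buf[off] = (byte) i;
        buf[off + 1] = (byte) (i >>> 8);
        buf[off + 2] = (byte) (i >>> 16);
        buf[off + 3] = (byte) (i >>> 24);
    }
}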
@@ -327,7 +274,7 @@ private static void writeIntLE(int i, byte[] buf, int off) {
@Override
public String toString() {
return getClass().getSimpleName() + "(out=" + out + ", blockSize=" + blockSize
+ ", compressor=" + compressor + ", checksum=" + checksum + ")";
+ ", compressor=" + compressor + ")";
}

}