From f3381ec8776373a41f91b14a7ba2d4252835c705 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 18 Aug 2021 10:27:33 -0600 Subject: [PATCH 1/2] Do not checksum on LZ4 compress Currently we use the custom lz4-block scheme when compressing data. This scheme automatically calculates and writes a checksum when compressing. We do not actually read this checksum when decompressing, so it is unnecessary. This commit resolves this by writing a no-op (zero) checksum instead of computing one. This will break compatibility with arbitrary decompressors that validate the checksum. However, since the lz4 block format is not an official format anyway, this should be fine. Relates to #73497 --- .../transport/Lz4TransportDecompressor.java | 32 +------- .../ReuseBuffersLZ4BlockOutputStream.java | 75 +++---------------- 2 files changed, 13 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java b/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java index 8ce58618e5c05..316413a0e01f7 100644 --- a/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java +++ b/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java @@ -38,7 +38,6 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Locale; -import java.util.zip.Checksum; /** * This file is forked from the https://netty.io project. In particular it forks the following file @@ -100,11 +99,6 @@ private enum State { */ private LZ4FastDecompressor decompressor; - /** - * Underlying checksum calculator in use. - */ - private Checksum checksum; - /** * Type of current block. */ @@ -120,11 +114,6 @@ private enum State { */ private int decompressedLength; - /** - * Checksum value of current incoming block. - */ - private int currentChecksum; - private final PageCacheRecycler recycler; private final ArrayDeque<Recycler.V<byte[]>> pages; private int pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE; @@ -134,7 +123,6 @@ public Lz4TransportDecompressor(PageCacheRecycler recycler) { this.decompressor = LZ4Factory.safeInstance().fastDecompressor(); this.recycler = recycler; this.pages = new ArrayDeque<>(4); - this.checksum = null; } @Override @@ -232,23 +220,19 @@ private int decodeBlock(BytesReference reference) throws IOException { compressedLength, decompressedLength)); } - int currentChecksum = Integer.reverseBytes(in.readInt()); + // Read int where checksum would normally be written + in.readInt(); bytesConsumed += HEADER_LENGTH; if (decompressedLength == 0) { - if (currentChecksum != 0) { - throw new IllegalStateException("stream corrupted: checksum error"); - } currentState = State.FINISHED; decompressor = null; - checksum = null; break; } this.blockType = blockType; this.compressedLength = compressedLength; this.decompressedLength = decompressedLength; - this.currentChecksum = currentChecksum; } currentState = State.DECOMPRESS_DATA; @@ -258,7 +242,6 @@ private int decodeBlock(BytesReference reference) throws IOException { break; } - final Checksum checksum = this.checksum; byte[] decompressed = getThreadLocalBuffer(DECOMPRESSED, decompressedLength); try { @@ -292,17 +275,6 @@ private int decodeBlock(BytesReference reference) throws IOException { // Skip inbound bytes after we processed them.
bytesConsumed += compressedLength; - if (checksum != null) { - checksum.reset(); - checksum.update(decompressed, 0, decompressedLength); - final int checksumResult = (int) checksum.getValue(); - if (checksumResult != currentChecksum) { - throw new IllegalStateException(String.format(Locale.ROOT, - "stream corrupted: mismatching checksum: %d (expected: %d)", - checksumResult, currentChecksum)); - } - } - int bytesToCopy = decompressedLength; int uncompressedOffset = 0; while (bytesToCopy > 0) { diff --git a/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java b/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java index ed0c02f4ed035..0bbcd6e034007 100644 --- a/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java +++ b/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java @@ -23,21 +23,17 @@ package org.elasticsearch.transport; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.zip.Checksum; - import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4FrameOutputStream; import net.jpountz.util.SafeUtils; -import net.jpountz.xxhash.StreamingXXHash32; -import net.jpountz.xxhash.XXHashFactory; import org.apache.lucene.util.BytesRef; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + /** * This file is forked from https://github.com/lz4/lz4-java. In particular it forks the following file * net.jpountz.lz4.LZ4BlockOutputStream. @@ -46,6 +42,8 @@ * the need to allocate two new byte arrays everytime a new stream is created. For the Elasticsearch use case, * a single thread should fully compress the stream in one go to avoid memory corruption. * + * Additionally, it does not checksum (or write a check) for the data compressed. We do no read the checksum + * when decompressing in Elasticsearch. * * Streaming LZ4 (not compatible with the LZ4 Frame format). * This class compresses data into fixed-size blocks of compressed data. @@ -116,7 +114,6 @@ private static int compressionLevel(int blockSize) { private final int blockSize; private final int compressionLevel; private final LZ4Compressor compressor; - private final Checksum checksum; private final ArrayBox arrayBox; private final byte[] buffer; private final byte[] compressedBuffer; @@ -134,71 +131,23 @@ private static int compressionLevel(int blockSize) { * must be >= 64 and <= 32 M * @param compressor the {@link LZ4Compressor} instance to use to compress * data - * @param checksum the {@link Checksum} instance to use to check data for - * integrity. 
- * @param syncFlush true if pending data should also be flushed on {@link #flush()} */ - public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor, Checksum checksum, - boolean syncFlush) { + public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor) { super(out); this.blockSize = blockSize; this.compressor = compressor; - this.checksum = checksum; this.compressionLevel = compressionLevel(blockSize); final int compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize); this.arrayBox = ARRAY_BOX.get(); arrayBox.markOwnership(blockSize, compressedBlockSize); this.buffer = arrayBox.uncompressed; this.compressedBuffer = arrayBox.compressed; - this.syncFlush = syncFlush; + this.syncFlush = false; o = 0; finished = false; System.arraycopy(MAGIC, 0, compressedBuffer, 0, MAGIC_LENGTH); } - /** - * Creates a new instance which checks stream integrity using - * {@link StreamingXXHash32} and doesn't sync flush. - * - * @param out the {@link OutputStream} to feed - * @param blockSize the maximum number of bytes to try to compress at once, - * must be >= 64 and <= 32 M - * @param compressor the {@link LZ4Compressor} instance to use to compress - * data - * - * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int, LZ4Compressor, Checksum, boolean) - * @see StreamingXXHash32#asChecksum() - */ - public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor) { - this(out, blockSize, compressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), false); - } - - /** - * Creates a new instance which compresses with the standard LZ4 compression - * algorithm. - * - * @param out the {@link OutputStream} to feed - * @param blockSize the maximum number of bytes to try to compress at once, - * must be >= 64 and <= 32 M - * - * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int, LZ4Compressor) - * @see LZ4Factory#fastCompressor() - */ - public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize) { - this(out, blockSize, LZ4Factory.fastestInstance().fastCompressor()); - } - - /** - * Creates a new instance which compresses into blocks of 64 KB. - * - * @param out the {@link OutputStream} to feed - * - * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int) - */ - public ReuseBuffersLZ4BlockOutputStream(OutputStream out) { - this(out, 1 << 16); - } - private void ensureNotFinished() { if (finished) { throw new IllegalStateException("This stream is already closed"); @@ -256,9 +205,6 @@ private void flushBufferedData() throws IOException { if (o == 0) { return; } - checksum.reset(); - checksum.update(buffer, 0, o); - final int check = (int) checksum.getValue(); int compressedLength = compressor.compress(buffer, 0, o, compressedBuffer, HEADER_LENGTH); final int compressMethod; if (compressedLength >= o) { @@ -272,7 +218,8 @@ private void flushBufferedData() throws IOException { compressedBuffer[MAGIC_LENGTH] = (byte) (compressMethod | compressionLevel); writeIntLE(compressedLength, compressedBuffer, MAGIC_LENGTH + 1); writeIntLE(o, compressedBuffer, MAGIC_LENGTH + 5); - writeIntLE(check, compressedBuffer, MAGIC_LENGTH + 9); + // Write 0 for checksum. We do not read it on decompress. 
+ writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 9); assert MAGIC_LENGTH + 13 == HEADER_LENGTH; out.write(compressedBuffer, 0, HEADER_LENGTH + compressedLength); o = 0; @@ -327,7 +274,7 @@ private static void writeIntLE(int i, byte[] buf, int off) { @Override public String toString() { return getClass().getSimpleName() + "(out=" + out + ", blockSize=" + blockSize - + ", compressor=" + compressor + ", checksum=" + checksum + ")"; + + ", compressor=" + compressor + ")"; } } From 3ad7b2e5f08d0b4b3dcb1e188fe049263877fdc8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 18 Aug 2021 10:32:28 -0600 Subject: [PATCH 2/2] Typo --- .../transport/ReuseBuffersLZ4BlockOutputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java b/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java index 0bbcd6e034007..cfd284576fcd0 100644 --- a/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java +++ b/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java @@ -42,7 +42,7 @@ * the need to allocate two new byte arrays everytime a new stream is created. For the Elasticsearch use case, * a single thread should fully compress the stream in one go to avoid memory corruption. * - * Additionally, it does not checksum (or write a check) for the data compressed. We do no read the checksum + * Additionally, it does not checksum (or write a check) for the data compressed. We do not read the checksum * when decompressing in Elasticsearch. * * Streaming LZ4 (not compatible with the LZ4 Frame format).
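
For reference, the sketch below illustrates the lz4-java block header layout that flushBufferedData() produces after this change, with the checksum slot hard-coded to zero. It is a standalone illustration, not code from the patch: the class name, the "LZ4Block" magic bytes, and the COMPRESSION_METHOD_RAW/COMPRESSION_METHOD_LZ4 token values are assumptions based on the lz4-java block stream format rather than values copied from the Elasticsearch sources.

import java.nio.charset.StandardCharsets;

public class Lz4BlockHeaderSketch {

    // Assumed 8-byte ASCII magic used by lz4-java's block stream.
    static final byte[] MAGIC = "LZ4Block".getBytes(StandardCharsets.US_ASCII);
    static final int MAGIC_LENGTH = MAGIC.length;        // 8
    static final int HEADER_LENGTH = MAGIC_LENGTH + 13;  // token byte + three little-endian ints

    // Assumed token bits for the compression method.
    static final int COMPRESSION_METHOD_RAW = 0x10;
    static final int COMPRESSION_METHOD_LZ4 = 0x20;

    /**
     * Fills the first HEADER_LENGTH bytes of buf with a block header whose
     * checksum slot is always zero, mirroring what the patched
     * flushBufferedData() now writes.
     */
    static void writeHeader(byte[] buf, boolean compressed, int compressionLevel,
                            int compressedLength, int originalLength) {
        System.arraycopy(MAGIC, 0, buf, 0, MAGIC_LENGTH);
        int method = compressed ? COMPRESSION_METHOD_LZ4 : COMPRESSION_METHOD_RAW;
        buf[MAGIC_LENGTH] = (byte) (method | compressionLevel); // token byte
        writeIntLE(compressedLength, buf, MAGIC_LENGTH + 1);    // size of the block body that follows
        writeIntLE(originalLength, buf, MAGIC_LENGTH + 5);      // decompressed size
        writeIntLE(0, buf, MAGIC_LENGTH + 9);                   // checksum slot, never read on decompress
    }

    static void writeIntLE(int i, byte[] buf, int off) {
        buf[off] = (byte) i;
        buf[off + 1] = (byte) (i >>> 8);
        buf[off + 2] = (byte) (i >>> 16);
        buf[off + 3] = (byte) (i >>> 24);
    }

    public static void main(String[] args) {
        byte[] header = new byte[HEADER_LENGTH];
        writeHeader(header, true, 6, 1234, 4096);
        System.out.println("wrote " + HEADER_LENGTH + " header bytes, checksum field = 0");
    }
}

On the read side, Lz4TransportDecompressor now consumes those four checksum bytes with a plain in.readInt() and discards the value, so nothing ever compares against the zero written here.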