From 193ecd60bf3fbfdbe32ca1c92da0321c554d01bf Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 17 May 2021 09:55:38 +0200 Subject: [PATCH 1/6] Deserialize BlobStore Metadata Files in a Streaming Manner We were reading the full file contents up-front here because of the complexity of verifying the footer otherwise. This commit moves the logic for reading metadata blobs (that can become quite sizable in some cases) in a streaming manner by manually doing the footer verification as Lucene's utility methods don't allow for verification on top of a stream. --- .../snapshots/CloneSnapshotIT.java | 3 +- .../blobstore/BlobStoreRepository.java | 16 +- .../blobstore/ChecksumBlobStoreFormat.java | 177 +++++++++++++++--- .../snapshots/BlobStoreFormatTests.java | 17 +- .../SnapshotInfoBlobSerializationTests.java | 2 +- .../MockEventuallyConsistentRepository.java | 7 +- 6 files changed, 171 insertions(+), 51 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index c23267f8d1894..e2d9617d388eb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardSnapshotResult; @@ -677,7 +676,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreReposit String generation) { return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(repository.shardContainer(repositoryShardId.index(), - repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY, MockBigArrays.NON_RECYCLING_INSTANCE)))); + repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY)))); } private static BlobStoreIndexShardSnapshot readShardSnapshot(BlobStoreRepository repository, RepositoryShardId repositoryShardId, diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 38f01969619e6..d4c078c570af6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -862,8 +862,8 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(Collection s for (String indexMetaGeneration : indexMetaGenerations) { executor.execute(ActionRunnable.supply(allShardCountsListener, () -> { try { - return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry, - bigArrays).getNumberOfShards(); + return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry + ).getNumberOfShards(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage( "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex); @@ -1220,7 +1220,7 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito @Override public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { try { - return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException | NotXContentException ex) { @@ -1231,7 +1231,7 @@ public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { @Override public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { try { - return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException ex) { @@ -1243,7 +1243,7 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { try { return INDEX_METADATA_FORMAT.read(indexContainer(index), - repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry, bigArrays); + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry); } catch (NoSuchFileException e) { throw new SnapshotMissingException(metadata.name(), snapshotId, e); } @@ -2722,7 +2722,7 @@ private static List unusedBlobs(Set blobs, Set surviving */ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { try { - return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException ex) { @@ -2748,7 +2748,7 @@ private Tuple buildBlobStoreIndexShardSnap if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) { return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN); } - return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation); + return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation); } final Tuple legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2())); @@ -2765,7 +2765,7 @@ private Tuple buildBlobStoreIndexShardSnapsh long latest = latestGeneration(blobs); if (latest >= 0) { final BlobStoreIndexShardSnapshots shardSnapshots = - INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry, bigArrays); + INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry); return new Tuple<>(shardSnapshots, latest); } else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX) || b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 8b41c8d0a2176..2127050ffc30d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -11,19 +11,19 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersIndexInput; -import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.InputStreamDataInput; import org.apache.lucene.store.OutputStreamIndexOutput; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.Numbers; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; -import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -31,20 +31,19 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.snapshots.SnapshotInfo; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Arrays; import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.zip.CRC32; /** * Snapshot metadata file format used in v2.0 and above @@ -93,13 +92,10 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct * @param name name to be translated into * @return parsed blob object */ - public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry, - BigArrays bigArrays) throws IOException { + public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException { String blobName = blobName(name); - try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); - InputStream in = blobContainer.readBlob(blobName)) { - Streams.copy(in, out, false); - return deserialize(blobName, namedXContentRegistry, out.bytes()); + try (InputStream in = blobContainer.readBlob(blobName)) { + return deserialize(namedXContentRegistry, in); } } @@ -107,26 +103,155 @@ public String blobName(String name) { return String.format(Locale.ROOT, blobNameFormat, name); } - public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException { - final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; + public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException { + final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input); + + CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION); + final InputStream wrappedStream; + if (deserializeMetaBlobInputStream.nextBytesCompressed()) { + wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream); + } else { + wrappedStream = deserializeMetaBlobInputStream; + } try { - final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput( - new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc) - : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES); - CodecUtil.checksumEntireFile(indexInput); - CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); - long filePointer = indexInput.getFilePointer(); - long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; - try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, - bytes.slice((int) filePointer, (int) contentSize), XContentType.SMILE)) { - return reader.apply(parser); + final T result; + try (XContentParser parser = XContentType.SMILE.xContent().createParser( + namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)) { + result = reader.apply(parser); } + deserializeMetaBlobInputStream.verifyFooter(); + return result; } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { // we trick this into a dedicated exception with the original stacktrace throw new CorruptStateException(ex); } } + /** + * Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages + * a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that + * a parser backed by this stream will only see the blob's body. + */ + private static final class DeserializeMetaBlobInputStream extends FilterInputStream { + + // checksum updated with all but the last 8 bytes read from the wrapped stream + private final CRC32 crc32 = new CRC32(); + + private final byte[] buffer = new byte[1024 * 8]; + + private int bufferCount; + + private int bufferPos; + + DeserializeMetaBlobInputStream(InputStream in) { + super(in); + } + + @Override + public int read() throws IOException { + if (buffered() <= 0) { + fill(); + } + if (buffered() <= 0) { + return -1; + } + return buffer[bufferPos++]; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int remaining = len; + int read = 0; + while (remaining > 0) { + final int r = doRead(b, off + read, remaining); + if (r <= 0) { + break; + } + read += r; + remaining -= r; + } + if (len > 0 && remaining == len) { + // nothing to read, EOF + return -1; + } + return read; + } + + @Override + public void close() throws IOException { + // not closing the wrapped stream + } + + private int doRead(byte[] b, int off, int len) throws IOException { + if (buffered() <= 0) { + fill(); + } + final int available = buffered(); + if (available < 0) { + return -1; + } + final int read = Math.min(available, len); + System.arraycopy(buffer, bufferPos, b, off, read); + bufferPos += read; + return read; + } + + /** + * Verify footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would. + * + * @throws CorruptStateException if footer is found to be corrupted + */ + void verifyFooter() throws CorruptStateException { + if (bufferCount - bufferPos != CodecUtil.footerLength()) { + throw new CorruptStateException( + "should have consumed all but 16 bytes from the buffer but saw buffer pos [" + bufferPos + "] and count [" + + bufferCount + "]"); + } + crc32.update(buffer, 0, bufferPos + 8); + final int magicFound = Numbers.bytesToInt(buffer, bufferPos); + if (magicFound != CodecUtil.FOOTER_MAGIC) { + throw new CorruptStateException("unexpected footer magic [" + magicFound + "]"); + } + final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4); + if (algorithmFound != 0) { + throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]"); + } + final long checksum = crc32.getValue(); + final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8); + if (checksum != checksumInFooter) { + throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]"); + } + } + + /** + * @return true if the next bytes in this stream are compressed + */ + boolean nextBytesCompressed() { + // we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling + // this method + return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos)); + } + + private int buffered() { + // bytes in the buffer minus 16 bytes that could be the footer + return bufferCount - bufferPos - CodecUtil.footerLength(); + } + + private void fill() throws IOException { + if (bufferCount == 0) { + bufferCount = Streams.readFully(in, buffer, 0, buffer.length); + } else { + // crc and discard all but the last 16 bytes in the buffer that might be the footer bytes + final int footerLen = CodecUtil.footerLength(); + assert bufferCount >= footerLen; + crc32.update(buffer, 0, bufferCount - footerLen); + System.arraycopy(buffer, bufferCount - footerLen, buffer, 0, footerLen); + bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen); + bufferPos = 0; + } + } + } + /** * Writes blob with resolving the blob name using {@link #blobName} method. *

diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index 37f646900868c..e960b7b522487 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.Map; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; public class BlobStoreFormatTests extends ESTestCase { @@ -98,10 +97,10 @@ public void testBlobStoreOperations() throws IOException { MockBigArrays.NON_RECYCLING_INSTANCE); // Assert that all checksum blobs can be read - assertEquals(checksumSMILE.read(blobContainer, "check-smile", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(), + assertEquals(checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText(), "checksum smile"); - assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry(), - MockBigArrays.NON_RECYCLING_INSTANCE).getText(), "checksum smile compressed"); + assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry() + ).getText(), "checksum smile compressed"); } public void testCompressionIsApplied() throws IOException { @@ -127,16 +126,14 @@ public void testBlobCorruption() throws IOException { BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean(), MockBigArrays.NON_RECYCLING_INSTANCE); - assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(), + assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString); randomCorruption(blobContainer, "test-path"); try { - checksumFormat.read(blobContainer, "test-path", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE); + checksumFormat.read(blobContainer, "test-path", xContentRegistry()); fail("Should have failed due to corruption"); - } catch (ElasticsearchCorruptionException ex) { - assertThat(ex.getMessage(), containsString("test-path")); - } catch (EOFException ex) { - // This can happen if corrupt the byte length + } catch (ElasticsearchCorruptionException | EOFException ex) { + // expected } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java index 7d99faac00073..c785e1803f45a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java @@ -35,7 +35,7 @@ protected SnapshotInfo copyInstance(SnapshotInfo instance, Version version) thro final PlainActionFuture future = new PlainActionFuture<>(); BlobStoreRepository.SNAPSHOT_FORMAT.serialize(instance, "test", randomBoolean(), BigArrays.NON_RECYCLING_INSTANCE, bytes -> ActionListener.completeWith(future, - () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize("test", NamedXContentRegistry.EMPTY, bytes))); + () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(NamedXContentRegistry.EMPTY, bytes.streamInput()))); return future.actionGet(); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 8e4447bde94b9..36d20526b9904 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.util.Maps; @@ -329,11 +328,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b if (basePath().buildAsString().equals(path().buildAsString())) { try { final SnapshotInfo updatedInfo = BlobStoreRepository.SNAPSHOT_FORMAT.deserialize( - blobName, namedXContentRegistry, new BytesArray(data)); + namedXContentRegistry, new ByteArrayInputStream(data)); // If the existing snapshotInfo differs only in the timestamps it stores, then the overwrite is not // a problem and could be the result of a correctly handled master failover. - final SnapshotInfo existingInfo = SNAPSHOT_FORMAT.deserialize( - blobName, namedXContentRegistry, Streams.readFully(readBlob(blobName))); + final SnapshotInfo existingInfo = + SNAPSHOT_FORMAT.deserialize(namedXContentRegistry, readBlob(blobName)); assertThat(existingInfo.snapshotId(), equalTo(updatedInfo.snapshotId())); assertThat(existingInfo.reason(), equalTo(updatedInfo.reason())); assertThat(existingInfo.state(), equalTo(updatedInfo.state())); From 7e95f55fd8ca2fff57a6ba44d819b271d8d131e7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 17 May 2021 18:55:27 +0200 Subject: [PATCH 2/6] CR: comments --- .../repositories/blobstore/ChecksumBlobStoreFormat.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 2127050ffc30d..697c9f62bd9e9 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -137,10 +137,14 @@ private static final class DeserializeMetaBlobInputStream extends FilterInputStr // checksum updated with all but the last 8 bytes read from the wrapped stream private final CRC32 crc32 = new CRC32(); + // Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end + // of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream. private final byte[] buffer = new byte[1024 * 8]; + // the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF private int bufferCount; + // the current read position within the buffer, in [0, bufferCount - 16] private int bufferPos; DeserializeMetaBlobInputStream(InputStream in) { @@ -244,8 +248,9 @@ private void fill() throws IOException { // crc and discard all but the last 16 bytes in the buffer that might be the footer bytes final int footerLen = CodecUtil.footerLength(); assert bufferCount >= footerLen; - crc32.update(buffer, 0, bufferCount - footerLen); - System.arraycopy(buffer, bufferCount - footerLen, buffer, 0, footerLen); + assert bufferPos == bufferCount - footerLen; + crc32.update(buffer, 0, bufferPos); + System.arraycopy(buffer, bufferPos, buffer, 0, footerLen); bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen); bufferPos = 0; } From 582fc20dc2fa1a25bfde3fd4d3964003eb34e0cc Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 17 May 2021 19:26:18 +0200 Subject: [PATCH 3/6] larger text in test --- .../snapshots/BlobStoreFormatTests.java | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index e960b7b522487..4f524ff9665ff 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.index.translog.BufferedChecksumStreamOutput; import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.elasticsearch.test.ESTestCase; @@ -57,20 +58,9 @@ public static BlobObj fromXContent(XContentParser parser) throws IOException { } if (token == XContentParser.Token.START_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token != XContentParser.Token.FIELD_NAME) { - throw new ElasticsearchParseException("unexpected token [{}]", token); - } - String currentFieldName = parser.currentName(); - token = parser.nextToken(); - if (token.isValue()) { - if ("text" .equals(currentFieldName)) { - text = parser.text(); - } else { - throw new ElasticsearchParseException("unexpected field [{}]", currentFieldName); - } - } else { - throw new ElasticsearchParseException("unexpected token [{}]", token); - } + XContentParserUtils.ensureFieldName(parser, token, "text"); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser); + text = parser.text(); } } if (text == null) { @@ -92,15 +82,16 @@ public void testBlobStoreOperations() throws IOException { ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE); - checksumSMILE.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true, + final String randomText = randomAlphaOfLengthBetween(0, 1024 * 8 * 3); + final String normalText = "checksum smile: " + randomText; + checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE); + final String compresedText = "checksum smile compressed: " + randomText; + checksumSMILE.write(new BlobObj(compresedText), blobContainer, "check-smile-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE); // Assert that all checksum blobs can be read - assertEquals(checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText(), - "checksum smile"); - assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry() - ).getText(), "checksum smile compressed"); + assertEquals(normalText, checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText()); + assertEquals(compresedText, checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry()).getText()); } public void testCompressionIsApplied() throws IOException { From 4dc53f9e2273f1d58cf2daaa0381dbe7c06585db Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 11:23:46 +0200 Subject: [PATCH 4/6] CR comments --- .../repositories/blobstore/ChecksumBlobStoreFormat.java | 1 + .../org/elasticsearch/snapshots/BlobStoreFormatTests.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 697c9f62bd9e9..7d43b3d715e1a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -233,6 +233,7 @@ void verifyFooter() throws CorruptStateException { boolean nextBytesCompressed() { // we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling // this method + assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]"; return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos)); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index 4f524ff9665ff..4f6113b6f6469 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -85,13 +85,13 @@ public void testBlobStoreOperations() throws IOException { final String randomText = randomAlphaOfLengthBetween(0, 1024 * 8 * 3); final String normalText = "checksum smile: " + randomText; checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE); - final String compresedText = "checksum smile compressed: " + randomText; - checksumSMILE.write(new BlobObj(compresedText), blobContainer, "check-smile-comp", true, + final String compressedText = "checksum smile compressed: " + randomText; + checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE); // Assert that all checksum blobs can be read assertEquals(normalText, checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText()); - assertEquals(compresedText, checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry()).getText()); + assertEquals(compressedText, checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry()).getText()); } public void testCompressionIsApplied() throws IOException { From 5b2a9a826569e35e992c08814337cb43852dfec7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 12:42:28 +0200 Subject: [PATCH 5/6] CR: fixes --- .../blobstore/ChecksumBlobStoreFormat.java | 45 ++++++++----------- .../snapshots/BlobStoreFormatTests.java | 9 +++- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 7d43b3d715e1a..6e5bf21467635 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -105,18 +105,17 @@ public String blobName(String name) { public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException { final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input); - - CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION); - final InputStream wrappedStream; - if (deserializeMetaBlobInputStream.nextBytesCompressed()) { - wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream); - } else { - wrappedStream = deserializeMetaBlobInputStream; - } try { + CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION); + final InputStream wrappedStream; + if (deserializeMetaBlobInputStream.nextBytesCompressed()) { + wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream); + } else { + wrappedStream = deserializeMetaBlobInputStream; + } final T result; try (XContentParser parser = XContentType.SMILE.xContent().createParser( - namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)) { + namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)) { result = reader.apply(parser); } deserializeMetaBlobInputStream.verifyFooter(); @@ -153,10 +152,7 @@ private static final class DeserializeMetaBlobInputStream extends FilterInputStr @Override public int read() throws IOException { - if (buffered() <= 0) { - fill(); - } - if (buffered() <= 0) { + if (getAvailable() <= 0) { return -1; } return buffer[bufferPos++]; @@ -187,10 +183,7 @@ public void close() throws IOException { } private int doRead(byte[] b, int off, int len) throws IOException { - if (buffered() <= 0) { - fill(); - } - final int available = buffered(); + final int available = getAvailable(); if (available < 0) { return -1; } @@ -237,24 +230,24 @@ boolean nextBytesCompressed() { return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos)); } - private int buffered() { - // bytes in the buffer minus 16 bytes that could be the footer - return bufferCount - bufferPos - CodecUtil.footerLength(); - } - - private void fill() throws IOException { + /** + * @return the number of bytes available in the buffer, possibly refilling the buffer if needed + */ + private int getAvailable() throws IOException { + final int footerLen = CodecUtil.footerLength(); if (bufferCount == 0) { + // first read, fill the buffer bufferCount = Streams.readFully(in, buffer, 0, buffer.length); - } else { + } else if (bufferPos == bufferCount - footerLen) { // crc and discard all but the last 16 bytes in the buffer that might be the footer bytes - final int footerLen = CodecUtil.footerLength(); assert bufferCount >= footerLen; - assert bufferPos == bufferCount - footerLen; crc32.update(buffer, 0, bufferPos); System.arraycopy(buffer, bufferPos, buffer, 0, footerLen); bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen); bufferPos = 0; } + // bytes in the buffer minus 16 bytes that could be the footer + return bufferCount - bufferPos - footerLen; } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index 4f6113b6f6469..73ca6f505b6e9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -8,8 +8,10 @@ package org.elasticsearch.snapshots; +import com.fasterxml.jackson.core.JsonParseException; import org.elasticsearch.ElasticsearchCorruptionException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; @@ -32,6 +34,8 @@ import java.io.IOException; import java.io.InputStream; import java.util.Map; +import java.util.zip.ZipException; + import static org.hamcrest.Matchers.greaterThan; public class BlobStoreFormatTests extends ESTestCase { @@ -123,8 +127,9 @@ public void testBlobCorruption() throws IOException { try { checksumFormat.read(blobContainer, "test-path", xContentRegistry()); fail("Should have failed due to corruption"); - } catch (ElasticsearchCorruptionException | EOFException ex) { - // expected + } catch (ElasticsearchCorruptionException | EOFException | ZipException | JsonParseException | ParsingException + | ElasticsearchParseException ex) { + // expected exceptions from random byte corruption } } From 9c7fb6e9a28aece24e70e3dfcaf4acefd2235111 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 13:41:08 +0200 Subject: [PATCH 6/6] always verify footer --- .../blobstore/ChecksumBlobStoreFormat.java | 12 ++++++++++++ .../snapshots/BlobStoreFormatTests.java | 6 +----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 6e5bf21467635..1ceaf6a74a2d1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -123,6 +123,18 @@ public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream in } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { // we trick this into a dedicated exception with the original stacktrace throw new CorruptStateException(ex); + } catch (Exception e) { + try { + // drain stream fully and check whether the footer is corrupted + Streams.consumeFully(deserializeMetaBlobInputStream); + deserializeMetaBlobInputStream.verifyFooter(); + } catch (CorruptStateException cse) { + cse.addSuppressed(e); + throw cse; + } catch (Exception ex) { + e.addSuppressed(ex); + } + throw e; } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index 73ca6f505b6e9..9e7f7d1e102ec 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -8,10 +8,8 @@ package org.elasticsearch.snapshots; -import com.fasterxml.jackson.core.JsonParseException; import org.elasticsearch.ElasticsearchCorruptionException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; @@ -34,7 +32,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.Map; -import java.util.zip.ZipException; import static org.hamcrest.Matchers.greaterThan; @@ -127,8 +124,7 @@ public void testBlobCorruption() throws IOException { try { checksumFormat.read(blobContainer, "test-path", xContentRegistry()); fail("Should have failed due to corruption"); - } catch (ElasticsearchCorruptionException | EOFException | ZipException | JsonParseException | ParsingException - | ElasticsearchParseException ex) { + } catch (ElasticsearchCorruptionException | EOFException ex) { // expected exceptions from random byte corruption } }