From f320685639121bfc5485eaf6af3c3aa5f7e40a39 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 14:33:11 +0200 Subject: [PATCH] Deserialize BlobStore Metadata Files in a Streaming Manner (#73149) We were reading the full file contents up-front here because of the complexity of verifying the footer otherwise. This commit moves the logic for reading metadata blobs (that can become quite sizable in some cases) in a streaming manner by manually doing the footer verification as Lucene's utility methods don't allow for verification on top of a stream. --- .../snapshots/CloneSnapshotIT.java | 4 +- .../blobstore/BlobStoreRepository.java | 17 +- .../blobstore/ChecksumBlobStoreFormat.java | 198 +++++++++++++++--- .../snapshots/BlobStoreFormatTests.java | 55 ++--- .../SnapshotInfoBlobSerializationTests.java | 2 +- 5 files changed, 190 insertions(+), 86 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index c6b204c5291f0..a1d517d481f9b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardSnapshotResult; @@ -731,8 +730,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration( () -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read( repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()), generation, - NamedXContentRegistry.EMPTY, - MockBigArrays.NON_RECYCLING_INSTANCE + NamedXContentRegistry.EMPTY ) ) ) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index e7dcc75ffde5d..4463b49611ad9 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -977,8 +977,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes( for (String indexMetaGeneration : indexMetaGenerations) { executor.execute(ActionRunnable.supply(allShardCountsListener, () -> { try { - return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry, bigArrays) - .getNumberOfShards(); + return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry).getNumberOfShards(); } catch (Exception ex) { logger.warn( () -> new ParameterizedMessage( @@ -1435,7 +1434,7 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito @Override public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { try { - return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException | NotXContentException ex) { @@ -1446,7 +1445,7 @@ public SnapshotInfo 
getSnapshotInfo(final SnapshotId snapshotId) { @Override public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { try { - return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException ex) { @@ -1460,8 +1459,7 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna return INDEX_METADATA_FORMAT.read( indexContainer(index), repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), - namedXContentRegistry, - bigArrays + namedXContentRegistry ); } catch (NoSuchFileException e) { throw new SnapshotMissingException(metadata.name(), snapshotId, e); @@ -3156,7 +3154,7 @@ private static List unusedBlobs( */ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { try { - return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException ex) { @@ -3188,7 +3186,7 @@ private Tuple buildBlobStoreIndexShardSnap if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) { return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN); } - return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation); + return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation); } final Tuple legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2())); @@ -3207,8 +3205,7 @@ private Tuple buildBlobStoreIndexShardSnapsh final BlobStoreIndexShardSnapshots shardSnapshots = INDEX_SHARD_SNAPSHOTS_FORMAT.read( shardContainer, Long.toString(latest), - namedXContentRegistry, - bigArrays + namedXContentRegistry ); return new Tuple<>(shardSnapshots, latest); } else if (blobs.stream() diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index a0d7646adefe9..e3ff271914d93 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -11,19 +11,19 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersIndexInput; -import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.InputStreamDataInput; import org.apache.lucene.store.OutputStreamIndexOutput; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.common.Numbers; import 
org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; -import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -31,20 +31,19 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.snapshots.SnapshotInfo; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Arrays; import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.zip.CRC32; /** * Snapshot metadata file format used in v2.0 and above @@ -93,15 +92,10 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct * @param name name to be translated into * @return parsed blob object */ - public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays) - throws IOException { + public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException { String blobName = blobName(name); - try ( - ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); - InputStream in = blobContainer.readBlob(blobName) - ) { - Streams.copy(in, out, false); - return deserialize(blobName, namedXContentRegistry, out.bytes()); + try (InputStream in = blobContainer.readBlob(blobName)) { + return deserialize(namedXContentRegistry, in); } } @@ -109,29 +103,169 @@ public String blobName(String name) { return String.format(Locale.ROOT, blobNameFormat, name); } - public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException { - final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; + public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException { + final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input); try { - final IndexInput indexInput = bytes.length() > 0 - ? 
new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc) - : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES); - CodecUtil.checksumEntireFile(indexInput); - CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); - long filePointer = indexInput.getFilePointer(); - long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; + CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION); + final InputStream wrappedStream; + if (deserializeMetaBlobInputStream.nextBytesCompressed()) { + wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream); + } else { + wrappedStream = deserializeMetaBlobInputStream; + } + final T result; try ( - XContentParser parser = XContentHelper.createParser( - namedXContentRegistry, - LoggingDeprecationHandler.INSTANCE, - bytes.slice((int) filePointer, (int) contentSize), - XContentType.SMILE - ) + XContentParser parser = XContentType.SMILE.xContent() + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream) ) { - return reader.apply(parser); + result = reader.apply(parser); } + deserializeMetaBlobInputStream.verifyFooter(); + return result; } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { // we trick this into a dedicated exception with the original stacktrace throw new CorruptStateException(ex); + } catch (Exception e) { + try { + // drain stream fully and check whether the footer is corrupted + Streams.consumeFully(deserializeMetaBlobInputStream); + deserializeMetaBlobInputStream.verifyFooter(); + } catch (CorruptStateException cse) { + cse.addSuppressed(e); + throw cse; + } catch (Exception ex) { + e.addSuppressed(ex); + } + throw e; + } + } + + /** + * Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages + * a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that + * a parser backed by this stream will only see the blob's body. + */ + private static final class DeserializeMetaBlobInputStream extends FilterInputStream { + + // checksum updated with all but the last 8 bytes read from the wrapped stream + private final CRC32 crc32 = new CRC32(); + + // Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end + // of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream. 
+ private final byte[] buffer = new byte[1024 * 8]; + + // the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF + private int bufferCount; + + // the current read position within the buffer, in [0, bufferCount - 16] + private int bufferPos; + + DeserializeMetaBlobInputStream(InputStream in) { + super(in); + } + + @Override + public int read() throws IOException { + if (getAvailable() <= 0) { + return -1; + } + return buffer[bufferPos++]; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int remaining = len; + int read = 0; + while (remaining > 0) { + final int r = doRead(b, off + read, remaining); + if (r <= 0) { + break; + } + read += r; + remaining -= r; + } + if (len > 0 && remaining == len) { + // nothing to read, EOF + return -1; + } + return read; + } + + @Override + public void close() throws IOException { + // not closing the wrapped stream + } + + private int doRead(byte[] b, int off, int len) throws IOException { + final int available = getAvailable(); + if (available < 0) { + return -1; + } + final int read = Math.min(available, len); + System.arraycopy(buffer, bufferPos, b, off, read); + bufferPos += read; + return read; + } + + /** + * Verify footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would. + * + * @throws CorruptStateException if footer is found to be corrupted + */ + void verifyFooter() throws CorruptStateException { + if (bufferCount - bufferPos != CodecUtil.footerLength()) { + throw new CorruptStateException( + "should have consumed all but 16 bytes from the buffer but saw buffer pos [" + + bufferPos + + "] and count [" + + bufferCount + + "]" + ); + } + crc32.update(buffer, 0, bufferPos + 8); + final int magicFound = Numbers.bytesToInt(buffer, bufferPos); + if (magicFound != CodecUtil.FOOTER_MAGIC) { + throw new CorruptStateException("unexpected footer magic [" + magicFound + "]"); + } + final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4); + if (algorithmFound != 0) { + throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]"); + } + final long checksum = crc32.getValue(); + final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8); + if (checksum != checksumInFooter) { + throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]"); + } + } + + /** + * @return true if the next bytes in this stream are compressed + */ + boolean nextBytesCompressed() { + // we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling + // this method + assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]"; + return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos)); + } + + /** + * @return the number of bytes available in the buffer, possibly refilling the buffer if needed + */ + private int getAvailable() throws IOException { + final int footerLen = CodecUtil.footerLength(); + if (bufferCount == 0) { + // first read, fill the buffer + bufferCount = Streams.readFully(in, buffer, 0, buffer.length); + } else if (bufferPos == bufferCount - footerLen) { + // crc and discard all but the last 16 bytes in the buffer that might be the footer bytes + assert bufferCount >= footerLen; + crc32.update(buffer, 0, bufferPos); + System.arraycopy(buffer, bufferPos, buffer, 0, footerLen); + 
bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen); + bufferPos = 0; + } + // bytes in the buffer minus 16 bytes that could be the footer + return bufferCount - bufferPos - footerLen; } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index 48dceabe0d415..e0d6203e20628 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.elasticsearch.test.ESTestCase; @@ -30,7 +31,6 @@ import java.io.InputStream; import java.util.Map; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; public class BlobStoreFormatTests extends ESTestCase { @@ -57,20 +57,9 @@ public static BlobObj fromXContent(XContentParser parser) throws IOException { } if (token == XContentParser.Token.START_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token != XContentParser.Token.FIELD_NAME) { - throw new ElasticsearchParseException("unexpected token [{}]", token); - } - String currentFieldName = parser.currentName(); - token = parser.nextToken(); - if (token.isValue()) { - if ("text".equals(currentFieldName)) { - text = parser.text(); - } else { - throw new ElasticsearchParseException("unexpected field [{}]", currentFieldName); - } - } else { - throw new ElasticsearchParseException("unexpected token [{}]", token); - } + XContentParserUtils.ensureFieldName(parser, token, "text"); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser); + text = parser.text(); } } if (text == null) { @@ -92,24 +81,15 @@ public void testBlobStoreOperations() throws IOException { ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE); - checksumSMILE.write( - new BlobObj("checksum smile compressed"), - blobContainer, - "check-smile-comp", - true, - MockBigArrays.NON_RECYCLING_INSTANCE - ); + final String randomText = randomAlphaOfLengthBetween(0, 1024 * 8 * 3); + final String normalText = "checksum smile: " + randomText; + checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE); + final String compressedText = "checksum smile compressed: " + randomText; + checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE); // Assert that all checksum blobs can be read - assertEquals( - checksumSMILE.read(blobContainer, "check-smile", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(), - "checksum smile" - ); - assertEquals( - checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(), - "checksum smile compressed" - ); + assertEquals(normalText, checksumSMILE.read(blobContainer, 
"check-smile", xContentRegistry()).getText()); + assertEquals(compressedText, checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry()).getText()); } public void testCompressionIsApplied() throws IOException { @@ -135,18 +115,13 @@ public void testBlobCorruption() throws IOException { BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean(), MockBigArrays.NON_RECYCLING_INSTANCE); - assertEquals( - checksumFormat.read(blobContainer, "test-path", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(), - testString - ); + assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString); randomCorruption(blobContainer, "test-path"); try { - checksumFormat.read(blobContainer, "test-path", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE); + checksumFormat.read(blobContainer, "test-path", xContentRegistry()); fail("Should have failed due to corruption"); - } catch (ElasticsearchCorruptionException ex) { - assertThat(ex.getMessage(), containsString("test-path")); - } catch (EOFException ex) { - // This can happen if corrupt the byte length + } catch (ElasticsearchCorruptionException | EOFException ex) { + // expected exceptions from random byte corruption } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java index 7a1f3a075ba0c..a0e489228a053 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java @@ -40,7 +40,7 @@ protected SnapshotInfo copyInstance(SnapshotInfo instance, Version version) thro BigArrays.NON_RECYCLING_INSTANCE, bytes -> ActionListener.completeWith( future, - () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize("test", NamedXContentRegistry.EMPTY, bytes) + () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(NamedXContentRegistry.EMPTY, bytes.streamInput()) ) ); return future.actionGet();