@@ -16,7 +16,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardSnapshotResult;
@@ -677,7 +676,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreReposit
String generation) {
return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,
() -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(repository.shardContainer(repositoryShardId.index(),
repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY, MockBigArrays.NON_RECYCLING_INSTANCE))));
repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY))));
}

private static BlobStoreIndexShardSnapshot readShardSnapshot(BlobStoreRepository repository, RepositoryShardId repositoryShardId,
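The test helper above reflects the API change that runs through this diff: ChecksumBlobStoreFormat#read no longer takes a BigArrays argument (hence the removed MockBigArrays import), because blobs are now streamed instead of being fully buffered before parsing. A minimal sketch of the new call shape, not part of the diff; container and generation stand in for values supplied by the surrounding test:

// hedged example; INDEX_SHARD_SNAPSHOTS_FORMAT is the public format constant
// already referenced by readShardGeneration above
BlobStoreIndexShardSnapshots snapshots = BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT
    .read(container, generation, NamedXContentRegistry.EMPTY);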
@@ -862,8 +862,8 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(Collection<SnapshotId> s
for (String indexMetaGeneration : indexMetaGenerations) {
executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
try {
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry,
bigArrays).getNumberOfShards();
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry)
.getNumberOfShards();
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage(
"[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
@@ -1220,7 +1220,7 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
@Override
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
try {
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException | NotXContentException ex) {
@@ -1231,7 +1231,7 @@ public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
@Override
public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
try {
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@@ -1243,7 +1243,7 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
try {
return INDEX_METADATA_FORMAT.read(indexContainer(index),
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry, bigArrays);
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry);
} catch (NoSuchFileException e) {
throw new SnapshotMissingException(metadata.name(), snapshotId, e);
}
@@ -2722,7 +2722,7 @@ private static List<String> unusedBlobs(Set<String> blobs, Set<String> surviving
*/
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
try {
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@@ -2748,7 +2748,7 @@ private Tuple<BlobStoreIndexShardSnapshots, String> buildBlobStoreIndexShardSnap
if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
}
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation);
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation);
}
final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
@@ -2765,7 +2765,7 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
long latest = latestGeneration(blobs);
if (latest >= 0) {
final BlobStoreIndexShardSnapshots shardSnapshots =
INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry, bigArrays);
INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry);
return new Tuple<>(shardSnapshots, latest);
} else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
|| b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {
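Every BlobStoreRepository call site above changes the same way: the bigArrays argument disappears from ChecksumBlobStoreFormat#read. The substance of the change is in ChecksumBlobStoreFormat below, which now verifies the blob's Lucene framing while streaming rather than after loading the whole blob. Each metadata blob is framed like a Lucene file: a codec header, the (possibly DEFLATE-compressed) SMILE body, and a 16-byte footer made up of a magic int, an algorithm id int that is always 0 (CRC-32), and a long CRC-32 checksum covering every byte of the blob before the checksum itself. The following self-contained sketch is not part of the diff (the codec name, labels, and class name are placeholders); it builds such a blob and verifies it the way the pre-change code did, by materializing it in memory and calling CodecUtil.checksumEntireFile:

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class FramingSketch {
    public static void main(String[] args) throws Exception {
        byte[] body = "example body".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (OutputStreamIndexOutput out =
                 new OutputStreamIndexOutput("sketch", "blob", baos, 8 * 1024)) {
            CodecUtil.writeHeader(out, "test-codec", 1);  // codec name and version are made up
            out.writeBytes(body, body.length);            // body would be SMILE in the real format
            CodecUtil.writeFooter(out);                   // appends the 16 footer bytes described above
        }
        byte[] blob = baos.toByteArray();
        // pre-change approach: hold the full blob in memory, then checksum it end to end;
        // DeserializeMetaBlobInputStream.verifyFooter() reproduces this check incrementally
        CodecUtil.checksumEntireFile(new ByteBuffersIndexInput(
            new ByteBuffersDataInput(Collections.singletonList(ByteBuffer.wrap(blob))),
            "sketch"));  // throws CorruptIndexException on a checksum or footer mismatch
    }
}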
@@ -11,40 +11,39 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.zip.CRC32;

/**
* Snapshot metadata file format used in v2.0 and above
@@ -93,37 +92,174 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct
* @param name name to be translated into a blob name
* @return parsed blob object
*/
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry,
BigArrays bigArrays) throws IOException {
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException {
String blobName = blobName(name);
try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
InputStream in = blobContainer.readBlob(blobName)) {
Streams.copy(in, out, false);
return deserialize(blobName, namedXContentRegistry, out.bytes());
try (InputStream in = blobContainer.readBlob(blobName)) {
return deserialize(namedXContentRegistry, in);
}
}

public String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}

public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException {
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input);
try {
final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput(
new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytes.slice((int) filePointer, (int) contentSize), XContentType.SMILE)) {
return reader.apply(parser);
CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION);
final InputStream wrappedStream;
if (deserializeMetaBlobInputStream.nextBytesCompressed()) {
wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream);
} else {
wrappedStream = deserializeMetaBlobInputStream;
}
final T result;
try (XContentParser parser = XContentType.SMILE.xContent().createParser(
namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)) {
result = reader.apply(parser);
}
deserializeMetaBlobInputStream.verifyFooter();
return result;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we trick this into a dedicated exception with the original stacktrace
throw new CorruptStateException(ex);
} catch (Exception e) {
try {
// drain stream fully and check whether the footer is corrupted
Streams.consumeFully(deserializeMetaBlobInputStream);
deserializeMetaBlobInputStream.verifyFooter();
} catch (CorruptStateException cse) {
cse.addSuppressed(e);
throw cse;
} catch (Exception ex) {
e.addSuppressed(ex);
}
throw e;
}
}

/**
* Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages
* a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that
* a parser backed by this stream will only see the blob's body.
*/
private static final class DeserializeMetaBlobInputStream extends FilterInputStream {

// checksum updated with all but the last 8 bytes read from the wrapped stream
private final CRC32 crc32 = new CRC32();

// Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end
// of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream.
private final byte[] buffer = new byte[1024 * 8];

// the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF
private int bufferCount;

// the current read position within the buffer, in [0, bufferCount - 16]
private int bufferPos;

DeserializeMetaBlobInputStream(InputStream in) {
super(in);
}

@Override
public int read() throws IOException {
if (getAvailable() <= 0) {
return -1;
}
return buffer[bufferPos++] & 0xFF; // mask to an unsigned value per the InputStream#read contract
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int remaining = len;
int read = 0;
while (remaining > 0) {
final int r = doRead(b, off + read, remaining);
if (r <= 0) {
break;
}
read += r;
remaining -= r;
}
if (len > 0 && remaining == len) {
// nothing to read, EOF
return -1;
}
return read;
}

@Override
public void close() throws IOException {
// not closing the wrapped stream
}

private int doRead(byte[] b, int off, int len) throws IOException {
final int available = getAvailable();
if (available < 0) {
return -1;
}
final int read = Math.min(available, len);
System.arraycopy(buffer, bufferPos, b, off, read);
bufferPos += read;
return read;
}

/**
* Verify the footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would.
*
* @throws CorruptStateException if footer is found to be corrupted
*/
void verifyFooter() throws CorruptStateException {
if (bufferCount - bufferPos != CodecUtil.footerLength()) {
throw new CorruptStateException(
"should have consumed all but 16 bytes from the buffer but saw buffer pos [" + bufferPos + "] and count ["
+ bufferCount + "]");
}
crc32.update(buffer, 0, bufferPos + 8);
final int magicFound = Numbers.bytesToInt(buffer, bufferPos);
if (magicFound != CodecUtil.FOOTER_MAGIC) {
throw new CorruptStateException("unexpected footer magic [" + magicFound + "]");
}
final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4);
if (algorithmFound != 0) {
throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]");
}
final long checksum = crc32.getValue();
final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8);
if (checksum != checksumInFooter) {
throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]");
}
}

/**
* @return true if the next bytes in this stream are compressed
*/
boolean nextBytesCompressed() {
// we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling
// this method
assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]";
return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos));
Review comment (Member): Maybe assert that bufferPos > 0?
}

/**
* @return the number of bytes available in the buffer, possibly refilling the buffer if needed
*/
private int getAvailable() throws IOException {
final int footerLen = CodecUtil.footerLength();
if (bufferCount == 0) {
// first read, fill the buffer
bufferCount = Streams.readFully(in, buffer, 0, buffer.length);
} else if (bufferPos == bufferCount - footerLen) {
// crc and discard all but the last 16 bytes in the buffer that might be the footer bytes
assert bufferCount >= footerLen;
crc32.update(buffer, 0, bufferPos);
System.arraycopy(buffer, bufferPos, buffer, 0, footerLen);
bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen);
bufferPos = 0;
}
// bytes in the buffer minus 16 bytes that could be the footer
return bufferCount - bufferPos - footerLen;
}
}
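One more detail worth isolating: nextBytesCompressed() is what lets deserialize(...) decide whether to route the body through an inflater before handing it to the SMILE parser. A hedged sketch of that sniff-and-wrap pattern on its own (maybeDecompress, bodyBytes, and the class name are made-up names; it assumes the Lucene header has already been consumed, as in deserialize above):

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressorFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class CompressionSniffSketch {
    // returns a stream over the uncompressed body, wrapping with the shared
    // thread-local DEFLATE inflater only when the bytes look compressed
    static InputStream maybeDecompress(byte[] bodyBytes) throws IOException {
        if (CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(bodyBytes))) {
            return CompressorFactory.COMPRESSOR.threadLocalInputStream(new ByteArrayInputStream(bodyBytes));
        }
        return new ByteArrayInputStream(bodyBytes);
    }
}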
