Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardSnapshotResult;
Expand Down Expand Up @@ -731,8 +730,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(
() -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(
repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()),
generation,
NamedXContentRegistry.EMPTY,
MockBigArrays.NON_RECYCLING_INSTANCE
NamedXContentRegistry.EMPTY
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,8 +977,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(
for (String indexMetaGeneration : indexMetaGenerations) {
executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
try {
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry, bigArrays)
.getNumberOfShards();
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry).getNumberOfShards();
} catch (Exception ex) {
logger.warn(
() -> new ParameterizedMessage(
Expand Down Expand Up @@ -1435,7 +1434,7 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
@Override
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
try {
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException | NotXContentException ex) {
Expand All @@ -1446,7 +1445,7 @@ public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
@Override
public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
try {
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
Expand All @@ -1460,8 +1459,7 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna
return INDEX_METADATA_FORMAT.read(
indexContainer(index),
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index),
namedXContentRegistry,
bigArrays
namedXContentRegistry
);
} catch (NoSuchFileException e) {
throw new SnapshotMissingException(metadata.name(), snapshotId, e);
Expand Down Expand Up @@ -3156,7 +3154,7 @@ private static List<String> unusedBlobs(
*/
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
try {
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
Expand Down Expand Up @@ -3188,7 +3186,7 @@ private Tuple<BlobStoreIndexShardSnapshots, String> buildBlobStoreIndexShardSnap
if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
}
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation);
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation);
}
final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
Expand All @@ -3207,8 +3205,7 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
final BlobStoreIndexShardSnapshots shardSnapshots = INDEX_SHARD_SNAPSHOTS_FORMAT.read(
shardContainer,
Long.toString(latest),
namedXContentRegistry,
bigArrays
namedXContentRegistry
);
return new Tuple<>(shardSnapshots, latest);
} else if (blobs.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,39 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.zip.CRC32;

/**
* Snapshot metadata file format used in v2.0 and above
Expand Down Expand Up @@ -93,45 +92,180 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct
* @param name name to be translated into
* @return parsed blob object
*/
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays)
throws IOException {
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException {
String blobName = blobName(name);
try (
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
InputStream in = blobContainer.readBlob(blobName)
) {
Streams.copy(in, out, false);
return deserialize(blobName, namedXContentRegistry, out.bytes());
try (InputStream in = blobContainer.readBlob(blobName)) {
return deserialize(namedXContentRegistry, in);
}
}

public String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}

public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException {
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input);
try {
final IndexInput indexInput = bytes.length() > 0
? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION);
final InputStream wrappedStream;
if (deserializeMetaBlobInputStream.nextBytesCompressed()) {
wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream);
} else {
wrappedStream = deserializeMetaBlobInputStream;
}
final T result;
try (
XContentParser parser = XContentHelper.createParser(
namedXContentRegistry,
LoggingDeprecationHandler.INSTANCE,
bytes.slice((int) filePointer, (int) contentSize),
XContentType.SMILE
)
XContentParser parser = XContentType.SMILE.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)
) {
return reader.apply(parser);
result = reader.apply(parser);
}
deserializeMetaBlobInputStream.verifyFooter();
return result;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we trick this into a dedicated exception with the original stacktrace
throw new CorruptStateException(ex);
} catch (Exception e) {
try {
// drain stream fully and check whether the footer is corrupted
Streams.consumeFully(deserializeMetaBlobInputStream);
deserializeMetaBlobInputStream.verifyFooter();
} catch (CorruptStateException cse) {
cse.addSuppressed(e);
throw cse;
} catch (Exception ex) {
e.addSuppressed(ex);
}
throw e;
}
}

/**
* Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages
* a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that
* a parser backed by this stream will only see the blob's body.
*/
private static final class DeserializeMetaBlobInputStream extends FilterInputStream {

// checksum updated with all but the last 8 bytes read from the wrapped stream
private final CRC32 crc32 = new CRC32();

// Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end
// of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream.
private final byte[] buffer = new byte[1024 * 8];

// the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF
private int bufferCount;

// the current read position within the buffer, in [0, bufferCount - 16]
private int bufferPos;

DeserializeMetaBlobInputStream(InputStream in) {
super(in);
}

@Override
public int read() throws IOException {
if (getAvailable() <= 0) {
return -1;
}
return buffer[bufferPos++];
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int remaining = len;
int read = 0;
while (remaining > 0) {
final int r = doRead(b, off + read, remaining);
if (r <= 0) {
break;
}
read += r;
remaining -= r;
}
if (len > 0 && remaining == len) {
// nothing to read, EOF
return -1;
}
return read;
}

@Override
public void close() throws IOException {
// not closing the wrapped stream
}

private int doRead(byte[] b, int off, int len) throws IOException {
final int available = getAvailable();
if (available < 0) {
return -1;
}
final int read = Math.min(available, len);
System.arraycopy(buffer, bufferPos, b, off, read);
bufferPos += read;
return read;
}

/**
* Verify footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would.
*
* @throws CorruptStateException if footer is found to be corrupted
*/
void verifyFooter() throws CorruptStateException {
if (bufferCount - bufferPos != CodecUtil.footerLength()) {
throw new CorruptStateException(
"should have consumed all but 16 bytes from the buffer but saw buffer pos ["
+ bufferPos
+ "] and count ["
+ bufferCount
+ "]"
);
}
crc32.update(buffer, 0, bufferPos + 8);
final int magicFound = Numbers.bytesToInt(buffer, bufferPos);
if (magicFound != CodecUtil.FOOTER_MAGIC) {
throw new CorruptStateException("unexpected footer magic [" + magicFound + "]");
}
final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4);
if (algorithmFound != 0) {
throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]");
}
final long checksum = crc32.getValue();
final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8);
if (checksum != checksumInFooter) {
throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]");
}
}

/**
* @return true if the next bytes in this stream are compressed
*/
boolean nextBytesCompressed() {
// we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling
// this method
assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]";
return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos));
}

/**
* @return the number of bytes available in the buffer, possibly refilling the buffer if needed
*/
private int getAvailable() throws IOException {
final int footerLen = CodecUtil.footerLength();
if (bufferCount == 0) {
// first read, fill the buffer
bufferCount = Streams.readFully(in, buffer, 0, buffer.length);
} else if (bufferPos == bufferCount - footerLen) {
// crc and discard all but the last 16 bytes in the buffer that might be the footer bytes
assert bufferCount >= footerLen;
crc32.update(buffer, 0, bufferPos);
System.arraycopy(buffer, bufferPos, buffer, 0, footerLen);
bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen);
bufferPos = 0;
}
// bytes in the buffer minus 16 bytes that could be the footer
return bufferCount - bufferPos - footerLen;
}
}

Expand Down
Loading