Merged
19 commits
6930e8b
Respect generational files in recoveryDiff
DaveCTurner Apr 15, 2020
c159fc8
Imports
DaveCTurner Apr 15, 2020
14f0a3f
Merge branch 'master' into 2020-04-15-dont-copy-liv-file
DaveCTurner May 13, 2020
b68c5d7
Merge branch 'master' into 2020-04-15-dont-copy-liv-file
DaveCTurner May 13, 2020
85e85ad
WIP add support for writer-assigned UUIDs introduced in Lucene 8.6
DaveCTurner May 13, 2020
7f453b3
Merge remote-tracking branch 'origin/master' into 2020-04-15-dont-cop…
fcofdez Sep 14, 2021
0edde82
Remove dated comment
fcofdez Sep 14, 2021
968a8d5
Fix SnapshotsRecoveryPlannerServiceTests
fcofdez Sep 14, 2021
a46ed7a
Merge remote-tracking branch 'origin/master' into 2020-04-15-dont-cop…
fcofdez Sep 23, 2021
da417ce
Compare contents directly when possible
fcofdez Sep 23, 2021
9d6d0ac
Merge remote-tracking branch 'origin/master' into 2020-04-15-dont-cop…
fcofdez Sep 28, 2021
7f2b8bc
Take into account version for FileInfo serialization
fcofdez Sep 28, 2021
57af839
Merge remote-tracking branch 'origin/master' into 2020-04-15-dont-cop…
fcofdez Sep 28, 2021
5cb099e
Merge branch 'master' into 2020-04-15-dont-copy-liv-file
elasticmachine Sep 29, 2021
90b8940
Merge remote-tracking branch 'origin/master' into 2020-04-15-dont-cop…
fcofdez Oct 4, 2021
3148a53
Review comments
fcofdez Oct 4, 2021
c6f7a45
Compute hashEqualContents eagerly
fcofdez Oct 4, 2021
8529206
Revert "Compute hashEqualContents eagerly"
fcofdez Oct 4, 2021
2f86d08
Merge branch 'master' into 2020-04-15-dont-copy-liv-file
elasticmachine Oct 5, 2021
@@ -27,6 +27,8 @@
import java.util.Objects;
import java.util.stream.IntStream;

import static org.elasticsearch.index.store.StoreFileMetadata.UNAVAILABLE_WRITER_UUID;

/**
* Shard snapshot metadata
*/
@@ -36,6 +38,7 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
* Information about snapshotted file
*/
public static class FileInfo implements Writeable {
public static final String SERIALIZE_WRITER_UUID = "serialize_writer_uuid";

private final String name;
private final ByteSizeValue partSize;
@@ -238,14 +241,15 @@ public boolean isSame(FileInfo fileInfo) {
static final String PART_SIZE = "part_size";
static final String WRITTEN_BY = "written_by";
static final String META_HASH = "meta_hash";
static final String WRITER_UUID = "writer_uuid";

/**
* Serializes file info into JSON
*
* @param file file info
* @param builder XContent builder
*/
public static void toXContent(FileInfo file, XContentBuilder builder) throws IOException {
public static void toXContent(FileInfo file, XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NAME, file.name);
builder.field(PHYSICAL_NAME, file.metadata.name());
@@ -259,10 +263,19 @@ public static void toXContent(FileInfo file, XContentBuilder builder) throws IOE
builder.field(WRITTEN_BY, file.metadata.writtenBy());
}

if (file.metadata.hash() != null && file.metadata().hash().length > 0) {
BytesRef br = file.metadata.hash();
builder.field(META_HASH, br.bytes, br.offset, br.length);
final BytesRef hash = file.metadata.hash();
if (hash != null && hash.length > 0) {
builder.field(META_HASH, hash.bytes, hash.offset, hash.length);
}

final BytesRef writerUuid = file.metadata.writerUuid();
// We serialize the writer UUID by default when SERIALIZE_WRITER_UUID is not present, since in
// deletes/clones we read the serialized files back from the blob store and the version
// invariants were already enforced when the snapshot was taken
if (writerUuid.length > 0 && params.paramAsBoolean(SERIALIZE_WRITER_UUID, true)) {
builder.field(WRITER_UUID, writerUuid.bytes, writerUuid.offset, writerUuid.length);
}

builder.endObject();
}

@@ -281,6 +294,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
ByteSizeValue partSize = null;
String writtenBy = null;
BytesRef metaHash = new BytesRef();
BytesRef writerUuid = UNAVAILABLE_WRITER_UUID;
XContentParserUtils.ensureExpectedToken(token, XContentParser.Token.START_OBJECT, parser);
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@@ -303,6 +317,9 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
metaHash.bytes = parser.binaryValue();
metaHash.offset = 0;
metaHash.length = metaHash.bytes.length;
} else if (WRITER_UUID.equals(currentFieldName)) {
writerUuid = new BytesRef(parser.binaryValue());
assert writerUuid.length > 0;
} else {
XContentParserUtils.throwUnknownField(currentFieldName, parser.getTokenLocation());
}
@@ -326,7 +343,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
} else if (checksum == null) {
throw new ElasticsearchParseException("missing checksum for name [" + name + "]");
}
return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash), partSize);
return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize);
}

@Override
@@ -503,7 +520,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(INCREMENTAL_SIZE, incrementalSize);
builder.startArray(FILES);
for (FileInfo fileInfo : indexFiles) {
FileInfo.toXContent(fileInfo, builder);
FileInfo.toXContent(fileInfo, builder, params);
}
builder.endArray();
return builder;
@@ -208,7 +208,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
// First we list all blobs with their file infos:
builder.startArray(Fields.FILES);
for (Map.Entry<String, FileInfo> entry : files.entrySet()) {
FileInfo.toXContent(entry.getValue(), builder);
FileInfo.toXContent(entry.getValue(), builder, params);
}
builder.endArray();
// Then we list all snapshots with list of all blobs that are used by the snapshot
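Both toXContent call sites above now thread the caller's Params through to FileInfo.toXContent, so a caller can opt out of writing the writer_uuid field. Below is a minimal caller-side sketch, not code from this PR: the fileInfo instance and the pre-8.0 org.elasticsearch.common.xcontent package names are assumptions.

import java.io.IOException;
import java.util.Map;

import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

class WriterUuidSerializationSketch {
    // Hypothetical helper: serialize a FileInfo while omitting the writer_uuid field.
    static XContentBuilder withoutWriterUuid(FileInfo fileInfo) throws IOException {
        ToXContent.Params params =
            new ToXContent.MapParams(Map.of(FileInfo.SERIALIZE_WRITER_UUID, "false"));
        XContentBuilder builder = XContentFactory.jsonBuilder();
        FileInfo.toXContent(fileInfo, builder, params); // writer_uuid is skipped
        return builder;
    }

    // With ToXContent.EMPTY_PARAMS the param defaults to true, so writer_uuid is written
    // whenever the file's StoreFileMetadata carries a non-empty UUID.
    static XContentBuilder withWriterUuid(FileInfo fileInfo) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        FileInfo.toXContent(fileInfo, builder, ToXContent.EMPTY_PARAMS);
        return builder;
    }
}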
160 changes: 97 additions & 63 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -52,10 +52,10 @@
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
@@ -90,6 +90,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

@@ -834,16 +835,22 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}

final BytesRef segmentInfoId = StoreFileMetadata.toWriterUuid(info.info.getId());
final BytesRef segmentCommitInfoId = StoreFileMetadata.toWriterUuid(info.getId());

for (String file : info.files()) {
checksumFromLuceneFile(directory, file, builder, logger, version.toString(),
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)));
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)),
IndexFileNames.parseGeneration(file) == 0 ? segmentInfoId : segmentCommitInfoId);
}
}
if (maxVersion == null) {
maxVersion = org.elasticsearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion.toString(), true);
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion.toString(), true,
StoreFileMetadata.toWriterUuid(segmentCommitInfos.getId()));
} catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
@@ -869,7 +876,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}

private static void checksumFromLuceneFile(Directory directory, String file, Map<String, StoreFileMetadata> builder,
Logger logger, String version, boolean readFileAsHash) throws IOException {
Logger logger, String version, boolean readFileAsHash, BytesRef writerUuid) throws IOException {
final String checksum;
final BytesRefBuilder fileHash = new BytesRefBuilder();
try (IndexInput in = directory.openInput(file, READONCE_CHECKSUM)) {
@@ -894,7 +901,7 @@ private static void checksumFromLuceneFile(Directory directory, String file, Map
logger.debug(() -> new ParameterizedMessage("Can retrieve checksum from file [{}]", file), ex);
throw ex;
}
builder.put(file, new StoreFileMetadata(file, length, checksum, version, fileHash.get()));
builder.put(file, new StoreFileMetadata(file, length, checksum, version, fileHash.get(), writerUuid));
}
}
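In loadMetadata every file is now tagged with a writer UUID: nongenerational files get the SegmentInfo id, generational files get the SegmentCommitInfo id, and the segments_N file gets the SegmentInfos id. The StoreFileMetadata.toWriterUuid helper is not shown in this hunk; a plausible sketch, assuming Lucene reports a null id for indices written before writer-assigned UUIDs existed (Lucene 8.6), is:

// Sketch only; the real helper lives in StoreFileMetadata and may differ in detail.
public static BytesRef toWriterUuid(byte[] id) {
    if (id == null) {
        return UNAVAILABLE_WRITER_UUID; // empty BytesRef marking "no UUID available"
    }
    assert id.length > 0;
    return new BytesRef(id);
}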

@@ -923,8 +930,6 @@ public Map<String, StoreFileMetadata> asMap() {
return metadata;
}

private static final String DEL_FILE_EXTENSION = "del"; // legacy delete file
private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
private static final String SEGMENT_INFO_EXTENSION = "si";

/**
@@ -935,77 +940,107 @@ public Map<String, StoreFileMetadata> asMap() {
* <li>different: they exist in both snapshots but they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
* This method groups file into per-segment files and per-commit files. A file is treated as
* identical if and on if all files in it's group are identical. On a per-segment level files for a segment are treated
* as identical iff:
* <ul>
* <li>all files in this segment have the same checksum</li>
* <li>all files in this segment have the same length</li>
* <li>the segments {@code .si} files hashes are byte-identical Note: This is a using a perfect hash function,
* The metadata transfers the {@code .si} file content as it's hash</li>
* </ul>
* <p>
* The {@code .si} file contains a lot of diagnostics including a timestamp etc. in the future there might be
* unique segment identifiers in there hardening this method further.
* Individual files are compared by name, length, checksum and (if present) a UUID that was assigned when the file was originally
* written. The segment info ({@code *.si}) files and the segments file ({@code segments_N}) are also checked to be a byte-for-byte
* match.
* <p>
* The per-commit files handles very similar. A commit is composed of the {@code segments_N} files as well as generational files
* like deletes ({@code _x_y.del}) or field-info ({@code _x_y.fnm}) files. On a per-commit level files for a commit are treated
* as identical iff:
* <ul>
* <li>all files belonging to this commit have the same checksum</li>
* <li>all files belonging to this commit have the same length</li>
* <li>the segments file {@code segments_N} files hashes are byte-identical Note: This is a using a perfect hash function,
* The metadata transfers the {@code segments_N} file content as it's hash</li>
* </ul>
* Files are collected together into a group for each segment plus one group of "per-commit" ({@code segments_N}) files. Each
* per-segment group is subdivided into a nongenerational group (most of them) and a generational group (e.g. {@code *.liv},
* {@code *.fnm}, {@code *.dvm}, {@code *.dvd} that have been updated by subsequent commits).
* <p>
* NOTE: this diff will not contain the {@code segments.gen} file. This file is omitted on recovery.
* For each segment, if any nongenerational files are different then the whole segment is considered to be different and will be
* recovered in full. If all the nongenerational files are the same but any generational files are different then all the
* generational files are considered to be different and will be recovered in full, but the nongenerational files are left alone.
* Finally, if any file is different then all the per-commit files are recovered too.
*/
public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
public RecoveryDiff recoveryDiff(final MetadataSnapshot targetSnapshot) {
final List<StoreFileMetadata> perCommitSourceFiles = new ArrayList<>();
Review comment (Member): I'd move the computation of perCommitSourceFiles and perSegmentSourceFiles just before the loop where it is used.

final Map<String, Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>>> perSegmentSourceFiles = new HashMap<>();
// per segment, a tuple of <<non-generational files, generational files>>

for (StoreFileMetadata sourceFile : this) {
if (sourceFile.name().startsWith("_")) {
final String segmentId = IndexFileNames.parseSegmentName(sourceFile.name());
final boolean isGenerationalFile = IndexFileNames.parseGeneration(sourceFile.name()) > 0L;
final Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>> perSegmentTuple = perSegmentSourceFiles
.computeIfAbsent(segmentId, k -> Tuple.tuple(new ArrayList<>(), new ArrayList<>()));
(isGenerationalFile ? perSegmentTuple.v2() : perSegmentTuple.v1()).add(sourceFile);
} else {
assert sourceFile.name().startsWith(IndexFileNames.SEGMENTS + "_") : "unexpected " + sourceFile;
perCommitSourceFiles.add(sourceFile);
}
}

final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();

for (StoreFileMetadata meta : this) {
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
final String extension = IndexFileNames.getExtension(meta.name());
if (IndexFileNames.SEGMENTS.equals(segmentId) ||
DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files fnm files are generational but only for upgradable DV
perCommitStoreFiles.add(meta);
final List<StoreFileMetadata> tmpIdentical = new ArrayList<>(); // confirm whole group is identical before adding to 'identical'
final Predicate<List<StoreFileMetadata>> groupComparer = sourceGroup -> {
assert tmpIdentical.isEmpty() : "not cleaned up: " + tmpIdentical;
boolean groupIdentical = true;
for (StoreFileMetadata sourceFile : sourceGroup) {
final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name());
if (targetFile == null) {
groupIdentical = false;
missing.add(sourceFile);
} else if (groupIdentical && targetFile.isSame(sourceFile)) {
tmpIdentical.add(sourceFile);
} else {
groupIdentical = false;
different.add(sourceFile);
}
}
if (groupIdentical) {
identical.addAll(tmpIdentical);
} else {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
different.addAll(tmpIdentical);
}
}
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
consistent = false;
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
different.add(meta);
tmpIdentical.clear();
return groupIdentical;
};
final Consumer<List<StoreFileMetadata>> allDifferent = sourceGroup -> {
for (StoreFileMetadata sourceFile : sourceGroup) {
final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name());
if (targetFile == null) {
missing.add(sourceFile);
} else {
identicalFiles.add(meta);
different.add(sourceFile);
}
}
if (consistent) {
identical.addAll(identicalFiles);
};

boolean segmentsIdentical = true;

for (Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>> segmentFiles : perSegmentSourceFiles.values()) {
final List<StoreFileMetadata> nonGenerationalFiles = segmentFiles.v1();
final List<StoreFileMetadata> generationalFiles = segmentFiles.v2();

if (groupComparer.test(nonGenerationalFiles)) {
// non-generational files are identical, now check the generational files
segmentsIdentical = groupComparer.test(generationalFiles) && segmentsIdentical;
} else {
// make sure all files are added - this can happen if only the deletes are different
different.addAll(identicalFiles);
// non-generational files were different, so consider the whole segment as different
segmentsIdentical = false;
allDifferent.accept(generationalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical),
Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
assert recoveryDiff.size() == this.metadata.size()
: "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" +
this.metadata.size() + "]";

if (segmentsIdentical) {
// segments were the same, check the per-commit files
groupComparer.test(perCommitSourceFiles);
} else {
// at least one segment was different, so treat all the per-commit files as different too
allDifferent.accept(perCommitSourceFiles);
}

final RecoveryDiff recoveryDiff = new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing));
assert recoveryDiff.size() == metadata.size() : "some files are missing: recoveryDiff is [" + recoveryDiff
+ "] comparing: [" + metadata + "] to [" + targetSnapshot.metadata + "]";
return recoveryDiff;
}
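To make the new grouping concrete, the following standalone illustration (not part of the PR; the file names are invented) shows how recoveryDiff buckets file names using Lucene's IndexFileNames utilities: segments_N forms the per-commit group, a generation greater than zero marks a generational file, and everything else is a nongenerational per-segment file.

import org.apache.lucene.index.IndexFileNames;

public class RecoveryDiffGroupingExample {
    public static void main(String[] args) {
        String[] files = { "_0.cfe", "_0.cfs", "_0.si", "_0_1.liv", "_0_2.fnm", "segments_5" };
        for (String file : files) {
            if (file.startsWith(IndexFileNames.SEGMENTS + "_")) {
                System.out.println(file + " -> per-commit group");
            } else {
                String segment = IndexFileNames.parseSegmentName(file);           // e.g. "_0"
                boolean generational = IndexFileNames.parseGeneration(file) > 0L;  // e.g. _0_1.liv
                System.out.println(file + " -> segment " + segment
                    + (generational ? ", generational group" : ", nongenerational group"));
            }
        }
        // If any nongenerational file of a segment differs, the whole segment is re-sent;
        // if only generational files differ, just those are re-sent; any difference at all
        // also forces the per-commit (segments_N) files to be re-sent.
    }
}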

@@ -1109,7 +1144,6 @@ public String toString() {
}
}


/**
* Returns true if the file is auto-generated by the store and shouldn't be deleted during cleanup.
* This includes write lock files