@@ -36,6 +36,8 @@
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.index.store.StoreFileMetadata.UNAVAILABLE_WRITER_UUID;

/**
* Shard snapshot metadata
*/
@@ -231,6 +233,7 @@ public boolean isSame(FileInfo fileInfo) {
static final String PART_SIZE = "part_size";
static final String WRITTEN_BY = "written_by";
static final String META_HASH = "meta_hash";
static final String WRITER_UUID = "writer_uuid";

/**
* Serializes file info into JSON
@@ -252,10 +255,16 @@ public static void toXContent(FileInfo file, XContentBuilder builder) throws IOE
builder.field(WRITTEN_BY, file.metadata.writtenBy());
}

if (file.metadata.hash() != null && file.metadata().hash().length > 0) {
BytesRef br = file.metadata.hash();
builder.field(META_HASH, br.bytes, br.offset, br.length);
final BytesRef hash = file.metadata.hash();
if (hash != null && hash.length > 0) {
builder.field(META_HASH, hash.bytes, hash.offset, hash.length);
}

final BytesRef writerUuid = file.metadata.writerUuid();
if (writerUuid.length > 0) {
builder.field(WRITER_UUID, writerUuid.bytes, writerUuid.offset, writerUuid.length);
}

builder.endObject();
}
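For orientation, here is a minimal sketch (illustrative only; the class and variable names are invented, and the imports assume this branch's package layout) of serializing a FileInfo with the static toXContent shown above. The writer_uuid entry is only emitted when the metadata carries a non-empty writer UUID, so snapshots written by older versions simply omit it:

    import java.io.IOException;

    import org.elasticsearch.common.Strings;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

    final class FileInfoJsonSketch {
        // Illustrative only: serialize an existing FileInfo to its JSON representation.
        static String toJson(FileInfo fileInfo) throws IOException {
            try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
                FileInfo.toXContent(fileInfo, builder);
                return Strings.toString(builder); // contains "writer_uuid" only when it is available
            }
        }
    }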

@@ -275,6 +284,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
Version writtenBy = null;
String writtenByStr = null;
BytesRef metaHash = new BytesRef();
BytesRef writerUuid = UNAVAILABLE_WRITER_UUID;
if (token == XContentParser.Token.START_OBJECT) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@@ -298,6 +308,9 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
metaHash.bytes = parser.binaryValue();
metaHash.offset = 0;
metaHash.length = metaHash.bytes.length;
} else if (WRITER_UUID.equals(currentFieldName)) {
writerUuid = new BytesRef(parser.binaryValue());
assert writerUuid.length > 0;
} else {
throw new ElasticsearchParseException("unknown parameter [{}]", currentFieldName);
}
@@ -322,7 +335,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
} else if (checksum == null) {
throw new ElasticsearchParseException("missing checksum for name [" + name + "]");
}
return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash), partSize);
return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize);
}

@Override
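To see why the UUID is worth persisting: per the updated recoveryDiff javadoc further down, each file records the Lucene id of the segment (or commit) that wrote it, so source and target copies of a file that carry the same non-empty UUID were produced by the same write. The following is only a sketch of that idea (invented helper, not the actual StoreFileMetadata.isSame implementation):

    import org.apache.lucene.util.BytesRef;
    import org.elasticsearch.index.store.StoreFileMetadata;

    final class WriterUuidSketch {
        // Sketch only: a matching non-empty writer UUID is a strong signal that two copies of a
        // file are identical; an empty UUID (UNAVAILABLE_WRITER_UUID) means the information is
        // missing and callers must fall back to the length/checksum/hash comparison.
        static boolean sameWriterUuid(StoreFileMetadata a, StoreFileMetadata b) {
            final BytesRef ua = a.writerUuid();
            final BytesRef ub = b.writerUuid();
            return ua.length > 0 && ub.length > 0 && ua.bytesEquals(ub);
        }
    }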
169 changes: 102 additions & 67 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -53,6 +53,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -67,7 +68,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
@@ -101,6 +101,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

@@ -137,7 +138,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref

/**
* Specific {@link IOContext} used to verify Lucene files footer checksums.
* See {@link MetadataSnapshot#checksumFromLuceneFile(Directory, String, Map, Logger, Version, boolean)}
* See {@link MetadataSnapshot#checksumFromLuceneFile(Directory, String, Map, Logger, Version, boolean, BytesRef)}
*/
public static final IOContext READONCE_CHECKSUM = new IOContext(IOContext.READONCE.context);

@@ -827,16 +828,22 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}

final BytesRef segmentInfoId = StoreFileMetadata.toWriterUuid(info.info.getId());
final BytesRef segmentCommitInfoId = StoreFileMetadata.toWriterUuid(info.getId());

for (String file : info.files()) {
checksumFromLuceneFile(directory, file, builder, logger, version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)));
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)),
IndexFileNames.parseGeneration(file) == 0 ? segmentInfoId : segmentCommitInfoId);
}
}
if (maxVersion == null) {
maxVersion = org.elasticsearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true,
StoreFileMetadata.toWriterUuid(segmentCommitInfos.getId()));
} catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
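As a concrete illustration of the generation check above (the file names are made-up examples): Lucene's parseGeneration returns 0 for files written together with the segment itself and a positive generation for files rewritten by a later commit, which is what decides whether a file is tagged with the SegmentInfo id or the SegmentCommitInfo id; the segments_N file itself is tagged with the SegmentInfos id in the segmentsFile handling above:

    import org.apache.lucene.index.IndexFileNames;

    final class GenerationSketch {
        public static void main(String[] args) {
            // Illustrative only (made-up file names):
            assert IndexFileNames.parseGeneration("_0.cfs") == 0;    // nongenerational -> segmentInfoId
            assert IndexFileNames.parseGeneration("_0_1.liv") == 1;  // generational    -> segmentCommitInfoId
        }
    }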
@@ -862,7 +869,7 @@ }
}

private static void checksumFromLuceneFile(Directory directory, String file, Map<String, StoreFileMetadata> builder,
Logger logger, Version version, boolean readFileAsHash) throws IOException {
Logger logger, Version version, boolean readFileAsHash, BytesRef writerUuid) throws IOException {
final String checksum;
final BytesRefBuilder fileHash = new BytesRefBuilder();
try (IndexInput in = directory.openInput(file, READONCE_CHECKSUM)) {
Expand All @@ -887,7 +894,7 @@ private static void checksumFromLuceneFile(Directory directory, String file, Map
logger.debug(() -> new ParameterizedMessage("Can retrieve checksum from file [{}]", file), ex);
throw ex;
}
builder.put(file, new StoreFileMetadata(file, length, checksum, version, fileHash.get()));
builder.put(file, new StoreFileMetadata(file, length, checksum, version, fileHash.get(), writerUuid));
}
}

Expand Down Expand Up @@ -916,8 +923,6 @@ public Map<String, StoreFileMetadata> asMap() {
return metadata;
}

private static final String DEL_FILE_EXTENSION = "del"; // legacy delete file
private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
private static final String SEGMENT_INFO_EXTENSION = "si";

/**
@@ -928,80 +933,111 @@ public Map<String, StoreFileMetadata> asMap() {
* <li>different: they exist in both snapshots but their they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
* This method groups file into per-segment files and per-commit files. A file is treated as
* identical if and on if all files in it's group are identical. On a per-segment level files for a segment are treated
* as identical iff:
* <ul>
* <li>all files in this segment have the same checksum</li>
* <li>all files in this segment have the same length</li>
* <li>the segments {@code .si} files hashes are byte-identical Note: This is a using a perfect hash function,
* The metadata transfers the {@code .si} file content as it's hash</li>
* </ul>
* <p>
* The {@code .si} file contains a lot of diagnostics including a timestamp etc. in the future there might be
* unique segment identifiers in there hardening this method further.
* Individual files are compared by name, length, checksum and (if present) a UUID that was assigned when the file was originally
* written. The segment info ({@code *.si}) files and the segments file ({@code segments_N}) are also checked to be a byte-for-byte
* match.
* <p>
* The per-commit files handles very similar. A commit is composed of the {@code segments_N} files as well as generational files
* like deletes ({@code _x_y.del}) or field-info ({@code _x_y.fnm}) files. On a per-commit level files for a commit are treated
* as identical iff:
* <ul>
* <li>all files belonging to this commit have the same checksum</li>
* <li>all files belonging to this commit have the same length</li>
* <li>the segments file {@code segments_N} files hashes are byte-identical Note: This is a using a perfect hash function,
* The metadata transfers the {@code segments_N} file content as it's hash</li>
* </ul>
* Files are collected together into a group for each segment plus one group of "per-commit" ({@code segments_N}) files. Each
* per-segment group is subdivided into a nongenerational group (most of them) and a generational group (e.g. {@code *.liv},
* {@code *.fnm}, {@code *.dvm}, {@code *.dvd} that have been updated by subsequent commits).
* <p>
* NOTE: this diff will not contain the {@code segments.gen} file. This file is omitted on recovery.
* For each segment, if any nongenerational files are different then the whole segment is considered to be different and will be
* recovered in full. If all the nongenerational files are the same but any generational files are different then all the
* generational files are considered to be different and will be recovered in full, but the nongenerational files are left alone.
* Finally, if any file is different then all the per-commit files are recovered too.
*/
public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
/* Future work: the {@code *.si} file includes {@link SegmentInfo#getId()} which is a globally unique identifier for the
* nongenerational files in the segment so we could compare that instead of using the files lengths and checksums. We may also get a
* similar ID for the generational files in https://issues.apache.org/jira/browse/LUCENE-9324.
* TODO follow up once this Lucene discussion closes.
*/
public RecoveryDiff recoveryDiff(final MetadataSnapshot targetSnapshot) {
final List<StoreFileMetadata> perCommitSourceFiles = new ArrayList<>();
final Map<String, Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>>> perSegmentSourceFiles = new HashMap<>();
// per segment, a tuple of <<non-generational files, generational files>>

for (StoreFileMetadata sourceFile : this) {
if (sourceFile.name().startsWith("_")) {
final String segmentId = IndexFileNames.parseSegmentName(sourceFile.name());
final long generation = IndexFileNames.parseGeneration(sourceFile.name());
final Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>> perSegmentTuple = perSegmentSourceFiles
.computeIfAbsent(segmentId, k -> Tuple.tuple(new ArrayList<>(), new ArrayList<>()));
(generation == 0 ? perSegmentTuple.v1() : perSegmentTuple.v2()).add(sourceFile);
} else {
assert sourceFile.name().startsWith(IndexFileNames.SEGMENTS + "_") : "unexpected " + sourceFile;
perCommitSourceFiles.add(sourceFile);
}
}

final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();

for (StoreFileMetadata meta : this) {
if (IndexFileNames.OLD_SEGMENTS_GEN.equals(meta.name())) { // legacy
Contributor Author: I think this is no longer relevant but looking for confirmation from someone who knows the history here.
continue; // we don't need that file at all
final List<StoreFileMetadata> tmpIdentical = new ArrayList<>(); // confirm whole group is identical before adding to 'identical'
final Predicate<List<StoreFileMetadata>> groupComparer = sourceGroup -> {
assert tmpIdentical.isEmpty() : "not cleaned up: " + tmpIdentical;
boolean groupIdentical = true;
for (StoreFileMetadata sourceFile : sourceGroup) {
final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name());
if (targetFile == null) {
groupIdentical = false;
missing.add(sourceFile);
} else if (groupIdentical && targetFile.isSame(sourceFile)) {
tmpIdentical.add(sourceFile);
} else {
groupIdentical = false;
different.add(sourceFile);
}
}
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
final String extension = IndexFileNames.getExtension(meta.name());
if (IndexFileNames.SEGMENTS.equals(segmentId) ||
DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files fnm files are generational but only for upgradable DV
perCommitStoreFiles.add(meta);
if (groupIdentical) {
identical.addAll(tmpIdentical);
} else {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
different.addAll(tmpIdentical);
}
}
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
consistent = false;
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
different.add(meta);
tmpIdentical.clear();
return groupIdentical;
};
final Consumer<List<StoreFileMetadata>> allDifferent = sourceGroup -> {
for (StoreFileMetadata sourceFile : sourceGroup) {
final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name());
if (targetFile == null) {
missing.add(sourceFile);
} else {
identicalFiles.add(meta);
different.add(sourceFile);
}
}
if (consistent) {
identical.addAll(identicalFiles);
};

boolean segmentsIdentical = true;

for (Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>> segmentFiles : perSegmentSourceFiles.values()) {
final List<StoreFileMetadata> nonGenerationalFiles = segmentFiles.v1();
final List<StoreFileMetadata> generationalFiles = segmentFiles.v2();

if (groupComparer.test(nonGenerationalFiles)) {
// non-generational files are identical, now check the generational files
segmentsIdentical = groupComparer.test(generationalFiles) && segmentsIdentical;
} else {
// make sure all files are added - this can happen if only the deletes are different
different.addAll(identicalFiles);
// non-generational files were different, so consider the whole segment as different
segmentsIdentical = false;
allDifferent.accept(generationalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical),
Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0)
: "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" +
this.metadata.size() + "] contains segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]";

if (segmentsIdentical) {
// segments were the same, check the per-commit files
groupComparer.test(perCommitSourceFiles);
} else {
// at least one segment was different, so treat all the per-commit files as different too
allDifferent.accept(perCommitSourceFiles);
}

final RecoveryDiff recoveryDiff = new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing));
assert recoveryDiff.size() == metadata.size() : "some files are missing: recoveryDiff is [" + recoveryDiff
+ "] comparing: [" + metadata + "] to [" + targetSnapshot.metadata + "]";
return recoveryDiff;
}
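For context, here is a sketch (invented helper and variable names; it assumes RecoveryDiff's usual public identical/different/missing lists) of how a recovery caller would consume the diff: identical files are reused on the target, everything else is sent again:

    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.index.store.Store;
    import org.elasticsearch.index.store.StoreFileMetadata;

    final class RecoveryDiffUsageSketch {
        // Hypothetical helper: the files that still have to be transferred to the recovery target.
        static List<StoreFileMetadata> filesToSend(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
            final Store.RecoveryDiff diff = source.recoveryDiff(target);
            final List<StoreFileMetadata> toSend = new ArrayList<>(diff.different);
            toSend.addAll(diff.missing);   // changed files plus files absent from the target
            return toSend;                 // files in diff.identical are reused as-is
        }
    }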

@@ -1105,7 +1141,6 @@ public String toString() {
}
}


/**
* Returns true if the file is auto-generated by the store and shouldn't be deleted during cleanup.
* This includes write lock and checksum files