From 6930e8b0e3fba03ecf78578b2df35c2b22d19e08 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 15 Apr 2020 12:49:34 +0100 Subject: [PATCH 1/3] Respect generational files in recoveryDiff Today `MetadataSnapshot#recoveryDiff` considers the `.liv` file as per-commit rather than per-segment and often transfers them during peer recoveries and snapshot restores. It also considers differences in `.fnm`, `.dvd` and `.dvm` files as indicating a difference in the whole segment, even though these files may be adjusted without changing the segment itself. This commit adjusts this logic to attach these generational files to the segments themselves, allowing Elasticsearch only to transfer them if they are genuinely needed. Closes #55142 Resolves an outstanding `//NORELEASE` action related to #50999. --- .../org/elasticsearch/index/store/Store.java | 153 +++++++++++------- .../elasticsearch/index/store/StoreTests.java | 134 ++++++++++----- .../SearchableSnapshotsIntegTests.java | 3 +- 3 files changed, 183 insertions(+), 107 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 5aa99e818208f..de7c1d17f36cb 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -34,6 +34,7 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BufferedChecksum; @@ -53,6 +54,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -67,7 +69,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.RefCounted; -import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; @@ -101,6 +102,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -910,8 +912,6 @@ public Map asMap() { return metadata; } - private static final String DEL_FILE_EXTENSION = "del"; // legacy delete file - private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file private static final String SEGMENT_INFO_EXTENSION = "si"; /** @@ -922,80 +922,110 @@ public Map asMap() { *
      * <li>different: they exist in both snapshots but their they are not identical</li>
      * <li>missing: files that exist in the source but not in the target</li>
      * </ul>
-     * This method groups file into per-segment files and per-commit files. A file is treated as
-     * identical if and on if all files in it's group are identical. On a per-segment level files for a segment are treated
-     * as identical iff:
-     * <ul>
-     * <li>all files in this segment have the same checksum</li>
-     * <li>all files in this segment have the same length</li>
-     * <li>the segments {@code .si} files hashes are byte-identical Note: This is a using a perfect hash function,
-     * The metadata transfers the {@code .si} file content as it's hash</li>
-     * </ul>
-     * <p>
-     * The {@code .si} file contains a lot of diagnostics including a timestamp etc. in the future there might be
-     * unique segment identifiers in there hardening this method further.
+     * Individual files are compared by name, length and checksum. The segment info ({@code *.si}) files and the segments file
+     * ({@code segments_N}) are also checked to be a byte-for-byte match.
      * <p>
-     * The per-commit files handles very similar. A commit is composed of the {@code segments_N} files as well as generational files
-     * like deletes ({@code _x_y.del}) or field-info ({@code _x_y.fnm}) files. On a per-commit level files for a commit are treated
-     * as identical iff:
-     * <ul>
-     * <li>all files belonging to this commit have the same checksum</li>
-     * <li>all files belonging to this commit have the same length</li>
-     * <li>the segments file {@code segments_N} files hashes are byte-identical Note: This is a using a perfect hash function,
-     * The metadata transfers the {@code segments_N} file content as it's hash</li>
-     * </ul>
+     * Files are collected together into a group for each segment plus one group of "per-commit" ({@code segments_N}) files. Each
+     * per-segment group is subdivided into a nongenerational group (most of them) and a generational group (e.g. {@code *.liv},
+     * {@code *.fnm}, {@code *.dvm}, {@code *.dvd} that have been updated by subsequent commits).
      * <p>
    - * NOTE: this diff will not contain the {@code segments.gen} file. This file is omitted on recovery. + * For each segment, if any nongenerational files are different then the whole segment is considered to be different and will be + * recovered in full. If all the nongenerational files are the same but any generational files are different then all the + * generational files are considered to be different and will be recovered in full, but the nongenerational files are left alone. + * Finally, if any file is different then all the per-commit files are recovered too. + */ + /* Future work: the {@code *.si} file includes {@link SegmentInfo#getId()} which is a globally unique identifier for the + * nongenerational files in the segment so we could compare that instead of using the files lengths and checksums. We may also get a + * similar ID for the generational files in https://issues.apache.org/jira/browse/LUCENE-9324. + * TODO follow up once this Lucene discussion closes. */ - public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) { + public RecoveryDiff recoveryDiff(final MetadataSnapshot targetSnapshot) { + final List perCommitSourceFiles = new ArrayList<>(); + final Map, List>> perSegmentSourceFiles = new HashMap<>(); + // per segment, a tuple of <> + + for (StoreFileMetadata sourceFile : this) { + if (sourceFile.name().startsWith("_")) { + final String segmentId = IndexFileNames.parseSegmentName(sourceFile.name()); + final long generation = IndexFileNames.parseGeneration(sourceFile.name()); + final Tuple, List> perSegmentTuple = perSegmentSourceFiles + .computeIfAbsent(segmentId, k -> Tuple.tuple(new ArrayList<>(), new ArrayList<>())); + (generation == 0 ? perSegmentTuple.v1() : perSegmentTuple.v2()).add(sourceFile); + } else { + assert sourceFile.name().startsWith(IndexFileNames.SEGMENTS + "_") : "unexpected " + sourceFile; + perCommitSourceFiles.add(sourceFile); + } + } + final List identical = new ArrayList<>(); final List different = new ArrayList<>(); final List missing = new ArrayList<>(); - final Map> perSegment = new HashMap<>(); - final List perCommitStoreFiles = new ArrayList<>(); - - for (StoreFileMetadata meta : this) { - if (IndexFileNames.OLD_SEGMENTS_GEN.equals(meta.name())) { // legacy - continue; // we don't need that file at all + final List tmpIdentical = new ArrayList<>(); // confirm whole group is identical before adding to 'identical' + final Predicate> groupComparer = sourceGroup -> { + assert tmpIdentical.isEmpty() : "not cleaned up: " + tmpIdentical; + boolean groupIdentical = true; + for (StoreFileMetadata sourceFile : sourceGroup) { + final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name()); + if (targetFile == null) { + groupIdentical = false; + missing.add(sourceFile); + } else if (groupIdentical && targetFile.isSame(sourceFile)) { + tmpIdentical.add(sourceFile); + } else { + groupIdentical = false; + different.add(sourceFile); + } } - final String segmentId = IndexFileNames.parseSegmentName(meta.name()); - final String extension = IndexFileNames.getExtension(meta.name()); - if (IndexFileNames.SEGMENTS.equals(segmentId) || - DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) { - // only treat del files as per-commit files fnm files are generational but only for upgradable DV - perCommitStoreFiles.add(meta); + if (groupIdentical) { + identical.addAll(tmpIdentical); } else { - perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta); + different.addAll(tmpIdentical); } - } - 
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
* This includes write lock and checksum files diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 33d6b7d59a850..f3cb040d29679 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -20,6 +20,7 @@ import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.document.BinaryDocValuesField; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.SortedDocValuesField; @@ -485,11 +486,16 @@ public void testRecoveryDiff() throws IOException, InterruptedException { List docs = new ArrayList<>(); for (int i = 0; i < numDocs; i++) { Document doc = new Document(); - doc.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); - doc.add(new TextField("body", - TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); - doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random())))); + final Field.Store stringFieldStored = random().nextBoolean() ? Field.Store.YES : Field.Store.NO; + doc.add(new StringField("id", "" + i, stringFieldStored)); + final String textFieldContent = TestUtil.randomRealisticUnicodeString(random()); + final Field.Store textFieldStored = random().nextBoolean() ? Field.Store.YES : Field.Store.NO; + doc.add(new TextField("body", textFieldContent, textFieldStored)); + final String docValueFieldContent = TestUtil.randomRealisticUnicodeString(random()); + doc.add(new BinaryDocValuesField("dv", new BytesRef(docValueFieldContent))); docs.add(doc); + logger.info("--> doc [{}] id=[{}] (store={}) body=[{}] (store={}) dv=[{}]", + i, i, stringFieldStored, textFieldContent, textFieldStored, docValueFieldContent); } long seed = random().nextLong(); Store.MetadataSnapshot first; @@ -504,9 +510,8 @@ public void testRecoveryDiff() throws IOException, InterruptedException { final boolean lotsOfSegments = rarely(random); for (Document d : docs) { writer.addDocument(d); - if (lotsOfSegments && random.nextBoolean()) { - writer.commit(); - } else if (rarely(random)) { + if (lotsOfSegments && random.nextBoolean() || rarely(random)) { + logger.info("--> commit after doc {}", d.getField("id").stringValue()); writer.commit(); } } @@ -533,9 +538,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException { final boolean lotsOfSegments = rarely(random); for (Document d : docs) { writer.addDocument(d); - if (lotsOfSegments && random.nextBoolean()) { - writer.commit(); - } else if (rarely(random)) { + if (lotsOfSegments && random.nextBoolean() || rarely(random)) { writer.commit(); } } @@ -560,35 +563,40 @@ public void testRecoveryDiff() throws IOException, InterruptedException { assertThat(selfDiff.different, empty()); assertThat(selfDiff.missing, empty()); - - // lets add some deletes - Random random = new Random(seed); - IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec()); - iwc.setMergePolicy(NoMergePolicy.INSTANCE); - iwc.setUseCompoundFile(random.nextBoolean()); - iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); - IndexWriter writer = new IndexWriter(store.directory(), iwc); - writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(numDocs)))); - writer.commit(); - 
writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(null); + // delete a doc + final String deleteId = Integer.toString(random().nextInt(numDocs)); + Store.MetadataSnapshot metadata; + { + Random random = new Random(seed); + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec()); + iwc.setMergePolicy(NoMergePolicy.INSTANCE); + iwc.setUseCompoundFile(random.nextBoolean()); + iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); + IndexWriter writer = new IndexWriter(store.directory(), iwc); + logger.info("--> delete doc {}", deleteId); + writer.deleteDocuments(new Term("id", deleteId)); + writer.commit(); + writer.close(); + metadata = store.getMetadata(null); + } StoreFileMetadata delFile = null; for (StoreFileMetadata md : metadata) { if (md.name().endsWith(".liv")) { delFile = md; + logger.info("--> delFile=[{}]", delFile); break; } } Store.RecoveryDiff afterDeleteDiff = metadata.recoveryDiff(second); if (delFile != null) { - assertThat(afterDeleteDiff.identical.size(), equalTo(metadata.size() - 2)); // segments_N + del file - assertThat(afterDeleteDiff.different.size(), equalTo(0)); - assertThat(afterDeleteDiff.missing.size(), equalTo(2)); + assertThat(afterDeleteDiff.toString(), afterDeleteDiff.identical.size(), equalTo(metadata.size() - 2)); // segments_N + del file + assertThat(afterDeleteDiff.toString(), afterDeleteDiff.different.size(), equalTo(0)); + assertThat(afterDeleteDiff.toString(), afterDeleteDiff.missing.size(), equalTo(2)); } else { // an entire segment must be missing (single doc segment got dropped) - assertThat(afterDeleteDiff.identical.size(), greaterThan(0)); - assertThat(afterDeleteDiff.different.size(), equalTo(0)); - assertThat(afterDeleteDiff.missing.size(), equalTo(1)); // the commit file is different + assertThat(afterDeleteDiff.toString(), afterDeleteDiff.identical.size(), greaterThan(0)); + assertThat(afterDeleteDiff.toString(), afterDeleteDiff.different.size(), equalTo(0)); + assertThat(afterDeleteDiff.toString(), afterDeleteDiff.missing.size(), equalTo(1)); // the commit file is different } // check the self diff @@ -598,30 +606,70 @@ public void testRecoveryDiff() throws IOException, InterruptedException { assertThat(selfDiff.missing, empty()); // add a new commit - iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec()); - iwc.setMergePolicy(NoMergePolicy.INSTANCE); - iwc.setUseCompoundFile(true); // force CFS - easier to test here since we know it will add 3 files - iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); - writer = new IndexWriter(store.directory(), iwc); - writer.addDocument(docs.get(0)); - writer.close(); + { + Random random = new Random(seed); + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec()); + iwc.setMergePolicy(NoMergePolicy.INSTANCE); + iwc.setUseCompoundFile(true); // force CFS - easier to test here since we know it will add 3 files + iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); + IndexWriter writer = new IndexWriter(store.directory(), iwc); + logger.info("--> add new empty doc"); + writer.addDocument(new Document()); + writer.close(); + } Store.MetadataSnapshot newCommitMetadata = store.getMetadata(null); Store.RecoveryDiff newCommitDiff = newCommitMetadata.recoveryDiff(metadata); if (delFile != null) { - assertThat(newCommitDiff.identical.size(), - equalTo(newCommitMetadata.size() - 5)); // segments_N, del file, cfs, cfe, si for the new segment - 
assertThat(newCommitDiff.different.size(), equalTo(1)); // the del file must be different - assertThat(newCommitDiff.different.get(0).name(), endsWith(".liv")); - assertThat(newCommitDiff.missing.size(), equalTo(4)); // segments_N,cfs, cfe, si for the new segment + assertThat(newCommitDiff.toString(), newCommitDiff.identical.size(), + equalTo(newCommitMetadata.size() - 4)); // segments_N, cfs, cfe, si for the new segment + assertThat(newCommitDiff.toString(), newCommitDiff.different.size(), equalTo(0)); // the del file must be different + assertThat(newCommitDiff.toString(), newCommitDiff.missing.size(), equalTo(4)); // segments_N,cfs, cfe, si for the new segment + assertTrue(newCommitDiff.toString(), newCommitDiff.identical.stream().anyMatch(m -> m.name().endsWith(".liv"))); } else { - assertThat(newCommitDiff.identical.size(), + assertThat(newCommitDiff.toString(), newCommitDiff.identical.size(), equalTo(newCommitMetadata.size() - 4)); // segments_N, cfs, cfe, si for the new segment - assertThat(newCommitDiff.different.size(), equalTo(0)); - assertThat(newCommitDiff.missing.size(), + assertThat(newCommitDiff.toString(), newCommitDiff.different.size(), equalTo(0)); + assertThat(newCommitDiff.toString(), newCommitDiff.missing.size(), equalTo(4)); // an entire segment must be missing (single doc segment got dropped) plus the commit is different } + // update doc values + Store.MetadataSnapshot dvUpdateSnapshot; + final String updateId = randomValueOtherThan(deleteId, () -> Integer.toString(random().nextInt(numDocs))); + { + Random random = new Random(seed); + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec()); + iwc.setMergePolicy(NoMergePolicy.INSTANCE); + iwc.setUseCompoundFile(random.nextBoolean()); + iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); + IndexWriter writer = new IndexWriter(store.directory(), iwc); + final String newDocValue = TestUtil.randomRealisticUnicodeString(random()); + logger.info("--> update doc [{}] with dv=[{}]", updateId, newDocValue); + writer.updateBinaryDocValue(new Term("id", updateId), "dv", new BytesRef(newDocValue)); + writer.commit(); + writer.close(); + dvUpdateSnapshot = store.getMetadata(null); + } + logger.info("--> source: {}", dvUpdateSnapshot.asMap()); + logger.info("--> target: {}", newCommitMetadata.asMap()); + Store.RecoveryDiff dvUpdateDiff = dvUpdateSnapshot.recoveryDiff(newCommitMetadata); + final int delFileCount; + if (delFile == null || dvUpdateDiff.different.isEmpty()) { + // liv file either doesn't exist or belongs to a different segment from the one that we just updated + delFileCount = 0; + assertThat(dvUpdateDiff.toString(), dvUpdateDiff.different, empty()); + } else { + // liv file is generational and belongs to the updated segment + delFileCount = 1; + assertThat(dvUpdateDiff.toString(), dvUpdateDiff.different.size(), equalTo(1)); + assertThat(dvUpdateDiff.toString(), dvUpdateDiff.different.get(0).name(), endsWith(".liv")); + } + + assertThat(dvUpdateDiff.toString(), dvUpdateDiff.identical.size(), equalTo(dvUpdateSnapshot.size() - 4 - delFileCount)); + assertThat(dvUpdateDiff.toString(), dvUpdateDiff.different.size(), equalTo(delFileCount)); + assertThat(dvUpdateDiff.toString(), dvUpdateDiff.missing.size(), equalTo(4)); // segments_N, fnm, dvd, dvm for the updated segment + deleteContent(store.directory()); IOUtils.close(store); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java 
b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 8bc0fc1b67120..de9acf2cf340f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -76,8 +76,7 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { for (int i = between(10, 10_000); i >= 0; i--) { indexRequestBuilders.add(client().prepareIndex(indexName).setSource("foo", randomBoolean() ? "bar" : "baz")); } - // TODO NORELEASE no dummy docs since that includes deletes, yet we always copy the .liv file in peer recovery - indexRandom(true, false, indexRequestBuilders); + indexRandom(true, true, indexRequestBuilders); refresh(indexName); assertThat( client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(), From c159fc8a4dd5b8b0a26a1f86ff1e69bcb5d3cfdf Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 15 Apr 2020 15:41:42 +0100 Subject: [PATCH 2/3] Imports --- server/src/main/java/org/elasticsearch/index/store/Store.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index de7c1d17f36cb..25971b40eb53a 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -34,7 +34,6 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentCommitInfo; -import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BufferedChecksum; From 85e85ade509752fea644dba527487e85c09a058b Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 13 May 2020 13:09:15 +0100 Subject: [PATCH 3/3] WIP add support for writer-assigned UUIDs introduced in Lucene 8.6 --- .../BlobStoreIndexShardSnapshot.java | 21 +++++- .../org/elasticsearch/index/store/Store.java | 21 ++++-- .../index/store/StoreFileMetadata.java | 73 ++++++++++++++++++- .../snapshots/blobstore/FileInfoTests.java | 23 +++--- 4 files changed, 114 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java index ccb44cd9550ee..a4d93e328a89c 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java @@ -36,6 +36,8 @@ import java.util.Collections; import java.util.List; +import static org.elasticsearch.index.store.StoreFileMetadata.UNAVAILABLE_WRITER_UUID; + /** * Shard snapshot metadata */ @@ -231,6 +233,7 @@ public boolean isSame(FileInfo fileInfo) { static final String PART_SIZE = "part_size"; static final String WRITTEN_BY = "written_by"; static final String META_HASH = "meta_hash"; + static final String WRITER_UUID = "writer_uuid"; /** * Serializes file info into JSON @@ -252,10 +255,16 @@ public static void toXContent(FileInfo file, XContentBuilder builder) throws IOE 
builder.field(WRITTEN_BY, file.metadata.writtenBy()); } - if (file.metadata.hash() != null && file.metadata().hash().length > 0) { - BytesRef br = file.metadata.hash(); - builder.field(META_HASH, br.bytes, br.offset, br.length); + final BytesRef hash = file.metadata.hash(); + if (hash != null && hash.length > 0) { + builder.field(META_HASH, hash.bytes, hash.offset, hash.length); + } + + final BytesRef writerUuid = file.metadata.writerUuid(); + if (writerUuid.length > 0) { + builder.field(WRITER_UUID, writerUuid.bytes, writerUuid.offset, writerUuid.length); } + builder.endObject(); } @@ -275,6 +284,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException { Version writtenBy = null; String writtenByStr = null; BytesRef metaHash = new BytesRef(); + BytesRef writerUuid = UNAVAILABLE_WRITER_UUID; if (token == XContentParser.Token.START_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -298,6 +308,9 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException { metaHash.bytes = parser.binaryValue(); metaHash.offset = 0; metaHash.length = metaHash.bytes.length; + } else if (WRITER_UUID.equals(currentFieldName)) { + writerUuid = new BytesRef(parser.binaryValue()); + assert writerUuid.length > 0; } else { throw new ElasticsearchParseException("unknown parameter [{}]", currentFieldName); } @@ -322,7 +335,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException { } else if (checksum == null) { throw new ElasticsearchParseException("missing checksum for name [" + name + "]"); } - return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash), partSize); + return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 1a19a6feaefe5..28e6b03cbbfd7 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -138,7 +138,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref /** * Specific {@link IOContext} used to verify Lucene files footer checksums. - * See {@link MetadataSnapshot#checksumFromLuceneFile(Directory, String, Map, Logger, Version, boolean)} + * See {@link MetadataSnapshot#checksumFromLuceneFile(Directory, String, Map, Logger, Version, boolean, BytesRef)} */ public static final IOContext READONCE_CHECKSUM = new IOContext(IOContext.READONCE.context); @@ -828,16 +828,22 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg if (version.onOrAfter(maxVersion)) { maxVersion = version; } + + final BytesRef segmentInfoId = StoreFileMetadata.toWriterUuid(info.info.getId()); + final BytesRef segmentCommitInfoId = StoreFileMetadata.toWriterUuid(info.getId()); + for (String file : info.files()) { checksumFromLuceneFile(directory, file, builder, logger, version, - SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))); + SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)), + IndexFileNames.parseGeneration(file) == 0 ? 
segmentInfoId : segmentCommitInfoId); } } if (maxVersion == null) { maxVersion = org.elasticsearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion; } final String segmentsFile = segmentCommitInfos.getSegmentsFileName(); - checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); + checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true, + StoreFileMetadata.toWriterUuid(segmentCommitInfos.getId())); } catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) { // we either know the index is corrupted or it's just not there throw ex; @@ -863,7 +869,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg } private static void checksumFromLuceneFile(Directory directory, String file, Map builder, - Logger logger, Version version, boolean readFileAsHash) throws IOException { + Logger logger, Version version, boolean readFileAsHash, BytesRef writerUuid) throws IOException { final String checksum; final BytesRefBuilder fileHash = new BytesRefBuilder(); try (IndexInput in = directory.openInput(file, READONCE_CHECKSUM)) { @@ -888,7 +894,7 @@ private static void checksumFromLuceneFile(Directory directory, String file, Map logger.debug(() -> new ParameterizedMessage("Can retrieve checksum from file [{}]", file), ex); throw ex; } - builder.put(file, new StoreFileMetadata(file, length, checksum, version, fileHash.get())); + builder.put(file, new StoreFileMetadata(file, length, checksum, version, fileHash.get(), writerUuid)); } } @@ -928,8 +934,9 @@ public Map asMap() { *

      * <li>missing: files that exist in the source but not in the target</li>
      * </ul>
-     * Individual files are compared by name, length and checksum. The segment info ({@code *.si}) files and the segments file
-     * ({@code segments_N}) are also checked to be a byte-for-byte match.
+     * Individual files are compared by name, length, checksum and (if present) a UUID that was assigned when the file was originally
+     * written. The segment info ({@code *.si}) files and the segments file ({@code segments_N}) are also checked to be a byte-for-byte
+     * match.
      * <p>
    * Files are collected together into a group for each segment plus one group of "per-commit" ({@code segments_N}) files. Each * per-segment group is subdivided into a nongenerational group (most of them) and a generational group (e.g. {@code *.liv}, diff --git a/server/src/main/java/org/elasticsearch/index/store/StoreFileMetadata.java b/server/src/main/java/org/elasticsearch/index/store/StoreFileMetadata.java index b9de3090de503..140c05ab44db3 100644 --- a/server/src/main/java/org/elasticsearch/index/store/StoreFileMetadata.java +++ b/server/src/main/java/org/elasticsearch/index/store/StoreFileMetadata.java @@ -20,8 +20,13 @@ package org.elasticsearch.index.store; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -33,6 +38,9 @@ public class StoreFileMetadata implements Writeable { + public static final BytesRef UNAVAILABLE_WRITER_UUID = new BytesRef(); + private static final org.elasticsearch.Version WRITER_UUID_MIN_VERSION = org.elasticsearch.Version.V_8_0_0; + private final String name; // the actual file size on "disk", if compressed, the compressed size @@ -44,16 +52,21 @@ public class StoreFileMetadata implements Writeable { private final BytesRef hash; + private final BytesRef writerUuid; + public StoreFileMetadata(String name, long length, String checksum, Version writtenBy) { - this(name, length, checksum, writtenBy, null); + this(name, length, checksum, writtenBy, null, UNAVAILABLE_WRITER_UUID); } - public StoreFileMetadata(String name, long length, String checksum, Version writtenBy, BytesRef hash) { + public StoreFileMetadata(String name, long length, String checksum, Version writtenBy, BytesRef hash, BytesRef writerUuid) { this.name = Objects.requireNonNull(name, "name must not be null"); this.length = length; this.checksum = Objects.requireNonNull(checksum, "checksum must not be null"); this.writtenBy = Objects.requireNonNull(writtenBy, "writtenBy must not be null"); this.hash = hash == null ? new BytesRef() : hash; + + assert writerUuid != null && (writerUuid.length > 0 || writerUuid == UNAVAILABLE_WRITER_UUID); + this.writerUuid = Objects.requireNonNull(writerUuid, "writerUuid must not be null"); } /** @@ -69,6 +82,12 @@ public StoreFileMetadata(StreamInput in) throws IOException { throw new AssertionError(e); } hash = in.readBytesRef(); + if (in.getVersion().onOrAfter(WRITER_UUID_MIN_VERSION)) { + writerUuid = StoreFileMetadata.toWriterUuid(in.readBytesRef()); + } else { + writerUuid = UNAVAILABLE_WRITER_UUID; + } + } @Override @@ -78,6 +97,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(checksum); out.writeString(writtenBy.toString()); out.writeBytesRef(hash); + if (out.getVersion().onOrAfter(WRITER_UUID_MIN_VERSION)) { + out.writeBytesRef(writerUuid); + } } /** @@ -133,6 +155,16 @@ public boolean isSame(StoreFileMetadata other) { // we can't tell if either or is null so we return false in this case! this is why we don't use equals for this! 
return false; } + if (writerUuid.length > 0 && other.writerUuid.length > 0) { + // if the writer ID is missing on one of the files then we ignore this field and just rely on the checksum and hash, but if + // it's present on both files then it must be identical + if (writerUuid.equals(other.writerUuid) == false) { + return false; + } else { + assert name.equals(other.name) && length == other.length && checksum.equals(other.checksum) : this + " vs " + other; + assert hash.equals(other.hash) : this + " vs " + other + " with hashes " + hash + " vs " + other.hash; + } + } return length == other.length && checksum.equals(other.checksum) && hash.equals(other.hash); } @@ -155,4 +187,41 @@ public Version writtenBy() { public BytesRef hash() { return hash; } + + /** + * Returns the globally-unique ID that was assigned by the {@link IndexWriter} that originally wrote this file: + * + * - For `segments_N` files this is {@link SegmentInfos#getId()} which uniquely identifies the commit. + * - For non-generational segment files this is {@link SegmentInfo#getId()} which uniquely identifies the segment. + * - For generational segment files (i.e. updated docvalues, liv files etc) this is {@link SegmentCommitInfo#getId()} + * which uniquely identifies the generation of the segment. + * + * This ID may be {@link StoreFileMetadata#UNAVAILABLE_WRITER_UUID} (i.e. zero-length) if unavilable, e.g.: + * + * - The file was written by a version of Lucene prior to {@link org.apache.lucene.util.Version#LUCENE_8_6_0}. + * - The metadata came from a version of Elasticsearch prior to {@link StoreFileMetadata#WRITER_UUID_MIN_VERSION}). + * - The file is not one of the files listed above. + * + */ + public BytesRef writerUuid() { + return writerUuid; + } + + static BytesRef toWriterUuid(BytesRef bytesRef) { + if (bytesRef.length == 0) { + return UNAVAILABLE_WRITER_UUID; + } else { + return bytesRef; + } + } + + static BytesRef toWriterUuid(@Nullable byte[] id) { + if (id == null) { + return UNAVAILABLE_WRITER_UUID; + } else { + assert id.length > 0; + return new BytesRef(id); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/index/snapshots/blobstore/FileInfoTests.java b/server/src/test/java/org/elasticsearch/index/snapshots/blobstore/FileInfoTests.java index 0a4ce23e34895..aabbbafc16139 100644 --- a/server/src/test/java/org/elasticsearch/index/snapshots/blobstore/FileInfoTests.java +++ b/server/src/test/java/org/elasticsearch/index/snapshots/blobstore/FileInfoTests.java @@ -45,13 +45,10 @@ public class FileInfoTests extends ESTestCase { public void testToFromXContent() throws IOException { final int iters = scaledRandomIntBetween(1, 10); for (int iter = 0; iter < iters; iter++) { - final BytesRef hash = new BytesRef(scaledRandomIntBetween(0, 1024 * 1024)); - hash.length = hash.bytes.length; - for (int i = 0; i < hash.length; i++) { - hash.bytes[i] = randomByte(); - } + final BytesRef hash = randomBytesRef(1024 * 1024); + final BytesRef writerUuid = randomBytesRef(20); StoreFileMetadata meta = new StoreFileMetadata("foobar", Math.abs(randomLong()), randomAlphaOfLengthBetween(1, 10), - Version.LATEST, hash); + Version.LATEST, hash, writerUuid); ByteSizeValue size = new ByteSizeValue(Math.abs(randomLong())); BlobStoreIndexShardSnapshot.FileInfo info = new BlobStoreIndexShardSnapshot.FileInfo("_foobar", meta, size); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON).prettyPrint(); @@ -75,14 +72,18 @@ public void testToFromXContent() throws IOException { } } + private static BytesRef 
randomBytesRef(int maxSize) { + final BytesRef hash = new BytesRef(scaledRandomIntBetween(0, maxSize)); + hash.length = hash.bytes.length; + for (int i = 0; i < hash.length; i++) { + hash.bytes[i] = randomByte(); + } + return hash; + } + public void testInvalidFieldsInFromXContent() throws IOException { final int iters = scaledRandomIntBetween(1, 10); for (int iter = 0; iter < iters; iter++) { - final BytesRef hash = new BytesRef(scaledRandomIntBetween(0, 1024 * 1024)); - hash.length = hash.bytes.length; - for (int i = 0; i < hash.length; i++) { - hash.bytes[i] = randomByte(); - } String name = "foobar"; String physicalName = "_foobar"; String failure = null;
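
As a standalone illustration of the grouping rule described in the updated recoveryDiff javadoc above (a minimal sketch, not part of the patch; the class name and sample file list are made up), the following uses Lucene's IndexFileNames helpers, the same calls the new implementation relies on to split per-commit files from per-segment files and to separate nongenerational (generation 0) from generational (generation > 0) files:

import org.apache.lucene.index.IndexFileNames;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecoveryDiffGroupingSketch {
    public static void main(String[] args) {
        // hypothetical shard contents: one commit point, segment _0 (with a generational .liv file) and segment _1
        List<String> files = List.of("segments_3", "_0.cfe", "_0.cfs", "_0.si", "_0_1.liv", "_1.si", "_1.fdt");
        List<String> perCommit = new ArrayList<>();                  // segments_N files
        Map<String, List<String>> nonGenerational = new HashMap<>(); // per segment, generation == 0
        Map<String, List<String>> generational = new HashMap<>();    // per segment, generation > 0
        for (String file : files) {
            if (file.startsWith("_") == false) {
                perCommit.add(file);
            } else {
                String segment = IndexFileNames.parseSegmentName(file); // e.g. "_0" for "_0_1.liv"
                long generation = IndexFileNames.parseGeneration(file); // e.g. 1 for "_0_1.liv", 0 for "_0.si"
                (generation == 0 ? nonGenerational : generational)
                    .computeIfAbsent(segment, k -> new ArrayList<>())
                    .add(file);
            }
        }
        System.out.println("per-commit:      " + perCommit);       // [segments_3]
        System.out.println("nongenerational: " + nonGenerational); // _0 -> [_0.cfe, _0.cfs, _0.si], _1 -> [_1.si, _1.fdt]
        System.out.println("generational:    " + generational);    // _0 -> [_0_1.liv]
    }
}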
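
Along the same lines, the writer-UUID identity rule sketched in the third (WIP) commit can be summarised in a simplified standalone form (hypothetical class, not the real StoreFileMetadata#isSame, which also compares the content hash): when both files carry a non-empty writer UUID the UUIDs are authoritative, otherwise the comparison falls back to length and checksum.

import org.apache.lucene.util.BytesRef;

public final class WriterUuidIdentitySketch {
    static boolean isSame(long lengthA, String checksumA, BytesRef writerUuidA,
                          long lengthB, String checksumB, BytesRef writerUuidB) {
        if (writerUuidA.length > 0 && writerUuidB.length > 0 && writerUuidA.equals(writerUuidB) == false) {
            // both sides know which IndexWriter produced the file, and it was not the same one
            return false;
        }
        // UUID unavailable on at least one side, or the UUIDs matched: fall back to length and checksum
        return lengthA == lengthB && checksumA.equals(checksumB);
    }

    public static void main(String[] args) {
        BytesRef writer = new BytesRef(new byte[] {1, 2, 3});
        BytesRef otherWriter = new BytesRef(new byte[] {9, 9, 9});
        BytesRef unavailable = new BytesRef(); // zero length, e.g. file written before Lucene 8.6
        System.out.println(isSame(10, "abc", writer, 10, "abc", writer));      // true: matching UUIDs
        System.out.println(isSame(10, "abc", writer, 10, "abc", unavailable)); // true: falls back to checksum
        System.out.println(isSame(10, "abc", writer, 10, "abc", otherWriter)); // false: different writers
    }
}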