From 1fae87413a7f4afef42f0fb8da666224095cec86 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 23 Aug 2022 13:31:13 +0200 Subject: [PATCH 01/17] lets try this --- .../cluster/SnapshotsInProgress.java | 152 +++++++++++++++--- 1 file changed, 134 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index c55be18f74358..eaff2af0fa8f7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -56,13 +56,13 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion"; // keyed by repository name - private final Map> entries; + private final Map entries; public SnapshotsInProgress(StreamInput in) throws IOException { this(collectByRepo(in)); } - private static Map> collectByRepo(StreamInput in) throws IOException { + private static Map collectByRepo(StreamInput in) throws IOException { final int count = in.readVInt(); if (count == 0) { return Map.of(); @@ -72,13 +72,14 @@ private static Map> collectByRepo(StreamInput in) throws IOE final Entry entry = Entry.readFrom(in); entriesByRepo.computeIfAbsent(entry.repository(), repo -> new ArrayList<>()).add(entry); } + final Map res = Maps.newMapWithExpectedSize(entriesByRepo.size()); for (Map.Entry> entryForRepo : entriesByRepo.entrySet()) { - entryForRepo.setValue(List.copyOf(entryForRepo.getValue())); + res.put(entryForRepo.getKey(), new ByRepo(entryForRepo.getValue())); } - return entriesByRepo; + return res; } - private SnapshotsInProgress(Map> entries) { + private SnapshotsInProgress(Map entries) { this.entries = Map.copyOf(entries); assert assertConsistentEntries(this.entries); } @@ -87,26 +88,26 @@ public SnapshotsInProgress withUpdatedEntriesForRepo(String repository, List> copy = new HashMap<>(this.entries); + final Map copy = new HashMap<>(this.entries); if (updatedEntries.isEmpty()) { copy.remove(repository); if (copy.isEmpty()) { return EMPTY; } } else { - copy.put(repository, List.copyOf(updatedEntries)); + copy.put(repository, new ByRepo(updatedEntries)); } return new SnapshotsInProgress(copy); } public SnapshotsInProgress withAddedEntry(Entry entry) { - final List forRepo = new ArrayList<>(entries.getOrDefault(entry.repository(), List.of())); + final List forRepo = new ArrayList<>(entries.getOrDefault(entry.repository(), ByRepo.EMPTY).entries); forRepo.add(entry); return withUpdatedEntriesForRepo(entry.repository(), forRepo); } public List forRepo(String repository) { - return entries.getOrDefault(repository, List.of()); + return entries.getOrDefault(repository, ByRepo.EMPTY).entries; } public boolean isEmpty() { @@ -115,18 +116,18 @@ public boolean isEmpty() { public int count() { int count = 0; - for (List list : entries.values()) { - count += list.size(); + for (ByRepo byRepo : entries.values()) { + count += byRepo.entries.size(); } return count; } - public Collection> entriesByRepo() { - return entries.values(); + public Iterable> entriesByRepo() { + return () -> entries.values().stream().map(byRepo -> byRepo.entries).iterator(); } public Stream asStream() { - return entries.values().stream().flatMap(Collection::stream); + return entries.values().stream().flatMap(t -> t.entries.stream()); } @Nullable @@ -187,10 +188,20 @@ public Version getMinimalSupportedVersion() { return Version.CURRENT.minimumCompatibilityVersion(); } + private static final Version DIFFABLE_VERSION = Version.V_8_5_0; + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + if (in.getVersion().onOrAfter(DIFFABLE_VERSION)) { + return new SnapshotInProgressDiff(in); + } return readDiffFrom(Custom.class, TYPE, in); } + @Override + public Diff diff(Custom previousState) { + return new SnapshotInProgressDiff((SnapshotsInProgress) previousState, this); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(count()); @@ -334,11 +345,11 @@ private static boolean hasFailures(Map c return false; } - private static boolean assertConsistentEntries(Map> entries) { - for (Map.Entry> repoEntries : entries.entrySet()) { + private static boolean assertConsistentEntries(Map entries) { + for (Map.Entry repoEntries : entries.entrySet()) { final Set> assignedShards = new HashSet<>(); final Set> queuedShards = new HashSet<>(); - final List entriesForRepository = repoEntries.getValue(); + final List entriesForRepository = repoEntries.getValue().entries; final String repository = repoEntries.getKey(); assert entriesForRepository.isEmpty() == false : "found empty list of snapshots for " + repository + " in " + entries; for (Entry entry : entriesForRepository) { @@ -653,7 +664,7 @@ public String toString() { } } - public static class Entry implements Writeable, ToXContent, RepositoryOperation { + public static class Entry implements Writeable, ToXContent, RepositoryOperation, Diffable { private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; @@ -1310,5 +1321,110 @@ public void writeTo(StreamOutput out) throws IOException { public boolean isFragment() { return false; } + + @Override + public Diff diff(Entry previousState) { + return new SimpleDiffable.CompleteDiff<>(this); + } + } + + private static final class SnapshotInProgressDiff implements NamedDiff { + + private final DiffableUtils.MapDiff> mapDiff; + + SnapshotInProgressDiff(SnapshotsInProgress before, SnapshotsInProgress after) { + this.mapDiff = DiffableUtils.diff(before.entries, after.entries, DiffableUtils.getStringKeySerializer()); + } + + SnapshotInProgressDiff(DiffableUtils.MapDiff> mapDiff) { + this.mapDiff = mapDiff; + } + + SnapshotInProgressDiff(StreamInput in) throws IOException { + this( + DiffableUtils.readJdkMapDiff( + in, + DiffableUtils.getStringKeySerializer(), + i -> new ByRepo(i.readList(Entry::readFrom)), + i -> new ByRepo.ByRepoDiff( + DiffableUtils.readJdkMapDiff( + i, + DiffableUtils.getVIntKeySerializer(), + Entry::readFrom, + ii -> SimpleDiffable.readDiffFrom(Entry::readFrom, ii) + ) + ) + ) + ); + } + + @Override + public SnapshotsInProgress apply(Custom part) { + return new SnapshotsInProgress(mapDiff.apply(((SnapshotsInProgress) part).entries)); + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + mapDiff.writeTo(out); + } + } + + private record ByRepo(List entries) implements Diffable { + + static final ByRepo EMPTY = new ByRepo(List.of()); + + private ByRepo(List entries) { + this.entries = List.copyOf(entries); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(entries); + } + + @Override + public Diff diff(ByRepo previousState) { + final DiffableUtils.MapDiff> diff = DiffableUtils.diff( + toMapByPosition(previousState), + toMapByPosition(this), + DiffableUtils.getVIntKeySerializer() + ); + return new ByRepoDiff(diff); + } + + public static Map toMapByPosition(ByRepo part) { + final Map before = new HashMap<>(part.entries.size()); + for (int i = 0; i < part.entries.size(); i++) { + Entry entry = part.entries.get(i); + before.put(i, entry); + } + return before; + } + + private record ByRepoDiff(DiffableUtils.MapDiff> diff) implements Diff { + + @Override + public ByRepo apply(ByRepo part) { + final var updated = diff.apply(toMapByPosition(part)); + final Entry[] arr = new Entry[updated.size()]; + updated.forEach((k, v) -> arr[k] = v); + return new ByRepo(List.of(arr)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + diff.writeTo(out); + } + } } } From bb926e66ec86467445be9bdc3d8ddbc34b54d0bb Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 23 Aug 2022 16:29:59 +0200 Subject: [PATCH 02/17] faster --- .../repositories/blobstore/BlobStoreRepository.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 8c730feffdf22..37710d71cf998 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1484,10 +1484,14 @@ private void cleanupOldShardGens( toDelete.add(containerPath + shardGeneration); } } - try { - deleteFromContainer(blobContainer(), toDelete.iterator()); - } catch (Exception e) { - logger.warn("Failed to clean up old shard generation blobs", e); + if (toDelete.isEmpty() == false) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + deleteFromContainer(blobContainer(), toDelete.iterator()); + } catch (Exception e) { + logger.warn("Failed to clean up old shard generation blobs", e); + } + }); } } From a13707b866cf1c4c0015a9b6115dca198023d159 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 24 Aug 2022 13:04:27 +0200 Subject: [PATCH 03/17] back out unrelated --- .../repositories/blobstore/BlobStoreRepository.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 37710d71cf998..8c730feffdf22 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1484,14 +1484,10 @@ private void cleanupOldShardGens( toDelete.add(containerPath + shardGeneration); } } - if (toDelete.isEmpty() == false) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - deleteFromContainer(blobContainer(), toDelete.iterator()); - } catch (Exception e) { - logger.warn("Failed to clean up old shard generation blobs", e); - } - }); + try { + deleteFromContainer(blobContainer(), toDelete.iterator()); + } catch (Exception e) { + logger.warn("Failed to clean up old shard generation blobs", e); } } From 8bd7689f4c4ebd9746937743a5607f1ba2993359 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 24 Aug 2022 13:56:39 +0200 Subject: [PATCH 04/17] bck --- .../cluster/SnapshotsInProgress.java | 169 +++++++++++++++++- 1 file changed, 162 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index eaff2af0fa8f7..105fb9753a5ac 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -1324,7 +1324,167 @@ public boolean isFragment() { @Override public Diff diff(Entry previousState) { - return new SimpleDiffable.CompleteDiff<>(this); + return new EntryDiff(previousState, this); + } + } + + /* + State state, + */ + private static final class EntryDiff implements Diff { + + private static final DiffableUtils.NonDiffableValueSerializer INDEX_ID_VALUE_SERIALIZER = + new DiffableUtils.NonDiffableValueSerializer<>() { + @Override + public void write(IndexId value, StreamOutput out) throws IOException { + out.writeString(value.getId()); + } + + @Override + public IndexId read(StreamInput in, String key) throws IOException { + return new IndexId(key, in.readString()); + } + }; + + private static final DiffableUtils.NonDiffableValueSerializer SHARD_SNAPSHOT_STATUS_VALUE_SERIALIZER = + new DiffableUtils.NonDiffableValueSerializer<>() { + @Override + public void write(ShardSnapshotStatus value, StreamOutput out) throws IOException { + value.writeTo(out); + } + + @Override + public ShardSnapshotStatus read(StreamInput in, Object key) throws IOException { + return ShardSnapshotStatus.readFrom(in); + } + }; + + private static final DiffableUtils.KeySerializer SHARD_ID_KEY_SERIALIZER = new DiffableUtils.KeySerializer<>() { + @Override + public void writeKey(ShardId key, StreamOutput out) throws IOException { + key.writeTo(out); + } + + @Override + public ShardId readKey(StreamInput in) throws IOException { + return new ShardId(in); + } + }; + + private static final DiffableUtils.KeySerializer REPO_SHARD_ID_KEY_SERIALIZER = + new DiffableUtils.KeySerializer<>() { + @Override + public void writeKey(RepositoryShardId key, StreamOutput out) throws IOException { + key.writeTo(out); + } + + @Override + public RepositoryShardId readKey(StreamInput in) throws IOException { + return new RepositoryShardId(in); + } + }; + + private final DiffableUtils.MapDiff> indexByIndexNameDiff; + + private final DiffableUtils.MapDiff> shardsByShardIdDiff; + + @Nullable + private final DiffableUtils.MapDiff< + RepositoryShardId, + ShardSnapshotStatus, + Map> shardsByRepoShardIdDiff; + + @Nullable + private final List updatedDataStreams; + + @Nullable + private final String updatedFailure; + + private final long updatedRepositoryStateId; + + private final State updatedState; + + EntryDiff(StreamInput in) throws IOException { + this.indexByIndexNameDiff = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), INDEX_ID_VALUE_SERIALIZER); + this.updatedState = State.fromValue(in.readByte()); + this.updatedRepositoryStateId = in.readLong(); + this.updatedDataStreams = in.readOptionalStringList(); + this.updatedFailure = in.readOptionalString(); + this.shardsByShardIdDiff = DiffableUtils.readJdkMapDiff( + in, + SHARD_ID_KEY_SERIALIZER, + (DiffableUtils.ValueSerializer) SHARD_SNAPSHOT_STATUS_VALUE_SERIALIZER + ); + if (in.readBoolean()) { + shardsByRepoShardIdDiff = DiffableUtils.readJdkMapDiff( + in, + REPO_SHARD_ID_KEY_SERIALIZER, + (DiffableUtils.ValueSerializer) SHARD_SNAPSHOT_STATUS_VALUE_SERIALIZER + ); + } else { + shardsByRepoShardIdDiff = null; + } + } + + EntryDiff(Entry before, Entry after) { + this.indexByIndexNameDiff = DiffableUtils.diff( + before.indices, + after.indices, + DiffableUtils.getStringKeySerializer(), + INDEX_ID_VALUE_SERIALIZER + ); + this.updatedDataStreams = before.dataStreams.equals(after.dataStreams) ? null : after.dataStreams; + this.updatedState = after.state; + this.updatedRepositoryStateId = after.repositoryStateId; + this.updatedFailure = after.failure; + this.shardsByShardIdDiff = DiffableUtils.diff( + before.shards, + after.shards, + SHARD_ID_KEY_SERIALIZER, + (DiffableUtils.ValueSerializer) SHARD_SNAPSHOT_STATUS_VALUE_SERIALIZER + ); + if (before.isClone()) { + this.shardsByRepoShardIdDiff = DiffableUtils.diff( + before.shardStatusByRepoShardId, + after.shardStatusByRepoShardId, + REPO_SHARD_ID_KEY_SERIALIZER, + (DiffableUtils.ValueSerializer) SHARD_SNAPSHOT_STATUS_VALUE_SERIALIZER + ); + } else { + this.shardsByRepoShardIdDiff = null; + } + } + + @Override + public Entry apply(Entry part) { + return new Entry( + part.snapshot, + part.includeGlobalState, + part.partial, + updatedState, + indexByIndexNameDiff.apply(part.indices), + updatedDataStreams == null ? part.dataStreams : updatedDataStreams, + part.featureStates, + part.startTime, + updatedRepositoryStateId, + shardsByShardIdDiff.apply(part.shards), + updatedFailure, + part.userMetadata, + part.version, + part.source, + part.source == null ? null : shardsByRepoShardIdDiff.apply(part.shardStatusByRepoShardId) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + this.indexByIndexNameDiff.writeTo(out); + out.writeByte(this.updatedState.value()); + out.writeLong(this.updatedRepositoryStateId); + out.writeOptionalStringCollection(updatedDataStreams); + out.writeOptionalString(updatedFailure); + shardsByShardIdDiff.writeTo(out); + out.writeOptionalWriteable(shardsByRepoShardIdDiff); } } @@ -1347,12 +1507,7 @@ private static final class SnapshotInProgressDiff implements NamedDiff { DiffableUtils.getStringKeySerializer(), i -> new ByRepo(i.readList(Entry::readFrom)), i -> new ByRepo.ByRepoDiff( - DiffableUtils.readJdkMapDiff( - i, - DiffableUtils.getVIntKeySerializer(), - Entry::readFrom, - ii -> SimpleDiffable.readDiffFrom(Entry::readFrom, ii) - ) + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getVIntKeySerializer(), Entry::readFrom, EntryDiff::new) ) ) ); From 1a5e668da795f9a32f353abee1e17896596c4b05 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 24 Aug 2022 14:11:34 +0200 Subject: [PATCH 05/17] fixed up --- .../cluster/SnapshotsInProgress.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 105fb9753a5ac..b2f5e839b9689 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -1328,9 +1328,6 @@ public Diff diff(Entry previousState) { } } - /* - State state, - */ private static final class EntryDiff implements Diff { private static final DiffableUtils.NonDiffableValueSerializer INDEX_ID_VALUE_SERIALIZER = @@ -1404,6 +1401,7 @@ public RepositoryShardId readKey(StreamInput in) throws IOException { private final State updatedState; + @SuppressWarnings("unchecked") EntryDiff(StreamInput in) throws IOException { this.indexByIndexNameDiff = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), INDEX_ID_VALUE_SERIALIZER); this.updatedState = State.fromValue(in.readByte()); @@ -1426,6 +1424,7 @@ public RepositoryShardId readKey(StreamInput in) throws IOException { } } + @SuppressWarnings("unchecked") EntryDiff(Entry before, Entry after) { this.indexByIndexNameDiff = DiffableUtils.diff( before.indices, @@ -1507,7 +1506,8 @@ private static final class SnapshotInProgressDiff implements NamedDiff { DiffableUtils.getStringKeySerializer(), i -> new ByRepo(i.readList(Entry::readFrom)), i -> new ByRepo.ByRepoDiff( - DiffableUtils.readJdkMapDiff(i, DiffableUtils.getVIntKeySerializer(), Entry::readFrom, EntryDiff::new) + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new), + i.readStringList() ) ) ); @@ -1549,36 +1549,41 @@ public void writeTo(StreamOutput out) throws IOException { @Override public Diff diff(ByRepo previousState) { - final DiffableUtils.MapDiff> diff = DiffableUtils.diff( - toMapByPosition(previousState), - toMapByPosition(this), - DiffableUtils.getVIntKeySerializer() + final DiffableUtils.MapDiff> diff = DiffableUtils.diff( + toMapByUUID(previousState), + toMapByUUID(this), + DiffableUtils.getStringKeySerializer() ); - return new ByRepoDiff(diff); + return new ByRepoDiff(diff, entries.stream().map(e -> e.snapshot().getSnapshotId().getUUID()).toList()); } - public static Map toMapByPosition(ByRepo part) { - final Map before = new HashMap<>(part.entries.size()); - for (int i = 0; i < part.entries.size(); i++) { - Entry entry = part.entries.get(i); - before.put(i, entry); + public static Map toMapByUUID(ByRepo part) { + final Map before = new HashMap<>(part.entries.size()); + for (Entry entry : part.entries) { + before.put(entry.snapshot().getSnapshotId().getUUID(), entry); } return before; } - private record ByRepoDiff(DiffableUtils.MapDiff> diff) implements Diff { + private record ByRepoDiff(DiffableUtils.MapDiff> diff, List snapshotIds) + implements + Diff { @Override public ByRepo apply(ByRepo part) { - final var updated = diff.apply(toMapByPosition(part)); + final var updated = diff.apply(toMapByUUID(part)); final Entry[] arr = new Entry[updated.size()]; - updated.forEach((k, v) -> arr[k] = v); + for (int i = 0; i < snapshotIds.size(); i++) { + String snapshotId = snapshotIds.get(i); + arr[i] = updated.get(snapshotId); + } return new ByRepo(List.of(arr)); } @Override public void writeTo(StreamOutput out) throws IOException { diff.writeTo(out); + out.writeStringCollection(snapshotIds); } } } From 84845c786d7965a1b88e406aa4a741a603af4e4d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 24 Aug 2022 16:45:48 +0200 Subject: [PATCH 06/17] cheaper map --- .../java/org/elasticsearch/cluster/SnapshotsInProgress.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index b2f5e839b9689..0ad933d802c3a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -781,8 +781,8 @@ private Entry( assert existing == null || existing.equals(index) : "Conflicting indices [" + existing + "] and [" + index + "]"; byRepoShardIdBuilder.put(new RepositoryShardId(indexId, shardId.id()), entry.getValue()); } - this.shardStatusByRepoShardId = Map.copyOf(byRepoShardIdBuilder); - this.snapshotIndices = Map.copyOf(res); + this.shardStatusByRepoShardId = Collections.unmodifiableMap(byRepoShardIdBuilder); + this.snapshotIndices = Collections.unmodifiableMap(res); } else { assert shards.isEmpty(); this.shardStatusByRepoShardId = shardStatusByRepoShardId; @@ -820,7 +820,7 @@ private static Entry readFrom(StreamInput in) throws IOException { RepositoryShardId::new, ShardSnapshotStatus::readFrom ); - final List featureStates = Collections.unmodifiableList(in.readList(SnapshotFeatureInfo::new)); + final List featureStates = in.readImmutableList(SnapshotFeatureInfo::new); return new SnapshotsInProgress.Entry( snapshot, includeGlobalState, From 1c83e073e3cf4b60f997e15e711520eb83fd0e3d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 24 Aug 2022 17:03:50 +0200 Subject: [PATCH 07/17] less redundant diffing --- .../cluster/SnapshotsInProgress.java | 67 +++++++++++++++++-- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 0ad933d802c3a..61abb27efff83 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -732,7 +732,7 @@ public Entry( failure, userMetadata, version, - null, + (SnapshotId) null, Map.of() ); } @@ -781,8 +781,8 @@ private Entry( assert existing == null || existing.equals(index) : "Conflicting indices [" + existing + "] and [" + index + "]"; byRepoShardIdBuilder.put(new RepositoryShardId(indexId, shardId.id()), entry.getValue()); } - this.shardStatusByRepoShardId = Collections.unmodifiableMap(byRepoShardIdBuilder); - this.snapshotIndices = Collections.unmodifiableMap(res); + this.shardStatusByRepoShardId = Map.copyOf(byRepoShardIdBuilder); + this.snapshotIndices = Map.copyOf(res); } else { assert shards.isEmpty(); this.shardStatusByRepoShardId = shardStatusByRepoShardId; @@ -791,6 +791,42 @@ private Entry( assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.shardStatusByRepoShardId); } + private Entry( + Snapshot snapshot, + boolean includeGlobalState, + boolean partial, + State state, + Map indices, + List dataStreams, + List featureStates, + long startTime, + long repositoryStateId, + Map shards, + String failure, + Map userMetadata, + Version version, + Map shardStatusByRepoShardId, + Map snapshotIndices + ) { + this.state = state; + this.snapshot = snapshot; + this.includeGlobalState = includeGlobalState; + this.partial = partial; + this.indices = Map.copyOf(indices); + this.dataStreams = List.copyOf(dataStreams); + this.featureStates = List.copyOf(featureStates); + this.startTime = startTime; + this.shards = shards; + this.repositoryStateId = repositoryStateId; + this.failure = failure; + this.userMetadata = userMetadata == null ? null : Map.copyOf(userMetadata); + this.version = version; + this.source = null; + this.shardStatusByRepoShardId = Map.copyOf(shardStatusByRepoShardId); + this.snapshotIndices = snapshotIndices; + assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.shardStatusByRepoShardId); + } + private static Entry readFrom(StreamInput in) throws IOException { final Snapshot snapshot = new Snapshot(in); final boolean includeGlobalState = in.readBoolean(); @@ -1456,17 +1492,38 @@ public RepositoryShardId readKey(StreamInput in) throws IOException { @Override public Entry apply(Entry part) { + final var updatedIndices = indexByIndexNameDiff.apply(part.indices); + final var updatedStateByShard = shardsByShardIdDiff.apply(part.shards); + if (part.source == null && updatedIndices == part.indices && updatedStateByShard == part.shards) { + return new Entry( + part.snapshot, + part.includeGlobalState, + part.partial, + updatedState, + updatedIndices, + updatedDataStreams == null ? part.dataStreams : updatedDataStreams, + part.featureStates, + part.startTime, + updatedRepositoryStateId, + updatedStateByShard, + updatedFailure, + part.userMetadata, + part.version, + part.shardStatusByRepoShardId, + part.snapshotIndices + ); + } return new Entry( part.snapshot, part.includeGlobalState, part.partial, updatedState, - indexByIndexNameDiff.apply(part.indices), + updatedIndices, updatedDataStreams == null ? part.dataStreams : updatedDataStreams, part.featureStates, part.startTime, updatedRepositoryStateId, - shardsByShardIdDiff.apply(part.shards), + updatedStateByShard, updatedFailure, part.userMetadata, part.version, From e4360d2140f32ff0aefe5a6ff235911cc6e58bde Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 25 Aug 2022 14:51:25 +0200 Subject: [PATCH 08/17] tet fix --- ...SnapshotsInProgressSerializationTests.java | 108 +----------------- 1 file changed, 4 insertions(+), 104 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 40350f4e44c40..cab835e482290 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -187,44 +187,8 @@ protected Custom mutateInstance(Custom instance) { } private Entry mutateEntry(Entry entry) { - switch (randomInt(8)) { + switch (randomInt(3)) { case 0 -> { - boolean includeGlobalState = entry.includeGlobalState() == false; - return new Entry( - entry.snapshot(), - includeGlobalState, - entry.partial(), - entry.state(), - entry.indices(), - entry.dataStreams(), - entry.featureStates(), - entry.repositoryStateId(), - entry.startTime(), - entry.shards(), - entry.failure(), - entry.userMetadata(), - entry.version() - ); - } - case 1 -> { - boolean partial = entry.partial() == false; - return new Entry( - entry.snapshot(), - entry.includeGlobalState(), - partial, - entry.state(), - entry.indices(), - entry.dataStreams(), - entry.featureStates(), - entry.startTime(), - entry.repositoryStateId(), - entry.shards(), - entry.failure(), - entry.userMetadata(), - entry.version() - ); - } - case 2 -> { List dataStreams = Stream.concat(entry.dataStreams().stream(), Stream.of(randomAlphaOfLength(10))).toList(); return new Entry( entry.snapshot(), @@ -242,25 +206,7 @@ private Entry mutateEntry(Entry entry) { entry.version() ); } - case 3 -> { - long startTime = randomValueOtherThan(entry.startTime(), ESTestCase::randomLong); - return new Entry( - entry.snapshot(), - entry.includeGlobalState(), - entry.partial(), - entry.state(), - entry.indices(), - entry.dataStreams(), - entry.featureStates(), - startTime, - entry.repositoryStateId(), - entry.shards(), - entry.failure(), - entry.userMetadata(), - entry.version() - ); - } - case 4 -> { + case 1 -> { long repositoryStateId = randomValueOtherThan(entry.startTime(), ESTestCase::randomLong); return new Entry( entry.snapshot(), @@ -278,7 +224,7 @@ private Entry mutateEntry(Entry entry) { entry.version() ); } - case 5 -> { + case 2 -> { String failure = randomValueOtherThan(entry.failure(), () -> randomAlphaOfLengthBetween(2, 10)); return new Entry( entry.snapshot(), @@ -296,7 +242,7 @@ private Entry mutateEntry(Entry entry) { entry.version() ); } - case 6 -> { + case 3 -> { Map indices = new HashMap<>(entry.indices()); IndexId indexId = new IndexId(randomAlphaOfLength(10), randomAlphaOfLength(10)); indices.put(indexId.getName(), indexId); @@ -322,52 +268,6 @@ private Entry mutateEntry(Entry entry) { entry.version() ); } - case 7 -> { - Map userMetadata = entry.userMetadata() != null ? new HashMap<>(entry.userMetadata()) : new HashMap<>(); - String key = randomAlphaOfLengthBetween(2, 10); - if (userMetadata.containsKey(key)) { - userMetadata.remove(key); - } else { - userMetadata.put(key, randomAlphaOfLengthBetween(2, 10)); - } - return new Entry( - entry.snapshot(), - entry.includeGlobalState(), - entry.partial(), - entry.state(), - entry.indices(), - entry.dataStreams(), - entry.featureStates(), - entry.startTime(), - entry.repositoryStateId(), - entry.shards(), - entry.failure(), - userMetadata, - entry.version() - ); - } - case 8 -> { - List featureStates = randomList( - 1, - 5, - () -> randomValueOtherThanMany(entry.featureStates()::contains, SnapshotFeatureInfoTests::randomSnapshotFeatureInfo) - ); - return new Entry( - entry.snapshot(), - entry.includeGlobalState(), - entry.partial(), - entry.state(), - entry.indices(), - entry.dataStreams(), - featureStates, - entry.startTime(), - entry.repositoryStateId(), - entry.shards(), - entry.failure(), - entry.userMetadata(), - entry.version() - ); - } default -> throw new IllegalArgumentException("invalid randomization case"); } } From 0eaef8a324edf950aaafd44b9547ea95916c7a9a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 25 Aug 2022 15:49:21 +0200 Subject: [PATCH 09/17] smaller serialization --- .../cluster/SnapshotsInProgress.java | 45 ++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 61abb27efff83..675ba33dfa5e5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -1564,7 +1564,7 @@ private static final class SnapshotInProgressDiff implements NamedDiff { i -> new ByRepo(i.readList(Entry::readFrom)), i -> new ByRepo.ByRepoDiff( DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new), - i.readStringList() + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER) ) ) ); @@ -1594,6 +1594,18 @@ public void writeTo(StreamOutput out) throws IOException { private record ByRepo(List entries) implements Diffable { static final ByRepo EMPTY = new ByRepo(List.of()); + private static final DiffableUtils.NonDiffableValueSerializer INT_DIFF_VALUE_SERIALIZER = + new DiffableUtils.NonDiffableValueSerializer<>() { + @Override + public void write(Integer value, StreamOutput out) throws IOException { + out.writeVInt(value); + } + + @Override + public Integer read(StreamInput in, String key) throws IOException { + return in.readVInt(); + } + }; private ByRepo(List entries) { this.entries = List.copyOf(entries); @@ -1611,7 +1623,21 @@ public Diff diff(ByRepo previousState) { toMapByUUID(this), DiffableUtils.getStringKeySerializer() ); - return new ByRepoDiff(diff, entries.stream().map(e -> e.snapshot().getSnapshotId().getUUID()).toList()); + final DiffableUtils.MapDiff> positionDiff = DiffableUtils.diff( + toPositionMap(previousState), + toPositionMap(this), + DiffableUtils.getStringKeySerializer(), + INT_DIFF_VALUE_SERIALIZER + ); + return new ByRepoDiff(diff, positionDiff); + } + + public static Map toPositionMap(ByRepo part) { + final Map before = new HashMap<>(part.entries.size()); + for (int i = 0; i < part.entries.size(); i++) { + before.put(part.entries.get(i).snapshot().getSnapshotId().getUUID(), i); + } + return before; } public static Map toMapByUUID(ByRepo part) { @@ -1622,25 +1648,24 @@ public static Map toMapByUUID(ByRepo part) { return before; } - private record ByRepoDiff(DiffableUtils.MapDiff> diff, List snapshotIds) - implements - Diff { + private record ByRepoDiff( + DiffableUtils.MapDiff> diff, + DiffableUtils.MapDiff> positionDiff + ) implements Diff { @Override public ByRepo apply(ByRepo part) { final var updated = diff.apply(toMapByUUID(part)); + final var updatedPositions = positionDiff.apply(toPositionMap(part)); final Entry[] arr = new Entry[updated.size()]; - for (int i = 0; i < snapshotIds.size(); i++) { - String snapshotId = snapshotIds.get(i); - arr[i] = updated.get(snapshotId); - } + updatedPositions.forEach((uuid, position) -> arr[position] = updated.get(uuid)); return new ByRepo(List.of(arr)); } @Override public void writeTo(StreamOutput out) throws IOException { diff.writeTo(out); - out.writeStringCollection(snapshotIds); + positionDiff.writeTo(out); } } } From c73975b06b75c2f7fa89b062bc9495f0aa7adffb Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 26 Aug 2022 13:49:01 +0200 Subject: [PATCH 10/17] bck --- .../main/java/org/elasticsearch/cluster/SnapshotsInProgress.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 675ba33dfa5e5..aac09f61330f6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -1495,6 +1495,7 @@ public Entry apply(Entry part) { final var updatedIndices = indexByIndexNameDiff.apply(part.indices); final var updatedStateByShard = shardsByShardIdDiff.apply(part.shards); if (part.source == null && updatedIndices == part.indices && updatedStateByShard == part.shards) { + // fast path for normal snapshots that avoid rebuilding the by-repo-id map if nothing changed about shard status return new Entry( part.snapshot, part.includeGlobalState, From a5bf01c0ac9ed660e5cc14d6a42ce49def220daf Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 1 Sep 2022 14:29:04 +0200 Subject: [PATCH 11/17] fix BwC --- .../cluster/SnapshotsInProgress.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index aac09f61330f6..7398503c01897 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -1547,28 +1547,26 @@ public void writeTo(StreamOutput out) throws IOException { private static final class SnapshotInProgressDiff implements NamedDiff { + private final SnapshotsInProgress after; + private final DiffableUtils.MapDiff> mapDiff; SnapshotInProgressDiff(SnapshotsInProgress before, SnapshotsInProgress after) { this.mapDiff = DiffableUtils.diff(before.entries, after.entries, DiffableUtils.getStringKeySerializer()); - } - - SnapshotInProgressDiff(DiffableUtils.MapDiff> mapDiff) { - this.mapDiff = mapDiff; + this.after = after; } SnapshotInProgressDiff(StreamInput in) throws IOException { - this( - DiffableUtils.readJdkMapDiff( - in, - DiffableUtils.getStringKeySerializer(), - i -> new ByRepo(i.readList(Entry::readFrom)), - i -> new ByRepo.ByRepoDiff( - DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new), - DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER) - ) + this.mapDiff = DiffableUtils.readJdkMapDiff( + in, + DiffableUtils.getStringKeySerializer(), + i -> new ByRepo(i.readList(Entry::readFrom)), + i -> new ByRepo.ByRepoDiff( + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new), + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER) ) ); + this.after = null; } @Override @@ -1588,7 +1586,12 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { - mapDiff.writeTo(out); + assert after != null : "should only write instances that were diffed from this node's state"; + if (out.getVersion().onOrAfter(DIFFABLE_VERSION)) { + mapDiff.writeTo(out); + } else { + new SimpleDiffable.CompleteDiff<>(after).writeTo(out); + } } } From 107c6e3ddecdc8d87675827dcd5c536ed41bdf68 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 1 Sep 2022 16:00:14 +0200 Subject: [PATCH 12/17] some docs --- .../cluster/SnapshotsInProgress.java | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 7398503c01897..bc4e78c4f74b1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -1595,6 +1595,12 @@ public void writeTo(StreamOutput out) throws IOException { } } + /** + * Wrapper for the list of snapshots per repository to allow for diffing changes in individual entries as well as position changes + * of entries in the list. + * + * @param entries all snapshots executing for a single repository + */ private record ByRepo(List entries) implements Diffable { static final ByRepo EMPTY = new ByRepo(List.of()); @@ -1622,44 +1628,46 @@ public void writeTo(StreamOutput out) throws IOException { @Override public Diff diff(ByRepo previousState) { - final DiffableUtils.MapDiff> diff = DiffableUtils.diff( - toMapByUUID(previousState), - toMapByUUID(this), - DiffableUtils.getStringKeySerializer() - ); - final DiffableUtils.MapDiff> positionDiff = DiffableUtils.diff( - toPositionMap(previousState), - toPositionMap(this), - DiffableUtils.getStringKeySerializer(), - INT_DIFF_VALUE_SERIALIZER + return new ByRepoDiff( + DiffableUtils.diff(toMapByUUID(previousState), toMapByUUID(this), DiffableUtils.getStringKeySerializer()), + DiffableUtils.diff( + toPositionMap(previousState), + toPositionMap(this), + DiffableUtils.getStringKeySerializer(), + INT_DIFF_VALUE_SERIALIZER + ) ); - return new ByRepoDiff(diff, positionDiff); } public static Map toPositionMap(ByRepo part) { - final Map before = new HashMap<>(part.entries.size()); + final Map res = Maps.newMapWithExpectedSize(part.entries.size()); for (int i = 0; i < part.entries.size(); i++) { - before.put(part.entries.get(i).snapshot().getSnapshotId().getUUID(), i); + res.put(part.entries.get(i).snapshot().getSnapshotId().getUUID(), i); } - return before; + return res; } public static Map toMapByUUID(ByRepo part) { - final Map before = new HashMap<>(part.entries.size()); + final Map res = Maps.newMapWithExpectedSize(part.entries.size()); for (Entry entry : part.entries) { - before.put(entry.snapshot().getSnapshotId().getUUID(), entry); + res.put(entry.snapshot().getSnapshotId().getUUID(), entry); } - return before; + return res; } + /** + * @param diffBySnapshotUUID diff of a map of snapshot UUID to snapshot entry + * @param positionDiff diff of a map with snapshot UUID keys and positions in {@link ByRepo#entries} as values. Used to efficiently + * diff an entry moving to another index in the list + */ private record ByRepoDiff( - DiffableUtils.MapDiff> diff, + DiffableUtils.MapDiff> diffBySnapshotUUID, DiffableUtils.MapDiff> positionDiff ) implements Diff { @Override public ByRepo apply(ByRepo part) { - final var updated = diff.apply(toMapByUUID(part)); + final var updated = diffBySnapshotUUID.apply(toMapByUUID(part)); final var updatedPositions = positionDiff.apply(toPositionMap(part)); final Entry[] arr = new Entry[updated.size()]; updatedPositions.forEach((uuid, position) -> arr[position] = updated.get(uuid)); @@ -1668,7 +1676,7 @@ public ByRepo apply(ByRepo part) { @Override public void writeTo(StreamOutput out) throws IOException { - diff.writeTo(out); + diffBySnapshotUUID.writeTo(out); positionDiff.writeTo(out); } } From d4826233475b4bc5eddea2efe58f355e84e405c6 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 5 Sep 2022 08:57:02 +0200 Subject: [PATCH 13/17] nicer --- .../elasticsearch/cluster/SnapshotsInProgress.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index bc4e78c4f74b1..1999a88b9f7c8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -1449,15 +1449,13 @@ public RepositoryShardId readKey(StreamInput in) throws IOException { SHARD_ID_KEY_SERIALIZER, (DiffableUtils.ValueSerializer) SHARD_SNAPSHOT_STATUS_VALUE_SERIALIZER ); - if (in.readBoolean()) { - shardsByRepoShardIdDiff = DiffableUtils.readJdkMapDiff( - in, + shardsByRepoShardIdDiff = in.readOptionalWriteable( + i -> DiffableUtils.readJdkMapDiff( + i, REPO_SHARD_ID_KEY_SERIALIZER, (DiffableUtils.ValueSerializer) SHARD_SNAPSHOT_STATUS_VALUE_SERIALIZER - ); - } else { - shardsByRepoShardIdDiff = null; - } + ) + ); } @SuppressWarnings("unchecked") From 852cf9d818db9be81c7e13fe01095f8bd4973371 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 5 Sep 2022 13:57:00 +0200 Subject: [PATCH 14/17] CR: split out legal mutations --- ...SnapshotsInProgressSerializationTests.java | 114 +++++++++++++++++- 1 file changed, 112 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index cab835e482290..0e788c006dd98 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -53,7 +53,7 @@ protected Custom createTestInstance() { int numberOfSnapshots = randomInt(10); SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY; for (int i = 0; i < numberOfSnapshots; i++) { - snapshotsInProgress.withAddedEntry(randomSnapshot()); + snapshotsInProgress = snapshotsInProgress.withAddedEntry(randomSnapshot()); } return snapshotsInProgress; } @@ -147,7 +147,7 @@ protected Custom makeTestChanges(Custom testInstance) { for (int i = 0; i < entries.size(); i++) { if (randomBoolean()) { final Entry entry = entries.get(i); - entries.set(i, mutateEntry(entry)); + entries.set(i, mutateEntryWithLegalChange(entry)); } } updatedInstance = updatedInstance.withUpdatedEntriesForRepo(perRepoEntries.get(0).repository(), entries); @@ -187,6 +187,116 @@ protected Custom mutateInstance(Custom instance) { } private Entry mutateEntry(Entry entry) { + switch (randomInt(5)) { + case 0 -> { + boolean includeGlobalState = entry.includeGlobalState() == false; + return new Entry( + entry.snapshot(), + includeGlobalState, + entry.partial(), + entry.state(), + entry.indices(), + entry.dataStreams(), + entry.featureStates(), + entry.repositoryStateId(), + entry.startTime(), + entry.shards(), + entry.failure(), + entry.userMetadata(), + entry.version() + ); + } + case 1 -> { + boolean partial = entry.partial() == false; + return new Entry( + entry.snapshot(), + entry.includeGlobalState(), + partial, + entry.state(), + entry.indices(), + entry.dataStreams(), + entry.featureStates(), + entry.startTime(), + entry.repositoryStateId(), + entry.shards(), + entry.failure(), + entry.userMetadata(), + entry.version() + ); + } + case 2 -> { + long startTime = randomValueOtherThan(entry.startTime(), ESTestCase::randomLong); + return new Entry( + entry.snapshot(), + entry.includeGlobalState(), + entry.partial(), + entry.state(), + entry.indices(), + entry.dataStreams(), + entry.featureStates(), + startTime, + entry.repositoryStateId(), + entry.shards(), + entry.failure(), + entry.userMetadata(), + entry.version() + ); + } + case 3 -> { + Map userMetadata = entry.userMetadata() != null ? new HashMap<>(entry.userMetadata()) : new HashMap<>(); + String key = randomAlphaOfLengthBetween(2, 10); + if (userMetadata.containsKey(key)) { + userMetadata.remove(key); + } else { + userMetadata.put(key, randomAlphaOfLengthBetween(2, 10)); + } + return new Entry( + entry.snapshot(), + entry.includeGlobalState(), + entry.partial(), + entry.state(), + entry.indices(), + entry.dataStreams(), + entry.featureStates(), + entry.startTime(), + entry.repositoryStateId(), + entry.shards(), + entry.failure(), + userMetadata, + entry.version() + ); + } + case 4 -> { + List featureStates = randomList( + 1, + 5, + () -> randomValueOtherThanMany(entry.featureStates()::contains, SnapshotFeatureInfoTests::randomSnapshotFeatureInfo) + ); + return new Entry( + entry.snapshot(), + entry.includeGlobalState(), + entry.partial(), + entry.state(), + entry.indices(), + entry.dataStreams(), + featureStates, + entry.startTime(), + entry.repositoryStateId(), + entry.shards(), + entry.failure(), + entry.userMetadata(), + entry.version() + ); + } + case 5 -> { + return mutateEntryWithLegalChange(entry); + } + default -> throw new IllegalArgumentException("invalid randomization case"); + } + } + + // mutates an entry with a change that could occur as part of a cluster state update and is thus diffable + private Entry mutateEntryWithLegalChange(Entry entry) { switch (randomInt(3)) { case 0 -> { List dataStreams = Stream.concat(entry.dataStreams().stream(), Stream.of(randomAlphaOfLength(10))).toList(); From 9f9995cf18688d3db3fd24ead15d7aa48e94df85 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 5 Sep 2022 13:59:00 +0200 Subject: [PATCH 15/17] Update docs/changelog/89619.yaml --- docs/changelog/89619.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/89619.yaml diff --git a/docs/changelog/89619.yaml b/docs/changelog/89619.yaml new file mode 100644 index 0000000000000..2525bcac345ba --- /dev/null +++ b/docs/changelog/89619.yaml @@ -0,0 +1,6 @@ +pr: 89619 +summary: Make `SnapshotsInProgress` Diffable +area: Snapshot/Restore +type: enhancement +issues: + - 88732 From 107eeacabd98bc779999de769a99db6fe8e23c0d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 13 Sep 2022 10:31:24 +0200 Subject: [PATCH 16/17] CR: comments --- .../org/elasticsearch/cluster/SnapshotsInProgress.java | 10 +++++++--- .../SnapshotsInProgressSerializationTests.java | 7 +++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 44eec0376644b..71b7b1c98446d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -101,7 +101,7 @@ public SnapshotsInProgress withUpdatedEntriesForRepo(String repository, List forRepo = new ArrayList<>(entries.getOrDefault(entry.repository(), ByRepo.EMPTY).entries); + final List forRepo = new ArrayList<>(forRepo(entry.repository())); forRepo.add(entry); return withUpdatedEntriesForRepo(entry.repository(), forRepo); } @@ -1618,7 +1618,9 @@ public Diff diff(ByRepo previousState) { public static Map toPositionMap(ByRepo part) { final Map res = Maps.newMapWithExpectedSize(part.entries.size()); for (int i = 0; i < part.entries.size(); i++) { - res.put(part.entries.get(i).snapshot().getSnapshotId().getUUID(), i); + final String snapshotUUID = part.entries.get(i).snapshot().getSnapshotId().getUUID(); + assert res.containsKey(snapshotUUID) == false; + res.put(snapshotUUID, i); } return res; } @@ -1626,7 +1628,9 @@ public static Map toPositionMap(ByRepo part) { public static Map toMapByUUID(ByRepo part) { final Map res = Maps.newMapWithExpectedSize(part.entries.size()); for (Entry entry : part.entries) { - res.put(entry.snapshot().getSnapshotId().getUUID(), entry); + final String snapshotUUID = entry.snapshot().getSnapshotId().getUUID(); + assert res.containsKey(snapshotUUID) == false; + res.put(snapshotUUID, entry); } return res; } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 6a690937342a7..35dc417608596 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -143,13 +143,16 @@ protected Custom makeTestChanges(Custom testInstance) { if (randomBoolean()) { // modify some elements for (List perRepoEntries : updatedInstance.entriesByRepo()) { - final List entries = new ArrayList<>(perRepoEntries); + List entries = new ArrayList<>(perRepoEntries); for (int i = 0; i < entries.size(); i++) { if (randomBoolean()) { final Entry entry = entries.get(i); entries.set(i, mutateEntryWithLegalChange(entry)); } } + if (randomBoolean()) { + entries = shuffledList(entries); + } updatedInstance = updatedInstance.withUpdatedEntriesForRepo(perRepoEntries.get(0).repository(), entries); } } @@ -317,7 +320,7 @@ private Entry mutateEntryWithLegalChange(Entry entry) { ); } case 1 -> { - long repositoryStateId = randomValueOtherThan(entry.startTime(), ESTestCase::randomLong); + long repositoryStateId = randomValueOtherThan(entry.repositoryStateId(), ESTestCase::randomLong); return Entry.snapshot( entry.snapshot(), entry.includeGlobalState(), From 97c71b1e0370aaa35113b583c6589002b8e55649 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 13 Sep 2022 12:54:45 +0200 Subject: [PATCH 17/17] verify diffable --- .../cluster/SnapshotsInProgress.java | 38 +++++++++++++++++++ ...SnapshotsInProgressSerializationTests.java | 2 +- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 71b7b1c98446d..eaff0a7d3eb39 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -1426,6 +1426,13 @@ public RepositoryShardId readKey(StreamInput in) throws IOException { @SuppressWarnings("unchecked") EntryDiff(Entry before, Entry after) { + try { + verifyDiffable(before, after); + } catch (Exception e) { + final IllegalArgumentException ex = new IllegalArgumentException("Cannot diff [" + before + "] and [" + after + "]"); + assert false : ex; + throw ex; + } this.indexByIndexNameDiff = DiffableUtils.diff( before.indices, after.indices, @@ -1454,6 +1461,37 @@ public RepositoryShardId readKey(StreamInput in) throws IOException { } } + private static void verifyDiffable(Entry before, Entry after) { + if (before.snapshot().equals(after.snapshot()) == false) { + throw new IllegalArgumentException("snapshot changed from [" + before.snapshot() + "] to [" + after.snapshot() + "]"); + } + if (before.startTime() != after.startTime()) { + throw new IllegalArgumentException("start time changed from [" + before.startTime() + "] to [" + after.startTime() + "]"); + } + if (Objects.equals(before.source(), after.source()) == false) { + throw new IllegalArgumentException("source changed from [" + before.source() + "] to [" + after.source() + "]"); + } + if (before.includeGlobalState() != after.includeGlobalState()) { + throw new IllegalArgumentException( + "include global state changed from [" + before.includeGlobalState() + "] to [" + after.includeGlobalState() + "]" + ); + } + if (before.partial() != after.partial()) { + throw new IllegalArgumentException("partial changed from [" + before.partial() + "] to [" + after.partial() + "]"); + } + if (before.featureStates().equals(after.featureStates()) == false) { + throw new IllegalArgumentException( + "feature states changed from " + before.featureStates() + " to " + after.featureStates() + ); + } + if (Objects.equals(before.userMetadata(), after.userMetadata()) == false) { + throw new IllegalArgumentException("user metadata changed from " + before.userMetadata() + " to " + after.userMetadata()); + } + if (before.version().equals(after.version()) == false) { + throw new IllegalArgumentException("version changed from " + before.version() + " to " + after.version()); + } + } + @Override public Entry apply(Entry part) { final var updatedIndices = indexByIndexNameDiff.apply(part.indices); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 35dc417608596..095b248e9f2fe 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -59,7 +59,7 @@ protected Custom createTestInstance() { } private Entry randomSnapshot() { - Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10))); + Snapshot snapshot = new Snapshot("repo-" + randomInt(5), new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10))); boolean includeGlobalState = randomBoolean(); boolean partial = randomBoolean(); int numberOfIndices = randomIntBetween(0, 10);