Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/128650.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 128650
summary: Update shardGenerations for all indices on snapshot finalization
area: Snapshot/Restore
type: enhancement
issues:
- 108907
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/
public final class FinalizeSnapshotContext extends DelegatingActionListener<RepositoryData, RepositoryData> {

private final ShardGenerations updatedShardGenerations;
private final UpdatedShardGenerations updatedShardGenerations;

/**
* Obsolete shard generations map computed from the cluster state update that this finalization executed in
Expand All @@ -46,7 +46,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
private final Runnable onDone;

/**
* @param updatedShardGenerations updated shard generations
* @param updatedShardGenerations updated shard generations for both live and deleted indices
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
Expand All @@ -57,7 +57,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
* once all cleanup operations after snapshot completion have executed
*/
public FinalizeSnapshotContext(
ShardGenerations updatedShardGenerations,
UpdatedShardGenerations updatedShardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Expand All @@ -78,7 +78,7 @@ public long repositoryStateId() {
return repositoryStateId;
}

public ShardGenerations updatedShardGenerations() {
public UpdatedShardGenerations updatedShardGenerations() {
return updatedShardGenerations;
}

Expand Down Expand Up @@ -120,4 +120,20 @@ public void onDone() {
public void onResponse(RepositoryData repositoryData) {
// Hand the repository data to the wrapped listener unchanged; no processing happens at this level.
delegate.onResponse(repositoryData);
}

/**
* A record used to track the new shard generations that have been written for each shard in a snapshot.
* An index may be deleted after the shard generation is written but before the snapshot is finalized.
* In this case, its shard generation is tracked in {@link #deletedIndices} because it's still a valid
* shard generation blob that exists in the repository and may be used by subsequent snapshots, even though
* the index will not be included in the snapshot being finalized. Otherwise, it is tracked in
* {@link #liveIndices}.
*/
public record UpdatedShardGenerations(ShardGenerations liveIndices, ShardGenerations deletedIndices) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think this will work (but I will add some other comments elsewhere)

public static final UpdatedShardGenerations EMPTY = new UpdatedShardGenerations(ShardGenerations.EMPTY, ShardGenerations.EMPTY);

/**
 * Checks whether a shard generation is tracked for the given shard, consulting the live indices
 * first and falling back to the deleted indices.
 *
 * @param repositoryShardId the shard to look up
 * @return {@code true} if either {@link #liveIndices} or {@link #deletedIndices} reports a shard generation for it
 */
public boolean hasShardGen(RepositoryShardId repositoryShardId) {
    if (liveIndices.hasShardGen(repositoryShardId)) {
        return true;
    }
    return deletedIndices.hasShardGen(repositoryShardId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
Expand Down Expand Up @@ -405,16 +406,16 @@ public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapsh
*
* @param snapshotId Id of the new snapshot
* @param details Details of the new snapshot
* @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new
* generations indexed by the shard id they correspond to must be supplied.
* @param updatedShardGenerations Updated shard generations in the new snapshot, including both indices that are included
* in the given snapshot and those that were deleted while finalizing.
* @param indexMetaBlobs Map of index metadata blob uuids
* @param newIdentifiers Map of new index metadata blob uuids keyed by the identifiers of the
* {@link IndexMetadata} in them
*/
public RepositoryData addSnapshot(
final SnapshotId snapshotId,
final SnapshotDetails details,
final ShardGenerations shardGenerations,
final UpdatedShardGenerations updatedShardGenerations,
@Nullable final Map<IndexId, String> indexMetaBlobs,
@Nullable final Map<String, String> newIdentifiers
) {
Expand All @@ -424,12 +425,13 @@ public RepositoryData addSnapshot(
// the new master, so we make the operation idempotent
return this;
}
final var liveIndexIds = updatedShardGenerations.liveIndices().indices();
Map<String, SnapshotId> snapshots = new HashMap<>(snapshotIds);
snapshots.put(snapshotId.getUUID(), snapshotId);
Map<String, SnapshotDetails> newSnapshotDetails = new HashMap<>(snapshotsDetails);
newSnapshotDetails.put(snapshotId.getUUID(), details);
Map<IndexId, List<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
for (final IndexId indexId : shardGenerations.indices()) {
for (final IndexId indexId : liveIndexIds) {
final List<SnapshotId> snapshotIds = allIndexSnapshots.get(indexId);
if (snapshotIds == null) {
allIndexSnapshots.put(indexId, List.of(snapshotId));
Expand All @@ -445,11 +447,8 @@ public RepositoryData addSnapshot(
: "Index meta generations should have been empty but was [" + indexMetaDataGenerations + "]";
newIndexMetaGenerations = IndexMetaDataGenerations.EMPTY;
} else {
assert indexMetaBlobs.isEmpty() || shardGenerations.indices().equals(indexMetaBlobs.keySet())
: "Shard generations contained indices "
+ shardGenerations.indices()
+ " but indexMetaData was given for "
+ indexMetaBlobs.keySet();
assert indexMetaBlobs.isEmpty() || liveIndexIds.equals(indexMetaBlobs.keySet())
: "Shard generations contained indices " + liveIndexIds + " but indexMetaData was given for " + indexMetaBlobs.keySet();
newIndexMetaGenerations = indexMetaDataGenerations.withAddedSnapshot(snapshotId, indexMetaBlobs, newIdentifiers);
}

Expand All @@ -459,7 +458,7 @@ public RepositoryData addSnapshot(
snapshots,
newSnapshotDetails,
allIndexSnapshots,
ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build(),
ShardGenerations.builder().putAll(this.shardGenerations).update(updatedShardGenerations).build(),
newIndexMetaGenerations,
clusterUUID
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;

/**
* Represents the current {@link ShardGeneration} for each shard in a repository.
*/
Expand Down Expand Up @@ -231,6 +233,14 @@ public Builder putAll(ShardGenerations shardGenerations) {
return this;
}

/**
 * Applies the given updated shard generations to this builder.
 * Generations of live indices are always added; generations of deleted indices only overwrite
 * entries for indices this builder already tracks.
 *
 * @param updatedShardGenerations the generations for live and deleted indices to apply
 * @return this builder
 */
public Builder update(UpdatedShardGenerations updatedShardGenerations) {
    final ShardGenerations live = updatedShardGenerations.liveIndices();
    final ShardGenerations deleted = updatedShardGenerations.deletedIndices();
    putAll(live);
    // A deleted index is only worth updating when it is already present in the existing generations,
    // i.e. when it is referenced by other snapshots.
    updateIfPresent(deleted);
    return this;
}

public Builder put(IndexId indexId, int shardId, SnapshotsInProgress.ShardSnapshotStatus status) {
// only track generations for successful shard status values
return put(indexId, shardId, status.state().failed() ? null : status.generation());
Expand All @@ -244,6 +254,20 @@ public Builder put(IndexId indexId, int shardId, ShardGeneration generation) {
return this;
}

/**
 * Overwrites shard generations for indices that this builder already tracks.
 * Indices absent from the builder are skipped entirely, and {@code null} generations are ignored.
 *
 * @param shardGenerations the generations to merge into the already tracked indices
 */
private void updateIfPresent(ShardGenerations shardGenerations) {
    for (var entry : shardGenerations.shardGenerations.entrySet()) {
        final Map<Integer, ShardGeneration> tracked = generations.get(entry.getKey());
        if (tracked == null) {
            // Index is not tracked by this builder, so nothing to update.
            continue;
        }
        final var candidates = entry.getValue();
        for (int shardId = 0; shardId < candidates.size(); shardId++) {
            final ShardGeneration candidate = candidates.get(shardId);
            if (candidate != null) {
                tracked.put(shardId, candidate);
            }
        }
    }
}

private boolean noDuplicateIndicesWithSameName(IndexId newId) {
for (IndexId id : generations.keySet()) {
if (id.getName().equals(newId.getName()) && id.equals(newId) == false) {
Expand All @@ -254,6 +278,9 @@ private boolean noDuplicateIndicesWithSameName(IndexId newId) {
}

public ShardGenerations build() {
if (generations.isEmpty()) {
return EMPTY;
}
return new ShardGenerations(generations.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
final Set<Integer> shardIds = entry.getValue().keySet();
assert shardIds.isEmpty() == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1749,11 +1749,10 @@ int sizeInBytes() {
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN
: "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
final Collection<IndexId> indices = shardGenerations.indices();
final Collection<IndexId> indices = finalizeSnapshotContext.updatedShardGenerations().liveIndices().indices();
final SnapshotId snapshotId = snapshotInfo.snapshotId();
// Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard
// directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION
Expand Down Expand Up @@ -1867,7 +1866,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
existingRepositoryData.addSnapshot(
snapshotId,
snapshotDetails,
shardGenerations,
finalizeSnapshotContext.updatedShardGenerations(),
metadataWriteResult.indexMetas(),
metadataWriteResult.indexMetaIdentifiers()
),
Expand Down
Loading
Loading