Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/91851.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 91851
summary: Simplify and optimize deduplication of `RepositoryData` for a non-caching
repository instance
area: Snapshot/Restore
type: bug
issues:
- 89952
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
Expand Down Expand Up @@ -413,7 +413,11 @@ protected BlobStoreRepository(
this.namedXContentRegistry = namedXContentRegistry;
this.basePath = basePath;
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
threadPool.getThreadContext(),
listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))
);
shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
threadPool.executor(ThreadPool.Names.SNAPSHOT),
Expand Down Expand Up @@ -1787,19 +1791,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
metadata.name(),
latestKnownRepoGen
);
// Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
// Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
// generation may change
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
if (bestEffortConsistency || cacheRepositoryData == false) {
executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
} else {
repoDataDeduplicator.executeOnce(
metadata,
listener,
(metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))
);
}
repoDataLoadDeduplicator.execute(listener);
}
}

Expand Down Expand Up @@ -1843,78 +1835,70 @@ private void initializeRepoGenerationTracking(ActionListener<RepositoryData> lis
}
existingListener.onFailure(e);
};
threadPool.generic()
.execute(
ActionRunnable.wrap(
ActionListener.wrap(
repoData -> submitUnbatchedTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
currentState.metadata()
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(
metadata.name(),
repoData.getGenId(),
repoData.getGenId()
)
)
repoDataLoadDeduplicator.execute(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

ActionListener.wrap(
repoData -> submitUnbatchedTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
currentState.metadata()
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId())
)
.build();
}
)
.build();
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
}
),
onFailure
),
this::doGetRepositoryData
)
);
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
}
}
),
onFailure
)
);
} else {
logger.trace(
"[{}] waiting for existing initialization of repository metadata generation in cluster state",
Expand All @@ -1926,11 +1910,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
}

/**
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
* unique for a given value of {@link #metadata} at any point in time.
* Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage.
*/
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator;
private final SingleResultDeduplicator<RepositoryData> repoDataLoadDeduplicator;

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ private static ClusterService mockClusterService(ClusterState initialState) {
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.executor(ThreadPool.Names.SNAPSHOT_META)).thenReturn(new SameThreadExecutorService());
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))
Expand Down