diff --git a/docs/changelog/91851.yaml b/docs/changelog/91851.yaml new file mode 100644 index 0000000000000..36a09f4782024 --- /dev/null +++ b/docs/changelog/91851.yaml @@ -0,0 +1,7 @@ +pr: 91851 +summary: Simplify and optimize deduplication of `RepositoryData` for a non-caching + repository instance +area: Snapshot/Restore +type: bug +issues: + - 89952 diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index d7df332673be3..0dc2422f9c53f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -25,7 +25,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.ResultDeduplicator; +import org.elasticsearch.action.SingleResultDeduplicator; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.ListenableActionFuture; @@ -413,7 +413,11 @@ protected BlobStoreRepository( this.namedXContentRegistry = namedXContentRegistry; this.basePath = basePath; this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings()); - this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext()); + this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>( + threadPool.getThreadContext(), + listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META) + .execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)) + ); shardSnapshotTaskRunner = new ShardSnapshotTaskRunner( threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), threadPool.executor(ThreadPool.Names.SNAPSHOT), @@ -1787,19 +1791,7 @@ public void getRepositoryData(ActionListener listener) { metadata.name(), 
latestKnownRepoGen ); - // Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state - // Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given - // generation may change - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META); - if (bestEffortConsistency || cacheRepositoryData == false) { - executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); - } else { - repoDataDeduplicator.executeOnce( - metadata, - listener, - (metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData)) - ); - } + repoDataLoadDeduplicator.execute(listener); } } @@ -1843,78 +1835,70 @@ private void initializeRepoGenerationTracking(ActionListener lis } existingListener.onFailure(e); }; - threadPool.generic() - .execute( - ActionRunnable.wrap( - ActionListener.wrap( - repoData -> submitUnbatchedTask( - "set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]", - new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - RepositoryMetadata metadata = getRepoMetadata(currentState); - // No update to the repository generation should have occurred concurrently in general except - // for - // extreme corner cases like failing over to an older version master node and back to the - // current - // node concurrently - if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) { - throw new RepositoryException( - metadata.name(), - "Found unexpected initialized repo metadata [" + metadata + "]" - ); - } - return ClusterState.builder(currentState) - .metadata( - Metadata.builder(currentState.getMetadata()) - .putCustom( - RepositoriesMetadata.TYPE, - currentState.metadata() - .custom(RepositoriesMetadata.TYPE) - .withUpdatedGeneration( - metadata.name(), - repoData.getGenId(), - repoData.getGenId() - ) - ) + 
repoDataLoadDeduplicator.execute( + ActionListener.wrap( + repoData -> submitUnbatchedTask( + "set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + RepositoryMetadata metadata = getRepoMetadata(currentState); + // No update to the repository generation should have occurred concurrently in general except + // for + // extreme corner cases like failing over to an older version master node and back to the + // current + // node concurrently + if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) { + throw new RepositoryException( + metadata.name(), + "Found unexpected initialized repo metadata [" + metadata + "]" + ); + } + return ClusterState.builder(currentState) + .metadata( + Metadata.builder(currentState.getMetadata()) + .putCustom( + RepositoriesMetadata.TYPE, + currentState.metadata() + .custom(RepositoriesMetadata.TYPE) + .withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId()) ) - .build(); - } + ) + .build(); + } - @Override - public void onFailure(Exception e) { - onFailure.accept(e); - } + @Override + public void onFailure(Exception e) { + onFailure.accept(e); + } - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - logger.trace( - "[{}] initialized repository generation in cluster state to [{}]", - metadata.name(), - repoData.getGenId() - ); - // Resolve listeners on generic pool since some callbacks for repository data do additional IO - threadPool.generic().execute(() -> { - final ActionListener existingListener; - synchronized (BlobStoreRepository.this) { - existingListener = repoDataInitialized; - repoDataInitialized = null; - } - existingListener.onResponse(repoData); - logger.trace( - "[{}] called listeners after initializing repository to generation [{}]", - metadata.name(), - repoData.getGenId() - ); - }); + @Override + 
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + logger.trace( + "[{}] initialized repository generation in cluster state to [{}]", + metadata.name(), + repoData.getGenId() + ); + // Resolve listeners on generic pool since some callbacks for repository data do additional IO + threadPool.generic().execute(() -> { + final ActionListener existingListener; + synchronized (BlobStoreRepository.this) { + existingListener = repoDataInitialized; + repoDataInitialized = null; } - } - ), - onFailure - ), - this::doGetRepositoryData - ) - ); + existingListener.onResponse(repoData); + logger.trace( + "[{}] called listeners after initializing repository to generation [{}]", + metadata.name(), + repoData.getGenId() + ); + }); + } + } + ), + onFailure + ) + ); } else { logger.trace( "[{}] waiting for existing initialization of repository metadata generation in cluster state", @@ -1926,11 +1910,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) } /** - * {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning - * {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is - * unique for a given value of {@link #metadata} at any point in time. + * Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repository's underlying storage. */ - private final ResultDeduplicator repoDataDeduplicator; + private final SingleResultDeduplicator repoDataLoadDeduplicator; private void doGetRepositoryData(ActionListener listener) { // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index 8304901864ba4..52e4cf7b7ea8d 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -423,6 +423,7 @@ private static ClusterService mockClusterService(ClusterState initialState) { final ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(threadContext); when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); + when(threadPool.executor(ThreadPool.Names.SNAPSHOT_META)).thenReturn(new SameThreadExecutorService()); when(threadPool.generic()).thenReturn(new SameThreadExecutorService()); when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))