Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,12 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
}
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repository.getRepositoryData(repositoryDataListener);
try {
repository.getRepositoryData(repositoryDataListener);
} catch(Exception e) {
listener.onFailure(e);
return;
}
repositoryDataListener.whenComplete(repositoryData -> {
final long repositoryStateId = repositoryData.getGenId();
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
getRepositoryData(repositoryName, repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null);
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
Expand Down Expand Up @@ -1199,8 +1199,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener<Void> listener,
final boolean immediatePriority) {
// First, look for the snapshot in the repository
final Repository repository = repositoriesService.repository(repositoryName);
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
getRepositoryData(repositoryName, ActionListener.wrap(repositoryData -> {
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
.stream()
.filter(s -> s.getName().equals(snapshotName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -34,7 +36,9 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;

import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -108,6 +112,54 @@ public void testConcurrentlyChangeRepositoryContents() throws Exception {
.addSnapshots(snapshot).get().getSnapshots(repoName));
}

public void testRetrievingRepositoryDataThrows() throws Exception {
disableRepoConsistencyCheck("This test does not create any data in the repository.");
String randomNodeName = randomFrom(internalCluster().getNodeNames());
Client client = client(randomNodeName);

Path repo = randomRepoPath();
final String repoName = "test-mock-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("mock").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
.repository(repoName)).setThrowOnGetRepositoryData(true);

ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> {
client.admin().cluster().prepareCreateSnapshot(repoName, "test-snap").setWaitForCompletion(true).get();
});
assertThat(e.getMessage(), Matchers.is("Expected test exception"));

GetSnapshotsResponse snapshotsResponse = client.admin().cluster().prepareGetSnapshots(repoName).get();
assertThat(snapshotsResponse.getSuccessfulResponses().size(), Matchers.is(0));
assertThat(snapshotsResponse.getFailedResponses().size(), Matchers.is(1));
assertThat(snapshotsResponse.getFailedResponses().get(repoName).getMessage(),
Matchers.is("Expected test exception"));

e = expectThrows(ElasticsearchException.class, () -> {
client.admin().cluster().prepareDeleteSnapshot(repoName, "test-snap").get();
});
assertThat(e.getMessage(), Matchers.is("Expected test exception"));

e = expectThrows(ElasticsearchException.class, () -> {
client.admin().cluster().prepareRestoreSnapshot(repoName, "test-snap").get();
});
assertThat(e.getMessage(), Matchers.is("Expected test exception"));

e = expectThrows(ElasticsearchException.class, () -> {
client.admin().cluster().prepareCleanupRepository(repoName).get();
});
assertThat(e.getMessage(), Matchers.is("Expected test exception"));

((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
.repository(repoName)).setThrowOnGetRepositoryData(false);
}

public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception {
Client client = client();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.fs.FsRepository;

import java.io.IOException;
Expand All @@ -56,6 +58,7 @@
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -115,6 +118,8 @@ public long getFailureCount() {

private volatile boolean blocked = false;

private AtomicBoolean throwOnGetRepositoryData = new AtomicBoolean(false);

public MockRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService);
Expand All @@ -131,6 +136,19 @@ public MockRepository(RepositoryMetaData metadata, Environment environment,
logger.info("starting mock repository with random prefix {}", randomPrefix);
}

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (throwOnGetRepositoryData.get()) {
throw new ElasticsearchException("Expected test exception");
} else {
super.getRepositoryData(listener);
}
}

public void setThrowOnGetRepositoryData(boolean throwOnGet) {
this.throwOnGetRepositoryData.set(throwOnGet);
}

@Override
public RepositoryMetaData getMetadata() {
return overrideSettings(super.getMetadata(), env);
Expand Down