Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/85551.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85551
summary: "Distinguish missing and invalid repositories"
area: Snapshot/Restore
type: bug
issues: [85550]
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isA;

public class InvalidRepositoryIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(UnstableRepository.Plugin.class);
}

public static class UnstableRepository extends MockRepository {
public static final String TYPE = "unstable";
public static final Setting<List<String>> UNSTABLE_NODES = Setting.stringListSetting(
"repository.unstable_nodes",
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public UnstableRepository(
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
List<String> unstableNodes = UNSTABLE_NODES.get(metadata.settings());
if (unstableNodes.contains(clusterService.getNodeName())) {
throw new RepositoryException(metadata.name(), "Failed to create repository: current node is not stable");
}
}

public static class Plugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
@Override
public Map<String, Factory> getRepositories(
Environment env,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
return Collections.singletonMap(
TYPE,
(metadata) -> new UnstableRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(UNSTABLE_NODES);
}
}
}

public void testCreateInvalidRepository() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
final String repositoryName = "test-duplicate-create-repo";

// put repository for the first time: only let master node create repository successfully
createRepository(
repositoryName,
UnstableRepository.TYPE,
Settings.builder()
.put("location", randomRepoPath())
.putList(
UnstableRepository.UNSTABLE_NODES.getKey(),
Arrays.stream(internalCluster().getNodeNames())
.filter(name -> name.equals(internalCluster().getMasterName()) == false)
.toList()
)
);
// verification should fail with some node has InvalidRepository
final var expectedException = expectThrows(
RepositoryVerificationException.class,
() -> client().admin().cluster().prepareVerifyRepository(repositoryName).get()
);
for (Throwable suppressed : expectedException.getSuppressed()) {
Throwable outerCause = suppressed.getCause();
assertThat(outerCause, isA(RepositoryException.class));
assertThat(
outerCause.getMessage(),
equalTo("[" + repositoryName + "] repository type [" + UnstableRepository.TYPE + "] failed to create on current node")
);
Throwable innerCause = suppressed.getCause().getCause().getCause();
assertThat(innerCause, isA(RepositoryException.class));
assertThat(
innerCause.getMessage(),
equalTo("[" + repositoryName + "] Failed to create repository: current node is not stable")
);
}

// restart master
internalCluster().restartNode(internalCluster().getMasterName());
ensureGreen();

// put repository again: let all node can create repository successfully
createRepository(repositoryName, UnstableRepository.TYPE, Settings.builder().put("location", randomRepoPath()));
// verification should succeed with all node create repository successfully
VerifyRepositoryResponse verifyRepositoryResponse = client().admin().cluster().prepareVerifyRepository(repositoryName).get();
assertEquals(verifyRepositoryResponse.getNodes().size(), internalCluster().numDataAndMasterNodes());

}

private void createRepository(String name, String type, Settings.Builder settings) {
// create
assertAcked(client().admin().cluster().preparePutRepository(name).setType(type).setVerify(false).setSettings(settings).get());
// get
final GetRepositoriesResponse updatedGetRepositoriesResponse = client().admin().cluster().prepareGetRepositories(name).get();
// assert
assertThat(updatedGetRepositoriesResponse.repositories(), hasSize(1));
final RepositoryMetadata updatedRepositoryMetadata = updatedGetRepositoriesResponse.repositories().get(0);
assertThat(updatedRepositoryMetadata.type(), equalTo(type));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Represents a repository that exists in the cluster state but could not be instantiated on a node, typically due to invalid configuration.
*/
public class InvalidRepository extends AbstractLifecycleComponent implements Repository {

private final RepositoryMetadata repositoryMetadata;
private final RepositoryException creationException;

public InvalidRepository(RepositoryMetadata repositoryMetadata, RepositoryException creationException) {
this.repositoryMetadata = repositoryMetadata;
this.creationException = creationException;
}

private RepositoryException createCreationException() {
return new RepositoryException(
repositoryMetadata.name(),
"repository type [" + repositoryMetadata.type() + "] failed to create on current node",
creationException
);
}

@Override
public RepositoryMetadata getMetadata() {
return repositoryMetadata;
}

@Override
public void getSnapshotInfo(GetSnapshotInfoContext context) {
throw createCreationException();
}

@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
throw createCreationException();
}

@Override
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
throw createCreationException();
}

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
listener.onFailure(createCreationException());
}

@Override
public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
finalizeSnapshotContext.onFailure(createCreationException());
}

@Override
public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
) {
listener.onFailure(createCreationException());
}

@Override
public long getSnapshotThrottleTimeInNanos() {
throw createCreationException();
}

@Override
public long getRestoreThrottleTimeInNanos() {
throw createCreationException();
}

@Override
public String startVerification() {
throw createCreationException();
}

@Override
public void endVerification(String verificationToken) {
throw createCreationException();
}

@Override
public void verify(String verificationToken, DiscoveryNode localNode) {
throw createCreationException();
}

@Override
public boolean isReadOnly() {
// this repository is assumed writable to bypass read-only check and fail with exception produced by this class
return false;
}

@Override
public void snapshotShard(SnapshotShardContext snapshotShardContext) {
snapshotShardContext.onFailure(createCreationException());
}

@Override
public void restoreShard(
Store store,
SnapshotId snapshotId,
IndexId indexId,
ShardId snapshotShardId,
RecoveryState recoveryState,
ActionListener<Void> listener
) {
listener.onFailure(createCreationException());
}

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
throw createCreationException();
}

@Override
public void updateState(ClusterState state) {

}

@Override
public void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Consumer<Exception> onFailure
) {
onFailure.accept(createCreationException());
}

@Override
public void cloneShardSnapshot(
SnapshotId source,
SnapshotId target,
RepositoryShardId shardId,
ShardGeneration shardGeneration,
ActionListener<ShardSnapshotResult> listener
) {
listener.onFailure(createCreationException());
}

@Override
public void awaitIdle() {

}

@Override
protected void doStart() {

}

@Override
protected void doStop() {

}

@Override
protected void doClose() throws IOException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -515,19 +515,20 @@ public void applyClusterState(ClusterChangedEvent event) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
logger.warn(() -> new ParameterizedMessage("failed to change repository [{}]", repositoryMetadata.name()), ex);
repository = new InvalidRepository(repositoryMetadata, ex);
}
}
} else {
try {
repository = createRepository(repositoryMetadata, typesRegistry, RepositoriesService::createUnknownTypeRepository);
} catch (RepositoryException ex) {
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", repositoryMetadata.name()), ex);
repository = new InvalidRepository(repositoryMetadata, ex);
}
}
if (repository != null) {
logger.debug("registering repository [{}]", repositoryMetadata.name());
builder.put(repositoryMetadata.name(), repository);
}
assert repository != null : "repository should not be null here";
logger.debug("registering repository [{}]", repositoryMetadata.name());
builder.put(repositoryMetadata.name(), repository);
}
for (Repository repo : builder.values()) {
repo.updateState(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

/**
* This class represents a repository that could not be initialized due to unknown type.
* This could happen whe a user creates a snapshot repository using a type from a plugin and then removes the plugin.
* This could happen when a user creates a snapshot repository using a type from a plugin and then removes the plugin.
*/
public class UnknownTypeRepository extends AbstractLifecycleComponent implements Repository {

Expand Down
Loading