From d784ccc21360fb5b1175b4356bb06839c73be872 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 4 Dec 2018 14:36:50 -0700 Subject: [PATCH 1/3] Register CcrRepository based on settings update (#36086) This commit adds an empty CcrRepository snapshot/restore repository. When a new cluster is registered in the remote cluster settings, a new CcrRepository is registered for that cluster. This is implemented using a new concept of "internal repositories". RepositoryPlugin now allows implementations to return factories for "internal repositories". The "internal repositories" are different from normal repositories in that they cannot be registered through the external repository api. Additionally, "internal repositories" are local to a node and are not stored in the cluster state. The repository will be unregistered if the remote cluster is removed. --- .../plugins/RepositoryPlugin.java | 13 + .../repositories/RepositoriesModule.java | 18 +- .../repositories/RepositoriesService.java | 50 +++- .../transport/RemoteClusterAware.java | 10 +- .../transport/RemoteClusterService.java | 5 +- ...ClusterStateServiceRandomUpdatesTests.java | 2 +- .../repositories/RepositoriesModuleTests.java | 101 ++++++++ .../RepositoriesServiceTests.java | 233 ++++++++++++++++++ .../transport/RemoteClusterServiceTests.java | 9 +- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 57 +++-- .../xpack/ccr/CcrRepositoryManager.java | 48 ++++ .../DeleteInternalCcrRepositoryAction.java | 72 ++++++ .../DeleteInternalCcrRepositoryRequest.java | 63 +++++ .../PutInternalCcrRepositoryAction.java | 72 ++++++ .../PutInternalCcrRepositoryRequest.java | 71 ++++++ .../xpack/ccr/repository/CcrRepository.java | 149 +++++++++++ .../xpack/ccr/CcrRepositoryManagerIT.java | 62 +++++ .../core/LocalStateCompositeXPackPlugin.java | 17 ++ .../authz/IndicesAndAliasesResolver.java | 9 +- 19 files changed, 1012 insertions(+), 49 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java create mode 100644 server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index a3af52a9a4aca..5c15040609863 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -42,4 +42,17 @@ public interface RepositoryPlugin { default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { return Collections.emptyMap(); } + + /** + * Returns internal repository types added by this plugin. Internal repositories cannot be registered + * through the external API. + * + * @param env The environment for the local node, which may be used for the local settings and path.repo + * + * The key of the returned {@link Map} is the type name of the repository and + * the value is a factory to construct the {@link Repository} interface. + */ + default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + return Collections.emptyMap(); + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 637ec2d8dfbc1..90e3c94dfb3c5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -58,8 +58,24 @@ public RepositoriesModule(Environment env, List repoPlugins, T } } + Map internalFactories = new HashMap<>(); + for (RepositoryPlugin repoPlugin : repoPlugins) { + Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry); + for (Map.Entry entry : newRepoTypes.entrySet()) { + if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { + throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); + } + if (factories.put(entry.getKey(), entry.getValue()) != null) { + throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered as a " + + "non-internal repository"); + } + } + } + Map repositoryTypes = Collections.unmodifiableMap(factories); - repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes, threadPool); + Map internalRepositoryTypes = Collections.unmodifiableMap(internalFactories); + repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes, + internalRepositoryTypes, threadPool); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index d0b8a2caf8c77..428b1eba046c2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; @@ -57,6 +58,7 @@ public class RepositoriesService implements ClusterStateApplier { private static final Logger logger = LogManager.getLogger(RepositoriesService.class); private final Map typesRegistry; + private final Map internalTypesRegistry; private final ClusterService clusterService; @@ -64,12 +66,14 @@ public class RepositoriesService implements ClusterStateApplier { private final VerifyNodeRepositoryAction verifyAction; + private final Map internalRepositories = ConcurrentCollections.newConcurrentMap(); private volatile Map repositories = Collections.emptyMap(); public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService, - Map typesRegistry, + Map typesRegistry, Map internalTypesRegistry, ThreadPool threadPool) { this.typesRegistry = typesRegistry; + this.internalTypesRegistry = internalTypesRegistry; this.clusterService = clusterService; this.threadPool = threadPool; // Doesn't make sense to maintain repositories on non-master and non-data nodes @@ -101,7 +105,7 @@ public void registerRepository(final RegisterRepositoryRequest request, final Ac // Trying to create the new repository on master to make sure it works try { - closeRepository(createRepository(newRepositoryMetaData)); + closeRepository(createRepository(newRepositoryMetaData, typesRegistry)); } catch (Exception e) { registrationListener.onFailure(e); return; @@ -316,7 +320,7 @@ public void applyClusterState(ClusterChangedEvent event) { closeRepository(repository); repository = null; try { - repository = createRepository(repositoryMetaData); + repository = createRepository(repositoryMetaData, typesRegistry); } catch (RepositoryException ex) { // TODO: this catch is bogus, it means the old repo is already closed, // but we have nothing to replace it @@ -325,7 +329,7 @@ public void applyClusterState(ClusterChangedEvent event) { } } else { try { - repository = createRepository(repositoryMetaData); + repository = createRepository(repositoryMetaData, typesRegistry); } catch (RepositoryException ex) { logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", repositoryMetaData.name()), ex); } @@ -356,9 +360,37 @@ public Repository repository(String repositoryName) { if (repository != null) { return repository; } + repository = internalRepositories.get(repositoryName); + if (repository != null) { + return repository; + } throw new RepositoryMissingException(repositoryName); } + public void registerInternalRepository(String name, String type) { + RepositoryMetaData metaData = new RepositoryMetaData(name, type, Settings.EMPTY); + Repository repository = internalRepositories.computeIfAbsent(name, (n) -> { + logger.debug("put internal repository [{}][{}]", name, type); + return createRepository(metaData, internalTypesRegistry); + }); + if (type.equals(repository.getMetadata().type()) == false) { + logger.warn(new ParameterizedMessage("internal repository [{}][{}] already registered. this prevented the registration of " + + "internal repository [{}][{}].", name, repository.getMetadata().type(), name, type)); + } else if (repositories.containsKey(name)) { + logger.warn(new ParameterizedMessage("non-internal repository [{}] already registered. this repository will block the " + + "usage of internal repository [{}][{}].", name, metaData.type(), name)); + } + } + + public void unregisterInternalRepository(String name) { + Repository repository = internalRepositories.remove(name); + if (repository != null) { + RepositoryMetaData metadata = repository.getMetadata(); + logger.debug(() -> new ParameterizedMessage("delete internal repository [{}][{}].", metadata.type(), name)); + closeRepository(repository); + } + } + /** Closes the given repository. */ private void closeRepository(Repository repository) { logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name()); @@ -366,21 +398,21 @@ private void closeRepository(Repository repository) { } /** - * Creates repository holder + * Creates repository holder. This method starts the repository */ - private Repository createRepository(RepositoryMetaData repositoryMetaData) { + private Repository createRepository(RepositoryMetaData repositoryMetaData, Map factories) { logger.debug("creating repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()); - Repository.Factory factory = typesRegistry.get(repositoryMetaData.type()); + Repository.Factory factory = factories.get(repositoryMetaData.type()); if (factory == null) { throw new RepositoryException(repositoryMetaData.name(), "repository type [" + repositoryMetaData.type() + "] does not exist"); } try { - Repository repository = factory.create(repositoryMetaData, typesRegistry::get); + Repository repository = factory.create(repositoryMetaData, factories::get); repository.start(); return repository; } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e); + logger.warn(new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e); throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index f66ed4572661f..f9aaf7333fa9c 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -162,7 +162,7 @@ public String getKey(final String key) { REMOTE_CLUSTERS_SEEDS); protected final Settings settings; - protected final ClusterNameExpressionResolver clusterNameResolver; + private final ClusterNameExpressionResolver clusterNameResolver; /** * Creates a new {@link RemoteClusterAware} instance @@ -237,14 +237,15 @@ static DiscoveryNode buildSeedNode(String clusterName, String address, boolean p * indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under * {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable. * + * @param remoteClusterNames the remote cluster names * @param requestIndices the indices in the search request to filter * @param indexExists a predicate that can test if a certain index or alias exists in the local cluster * * @return a map of grouped remote and local indices */ - public Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) { + protected Map> groupClusterIndices(Set remoteClusterNames, String[] requestIndices, + Predicate indexExists) { Map> perClusterIndices = new HashMap<>(); - Set remoteClusterNames = getRemoteClusterNames(); for (String index : requestIndices) { int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR); if (i >= 0) { @@ -276,9 +277,6 @@ public Map> groupClusterIndices(String[] requestIndices, Pr return perClusterIndices; } - protected abstract Set getRemoteClusterNames(); - - /** * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is * empty the cluster alias is unregistered and should be removed. diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index a68c225409dcb..84826b4d1e0db 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -272,7 +272,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no public Map groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate indexExists) { Map originalIndicesMap = new HashMap<>(); if (isCrossClusterSearchEnabled()) { - final Map> groupedIndices = groupClusterIndices(indices, indexExists); + final Map> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices, indexExists); if (groupedIndices.isEmpty()) { //search on _all in the local cluster if neither local indices nor remote indices were specified originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions)); @@ -374,8 +374,7 @@ RemoteClusterConnection getRemoteClusterConnection(String cluster) { return connection; } - @Override - protected Set getRemoteClusterNames() { + Set getRemoteClusterNames() { return this.remoteClusters.keySet(); } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 4625aa04be372..89a6bc8dcf89d 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -461,7 +461,7 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod Collections.emptySet()); final ClusterService clusterService = mock(ClusterService.class); final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService, - transportService, null, threadPool); + transportService, Collections.emptyMap(), Collections.emptyMap(), threadPool); final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(threadPool, transportService, null, clusterService); final ShardStateAction shardStateAction = mock(ShardStateAction.class); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java new file mode 100644 index 0000000000000..96a9670d16202 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RepositoriesModuleTests extends ESTestCase { + + private Environment environment; + private NamedXContentRegistry contentRegistry; + private List repoPlugins = new ArrayList<>(); + private RepositoryPlugin plugin1; + private RepositoryPlugin plugin2; + private Repository.Factory factory; + + @Override + public void setUp() throws Exception { + super.setUp(); + environment = mock(Environment.class); + contentRegistry = mock(NamedXContentRegistry.class); + plugin1 = mock(RepositoryPlugin.class); + plugin2 = mock(RepositoryPlugin.class); + factory = mock(Repository.Factory.class); + repoPlugins.add(plugin1); + repoPlugins.add(plugin2); + when(environment.settings()).thenReturn(Settings.EMPTY); + } + + public void testCanRegisterTwoRepositoriesWithDifferentTypes() { + when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory)); + + // Would throw + new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + mock(ThreadPool.class), contentRegistry); + } + + public void testCannotRegisterTwoRepositoriesWithSameTypes() { + when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + mock(ThreadPool.class), contentRegistry)); + + assertEquals("Repository type [type1] is already registered", ex.getMessage()); + } + + public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { + when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + mock(ThreadPool.class), contentRegistry)); + + assertEquals("Internal repository type [type1] is already registered", ex.getMessage()); + } + + public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { + when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + mock(ThreadPool.class), contentRegistry)); + + assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage()); + } +} diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java new file mode 100644 index 0000000000000..c02ab0d185610 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -0,0 +1,233 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class RepositoriesServiceTests extends ESTestCase { + + private RepositoriesService repositoriesService; + + @Override + public void setUp() throws Exception { + super.setUp(); + ThreadPool threadPool = mock(ThreadPool.class); + final TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()); + repositoriesService = new RepositoriesService(Settings.EMPTY, mock(ClusterService.class), + transportService, Collections.emptyMap(), Collections.singletonMap(TestRepository.TYPE, TestRepository::new), threadPool); + } + + public void testRegisterInternalRepository() { + String repoName = "name"; + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(repoName); + assertEquals(repoName, repository.getMetadata().name()); + assertEquals(TestRepository.TYPE, repository.getMetadata().type()); + assertEquals(Settings.EMPTY, repository.getMetadata().settings()); + assertTrue(((TestRepository) repository).isStarted); + } + + public void testUnregisterInternalRepository() { + String repoName = "name"; + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(repoName); + assertFalse(((TestRepository) repository).isClosed); + repositoriesService.unregisterInternalRepository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + assertTrue(((TestRepository) repository).isClosed); + } + + public void testRegisterWillNotUpdateIfInternalRepositoryWithNameExists() { + String repoName = "name"; + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(repoName); + assertFalse(((TestRepository) repository).isClosed); + repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + assertFalse(((TestRepository) repository).isClosed); + Repository repository2 = repositoriesService.repository(repoName); + assertSame(repository, repository2); + } + + private static class TestRepository implements Repository { + + private static final String TYPE = "internal"; + private boolean isClosed; + private boolean isStarted; + + private final RepositoryMetaData metaData; + + private TestRepository(RepositoryMetaData metaData) { + this.metaData = metaData; + } + + @Override + public RepositoryMetaData getMetadata() { + return metaData; + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + return null; + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + return null; + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + return null; + } + + @Override + public RepositoryData getRepositoryData() { + return null; + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState) { + return null; + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return 0; + } + + @Override + public String startVerification() { + return null; + } + + @Override + public void endVerification(String verificationToken) { + + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + + } + + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + return null; + } + + @Override + public Lifecycle.State lifecycleState() { + return null; + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + + } + + @Override + public void start() { + isStarted = true; + } + + @Override + public void stop() { + + } + + @Override + public void close() { + isClosed = true; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index f03b202deec37..d41faee81bdfd 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -218,8 +218,9 @@ public void testGroupClusterIndices() throws IOException { assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); assertFalse(service.isRemoteClusterRegistered("foo")); - Map> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", - "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"}, + Map> perClusterIndices = service.groupClusterIndices(service.getRemoteClusterNames(), + new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", + "cluster*:baz", "*:boo", "no*match:boo"}, i -> false); List localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); assertNotNull(localIndices); @@ -229,7 +230,7 @@ public void testGroupClusterIndices() throws IOException { assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2")); IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> - service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", + service.groupClusterIndices(service.getRemoteClusterNames(), new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals)); assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" + @@ -276,7 +277,7 @@ public void testGroupIndices() throws IOException { } { IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> - service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", + service.groupClusterIndices(service.getRemoteClusterNames(), new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals)); assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" + " cluster_1", iae.getMessage()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index a21052fdce85c..1d6bf82fe54e5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -6,9 +6,11 @@ package org.elasticsearch.xpack.ccr; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -32,6 +34,8 @@ import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; @@ -41,47 +45,51 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; -import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; -import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; -import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; -import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; -import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; -import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; -import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; -import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; +import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; +import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction; -import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction; -import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; -import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; +import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; +import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; +import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; +import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction; -import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction; -import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; -import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; +import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.XPackClientActionPlugin; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; -import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; @@ -93,7 +101,7 @@ /** * Container class for CCR functionality. */ -public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { +public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin { public static final String CCR_THREAD_POOL_NAME = "ccr"; public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; @@ -105,6 +113,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final boolean enabled; private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; + private final SetOnce repositoryManager = new SetOnce<>(); private final boolean tribeNode; private final boolean tribeNodeClient; @@ -148,6 +157,8 @@ public Collection createComponents( return emptyList(); } + this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, (NodeClient) client)); + return Arrays.asList( ccrLicenseChecker, new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker) @@ -172,6 +183,10 @@ public List> getPersistentTasksExecutor(ClusterServic // internal actions new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class), new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), + new ActionHandler<>(PutInternalCcrRepositoryAction.INSTANCE, + PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class), + new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE, + DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class), // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), @@ -265,6 +280,12 @@ public List> getExecutorBuilders(Settings settings) { return Collections.singletonList(new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME, 32, 100, "xpack.ccr.ccr_thread_pool")); } + @Override + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, settings); + return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); + } + protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java new file mode 100644 index 0000000000000..f86789a880e24 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; +import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; + +import java.util.List; + +class CcrRepositoryManager extends RemoteClusterAware { + + private final NodeClient client; + + CcrRepositoryManager(Settings settings, ClusterService clusterService, NodeClient client) { + super(settings); + this.client = client; + listenForUpdates(clusterService.getClusterSettings()); + } + + @Override + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { + String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; + if (addresses.isEmpty()) { + DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName); + PlainActionFuture f = PlainActionFuture.newFuture(); + client.executeLocally(DeleteInternalCcrRepositoryAction.INSTANCE, request, f); + assert f.isDone() : "Should be completed as it is executed synchronously"; + } else { + ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE); + PlainActionFuture f = PlainActionFuture.newFuture(); + client.executeLocally(PutInternalCcrRepositoryAction.INSTANCE, request, f); + assert f.isDone() : "Should be completed as it is executed synchronously"; + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java new file mode 100644 index 0000000000000..e85ce65858e0f --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public class DeleteInternalCcrRepositoryAction extends Action { + + public static final DeleteInternalCcrRepositoryAction INSTANCE = new DeleteInternalCcrRepositoryAction(); + public static final String NAME = "cluster:admin/ccr/internal_repository/delete"; + + private DeleteInternalCcrRepositoryAction() { + super(NAME); + } + + @Override + public DeleteInternalCcrRepositoryResponse newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + public Writeable.Reader getResponseReader() { + return DeleteInternalCcrRepositoryResponse::new; + } + + public static class TransportDeleteInternalRepositoryAction + extends TransportAction { + + private final RepositoriesService repositoriesService; + + @Inject + public TransportDeleteInternalRepositoryAction(RepositoriesService repositoriesService, ActionFilters actionFilters, + TransportService transportService) { + super(NAME, actionFilters, transportService.getTaskManager()); + this.repositoriesService = repositoriesService; + } + + @Override + protected void doExecute(Task task, DeleteInternalCcrRepositoryRequest request, + ActionListener listener) { + repositoriesService.unregisterInternalRepository(request.getName()); + listener.onResponse(new DeleteInternalCcrRepositoryResponse()); + } + } + + public static class DeleteInternalCcrRepositoryResponse extends ActionResponse { + + DeleteInternalCcrRepositoryResponse() { + super(); + } + + DeleteInternalCcrRepositoryResponse(StreamInput streamInput) throws IOException { + super(streamInput); + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java new file mode 100644 index 0000000000000..12264c1d57c85 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class DeleteInternalCcrRepositoryRequest extends ActionRequest { + + private final String name; + + public DeleteInternalCcrRepositoryRequest(String name) { + this.name = Objects.requireNonNull(name); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("DeleteInternalRepositoryRequest cannot be serialized for sending across the wire."); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException("DeleteInternalRepositoryRequest cannot be serialized for sending across the wire."); + } + + public String getName() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteInternalCcrRepositoryRequest that = (DeleteInternalCcrRepositoryRequest) o; + return Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public String toString() { + return "DeleteInternalRepositoryRequest{" + + "name='" + name + '\'' + + '}'; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java new file mode 100644 index 0000000000000..2d12cc4d77ad1 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public class PutInternalCcrRepositoryAction extends Action { + + public static final PutInternalCcrRepositoryAction INSTANCE = new PutInternalCcrRepositoryAction(); + public static final String NAME = "cluster:admin/ccr/internal_repository/put"; + + private PutInternalCcrRepositoryAction() { + super(NAME); + } + + @Override + public PutInternalCcrRepositoryResponse newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + public Writeable.Reader getResponseReader() { + return PutInternalCcrRepositoryResponse::new; + } + + public static class TransportPutInternalRepositoryAction + extends TransportAction { + + private final RepositoriesService repositoriesService; + + @Inject + public TransportPutInternalRepositoryAction(RepositoriesService repositoriesService, ActionFilters actionFilters, + TransportService transportService) { + super(NAME, actionFilters, transportService.getTaskManager()); + this.repositoriesService = repositoriesService; + } + + @Override + protected void doExecute(Task task, PutInternalCcrRepositoryRequest request, + ActionListener listener) { + repositoriesService.registerInternalRepository(request.getName(), request.getType()); + listener.onResponse(new PutInternalCcrRepositoryResponse()); + } + } + + public static class PutInternalCcrRepositoryResponse extends ActionResponse { + + PutInternalCcrRepositoryResponse() { + super(); + } + + PutInternalCcrRepositoryResponse(StreamInput streamInput) throws IOException { + super(streamInput); + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java new file mode 100644 index 0000000000000..71efcdf319da9 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class PutInternalCcrRepositoryRequest extends ActionRequest { + + private final String name; + private final String type; + + public PutInternalCcrRepositoryRequest(String name, String type) { + this.name = Objects.requireNonNull(name); + this.type = Objects.requireNonNull(type); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("PutInternalRepositoryRequest cannot be serialized for sending across the wire."); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException("PutInternalRepositoryRequest cannot be serialized for sending across the wire."); + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PutInternalCcrRepositoryRequest that = (PutInternalCcrRepositoryRequest) o; + return Objects.equals(name, that.name) && + Objects.equals(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hash(name, type); + } + + @Override + public String toString() { + return "PutInternalCcrRepositoryRequest{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + '}'; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java new file mode 100644 index 0000000000000..81f9bd3f2f932 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.repository; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; + +import java.io.IOException; +import java.util.List; + +/** + * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to + * restore shards/indexes that exist on the remote cluster. + */ +public class CcrRepository extends AbstractLifecycleComponent implements Repository { + + public static final String TYPE = "_ccr_"; + public static final String NAME_PREFIX = "_ccr_"; + + private final RepositoryMetaData metadata; + + public CcrRepository(RepositoryMetaData metadata, Settings settings) { + super(settings); + this.metadata = metadata; + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public RepositoryMetaData getMetadata() { + return metadata; + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public RepositoryData getRepositoryData() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return 0; + } + + @Override + public String startVerification() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void endVerification(String verificationToken) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + } + + @Override + public boolean isReadOnly() { + return true; + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java new file mode 100644 index 0000000000000..133e1ee06064f --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work +// TODO: is completed. +public class CcrRepositoryManagerIT extends CcrIntegTestCase { + + public void testThatRepositoryIsPutAndRemovedWhenRemoteClusterIsUpdated() throws Exception { + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + final RepositoriesService repositoriesService = + getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class).iterator().next(); + try { + Repository repository = repositoriesService.repository(leaderClusterRepoName); + assertEquals(CcrRepository.TYPE, repository.getMetadata().type()); + assertEquals(leaderClusterRepoName, repository.getMetadata().name()); + } catch (RepositoryMissingException e) { + fail("need repository"); + } + + ClusterUpdateSettingsRequest putFollowerRequest = new ClusterUpdateSettingsRequest(); + String address = getFollowerCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + putFollowerRequest.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", address)); + assertAcked(followerClient().admin().cluster().updateSettings(putFollowerRequest).actionGet()); + + String followerCopyRepoName = CcrRepository.NAME_PREFIX + "follower_cluster_copy"; + try { + Repository repository = repositoriesService.repository(followerCopyRepoName); + assertEquals(CcrRepository.TYPE, repository.getMetadata().type()); + assertEquals(followerCopyRepoName, repository.getMetadata().name()); + } catch (RepositoryMissingException e) { + fail("need repository"); + } + + ClusterUpdateSettingsRequest deleteLeaderRequest = new ClusterUpdateSettingsRequest(); + deleteLeaderRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", "")); + assertAcked(followerClient().admin().cluster().updateSettings(deleteLeaderRequest).actionGet()); + + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(leaderClusterRepoName)); + + ClusterUpdateSettingsRequest deleteFollowerRequest = new ClusterUpdateSettingsRequest(); + deleteFollowerRequest.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", "")); + assertAcked(followerClient().admin().cluster().updateSettings(deleteFollowerRequest).actionGet()); + + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(followerCopyRepoName)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 67a7412342319..37d3891d55e28 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -53,7 +53,9 @@ import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptContext; @@ -400,6 +402,21 @@ public List> getPersistentTasksExecutor(ClusterServic .collect(toList()); } + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry)); + filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry))); + return repositories; + } + + @Override + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + HashMap internalRepositories = new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry)); + filterPlugins(RepositoryPlugin.class).forEach(r -> + internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry))); + return internalRepositories; + } + @Override public Optional getEngineFactory(IndexSettings indexSettings) { List> enginePlugins = filterPlugins(EnginePlugin.class).stream() diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index db318b032a484..1b4dd9a18d730 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -71,7 +71,7 @@ class IndicesAndAliasesResolver { * then the index names will be categorized into those that refer to {@link ResolvedIndices#getLocal() local indices}, and those that * refer to {@link ResolvedIndices#getRemote() remote indices}. This categorization follows the standard * {@link RemoteClusterAware#buildRemoteIndexName(String, String) remote index-name format} and also respects the currently defined - * {@link RemoteClusterAware#getRemoteClusterNames() remote clusters}. + * remote clusters}. *


* Thus an index name N will considered to be remote if-and-only-if all of the following are true *
    @@ -445,11 +445,6 @@ private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings listenForUpdates(clusterSettings); } - @Override - protected Set getRemoteClusterNames() { - return clusters; - } - @Override protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { if (addresses.isEmpty()) { @@ -460,7 +455,7 @@ protected void updateRemoteCluster(String clusterAlias, List addresses, } ResolvedIndices splitLocalAndRemoteIndexNames(String... indices) { - final Map> map = super.groupClusterIndices(indices, exists -> false); + final Map> map = super.groupClusterIndices(clusters, indices, exists -> false); final List local = map.remove(LOCAL_CLUSTER_GROUP_KEY); final List remote = map.entrySet().stream() .flatMap(e -> e.getValue().stream().map(v -> e.getKey() + REMOTE_CLUSTER_INDEX_SEPARATOR + v)) From 21e4fd9d7f93ca47d4aae5e0c6497c72d5080cb9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 5 Dec 2018 11:31:36 -0700 Subject: [PATCH 2/3] Fix --- .../xpack/ccr/CcrRepositoryManager.java | 3 +-- .../DeleteInternalCcrRepositoryAction.java | 20 ++++++++++---- .../DeleteInternalCcrRepositoryRequest.java | 17 ++++++++++-- ...teInternalCcrRepositoryRequestBuilder.java | 22 ++++++++++++++++ .../PutInternalCcrRepositoryAction.java | 22 +++++++++++----- .../PutInternalCcrRepositoryRequest.java | 26 ++++++++++++++++--- ...utInternalCcrRepositoryRequestBuilder.java | 26 +++++++++++++++++++ 7 files changed, 118 insertions(+), 18 deletions(-) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequestBuilder.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequestBuilder.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index f86789a880e24..842cbef7a4684 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ccr; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; @@ -39,7 +38,7 @@ protected void updateRemoteCluster(String clusterAlias, List addresses, client.executeLocally(DeleteInternalCcrRepositoryAction.INSTANCE, request, f); assert f.isDone() : "Should be completed as it is executed synchronously"; } else { - ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE); + PutInternalCcrRepositoryRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE); PlainActionFuture f = PlainActionFuture.newFuture(); client.executeLocally(PutInternalCcrRepositoryAction.INSTANCE, request, f); assert f.isDone() : "Should be completed as it is executed synchronously"; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java index e85ce65858e0f..b511af1a6ae0e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java @@ -11,16 +11,20 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.RepositoriesService; -import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -public class DeleteInternalCcrRepositoryAction extends Action { +public class DeleteInternalCcrRepositoryAction extends Action { public static final DeleteInternalCcrRepositoryAction INSTANCE = new DeleteInternalCcrRepositoryAction(); public static final String NAME = "cluster:admin/ccr/internal_repository/delete"; @@ -39,20 +43,26 @@ public Writeable.Reader getResponseReader() return DeleteInternalCcrRepositoryResponse::new; } + @Override + public DeleteInternalCcrRepositoryRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new DeleteInternalCcrRepositoryRequestBuilder(client); + } + public static class TransportDeleteInternalRepositoryAction extends TransportAction { private final RepositoriesService repositoriesService; @Inject - public TransportDeleteInternalRepositoryAction(RepositoriesService repositoriesService, ActionFilters actionFilters, + public TransportDeleteInternalRepositoryAction(Settings settings, ThreadPool threadPool, RepositoriesService repositoriesService, + ActionFilters actionFilters, IndexNameExpressionResolver resolver, TransportService transportService) { - super(NAME, actionFilters, transportService.getTaskManager()); + super(settings, NAME, threadPool, actionFilters, resolver, transportService.getTaskManager()); this.repositoriesService = repositoriesService; } @Override - protected void doExecute(Task task, DeleteInternalCcrRepositoryRequest request, + protected void doExecute(DeleteInternalCcrRepositoryRequest request, ActionListener listener) { repositoriesService.unregisterInternalRepository(request.getName()); listener.onResponse(new DeleteInternalCcrRepositoryResponse()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java index 12264c1d57c85..8c8aa458dfd36 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java @@ -14,17 +14,30 @@ import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.action.ValidateActions.addValidationError; + public class DeleteInternalCcrRepositoryRequest extends ActionRequest { - private final String name; + private String name; + + DeleteInternalCcrRepositoryRequest() { + } public DeleteInternalCcrRepositoryRequest(String name) { this.name = Objects.requireNonNull(name); } + public void setName(String name) { + this.name = name; + } + @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (name == null) { + validationException = addValidationError("name is missing", validationException); + } + return validationException; } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequestBuilder.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequestBuilder.java new file mode 100644 index 0000000000000..87e68458f9ded --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequestBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +class DeleteInternalCcrRepositoryRequestBuilder extends ActionRequestBuilder { + + DeleteInternalCcrRepositoryRequestBuilder(final ElasticsearchClient client) { + super(client, DeleteInternalCcrRepositoryAction.INSTANCE, new DeleteInternalCcrRepositoryRequest()); + } + + void setName(String name) { + request.setName(name); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java index 2d12cc4d77ad1..d572f90773e3f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java @@ -11,16 +11,20 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.RepositoriesService; -import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -public class PutInternalCcrRepositoryAction extends Action { +public class PutInternalCcrRepositoryAction extends Action { public static final PutInternalCcrRepositoryAction INSTANCE = new PutInternalCcrRepositoryAction(); public static final String NAME = "cluster:admin/ccr/internal_repository/put"; @@ -39,23 +43,29 @@ public Writeable.Reader getResponseReader() { return PutInternalCcrRepositoryResponse::new; } + @Override + public PutInternalCcrRepositoryRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new PutInternalCcrRepositoryRequestBuilder(client); + } + public static class TransportPutInternalRepositoryAction extends TransportAction { private final RepositoriesService repositoriesService; @Inject - public TransportPutInternalRepositoryAction(RepositoriesService repositoriesService, ActionFilters actionFilters, + public TransportPutInternalRepositoryAction(Settings settings, ThreadPool threadPool, RepositoriesService repositoriesService, + ActionFilters actionFilters, IndexNameExpressionResolver resolver, TransportService transportService) { - super(NAME, actionFilters, transportService.getTaskManager()); + super(settings, NAME, threadPool, actionFilters, resolver, transportService.getTaskManager()); this.repositoriesService = repositoriesService; } @Override - protected void doExecute(Task task, PutInternalCcrRepositoryRequest request, - ActionListener listener) { + protected void doExecute(PutInternalCcrRepositoryRequest request, ActionListener listener) { repositoriesService.registerInternalRepository(request.getName(), request.getType()); listener.onResponse(new PutInternalCcrRepositoryResponse()); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java index 71efcdf319da9..1ea2bb941d4ed 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java @@ -14,10 +14,15 @@ import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.action.ValidateActions.addValidationError; + public class PutInternalCcrRepositoryRequest extends ActionRequest { - private final String name; - private final String type; + private String name; + private String type; + + PutInternalCcrRepositoryRequest() { + } public PutInternalCcrRepositoryRequest(String name, String type) { this.name = Objects.requireNonNull(name); @@ -26,7 +31,14 @@ public PutInternalCcrRepositoryRequest(String name, String type) { @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (name == null) { + validationException = addValidationError("name is missing", validationException); + } + if (type == null) { + validationException = addValidationError("type is missing", validationException); + } + return validationException; } @Override @@ -68,4 +80,12 @@ public String toString() { ", type='" + type + '\'' + '}'; } + + public void setType(String type) { + this.type = type; + } + + public void setName(String name) { + this.name = name; + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequestBuilder.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequestBuilder.java new file mode 100644 index 0000000000000..d7b4a0064ad72 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequestBuilder.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +class PutInternalCcrRepositoryRequestBuilder extends ActionRequestBuilder { + + PutInternalCcrRepositoryRequestBuilder(final ElasticsearchClient client) { + super(client, PutInternalCcrRepositoryAction.INSTANCE, new PutInternalCcrRepositoryRequest()); + } + + void setName(String name) { + request.setName(name); + } + + void setType(String type) { + request.setType(type); + } +} From 0eb622df61dfc3d7ea079bf339770f25c45284ca Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 6 Dec 2018 10:11:21 -0700 Subject: [PATCH 3/3] Changes --- .../DeleteInternalCcrRepositoryAction.java | 12 +++------ .../DeleteInternalCcrRepositoryRequest.java | 17 ++---------- ...teInternalCcrRepositoryRequestBuilder.java | 22 ---------------- .../PutInternalCcrRepositoryAction.java | 12 +++------ .../PutInternalCcrRepositoryRequest.java | 26 +++---------------- ...utInternalCcrRepositoryRequestBuilder.java | 26 ------------------- 6 files changed, 11 insertions(+), 104 deletions(-) delete mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequestBuilder.java delete mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequestBuilder.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java index b511af1a6ae0e..bdb1bc895411e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java @@ -6,12 +6,11 @@ package org.elasticsearch.xpack.ccr.action.repositories; -import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -23,8 +22,8 @@ import java.io.IOException; -public class DeleteInternalCcrRepositoryAction extends Action { +public class DeleteInternalCcrRepositoryAction extends GenericAction { public static final DeleteInternalCcrRepositoryAction INSTANCE = new DeleteInternalCcrRepositoryAction(); public static final String NAME = "cluster:admin/ccr/internal_repository/delete"; @@ -43,11 +42,6 @@ public Writeable.Reader getResponseReader() return DeleteInternalCcrRepositoryResponse::new; } - @Override - public DeleteInternalCcrRepositoryRequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new DeleteInternalCcrRepositoryRequestBuilder(client); - } - public static class TransportDeleteInternalRepositoryAction extends TransportAction { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java index 8c8aa458dfd36..12264c1d57c85 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java @@ -14,30 +14,17 @@ import java.io.IOException; import java.util.Objects; -import static org.elasticsearch.action.ValidateActions.addValidationError; - public class DeleteInternalCcrRepositoryRequest extends ActionRequest { - private String name; - - DeleteInternalCcrRepositoryRequest() { - } + private final String name; public DeleteInternalCcrRepositoryRequest(String name) { this.name = Objects.requireNonNull(name); } - public void setName(String name) { - this.name = name; - } - @Override public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; - if (name == null) { - validationException = addValidationError("name is missing", validationException); - } - return validationException; + return null; } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequestBuilder.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequestBuilder.java deleted file mode 100644 index 87e68458f9ded..0000000000000 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequestBuilder.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.ccr.action.repositories; - -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.client.ElasticsearchClient; - -class DeleteInternalCcrRepositoryRequestBuilder extends ActionRequestBuilder { - - DeleteInternalCcrRepositoryRequestBuilder(final ElasticsearchClient client) { - super(client, DeleteInternalCcrRepositoryAction.INSTANCE, new DeleteInternalCcrRepositoryRequest()); - } - - void setName(String name) { - request.setName(name); - } -} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java index d572f90773e3f..41b75b2c2ad51 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java @@ -6,12 +6,11 @@ package org.elasticsearch.xpack.ccr.action.repositories; -import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -23,8 +22,8 @@ import java.io.IOException; -public class PutInternalCcrRepositoryAction extends Action { +public class PutInternalCcrRepositoryAction extends GenericAction { public static final PutInternalCcrRepositoryAction INSTANCE = new PutInternalCcrRepositoryAction(); public static final String NAME = "cluster:admin/ccr/internal_repository/put"; @@ -43,11 +42,6 @@ public Writeable.Reader getResponseReader() { return PutInternalCcrRepositoryResponse::new; } - @Override - public PutInternalCcrRepositoryRequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new PutInternalCcrRepositoryRequestBuilder(client); - } - public static class TransportPutInternalRepositoryAction extends TransportAction { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java index 1ea2bb941d4ed..71efcdf319da9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java @@ -14,15 +14,10 @@ import java.io.IOException; import java.util.Objects; -import static org.elasticsearch.action.ValidateActions.addValidationError; - public class PutInternalCcrRepositoryRequest extends ActionRequest { - private String name; - private String type; - - PutInternalCcrRepositoryRequest() { - } + private final String name; + private final String type; public PutInternalCcrRepositoryRequest(String name, String type) { this.name = Objects.requireNonNull(name); @@ -31,14 +26,7 @@ public PutInternalCcrRepositoryRequest(String name, String type) { @Override public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; - if (name == null) { - validationException = addValidationError("name is missing", validationException); - } - if (type == null) { - validationException = addValidationError("type is missing", validationException); - } - return validationException; + return null; } @Override @@ -80,12 +68,4 @@ public String toString() { ", type='" + type + '\'' + '}'; } - - public void setType(String type) { - this.type = type; - } - - public void setName(String name) { - this.name = name; - } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequestBuilder.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequestBuilder.java deleted file mode 100644 index d7b4a0064ad72..0000000000000 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequestBuilder.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.ccr.action.repositories; - -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.client.ElasticsearchClient; - -class PutInternalCcrRepositoryRequestBuilder extends ActionRequestBuilder { - - PutInternalCcrRepositoryRequestBuilder(final ElasticsearchClient client) { - super(client, PutInternalCcrRepositoryAction.INSTANCE, new PutInternalCcrRepositoryRequest()); - } - - void setName(String name) { - request.setName(name); - } - - void setType(String type) { - request.setType(type); - } -}