diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index a3af52a9a4aca..5c15040609863 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -42,4 +42,17 @@ public interface RepositoryPlugin { default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { return Collections.emptyMap(); } + + /** + * Returns internal repository types added by this plugin. Internal repositories cannot be registered + * through the external API. + * + * @param env The environment for the local node, which may be used for the local settings and path.repo + * + * The key of the returned {@link Map} is the type name of the repository and + * the value is a factory to construct the {@link Repository} interface. + */ + default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + return Collections.emptyMap(); + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 637ec2d8dfbc1..90e3c94dfb3c5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -58,8 +58,24 @@ public RepositoriesModule(Environment env, List repoPlugins, T } } + Map internalFactories = new HashMap<>(); + for (RepositoryPlugin repoPlugin : repoPlugins) { + Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry); + for (Map.Entry entry : newRepoTypes.entrySet()) { + if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { + throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); + } + if (factories.put(entry.getKey(), entry.getValue()) != null) { + throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered as a " + + "non-internal repository"); + } + } + } + Map repositoryTypes = Collections.unmodifiableMap(factories); - repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes, threadPool); + Map internalRepositoryTypes = Collections.unmodifiableMap(internalFactories); + repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes, + internalRepositoryTypes, threadPool); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index fe80592d009ef..7ab6ae0a1f4c9 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; @@ -57,6 +58,7 @@ public class RepositoriesService implements ClusterStateApplier { private static final Logger logger = LogManager.getLogger(RepositoriesService.class); private final Map typesRegistry; + private final Map internalTypesRegistry; private final ClusterService clusterService; @@ -64,12 +66,14 @@ public class RepositoriesService implements ClusterStateApplier { private final VerifyNodeRepositoryAction verifyAction; + private final Map internalRepositories = ConcurrentCollections.newConcurrentMap(); private volatile Map repositories = Collections.emptyMap(); public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService, - Map typesRegistry, + Map typesRegistry, Map internalTypesRegistry, ThreadPool threadPool) { this.typesRegistry = typesRegistry; + this.internalTypesRegistry = internalTypesRegistry; this.clusterService = clusterService; this.threadPool = threadPool; // Doesn't make sense to maintain repositories on non-master and non-data nodes @@ -101,7 +105,7 @@ public void registerRepository(final RegisterRepositoryRequest request, final Ac // Trying to create the new repository on master to make sure it works try { - closeRepository(createRepository(newRepositoryMetaData)); + closeRepository(createRepository(newRepositoryMetaData, typesRegistry)); } catch (Exception e) { registrationListener.onFailure(e); return; @@ -315,7 +319,7 @@ public void applyClusterState(ClusterChangedEvent event) { closeRepository(repository); repository = null; try { - repository = createRepository(repositoryMetaData); + repository = createRepository(repositoryMetaData, typesRegistry); } catch (RepositoryException ex) { // TODO: this catch is bogus, it means the old repo is already closed, // but we have nothing to replace it @@ -324,7 +328,7 @@ public void applyClusterState(ClusterChangedEvent event) { } } else { try { - repository = createRepository(repositoryMetaData); + repository = createRepository(repositoryMetaData, typesRegistry); } catch (RepositoryException ex) { logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", repositoryMetaData.name()), ex); } @@ -355,9 +359,37 @@ public Repository repository(String repositoryName) { if (repository != null) { return repository; } + repository = internalRepositories.get(repositoryName); + if (repository != null) { + return repository; + } throw new RepositoryMissingException(repositoryName); } + public void registerInternalRepository(String name, String type) { + RepositoryMetaData metaData = new RepositoryMetaData(name, type, Settings.EMPTY); + Repository repository = internalRepositories.computeIfAbsent(name, (n) -> { + logger.debug("put internal repository [{}][{}]", name, type); + return createRepository(metaData, internalTypesRegistry); + }); + if (type.equals(repository.getMetadata().type()) == false) { + logger.warn(new ParameterizedMessage("internal repository [{}][{}] already registered. this prevented the registration of " + + "internal repository [{}][{}].", name, repository.getMetadata().type(), name, type)); + } else if (repositories.containsKey(name)) { + logger.warn(new ParameterizedMessage("non-internal repository [{}] already registered. this repository will block the " + + "usage of internal repository [{}][{}].", name, metaData.type(), name)); + } + } + + public void unregisterInternalRepository(String name) { + Repository repository = internalRepositories.remove(name); + if (repository != null) { + RepositoryMetaData metadata = repository.getMetadata(); + logger.debug(() -> new ParameterizedMessage("delete internal repository [{}][{}].", metadata.type(), name)); + closeRepository(repository); + } + } + /** Closes the given repository. */ private void closeRepository(Repository repository) { logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name()); @@ -365,21 +397,21 @@ private void closeRepository(Repository repository) { } /** - * Creates repository holder + * Creates repository holder. This method starts the repository */ - private Repository createRepository(RepositoryMetaData repositoryMetaData) { + private Repository createRepository(RepositoryMetaData repositoryMetaData, Map factories) { logger.debug("creating repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()); - Repository.Factory factory = typesRegistry.get(repositoryMetaData.type()); + Repository.Factory factory = factories.get(repositoryMetaData.type()); if (factory == null) { throw new RepositoryException(repositoryMetaData.name(), "repository type [" + repositoryMetaData.type() + "] does not exist"); } try { - Repository repository = factory.create(repositoryMetaData, typesRegistry::get); + Repository repository = factory.create(repositoryMetaData, factories::get); repository.start(); return repository; } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e); + logger.warn(new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e); throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 8da433fdc6c8e..2c36af8638f63 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -167,7 +167,7 @@ public String getKey(final String key) { REMOTE_CLUSTERS_SEEDS); protected final Settings settings; - protected final ClusterNameExpressionResolver clusterNameResolver; + private final ClusterNameExpressionResolver clusterNameResolver; /** * Creates a new {@link RemoteClusterAware} instance @@ -242,14 +242,15 @@ static DiscoveryNode buildSeedNode(String clusterName, String address, boolean p * indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under * {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable. * + * @param remoteClusterNames the remote cluster names * @param requestIndices the indices in the search request to filter * @param indexExists a predicate that can test if a certain index or alias exists in the local cluster * * @return a map of grouped remote and local indices */ - public Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) { + protected Map> groupClusterIndices(Set remoteClusterNames, String[] requestIndices, + Predicate indexExists) { Map> perClusterIndices = new HashMap<>(); - Set remoteClusterNames = getRemoteClusterNames(); for (String index : requestIndices) { int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR); if (i >= 0) { @@ -281,9 +282,6 @@ public Map> groupClusterIndices(String[] requestIndices, Pr return perClusterIndices; } - protected abstract Set getRemoteClusterNames(); - - /** * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is * empty the cluster alias is unregistered and should be removed. diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 0028a2537a266..52da474f2dd4a 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -278,7 +278,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no public Map groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate indexExists) { Map originalIndicesMap = new HashMap<>(); if (isCrossClusterSearchEnabled()) { - final Map> groupedIndices = groupClusterIndices(indices, indexExists); + final Map> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices, indexExists); if (groupedIndices.isEmpty()) { //search on _all in the local cluster if neither local indices nor remote indices were specified originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions)); @@ -380,8 +380,7 @@ RemoteClusterConnection getRemoteClusterConnection(String cluster) { return connection; } - @Override - protected Set getRemoteClusterNames() { + Set getRemoteClusterNames() { return this.remoteClusters.keySet(); } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 4625aa04be372..89a6bc8dcf89d 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -461,7 +461,7 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod Collections.emptySet()); final ClusterService clusterService = mock(ClusterService.class); final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService, - transportService, null, threadPool); + transportService, Collections.emptyMap(), Collections.emptyMap(), threadPool); final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(threadPool, transportService, null, clusterService); final ShardStateAction shardStateAction = mock(ShardStateAction.class); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java new file mode 100644 index 0000000000000..96a9670d16202 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RepositoriesModuleTests extends ESTestCase { + + private Environment environment; + private NamedXContentRegistry contentRegistry; + private List repoPlugins = new ArrayList<>(); + private RepositoryPlugin plugin1; + private RepositoryPlugin plugin2; + private Repository.Factory factory; + + @Override + public void setUp() throws Exception { + super.setUp(); + environment = mock(Environment.class); + contentRegistry = mock(NamedXContentRegistry.class); + plugin1 = mock(RepositoryPlugin.class); + plugin2 = mock(RepositoryPlugin.class); + factory = mock(Repository.Factory.class); + repoPlugins.add(plugin1); + repoPlugins.add(plugin2); + when(environment.settings()).thenReturn(Settings.EMPTY); + } + + public void testCanRegisterTwoRepositoriesWithDifferentTypes() { + when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory)); + + // Would throw + new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + mock(ThreadPool.class), contentRegistry); + } + + public void testCannotRegisterTwoRepositoriesWithSameTypes() { + when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + mock(ThreadPool.class), contentRegistry)); + + assertEquals("Repository type [type1] is already registered", ex.getMessage()); + } + + public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { + when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + mock(ThreadPool.class), contentRegistry)); + + assertEquals("Internal repository type [type1] is already registered", ex.getMessage()); + } + + public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { + when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + mock(ThreadPool.class), contentRegistry)); + + assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage()); + } +} diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java new file mode 100644 index 0000000000000..c02ab0d185610 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -0,0 +1,233 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class RepositoriesServiceTests extends ESTestCase { + + private RepositoriesService repositoriesService; + + @Override + public void setUp() throws Exception { + super.setUp(); + ThreadPool threadPool = mock(ThreadPool.class); + final TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()); + repositoriesService = new RepositoriesService(Settings.EMPTY, mock(ClusterService.class), + transportService, Collections.emptyMap(), Collections.singletonMap(TestRepository.TYPE, TestRepository::new), threadPool); + } + + public void testRegisterInternalRepository() { + String repoName = "name"; + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(repoName); + assertEquals(repoName, repository.getMetadata().name()); + assertEquals(TestRepository.TYPE, repository.getMetadata().type()); + assertEquals(Settings.EMPTY, repository.getMetadata().settings()); + assertTrue(((TestRepository) repository).isStarted); + } + + public void testUnregisterInternalRepository() { + String repoName = "name"; + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(repoName); + assertFalse(((TestRepository) repository).isClosed); + repositoriesService.unregisterInternalRepository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + assertTrue(((TestRepository) repository).isClosed); + } + + public void testRegisterWillNotUpdateIfInternalRepositoryWithNameExists() { + String repoName = "name"; + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(repoName); + assertFalse(((TestRepository) repository).isClosed); + repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + assertFalse(((TestRepository) repository).isClosed); + Repository repository2 = repositoriesService.repository(repoName); + assertSame(repository, repository2); + } + + private static class TestRepository implements Repository { + + private static final String TYPE = "internal"; + private boolean isClosed; + private boolean isStarted; + + private final RepositoryMetaData metaData; + + private TestRepository(RepositoryMetaData metaData) { + this.metaData = metaData; + } + + @Override + public RepositoryMetaData getMetadata() { + return metaData; + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + return null; + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + return null; + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + return null; + } + + @Override + public RepositoryData getRepositoryData() { + return null; + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState) { + return null; + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return 0; + } + + @Override + public String startVerification() { + return null; + } + + @Override + public void endVerification(String verificationToken) { + + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + + } + + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + return null; + } + + @Override + public Lifecycle.State lifecycleState() { + return null; + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + + } + + @Override + public void start() { + isStarted = true; + } + + @Override + public void stop() { + + } + + @Override + public void close() { + isClosed = true; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 5bc05507b9d9f..9a185163436af 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -219,8 +219,9 @@ public void testGroupClusterIndices() throws IOException { assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); assertFalse(service.isRemoteClusterRegistered("foo")); - Map> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", - "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"}, + Map> perClusterIndices = service.groupClusterIndices(service.getRemoteClusterNames(), + new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", + "cluster*:baz", "*:boo", "no*match:boo"}, i -> false); List localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); assertNotNull(localIndices); @@ -230,7 +231,7 @@ public void testGroupClusterIndices() throws IOException { assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2")); IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> - service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", + service.groupClusterIndices(service.getRemoteClusterNames(), new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals)); assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" + @@ -277,7 +278,7 @@ public void testGroupIndices() throws IOException { } { IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> - service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", + service.groupClusterIndices(service.getRemoteClusterNames(), new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals)); assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" + " cluster_1", iae.getMessage()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index ec196d637e1a9..7cceecbd399a5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -6,9 +6,11 @@ package org.elasticsearch.xpack.ccr; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -32,6 +34,8 @@ import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; @@ -41,46 +45,50 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; -import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; -import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; -import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; -import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; -import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; -import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; -import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; -import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; +import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; +import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction; -import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction; -import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; -import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; +import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; +import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; +import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; +import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction; -import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction; -import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; -import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; +import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; -import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; @@ -92,7 +100,7 @@ /** * Container class for CCR functionality. */ -public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { +public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin { public static final String CCR_THREAD_POOL_NAME = "ccr"; public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; @@ -104,6 +112,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final boolean enabled; private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; + private final SetOnce repositoryManager = new SetOnce<>(); /** * Construct an instance of the CCR container with the specified settings. @@ -142,6 +151,8 @@ public Collection createComponents( return emptyList(); } + this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, (NodeClient) client)); + return Arrays.asList( ccrLicenseChecker, new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker) @@ -166,6 +177,10 @@ public List> getPersistentTasksExecutor(ClusterServic // internal actions new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class), new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), + new ActionHandler<>(PutInternalCcrRepositoryAction.INSTANCE, + PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class), + new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE, + DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class), // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), @@ -259,6 +274,12 @@ public List> getExecutorBuilders(Settings settings) { return Collections.singletonList(new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME, 32, 100, "xpack.ccr.ccr_thread_pool")); } + @Override + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, settings); + return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); + } + protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java new file mode 100644 index 0000000000000..f86789a880e24 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; +import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; + +import java.util.List; + +class CcrRepositoryManager extends RemoteClusterAware { + + private final NodeClient client; + + CcrRepositoryManager(Settings settings, ClusterService clusterService, NodeClient client) { + super(settings); + this.client = client; + listenForUpdates(clusterService.getClusterSettings()); + } + + @Override + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { + String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; + if (addresses.isEmpty()) { + DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName); + PlainActionFuture f = PlainActionFuture.newFuture(); + client.executeLocally(DeleteInternalCcrRepositoryAction.INSTANCE, request, f); + assert f.isDone() : "Should be completed as it is executed synchronously"; + } else { + ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE); + PlainActionFuture f = PlainActionFuture.newFuture(); + client.executeLocally(PutInternalCcrRepositoryAction.INSTANCE, request, f); + assert f.isDone() : "Should be completed as it is executed synchronously"; + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java new file mode 100644 index 0000000000000..e85ce65858e0f --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public class DeleteInternalCcrRepositoryAction extends Action { + + public static final DeleteInternalCcrRepositoryAction INSTANCE = new DeleteInternalCcrRepositoryAction(); + public static final String NAME = "cluster:admin/ccr/internal_repository/delete"; + + private DeleteInternalCcrRepositoryAction() { + super(NAME); + } + + @Override + public DeleteInternalCcrRepositoryResponse newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + public Writeable.Reader getResponseReader() { + return DeleteInternalCcrRepositoryResponse::new; + } + + public static class TransportDeleteInternalRepositoryAction + extends TransportAction { + + private final RepositoriesService repositoriesService; + + @Inject + public TransportDeleteInternalRepositoryAction(RepositoriesService repositoriesService, ActionFilters actionFilters, + TransportService transportService) { + super(NAME, actionFilters, transportService.getTaskManager()); + this.repositoriesService = repositoriesService; + } + + @Override + protected void doExecute(Task task, DeleteInternalCcrRepositoryRequest request, + ActionListener listener) { + repositoriesService.unregisterInternalRepository(request.getName()); + listener.onResponse(new DeleteInternalCcrRepositoryResponse()); + } + } + + public static class DeleteInternalCcrRepositoryResponse extends ActionResponse { + + DeleteInternalCcrRepositoryResponse() { + super(); + } + + DeleteInternalCcrRepositoryResponse(StreamInput streamInput) throws IOException { + super(streamInput); + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java new file mode 100644 index 0000000000000..12264c1d57c85 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryRequest.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class DeleteInternalCcrRepositoryRequest extends ActionRequest { + + private final String name; + + public DeleteInternalCcrRepositoryRequest(String name) { + this.name = Objects.requireNonNull(name); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("DeleteInternalRepositoryRequest cannot be serialized for sending across the wire."); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException("DeleteInternalRepositoryRequest cannot be serialized for sending across the wire."); + } + + public String getName() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteInternalCcrRepositoryRequest that = (DeleteInternalCcrRepositoryRequest) o; + return Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public String toString() { + return "DeleteInternalRepositoryRequest{" + + "name='" + name + '\'' + + '}'; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java new file mode 100644 index 0000000000000..2d12cc4d77ad1 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public class PutInternalCcrRepositoryAction extends Action { + + public static final PutInternalCcrRepositoryAction INSTANCE = new PutInternalCcrRepositoryAction(); + public static final String NAME = "cluster:admin/ccr/internal_repository/put"; + + private PutInternalCcrRepositoryAction() { + super(NAME); + } + + @Override + public PutInternalCcrRepositoryResponse newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + public Writeable.Reader getResponseReader() { + return PutInternalCcrRepositoryResponse::new; + } + + public static class TransportPutInternalRepositoryAction + extends TransportAction { + + private final RepositoriesService repositoriesService; + + @Inject + public TransportPutInternalRepositoryAction(RepositoriesService repositoriesService, ActionFilters actionFilters, + TransportService transportService) { + super(NAME, actionFilters, transportService.getTaskManager()); + this.repositoriesService = repositoriesService; + } + + @Override + protected void doExecute(Task task, PutInternalCcrRepositoryRequest request, + ActionListener listener) { + repositoriesService.registerInternalRepository(request.getName(), request.getType()); + listener.onResponse(new PutInternalCcrRepositoryResponse()); + } + } + + public static class PutInternalCcrRepositoryResponse extends ActionResponse { + + PutInternalCcrRepositoryResponse() { + super(); + } + + PutInternalCcrRepositoryResponse(StreamInput streamInput) throws IOException { + super(streamInput); + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java new file mode 100644 index 0000000000000..71efcdf319da9 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryRequest.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class PutInternalCcrRepositoryRequest extends ActionRequest { + + private final String name; + private final String type; + + public PutInternalCcrRepositoryRequest(String name, String type) { + this.name = Objects.requireNonNull(name); + this.type = Objects.requireNonNull(type); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("PutInternalRepositoryRequest cannot be serialized for sending across the wire."); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException("PutInternalRepositoryRequest cannot be serialized for sending across the wire."); + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PutInternalCcrRepositoryRequest that = (PutInternalCcrRepositoryRequest) o; + return Objects.equals(name, that.name) && + Objects.equals(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hash(name, type); + } + + @Override + public String toString() { + return "PutInternalCcrRepositoryRequest{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + '}'; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java new file mode 100644 index 0000000000000..81f9bd3f2f932 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.repository; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; + +import java.io.IOException; +import java.util.List; + +/** + * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to + * restore shards/indexes that exist on the remote cluster. + */ +public class CcrRepository extends AbstractLifecycleComponent implements Repository { + + public static final String TYPE = "_ccr_"; + public static final String NAME_PREFIX = "_ccr_"; + + private final RepositoryMetaData metadata; + + public CcrRepository(RepositoryMetaData metadata, Settings settings) { + super(settings); + this.metadata = metadata; + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public RepositoryMetaData getMetadata() { + return metadata; + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public RepositoryData getRepositoryData() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return 0; + } + + @Override + public String startVerification() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void endVerification(String verificationToken) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + } + + @Override + public boolean isReadOnly() { + return true; + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java new file mode 100644 index 0000000000000..133e1ee06064f --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work +// TODO: is completed. +public class CcrRepositoryManagerIT extends CcrIntegTestCase { + + public void testThatRepositoryIsPutAndRemovedWhenRemoteClusterIsUpdated() throws Exception { + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + final RepositoriesService repositoriesService = + getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class).iterator().next(); + try { + Repository repository = repositoriesService.repository(leaderClusterRepoName); + assertEquals(CcrRepository.TYPE, repository.getMetadata().type()); + assertEquals(leaderClusterRepoName, repository.getMetadata().name()); + } catch (RepositoryMissingException e) { + fail("need repository"); + } + + ClusterUpdateSettingsRequest putFollowerRequest = new ClusterUpdateSettingsRequest(); + String address = getFollowerCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + putFollowerRequest.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", address)); + assertAcked(followerClient().admin().cluster().updateSettings(putFollowerRequest).actionGet()); + + String followerCopyRepoName = CcrRepository.NAME_PREFIX + "follower_cluster_copy"; + try { + Repository repository = repositoriesService.repository(followerCopyRepoName); + assertEquals(CcrRepository.TYPE, repository.getMetadata().type()); + assertEquals(followerCopyRepoName, repository.getMetadata().name()); + } catch (RepositoryMissingException e) { + fail("need repository"); + } + + ClusterUpdateSettingsRequest deleteLeaderRequest = new ClusterUpdateSettingsRequest(); + deleteLeaderRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", "")); + assertAcked(followerClient().admin().cluster().updateSettings(deleteLeaderRequest).actionGet()); + + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(leaderClusterRepoName)); + + ClusterUpdateSettingsRequest deleteFollowerRequest = new ClusterUpdateSettingsRequest(); + deleteFollowerRequest.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", "")); + assertAcked(followerClient().admin().cluster().updateSettings(deleteFollowerRequest).actionGet()); + + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(followerCopyRepoName)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 92427fb92c439..d0686bd03d9e1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -53,7 +53,9 @@ import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptContext; @@ -393,6 +395,21 @@ public List> getPersistentTasksExecutor(ClusterServic .collect(toList()); } + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry)); + filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry))); + return repositories; + } + + @Override + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + HashMap internalRepositories = new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry)); + filterPlugins(RepositoryPlugin.class).forEach(r -> + internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry))); + return internalRepositories; + } + @Override public void close() throws IOException { IOUtils.close(plugins); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index cbd0d7ca184cc..aa1461b189a39 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -70,7 +70,7 @@ class IndicesAndAliasesResolver { * then the index names will be categorized into those that refer to {@link ResolvedIndices#getLocal() local indices}, and those that * refer to {@link ResolvedIndices#getRemote() remote indices}. This categorization follows the standard * {@link RemoteClusterAware#buildRemoteIndexName(String, String) remote index-name format} and also respects the currently defined - * {@link RemoteClusterAware#getRemoteClusterNames() remote clusters}. + * remote clusters}. *


* Thus an index name N will considered to be remote if-and-only-if all of the following are true *
    @@ -438,11 +438,6 @@ private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings listenForUpdates(clusterSettings); } - @Override - protected Set getRemoteClusterNames() { - return clusters; - } - @Override protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { if (addresses.isEmpty()) { @@ -453,7 +448,7 @@ protected void updateRemoteCluster(String clusterAlias, List addresses, } ResolvedIndices splitLocalAndRemoteIndexNames(String... indices) { - final Map> map = super.groupClusterIndices(indices, exists -> false); + final Map> map = super.groupClusterIndices(clusters, indices, exists -> false); final List local = map.remove(LOCAL_CLUSTER_GROUP_KEY); final List remote = map.entrySet().stream() .flatMap(e -> e.getValue().stream().map(v -> e.getKey() + REMOTE_CLUSTER_INDEX_SEPARATOR + v))