diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index ec196d637e1a9..e1d49754997c8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -6,10 +6,20 @@ package org.elasticsearch.xpack.ccr; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoriesMetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -20,6 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -32,6 +43,8 @@ import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; @@ -39,10 +52,12 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; @@ -77,12 +92,16 @@ import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import static java.util.Collections.emptyList; @@ -92,7 +111,7 @@ /** * Container class for CCR functionality. */ -public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { +public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin { public static final String CCR_THREAD_POOL_NAME = "ccr"; public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; @@ -104,6 +123,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final boolean enabled; private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; + private SetOnce repositoryManager = new SetOnce<>(); /** * Construct an instance of the CCR container with the specified settings. @@ -142,6 +162,8 @@ public Collection createComponents( return emptyList(); } + repositoryManager.set(new CCRRepositoryManager(settings, clusterService)); + return Arrays.asList( ccrLicenseChecker, new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker) @@ -259,6 +281,119 @@ public List> getExecutorBuilders(Settings settings) { return Collections.singletonList(new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME, 32, 100, "xpack.ccr.ccr_thread_pool")); } + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, settings); + return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); + } + protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } + private static class CCRRepositoryManager extends RemoteClusterAware implements LocalNodeMasterListener { + + private static final Logger LOGGER = LogManager.getLogger(CCRRepositoryManager.class); + private static final String SOURCE = "refreshing " + CcrRepository.TYPE + " repositories"; + + private final ClusterService clusterService; + private final Set clusters = ConcurrentCollections.newConcurrentSet(); + private volatile boolean isMasterNode = false; + + private CCRRepositoryManager(Settings settings, ClusterService clusterService) { + super(settings); + this.clusterService = clusterService; + clusters.addAll(buildRemoteClustersDynamicConfig(settings).keySet()); + clusterService.addLocalNodeMasterListener(this); + listenForUpdates(clusterService.getClusterSettings()); + } + + @Override + protected Set getRemoteClusterNames() { + return clusters; + } + + @Override + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { + if (addresses.isEmpty()) { + if (clusters.remove(clusterAlias) && isMasterNode) { + refreshCCRRepositories(); + } + } else { + if (clusters.add(clusterAlias) && isMasterNode) { + refreshCCRRepositories(); + } + } + } + + @Override + public void onMaster() { + this.isMasterNode = true; + refreshCCRRepositories(); + } + + @Override + public void offMaster() { + this.isMasterNode = false; + } + + @Override + public String executorName() { + return ThreadPool.Names.SAME; + } + + private void refreshCCRRepositories() { + clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MetaData metaData = currentState.metaData(); + RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE); + + if (repositories == null) { + List repositoriesMetaData = new ArrayList<>(clusters.size()); + for (String cluster : clusters) { + LOGGER.info("put [{}] repository [{}]", CcrRepository.TYPE, cluster); + repositoriesMetaData.add(new RepositoryMetaData(cluster, CcrRepository.TYPE, Settings.EMPTY)); + } + repositories = new RepositoriesMetaData(repositoriesMetaData); + } else { + List repositoriesMetaData = new ArrayList<>(repositories.repositories().size()); + + Set needToAdd = new HashSet<>(clusters); + + for (RepositoryMetaData repositoryMetaData : repositories.repositories()) { + String name = repositoryMetaData.name(); + if (CcrRepository.TYPE.equals(repositoryMetaData.type())) { + if (needToAdd.remove(name)) { + repositoriesMetaData.add(new RepositoryMetaData(name, CcrRepository.TYPE, Settings.EMPTY)); + } else { + LOGGER.info("delete [{}] repository [{}]", CcrRepository.TYPE, name); + } + } else { + if (needToAdd.remove(name)) { + throw new IllegalStateException("Repository name conflict. Cannot put [" + + CcrRepository.TYPE + "] repository [" + name + "]. A [" + + repositoryMetaData.type() + "] repository with the same name is already registered."); + } + repositoriesMetaData.add(repositoryMetaData); + } + } + for (String cluster : needToAdd) { + LOGGER.info("put [{}] repository [{}]", CcrRepository.TYPE, cluster); + repositoriesMetaData.add(new RepositoryMetaData(cluster, CcrRepository.TYPE, Settings.EMPTY)); + } + repositories = new RepositoriesMetaData(repositoriesMetaData); + } + + MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); + mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories); + return ClusterState.builder(currentState).metaData(mdBuilder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + LOGGER.warn(new ParameterizedMessage("failed to refresh [{}] repositories", CcrRepository.TYPE), e); + } + }); + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java new file mode 100644 index 0000000000000..00646a8d06e1b --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.repository; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; + +import java.io.IOException; +import java.util.List; + +/** + * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to + * restore shards/indexes that exist on the remote cluster. + */ +public class CcrRepository extends AbstractLifecycleComponent implements Repository { + + public static final String TYPE = "_ccr_"; + + private final RepositoryMetaData metadata; + + public CcrRepository(RepositoryMetaData metadata, Settings settings) { + super(settings); + this.metadata = metadata; + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public RepositoryMetaData getMetadata() { + return metadata; + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public RepositoryData getRepositoryData() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return 0; + } + + @Override + public String startVerification() { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void endVerification(String verificationToken) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + } + + @Override + public boolean isReadOnly() { + return true; + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java new file mode 100644 index 0000000000000..38df6088c62d9 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryManagerIT.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +public class CcrRepositoryManagerIT extends CcrIntegTestCase { + + public void testThatRepositoryIsPutAndRemovedWhenRemoteClusterIsUpdated() throws Exception { + assertBusy(() -> { + GetRepositoriesResponse response = followerClient() + .admin() + .cluster() + .prepareGetRepositories() + .get(); + assertEquals(1, response.repositories().size()); + assertEquals(CcrRepository.TYPE, response.repositories().get(0).type()); + assertEquals("leader_cluster", response.repositories().get(0).name()); + }); + + ClusterUpdateSettingsRequest putFollowerRequest = new ClusterUpdateSettingsRequest(); + String address = getFollowerCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + putFollowerRequest.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", address)); + assertAcked(followerClient().admin().cluster().updateSettings(putFollowerRequest).actionGet()); + + assertBusy(() -> { + GetRepositoriesResponse response = followerClient() + .admin() + .cluster() + .prepareGetRepositories() + .get(); + assertEquals(2, response.repositories().size()); + }); + + ClusterUpdateSettingsRequest deleteLeaderRequest = new ClusterUpdateSettingsRequest(); + deleteLeaderRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", "")); + assertAcked(followerClient().admin().cluster().updateSettings(deleteLeaderRequest).actionGet()); + + assertBusy(() -> { + GetRepositoriesResponse response = followerClient() + .admin() + .cluster() + .prepareGetRepositories() + .get(); + assertEquals(1, response.repositories().size()); + assertEquals(CcrRepository.TYPE, response.repositories().get(0).type()); + assertEquals("follower_cluster_copy", response.repositories().get(0).name()); + }); + + ClusterUpdateSettingsRequest deleteFollowerRequest = new ClusterUpdateSettingsRequest(); + deleteFollowerRequest.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", "")); + assertAcked(followerClient().admin().cluster().updateSettings(deleteFollowerRequest).actionGet()); + + assertBusy(() -> { + GetRepositoriesResponse response = followerClient() + .admin() + .cluster() + .prepareGetRepositories() + .get(); + assertEquals(0, response.repositories().size()); + }); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 92427fb92c439..369ee5e2be016 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -53,7 +53,9 @@ import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptContext; @@ -393,6 +395,13 @@ public List> getPersistentTasksExecutor(ClusterServic .collect(toList()); } + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry)); + filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry))); + return repositories; + } + @Override public void close() throws IOException { IOUtils.close(plugins);