diff --git a/test/framework/src/main/java/org/elasticsearch/test/MockLogAppender.java b/test/framework/src/main/java/org/elasticsearch/test/MockLogAppender.java index a53ba046d32c3..895bd7ec77a2b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/MockLogAppender.java +++ b/test/framework/src/main/java/org/elasticsearch/test/MockLogAppender.java @@ -85,7 +85,7 @@ public AbstractEventExpectation(String name, String logger, Level level, String @Override public void match(LogEvent event) { - if (event.getLevel().equals(level) && event.getLoggerName().equals(logger)) { + if (event.getLevel().equals(level) && event.getLoggerName().equals(logger) && innerMatch(event)) { if (Regex.isSimpleMatchPattern(message)) { if (Regex.simpleMatch(message, event.getMessage().getFormattedMessage())) { saw = true; @@ -97,6 +97,11 @@ public void match(LogEvent event) { } } } + + public boolean innerMatch(final LogEvent event) { + return true; + } + } public static class UnseenEventExpectation extends AbstractEventExpectation { @@ -123,6 +128,32 @@ public void assertMatched() { } } + public static class ExceptionSeenEventExpectation extends SeenEventExpectation { + + private final Class clazz; + private final String exceptionMessage; + + public ExceptionSeenEventExpectation( + final String name, + final String logger, + final Level level, + final String message, + final Class clazz, + final String exceptionMessage) { + super(name, logger, level, message); + this.clazz = clazz; + this.exceptionMessage = exceptionMessage; + } + + @Override + public boolean innerMatch(final LogEvent event) { + return event.getThrown() != null + && event.getThrown().getClass() == clazz + && event.getThrown().getMessage().equals(exceptionMessage); + } + + } + public static class PatternSeenEventExcpectation implements LoggingExpectation { protected final String name; diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle similarity index 61% rename from x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/build.gradle rename to x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle index 1566333e60848..c599903ced12e 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/build.gradle +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle @@ -20,7 +20,20 @@ leaderClusterTestRunner { systemProperty 'tests.is_leader_cluster', 'true' } +task writeJavaPolicy { + doLast { + final File javaPolicy = file("${buildDir}/tmp/java.policy") + javaPolicy.write( + [ + "grant {", + " permission java.io.FilePermission \"${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log\", \"read\";", + "};" + ].join("\n")) + } +} + task followClusterTest(type: RestIntegTestTask) {} +followClusterTest.dependsOn writeJavaPolicy followClusterTestCluster { dependsOn leaderClusterTestRunner @@ -31,8 +44,10 @@ followClusterTestCluster { } followClusterTestRunner { + systemProperty 'java.security.policy', "file://${buildDir}/tmp/java.policy" systemProperty 'tests.is_leader_cluster', 'false' systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + systemProperty 'log', "${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log" finalizedBy 'leaderClusterTestCluster#stop' } diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java similarity index 50% rename from x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java rename to x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java index 06d9f91c7abb7..c52a4a9b59d78 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java @@ -9,11 +9,16 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.test.rest.ESRestTestCase; +import java.nio.file.Files; +import java.util.Iterator; +import java.util.List; import java.util.Locale; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; public class CcrMultiClusterLicenseIT extends ESRestTestCase { @@ -29,7 +34,7 @@ public void testFollowIndex() { if (runningAgainstLeaderCluster == false) { final Request request = new Request("POST", "/follower/_ccr/follow"); request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}"); - assertLicenseIncompatible(request); + assertNonCompliantLicense(request); } } @@ -37,11 +42,44 @@ public void testCreateAndFollowIndex() { if (runningAgainstLeaderCluster == false) { final Request request = new Request("POST", "/follower/_ccr/create_and_follow"); request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}"); - assertLicenseIncompatible(request); + assertNonCompliantLicense(request); } } - private static void assertLicenseIncompatible(final Request request) { + public void testAutoFollow() throws Exception { + if (runningAgainstLeaderCluster == false) { + final Request request = new Request("PUT", "/_ccr/_auto_follow/leader_cluster"); + request.setJsonEntity("{\"leader_index_patterns\":[\"*\"]}"); + client().performRequest(request); + + // parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster + assertBusy(() -> { + final List lines = Files.readAllLines(PathUtils.get(System.getProperty("log"))); + + final Iterator it = lines.iterator(); + + boolean warn = false; + while (it.hasNext()) { + final String line = it.next(); + if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " + + "failure occurred during auto-follower coordination")) { + warn = true; + break; + } + } + assertTrue(warn); + assertTrue(it.hasNext()); + final String lineAfterWarn = it.next(); + assertThat( + lineAfterWarn, + equalTo("org.elasticsearch.ElasticsearchStatusException: " + + "can not fetch remote cluster state as the remote cluster [leader_cluster] is not licensed for [ccr]; " + + "the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]")); + }); + } + } + + private static void assertNonCompliantLicense(final Request request) { final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request)); final String expected = String.format( Locale.ROOT, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index cd0561b1c0c60..353a66db26339 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -126,7 +126,7 @@ public Collection createComponents( return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(settings, client, threadPool, clusterService) + new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index cefa490f4f7e2..f9a5d8fe83035 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import java.util.function.Function; /** * Encapsulates licensing checking for CCR. @@ -58,14 +59,13 @@ public boolean isCcrAllowed() { /** * Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for - * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method - * of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the - * remote cluster. + * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked. + * Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster. * * @param client the client * @param clusterAlias the remote cluster alias * @param leaderIndex the name of the leader index - * @param listener the listener + * @param onFailure the failure consumer * @param leaderIndexMetadataConsumer the leader index metadata consumer * @param the type of response the listener is waiting for */ @@ -73,8 +73,75 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( final Client client, final String clusterAlias, final String leaderIndex, - final ActionListener listener, + final Consumer onFailure, final Consumer leaderIndexMetadataConsumer) { + + final ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.metaData(true); + request.indices(leaderIndex); + checkRemoteClusterLicenseAndFetchClusterState( + client, + clusterAlias, + request, + onFailure, + leaderClusterState -> leaderIndexMetadataConsumer.accept(leaderClusterState.getMetaData().index(leaderIndex)), + licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck), + e -> indexMetadataUnknownRemoteLicense(leaderIndex, clusterAlias, e)); + } + + /** + * Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state, + * the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR, + * the {@code onFailure} consumer is invoked. Otherwise, the specified consumer is invoked with the leader cluster state fetched from + * the remote cluster. + * + * @param client the client + * @param clusterAlias the remote cluster alias + * @param request the cluster state request + * @param onFailure the failure consumer + * @param leaderClusterStateConsumer the leader cluster state consumer + * @param the type of response the listener is waiting for + */ + public void checkRemoteClusterLicenseAndFetchClusterState( + final Client client, + final String clusterAlias, + final ClusterStateRequest request, + final Consumer onFailure, + final Consumer leaderClusterStateConsumer) { + checkRemoteClusterLicenseAndFetchClusterState( + client, + clusterAlias, + request, + onFailure, + leaderClusterStateConsumer, + CcrLicenseChecker::clusterStateNonCompliantRemoteLicense, + e -> clusterStateUnknownRemoteLicense(clusterAlias, e)); + } + + /** + * Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state, + * the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR, + * the {@code onFailure} consumer is invoked. Otherwise, the specified consumer is invoked with the leader cluster state fetched from + * the remote cluster. + * + * @param client the client + * @param clusterAlias the remote cluster alias + * @param request the cluster state request + * @param onFailure the failure consumer + * @param leaderClusterStateConsumer the leader cluster state consumer + * @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant + * @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure + * @param the type of response the listener is waiting for + */ + private void checkRemoteClusterLicenseAndFetchClusterState( + final Client client, + final String clusterAlias, + final ClusterStateRequest request, + final Consumer onFailure, + final Consumer leaderClusterStateConsumer, + final Function nonCompliantLicense, + final Function unknownLicense) { // we have to check the license on the remote cluster new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses( Collections.singletonList(clusterAlias), @@ -83,35 +150,25 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( @Override public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { if (licenseCheck.isSuccess()) { - final Client remoteClient = client.getRemoteClusterClient(clusterAlias); - final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex); - final ActionListener clusterStateListener = ActionListener.wrap( - r -> { - final ClusterState remoteClusterState = r.getState(); - final IndexMetaData leaderIndexMetadata = - remoteClusterState.getMetaData().index(leaderIndex); - leaderIndexMetadataConsumer.accept(leaderIndexMetadata); - }, - listener::onFailure); + final Client leaderClient = client.getRemoteClusterClient(clusterAlias); + final ActionListener clusterStateListener = + ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata - remoteClient.admin().cluster().state(clusterStateRequest, clusterStateListener); + leaderClient.admin().cluster().state(request, clusterStateListener); } else { - listener.onFailure(incompatibleRemoteLicense(leaderIndex, licenseCheck)); + onFailure.accept(nonCompliantLicense.apply(licenseCheck)); } } @Override public void onFailure(final Exception e) { - listener.onFailure(unknownRemoteLicense(leaderIndex, clusterAlias, e)); + onFailure.accept(unknownLicense.apply(e)); } }); } - private static ElasticsearchStatusException incompatibleRemoteLicense( + private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense( final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias(); final String message = String.format( @@ -127,7 +184,21 @@ private static ElasticsearchStatusException incompatibleRemoteLicense( return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST); } - private static ElasticsearchStatusException unknownRemoteLicense( + private static ElasticsearchStatusException clusterStateNonCompliantRemoteLicense( + final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { + final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias(); + final String message = String.format( + Locale.ROOT, + "can not fetch remote cluster state as the remote cluster [%s] is not licensed for [ccr]; %s", + clusterAlias, + RemoteClusterLicenseChecker.buildErrorMessage( + "ccr", + licenseCheck.remoteClusterLicenseInfo(), + RemoteClusterLicenseChecker::isLicensePlatinumOrTrial)); + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST); + } + + private static ElasticsearchStatusException indexMetadataUnknownRemoteLicense( final String leaderIndex, final String clusterAlias, final Exception cause) { final String message = String.format( Locale.ROOT, @@ -138,4 +209,11 @@ private static ElasticsearchStatusException unknownRemoteLicense( return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause); } + private static ElasticsearchStatusException clusterStateUnknownRemoteLicense(final String clusterAlias, final Exception cause) { + final String message = String.format( + Locale.ROOT, + "can not fetch remote cluster state as the license state of the remote cluster [%s] could not be determined", clusterAlias); + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 234fe32cdd0ee..639cd4d5782ab 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -21,7 +21,9 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; +import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; @@ -30,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -47,22 +50,32 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private final TimeValue pollInterval; private final ThreadPool threadPool; private final ClusterService clusterService; + private final CcrLicenseChecker ccrLicenseChecker; private volatile boolean localNodeMaster = false; - public AutoFollowCoordinator(Settings settings, - Client client, - ThreadPool threadPool, - ClusterService clusterService) { + public AutoFollowCoordinator( + Settings settings, + Client client, + ThreadPool threadPool, + ClusterService clusterService, + CcrLicenseChecker ccrLicenseChecker) { this.client = client; this.threadPool = threadPool; this.clusterService = clusterService; + this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); clusterService.addStateApplier(this); } private void doAutoFollow() { + if (ccrLicenseChecker.isCcrAllowed() == false) { + // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API + LOGGER.warn("skipping auto-follower coordination", LicenseUtils.newComplianceException("ccr")); + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + return; + } if (localNodeMaster == false) { return; } @@ -80,23 +93,32 @@ private void doAutoFollow() { Consumer handler = e -> { if (e != null) { - LOGGER.warn("Failure occurred during auto following indices", e); + LOGGER.warn("failure occurred during auto-follower coordination", e); } threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); }; - AutoFollower operation = new AutoFollower(client, handler, followerClusterState) { + AutoFollower operation = new AutoFollower(handler, followerClusterState) { @Override - void getLeaderClusterState(Client leaderClient, BiConsumer handler) { - ClusterStateRequest request = new ClusterStateRequest(); + void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer handler) { + final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.metaData(true); - leaderClient.admin().cluster().state(request, - ActionListener.wrap( - r -> handler.accept(r.getState(), null), - e -> handler.accept(null, e) - ) - ); + + if ("_local_".equals(leaderClusterAlias)) { + client.admin().cluster().state( + request, ActionListener.wrap(r -> handler.accept(r.getState(), null), e -> handler.accept(null, e))); + } else { + final Client leaderClient = client.getRemoteClusterClient(leaderClusterAlias); + // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( + leaderClient, + leaderClusterAlias, + request, + e -> handler.accept(null, e), + leaderClusterState -> handler.accept(leaderClusterState, null)); + } + } @Override @@ -143,7 +165,6 @@ public void applyClusterState(ClusterChangedEvent event) { abstract static class AutoFollower { - private final Client client; private final Consumer handler; private final ClusterState followerClusterState; private final AutoFollowMetadata autoFollowMetadata; @@ -151,8 +172,7 @@ abstract static class AutoFollower { private final CountDown autoFollowPatternsCountDown; private final AtomicReference autoFollowPatternsErrorHolder = new AtomicReference<>(); - AutoFollower(Client client, Consumer handler, ClusterState followerClusterState) { - this.client = client; + AutoFollower(final Consumer handler, final ClusterState followerClusterState) { this.handler = handler; this.followerClusterState = followerClusterState; this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); @@ -163,10 +183,9 @@ void autoFollowIndices() { for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { String clusterAlias = entry.getKey(); AutoFollowPattern autoFollowPattern = entry.getValue(); - Client leaderClient = clusterAlias.equals("_local_") ? client : client.getRemoteClusterClient(clusterAlias); List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); - getLeaderClusterState(leaderClient, (leaderClusterState, e) -> { + getLeaderClusterState(clusterAlias, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState); @@ -289,18 +308,17 @@ static Function recordLeaderIndexAsFollowFunction(St }; } - // abstract methods to make unit testing possible: - - abstract void getLeaderClusterState(Client leaderClient, - BiConsumer handler); + /** + * Fetch the cluster state from the leader with the specified cluster alias + * + * @param leaderClusterAlias the cluster alias of the leader + * @param handler the callback to invoke + */ + abstract void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler); - abstract void createAndFollow(FollowIndexAction.Request followRequest, - Runnable successHandler, - Consumer failureHandler); + abstract void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler); - abstract void updateAutoFollowMetadata(Function updateFunction, - Consumer handler); + abstract void updateAutoFollowMetadata(Function updateFunction, Consumer handler); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index 2e36bca293225..cf77bf8112f91 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -255,7 +255,7 @@ private void createFollowerIndexAndFollowRemoteIndex( client, clusterAlias, leaderIndex, - listener, + listener::onFailure, leaderIndexMetaData -> createFollowerIndex(leaderIndexMetaData, request, listener)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 17b7bbe674b38..2a14c4e9a50bb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -370,7 +370,7 @@ private void followRemoteIndex( client, clusterAlias, leaderIndex, - listener, + listener::onFailure, leaderIndexMetadata -> { try { start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, listener); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 3d3e342c0cd3e..a4ff9511cfbd8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -21,8 +21,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; @@ -30,20 +32,29 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; public class TransportPutAutoFollowPatternAction extends TransportMasterNodeAction { private final Client client; + private final CcrLicenseChecker ccrLicenseChecker; @Inject - public TransportPutAutoFollowPatternAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, Client client, - IndexNameExpressionResolver indexNameExpressionResolver) { + public TransportPutAutoFollowPatternAction( + final Settings settings, + final TransportService transportService, + final ClusterService clusterService, + final ThreadPool threadPool, + final ActionFilters actionFilters, + final Client client, + final IndexNameExpressionResolver indexNameExpressionResolver, + final CcrLicenseChecker ccrLicenseChecker) { super(settings, PutAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutAutoFollowPatternAction.Request::new); this.client = client; + this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); } @Override @@ -60,6 +71,10 @@ protected AcknowledgedResponse newResponse() { protected void masterOperation(PutAutoFollowPatternAction.Request request, ClusterState state, ActionListener listener) throws Exception { + if (ccrLicenseChecker.isCcrAllowed() == false) { + listener.onFailure(LicenseUtils.newComplianceException("ccr")); + return; + } final Client leaderClient; if (request.getLeaderClusterAlias().equals("_local_")) { leaderClient = client; @@ -71,22 +86,26 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request, clusterStateRequest.clear(); clusterStateRequest.metaData(true); - leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { - final ClusterState leaderClusterState = clusterStateResponse.getState(); - clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return new AcknowledgedResponse(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return innerPut(request, currentState, leaderClusterState); - } - }); - }, listener::onFailure)); + leaderClient.admin().cluster().state( + clusterStateRequest, + ActionListener.wrap( + clusterStateResponse -> { + final ClusterState leaderClusterState = clusterStateResponse.getState(); + clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return innerPut(request, currentState, leaderClusterState); + } + }); + }, + listener::onFailure)); } static ClusterState innerPut(PutAutoFollowPatternAction.Request request, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 675758903bf27..05383b280e6ce 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -6,15 +6,22 @@ package org.elasticsearch.xpack.ccr; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.ccr.action.FollowIndexAction; +import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import java.util.Collection; @@ -28,10 +35,10 @@ public class CcrLicenseIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return Collections.singletonList(IncompatibleLicenseLocalStateCcr.class); + return Collections.singletonList(NonCompliantLicenseLocalStateCcr.class); } - public void testThatFollowingIndexIsUnavailableWithIncompatibleLicense() throws InterruptedException { + public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException { final FollowIndexAction.Request followRequest = getFollowRequest(); final CountDownLatch latch = new CountDownLatch(1); client().execute( @@ -45,14 +52,14 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(final Exception e) { - assertIncompatibleLicense(e); + assertNonCompliantLicense(e); latch.countDown(); } }); latch.await(); } - public void testThatCreateAndFollowingIndexIsUnavailableWithIncompatibleLicense() throws InterruptedException { + public void testThatCreateAndFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException { final FollowIndexAction.Request followRequest = getFollowRequest(); final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); final CountDownLatch latch = new CountDownLatch(1); @@ -67,14 +74,14 @@ public void onResponse(final CreateAndFollowIndexAction.Response response) { @Override public void onFailure(final Exception e) { - assertIncompatibleLicense(e); + assertNonCompliantLicense(e); latch.countDown(); } }); latch.await(); } - public void testThatCcrStatsAreUnavailableWithIncompatibleLicense() throws InterruptedException { + public void testThatCcrStatsAreUnavailableWithNonCompliantLicense() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.TasksRequest(), new ActionListener() { @Override @@ -84,7 +91,7 @@ public void onResponse(final CcrStatsAction.TasksResponse tasksResponse) { @Override public void onFailure(final Exception e) { - assertIncompatibleLicense(e); + assertNonCompliantLicense(e); latch.countDown(); } }); @@ -92,7 +99,52 @@ public void onFailure(final Exception e) { latch.await(); } - private void assertIncompatibleLicense(final Exception e) { + public void testThatPutAutoFollowPatternsIsUnavailableWithNonCompliantLicense() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("leader"); + request.setLeaderIndexPatterns(Collections.singletonList("*")); + client().execute( + PutAutoFollowPatternAction.INSTANCE, + request, + new ActionListener() { + @Override + public void onResponse(final AcknowledgedResponse response) { + latch.countDown(); + fail(); + } + + @Override + public void onFailure(final Exception e) { + assertNonCompliantLicense(e); + latch.countDown(); + } + }); + latch.await(); + } + + public void testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCompliantLicense() throws Exception { + final Logger logger = LogManager.getLogger(AutoFollowCoordinator.class); + final MockLogAppender appender = new MockLogAppender(); + appender.start(); + appender.addExpectation( + new MockLogAppender.ExceptionSeenEventExpectation( + getTestName(), + logger.getName(), + Level.WARN, + "skipping auto-follower coordination", + ElasticsearchSecurityException.class, + "current license is non-compliant for [ccr]")); + Loggers.addAppender(logger, appender); + try { + assertBusy(appender::assertAllExpectationsMatched); + } finally { + Loggers.removeAppender(logger, appender); + appender.stop(); + } + } + + private void assertNonCompliantLicense(final Exception e) { assertThat(e, instanceOf(ElasticsearchSecurityException.class)); assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]")); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IncompatibleLicenseLocalStateCcr.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/NonCompliantLicenseLocalStateCcr.java similarity index 80% rename from x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IncompatibleLicenseLocalStateCcr.java rename to x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/NonCompliantLicenseLocalStateCcr.java index c4b765d3c65ea..f960668a7dff1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IncompatibleLicenseLocalStateCcr.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/NonCompliantLicenseLocalStateCcr.java @@ -12,16 +12,16 @@ import java.nio.file.Path; -public class IncompatibleLicenseLocalStateCcr extends LocalStateCompositeXPackPlugin { +public class NonCompliantLicenseLocalStateCcr extends LocalStateCompositeXPackPlugin { - public IncompatibleLicenseLocalStateCcr(final Settings settings, final Path configPath) throws Exception { + public NonCompliantLicenseLocalStateCcr(final Settings settings, final Path configPath) throws Exception { super(settings, configPath); plugins.add(new Ccr(settings, new CcrLicenseChecker(() -> false)) { @Override protected XPackLicenseState getLicenseState() { - return IncompatibleLicenseLocalStateCcr.this.getLicenseState(); + return NonCompliantLicenseLocalStateCcr.this.getLicenseState(); } }); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index dd1376a4d7a73..2ef841292322a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -66,9 +66,9 @@ public void testAutoFollower() { invoked[0] = true; assertThat(e, nullValue()); }; - AutoFollower autoFollower = new AutoFollower(client, handler, currentState) { + AutoFollower autoFollower = new AutoFollower(handler, currentState) { @Override - void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler) { handler.accept(leaderState, null); } @@ -113,9 +113,9 @@ public void testAutoFollowerClusterStateApiFailure() { invoked[0] = true; assertThat(e, sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler) { handler.accept(null, failure); } @@ -161,9 +161,9 @@ public void testAutoFollowerUpdateClusterStateFailure() { invoked[0] = true; assertThat(e, sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler) { handler.accept(leaderState, null); } @@ -211,9 +211,9 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { invoked[0] = true; assertThat(e, sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler) { handler.accept(leaderState, null); }