diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index d2c86e69fbd5d..2bb77aacffa60 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -103,9 +103,8 @@ public boolean isCcrAllowed() { * @param leaderIndex the name of the leader index * @param onFailure the failure consumer * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards - * @param the type of response the listener is waiting for */ - public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( + public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( final Client client, final String clusterAlias, final String leaderIndex, @@ -118,8 +117,8 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU request.indices(leaderIndex); checkRemoteClusterLicenseAndFetchClusterState( client, - Collections.emptyMap(), clusterAlias, + client.getRemoteClusterClient(clusterAlias), request, onFailure, leaderClusterState -> { @@ -151,22 +150,20 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU * * @param client the client * @param clusterAlias the remote cluster alias - * @param headers the headers to use for leader client * @param request the cluster state request * @param onFailure the failure consumer * @param leaderClusterStateConsumer the leader cluster state consumer */ public void checkRemoteClusterLicenseAndFetchClusterState( final Client client, - final Map headers, final String clusterAlias, final ClusterStateRequest request, final Consumer onFailure, final Consumer leaderClusterStateConsumer) { checkRemoteClusterLicenseAndFetchClusterState( client, - headers, clusterAlias, + systemClient(client.getRemoteClusterClient(clusterAlias)), request, onFailure, leaderClusterStateConsumer, @@ -182,18 +179,17 @@ public void checkRemoteClusterLicenseAndFetchClusterState( * * @param client the client * @param clusterAlias the remote cluster alias - * @param headers the headers to use for leader client + * @param leaderClient the leader client to use to execute cluster state API * @param request the cluster state request * @param onFailure the failure consumer * @param leaderClusterStateConsumer the leader cluster state consumer * @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant * @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure - * @param the type of response the listener is waiting for */ - private void checkRemoteClusterLicenseAndFetchClusterState( + private void checkRemoteClusterLicenseAndFetchClusterState( final Client client, - final Map headers, final String clusterAlias, + final Client leaderClient, final ClusterStateRequest request, final Consumer onFailure, final Consumer leaderClusterStateConsumer, @@ -207,7 +203,6 @@ private void checkRemoteClusterLicenseAndFetchClusterState( @Override public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { if (licenseCheck.isSuccess()) { - final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers); final ActionListener clusterStateListener = ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata @@ -356,6 +351,21 @@ void doExecute(Action action, Request request, ActionListener + void doExecute(Action action, Request request, ActionListener listener) { + final Supplier supplier = threadContext.newRestorableContext(false); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + } + } + }; + } + private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { final ThreadContext.StoredContext storedContext = threadContext.stashContext(); threadContext.copyHeaders(headers.entrySet()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index b32ed829cf42c..6323fb7f103db 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -159,8 +159,7 @@ private void doAutoFollow() { AutoFollower operation = new AutoFollower(handler, followerClusterState) { @Override - void getLeaderClusterState(final Map headers, - final String remoteCluster, + void getLeaderClusterState(final String remoteCluster, final BiConsumer handler) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); @@ -168,7 +167,6 @@ void getLeaderClusterState(final Map headers, // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( client, - headers, remoteCluster, request, e -> handler.accept(null, e), @@ -249,7 +247,7 @@ void autoFollowIndices() { final String remoteCluster = autoFollowPattern.getRemoteCluster(); Map headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName); - getLeaderClusterState(headers, remoteCluster, (leaderClusterState, e) -> { + getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName); @@ -413,13 +411,10 @@ static Function recordLeaderIndexAsFollowFunction(St /** * Fetch the cluster state from the leader with the specified cluster alias - * - * @param headers the client headers * @param remoteCluster the name of the leader cluster * @param handler the callback to invoke */ abstract void getLeaderClusterState( - Map headers, String remoteCluster, BiConsumer handler ); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index c460f23fe93e7..25a12f30c08f2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -100,20 +100,12 @@ protected void masterOperation( client.getRemoteClusterClient(remoteCluster); String leaderIndex = request.getLeaderIndex(); - createFollowerIndexAndFollowRemoteIndex(request, remoteCluster, leaderIndex, listener); - } - - private void createFollowerIndexAndFollowRemoteIndex( - final PutFollowAction.Request request, - final String remoteCluster, - final String leaderIndex, - final ActionListener listener) { ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( - client, - remoteCluster, - leaderIndex, - listener::onFailure, - (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); + client, + remoteCluster, + leaderIndex, + listener::onFailure, + (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); } private void createFollowerIndex( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 6b542d15044e5..1da58cc2703db 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -83,10 +83,9 @@ public void testAutoFollower() { }; AutoFollower autoFollower = new AutoFollower(handler, currentState) { @Override - void getLeaderClusterState(Map headers, - String remoteCluster, + void getLeaderClusterState(String remoteCluster, BiConsumer handler) { - assertThat(headers, equalTo(autoFollowHeaders.get("remote"))); + assertThat(remoteCluster, equalTo("remote")); handler.accept(leaderState, null); } @@ -143,8 +142,7 @@ public void testAutoFollowerClusterStateApiFailure() { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Map headers, - String remoteCluster, + void getLeaderClusterState(String remoteCluster, BiConsumer handler) { handler.accept(null, failure); } @@ -204,8 +202,7 @@ public void testAutoFollowerUpdateClusterStateFailure() { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Map headers, - String remoteCluster, + void getLeaderClusterState(String remoteCluster, BiConsumer handler) { handler.accept(leaderState, null); } @@ -267,8 +264,7 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Map headers, - String remoteCluster, + void getLeaderClusterState(String remoteCluster, BiConsumer handler) { handler.accept(leaderState, null); }