Skip to content

Commit a19ee79

Browse files
committed
[CCR] Auto follow Coordinator fetch cluster state in system context (#35120)
Auto follow Coordinator should fetch the leader cluster state using system context.
1 parent c6db73b commit a19ee79

File tree

4 files changed

+34
-40
lines changed

4 files changed

+34
-40
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,8 @@ public boolean isCcrAllowed() {
104104
* @param leaderIndex the name of the leader index
105105
* @param onFailure the failure consumer
106106
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
107-
* @param <T> the type of response the listener is waiting for
108107
*/
109-
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
108+
public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
110109
final Client client,
111110
final String clusterAlias,
112111
final String leaderIndex,
@@ -119,8 +118,8 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
119118
request.indices(leaderIndex);
120119
checkRemoteClusterLicenseAndFetchClusterState(
121120
client,
122-
Collections.emptyMap(),
123121
clusterAlias,
122+
client.getRemoteClusterClient(clusterAlias),
124123
request,
125124
onFailure,
126125
leaderClusterState -> {
@@ -152,22 +151,20 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
152151
*
153152
* @param client the client
154153
* @param clusterAlias the remote cluster alias
155-
* @param headers the headers to use for leader client
156154
* @param request the cluster state request
157155
* @param onFailure the failure consumer
158156
* @param leaderClusterStateConsumer the leader cluster state consumer
159157
*/
160158
public void checkRemoteClusterLicenseAndFetchClusterState(
161159
final Client client,
162-
final Map<String, String> headers,
163160
final String clusterAlias,
164161
final ClusterStateRequest request,
165162
final Consumer<Exception> onFailure,
166163
final Consumer<ClusterState> leaderClusterStateConsumer) {
167164
checkRemoteClusterLicenseAndFetchClusterState(
168165
client,
169-
headers,
170166
clusterAlias,
167+
systemClient(client.getRemoteClusterClient(clusterAlias)),
171168
request,
172169
onFailure,
173170
leaderClusterStateConsumer,
@@ -183,18 +180,17 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
183180
*
184181
* @param client the client
185182
* @param clusterAlias the remote cluster alias
186-
* @param headers the headers to use for leader client
183+
* @param leaderClient the leader client to use to execute cluster state API
187184
* @param request the cluster state request
188185
* @param onFailure the failure consumer
189186
* @param leaderClusterStateConsumer the leader cluster state consumer
190187
* @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant
191188
* @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure
192-
* @param <T> the type of response the listener is waiting for
193189
*/
194-
private <T> void checkRemoteClusterLicenseAndFetchClusterState(
190+
private void checkRemoteClusterLicenseAndFetchClusterState(
195191
final Client client,
196-
final Map<String, String> headers,
197192
final String clusterAlias,
193+
final Client leaderClient,
198194
final ClusterStateRequest request,
199195
final Consumer<Exception> onFailure,
200196
final Consumer<ClusterState> leaderClusterStateConsumer,
@@ -208,7 +204,6 @@ private <T> void checkRemoteClusterLicenseAndFetchClusterState(
208204
@Override
209205
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
210206
if (licenseCheck.isSuccess()) {
211-
final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers);
212207
final ActionListener<ClusterStateResponse> clusterStateListener =
213208
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
214209
// following an index in remote cluster, so use remote client to fetch leader index metadata
@@ -363,6 +358,22 @@ void doExecute(Action<Request, Response, RequestBuilder> action, Request request
363358
}
364359
}
365360

361+
private static Client systemClient(Client client) {
362+
final ThreadContext threadContext = client.threadPool().getThreadContext();
363+
return new FilterClient(client) {
364+
@Override
365+
protected <Request extends ActionRequest, Response extends ActionResponse,
366+
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
367+
void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
368+
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
369+
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
370+
threadContext.markAsSystemContext();
371+
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
372+
}
373+
}
374+
};
375+
}
376+
366377
private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
367378
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
368379
threadContext.copyHeaders(headers.entrySet());

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,14 @@ private void doAutoFollow() {
159159
AutoFollower operation = new AutoFollower(handler, followerClusterState) {
160160

161161
@Override
162-
void getLeaderClusterState(final Map<String, String> headers,
163-
final String remoteCluster,
162+
void getLeaderClusterState(final String remoteCluster,
164163
final BiConsumer<ClusterState, Exception> handler) {
165164
final ClusterStateRequest request = new ClusterStateRequest();
166165
request.clear();
167166
request.metaData(true);
168167
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
169168
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
170169
client,
171-
headers,
172170
remoteCluster,
173171
request,
174172
e -> handler.accept(null, e),
@@ -249,7 +247,7 @@ void autoFollowIndices() {
249247
final String remoteCluster = autoFollowPattern.getRemoteCluster();
250248

251249
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
252-
getLeaderClusterState(headers, remoteCluster, (leaderClusterState, e) -> {
250+
getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
253251
if (leaderClusterState != null) {
254252
assert e == null;
255253
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
@@ -413,13 +411,10 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
413411

414412
/**
415413
* Fetch the cluster state from the leader with the specified cluster alias
416-
*
417-
* @param headers the client headers
418414
* @param remoteCluster the name of the leader cluster
419415
* @param handler the callback to invoke
420416
*/
421417
abstract void getLeaderClusterState(
422-
Map<String, String> headers,
423418
String remoteCluster,
424419
BiConsumer<ClusterState, Exception> handler
425420
);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -107,20 +107,12 @@ protected void masterOperation(
107107
client.getRemoteClusterClient(remoteCluster);
108108

109109
String leaderIndex = request.getLeaderIndex();
110-
createFollowerIndexAndFollowRemoteIndex(request, remoteCluster, leaderIndex, listener);
111-
}
112-
113-
private void createFollowerIndexAndFollowRemoteIndex(
114-
final PutFollowAction.Request request,
115-
final String remoteCluster,
116-
final String leaderIndex,
117-
final ActionListener<PutFollowAction.Response> listener) {
118110
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
119-
client,
120-
remoteCluster,
121-
leaderIndex,
122-
listener::onFailure,
123-
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
111+
client,
112+
remoteCluster,
113+
leaderIndex,
114+
listener::onFailure,
115+
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
124116
}
125117

126118
private void createFollowerIndex(

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,9 @@ public void testAutoFollower() {
8383
};
8484
AutoFollower autoFollower = new AutoFollower(handler, currentState) {
8585
@Override
86-
void getLeaderClusterState(Map<String, String> headers,
87-
String remoteCluster,
86+
void getLeaderClusterState(String remoteCluster,
8887
BiConsumer<ClusterState, Exception> handler) {
89-
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
88+
assertThat(remoteCluster, equalTo("remote"));
9089
handler.accept(leaderState, null);
9190
}
9291

@@ -143,8 +142,7 @@ public void testAutoFollowerClusterStateApiFailure() {
143142
};
144143
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
145144
@Override
146-
void getLeaderClusterState(Map<String, String> headers,
147-
String remoteCluster,
145+
void getLeaderClusterState(String remoteCluster,
148146
BiConsumer<ClusterState, Exception> handler) {
149147
handler.accept(null, failure);
150148
}
@@ -204,8 +202,7 @@ public void testAutoFollowerUpdateClusterStateFailure() {
204202
};
205203
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
206204
@Override
207-
void getLeaderClusterState(Map<String, String> headers,
208-
String remoteCluster,
205+
void getLeaderClusterState(String remoteCluster,
209206
BiConsumer<ClusterState, Exception> handler) {
210207
handler.accept(leaderState, null);
211208
}
@@ -267,8 +264,7 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() {
267264
};
268265
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
269266
@Override
270-
void getLeaderClusterState(Map<String, String> headers,
271-
String remoteCluster,
267+
void getLeaderClusterState(String remoteCluster,
272268
BiConsumer<ClusterState, Exception> handler) {
273269
handler.accept(leaderState, null);
274270
}

0 commit comments

Comments
 (0)