Skip to content

Commit f4bc9bf

Browse files
jasontedormartijnvg
andcommitted
Expose CCR stats to monitoring (#33617)
This commit exposes the CCR stats endpoint to monitoring collection. Co-authored-by: Martijn van Groningen <[email protected]>
1 parent bfbccf7 commit f4bc9bf

File tree

19 files changed

+734
-110
lines changed

19 files changed

+734
-110
lines changed

x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ followClusterTestCluster {
4747
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
4848
setting 'xpack.license.self_generated.type', 'trial'
4949
setting 'xpack.security.enabled', 'true'
50-
setting 'xpack.monitoring.enabled', 'false'
50+
setting 'xpack.monitoring.collection.enabled', 'true'
5151
extraConfigFile 'roles.yml', 'roles.yml'
5252
setupCommand 'setupTestAdmin',
5353
'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser"

x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
3131
import static org.hamcrest.Matchers.containsString;
3232
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3334
import static org.hamcrest.Matchers.is;
3435

3536
public class FollowIndexSecurityIT extends ESRestTestCase {
@@ -80,6 +81,7 @@ public void testFollowIndex() throws Exception {
8081
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
8182
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
8283
assertThat(countCcrNodeTasks(), equalTo(5));
84+
assertBusy(() -> verifyCcrMonitoring(allowedIndex));
8385
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
8486
// Make sure that there are no other ccr relates operations running:
8587
assertBusy(() -> {
@@ -203,4 +205,46 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
203205
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
204206
}
205207

208+
private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
209+
ensureYellow(".monitoring-*");
210+
211+
Request request = new Request("GET", "/.monitoring-*/_search");
212+
request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_stats\"}}}");
213+
Map<String, ?> response = toMap(adminClient().performRequest(request));
214+
215+
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
216+
assertThat(numDocs, greaterThanOrEqualTo(1));
217+
218+
int numberOfOperationsReceived = 0;
219+
int numberOfOperationsIndexed = 0;
220+
221+
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
222+
for (int i = 0; i < numDocs; i++) {
223+
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
224+
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
225+
if (leaderIndex.endsWith(expectedLeaderIndex) == false) {
226+
continue;
227+
}
228+
229+
int foundNumberOfOperationsReceived =
230+
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
231+
numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived);
232+
int foundNumberOfOperationsIndexed =
233+
(int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit);
234+
numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed);
235+
}
236+
237+
assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1));
238+
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
239+
}
240+
241+
private static void ensureYellow(String index) throws IOException {
242+
Request request = new Request("GET", "/_cluster/health/" + index);
243+
request.addParameter("wait_for_status", "yellow");
244+
request.addParameter("wait_for_no_relocating_shards", "true");
245+
request.addParameter("timeout", "70s");
246+
request.addParameter("level", "shards");
247+
adminClient().performRequest(request);
248+
}
249+
206250
}

x-pack/plugin/ccr/qa/multi-cluster/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ followClusterTestCluster {
2727
dependsOn leaderClusterTestRunner
2828
numNodes = 1
2929
clusterName = 'follow-cluster'
30+
setting 'xpack.monitoring.collection.enabled', 'true'
3031
setting 'xpack.license.self_generated.type', 'trial'
3132
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
3233
}

x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
2727
import static org.hamcrest.Matchers.equalTo;
28+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
2829

2930
public class FollowIndexIT extends ESRestTestCase {
3031

@@ -75,6 +76,7 @@ public void testFollowIndex() throws Exception {
7576
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
7677
}
7778
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
79+
assertBusy(() -> verifyCcrMonitoring(leaderIndexName));
7880
}
7981
}
8082

@@ -104,6 +106,7 @@ public void testAutoFollowPatterns() throws Exception {
104106
ensureYellow("logs-20190101");
105107
verifyDocuments("logs-20190101", 5);
106108
});
109+
assertBusy(() -> verifyCcrMonitoring("logs-20190101"));
107110
}
108111

109112
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
@@ -155,6 +158,39 @@ private static void verifyDocuments(String index, int expectedNumDocs) throws IO
155158
}
156159
}
157160

161+
private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
162+
ensureYellow(".monitoring-*");
163+
164+
Request request = new Request("GET", "/.monitoring-*/_search");
165+
request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_stats\"}}}");
166+
Map<String, ?> response = toMap(client().performRequest(request));
167+
168+
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
169+
assertThat(numDocs, greaterThanOrEqualTo(1));
170+
171+
int numberOfOperationsReceived = 0;
172+
int numberOfOperationsIndexed = 0;
173+
174+
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
175+
for (int i = 0; i < numDocs; i++) {
176+
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
177+
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
178+
if (leaderIndex.endsWith(expectedLeaderIndex) == false) {
179+
continue;
180+
}
181+
182+
int foundNumberOfOperationsReceived =
183+
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
184+
numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived);
185+
int foundNumberOfOperationsIndexed =
186+
(int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit);
187+
numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed);
188+
}
189+
190+
assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1));
191+
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
192+
}
193+
158194
private static Map<String, Object> toMap(Response response) throws IOException {
159195
return toMap(EntityUtils.toString(response.getEntity()));
160196
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,17 @@
4040
import org.elasticsearch.threadpool.ThreadPool;
4141
import org.elasticsearch.watcher.ResourceWatcherService;
4242
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
43-
import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
44-
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
45-
import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
46-
import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
47-
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
4843
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction;
49-
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
5044
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
5145
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
5246
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
5347
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
5448
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
49+
import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
5550
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
51+
import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
5652
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
57-
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
53+
import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
5854
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
5955
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
6056
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
@@ -67,6 +63,10 @@
6763
import org.elasticsearch.xpack.core.XPackClientActionPlugin;
6864
import org.elasticsearch.xpack.core.XPackPlugin;
6965
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
66+
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
67+
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
68+
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
69+
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
7070

7171
import java.util.Arrays;
7272
import java.util.Collection;
@@ -77,8 +77,8 @@
7777
import java.util.function.Supplier;
7878

7979
import static java.util.Collections.emptyList;
80-
import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING;
8180
import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING;
81+
import static org.elasticsearch.xpack.core.XPackSettings.CCR_ENABLED_SETTING;
8282

8383
/**
8484
* Container class for CCR functionality.

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.elasticsearch.common.settings.Setting;
99
import org.elasticsearch.common.settings.Setting.Property;
1010
import org.elasticsearch.common.unit.TimeValue;
11+
import org.elasticsearch.xpack.core.XPackSettings;
1112

1213
import java.util.Arrays;
1314
import java.util.List;
@@ -22,11 +23,6 @@ private CcrSettings() {
2223

2324
}
2425

25-
/**
26-
* Setting for controlling whether or not CCR is enabled.
27-
*/
28-
static final Setting<Boolean> CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope);
29-
3026
/**
3127
* Index setting for a following index.
3228
*/
@@ -46,7 +42,7 @@ private CcrSettings() {
4642
*/
4743
static List<Setting<?>> getSettings() {
4844
return Arrays.asList(
49-
CCR_ENABLED_SETTING,
45+
XPackSettings.CCR_ENABLED_SETTING,
5046
CCR_FOLLOWING_INDEX_SETTING,
5147
CCR_AUTO_FOLLOW_POLL_INTERVAL);
5248
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.tasks.TaskId;
2121
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
2222
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
23+
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
2324

2425
import java.util.ArrayList;
2526
import java.util.Arrays;

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535

3636
public class TransportCcrStatsAction extends TransportTasksAction<
3737
ShardFollowNodeTask,
38-
CcrStatsAction.TasksRequest,
39-
CcrStatsAction.TasksResponse, CcrStatsAction.TaskResponse> {
38+
CcrStatsAction.StatsRequest,
39+
CcrStatsAction.StatsResponses, CcrStatsAction.StatsResponse> {
4040

4141
private final IndexNameExpressionResolver resolver;
4242
private final CcrLicenseChecker ccrLicenseChecker;
@@ -58,8 +58,8 @@ public TransportCcrStatsAction(
5858
transportService,
5959
actionFilters,
6060
resolver,
61-
CcrStatsAction.TasksRequest::new,
62-
CcrStatsAction.TasksResponse::new,
61+
CcrStatsAction.StatsRequest::new,
62+
CcrStatsAction.StatsResponses::new,
6363
Ccr.CCR_THREAD_POOL_NAME);
6464
this.resolver = Objects.requireNonNull(resolver);
6565
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
@@ -68,8 +68,8 @@ public TransportCcrStatsAction(
6868
@Override
6969
protected void doExecute(
7070
final Task task,
71-
final CcrStatsAction.TasksRequest request,
72-
final ActionListener<CcrStatsAction.TasksResponse> listener) {
71+
final CcrStatsAction.StatsRequest request,
72+
final ActionListener<CcrStatsAction.StatsResponses> listener) {
7373
if (ccrLicenseChecker.isCcrAllowed() == false) {
7474
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
7575
return;
@@ -78,21 +78,21 @@ protected void doExecute(
7878
}
7979

8080
@Override
81-
protected CcrStatsAction.TasksResponse newResponse(
82-
final CcrStatsAction.TasksRequest request,
83-
final List<CcrStatsAction.TaskResponse> taskResponses,
81+
protected CcrStatsAction.StatsResponses newResponse(
82+
final CcrStatsAction.StatsRequest request,
83+
final List<CcrStatsAction.StatsResponse> statsRespons,
8484
final List<TaskOperationFailure> taskOperationFailures,
8585
final List<FailedNodeException> failedNodeExceptions) {
86-
return new CcrStatsAction.TasksResponse(taskOperationFailures, failedNodeExceptions, taskResponses);
86+
return new CcrStatsAction.StatsResponses(taskOperationFailures, failedNodeExceptions, statsRespons);
8787
}
8888

8989
@Override
90-
protected CcrStatsAction.TaskResponse readTaskResponse(final StreamInput in) throws IOException {
91-
return new CcrStatsAction.TaskResponse(in);
90+
protected CcrStatsAction.StatsResponse readTaskResponse(final StreamInput in) throws IOException {
91+
return new CcrStatsAction.StatsResponse(in);
9292
}
9393

9494
@Override
95-
protected void processTasks(final CcrStatsAction.TasksRequest request, final Consumer<ShardFollowNodeTask> operation) {
95+
protected void processTasks(final CcrStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
9696
final ClusterState state = clusterService.state();
9797
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
9898
for (final Task task : taskManager.getTasks().values()) {
@@ -107,10 +107,10 @@ protected void processTasks(final CcrStatsAction.TasksRequest request, final Con
107107

108108
@Override
109109
protected void taskOperation(
110-
final CcrStatsAction.TasksRequest request,
110+
final CcrStatsAction.StatsRequest request,
111111
final ShardFollowNodeTask task,
112-
final ActionListener<CcrStatsAction.TaskResponse> listener) {
113-
listener.onResponse(new CcrStatsAction.TaskResponse(task.getFollowShardId(), task.getStatus()));
112+
final ActionListener<CcrStatsAction.StatsResponse> listener) {
113+
listener.onResponse(new CcrStatsAction.StatsResponse(task.getFollowShardId(), task.getStatus()));
114114
}
115115

116116
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public String getName() {
3333

3434
@Override
3535
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
36-
final CcrStatsAction.TasksRequest request = new CcrStatsAction.TasksRequest();
36+
final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
3737
request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
3838
request.setIndicesOptions(IndicesOptions.fromRequest(restRequest, request.indicesOptions()));
3939
return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ public void onFailure(final Exception e) {
9090

9191
public void testThatCcrStatsAreUnavailableWithNonCompliantLicense() throws InterruptedException {
9292
final CountDownLatch latch = new CountDownLatch(1);
93-
client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.TasksRequest(), new ActionListener<CcrStatsAction.TasksResponse>() {
93+
client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.StatsRequest(), new ActionListener<CcrStatsAction.StatsResponses>() {
9494
@Override
95-
public void onResponse(final CcrStatsAction.TasksResponse tasksResponse) {
95+
public void onResponse(final CcrStatsAction.StatsResponses statsResponses) {
9696
latch.countDown();
9797
fail();
9898
}

0 commit comments

Comments
 (0)