Skip to content

Commit 4b15543

Browse files
martijnvgkcm
authored andcommitted
[CCR] Expose auto follow stats to monitoring (#33886)
1 parent d607fb0 commit 4b15543

File tree

14 files changed

+697
-151
lines changed

14 files changed

+697
-151
lines changed

x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ public void testAutoFollowPatterns() throws Exception {
152152
verifyDocuments(adminClient(), allowedIndex, 5);
153153
});
154154
assertThat(indexExists(adminClient(), disallowedIndex), is(false));
155+
assertBusy(() -> {
156+
verifyCcrMonitoring(allowedIndex, allowedIndex);
157+
verifyAutoFollowMonitoring();
158+
});
155159

156160
// Cleanup by deleting auto follow pattern and unfollowing:
157161
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
@@ -309,4 +313,30 @@ private static void verifyCcrMonitoring(String expectedLeaderIndex, String expec
309313
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
310314
}
311315

316+
private static void verifyAutoFollowMonitoring() throws IOException {
317+
Request request = new Request("GET", "/.monitoring-*/_search");
318+
request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}");
319+
Map<String, ?> response;
320+
try {
321+
response = toMap(adminClient().performRequest(request));
322+
} catch (ResponseException e) {
323+
throw new AssertionError("error while searching", e);
324+
}
325+
326+
int numberOfSuccessfulFollowIndices = 0;
327+
328+
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
329+
assertThat(hits.size(), greaterThanOrEqualTo(1));
330+
331+
for (int i = 0; i < hits.size(); i++) {
332+
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
333+
334+
int foundNumberOfOperationsReceived =
335+
(int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit);
336+
numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived);
337+
}
338+
339+
assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1));
340+
}
341+
312342
}

x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,10 @@ public void testAutoFollowPatterns() throws Exception {
125125
ensureYellow("logs-20190101");
126126
verifyDocuments("logs-20190101", 5);
127127
});
128-
assertBusy(() -> verifyCcrMonitoring("logs-20190101", "logs-20190101"));
128+
assertBusy(() -> {
129+
verifyCcrMonitoring("logs-20190101", "logs-20190101");
130+
verifyAutoFollowMonitoring();
131+
});
129132
}
130133

131134
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
@@ -213,6 +216,32 @@ private static void verifyCcrMonitoring(final String expectedLeaderIndex, final
213216
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
214217
}
215218

219+
private static void verifyAutoFollowMonitoring() throws IOException {
220+
Request request = new Request("GET", "/.monitoring-*/_search");
221+
request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}");
222+
Map<String, ?> response;
223+
try {
224+
response = toMap(client().performRequest(request));
225+
} catch (ResponseException e) {
226+
throw new AssertionError("error while searching", e);
227+
}
228+
229+
int numberOfSuccessfulFollowIndices = 0;
230+
231+
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
232+
assertThat(hits.size(), greaterThanOrEqualTo(1));
233+
234+
for (int i = 0; i < hits.size(); i++) {
235+
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
236+
237+
int foundNumberOfOperationsReceived =
238+
(int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit);
239+
numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived);
240+
}
241+
242+
assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1));
243+
}
244+
216245
private static Map<String, Object> toMap(Response response) throws IOException {
217246
return toMap(EntityUtils.toString(response.getEntity()));
218247
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.monitoring.collector.ccr;
7+
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.cluster.service.ClusterService;
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.license.XPackLicenseState;
12+
import org.elasticsearch.xpack.core.XPackSettings;
13+
import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
14+
15+
import static org.hamcrest.Matchers.is;
16+
import static org.mockito.Mockito.verify;
17+
import static org.mockito.Mockito.when;
18+
19+
public abstract class AbstractCcrCollectorTestCase extends BaseCollectorTestCase {
20+
21+
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
22+
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
23+
final boolean ccrAllowed = randomBoolean();
24+
final boolean isElectedMaster = randomBoolean();
25+
whenLocalNodeElectedMaster(isElectedMaster);
26+
27+
// this controls the blockage
28+
when(licenseState.isMonitoringAllowed()).thenReturn(false);
29+
when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed);
30+
31+
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
32+
33+
assertThat(collector.shouldCollect(isElectedMaster), is(false));
34+
if (isElectedMaster) {
35+
verify(licenseState).isMonitoringAllowed();
36+
}
37+
}
38+
39+
public void testShouldCollectReturnsFalseIfNotMaster() {
40+
// regardless of CCR being enabled
41+
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
42+
43+
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
44+
when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
45+
// this controls the blockage
46+
final boolean isElectedMaster = false;
47+
48+
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
49+
50+
assertThat(collector.shouldCollect(isElectedMaster), is(false));
51+
}
52+
53+
public void testShouldCollectReturnsFalseIfCCRIsDisabled() {
54+
// this is controls the blockage
55+
final Settings settings = ccrDisabledSettings();
56+
57+
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
58+
when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
59+
60+
final boolean isElectedMaster = randomBoolean();
61+
whenLocalNodeElectedMaster(isElectedMaster);
62+
63+
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
64+
65+
assertThat(collector.shouldCollect(isElectedMaster), is(false));
66+
67+
if (isElectedMaster) {
68+
verify(licenseState).isMonitoringAllowed();
69+
}
70+
}
71+
72+
public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() {
73+
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
74+
75+
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
76+
// this is controls the blockage
77+
when(licenseState.isCcrAllowed()).thenReturn(false);
78+
final boolean isElectedMaster = randomBoolean();
79+
whenLocalNodeElectedMaster(isElectedMaster);
80+
81+
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
82+
83+
assertThat(collector.shouldCollect(isElectedMaster), is(false));
84+
85+
if (isElectedMaster) {
86+
verify(licenseState).isMonitoringAllowed();
87+
}
88+
}
89+
90+
public void testShouldCollectReturnsTrue() {
91+
final Settings settings = ccrEnabledSettings();
92+
93+
when(licenseState.isMonitoringAllowed()).thenReturn(true);
94+
when(licenseState.isCcrAllowed()).thenReturn(true);
95+
final boolean isElectedMaster = true;
96+
97+
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
98+
99+
assertThat(collector.shouldCollect(isElectedMaster), is(true));
100+
101+
verify(licenseState).isMonitoringAllowed();
102+
}
103+
104+
abstract AbstractCcrCollector createCollector(Settings settings,
105+
ClusterService clusterService,
106+
XPackLicenseState licenseState,
107+
Client client);
108+
109+
private Settings ccrEnabledSettings() {
110+
// since it's the default, we want to ensure we test both with/without it
111+
return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build();
112+
}
113+
114+
private Settings ccrDisabledSettings() {
115+
return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build();
116+
}
117+
118+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.monitoring.collector.ccr;
7+
8+
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.common.Strings;
10+
import org.elasticsearch.common.bytes.BytesReference;
11+
import org.elasticsearch.common.xcontent.XContentBuilder;
12+
import org.elasticsearch.common.xcontent.XContentHelper;
13+
import org.elasticsearch.common.xcontent.XContentType;
14+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
15+
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
16+
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
17+
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
18+
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
19+
import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase;
20+
import org.joda.time.DateTime;
21+
import org.joda.time.DateTimeZone;
22+
import org.junit.Before;
23+
24+
import java.io.IOException;
25+
import java.util.Collections;
26+
import java.util.Map;
27+
import java.util.NavigableMap;
28+
import java.util.TreeMap;
29+
30+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
31+
import static org.hamcrest.Matchers.anyOf;
32+
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.is;
34+
import static org.hamcrest.Matchers.notNullValue;
35+
import static org.hamcrest.Matchers.nullValue;
36+
37+
public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<AutoFollowStatsMonitoringDoc> {
38+
39+
private AutoFollowStats autoFollowStats;
40+
41+
@Before
42+
public void instantiateAutoFollowStats() {
43+
autoFollowStats = new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
44+
Collections.emptyNavigableMap());
45+
}
46+
47+
@Override
48+
protected AutoFollowStatsMonitoringDoc createMonitoringDoc(String cluster,
49+
long timestamp,
50+
long interval,
51+
MonitoringDoc.Node node,
52+
MonitoredSystem system,
53+
String type,
54+
String id) {
55+
return new AutoFollowStatsMonitoringDoc(cluster, timestamp, interval, node, autoFollowStats);
56+
}
57+
58+
@Override
59+
protected void assertMonitoringDoc(AutoFollowStatsMonitoringDoc document) {
60+
assertThat(document.getSystem(), is(MonitoredSystem.ES));
61+
assertThat(document.getType(), is(AutoFollowStatsMonitoringDoc.TYPE));
62+
assertThat(document.getId(), nullValue());
63+
assertThat(document.stats(), is(autoFollowStats));
64+
}
65+
66+
@Override
67+
public void testToXContent() throws IOException {
68+
final long timestamp = System.currentTimeMillis();
69+
final long intervalMillis = System.currentTimeMillis();
70+
final long nodeTimestamp = System.currentTimeMillis();
71+
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp);
72+
73+
final NavigableMap<String, ElasticsearchException> recentAutoFollowExceptions =
74+
new TreeMap<>(Collections.singletonMap(
75+
randomAlphaOfLength(4),
76+
new ElasticsearchException("cannot follow index")));
77+
final AutoFollowStats autoFollowStats =
78+
new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions);
79+
80+
final AutoFollowStatsMonitoringDoc document =
81+
new AutoFollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, autoFollowStats);
82+
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
83+
assertThat(
84+
xContent.utf8ToString(),
85+
equalTo(
86+
"{"
87+
+ "\"cluster_uuid\":\"_cluster\","
88+
+ "\"timestamp\":\"" + new DateTime(timestamp, DateTimeZone.UTC).toString() + "\","
89+
+ "\"interval_ms\":" + intervalMillis + ","
90+
+ "\"type\":\"ccr_auto_follow_stats\","
91+
+ "\"source_node\":{"
92+
+ "\"uuid\":\"_uuid\","
93+
+ "\"host\":\"_host\","
94+
+ "\"transport_address\":\"_addr\","
95+
+ "\"ip\":\"_ip\","
96+
+ "\"name\":\"_name\","
97+
+ "\"timestamp\":\"" + new DateTime(nodeTimestamp, DateTimeZone.UTC).toString() + "\""
98+
+ "},"
99+
+ "\"ccr_auto_follow_stats\":{"
100+
+ "\"number_of_failed_follow_indices\":" + autoFollowStats.getNumberOfFailedFollowIndices() + ","
101+
+ "\"number_of_failed_remote_cluster_state_requests\":" +
102+
autoFollowStats.getNumberOfFailedRemoteClusterStateRequests() + ","
103+
+ "\"number_of_successful_follow_indices\":" + autoFollowStats.getNumberOfSuccessfulFollowIndices() + ","
104+
+ "\"recent_auto_follow_errors\":["
105+
+ "{"
106+
+ "\"leader_index\":\"" + recentAutoFollowExceptions.keySet().iterator().next() + "\","
107+
+ "\"auto_follow_exception\":{"
108+
+ "\"type\":\"exception\","
109+
+ "\"reason\":\"cannot follow index\""
110+
+ "}"
111+
+ "}"
112+
+ "]"
113+
+ "}"
114+
+ "}"));
115+
}
116+
117+
public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
118+
final NavigableMap<String, ElasticsearchException> fetchExceptions =
119+
new TreeMap<>(Collections.singletonMap("leader_index", new ElasticsearchException("cannot follow index")));
120+
final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions);
121+
XContentBuilder builder = jsonBuilder();
122+
builder.value(status);
123+
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
124+
125+
Map<String, Object> template =
126+
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
127+
Map<?, ?> autoFollowStatsMapping =
128+
(Map<?, ?>) XContentMapValues.extractValue("mappings.doc.properties.ccr_auto_follow_stats.properties", template);
129+
130+
assertThat(serializedStatus.size(), equalTo(autoFollowStatsMapping.size()));
131+
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
132+
String fieldName = entry.getKey();
133+
Map<?, ?> fieldMapping = (Map<?, ?>) autoFollowStatsMapping.get(fieldName);
134+
assertThat(fieldMapping, notNullValue());
135+
136+
Object fieldValue = entry.getValue();
137+
String fieldType = (String) fieldMapping.get("type");
138+
if (fieldValue instanceof Long || fieldValue instanceof Integer) {
139+
assertThat("expected long field type for field [" + fieldName + "]", fieldType,
140+
anyOf(equalTo("long"), equalTo("integer")));
141+
} else if (fieldValue instanceof String) {
142+
assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
143+
anyOf(equalTo("keyword"), equalTo("text")));
144+
} else {
145+
// Manual test specific object fields and if not just fail:
146+
if (fieldName.equals("recent_auto_follow_errors")) {
147+
assertThat(fieldType, equalTo("nested"));
148+
assertThat(((Map<?, ?>) fieldMapping.get("properties")).size(), equalTo(2));
149+
assertThat(XContentMapValues.extractValue("properties.leader_index.type", fieldMapping), equalTo("keyword"));
150+
assertThat(XContentMapValues.extractValue("properties.auto_follow_exception.type", fieldMapping), equalTo("object"));
151+
152+
Map<?, ?> exceptionFieldMapping =
153+
(Map<?, ?>) XContentMapValues.extractValue("properties.auto_follow_exception.properties", fieldMapping);
154+
assertThat(exceptionFieldMapping.size(), equalTo(2));
155+
assertThat(XContentMapValues.extractValue("type.type", exceptionFieldMapping), equalTo("keyword"));
156+
assertThat(XContentMapValues.extractValue("reason.type", exceptionFieldMapping), equalTo("text"));
157+
} else {
158+
fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]");
159+
}
160+
}
161+
}
162+
}
163+
}

0 commit comments

Comments
 (0)