Skip to content

Commit 3c3b3a7

Browse files
Use all profiling events on startup (#92087)
With this commit we use `profiling-events-all` to query profiling events if the appropriate index for a sample count is not present yet. This can happen on cluster startup, when only a few events have been accumulated, because additional indices are only created once there are enough events. This aligns the behavior of the Elasticsearch plugin with the Kibana plugin. We also refactor the profiling-related test cases so we can simulate situations after cluster startup when not all profiling-related indices have been created. As also testing cancellation in those scenarios would add complexity and reduce test maintainability (we would need to adjust `slack` per scenario, which is a very low-level change), we separate the tests for the regular case from the cancellation tests. Relates #91640
1 parent a04d023 commit 3c3b3a7

File tree

5 files changed

+366
-290
lines changed

5 files changed

+366
-290
lines changed

docs/changelog/92087.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 92087
2+
summary: Use all profiling events on startup
3+
area: Search
4+
type: bug
5+
issues: []
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.profiler;
9+
10+
import org.apache.http.entity.ContentType;
11+
import org.apache.http.entity.StringEntity;
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.lucene.util.SetOnce;
14+
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
15+
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
16+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
17+
import org.elasticsearch.action.support.PlainActionFuture;
18+
import org.elasticsearch.client.Cancellable;
19+
import org.elasticsearch.client.Request;
20+
import org.elasticsearch.client.Response;
21+
import org.elasticsearch.plugins.Plugin;
22+
import org.elasticsearch.plugins.PluginsService;
23+
import org.elasticsearch.script.MockScriptPlugin;
24+
import org.elasticsearch.search.lookup.LeafStoredFieldsLookup;
25+
import org.elasticsearch.tasks.CancellableTask;
26+
import org.elasticsearch.tasks.Task;
27+
import org.elasticsearch.tasks.TaskId;
28+
import org.elasticsearch.tasks.TaskInfo;
29+
import org.elasticsearch.tasks.TaskManager;
30+
import org.elasticsearch.transport.TransportService;
31+
32+
import java.nio.charset.StandardCharsets;
33+
import java.util.ArrayList;
34+
import java.util.Collection;
35+
import java.util.Collections;
36+
import java.util.HashMap;
37+
import java.util.HashSet;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Set;
41+
import java.util.concurrent.CancellationException;
42+
import java.util.concurrent.TimeUnit;
43+
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.concurrent.atomic.AtomicInteger;
45+
import java.util.function.Function;
46+
47+
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
48+
import static org.hamcrest.Matchers.greaterThan;
49+
import static org.hamcrest.Matchers.instanceOf;
50+
51+
/**
 * Integration test verifying that cancelling the profiling stacktraces REST request also
 * cancels the corresponding transport task and all of its child (search) tasks.
 *
 * <p>The test installs a {@link ScriptedBlockPlugin} on every data node. A mock script blocks
 * search execution on the shards so the request stays in flight long enough for the test to
 * capture the task hierarchy, cancel the REST request, and assert that every still-registered
 * task in the hierarchy has been marked cancelled.
 */
public class CancellationIT extends ProfilingTestCase {
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        // Install the blocking script plugin in addition to whatever the base test case needs.
        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
        plugins.add(ScriptedBlockPlugin.class);
        return plugins;
    }

    @Override
    protected boolean useOnlyAllEvents() {
        // we assume that all indices have been created to simplify the testing logic.
        return false;
    }

    /**
     * Issues a profiling stacktraces request whose query contains the blocking mock script,
     * then verifies that cancelling the request cancels the whole task hierarchy.
     */
    public void testAutomaticCancellation() throws Exception {
        Request restRequest = new Request("POST", "/_profiling/stacktraces");
        // The script filter below ("search_block") routes execution through
        // ScriptedBlockPlugin, which blocks the per-shard search until the test releases it.
        restRequest.setEntity(new StringEntity("""
            {
              "sample_size": 10000,
              "query": {
                "bool": {
                  "filter": [
                    {
                      "script": {
                        "script": {
                            "lang": "mockscript",
                            "source": "search_block",
                            "params": {}
                        }
                      }
                    }
                  ]
                }
              }
            }
            """, ContentType.APPLICATION_JSON.withCharset(StandardCharsets.UTF_8)));
        verifyCancellation(GetProfilingAction.NAME, restRequest);
    }

    /**
     * Drives the cancellation scenario: send the request asynchronously, wait until the
     * blocking script has been hit, capture the profiling task and its children, cancel the
     * REST request, assert cancellation propagated, then unblock and expect the client future
     * to fail with a {@link CancellationException}.
     *
     * @param action      transport action name of the parent task to look for
     * @param restRequest the (blocking) REST request to execute and cancel
     */
    void verifyCancellation(String action, Request restRequest) throws Exception {
        Map<String, String> nodeIdToName = readNodesInfo();
        List<ScriptedBlockPlugin> plugins = initBlockFactory();

        PlainActionFuture<Response> future = PlainActionFuture.newFuture();
        Cancellable cancellable = getRestClient().performRequestAsync(restRequest, wrapAsRestResponseListener(future));

        // Only cancel once at least one shard-level search is actually blocked; otherwise the
        // task hierarchy might not exist yet when we try to capture it.
        awaitForBlock(plugins);
        Collection<TaskId> profilingTasks = collectProfilingRelatedTasks(action);
        cancellable.cancel();
        ensureTasksAreCancelled(profilingTasks, nodeIdToName::get);

        // Release the blocked scripts so the (cancelled) request can terminate.
        disableBlocks(plugins);
        expectThrows(CancellationException.class, future::actionGet);
    }

    /**
     * Returns a mapping from node id to node name for all nodes in the cluster; needed later
     * to resolve a {@link TaskId}'s node id to the node instance holding its task manager.
     */
    private static Map<String, String> readNodesInfo() {
        Map<String, String> nodeIdToName = new HashMap<>();
        NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get();
        assertFalse(nodesInfoResponse.hasFailures());
        for (NodeInfo node : nodesInfoResponse.getNodes()) {
            nodeIdToName.put(node.getNode().getId(), node.getNode().getName());
        }
        return nodeIdToName;
    }

    /**
     * Captures the currently running task for the given transport action together with its
     * direct child task ids.
     *
     * @param transportAction action name identifying the parent (profiling) task
     * @return the parent task id plus the ids of all of its currently registered children
     */
    private static Collection<TaskId> collectProfilingRelatedTasks(String transportAction) {
        SetOnce<TaskInfo> profilingTask = new SetOnce<>();
        // parent task id -> ids of its child tasks, built from a single task list snapshot
        Map<TaskId, Set<TaskId>> taskToParent = new HashMap<>();
        ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
        for (TaskInfo task : listTasksResponse.getTasks()) {
            TaskId parentTaskId = task.parentTaskId();
            if (parentTaskId != null) {
                if (taskToParent.containsKey(parentTaskId) == false) {
                    taskToParent.put(parentTaskId, new HashSet<>());
                }
                taskToParent.get(parentTaskId).add(task.taskId());
            }
            if (task.action().equals(transportAction)) {
                profilingTask.set(task);
            }
        }
        // The profiling request must still be running at this point (it is blocked).
        assertNotNull(profilingTask.get());
        Set<TaskId> childTaskIds = taskToParent.get(profilingTask.get().taskId());
        Set<TaskId> profilingTaskIds = new HashSet<>();
        profilingTaskIds.add(profilingTask.get().taskId());
        if (childTaskIds != null) {
            profilingTaskIds.addAll(childTaskIds);
        }
        return profilingTaskIds;
    }

    /**
     * Asserts (with retries via {@code assertBusy}) that every task in {@code taskIds} that is
     * still registered with its node's task manager is a cancellable task and has been
     * cancelled.
     *
     * @param taskIds      tasks captured before cancellation
     * @param nodeIdToName resolves a task's node id to the node name for instance lookup
     */
    private static void ensureTasksAreCancelled(Collection<TaskId> taskIds, Function<String, String> nodeIdToName) throws Exception {
        assertBusy(() -> {
            for (TaskId taskId : taskIds) {
                String nodeName = nodeIdToName.apply(taskId.getNodeId());
                TaskManager taskManager = internalCluster().getInstance(TransportService.class, nodeName).getTaskManager();
                Task task = taskManager.getTask(taskId.getId());
                // as we capture the task hierarchy at the beginning but cancel in the middle of execution, some tasks have been
                // unregistered already by the time we verify cancellation.
                if (task != null) {
                    assertThat(task, instanceOf(CancellableTask.class));
                    assertTrue(((CancellableTask) task).isCancelled());
                }
            }
        });
    }

    /**
     * Collects the {@link ScriptedBlockPlugin} instances from all data nodes and arms them:
     * counters reset, blocking enabled, and one "free" execution granted via slack.
     */
    private static List<ScriptedBlockPlugin> initBlockFactory() {
        List<ScriptedBlockPlugin> plugins = new ArrayList<>();
        for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) {
            plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class));
        }
        for (ScriptedBlockPlugin plugin : plugins) {
            plugin.reset();
            plugin.enableBlock();
            // Allow to execute one search and only block starting with the second one. This
            // is done so we have at least one child action and can check that all active children
            // are cancelled with the parent action.
            plugin.setSlack(1);
        }
        return plugins;
    }

    /**
     * Waits (up to 10 seconds) until the blocking script has been invoked on at least one
     * shard, i.e. the request is genuinely in flight and blocked.
     */
    private void awaitForBlock(List<ScriptedBlockPlugin> plugins) throws Exception {
        assertBusy(() -> {
            int numberOfBlockedPlugins = 0;
            for (ScriptedBlockPlugin plugin : plugins) {
                numberOfBlockedPlugins += plugin.hits.get();
            }
            logger.info("The plugin blocked on {} shards", numberOfBlockedPlugins);
            assertThat(numberOfBlockedPlugins, greaterThan(0));
        }, 10, TimeUnit.SECONDS);
    }

    /** Releases all blocked script executions so in-flight searches can finish. */
    private static void disableBlocks(List<ScriptedBlockPlugin> plugins) {
        for (ScriptedBlockPlugin plugin : plugins) {
            plugin.disableBlock();
        }
    }

    /**
     * Mock script plugin providing the {@code search_block} script. Each invocation increments
     * {@link #hits}; once the configured {@link #slack} is used up, the script busy-waits until
     * {@link #disableBlock()} is called, keeping the enclosing search task alive and blocked.
     */
    public static class ScriptedBlockPlugin extends MockScriptPlugin {
        static final String SCRIPT_NAME = "search_block";

        // number of times the script has been invoked (i.e. documents/shards reached)
        private final AtomicInteger hits = new AtomicInteger();

        // number of invocations allowed to pass through before blocking kicks in
        private final AtomicInteger slack = new AtomicInteger(0);

        // while true, invocations beyond the slack budget block in a wait loop
        private final AtomicBoolean shouldBlock = new AtomicBoolean(true);

        void reset() {
            hits.set(0);
        }

        void disableBlock() {
            shouldBlock.set(false);
        }

        void enableBlock() {
            shouldBlock.set(true);
        }

        void setSlack(int slack) {
            this.slack.set(slack);
        }

        @Override
        public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
            return Collections.singletonMap(SCRIPT_NAME, params -> {
                LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields");
                LogManager.getLogger(CancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
                hits.incrementAndGet();
                // Each invocation consumes one unit of slack; only block once the budget is
                // exhausted, so the first execution(s) can complete and spawn child actions.
                if (slack.decrementAndGet() < 0) {
                    try {
                        waitUntil(() -> shouldBlock.get() == false);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                return true;
            });
        }
    }
}

0 commit comments

Comments
 (0)