Skip to content

Commit 68ae792

Browse files
authored
Log at DEBUG only on disconnect during cancellation (#74042)
If a `NodeDisconnectedException` happens when sending a ban for a task then today we log a message at `INFO` or `WARN` indicating that the ban failed, but we don't indicate why. The message also uses a default `toString()` for an inner class which is unhelpful. Ban failures during disconnections are benign and somewhat expected, and task cancellation respects disconnections anyway (#65443). There's not much the user can do about these messages either, and they can be confusing and draw attention away from the real problem. With this commit we log the failure messages at `DEBUG` on disconnections, and include the exception details. We also include the exception message for other kinds of failures, and we fix up a few cases where a useless default `toString()` implementation was used in log messages. Slightly relates #72968 in that these messages tend to obscure a connectivity issue.
1 parent ee98e85 commit 68ae792

File tree

5 files changed

+278
-4
lines changed

5 files changed

+278
-4
lines changed

server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
1314
import org.elasticsearch.ElasticsearchSecurityException;
1415
import org.elasticsearch.ExceptionsHelper;
1516
import org.elasticsearch.Version;
@@ -21,6 +22,8 @@
2122
import org.elasticsearch.common.io.stream.StreamOutput;
2223
import org.elasticsearch.threadpool.ThreadPool;
2324
import org.elasticsearch.transport.EmptyTransportResponseHandler;
25+
import org.elasticsearch.transport.NodeDisconnectedException;
26+
import org.elasticsearch.transport.NodeNotConnectedException;
2427
import org.elasticsearch.transport.Transport;
2528
import org.elasticsearch.transport.TransportChannel;
2629
import org.elasticsearch.transport.TransportException;
@@ -145,8 +148,30 @@ public void handleResponse(TransportResponse.Empty response) {
145148

146149
@Override
147150
public void handleException(TransportException exp) {
148-
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
149-
logger.warn("Cannot send ban for tasks with the parent [{}] for connection [{}]", taskId, connection);
151+
final Throwable cause = ExceptionsHelper.unwrapCause(exp);
152+
assert cause instanceof ElasticsearchSecurityException == false;
153+
if (isUnimportantBanFailure(cause)) {
154+
logger.debug(
155+
new ParameterizedMessage(
156+
"cannot send ban for tasks with the parent [{}] on connection [{}]",
157+
taskId,
158+
connection),
159+
exp);
160+
} else if (logger.isDebugEnabled()) {
161+
logger.warn(
162+
new ParameterizedMessage(
163+
"cannot send ban for tasks with the parent [{}] on connection [{}]",
164+
taskId,
165+
connection),
166+
exp);
167+
} else {
168+
logger.warn(
169+
"cannot send ban for tasks with the parent [{}] on connection [{}]: {}",
170+
taskId,
171+
connection,
172+
exp.getMessage());
173+
}
174+
150175
groupedListener.onFailure(exp);
151176
}
152177
});
@@ -163,13 +188,38 @@ private void removeBanOnChildConnections(CancellableTask task, Collection<Transp
163188
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
164189
@Override
165190
public void handleException(TransportException exp) {
166-
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
167-
logger.info("failed to remove the parent ban for task {} for connection {}", request.parentTaskId, connection);
191+
final Throwable cause = ExceptionsHelper.unwrapCause(exp);
192+
assert cause instanceof ElasticsearchSecurityException == false;
193+
if (isUnimportantBanFailure(cause)) {
194+
logger.debug(
195+
new ParameterizedMessage(
196+
"failed to remove ban for tasks with the parent [{}] on connection [{}]",
197+
request.parentTaskId,
198+
connection),
199+
exp);
200+
} else if (logger.isDebugEnabled()) {
201+
logger.warn(
202+
new ParameterizedMessage(
203+
"failed to remove ban for tasks with the parent [{}] on connection [{}]",
204+
request.parentTaskId,
205+
connection),
206+
exp);
207+
} else {
208+
logger.warn(
209+
"failed to remove ban for tasks with the parent [{}] on connection [{}]: {}",
210+
request.parentTaskId,
211+
connection,
212+
exp.getMessage());
213+
}
168214
}
169215
});
170216
}
171217
}
172218

219+
private static boolean isUnimportantBanFailure(Throwable cause) {
220+
return cause instanceof NodeDisconnectedException || cause instanceof NodeNotConnectedException;
221+
}
222+
173223
private static class BanParentTaskRequest extends TransportRequest {
174224

175225
private final TaskId parentTaskId;

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,15 @@ public void sendRequest(long requestId, String action, TransportRequest request,
244244
TcpChannel channel = channel(options.type());
245245
outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), compress, false);
246246
}
247+
248+
@Override
249+
public String toString() {
250+
final StringBuilder stringBuilder = new StringBuilder();
251+
stringBuilder.append("NodeChannels[");
252+
node.appendDescriptionWithoutAttributes(stringBuilder);
253+
stringBuilder.append("]");
254+
return stringBuilder.toString();
255+
}
247256
}
248257

249258
// This allows transport implementations to potentially override specific connection profiles. This

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ public boolean isClosed() {
126126
@Override
127127
public void close() {
128128
}
129+
130+
@Override
131+
public String toString() {
132+
return "local node connection";
133+
}
129134
};
130135

131136
/**
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.tasks;
10+
11+
import org.apache.logging.log4j.Level;
12+
import org.apache.logging.log4j.LogManager;
13+
import org.elasticsearch.Version;
14+
import org.elasticsearch.action.admin.cluster.node.tasks.TaskManagerTestCase;
15+
import org.elasticsearch.action.support.PlainActionFuture;
16+
import org.elasticsearch.cluster.node.DiscoveryNode;
17+
import org.elasticsearch.common.io.stream.StreamInput;
18+
import org.elasticsearch.common.logging.Loggers;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.core.TimeValue;
21+
import org.elasticsearch.core.internal.io.IOUtils;
22+
import org.elasticsearch.test.MockLogAppender;
23+
import org.elasticsearch.test.junit.annotations.TestLogging;
24+
import org.elasticsearch.test.transport.MockTransportService;
25+
import org.elasticsearch.test.transport.StubbableTransport;
26+
import org.elasticsearch.threadpool.ThreadPool;
27+
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
28+
import org.elasticsearch.transport.NodeDisconnectedException;
29+
import org.elasticsearch.transport.TransportException;
30+
import org.elasticsearch.transport.TransportRequest;
31+
import org.elasticsearch.transport.TransportRequestOptions;
32+
import org.elasticsearch.transport.TransportResponse;
33+
import org.elasticsearch.transport.TransportResponseHandler;
34+
35+
import java.io.Closeable;
36+
import java.util.ArrayList;
37+
import java.util.Collections;
38+
import java.util.Map;
39+
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.function.Function;
41+
42+
import static org.hamcrest.Matchers.anyOf;
43+
import static org.hamcrest.Matchers.instanceOf;
44+
45+
public class BanFailureLoggingTests extends TaskManagerTestCase {
46+
47+
@TestLogging(reason = "testing logging at DEBUG", value = "org.elasticsearch.tasks.TaskCancellationService:DEBUG")
48+
public void testLogsAtDebugOnDisconnectionDuringBan() throws Exception {
49+
runTest(
50+
(connection, requestId, action, request, options) -> {
51+
if (action.equals(TaskCancellationService.BAN_PARENT_ACTION_NAME)) {
52+
connection.close();
53+
}
54+
connection.sendRequest(requestId, action, request, options);
55+
},
56+
childNode -> new MockLogAppender.SeenEventExpectation(
57+
"cannot send message",
58+
TaskCancellationService.class.getName(),
59+
Level.DEBUG,
60+
"*cannot send ban for tasks*" + childNode.getId() + "*"));
61+
}
62+
63+
@TestLogging(reason = "testing logging at DEBUG", value = "org.elasticsearch.tasks.TaskCancellationService:DEBUG")
64+
public void testLogsAtDebugOnDisconnectionDuringBanRemoval() throws Exception {
65+
final AtomicInteger banCount = new AtomicInteger();
66+
runTest(
67+
(connection, requestId, action, request, options) -> {
68+
if (action.equals(TaskCancellationService.BAN_PARENT_ACTION_NAME) && banCount.incrementAndGet() >= 2) {
69+
connection.close();
70+
}
71+
connection.sendRequest(requestId, action, request, options);
72+
},
73+
childNode -> new MockLogAppender.SeenEventExpectation(
74+
"cannot send message",
75+
TaskCancellationService.class.getName(),
76+
Level.DEBUG,
77+
"*failed to remove ban for tasks*" + childNode.getId() + "*"));
78+
}
79+
80+
private void runTest(
81+
StubbableTransport.SendRequestBehavior sendRequestBehavior,
82+
Function<DiscoveryNode, MockLogAppender.SeenEventExpectation> expectation) throws Exception {
83+
84+
final ArrayList<Closeable> resources = new ArrayList<>(3);
85+
86+
try {
87+
88+
final MockTransportService parentTransportService = MockTransportService.createNewService(
89+
Settings.EMPTY,
90+
Version.CURRENT,
91+
threadPool);
92+
resources.add(parentTransportService);
93+
parentTransportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(parentTransportService));
94+
parentTransportService.start();
95+
parentTransportService.acceptIncomingRequests();
96+
97+
final MockTransportService childTransportService = MockTransportService.createNewService(
98+
Settings.EMPTY,
99+
Version.CURRENT,
100+
threadPool);
101+
resources.add(childTransportService);
102+
childTransportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(childTransportService));
103+
childTransportService.registerRequestHandler(
104+
"internal:testAction[c]",
105+
ThreadPool.Names.MANAGEMENT, // busy-wait for cancellation but not on a transport thread
106+
(StreamInput in) -> new TransportRequest.Empty(in) {
107+
@Override
108+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
109+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
110+
}
111+
},
112+
(request, channel, task) -> {
113+
final CancellableTask cancellableTask = (CancellableTask) task;
114+
assertBusy(() -> assertTrue(cancellableTask.isCancelled()));
115+
channel.sendResponse(new TaskCancelledException("task cancelled"));
116+
});
117+
118+
childTransportService.start();
119+
childTransportService.acceptIncomingRequests();
120+
121+
parentTransportService.addSendBehavior(sendRequestBehavior);
122+
123+
AbstractSimpleTransportTestCase.connectToNode(parentTransportService, childTransportService.getLocalDiscoNode());
124+
125+
final CancellableTask parentTask = (CancellableTask) parentTransportService.getTaskManager().register(
126+
"transport",
127+
"internal:testAction",
128+
new ParentRequest());
129+
130+
parentTransportService.sendChildRequest(
131+
childTransportService.getLocalDiscoNode(),
132+
"internal:testAction[c]",
133+
TransportRequest.Empty.INSTANCE,
134+
parentTask,
135+
TransportRequestOptions.EMPTY,
136+
new ChildResponseHandler(() -> parentTransportService.getTaskManager().unregister(parentTask)));
137+
138+
MockLogAppender appender = new MockLogAppender();
139+
appender.start();
140+
resources.add(appender::stop);
141+
Loggers.addAppender(LogManager.getLogger(TaskCancellationService.class), appender);
142+
resources.add(() -> Loggers.removeAppender(LogManager.getLogger(TaskCancellationService.class), appender));
143+
144+
appender.addExpectation(expectation.apply(childTransportService.getLocalDiscoNode()));
145+
146+
final PlainActionFuture<Void> cancellationFuture = new PlainActionFuture<>();
147+
parentTransportService.getTaskManager().cancelTaskAndDescendants(parentTask, "test", true, cancellationFuture);
148+
try {
149+
cancellationFuture.actionGet(TimeValue.timeValueSeconds(5));
150+
} catch (NodeDisconnectedException e) {
151+
// acceptable; we mostly ignore the result of cancellation anyway
152+
}
153+
154+
// assert busy since failure to remove a ban may be logged after cancellation completed
155+
assertBusy(appender::assertAllExpectationsMatched);
156+
} finally {
157+
Collections.reverse(resources);
158+
IOUtils.close(resources);
159+
}
160+
}
161+
162+
private static class ParentRequest implements TaskAwareRequest {
163+
@Override
164+
public void setParentTask(TaskId taskId) {
165+
fail("setParentTask should not be called");
166+
}
167+
168+
@Override
169+
public TaskId getParentTask() {
170+
return TaskId.EMPTY_TASK_ID;
171+
}
172+
173+
@Override
174+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
175+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
176+
}
177+
}
178+
179+
private static class ChildResponseHandler implements TransportResponseHandler<TransportResponse.Empty> {
180+
private final Runnable onException;
181+
182+
ChildResponseHandler(Runnable onException) {
183+
this.onException = onException;
184+
}
185+
186+
@Override
187+
public void handleResponse(TransportResponse.Empty response) {
188+
fail("should not get successful response");
189+
}
190+
191+
@Override
192+
public void handleException(TransportException exp) {
193+
assertThat(exp.unwrapCause(), anyOf(
194+
instanceOf(TaskCancelledException.class),
195+
instanceOf(NodeDisconnectedException.class)));
196+
onException.run();
197+
}
198+
199+
@Override
200+
public TransportResponse.Empty read(StreamInput in) {
201+
return TransportResponse.Empty.INSTANCE;
202+
}
203+
}
204+
205+
}

test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ public void close() {
253253
public Transport.Connection getConnection() {
254254
return connection;
255255
}
256+
257+
@Override
258+
public String toString() {
259+
return "WrappedConnection[" + connection + "]";
260+
}
256261
}
257262

258263
@FunctionalInterface

0 commit comments

Comments
 (0)