Skip to content

Commit 0ee8fed

Browse files
authored
Log leader and handshake failures by default (#42342)
Today the `LeaderChecker` and `HandshakingTransportAddressConnector` do not log anything above `DEBUG` level. However, there are some situations where it is appropriate for them to log at a higher level:

- If the low-level handshake succeeds but the high-level one fails, this indicates a configuration error that the user should resolve, and the exception will help them to do so.
- If leader checks fail repeatedly, we restart discovery, and the exception will help to determine what went wrong.

Resolves #42153
1 parent a21ff86 commit 0ee8fed

File tree

4 files changed

+55
-30
lines changed

4 files changed

+55
-30
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
164164
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
165165
this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
166166
this::handlePublishRequest, this::handleApplyCommit);
167-
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
167+
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure);
168168
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
169169
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
170170
this.clusterApplier = clusterApplier;
@@ -183,20 +183,14 @@ private ClusterFormationState getClusterFormationState() {
183183
StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()), getCurrentTerm());
184184
}
185185

186-
private Runnable getOnLeaderFailure() {
187-
return new Runnable() {
188-
@Override
189-
public void run() {
190-
synchronized (mutex) {
191-
becomeCandidate("onLeaderFailure");
192-
}
193-
}
194-
195-
@Override
196-
public String toString() {
197-
return "notification of leader failure";
186+
private void onLeaderFailure(Exception e) {
187+
synchronized (mutex) {
188+
if (mode != Mode.CANDIDATE) {
189+
assert lastKnownLeader.isPresent();
190+
logger.info(new ParameterizedMessage("master node [{}] failed, restarting discovery", lastKnownLeader.get()), e);
198191
}
199-
};
192+
becomeCandidate("onLeaderFailure");
193+
}
200194
}
201195

202196
private void removeNode(DiscoveryNode discoveryNode, String reason) {

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
25+
import org.elasticsearch.ElasticsearchException;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
2627
import org.elasticsearch.cluster.node.DiscoveryNodes;
2728
import org.elasticsearch.common.Nullable;
@@ -33,6 +34,7 @@
3334
import org.elasticsearch.common.unit.TimeValue;
3435
import org.elasticsearch.threadpool.ThreadPool.Names;
3536
import org.elasticsearch.transport.ConnectTransportException;
37+
import org.elasticsearch.transport.NodeDisconnectedException;
3638
import org.elasticsearch.transport.TransportConnectionListener;
3739
import org.elasticsearch.transport.TransportException;
3840
import org.elasticsearch.transport.TransportRequest;
@@ -48,6 +50,7 @@
4850
import java.util.concurrent.atomic.AtomicBoolean;
4951
import java.util.concurrent.atomic.AtomicLong;
5052
import java.util.concurrent.atomic.AtomicReference;
53+
import java.util.function.Consumer;
5154

5255
/**
5356
* The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
@@ -75,20 +78,17 @@ public class LeaderChecker {
7578
public static final Setting<Integer> LEADER_CHECK_RETRY_COUNT_SETTING =
7679
Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope);
7780

78-
private final Settings settings;
79-
8081
private final TimeValue leaderCheckInterval;
8182
private final TimeValue leaderCheckTimeout;
8283
private final int leaderCheckRetryCount;
8384
private final TransportService transportService;
84-
private final Runnable onLeaderFailure;
85+
private final Consumer<Exception> onLeaderFailure;
8586

8687
private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();
8788

8889
private volatile DiscoveryNodes discoveryNodes;
8990

90-
public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
91-
this.settings = settings;
91+
public LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure) {
9292
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
9393
leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
9494
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
@@ -234,16 +234,19 @@ public void handleException(TransportException exp) {
234234
}
235235

236236
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
237-
logger.debug(new ParameterizedMessage("leader [{}] disconnected, failing immediately", leader), exp);
238-
leaderFailed();
237+
logger.debug(new ParameterizedMessage(
238+
"leader [{}] disconnected during check", leader), exp);
239+
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
239240
return;
240241
}
241242

242243
long failureCount = failureCountSinceLastSuccess.incrementAndGet();
243244
if (failureCount >= leaderCheckRetryCount) {
244-
logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader [{}] has failed",
245-
failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
246-
leaderFailed();
245+
logger.debug(new ParameterizedMessage(
246+
"leader [{}] has failed {} consecutive checks (limit [{}] is {}); last failure was:",
247+
leader, failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp);
248+
leaderFailed(new ElasticsearchException(
249+
"node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp));
247250
return;
248251
}
249252

@@ -259,17 +262,28 @@ public String executor() {
259262
});
260263
}
261264

262-
void leaderFailed() {
265+
void leaderFailed(Exception e) {
263266
if (isClosed.compareAndSet(false, true)) {
264-
transportService.getThreadPool().generic().execute(onLeaderFailure);
267+
transportService.getThreadPool().generic().execute(new Runnable() {
268+
@Override
269+
public void run() {
270+
onLeaderFailure.accept(e);
271+
}
272+
273+
@Override
274+
public String toString() {
275+
return "notification of leader failure: " + e.getMessage();
276+
}
277+
});
265278
} else {
266279
logger.trace("already closed, not failing leader");
267280
}
268281
}
269282

270283
void handleDisconnectedNode(DiscoveryNode discoveryNode) {
271284
if (discoveryNode.equals(leader)) {
272-
leaderFailed();
285+
logger.debug("leader [{}] disconnected", leader);
286+
leaderFailed(new NodeDisconnectedException(discoveryNode, "disconnected"));
273287
}
274288
}
275289

server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
24+
import org.apache.logging.log4j.message.ParameterizedMessage;
2425
import org.elasticsearch.Version;
2526
import org.elasticsearch.action.ActionListener;
2627
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -89,6 +90,13 @@ protected void doRun() throws Exception {
8990
remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis());
9091
// success means (amongst other things) that the cluster names match
9192
logger.trace("[{}] handshake successful: {}", this, remoteNode);
93+
} catch (Exception e) {
94+
// we opened a connection and successfully performed a low-level handshake, so we were definitely talking to an
95+
// Elasticsearch node, but the high-level handshake failed indicating some kind of mismatched configurations
96+
// (e.g. cluster name) that the user should address
97+
logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e);
98+
listener.onFailure(e);
99+
return;
92100
} finally {
93101
IOUtils.closeWhileHandlingException(connection);
94102
}

server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@
5252
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
5353
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
5454
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
55+
import static org.hamcrest.Matchers.anyOf;
56+
import static org.hamcrest.Matchers.endsWith;
5557
import static org.hamcrest.Matchers.equalTo;
5658
import static org.hamcrest.Matchers.instanceOf;
5759
import static org.hamcrest.Matchers.lessThanOrEqualTo;
60+
import static org.hamcrest.Matchers.matchesRegex;
5861
import static org.hamcrest.Matchers.nullValue;
5962

6063
public class LeaderCheckerTests extends ESTestCase {
@@ -146,7 +149,10 @@ public String toString() {
146149
final AtomicBoolean leaderFailed = new AtomicBoolean();
147150

148151
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
149-
() -> assertTrue(leaderFailed.compareAndSet(false, true)));
152+
e -> {
153+
assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks"));
154+
assertTrue(leaderFailed.compareAndSet(false, true));
155+
});
150156

151157
logger.info("--> creating first checker");
152158
leaderChecker.updateLeader(leader1);
@@ -247,7 +253,10 @@ public String toString() {
247253

248254
final AtomicBoolean leaderFailed = new AtomicBoolean();
249255
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
250-
() -> assertTrue(leaderFailed.compareAndSet(false, true)));
256+
e -> {
257+
assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check")));
258+
assertTrue(leaderFailed.compareAndSet(false, true));
259+
});
251260

252261
leaderChecker.updateLeader(leader);
253262
{
@@ -316,7 +325,7 @@ public void testLeaderBehaviour() {
316325
transportService.start();
317326
transportService.acceptIncomingRequests();
318327

319-
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> fail("shouldn't be checking anything"));
328+
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> fail("shouldn't be checking anything"));
320329

321330
final DiscoveryNodes discoveryNodes
322331
= DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();

0 commit comments

Comments
 (0)