diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java index 67d2103ce672d..aaae94d0297e5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -54,14 +54,16 @@ public class ClusterFormationFailureHelper { private final Supplier clusterFormationStateSupplier; private final ThreadPool threadPool; private final TimeValue clusterFormationWarningTimeout; + private final Runnable logLastFailedJoinAttempt; @Nullable // if no warning is scheduled private volatile WarningScheduler warningScheduler; public ClusterFormationFailureHelper(Settings settings, Supplier clusterFormationStateSupplier, - ThreadPool threadPool) { + ThreadPool threadPool, Runnable logLastFailedJoinAttempt) { this.clusterFormationStateSupplier = clusterFormationStateSupplier; this.threadPool = threadPool; this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings); + this.logLastFailedJoinAttempt = logLastFailedJoinAttempt; } public boolean isRunning() { @@ -94,6 +96,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { if (isActive()) { + logLastFailedJoinAttempt.run(); logger.warn(clusterFormationStateSupplier.get().getDescription()); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index f52726403a39c..71cd2fbb121e6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -169,7 +169,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState, - transportService.getThreadPool()); + transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt); } private ClusterFormationState getClusterFormationState() { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index fab22a6ccb16d..0d4dbb2e688fc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -25,6 +26,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -58,6 +60,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -83,6 +86,8 @@ public class JoinHelper { private final Set> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>()); + private AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); + JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, BiConsumer joinHandler, Function joinLeaderInTerm, @@ -172,7 +177,55 @@ boolean isJoinPending() { return pendingOutgoingJoins.isEmpty() == false; } - void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { + // package-private for testing + static class FailedJoinAttempt { + private final DiscoveryNode destination; + private final JoinRequest joinRequest; + private final TransportException exception; + private final long timestamp; + + FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) { + this.destination = destination; + this.joinRequest = joinRequest; + this.exception = exception; + this.timestamp = System.nanoTime(); + } + + void logNow() { + logger.log(getLogLevel(exception), + () -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), + exception); + } + + static Level getLogLevel(TransportException e) { + Throwable cause = e.unwrapCause(); + if (cause instanceof CoordinationStateRejectedException || + cause instanceof FailedToCommitClusterStateException || + cause instanceof NotMasterException) { + return Level.DEBUG; + } + return Level.INFO; + } + + void logWarnWithTimestamp() { + logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ms ago, failed to join {} with {}", + TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)), + destination, + joinRequest), + exception); + } + } + + + void logLastFailedJoinAttempt() { + FailedJoinAttempt attempt = lastFailedJoinAttempt.get(); + if (attempt != null) { + attempt.logWarnWithTimestamp(); + lastFailedJoinAttempt.compareAndSet(attempt, null); + } + } + + public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { assert destination.isMasterNode() : "trying to join master-ineligible " + destination; final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin); final Tuple dedupKey = Tuple.tuple(destination, joinRequest); @@ -190,12 +243,15 @@ public Empty read(StreamInput in) { public void handleResponse(Empty response) { pendingOutgoingJoins.remove(dedupKey); logger.debug("successfully joined {} with {}", destination, joinRequest); + lastFailedJoinAttempt.set(null); } @Override public void handleException(TransportException exp) { pendingOutgoingJoins.remove(dedupKey); - logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); + FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); + attempt.logNow(); + lastFailedJoinAttempt.set(attempt); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java index 6e90aed5f74bf..8b08c9c3fc01e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -65,13 +65,14 @@ public void testScheduling() { = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); final AtomicLong warningCount = new AtomicLong(); + final AtomicLong logLastFailedJoinAttemptWarningCount = new AtomicLong(); final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(), () -> { warningCount.incrementAndGet(); return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L); }, - deterministicTaskQueue.getThreadPool()); + deterministicTaskQueue.getThreadPool(), () -> logLastFailedJoinAttemptWarningCount.incrementAndGet()); deterministicTaskQueue.runAllTasks(); assertThat("should not schedule anything yet", warningCount.get(), is(0L)); @@ -105,8 +106,10 @@ public void testScheduling() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertThat(warningCount.get(), is(5L)); + assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L)); warningCount.set(0); + logLastFailedJoinAttemptWarningCount.set(0); clusterFormationFailureHelper.start(); clusterFormationFailureHelper.stop(); clusterFormationFailureHelper.start(); @@ -127,6 +130,7 @@ public void testScheduling() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertThat(warningCount.get(), is(5L)); + assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L)); } public void testDescriptionOnMasterIneligibleNodes() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 97777d16b4df3..877d2a5a487d7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -18,12 +18,16 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.elasticsearch.Version; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -32,6 +36,7 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.Is.is; public class JoinHelperTests extends ESTestCase { @@ -107,4 +112,23 @@ public void testJoinDeduplication() { capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy")); assertFalse(joinHelper.isJoinPending()); } + + public void testFailedJoinAttemptLogLevel() { + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(new TransportException("generic transport exception")), is(Level.INFO)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("remote transport exception with generic cause", new Exception())), is(Level.INFO)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by CoordinationStateRejectedException", + new CoordinationStateRejectedException("test"))), is(Level.DEBUG)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by FailedToCommitClusterStateException", + new FailedToCommitClusterStateException("test"))), is(Level.DEBUG)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by NotMasterException", + new NotMasterException("test"))), is(Level.DEBUG)); + } }