From 857bf7026d3916537b4712be379126da272267b2 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 21 Jan 2019 19:40:27 +0100 Subject: [PATCH] Use cancel instead of timeout for aborting publications --- .../cluster/coordination/Coordinator.java | 10 +++--- .../cluster/coordination/Publication.java | 32 +++++++++---------- .../coordination/PublicationTests.java | 4 +-- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 72fe2e081de74..210d11f8300e8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -455,7 +455,7 @@ void becomeCandidate(String method) { if (mode != Mode.CANDIDATE) { mode = Mode.CANDIDATE; - cancelActivePublication(); + cancelActivePublication("become candidate: " + method); joinAccumulator.close(mode); joinAccumulator = joinHelper.new CandidateJoinAccumulator(); @@ -527,7 +527,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) { discoveryUpgradeService.deactivate(); clusterFormationFailureHelper.stop(); closePrevotingAndElectionScheduler(); - cancelActivePublication(); + cancelActivePublication("become follower: " + method); preVoteCollector.update(getPreVoteResponse(), leaderNode); if (restartLeaderChecker) { @@ -935,7 +935,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) @Override public void run() { synchronized (mutex) { - publication.onTimeout(); + publication.cancel("timed out after " + publishTimeout); } } @@ -991,10 +991,10 @@ public void onFailure(Exception e) { }; } - private void cancelActivePublication() { + private void cancelActivePublication(String reason) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; if (currentPublication.isPresent()) { - currentPublication.get().onTimeout(); + currentPublication.get().cancel(reason); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 9ec8d562b81ba..6838c2f996ffc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -49,7 +49,7 @@ public abstract class Publication { private Optional applyCommitRequest; // set when state is committed private boolean isCompleted; // set when publication is completed - private boolean timedOut; // set when publication timed out + private boolean cancelled; // set when publication is cancelled public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) { this.publishRequest = publishRequest; @@ -71,17 +71,17 @@ public void start(Set faultyNodes) { publicationTargets.forEach(PublicationTarget::sendPublishRequest); } - public void onTimeout() { + public void cancel(String reason) { if (isCompleted) { return; } - assert timedOut == false; - timedOut = true; + assert cancelled == false; + cancelled = true; if (applyCommitRequest.isPresent() == false) { - logger.debug("onTimeout: [{}] timed out before committing", this); + logger.debug("cancel: [{}] cancelled before committing (reason: {})", this, reason); // fail all current publications - final Exception e = new ElasticsearchException("publication timed out before committing"); + final Exception e = new ElasticsearchException("publication cancelled before committing: " + reason); publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); } onPossibleCompletion(); @@ -101,7 +101,7 @@ private void onPossibleCompletion() { return; } - if (timedOut == false) { + if (cancelled == false) { for (final PublicationTarget target : publicationTargets) { if (target.isActive()) { return; @@ -125,8 +125,8 @@ private void onPossibleCompletion() { } // For assertions only: verify that this invariant holds - private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() { - if (timedOut == false) { + private boolean publicationCompletedIffAllTargetsInactiveOrCancelled() { + if (cancelled == false) { for (final PublicationTarget target : publicationTargets) { if (target.isActive()) { return isCompleted == false; @@ -222,7 +222,7 @@ void sendPublishRequest() { state = PublicationTargetState.SENT_PUBLISH_REQUEST; Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler()); // TODO Can this ^ fail with an exception? Target should be failed if so. - assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + assert publicationCompletedIffAllTargetsInactiveOrCancelled(); } void handlePublishResponse(PublishResponse publishResponse) { @@ -245,7 +245,7 @@ void sendApplyCommit() { state = PublicationTargetState.SENT_APPLY_COMMIT; assert applyCommitRequest.isPresent(); Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler()); - assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); + assert publicationCompletedIffAllTargetsInactiveOrCancelled(); } void setAppliedCommit() { @@ -300,7 +300,7 @@ private class PublishResponseHandler implements ActionListener { if (e.getKey().equals(n2)) { if (timeOut) { - publication.onTimeout(); + publication.cancel("timed out"); } else { e.getValue().onFailure(new TransportException(new Exception("dummy failure"))); } @@ -407,7 +407,7 @@ public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedEx } }); - publication.onTimeout(); + publication.cancel("timed out"); assertTrue(publication.completed); assertTrue(publication.committed); assertEquals(committingNodes, ackListener.await(0L, TimeUnit.SECONDS));