Skip to content

Commit a99fc23

Browse files
authored
Handle serialization exceptions during publication (#41781)
Today if an exception is thrown when serializing a cluster state during publication then the master enters a poisoned state where it cannot publish any more cluster states, but nor does it stand down as master, yielding repeated exceptions of the following form: ``` failed to commit cluster state version [12345] org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException: publishing failed at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1045) ~[elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.cluster.service.MasterService.publish(MasterService.java:252) [elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:238) [elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) [elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) [elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) [elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:681) [elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) [elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) [elasticsearch-7.0.0.jar:7.0.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144] Caused by: org.elasticsearch.cluster.coordination.CoordinationStateRejectedException: cannot start publishing next value before accepting previous one at org.elasticsearch.cluster.coordination.CoordinationState.handleClientValue(CoordinationState.java:280) ~[elasticsearch-7.0.0.jar:7.0.0] at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1030) ~[elasticsearch-7.0.0.jar:7.0.0] ... 11 more ``` This is because it already created the publication request using `CoordinationState#handleClientValue()` but then it fails before accepting it. This commit addresses this by performing the serialization before calling `handleClientValue()`. Relates #41090, which was the source of such a serialization exception.
1 parent 502b395 commit a99fc23

File tree

2 files changed

+68
-2
lines changed

2 files changed

+68
-2
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,6 @@ public void onFailure(Exception e) {
478478
});
479479
}
480480

481-
482481
private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
483482
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
484483
synchronized (mutex) {
@@ -997,9 +996,10 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void
997996
assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())) :
998997
getLocalNode() + " should be in published " + clusterState;
999998

1000-
final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
1001999
final PublicationTransportHandler.PublicationContext publicationContext =
10021000
publicationHandler.newPublicationContext(clusterChangedEvent);
1001+
1002+
final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
10031003
final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
10041004
new ListenableFuture<>(), ackListener, publishListener);
10051005
currentPublication = Optional.of(publication);

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.logging.log4j.message.ParameterizedMessage;
2727
import org.elasticsearch.ElasticsearchException;
2828
import org.elasticsearch.Version;
29+
import org.elasticsearch.cluster.AbstractDiffable;
2930
import org.elasticsearch.cluster.ClusterModule;
3031
import org.elasticsearch.cluster.ClusterState;
3132
import org.elasticsearch.cluster.ClusterStateTaskListener;
@@ -54,6 +55,7 @@
5455
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
5556
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
5657
import org.elasticsearch.common.io.stream.StreamInput;
58+
import org.elasticsearch.common.io.stream.StreamOutput;
5759
import org.elasticsearch.common.logging.Loggers;
5860
import org.elasticsearch.common.settings.ClusterSettings;
5961
import org.elasticsearch.common.settings.Setting;
@@ -63,6 +65,7 @@
6365
import org.elasticsearch.common.unit.TimeValue;
6466
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
6567
import org.elasticsearch.common.util.set.Sets;
68+
import org.elasticsearch.common.xcontent.XContentBuilder;
6669
import org.elasticsearch.discovery.DiscoveryModule;
6770
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
6871
import org.elasticsearch.env.NodeEnvironment;
@@ -93,6 +96,7 @@
9396
import java.util.Optional;
9497
import java.util.Set;
9598
import java.util.concurrent.Callable;
99+
import java.util.concurrent.atomic.AtomicBoolean;
96100
import java.util.concurrent.atomic.AtomicInteger;
97101
import java.util.function.BiConsumer;
98102
import java.util.function.Consumer;
@@ -135,6 +139,7 @@
135139
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
136140
import static org.hamcrest.Matchers.hasItem;
137141
import static org.hamcrest.Matchers.hasSize;
142+
import static org.hamcrest.Matchers.instanceOf;
138143
import static org.hamcrest.Matchers.is;
139144
import static org.hamcrest.Matchers.lessThanOrEqualTo;
140145
import static org.hamcrest.Matchers.not;
@@ -1149,6 +1154,67 @@ public void testSingleNodeDiscoveryWithQuorum() {
11491154
cluster.stabilise();
11501155
}
11511156

1157+
private static class BrokenCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {
1158+
1159+
static final String EXCEPTION_MESSAGE = "simulated";
1160+
1161+
@Override
1162+
public String getWriteableName() {
1163+
return "broken";
1164+
}
1165+
1166+
@Override
1167+
public Version getMinimalSupportedVersion() {
1168+
return Version.V_EMPTY;
1169+
}
1170+
1171+
@Override
1172+
public void writeTo(StreamOutput out) throws IOException {
1173+
throw new ElasticsearchException(EXCEPTION_MESSAGE);
1174+
}
1175+
1176+
@Override
1177+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
1178+
return builder;
1179+
}
1180+
}
1181+
1182+
public void testClusterRecoversAfterExceptionDuringSerialization() {
1183+
final Cluster cluster = new Cluster(randomIntBetween(2, 5)); // 1-node cluster doesn't do any serialization
1184+
cluster.runRandomly();
1185+
cluster.stabilise();
1186+
1187+
final ClusterNode leader1 = cluster.getAnyLeader();
1188+
1189+
logger.info("--> submitting broken task to [{}]", leader1);
1190+
1191+
final AtomicBoolean failed = new AtomicBoolean();
1192+
leader1.submitUpdateTask("broken-task",
1193+
cs -> ClusterState.builder(cs).putCustom("broken", new BrokenCustom()).build(),
1194+
(source, e) -> {
1195+
assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
1196+
assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE));
1197+
failed.set(true);
1198+
});
1199+
cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task");
1200+
assertTrue(failed.get());
1201+
1202+
cluster.stabilise();
1203+
1204+
final ClusterNode leader2 = cluster.getAnyLeader();
1205+
long finalValue = randomLong();
1206+
1207+
logger.info("--> submitting value [{}] to [{}]", finalValue, leader2);
1208+
leader2.submitValue(finalValue);
1209+
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
1210+
1211+
for (final ClusterNode clusterNode : cluster.clusterNodes) {
1212+
final String nodeId = clusterNode.getId();
1213+
final ClusterState appliedState = clusterNode.getLastAppliedClusterState();
1214+
assertThat(nodeId + " has the applied value", value(appliedState), is(finalValue));
1215+
}
1216+
}
1217+
11521218
private static long defaultMillis(Setting<TimeValue> setting) {
11531219
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
11541220
}

0 commit comments

Comments (0)