Commit 935f70c
Handle serialization exceptions during publication (#41781)
Today if an exception is thrown when serializing a cluster state during publication then the master enters a poisoned state where it cannot publish any more cluster states, but nor does it stand down as master, yielding repeated exceptions of the following form:

```
failed to commit cluster state version [12345]
org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException: publishing failed
    at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1045) ~[elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.MasterService.publish(MasterService.java:252) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:238) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:681) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) [elasticsearch-7.0.0.jar:7.0.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: org.elasticsearch.cluster.coordination.CoordinationStateRejectedException: cannot start publishing next value before accepting previous one
    at org.elasticsearch.cluster.coordination.CoordinationState.handleClientValue(CoordinationState.java:280) ~[elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1030) ~[elasticsearch-7.0.0.jar:7.0.0]
    ... 11 more
```

This happens because the master has already created the publication request using `CoordinationState#handleClientValue()` but then fails before accepting it. This commit addresses this by performing the serialization before calling `handleClientValue()`.

Relates #41090, which was the source of such a serialization exception.
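As a rough illustration of why this ordering matters, here is a minimal, self-contained Java sketch. The `ToyCoordinationState` and `serialize` helpers below are hypothetical stand-ins invented for the example, not Elasticsearch's real `CoordinationState` or `PublicationTransportHandler` APIs; they only model the invariant that once `handleClientValue()` has produced a publish request, no new publication can start until that request is accepted.

```java
// Simplified sketch (toy types, not Elasticsearch's): shows why serializing before
// mutating the coordination state lets the master recover from a serialization failure.
public class PublicationOrderSketch {

    /** Hypothetical stand-in for CoordinationState: once a value is handed in, it must be accepted. */
    static class ToyCoordinationState {
        private boolean awaitingAccept = false;

        String handleClientValue(String clusterState) {
            if (awaitingAccept) {
                // mirrors the CoordinationStateRejectedException in the stack trace above
                throw new IllegalStateException("cannot start publishing next value before accepting previous one");
            }
            awaitingAccept = true; // the state is mutated here...
            return "publish-request[" + clusterState + "]";
        }

        void handlePublishRequest(String publishRequest) {
            awaitingAccept = false; // ...and only accepting the request clears it again
        }
    }

    /** Hypothetical stand-in for building the publication context, the step that serializes the state. */
    static String serialize(String clusterState) {
        if (clusterState.contains("broken")) {
            throw new RuntimeException("simulated serialization failure");
        }
        return "serialized[" + clusterState + "]";
    }

    public static void main(String[] args) {
        ToyCoordinationState state = new ToyCoordinationState();

        // Fixed ordering, as in this commit: serialize first, mutate the coordination state second.
        try {
            serialize("broken-state"); // throws before the state is touched
            state.handlePublishRequest(state.handleClientValue("broken-state"));
        } catch (RuntimeException e) {
            System.out.println("publication failed cleanly: " + e.getMessage());
        }

        // Because handleClientValue() was never reached, the next publication still succeeds.
        serialize("good-state");
        String request = state.handleClientValue("good-state");
        state.handlePublishRequest(request);
        System.out.println("next publication succeeded: " + request);
    }
}
```

With the serialization step first, the simulated failure never reaches the state-mutating call; that is the property the new `testClusterRecoversAfterExceptionDuringSerialization` test below exercises against the real implementation.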
1 parent 4cca1e8 commit 935f70c

2 files changed: +68 −2 lines

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 2 additions & 2 deletions
@@ -502,7 +502,6 @@ public void onFailure(Exception e) {
         });
     }
 
-
     private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
         final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
         synchronized (mutex) {
@@ -1027,9 +1026,10 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void
         assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())) :
             getLocalNode() + " should be in published " + clusterState;
 
-        final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
         final PublicationTransportHandler.PublicationContext publicationContext =
             publicationHandler.newPublicationContext(clusterChangedEvent);
+
+        final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
         final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
             new ListenableFuture<>(), ackListener, publishListener);
         currentPublication = Optional.of(publication);

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 66 additions & 0 deletions
@@ -26,6 +26,7 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
@@ -54,6 +55,7 @@
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
@@ -63,6 +65,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
 import org.elasticsearch.discovery.zen.PublishClusterStateStats;
@@ -94,6 +97,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -136,6 +140,7 @@
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
@@ -1150,6 +1155,67 @@ public void testSingleNodeDiscoveryWithQuorum() {
         cluster.stabilise();
     }
 
+    private static class BrokenCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {
+
+        static final String EXCEPTION_MESSAGE = "simulated";
+
+        @Override
+        public String getWriteableName() {
+            return "broken";
+        }
+
+        @Override
+        public Version getMinimalSupportedVersion() {
+            return Version.V_EMPTY;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            throw new ElasticsearchException(EXCEPTION_MESSAGE);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            return builder;
+        }
+    }
+
+    public void testClusterRecoversAfterExceptionDuringSerialization() {
+        final Cluster cluster = new Cluster(randomIntBetween(2, 5)); // 1-node cluster doesn't do any serialization
+        cluster.runRandomly();
+        cluster.stabilise();
+
+        final ClusterNode leader1 = cluster.getAnyLeader();
+
+        logger.info("--> submitting broken task to [{}]", leader1);
+
+        final AtomicBoolean failed = new AtomicBoolean();
+        leader1.submitUpdateTask("broken-task",
+            cs -> ClusterState.builder(cs).putCustom("broken", new BrokenCustom()).build(),
+            (source, e) -> {
+                assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
+                assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE));
+                failed.set(true);
+            });
+        cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task");
+        assertTrue(failed.get());
+
+        cluster.stabilise();
+
+        final ClusterNode leader2 = cluster.getAnyLeader();
+        long finalValue = randomLong();
+
+        logger.info("--> submitting value [{}] to [{}]", finalValue, leader2);
+        leader2.submitValue(finalValue);
+        cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
+
+        for (final ClusterNode clusterNode : cluster.clusterNodes) {
+            final String nodeId = clusterNode.getId();
+            final ClusterState appliedState = clusterNode.getLastAppliedClusterState();
+            assertThat(nodeId + " has the applied value", value(appliedState), is(finalValue));
+        }
+    }
+
     private static long defaultMillis(Setting<TimeValue> setting) {
         return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
     }
