Commit a22120d
Handle serialization exceptions during publication (#41781)
Today, if an exception is thrown while serializing a cluster state during publication, the master enters a poisoned state where it cannot publish any more cluster states, yet it does not stand down as master either, yielding repeated exceptions of the following form:

```
failed to commit cluster state version [12345]
org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException: publishing failed
    at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1045) ~[elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.MasterService.publish(MasterService.java:252) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:238) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:681) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) [elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) [elasticsearch-7.0.0.jar:7.0.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: org.elasticsearch.cluster.coordination.CoordinationStateRejectedException: cannot start publishing next value before accepting previous one
    at org.elasticsearch.cluster.coordination.CoordinationState.handleClientValue(CoordinationState.java:280) ~[elasticsearch-7.0.0.jar:7.0.0]
    at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1030) ~[elasticsearch-7.0.0.jar:7.0.0]
    ... 11 more
```

This happens because the master has already created the publication request with `CoordinationState#handleClientValue()` but then fails before accepting it. This commit addresses the problem by performing the serialization before calling `handleClientValue()`.

Relates #41090, which was the source of such a serialization exception.
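The `Caused by` line points at a guard in `CoordinationState#handleClientValue` that refuses to start a new publication while the previous one has not yet been accepted. A toy model of that guard (a sketch with illustrative names, not the real `CoordinationState` fields or signatures) shows why invoking it before a step that can still throw poisons the master:

```java
// A minimal sketch of the guard behaviour described in the stack trace above.
// Names and types are illustrative only, not the real CoordinationState internals.
class ToyCoordinationState {
    private boolean publicationInFlight = false;

    // Records the pending publication; from this point on, no new publication
    // may start until the previous one has been accepted.
    Object handleClientValue(Object clusterState) {
        if (publicationInFlight) {
            // The rejection that repeats in the log above.
            throw new IllegalStateException(
                "cannot start publishing next value before accepting previous one");
        }
        publicationInFlight = true;
        return clusterState; // stands in for the PublishRequest
    }

    // Only reached if the publication is actually sent and accepted.
    void publicationAccepted() {
        publicationInFlight = false;
    }
}
```

If cluster-state serialization throws after `handleClientValue()` but before the publication is sent, nothing in this model ever clears the in-flight flag, so every later publication attempt is rejected: the master keeps failing to publish yet never stands down. Serializing first means a failure leaves the coordination state untouched, which is what the change below does.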
Parent: 491c0cd

2 files changed: +68 / -2 lines

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 2 additions & 2 deletions

```diff
@@ -502,7 +502,6 @@ public void onFailure(Exception e) {
             });
         }
 
-
     private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
         final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
         synchronized (mutex) {
@@ -1027,9 +1026,10 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void
         assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())) :
             getLocalNode() + " should be in published " + clusterState;
 
-        final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
         final PublicationTransportHandler.PublicationContext publicationContext =
             publicationHandler.newPublicationContext(clusterChangedEvent);
+
+        final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
         final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
             new ListenableFuture<>(), ackListener, publishListener);
         currentPublication = Optional.of(publication);
```
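With the publication context (and therefore the cluster-state serialization) now built before `handleClientValue()` is called, a serialization failure propagates out of `publish` before any pending publication has been recorded. The failed update task is reported to its listener, the coordination state is left untouched, and subsequent cluster states can still be published; the new test below pins down exactly this recovery.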

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 66 additions & 0 deletions

```diff
@@ -26,6 +26,7 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
@@ -55,6 +56,7 @@
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
@@ -64,6 +66,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
 import org.elasticsearch.discovery.zen.PublishClusterStateStats;
@@ -95,6 +98,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -137,6 +141,7 @@
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
@@ -1151,6 +1156,67 @@ public void testSingleNodeDiscoveryWithQuorum() {
         cluster.stabilise();
     }
 
+    private static class BrokenCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {
+
+        static final String EXCEPTION_MESSAGE = "simulated";
+
+        @Override
+        public String getWriteableName() {
+            return "broken";
+        }
+
+        @Override
+        public Version getMinimalSupportedVersion() {
+            return Version.V_EMPTY;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            throw new ElasticsearchException(EXCEPTION_MESSAGE);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            return builder;
+        }
+    }
+
+    public void testClusterRecoversAfterExceptionDuringSerialization() {
+        final Cluster cluster = new Cluster(randomIntBetween(2, 5)); // 1-node cluster doesn't do any serialization
+        cluster.runRandomly();
+        cluster.stabilise();
+
+        final ClusterNode leader1 = cluster.getAnyLeader();
+
+        logger.info("--> submitting broken task to [{}]", leader1);
+
+        final AtomicBoolean failed = new AtomicBoolean();
+        leader1.submitUpdateTask("broken-task",
+            cs -> ClusterState.builder(cs).putCustom("broken", new BrokenCustom()).build(),
+            (source, e) -> {
+                assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
+                assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE));
+                failed.set(true);
+            });
+        cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task");
+        assertTrue(failed.get());
+
+        cluster.stabilise();
+
+        final ClusterNode leader2 = cluster.getAnyLeader();
+        long finalValue = randomLong();
+
+        logger.info("--> submitting value [{}] to [{}]", finalValue, leader2);
+        leader2.submitValue(finalValue);
+        cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
+
+        for (final ClusterNode clusterNode : cluster.clusterNodes) {
+            final String nodeId = clusterNode.getId();
+            final ClusterState appliedState = clusterNode.getLastAppliedClusterState();
+            assertThat(nodeId + " has the applied value", value(appliedState), is(finalValue));
+        }
+    }
+
     private static long defaultMillis(Setting<TimeValue> setting) {
         return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
     }
```
