Skip to content

Commit feab59d

Browse files
authored
Bubble exceptions up in ClusterApplierService (#37729)
Exceptions thrown by the cluster applier service's settings appliers and cluster state appliers are now bubbled up and block the state from being applied, instead of being silently ignored. In combination with the cluster state publishing lag detector, this causes a node that cannot properly apply cluster state updates to be removed from the cluster.
1 parent 9357929 commit feab59d

File tree

6 files changed

+234
-145
lines changed

6 files changed

+234
-145
lines changed

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 26 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -390,31 +390,24 @@ protected void runTask(UpdateTask task) {
390390
newClusterState = task.apply(previousClusterState);
391391
} catch (Exception e) {
392392
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
393-
if (logger.isTraceEnabled()) {
394-
logger.trace(() -> new ParameterizedMessage(
395-
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
396-
executionTime,
397-
previousClusterState.version(),
398-
task.source,
399-
previousClusterState.nodes(),
400-
previousClusterState.routingTable(),
401-
previousClusterState.getRoutingNodes()),
402-
e);
403-
}
393+
logger.trace(() -> new ParameterizedMessage(
394+
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
395+
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
404396
warnAboutSlowTaskIfNeeded(executionTime, task.source);
405397
task.listener.onFailure(task.source, e);
406398
return;
407399
}
408400

409401
if (previousClusterState == newClusterState) {
410-
task.listener.onSuccess(task.source);
411402
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
412403
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
413404
warnAboutSlowTaskIfNeeded(executionTime, task.source);
405+
task.listener.onSuccess(task.source);
414406
} else {
415407
if (logger.isTraceEnabled()) {
416-
logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
417-
} else if (logger.isDebugEnabled()) {
408+
logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source,
409+
newClusterState);
410+
} else {
418411
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
419412
}
420413
try {
@@ -424,20 +417,19 @@ protected void runTask(UpdateTask task) {
424417
executionTime, newClusterState.version(),
425418
newClusterState.stateUUID());
426419
warnAboutSlowTaskIfNeeded(executionTime, task.source);
420+
task.listener.onSuccess(task.source);
427421
} catch (Exception e) {
428422
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
429-
final long version = newClusterState.version();
430-
final String stateUUID = newClusterState.stateUUID();
431-
final String fullState = newClusterState.toString();
432-
logger.warn(() -> new ParameterizedMessage(
433-
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
434-
executionTime,
435-
version,
436-
stateUUID,
437-
task.source,
438-
fullState),
439-
e);
440-
// TODO: do we want to call updateTask.onFailure here?
423+
if (logger.isTraceEnabled()) {
424+
logger.warn(new ParameterizedMessage(
425+
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
426+
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source, newClusterState), e);
427+
} else {
428+
logger.warn(new ParameterizedMessage(
429+
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
430+
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
431+
}
432+
task.listener.onFailure(task.source, e);
441433
}
442434
}
443435
}
@@ -454,17 +446,14 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
454446
}
455447
}
456448

449+
logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
457450
nodeConnectionsService.connectToNodes(newClusterState.nodes());
458451

459-
logger.debug("applying cluster state version {}", newClusterState.version());
460-
try {
461-
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
462-
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
463-
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
464-
clusterSettings.applySettings(incomingSettings);
465-
}
466-
} catch (Exception ex) {
467-
logger.warn("failed to apply cluster settings", ex);
452+
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
453+
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
454+
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
455+
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
456+
clusterSettings.applySettings(incomingSettings);
468457
}
469458

470459
logger.debug("apply cluster state with version {}", newClusterState.version());
@@ -476,18 +465,12 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
476465
state.set(newClusterState);
477466

478467
callClusterStateListeners(clusterChangedEvent);
479-
480-
task.listener.onSuccess(task.source);
481468
}
482469

483470
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
484471
clusterStateAppliers.forEach(applier -> {
485-
try {
486-
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
487-
applier.applyClusterState(clusterChangedEvent);
488-
} catch (Exception ex) {
489-
logger.warn("failed to notify ClusterStateApplier", ex);
490-
}
472+
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
473+
applier.applyClusterState(clusterChangedEvent);
491474
});
492475
}
493476

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import org.apache.logging.log4j.LogManager;
23+
import org.apache.logging.log4j.Logger;
2224
import org.elasticsearch.ElasticsearchParseException;
2325
import org.elasticsearch.ExceptionsHelper;
2426
import org.elasticsearch.ResourceNotFoundException;
@@ -69,6 +71,8 @@ public class IngestService implements ClusterStateApplier {
6971

7072
public static final String NOOP_PIPELINE_NAME = "_none";
7173

74+
private static final Logger logger = LogManager.getLogger(IngestService.class);
75+
7276
private final ClusterService clusterService;
7377
private final ScriptService scriptService;
7478
private final Map<String, Processor.Factory> processorFactories;
@@ -256,7 +260,11 @@ Map<String, Pipeline> pipelines() {
256260
public void applyClusterState(final ClusterChangedEvent event) {
257261
ClusterState state = event.state();
258262
Map<String, Pipeline> originalPipelines = pipelines;
259-
innerUpdatePipelines(event.previousState(), state);
263+
try {
264+
innerUpdatePipelines(event.previousState(), state);
265+
} catch (ElasticsearchParseException e) {
266+
logger.warn("failed to update ingest pipelines", e);
267+
}
260268
//pipelines changed, so add the old metrics to the new metrics
261269
if (originalPipelines != pipelines) {
262270
pipelines.forEach((id, pipeline) -> {

0 commit comments

Comments
 (0)