Skip to content

Commit facfbfb

Browse files
committed
Bubble exceptions up in ClusterApplierService
- exceptions thrown by settings and cluster appliers are bubbled up, and block the state from being applied instead of silently being ignored.
1 parent c284798 commit facfbfb

File tree

4 files changed

+201
-135
lines changed

4 files changed

+201
-135
lines changed

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -392,25 +392,19 @@ protected void runTask(UpdateTask task) {
392392
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
393393
if (logger.isTraceEnabled()) {
394394
logger.trace(() -> new ParameterizedMessage(
395-
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
396-
executionTime,
397-
previousClusterState.version(),
398-
task.source,
399-
previousClusterState.nodes(),
400-
previousClusterState.routingTable(),
401-
previousClusterState.getRoutingNodes()),
402-
e);
395+
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
396+
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
403397
}
404398
warnAboutSlowTaskIfNeeded(executionTime, task.source);
405399
task.listener.onFailure(task.source, e);
406400
return;
407401
}
408402

409403
if (previousClusterState == newClusterState) {
410-
task.listener.onSuccess(task.source);
411404
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
412405
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
413406
warnAboutSlowTaskIfNeeded(executionTime, task.source);
407+
task.listener.onSuccess(task.source);
414408
} else {
415409
if (logger.isTraceEnabled()) {
416410
logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
@@ -424,20 +418,19 @@ protected void runTask(UpdateTask task) {
424418
executionTime, newClusterState.version(),
425419
newClusterState.stateUUID());
426420
warnAboutSlowTaskIfNeeded(executionTime, task.source);
421+
task.listener.onSuccess(task.source);
427422
} catch (Exception e) {
428423
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
429-
final long version = newClusterState.version();
430-
final String stateUUID = newClusterState.stateUUID();
431-
final String fullState = newClusterState.toString();
432-
logger.warn(() -> new ParameterizedMessage(
433-
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
434-
executionTime,
435-
version,
436-
stateUUID,
437-
task.source,
438-
fullState),
439-
e);
440-
// TODO: do we want to call updateTask.onFailure here?
424+
if (logger.isTraceEnabled()) {
425+
logger.warn(() -> new ParameterizedMessage(
426+
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
427+
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source, newClusterState), e);
428+
} else {
429+
logger.warn(() -> new ParameterizedMessage(
430+
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
431+
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
432+
}
433+
task.listener.onFailure(task.source, e);
441434
}
442435
}
443436
}
@@ -454,17 +447,14 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
454447
}
455448
}
456449

450+
logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
457451
nodeConnectionsService.connectToNodes(newClusterState.nodes());
458452

459-
logger.debug("applying cluster state version {}", newClusterState.version());
460-
try {
461-
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
462-
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
463-
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
464-
clusterSettings.applySettings(incomingSettings);
465-
}
466-
} catch (Exception ex) {
467-
logger.warn("failed to apply cluster settings", ex);
453+
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
454+
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
455+
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
456+
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
457+
clusterSettings.applySettings(incomingSettings);
468458
}
469459

470460
logger.debug("apply cluster state with version {}", newClusterState.version());
@@ -476,18 +466,12 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
476466
state.set(newClusterState);
477467

478468
callClusterStateListeners(clusterChangedEvent);
479-
480-
task.listener.onSuccess(task.source);
481469
}
482470

483471
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
484472
clusterStateAppliers.forEach(applier -> {
485-
try {
486-
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
487-
applier.applyClusterState(clusterChangedEvent);
488-
} catch (Exception ex) {
489-
logger.warn("failed to notify ClusterStateApplier", ex);
490-
}
473+
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
474+
applier.applyClusterState(clusterChangedEvent);
491475
});
492476
}
493477

0 commit comments

Comments
 (0)