Skip to content

Commit c71807a

Browse files
Use Threadpool Time in ClusterApplierService (#39679)
* Use threadpool's time in `ClusterApplierService` to allow for deterministic tests * This is a part of/requirement for #39504
1 parent 7d68d61 commit c71807a

File tree

3 files changed

+29
-22
lines changed

3 files changed

+29
-22
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.Nullable;
2727
import org.elasticsearch.common.unit.TimeValue;
2828
import org.elasticsearch.common.util.concurrent.ThreadContext;
29+
import org.elasticsearch.threadpool.ThreadPool;
2930

3031
import java.util.Objects;
3132
import java.util.concurrent.atomic.AtomicReference;
@@ -44,6 +45,7 @@ public class ClusterStateObserver {
4445
private final Predicate<ClusterState> MATCH_ALL_CHANGES_PREDICATE = state -> true;
4546

4647
private final ClusterApplierService clusterApplierService;
48+
private final ThreadPool threadPool;
4749
private final ThreadContext contextHolder;
4850
volatile TimeValue timeOutValue;
4951

@@ -52,7 +54,7 @@ public class ClusterStateObserver {
5254
final TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener();
5355
// observingContext is not null when waiting on cluster state changes
5456
final AtomicReference<ObservingContext> observingContext = new AtomicReference<>(null);
55-
volatile Long startTimeNS;
57+
volatile Long startTimeMS;
5658
volatile boolean timedOut;
5759

5860

@@ -81,10 +83,11 @@ public ClusterStateObserver(ClusterState initialState, ClusterService clusterSer
8183
public ClusterStateObserver(ClusterState initialState, ClusterApplierService clusterApplierService, @Nullable TimeValue timeout,
8284
Logger logger, ThreadContext contextHolder) {
8385
this.clusterApplierService = clusterApplierService;
86+
this.threadPool = clusterApplierService.threadPool();
8487
this.lastObservedState = new AtomicReference<>(new StoredState(initialState));
8588
this.timeOutValue = timeout;
8689
if (timeOutValue != null) {
87-
this.startTimeNS = System.nanoTime();
90+
this.startTimeMS = threadPool.relativeTimeInMillis();
8891
}
8992
this.logger = logger;
9093
this.contextHolder = contextHolder;
@@ -134,7 +137,7 @@ public void waitForNextChange(Listener listener, Predicate<ClusterState> statePr
134137
if (timeOutValue == null) {
135138
timeOutValue = this.timeOutValue;
136139
if (timeOutValue != null) {
137-
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
140+
long timeSinceStartMS = threadPool.relativeTimeInMillis() - startTimeMS;
138141
timeoutTimeLeftMS = timeOutValue.millis() - timeSinceStartMS;
139142
if (timeoutTimeLeftMS <= 0L) {
140143
// things have timeout while we were busy -> notify
@@ -150,7 +153,7 @@ public void waitForNextChange(Listener listener, Predicate<ClusterState> statePr
150153
timeoutTimeLeftMS = null;
151154
}
152155
} else {
153-
this.startTimeNS = System.nanoTime();
156+
this.startTimeMS = threadPool.relativeTimeInMillis();
154157
this.timeOutValue = timeOutValue;
155158
timeoutTimeLeftMS = timeOutValue.millis();
156159
timedOut = false;
@@ -240,7 +243,7 @@ public void onTimeout(TimeValue timeout) {
240243
ObservingContext context = observingContext.getAndSet(null);
241244
if (context != null) {
242245
clusterApplierService.removeTimeoutListener(this);
243-
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
246+
long timeSinceStartMS = threadPool.relativeTimeInMillis() - startTimeMS;
244247
logger.trace("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]",
245248
timeOutValue, new TimeValue(timeSinceStartMS));
246249
// update to latest, in case people want to retry

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,10 @@ public void runOnApplierThread(final String source, Consumer<ClusterState> clust
311311
runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
312312
}
313313

314+
public ThreadPool threadPool() {
315+
return threadPool;
316+
}
317+
314318
@Override
315319
public void onNewClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier,
316320
final ClusterApplyListener listener) {
@@ -383,12 +387,12 @@ protected void runTask(UpdateTask task) {
383387
logger.debug("processing [{}]: execute", task.source);
384388
final ClusterState previousClusterState = state.get();
385389

386-
long startTimeNS = currentTimeInNanos();
390+
long startTimeMS = currentTimeInMillis();
387391
final ClusterState newClusterState;
388392
try {
389393
newClusterState = task.apply(previousClusterState);
390394
} catch (Exception e) {
391-
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
395+
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
392396
logger.trace(() -> new ParameterizedMessage(
393397
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
394398
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
@@ -398,7 +402,7 @@ protected void runTask(UpdateTask task) {
398402
}
399403

400404
if (previousClusterState == newClusterState) {
401-
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
405+
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
402406
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
403407
warnAboutSlowTaskIfNeeded(executionTime, task.source);
404408
task.listener.onSuccess(task.source);
@@ -411,14 +415,14 @@ protected void runTask(UpdateTask task) {
411415
}
412416
try {
413417
applyChanges(task, previousClusterState, newClusterState);
414-
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
418+
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
415419
logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
416420
executionTime, newClusterState.version(),
417421
newClusterState.stateUUID());
418422
warnAboutSlowTaskIfNeeded(executionTime, task.source);
419423
task.listener.onSuccess(task.source);
420424
} catch (Exception e) {
421-
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
425+
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
422426
if (logger.isTraceEnabled()) {
423427
logger.warn(new ParameterizedMessage(
424428
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
@@ -617,8 +621,8 @@ public void run() {
617621
}
618622

619623
// this one is overridden in tests so we can control time
620-
protected long currentTimeInNanos() {
621-
return System.nanoTime();
624+
protected long currentTimeInMillis() {
625+
return threadPool.relativeTimeInMillis();
622626
}
623627

624628
}

server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void tearDown() throws Exception {
8888
super.tearDown();
8989
}
9090

91-
TimedClusterApplierService createTimedClusterService(boolean makeMaster) throws InterruptedException {
91+
TimedClusterApplierService createTimedClusterService(boolean makeMaster) {
9292
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
9393
emptySet(), Version.CURRENT);
9494
TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(Settings.builder().put("cluster.name",
@@ -141,9 +141,9 @@ public void testClusterStateUpdateLogging() throws Exception {
141141
Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class);
142142
Loggers.addAppender(clusterLogger, mockAppender);
143143
try {
144-
clusterApplierService.currentTimeOverride = System.nanoTime();
144+
clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
145145
clusterApplierService.runOnApplierThread("test1",
146-
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
146+
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(),
147147
new ClusterApplyListener() {
148148
@Override
149149
public void onSuccess(String source) { }
@@ -155,7 +155,7 @@ public void onFailure(String source, Exception e) {
155155
});
156156
clusterApplierService.runOnApplierThread("test2",
157157
currentState -> {
158-
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
158+
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).millis();
159159
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
160160
},
161161
new ClusterApplyListener() {
@@ -214,9 +214,9 @@ public void testLongClusterStateUpdateLogging() throws Exception {
214214
try {
215215
final CountDownLatch latch = new CountDownLatch(4);
216216
final CountDownLatch processedFirstTask = new CountDownLatch(1);
217-
clusterApplierService.currentTimeOverride = System.nanoTime();
217+
clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
218218
clusterApplierService.runOnApplierThread("test1",
219-
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
219+
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(),
220220
new ClusterApplyListener() {
221221
@Override
222222
public void onSuccess(String source) {
@@ -232,7 +232,7 @@ public void onFailure(String source, Exception e) {
232232
processedFirstTask.await();
233233
clusterApplierService.runOnApplierThread("test2",
234234
currentState -> {
235-
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
235+
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).millis();
236236
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
237237
},
238238
new ClusterApplyListener() {
@@ -247,7 +247,7 @@ public void onFailure(String source, Exception e) {
247247
}
248248
});
249249
clusterApplierService.runOnApplierThread("test3",
250-
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(),
250+
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).millis(),
251251
new ClusterApplyListener() {
252252
@Override
253253
public void onSuccess(String source) {
@@ -510,11 +510,11 @@ static class TimedClusterApplierService extends ClusterApplierService {
510510
}
511511

512512
@Override
513-
protected long currentTimeInNanos() {
513+
protected long currentTimeInMillis() {
514514
if (currentTimeOverride != null) {
515515
return currentTimeOverride;
516516
}
517-
return super.currentTimeInNanos();
517+
return super.currentTimeInMillis();
518518
}
519519
}
520520
}

0 commit comments

Comments
 (0)