Skip to content

Commit 2bc6ab2

Browse files
committed
Script: Fix value of ctx._now to be current epoch time in milliseconds (#23175)
In update scripts, `ctx._now` uses the same milliseconds value used by the rest of the system to calculate deltas. However, that time is not actually epoch milliseconds, as it is derived from `System.nanoTime()`. This change reworks the estimated time thread in ThreadPool which this time is based on to make available both the relative time, as well as absolute milliseconds (epoch) which may be used with calendar system. It also renames the EstimatedTimeThread to a more apt CachedTimeThread. closes #23169
1 parent f272212 commit 2bc6ab2

File tree

7 files changed

+80
-29
lines changed

7 files changed

+80
-29
lines changed

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
266266
final UpdateHelper.Result translate;
267267
// translate update request
268268
try {
269-
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
269+
translate = updateHelper.prepare(updateRequest, primary, threadPool::absoluteTimeInMillis);
270270
} catch (Exception failure) {
271271
// we may fail translating a update to index or delete operation
272272
// we use index result to communicate failure while translating update request

core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
176176
final ShardId shardId = request.getShardId();
177177
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
178178
final IndexShard indexShard = indexService.getShard(shardId.getId());
179-
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis);
179+
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis);
180180
switch (result.getResponseResult()) {
181181
case CREATED:
182182
IndexRequest upsertRequest = result.action();

core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public Codec getCodec() {
188188

189189
/**
190190
* Returns a thread-pool mainly used to get estimated time stamps from
191-
* {@link org.elasticsearch.threadpool.ThreadPool#estimatedTimeInMillis()} and to schedule
191+
* {@link org.elasticsearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule
192192
* async force merge calls on the {@link org.elasticsearch.threadpool.ThreadPool.Names#FORCE_MERGE} thread-pool
193193
*/
194194
public ThreadPool getThreadPool() {

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
140140
EngineMergeScheduler scheduler = null;
141141
boolean success = false;
142142
try {
143-
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
143+
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
144144
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
145145
throttle = new IndexThrottle();
146146
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
@@ -384,7 +384,7 @@ private boolean checkVersionConflict(final Operation op, final long currentVersi
384384

385385
private long checkDeletedAndGCed(VersionValue versionValue) {
386386
long currentVersion;
387-
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
387+
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
388388
currentVersion = Versions.NOT_FOUND; // deleted, and GC
389389
} else {
390390
currentVersion = versionValue.version();
@@ -625,7 +625,7 @@ public DeleteResult delete(Delete delete) throws IOException {
625625
private void maybePruneDeletedTombstones() {
626626
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
627627
// every 1/4 of gcDeletesInMillis:
628-
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
628+
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
629629
pruneDeletedTombstones();
630630
}
631631
}
@@ -665,7 +665,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
665665
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
666666
deleteResult = new DeleteResult(updatedVersion, found);
667667
versionMap.putUnderLock(delete.uid().bytes(),
668-
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
668+
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().relativeTimeInMillis()));
669669
}
670670
if (deleteResult.hasFailure() == false) {
671671
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
@@ -908,7 +908,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
908908
}
909909

910910
private void pruneDeletedTombstones() {
911-
long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
911+
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
912912

913913
// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...
914914

core/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ private void contextProcessing(SearchContext context) {
561561
}
562562

563563
private void contextProcessedSuccessfully(SearchContext context) {
564-
context.accessed(threadPool.estimatedTimeInMillis());
564+
context.accessed(threadPool.relativeTimeInMillis());
565565
}
566566

567567
private void cleanContext(SearchContext context) {
@@ -794,7 +794,7 @@ public int getActiveContexts() {
794794
class Reaper implements Runnable {
795795
@Override
796796
public void run() {
797-
final long time = threadPool.estimatedTimeInMillis();
797+
final long time = threadPool.relativeTimeInMillis();
798798
for (SearchContext context : activeContexts.values()) {
799799
// Use the same value for both checks since lastAccessTime can
800800
// be modified by another thread between checks!

core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public static ThreadPoolType fromType(String type) {
142142

143143
private final ScheduledThreadPoolExecutor scheduler;
144144

145-
private final EstimatedTimeThread estimatedTimeThread;
145+
private final CachedTimeThread cachedTimeThread;
146146

147147
static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
148148

@@ -213,16 +213,33 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
213213
this.scheduler.setRemoveOnCancelPolicy(true);
214214

215215
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
216-
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
217-
this.estimatedTimeThread.start();
216+
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
217+
this.cachedTimeThread.start();
218218
}
219219

220-
public long estimatedTimeInMillis() {
221-
return estimatedTimeThread.estimatedTimeInMillis();
220+
/**
221+
* Returns a value of milliseconds that may be used for relative time calculations.
222+
*
223+
* This method should only be used for calculating time deltas. For an epoch based
224+
* timestamp, see {@link #absoluteTimeInMillis()}.
225+
*/
226+
public long relativeTimeInMillis() {
227+
return cachedTimeThread.relativeTimeInMillis();
228+
}
229+
230+
/**
231+
* Returns the value of milliseconds since UNIX epoch.
232+
*
233+
* This method should only be used for exact date/time formatting. For calculating
234+
* time deltas that should not suffer from negative deltas, which are possible with
235+
* this method, see {@link #relativeTimeInMillis()}.
236+
*/
237+
public long absoluteTimeInMillis() {
238+
return cachedTimeThread.absoluteTimeInMillis();
222239
}
223240

224241
public Counter estimatedTimeInMillisCounter() {
225-
return estimatedTimeThread.counter;
242+
return cachedTimeThread.counter;
226243
}
227244

228245
public ThreadPoolInfo info() {
@@ -342,8 +359,8 @@ public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable co
342359
}
343360

344361
public void shutdown() {
345-
estimatedTimeThread.running = false;
346-
estimatedTimeThread.interrupt();
362+
cachedTimeThread.running = false;
363+
cachedTimeThread.interrupt();
347364
scheduler.shutdown();
348365
for (ExecutorHolder executor : executors.values()) {
349366
if (executor.executor() instanceof ThreadPoolExecutor) {
@@ -353,8 +370,8 @@ public void shutdown() {
353370
}
354371

355372
public void shutdownNow() {
356-
estimatedTimeThread.running = false;
357-
estimatedTimeThread.interrupt();
373+
cachedTimeThread.running = false;
374+
cachedTimeThread.interrupt();
358375
scheduler.shutdownNow();
359376
for (ExecutorHolder executor : executors.values()) {
360377
if (executor.executor() instanceof ThreadPoolExecutor) {
@@ -371,7 +388,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
371388
}
372389
}
373390

374-
estimatedTimeThread.join(unit.toMillis(timeout));
391+
cachedTimeThread.join(unit.toMillis(timeout));
375392
return result;
376393
}
377394

@@ -471,29 +488,50 @@ public String toString() {
471488
}
472489
}
473490

474-
static class EstimatedTimeThread extends Thread {
491+
/**
492+
* A thread to cache millisecond time values from
493+
* {@link System#nanoTime()} and {@link System#currentTimeMillis()}.
494+
*
495+
* The values are updated at a specified interval.
496+
*/
497+
static class CachedTimeThread extends Thread {
475498

476499
final long interval;
477500
final TimeCounter counter;
478501
volatile boolean running = true;
479-
volatile long estimatedTimeInMillis;
502+
volatile long relativeMillis;
503+
volatile long absoluteMillis;
480504

481-
EstimatedTimeThread(String name, long interval) {
505+
CachedTimeThread(String name, long interval) {
482506
super(name);
483507
this.interval = interval;
484-
this.estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime());
508+
this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
509+
this.absoluteMillis = System.currentTimeMillis();
485510
this.counter = new TimeCounter();
486511
setDaemon(true);
487512
}
488513

489-
public long estimatedTimeInMillis() {
490-
return this.estimatedTimeInMillis;
514+
/**
515+
* Return the current time used for relative calculations. This is
516+
* {@link System#nanoTime()} truncated to milliseconds.
517+
*/
518+
long relativeTimeInMillis() {
519+
return relativeMillis;
520+
}
521+
522+
/**
523+
* Return the current epoch time, used to find absolute time. This is
524+
* a cached version of {@link System#currentTimeMillis()}.
525+
*/
526+
long absoluteTimeInMillis() {
527+
return absoluteMillis;
491528
}
492529

493530
@Override
494531
public void run() {
495532
while (running) {
496-
estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime());
533+
relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
534+
absoluteMillis = System.currentTimeMillis();
497535
try {
498536
Thread.sleep(interval);
499537
} catch (InterruptedException e) {
@@ -512,7 +550,7 @@ public long addAndGet(long delta) {
512550

513551
@Override
514552
public long get() {
515-
return estimatedTimeInMillis;
553+
return relativeMillis;
516554
}
517555
}
518556
}

core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,17 @@ public void testBoundedByBetweenMinAndMax() {
4646
assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value));
4747
}
4848

49+
public void testAbsoluteTime() throws Exception {
50+
TestThreadPool threadPool = new TestThreadPool("test");
51+
try {
52+
long currentTime = System.currentTimeMillis();
53+
long gotTime = threadPool.absoluteTimeInMillis();
54+
long delta = Math.abs(gotTime - currentTime);
55+
assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime,
56+
delta < 10000); // the delta can be large, we just care it is the same order of magnitude
57+
} finally {
58+
threadPool.shutdown();
59+
threadPool.close();
60+
}
61+
}
4962
}

0 commit comments

Comments
 (0)