Skip to content

Commit 0e7f976

Browse files
committed
pending refresh
1 parent 09c7e66 commit 0e7f976

File tree

3 files changed

+43
-9
lines changed

3 files changed

+43
-9
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ Runnable getGlobalCheckpointSyncer() {
268268

269269
private final AtomicLong lastSearcherAccess = new AtomicLong();
270270
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
271+
private final RefreshPendingLocationListener refreshPendingLocationListener;
271272
private volatile boolean useRetentionLeasesInPeerRecovery;
272273

273274
public IndexShard(
@@ -366,6 +367,7 @@ public boolean shouldCache(Query query) {
366367
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
367368
persistMetadata(path, indexSettings, shardRouting, null, logger);
368369
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
370+
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
369371
}
370372

371373
public ThreadPool getThreadPool() {
@@ -2673,7 +2675,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
26732675
similarityService.similarity(mapperService), codecService, shardEventListener,
26742676
indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig,
26752677
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
2676-
Collections.singletonList(refreshListeners),
2678+
List.of(refreshListeners, refreshPendingLocationListener),
26772679
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
26782680
indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
26792681
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
@@ -3184,15 +3186,44 @@ final long getLastSearcherAccess() {
31843186
return lastSearcherAccess.get();
31853187
}
31863188

3189+
/**
3190+
* Returns true if this shard has some scheduled refresh that is pending because of search-idle.
3191+
*/
3192+
public final boolean hasRefreshPending() {
3193+
return pendingRefreshLocation.get() != null;
3194+
}
3195+
31873196
private void setRefreshPending(Engine engine) {
3188-
Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation();
3189-
Translog.Location location;
3190-
do {
3191-
location = this.pendingRefreshLocation.get();
3192-
if (location != null && lastWriteLocation.compareTo(location) <= 0) {
3193-
break;
3197+
final Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation();
3198+
pendingRefreshLocation.updateAndGet(curr -> {
3199+
if (curr == null || curr.compareTo(lastWriteLocation) <= 0) {
3200+
return lastWriteLocation;
3201+
} else {
3202+
return curr;
31943203
}
3195-
} while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false);
3204+
});
3205+
}
3206+
3207+
private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener {
3208+
Translog.Location lastWriteLocation;
3209+
3210+
@Override
3211+
public void beforeRefresh() {
3212+
lastWriteLocation = getEngine().getTranslogLastWriteLocation();
3213+
}
3214+
3215+
@Override
3216+
public void afterRefresh(boolean didRefresh) {
3217+
if (didRefresh) {
3218+
pendingRefreshLocation.updateAndGet(pendingLocation -> {
3219+
if (pendingLocation == null || pendingLocation.compareTo(lastWriteLocation) <= 0) {
3220+
return null;
3221+
} else {
3222+
return pendingLocation;
3223+
}
3224+
});
3225+
}
3226+
}
31963227
}
31973228

31983229
/**

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1113,7 +1113,7 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException
11131113
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
11141114
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
11151115
IndexShard indexShard = indexService.getShard(request.shardId().getId());
1116-
if (indexShard.isSearchIdle()) {
1116+
if (indexShard.hasRefreshPending()) {
11171117
return new CanMatchResponse(true, null);
11181118
}
11191119
// we don't want to use the reader wrapper since it could run costly operations

server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
157157
assertHitCount(client().prepareSearch().get(), 1);
158158
client().prepareIndex("test").setId("1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
159159
assertFalse(shard.scheduledRefresh());
160+
assertTrue(shard.hasRefreshPending());
160161

161162
// now disable background refresh and make sure the refresh happens
162163
CountDownLatch updateSettingsLatch = new CountDownLatch(1);
@@ -168,11 +169,13 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
168169
// wait for both to ensure we don't have in-flight operations
169170
updateSettingsLatch.await();
170171
refreshLatch.await();
172+
assertFalse(shard.hasRefreshPending());
171173
// We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc;
172174
// otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify.
173175
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
174176
client().prepareIndex("test").setId("2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
175177
assertTrue(shard.scheduledRefresh());
178+
assertFalse(shard.hasRefreshPending());
176179
assertTrue(shard.isSearchIdle());
177180
assertHitCount(client().prepareSearch().get(), 3);
178181
}

0 commit comments

Comments
 (0)