Skip to content

Commit a768830

Browse files
dnhatnkcm
authored andcommitted
Replace version with reader cache key in IndicesRequestCache (#34189)
Today we use the version of a DirectoryReader as a component of the key of IndicesRequestCache. This usage is perfectly fine since the version is advanced every time a new change is made into IndexWriter. In other words, two DirectoryReaders with the same version should have the same content. However, this invariant is only guaranteed in the context of a single IndexWriter because the version is reset to the committed version value when IndexWriter is re-opened. Since #33473, each IndexShard may have more than one IndexWriter, and using the version of a DirectoryReader as a part of the cache key can cause IndicesRequestCache to return stale cached values. For example, in #27650, we rollback the engine (i.e., re-open IndexWriter), index new documents, refresh, then make a count request, but the search layer mistakenly returns the count of the DirectoryReader of the previous IndexWriter because the current DirectoryReader has the same version of the old DirectoryReader even their documents are different. This is possible because these two readers come from different IndexWriters. This commit replaces the the version with the reader cache key of IndexReader as a component of the cache key of IndicesRequestCache. Closes #27650 Relates #33473
1 parent 342c1e9 commit a768830

File tree

4 files changed

+80
-28
lines changed

4 files changed

+80
-28
lines changed

server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,15 @@
4747
import java.util.Collection;
4848
import java.util.Collections;
4949
import java.util.Iterator;
50+
import java.util.Objects;
5051
import java.util.Set;
5152
import java.util.concurrent.ConcurrentMap;
5253
import java.util.function.Supplier;
5354

5455
/**
5556
* The indices request cache allows to cache a shard level request stage responses, helping with improving
5657
* similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent
57-
* with the semantics of NRT (the index reader version is part of the cache key), and relies on size based
58+
* with the semantics of NRT (the index reader cache key is part of the cache key), and relies on size based
5859
* eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that
5960
* are no longer used or closed shards.
6061
* <p>
@@ -105,7 +106,7 @@ public void close() {
105106
}
106107

107108
void clear(CacheEntity entity) {
108-
keysToClean.add(new CleanupKey(entity, -1));
109+
keysToClean.add(new CleanupKey(entity, null));
109110
cleanCache();
110111
}
111112

@@ -119,7 +120,8 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
119120
// removed when this issue is solved
120121
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
121122
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
122-
final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey);
123+
assert reader.getReaderCacheHelper() != null;
124+
final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey);
123125
Loader cacheLoader = new Loader(cacheEntity, loader);
124126
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
125127
if (cacheLoader.isLoaded()) {
@@ -128,7 +130,7 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> lo
128130
logger.trace("Cache miss for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get());
129131
}
130132
// see if its the first time we see this reader, and make sure to register a cleanup key
131-
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion());
133+
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey());
132134
if (!registeredClosedListeners.containsKey(cleanupKey)) {
133135
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
134136
if (previous == null) {
@@ -151,7 +153,8 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> lo
151153
* @param cacheKey the cache key to invalidate
152154
*/
153155
void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
154-
cache.invalidate(new Key(cacheEntity, reader.getVersion(), cacheKey));
156+
assert reader.getReaderCacheHelper() != null;
157+
cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey));
155158
}
156159

157160
private static class Loader implements CacheLoader<Key, BytesReference> {
@@ -220,12 +223,12 @@ static class Key implements Accountable {
220223
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);
221224

222225
public final CacheEntity entity; // use as identity equality
223-
public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
226+
public final IndexReader.CacheKey readerCacheKey;
224227
public final BytesReference value;
225228

226-
Key(CacheEntity entity, long readerVersion, BytesReference value) {
229+
Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) {
227230
this.entity = entity;
228-
this.readerVersion = readerVersion;
231+
this.readerCacheKey = Objects.requireNonNull(readerCacheKey);
229232
this.value = value;
230233
}
231234

@@ -245,7 +248,7 @@ public boolean equals(Object o) {
245248
if (this == o) return true;
246249
if (o == null || getClass() != o.getClass()) return false;
247250
Key key = (Key) o;
248-
if (readerVersion != key.readerVersion) return false;
251+
if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false;
249252
if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false;
250253
if (!value.equals(key.value)) return false;
251254
return true;
@@ -254,19 +257,19 @@ public boolean equals(Object o) {
254257
@Override
255258
public int hashCode() {
256259
int result = entity.getCacheIdentity().hashCode();
257-
result = 31 * result + Long.hashCode(readerVersion);
260+
result = 31 * result + readerCacheKey.hashCode();
258261
result = 31 * result + value.hashCode();
259262
return result;
260263
}
261264
}
262265

263266
private class CleanupKey implements IndexReader.ClosedListener {
264267
final CacheEntity entity;
265-
final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
268+
final IndexReader.CacheKey readerCacheKey;
266269

267-
private CleanupKey(CacheEntity entity, long readerVersion) {
270+
private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) {
268271
this.entity = entity;
269-
this.readerVersion = readerVersion;
272+
this.readerCacheKey = readerCacheKey;
270273
}
271274

272275
@Override
@@ -284,15 +287,15 @@ public boolean equals(Object o) {
284287
return false;
285288
}
286289
CleanupKey that = (CleanupKey) o;
287-
if (readerVersion != that.readerVersion) return false;
290+
if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false;
288291
if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false;
289292
return true;
290293
}
291294

292295
@Override
293296
public int hashCode() {
294297
int result = entity.getCacheIdentity().hashCode();
295-
result = 31 * result + Long.hashCode(readerVersion);
298+
result = 31 * result + Objects.hashCode(readerCacheKey);
296299
return result;
297300
}
298301
}
@@ -307,7 +310,7 @@ synchronized void cleanCache() {
307310
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext(); ) {
308311
CleanupKey cleanupKey = iterator.next();
309312
iterator.remove();
310-
if (cleanupKey.readerVersion == -1 || cleanupKey.entity.isOpen() == false) {
313+
if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) {
311314
// -1 indicates full cleanup, as does a closed shard
312315
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
313316
} else {
@@ -320,7 +323,7 @@ synchronized void cleanCache() {
320323
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
321324
iterator.remove();
322325
} else {
323-
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerVersion))) {
326+
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) {
324327
iterator.remove();
325328
}
326329
}

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,10 +1166,9 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
11661166
} else if (request.requestCache() == false) {
11671167
return false;
11681168
}
1169-
// if the reader is not a directory reader, we can't get the version from it
1170-
if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) {
1171-
return false;
1172-
}
1169+
// We use the cacheKey of the index reader as a part of a key of the IndicesRequestCache.
1170+
assert context.searcher().getIndexReader().getReaderCacheHelper() != null;
1171+
11731172
// if now in millis is used (or in the future, a more generic "isDeterministic" flag
11741173
// then we can't cache based on "now" key within the search request, as it is not deterministic
11751174
if (context.getQueryShardContext().isCachable() == false) {

server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.admin.indices.stats.IndexStats;
2828
import org.elasticsearch.action.index.IndexRequest;
2929
import org.elasticsearch.action.index.IndexResponse;
30+
import org.elasticsearch.action.search.SearchRequest;
3031
import org.elasticsearch.action.search.SearchResponse;
3132
import org.elasticsearch.action.support.IndicesOptions;
3233
import org.elasticsearch.cluster.ClusterInfoService;
@@ -69,6 +70,7 @@
6970
import org.elasticsearch.indices.recovery.RecoveryState;
7071
import org.elasticsearch.plugins.Plugin;
7172
import org.elasticsearch.search.aggregations.AggregationBuilders;
73+
import org.elasticsearch.search.builder.SearchSourceBuilder;
7274
import org.elasticsearch.test.DummyShardLock;
7375
import org.elasticsearch.test.ESSingleNodeTestCase;
7476
import org.elasticsearch.test.IndexSettingsModule;
@@ -809,4 +811,37 @@ public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
809811
assertTrue(notified.get());
810812
}
811813

814+
public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Exception {
815+
createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
816+
.put("index.refresh_interval", -1).build());
817+
ensureGreen();
818+
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
819+
final IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("test"), 0));
820+
final SearchRequest countRequest = new SearchRequest("test").source(new SearchSourceBuilder().size(0));
821+
final long numDocs = between(10, 20);
822+
for (int i = 0; i < numDocs; i++) {
823+
client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
824+
if (randomBoolean()) {
825+
shard.refresh("test");
826+
}
827+
}
828+
shard.refresh("test");
829+
assertThat(client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs));
830+
assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo()));
831+
shard.resetEngineToGlobalCheckpoint();
832+
final long moreDocs = between(10, 20);
833+
for (int i = 0; i < moreDocs; i++) {
834+
client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get();
835+
if (randomBoolean()) {
836+
shard.refresh("test");
837+
}
838+
}
839+
shard.refresh("test");
840+
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
841+
assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs, (long) searcher.reader().numDocs(), equalTo(numDocs + moreDocs));
842+
}
843+
assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs,
844+
client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs + moreDocs));
845+
}
846+
812847
}

server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.apache.lucene.document.Field;
2424
import org.apache.lucene.document.StringField;
2525
import org.apache.lucene.index.DirectoryReader;
26+
import org.apache.lucene.index.IndexReader;
2627
import org.apache.lucene.index.IndexWriter;
28+
import org.apache.lucene.index.IndexWriterConfig;
2729
import org.apache.lucene.index.Term;
2830
import org.apache.lucene.search.IndexSearcher;
2931
import org.apache.lucene.search.TermQuery;
@@ -119,7 +121,11 @@ public void testCacheDifferentReaders() throws Exception {
119121
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
120122
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
121123
BytesReference termBytes = XContentHelper.toXContent(termQuery, XContentType.JSON, false);
122-
124+
if (randomBoolean()) {
125+
writer.flush();
126+
IOUtils.close(writer);
127+
writer = new IndexWriter(dir, newIndexWriterConfig());
128+
}
123129
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
124130
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
125131

@@ -424,14 +430,23 @@ public void testInvalidate() throws Exception {
424430
assertEquals(0, cache.numRegisteredCloseListeners());
425431
}
426432

427-
public void testEqualsKey() {
433+
public void testEqualsKey() throws IOException {
428434
AtomicBoolean trueBoolean = new AtomicBoolean(true);
429435
AtomicBoolean falseBoolean = new AtomicBoolean(false);
430-
IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1));
431-
IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1));
432-
IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), 1L, new TestBytesReference(1));
433-
IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 2L, new TestBytesReference(1));
434-
IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(2));
436+
Directory dir = newDirectory();
437+
IndexWriterConfig config = newIndexWriterConfig();
438+
IndexWriter writer = new IndexWriter(dir, config);
439+
IndexReader reader1 = DirectoryReader.open(writer);
440+
IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey();
441+
writer.addDocument(new Document());
442+
IndexReader reader2 = DirectoryReader.open(writer);
443+
IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey();
444+
IOUtils.close(reader1, reader2, writer, dir);
445+
IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
446+
IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
447+
IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1));
448+
IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1));
449+
IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2));
435450
String s = "Some other random object";
436451
assertEquals(key1, key1);
437452
assertEquals(key1, key2);

0 commit comments

Comments
 (0)