Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
* The indices request cache allows to cache a shard level request stage responses, helping with improving
* similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent
* with the semantics of NRT (the index reader version is part of the cache key), and relies on size based
* with the semantics of NRT (the index reader cache key is part of the cache key), and relies on size based
* eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that
* are no longer used or closed shards.
* <p>
Expand Down Expand Up @@ -105,7 +106,7 @@ public void close() {
}

void clear(CacheEntity entity) {
keysToClean.add(new CleanupKey(entity, -1));
keysToClean.add(new CleanupKey(entity, null));
cleanCache();
}

Expand All @@ -119,7 +120,8 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
// removed when this issue is solved
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey);
assert reader.getReaderCacheHelper() != null;
final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey);
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
if (cacheLoader.isLoaded()) {
Expand All @@ -128,7 +130,7 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> lo
logger.trace("Cache miss for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get());
}
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion());
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
if (previous == null) {
Expand All @@ -151,7 +153,8 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> lo
* @param cacheKey the cache key to invalidate
*/
void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
cache.invalidate(new Key(cacheEntity, reader.getVersion(), cacheKey));
assert reader.getReaderCacheHelper() != null;
cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey));
}

private static class Loader implements CacheLoader<Key, BytesReference> {
Expand Down Expand Up @@ -220,12 +223,12 @@ static class Key implements Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);

public final CacheEntity entity; // use as identity equality
public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
public final IndexReader.CacheKey readerCacheKey;
public final BytesReference value;

Key(CacheEntity entity, long readerVersion, BytesReference value) {
Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) {
this.entity = entity;
this.readerVersion = readerVersion;
this.readerCacheKey = Objects.requireNonNull(readerCacheKey);
this.value = value;
}

Expand All @@ -245,7 +248,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key = (Key) o;
if (readerVersion != key.readerVersion) return false;
if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false;
if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false;
if (!value.equals(key.value)) return false;
return true;
Expand All @@ -254,19 +257,19 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
int result = entity.getCacheIdentity().hashCode();
result = 31 * result + Long.hashCode(readerVersion);
result = 31 * result + readerCacheKey.hashCode();
result = 31 * result + value.hashCode();
return result;
}
}

private class CleanupKey implements IndexReader.ClosedListener {
final CacheEntity entity;
final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
final IndexReader.CacheKey readerCacheKey;

private CleanupKey(CacheEntity entity, long readerVersion) {
private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) {
this.entity = entity;
this.readerVersion = readerVersion;
this.readerCacheKey = readerCacheKey;
}

@Override
Expand All @@ -284,15 +287,15 @@ public boolean equals(Object o) {
return false;
}
CleanupKey that = (CleanupKey) o;
if (readerVersion != that.readerVersion) return false;
if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false;
if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false;
return true;
}

@Override
public int hashCode() {
int result = entity.getCacheIdentity().hashCode();
result = 31 * result + Long.hashCode(readerVersion);
result = 31 * result + Objects.hashCode(readerCacheKey);
return result;
}
}
Expand All @@ -307,7 +310,7 @@ synchronized void cleanCache() {
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext(); ) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerVersion == -1 || cleanupKey.entity.isOpen() == false) {
if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) {
// -1 indicates full cleanup, as does a closed shard
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
} else {
Expand All @@ -320,7 +323,7 @@ synchronized void cleanCache() {
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
iterator.remove();
} else {
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerVersion))) {
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) {
iterator.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,10 +1166,9 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
} else if (request.requestCache() == false) {
return false;
}
// if the reader is not a directory reader, we can't get the version from it
if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) {
return false;
}
// We use the cacheKey of the index reader as a part of a key of the IndicesRequestCache.
assert context.searcher().getIndexReader().getReaderCacheHelper() != null;

// if now in millis is used (or in the future, a more generic "isDeterministic" flag
// then we can't cache based on "now" key within the search request, as it is not deterministic
if (context.getQueryShardContext().isCachable() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterInfoService;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
Expand Down Expand Up @@ -809,4 +811,37 @@ public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
assertTrue(notified.get());
}

public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Exception {
createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
.put("index.refresh_interval", -1).build());
ensureGreen();
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("test"), 0));
final SearchRequest countRequest = new SearchRequest("test").source(new SearchSourceBuilder().size(0));
final long numDocs = between(10, 20);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
if (randomBoolean()) {
shard.refresh("test");
}
}
shard.refresh("test");
assertThat(client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs));
assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo()));
shard.resetEngineToGlobalCheckpoint();
final long moreDocs = between(10, 20);
for (int i = 0; i < moreDocs; i++) {
client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get();
if (randomBoolean()) {
shard.refresh("test");
}
}
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs, (long) searcher.reader().numDocs(), equalTo(numDocs + moreDocs));
}
assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs,
client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs + moreDocs));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
Expand Down Expand Up @@ -119,7 +121,11 @@ public void testCacheDifferentReaders() throws Exception {
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
BytesReference termBytes = XContentHelper.toXContent(termQuery, XContentType.JSON, false);

if (randomBoolean()) {
writer.flush();
IOUtils.close(writer);
writer = new IndexWriter(dir, newIndexWriterConfig());
}
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));

Expand Down Expand Up @@ -424,14 +430,23 @@ public void testInvalidate() throws Exception {
assertEquals(0, cache.numRegisteredCloseListeners());
}

public void testEqualsKey() {
public void testEqualsKey() throws IOException {
AtomicBoolean trueBoolean = new AtomicBoolean(true);
AtomicBoolean falseBoolean = new AtomicBoolean(false);
IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1));
IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1));
IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), 1L, new TestBytesReference(1));
IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 2L, new TestBytesReference(1));
IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(2));
Directory dir = newDirectory();
IndexWriterConfig config = newIndexWriterConfig();
IndexWriter writer = new IndexWriter(dir, config);
IndexReader reader1 = DirectoryReader.open(writer);
IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey();
writer.addDocument(new Document());
IndexReader reader2 = DirectoryReader.open(writer);
IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey();
IOUtils.close(reader1, reader2, writer, dir);
IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1));
IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1));
IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2));
String s = "Some other random object";
assertEquals(key1, key1);
assertEquals(key1, key2);
Expand Down