Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
Expand Down Expand Up @@ -75,6 +78,8 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
public static final Setting<TimeValue> INDICES_CACHE_QUERY_EXPIRE =
Setting.positiveTimeSetting("indices.requests.cache.expire", new TimeValue(0), Property.NodeScope);

private static final Logger LOGGER = LogManager.getLogger(IndicesRequestCache.class);

private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
private final ByteSizeValue size;
Expand Down Expand Up @@ -109,13 +114,19 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
notification.getKey().entity.onRemoval(notification);
}

// NORELEASE The cacheKeyRenderer has been added in order to debug
// https://github.com/elastic/elasticsearch/issues/32827, it should be
// removed when this issue is solved
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
DirectoryReader reader, BytesReference cacheKey) throws Exception {
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new parameter hurts a bit the API, can we avoid it somehow? Or at least leave a comment asking to remove it once #32827 is addressed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it hurts the API, unfortunately I didn't see a better way of exposing the request information in the log message since the cache key is the wire-protocol bytes of the request and converting the cache key back into a request object would have exposed things on the request that we wouldn't want either. Happy to add a comment as you suggest, also happy to implement a better way of supplying this information if someone can think of one

final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey);
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
if (cacheLoader.isLoaded()) {
key.entity.onMiss();
if (logger.isTraceEnabled()) {
logger.trace("Cache miss for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get());
}
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Expand All @@ -126,6 +137,9 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> lo
}
} else {
key.entity.onHit();
if (logger.isTraceEnabled()) {
logger.trace("Cache hit for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get());
}
}
return value;
}
Expand Down
14 changes: 10 additions & 4 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,9 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q
final DirectoryReader directoryReader = context.searcher().getDirectoryReader();

boolean[] loadedFromCache = new boolean[] { true };
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> {
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), () -> {
return "Shard: " + request.shardId() + "\nSource:\n" + request.source();
}, out -> {
queryPhase.execute(context);
try {
context.queryResult().writeToNoId(out);
Expand All @@ -1217,6 +1219,10 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q
// running a search that times out concurrently will likely timeout again if it's run while we have this `stale` result in the
// cache. One other option is to not cache requests with a timeout at all...
indicesRequestCache.invalidate(new IndexShardCacheEntity(context.indexShard()), directoryReader, request.cacheKey());
if (logger.isTraceEnabled()) {
logger.trace("Query timed out, invalidating cache entry for request on shard [{}]:\n {}", request.shardId(),
request.source());
}
}
}

Expand All @@ -1232,8 +1238,8 @@ public ByteSizeValue getTotalIndexingBufferBytes() {
* @param loader loads the data into the cache if needed
* @return the contents of the cache or the result of calling the loader
*/
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey, Consumer<StreamOutput> loader)
throws Exception {
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey,
Supplier<String> cacheKeyRenderer, Consumer<StreamOutput> loader) throws Exception {
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard);
Supplier<BytesReference> supplier = () -> {
/* BytesStreamOutput allows to pass the expected size but by default uses
Expand All @@ -1251,7 +1257,7 @@ private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader r
return out.bytes();
}
};
return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey);
return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey, cacheKeyRenderer);
}

static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.joda.time.DateTimeZone;

import java.time.ZoneOffset;
Expand All @@ -49,6 +50,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@TestLogging(value = "org.elasticsearch.indices.IndicesRequestCache:TRACE")
public class IndicesRequestCacheIT extends ESIntegTestCase {

// One of the primary purposes of the query cache is to cache aggs results
Expand Down Expand Up @@ -417,8 +419,8 @@ private static void assertCacheState(Client client, String index, long expectedH
.getRequestCache();
// Check the hit count and miss count together so if they are not
// correct we can see both values
assertEquals(Arrays.asList(expectedHits, expectedMisses),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount()));
assertEquals(Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testBasicOperationsCache() throws Exception {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand All @@ -79,7 +79,7 @@ public void testBasicOperationsCache() throws Exception {
// cache hit
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testCacheDifferentReaders() throws Exception {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand All @@ -140,7 +140,7 @@ public void testCacheDifferentReaders() throws Exception {
// cache the second
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(secondReader, 0);
value = cache.getOrCompute(entity, loader, secondReader, termBytes);
value = cache.getOrCompute(entity, loader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
Expand All @@ -152,7 +152,7 @@ public void testCacheDifferentReaders() throws Exception {

secondEntity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(secondReader, 0);
value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes);
value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
Expand All @@ -162,7 +162,7 @@ public void testCacheDifferentReaders() throws Exception {

entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(2, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
Expand Down Expand Up @@ -222,9 +222,9 @@ public void testEviction() throws Exception {
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
Loader secondLoader = new Loader(secondReader, 0);

BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
size = requestCacheStats.stats().getMemorySize();
IOUtils.close(reader, secondReader, writer, dir, cache);
Expand Down Expand Up @@ -257,12 +257,12 @@ public void testEviction() throws Exception {
TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard);
Loader thirdLoader = new Loader(thirdReader, 0);

BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals("baz", value3.streamInput().readString());
assertEquals(2, cache.count());
assertEquals(1, requestCacheStats.stats().getEvictions());
Expand Down Expand Up @@ -298,12 +298,12 @@ public void testClearAllEntityIdentity() throws Exception {
TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity);
Loader thirdLoader = new Loader(thirdReader, 0);

BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals("baz", value3.streamInput().readString());
assertEquals(3, cache.count());
final long hitCount = requestCacheStats.stats().getHitCount();
Expand All @@ -312,7 +312,7 @@ public void testClearAllEntityIdentity() throws Exception {
cache.cleanCache();
assertEquals(1, cache.count());
// third has not been validated since it's a different identity
value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount());
assertEquals("baz", value3.streamInput().readString());

Expand Down Expand Up @@ -371,7 +371,7 @@ public void testInvalidate() throws Exception {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand All @@ -382,7 +382,7 @@ public void testInvalidate() throws Exception {
// cache hit
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand All @@ -396,7 +396,7 @@ public void testInvalidate() throws Exception {
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
cache.invalidate(entity, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
Expand Down