Skip to content

Commit d43a1fa

Browse files
authored
Lock down Engine.Searcher (#34363)
`Engine.Searcher` is non-final today which makes it error prone in the case of wrapping the underlying reader or lucene `IndexSearcher` like we do in `IndexSearcherWrapper`. Yet, there is no subclass of it yet that would be dramatic to just drop on the floor. With the start of development of frozen indices this changed since in #34357 functionality was added to a subclass which would be dropped if a `IndexSearcherWrapper` is installed on an index. This change locks down the `Engine.Searcher` to prevent such a functionality trap.
1 parent a1ec913 commit d43a1fa

File tree

10 files changed

+126
-180
lines changed

10 files changed

+126
-180
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.lucene.store.IOContext;
4444
import org.apache.lucene.util.Accountable;
4545
import org.apache.lucene.util.Accountables;
46-
import org.apache.lucene.util.IOUtils;
4746
import org.apache.lucene.util.SetOnce;
4847
import org.elasticsearch.ExceptionsHelper;
4948
import org.elasticsearch.action.index.IndexRequest;
@@ -84,6 +83,7 @@
8483
import java.io.Closeable;
8584
import java.io.FileNotFoundException;
8685
import java.io.IOException;
86+
import java.io.UncheckedIOException;
8787
import java.nio.file.NoSuchFileException;
8888
import java.util.Arrays;
8989
import java.util.Base64;
@@ -665,14 +665,23 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
665665
Releasable releasable = store::decRef;
666666
try {
667667
ReferenceManager<IndexSearcher> referenceManager = getReferenceManager(scope);
668-
Searcher engineSearcher = new Searcher(source, referenceManager.acquire(),
669-
s -> {
670-
try {
671-
referenceManager.release(s);
672-
} finally {
673-
store.decRef();
674-
}
675-
}, logger);
668+
IndexSearcher acquire = referenceManager.acquire();
669+
AtomicBoolean released = new AtomicBoolean(false);
670+
Searcher engineSearcher = new Searcher(source, acquire,
671+
() -> {
672+
if (released.compareAndSet(false, true)) {
673+
try {
674+
referenceManager.release(acquire);
675+
} finally {
676+
store.decRef();
677+
}
678+
} else {
679+
/* In general, searchers should never be released twice or this would break reference counting. There is one rare case
680+
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short
681+
* amount of time, this is why we only log a warning instead of throwing an exception. */
682+
logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
683+
}
684+
});
676685
releasable = null; // success - hand over the reference to the engine searcher
677686
return engineSearcher;
678687
} catch (AlreadyClosedException ex) {
@@ -1175,69 +1184,51 @@ default void onFailedEngine(String reason, @Nullable Exception e) {
11751184
}
11761185
}
11771186

1178-
public static class Searcher implements Releasable {
1187+
public static final class Searcher implements Releasable {
11791188
private final String source;
11801189
private final IndexSearcher searcher;
1181-
private final AtomicBoolean released = new AtomicBoolean(false);
1182-
private final Logger logger;
1183-
private final IOUtils.IOConsumer<IndexSearcher> onClose;
1190+
private final Closeable onClose;
11841191

1185-
public Searcher(String source, IndexSearcher searcher, Logger logger) {
1186-
this(source, searcher, s -> s.getIndexReader().close(), logger);
1187-
}
1188-
1189-
public Searcher(String source, IndexSearcher searcher, IOUtils.IOConsumer<IndexSearcher> onClose, Logger logger) {
1192+
public Searcher(String source, IndexSearcher searcher, Closeable onClose) {
11901193
this.source = source;
11911194
this.searcher = searcher;
11921195
this.onClose = onClose;
1193-
this.logger = logger;
11941196
}
11951197

11961198
/**
11971199
* The source that caused this searcher to be acquired.
11981200
*/
1199-
public final String source() {
1201+
public String source() {
12001202
return source;
12011203
}
12021204

1203-
public final IndexReader reader() {
1205+
public IndexReader reader() {
12041206
return searcher.getIndexReader();
12051207
}
12061208

1207-
public final DirectoryReader getDirectoryReader() {
1209+
public DirectoryReader getDirectoryReader() {
12081210
if (reader() instanceof DirectoryReader) {
12091211
return (DirectoryReader) reader();
12101212
}
12111213
throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader");
12121214
}
12131215

1214-
public final IndexSearcher searcher() {
1216+
public IndexSearcher searcher() {
12151217
return searcher;
12161218
}
12171219

12181220
@Override
12191221
public void close() {
1220-
if (released.compareAndSet(false, true) == false) {
1221-
/* In general, searchers should never be released twice or this would break reference counting. There is one rare case
1222-
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount
1223-
* of time, this is why we only log a warning instead of throwing an exception.
1224-
*/
1225-
logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
1226-
return;
1227-
}
12281222
try {
1229-
onClose.accept(searcher());
1223+
onClose.close();
12301224
} catch (IOException e) {
1231-
throw new IllegalStateException("Cannot close", e);
1225+
throw new UncheckedIOException("failed to close", e);
12321226
} catch (AlreadyClosedException e) {
12331227
// This means there's a bug somewhere: don't suppress it
12341228
throw new AssertionError(e);
12351229
}
12361230
}
12371231

1238-
public final Logger getLogger() {
1239-
return logger;
1240-
}
12411232
}
12421233

12431234
public abstract static class Operation {

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
606606
// in the case of a already pruned translog generation we might get null here - yet very unlikely
607607
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
608608
.getIndexSettings().getIndexVersionCreated());
609-
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
609+
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader::close),
610610
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
611611
}
612612
} catch (IOException e) {
@@ -2086,7 +2086,7 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
20862086
if (warmer != null) {
20872087
try {
20882088
assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
2089-
warmer.warm(new Searcher("top_reader_warming", searcher, s -> {}, logger));
2089+
warmer.warm(new Searcher("top_reader_warming", searcher, () -> {}));
20902090
} catch (Exception e) {
20912091
if (isEngineClosed.get() == false) {
20922092
logger.warn("failed to prepare/warm", e);

server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ public final Engine.Searcher wrap(Engine.Searcher engineSearcher) throws IOExcep
9999
} else {
100100
// we close the reader to make sure wrappers can release resources if needed....
101101
// our NonClosingReaderWrapper makes sure that our reader is not closed
102-
return new Engine.Searcher(engineSearcher.source(), indexSearcher, s -> IOUtils.close(s.getIndexReader(), engineSearcher),
103-
engineSearcher.getLogger());
102+
return new Engine.Searcher(engineSearcher.source(), indexSearcher, () ->
103+
IOUtils.close(indexSearcher.getIndexReader(), // this will close the wrappers excluding the NonClosingReaderWrapper
104+
engineSearcher)); // this will run the closeable on the wrapped engine searcher
104105
}
105106
}
106107

server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.io.IOException;
4343
import java.util.Collections;
4444
import java.util.concurrent.ConcurrentHashMap;
45+
import java.util.concurrent.atomic.AtomicBoolean;
4546
import java.util.concurrent.atomic.AtomicInteger;
4647

4748
public class IndexSearcherWrapperTests extends ESTestCase {
@@ -73,20 +74,20 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
7374
final int sourceRefCount = open.getRefCount();
7475
final AtomicInteger count = new AtomicInteger();
7576
final AtomicInteger outerCount = new AtomicInteger();
76-
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) {
77-
final Engine.Searcher wrap = wrapper.wrap(engineSearcher);
78-
assertEquals(1, wrap.reader().getRefCount());
79-
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
80-
if (key == open.getReaderCacheHelper().getKey()) {
81-
count.incrementAndGet();
82-
}
83-
outerCount.incrementAndGet();
84-
});
85-
assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
86-
wrap.close();
87-
assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
88-
assertEquals(sourceRefCount, open.getRefCount());
89-
}
77+
final AtomicBoolean closeCalled = new AtomicBoolean(false);
78+
final Engine.Searcher wrap = wrapper.wrap(new Engine.Searcher("foo", searcher, () -> closeCalled.set(true)));
79+
assertEquals(1, wrap.reader().getRefCount());
80+
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
81+
if (key == open.getReaderCacheHelper().getKey()) {
82+
count.incrementAndGet();
83+
}
84+
outerCount.incrementAndGet();
85+
});
86+
assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
87+
wrap.close();
88+
assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
89+
assertEquals(sourceRefCount, open.getRefCount());
90+
assertTrue(closeCalled.get());
9091
assertEquals(1, closeCalls.get());
9192

9293
IOUtils.close(open, writer, dir);
@@ -121,15 +122,15 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
121122
}
122123
};
123124
final ConcurrentHashMap<Object, TopDocs> cache = new ConcurrentHashMap<>();
124-
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) {
125-
try (Engine.Searcher wrap = wrapper.wrap(engineSearcher)) {
126-
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
127-
cache.remove(key);
128-
});
129-
TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1);
130-
cache.put(wrap.reader().getReaderCacheHelper().getKey(), search);
131-
}
125+
AtomicBoolean closeCalled = new AtomicBoolean(false);
126+
try (Engine.Searcher wrap = wrapper.wrap(new Engine.Searcher("foo", searcher, () -> closeCalled.set(true)))) {
127+
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
128+
cache.remove(key);
129+
});
130+
TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1);
131+
cache.put(wrap.reader().getReaderCacheHelper().getKey(), search);
132132
}
133+
assertTrue(closeCalled.get());
133134
assertEquals(1, closeCalls.get());
134135

135136
assertEquals(1, cache.size());
@@ -151,11 +152,11 @@ public void testNoWrap() throws IOException {
151152
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
152153
searcher.setSimilarity(iwc.getSimilarity());
153154
IndexSearcherWrapper wrapper = new IndexSearcherWrapper();
154-
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, logger)) {
155+
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, open::close)) {
155156
final Engine.Searcher wrap = wrapper.wrap(engineSearcher);
156157
assertSame(wrap, engineSearcher);
157158
}
158-
IOUtils.close(open, writer, dir);
159+
IOUtils.close(writer, dir);
159160
}
160161

161162
private static class FieldMaskingReader extends FilterDirectoryReader {

server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void testPreProcess() throws Exception {
110110
try (Directory dir = newDirectory();
111111
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
112112
IndexReader reader = w.getReader();
113-
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), logger)) {
113+
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close)) {
114114

115115
DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService,
116116
indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);

server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ public void testUseIndexStats() throws IOException {
363363

364364
public void testApproximations() throws IOException {
365365
QueryProfiler profiler = new QueryProfiler();
366-
Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), logger);
366+
Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close);
367367
// disable query caching since we want to test approximations, which won't
368368
// be exposed on a cached entry
369369
ContextIndexSearcher searcher = new ContextIndexSearcher(engineSearcher, null, MAYBE_CACHE_POLICY);

test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,15 +240,15 @@ protected <A extends Aggregator> A createAggregator(Query query,
240240
}
241241

242242
protected SearchContext createSearchContext(IndexSearcher indexSearcher, IndexSettings indexSettings) {
243-
Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, logger);
243+
Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, () -> indexSearcher.getIndexReader().close());
244244
QueryCache queryCache = new DisabledQueryCache(indexSettings);
245245
QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() {
246246
@Override
247247
public void onUse(Query query) {
248248
}
249249

250250
@Override
251-
public boolean shouldCache(Query query) throws IOException {
251+
public boolean shouldCache(Query query) {
252252
// never cache a query
253253
return false;
254254
}

test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java

Lines changed: 0 additions & 87 deletions
This file was deleted.

0 commit comments

Comments
 (0)