Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -574,6 +575,7 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear
}

final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException {
final Releasable decreaseScrollContexts;
if (request.scroll() != null) {
if (maxOpenScrollContext == Integer.MAX_VALUE && openScrollContexts.get() > 500) {
/**
Expand All @@ -593,14 +595,17 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
}
decreaseScrollContexts = openScrollContexts::decrementAndGet;
} else {
decreaseScrollContexts = () -> {};
}
SearchContext context = null;
try {
context = createContext(request);
context.addReleasable(openScrollContexts::decrementAndGet, Lifetime.CONTEXT);
context.addReleasable(decreaseScrollContexts, Lifetime.CONTEXT);
} finally {
if (context == null) {
openScrollContexts.decrementAndGet();
decreaseScrollContexts.close();
}
}
onNewContext(context);
Expand Down Expand Up @@ -1023,6 +1028,13 @@ public int getActiveContexts() {
return this.activeContexts.size();
}

/**
* Returns the number of scroll contexts opened on the node
*/
public int getOpenScrollContexts() {
return openScrollContexts.get();
}

public ResponseCollectorService getResponseCollectorService() {
return this.responseCollectorService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,5 +796,6 @@ public void testCreateSearchContext() throws IOException {
assertSame(searchShardTarget, searchContext.dfsResult().getSearchShardTarget());
assertSame(searchShardTarget, searchContext.queryResult().getSearchShardTarget());
assertSame(searchShardTarget, searchContext.fetchResult().getSearchShardTarget());
assertThat(service.getOpenScrollContexts(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -121,6 +122,9 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName());
SearchService searchService = getInstanceFromNode(SearchService.class);
assertThat(searchService.getActiveContexts(), equalTo(0));
assertThat(searchService.getOpenScrollContexts(), equalTo(0));
super.tearDown();
assertAcked(client().admin().indices().prepareDelete("*").get());
MetaData metaData = client().admin().cluster().prepareState().get().getState().getMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2355,6 +2355,7 @@ public void ensureEstimatedStats() {
public void assertAfterTest() throws IOException {
super.assertAfterTest();
assertRequestsFinished();
assertSearchContextsReleased();
for (NodeAndClient nodeAndClient : nodes.values()) {
NodeEnvironment env = nodeAndClient.node().getNodeEnvironment();
Set<ShardId> shardIds = env.lockedShards();
Expand Down Expand Up @@ -2388,4 +2389,18 @@ private void assertRequestsFinished() {
}
}
}

private void assertSearchContextsReleased() {
for (NodeAndClient nodeAndClient : nodes.values()) {
SearchService searchService = getInstance(SearchService.class, nodeAndClient.name);
try {
assertBusy(() -> {
assertThat(searchService.getActiveContexts(), equalTo(0));
assertThat(searchService.getOpenScrollContexts(), equalTo(0));
});
} catch (Exception e) {
throw new AssertionError("Failed to verify search contexts", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,22 @@ public void testCloseFreezeAndOpen() throws ExecutionException, InterruptedExcep
// now scroll
SearchResponse searchResponse = client().prepareSearch().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
.setScroll(TimeValue.timeValueMinutes(1)).setSize(1).get();
do {
assertHitCount(searchResponse, 3);
assertEquals(1, searchResponse.getHits().getHits().length);
SearchService searchService = getInstanceFromNode(SearchService.class);
assertThat(searchService.getActiveContexts(), Matchers.greaterThanOrEqualTo(1));
for (int i = 0; i < 2; i++) {
shard = indexService.getShard(i);
engine = IndexShardTestCase.getEngine(shard);
assertFalse(((FrozenEngine) engine).isReaderOpen());
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
} while (searchResponse.getHits().getHits().length > 0);
try {
do {
assertHitCount(searchResponse, 3);
assertEquals(1, searchResponse.getHits().getHits().length);
SearchService searchService = getInstanceFromNode(SearchService.class);
assertThat(searchService.getActiveContexts(), Matchers.greaterThanOrEqualTo(1));
for (int i = 0; i < 2; i++) {
shard = indexService.getShard(i);
engine = IndexShardTestCase.getEngine(shard);
assertFalse(((FrozenEngine) engine).isReaderOpen());
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
} while (searchResponse.getHits().getHits().length > 0);
} finally {
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
}
}

public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOException, ExecutionException {
Expand Down