Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
Expand Down Expand Up @@ -172,7 +173,8 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
DefaultSearchContext searchContext = new DefaultSearchContext(0,
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
null, searcher, indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher,
SearchService.NO_TIMEOUT
);
SearchContext.setCurrent(searchContext);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
Expand Down Expand Up @@ -153,7 +154,9 @@ protected ShardExistsResponse shardOperation(ShardExistsRequest request) {
SearchContext context = new DefaultSearchContext(0,
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
shardTarget, indexShard.acquireSearcher("exists"), indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher);
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher,
SearchService.NO_TIMEOUT
);
SearchContext.setCurrent(context);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
Expand Down Expand Up @@ -114,7 +115,8 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
0, new ShardSearchLocalRequest(new String[]{request.type()}, request.nowInMillis, request.filteringAlias()),
null, result.searcher(), indexService, indexShard,
scriptService, pageCacheRecycler,
bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher
bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher,
SearchService.NO_TIMEOUT
);
SearchContext.setCurrent(context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.threadpool.ThreadPool;

/**
Expand Down Expand Up @@ -100,6 +101,7 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(SearchService.DEFAULT_SEARCH_TIMEOUT, Validator.TIMEOUT);
}

public void addDynamicSettings(String... settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,25 @@ public String validate(String setting, String value) {
}
};

public static final Validator TIMEOUT = new Validator() {
@Override
public String validate(String setting, String value) {
try {
if (value == null) {
throw new NullPointerException("value must not be null");
}
TimeValue timeValue = TimeValue.parseTimeValue(value, null, setting);
assert timeValue != null;
if (timeValue.millis() < 0 && timeValue.millis() != -1) {
return "cannot parse value [" + value + "] as a timeout";
}
} catch (ElasticsearchParseException ex) {
return ex.getMessage();
}
return null;
}
};

public static final Validator TIME_NON_NEGATIVE = new Validator() {
@Override
public String validate(String setting, String value) {
Expand Down
24 changes: 22 additions & 2 deletions core/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
import org.elasticsearch.indices.IndicesWarmer.WarmerContext;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script.ScriptParseException;
import org.elasticsearch.script.ScriptContext;
Expand All @@ -101,6 +102,7 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.common.Strings.hasLength;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;

/**
Expand All @@ -111,7 +113,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public static final String NORMS_LOADING_KEY = "index.norms.loading";
public static final String DEFAULT_KEEPALIVE_KEY = "search.default_keep_alive";
public static final String KEEPALIVE_INTERVAL_KEY = "search.keep_alive_interval";
public static final String DEFAULT_SEARCH_TIMEOUT = "search.default_search_timeout";

public static final TimeValue NO_TIMEOUT = timeValueMillis(-1);

private final ThreadPool threadPool;

Expand All @@ -137,6 +141,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {

private final long defaultKeepAlive;

private volatile TimeValue defaultSearchTimeout;

private final ScheduledFuture<?> keepAliveReaper;

private final AtomicLong idGenerator = new AtomicLong();
Expand All @@ -148,7 +154,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final ParseFieldMatcher parseFieldMatcher;

@Inject
public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService,IndicesWarmer indicesWarmer, ThreadPool threadPool,
public SearchService(Settings settings, NodeSettingsService nodeSettingsService, ClusterService clusterService, IndicesService indicesService,IndicesWarmer indicesWarmer, ThreadPool threadPool,
ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase,
IndicesRequestCache indicesQueryCache) {
super(settings);
Expand Down Expand Up @@ -202,6 +208,20 @@ public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings
this.indicesWarmer.addListener(new NormsWarmer());
this.indicesWarmer.addListener(new FieldDataWarmer());
this.indicesWarmer.addListener(new SearchWarmer());

defaultSearchTimeout = settings.getAsTime(DEFAULT_SEARCH_TIMEOUT, NO_TIMEOUT);
nodeSettingsService.addListener(new SearchSettingsListener());
}

class SearchSettingsListener implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
final TimeValue maybeNewDefaultSearchTimeout = settings.getAsTime(SearchService.DEFAULT_SEARCH_TIMEOUT, SearchService.this.defaultSearchTimeout);
if (!maybeNewDefaultSearchTimeout.equals(SearchService.this.defaultSearchTimeout)) {
logger.info("updating [{}] from [{}] to [{}]", SearchService.DEFAULT_SEARCH_TIMEOUT, SearchService.this.defaultSearchTimeout, maybeNewDefaultSearchTimeout);
SearchService.this.defaultSearchTimeout = maybeNewDefaultSearchTimeout;
}
}
}

protected void putContext(SearchContext context) {
Expand Down Expand Up @@ -619,7 +639,7 @@ final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.S

Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;

SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher);
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher, defaultSearchTimeout);
SearchContext.setCurrent(context);
try {
context.scroll(request.scroll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.CachedDfSource;
import org.elasticsearch.search.internal.SearchContext.Lifetime;

Expand Down Expand Up @@ -139,7 +140,7 @@ public Weight createNormalizedWeight(Query query, boolean needsScores) throws IO
public void search(Query query, Collector collector) throws IOException {
// Wrap the caller's collector with various wrappers e.g. those used to siphon
// matches off for aggregation or to impose a time-limit on collection.
final boolean timeoutSet = searchContext.timeoutInMillis() != -1;
final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis();
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;

if (timeoutSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.lucene.search.function.BoostScoreFunction;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.analysis.AnalysisService;
Expand Down Expand Up @@ -90,7 +91,7 @@ public class DefaultSearchContext extends SearchContext {
private ScanContext scanContext;
private float queryBoost = 1.0f;
// timeout in millis
private long timeoutInMillis = -1;
private long timeoutInMillis;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be final?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh nevermind :)

// terminate after count
private int terminateAfter = DEFAULT_TERMINATE_AFTER;
private List<String> groupStats;
Expand Down Expand Up @@ -127,7 +128,9 @@ public class DefaultSearchContext extends SearchContext {
public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, IndexService indexService, IndexShard indexShard,
ScriptService scriptService, PageCacheRecycler pageCacheRecycler,
BigArrays bigArrays, Counter timeEstimateCounter, ParseFieldMatcher parseFieldMatcher) {
BigArrays bigArrays, Counter timeEstimateCounter, ParseFieldMatcher parseFieldMatcher,
TimeValue timeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you call it something like "defaultTimeout" instead? I was a bit confused this number might come from a user while it's only a default that we fetch from the settings.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the name timeout for this parameter is appropriate. It is used to set the timeout for the context unless it is later overwritten using the setter. While the current uses set this parameter from the global default search timeout setting, the definition does not preclude the possibility of it being set from other sources.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK fair enough.

) {
super(parseFieldMatcher);
this.id = id;
this.request = request;
Expand All @@ -145,6 +148,7 @@ public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarg
this.indexService = indexService;
this.searcher = new ContextIndexSearcher(this, engineSearcher);
this.timeEstimateCounter = timeEstimateCounter;
this.timeoutInMillis = timeout.millis();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsPhase;
Expand All @@ -52,10 +53,10 @@ public static void assertNoInFLightContext() {
}

@Inject
public MockSearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesWarmer indicesWarmer,
public MockSearchService(Settings settings, NodeSettingsService nodeSettingsService, ClusterService clusterService, IndicesService indicesService, IndicesWarmer indicesWarmer,
ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays,
DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, IndicesRequestCache indicesQueryCache) {
super(settings, clusterService, indicesService, indicesWarmer, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase,
super(settings, nodeSettingsService, clusterService, indicesService, indicesWarmer, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase,
queryPhase, fetchPhase, indicesQueryCache);
}

Expand Down