@@ -140,24 +140,29 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
         } else {
             Boolean allowPartialResults = request.allowPartialSearchResults();
             assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
-            if (allowPartialResults == false && shardFailures.get() != null ){
-                if (logger.isDebugEnabled()) {
-                    final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
-                    Throwable cause = shardSearchFailures.length == 0 ? null :
-                        ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
-                    logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
-                        shardSearchFailures.length, getName()), cause);
-                }
-                onPhaseFailure(currentPhase, "Partial shards failure", null);
-            } else {
-                if (logger.isTraceEnabled()) {
-                    final String resultsFrom = results.getSuccessfulResults()
-                        .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
-                    logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
-                        currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
-                }
-                executePhase(nextPhase);
-            }
+            if (allowPartialResults == false && shardFailures.get() != null) {
+                // check if there are actual failures in the atomic array since
+                // successful retries can reset the failures to null
+                ShardOperationFailedException[] shardSearchFailures = buildShardFailures();
+                if (shardSearchFailures.length > 0) {
+                    if (logger.isDebugEnabled()) {
+                        int numShardFailures = shardSearchFailures.length;
+                        shardSearchFailures = ExceptionsHelper.groupBy(shardSearchFailures);
+                        Throwable cause = ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
+                        logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
+                            numShardFailures, getName()), cause);
+                    }
+                    onPhaseFailure(currentPhase, "Partial shards failure", null);
+                    return;
+                }
+            }
+            if (logger.isTraceEnabled()) {
+                final String resultsFrom = results.getSuccessfulResults()
+                    .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
+                logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
+                    currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
+            }
+            executePhase(nextPhase);
         }
     }
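
The heart of the change above is that a non-null `shardFailures` no longer counts as proof of failure: a slot in the atomic array is cleared again when a retry on another shard copy succeeds, so the array can be present yet hold no live failures, and only `buildShardFailures()` shows whether any remain. A minimal sketch of that interaction, with a simplified `FailureTracker` standing in for the production bookkeeping (illustrative names, not the real classes):

    import java.util.concurrent.atomic.AtomicReferenceArray;

    final class FailureTracker {
        // one slot per shard; a successful retry clears its slot back to null
        private final AtomicReferenceArray<Exception> failures;

        FailureTracker(int numShards) {
            failures = new AtomicReferenceArray<>(numShards);
        }

        void onShardFailure(int shardIndex, Exception e) {
            failures.set(shardIndex, e);
        }

        void onShardSuccess(int shardIndex) {
            failures.set(shardIndex, null); // retry succeeded: discard the stale failure
        }

        // analogous to buildShardFailures(): count only slots that still hold a failure
        int liveFailures() {
            int count = 0;
            for (int i = 0; i < failures.length(); i++) {
                if (failures.get(i) != null) {
                    count++;
                }
            }
            return count;
        }

        public static void main(String[] args) {
            FailureTracker tracker = new FailureTracker(1);
            tracker.onShardFailure(0, new RuntimeException("primary failed"));
            tracker.onShardSuccess(0); // replica retry succeeded
            System.out.println(tracker.liveFailures()); // prints 0: do not abort the phase
        }
    }

This is exactly the situation the new `shardSearchFailures.length > 0` guard detects before deciding to fail the phase rather than move on.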

@@ -139,13 +139,13 @@ public final void run() {
         for (int index = 0; index < shardsIts.size(); index++) {
             final SearchShardIterator shardRoutings = shardsIts.get(index);
             if (shardRoutings.size() == 0) {
-                if(missingShards.length() >0 ){
+                if(missingShards.length() > 0){
                     missingShards.append(", ");
                 }
                 missingShards.append(shardRoutings.shardId());
             }
         }
-        if (missingShards.length() >0) {
+        if (missingShards.length() > 0) {
             //Status red - shard is missing all copies and would produce partial results for an index search
             final String msg = "Search rejected due to missing shards ["+ missingShards +
                 "]. Consider using `allow_partial_search_results` setting to bypass this error.";
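
The rejection above only fires when partial results are disallowed for the request, and the error message points at the way out: opting back in per search. A minimal sketch via the Java API (the index name "idx" is illustrative; the same toggle is exposed over REST as the `allow_partial_search_results` request parameter):

    import org.elasticsearch.action.search.SearchRequest;

    // "idx" is an illustrative index name
    SearchRequest request = new SearchRequest("idx");
    // let the search proceed (and report partial results) instead of being rejected
    request.allowPartialSearchResults(true);
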
@@ -56,6 +56,7 @@

 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 public class SearchAsyncActionTests extends ESTestCase {
 
@@ -371,6 +372,102 @@ protected void executeNext(Runnable runnable, Thread originalThread) {
         executor.shutdown();
     }
 
+    public void testAllowPartialResults() throws InterruptedException {
+        SearchRequest request = new SearchRequest();
+        request.allowPartialSearchResults(false);
+        int numConcurrent = randomIntBetween(1, 5);
+        request.setMaxConcurrentShardRequests(numConcurrent);
+        int numShards = randomIntBetween(5, 10);
+        AtomicBoolean searchPhaseDidRun = new AtomicBoolean(false);
+        ActionListener<SearchResponse> responseListener = ActionListener.wrap(response -> {},
+            (e) -> { throw new AssertionError("unexpected", e); });
+        DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
+        // for the sake of this test we place the replica on the same node, i.e. this is not a mistake since we now limit requests per node
+        DiscoveryNode replicaNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
+
+        AtomicInteger contextIdGenerator = new AtomicInteger(0);
+        GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
+            new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
+            numShards, true, primaryNode, replicaNode);
+        int numShardAttempts = 0;
+        for (SearchShardIterator it : shardsIter) {
+            numShardAttempts += it.remaining();
+        }
+        CountDownLatch latch = new CountDownLatch(numShardAttempts);
+
+        SearchTransportService transportService = new SearchTransportService(null, null);
+        Map<String, Transport.Connection> lookup = new HashMap<>();
+        Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
+        lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
+        lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
+        Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
+        AtomicInteger numRequests = new AtomicInteger(0);
+        AtomicInteger numFailReplicas = new AtomicInteger(0);
+        AbstractSearchAsyncAction<TestSearchPhaseResult> asyncAction =
+            new AbstractSearchAsyncAction<>(
+                "test",
+                logger,
+                transportService,
+                (cluster, node) -> {
+                    assert cluster == null : "cluster was not null: " + cluster;
+                    return lookup.get(node); },
+                aliasFilters,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                null,
+                request,
+                responseListener,
+                shardsIter,
+                new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
+                0,
+                null,
+                new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
+                request.getMaxConcurrentShardRequests(),
+                SearchResponse.Clusters.EMPTY) {
+
+                @Override
+                protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
+                                                   SearchActionListener<TestSearchPhaseResult> listener) {
+                    seenShard.computeIfAbsent(shard.shardId(), (i) -> {
+                        numRequests.incrementAndGet(); // only count this once per shard id, not per shard copy
+                        return Boolean.TRUE;
+                    });
+                    new Thread(() -> {
+                        Transport.Connection connection = getConnection(null, shard.currentNodeId());
+                        TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
+                            connection.getNode());
+                        if (shardIt.remaining() > 0) {
+                            numFailReplicas.incrementAndGet();
+                            listener.onFailure(new RuntimeException());
+                        } else {
+                            listener.onResponse(testSearchPhaseResult);
+                        }
+                    }).start();
+                }
+
+                @Override
+                protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
+                    return new SearchPhase("test") {
+                        @Override
+                        public void run() {
+                            assertTrue(searchPhaseDidRun.compareAndSet(false, true));
+                        }
+                    };
+                }
+
+                @Override
+                protected void executeNext(Runnable runnable, Thread originalThread) {
+                    super.executeNext(runnable, originalThread);
+                    latch.countDown();
+                }
+            };
+        asyncAction.start();
+        latch.await();
+        assertTrue(searchPhaseDidRun.get());
+        assertEquals(numShards, numRequests.get());
+        assertThat(numFailReplicas.get(), greaterThanOrEqualTo(1));
+    }
+
     static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, OriginalIndices originalIndices, int numShards,
                                                                   boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) {
         ArrayList<SearchShardIterator> list = new ArrayList<>();