Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

Expand Down Expand Up @@ -112,11 +114,14 @@ public Exception getFailure() {

private Item[] items;

private long tookInMillis;

MultiSearchResponse() {
}

public MultiSearchResponse(Item[] items) {
public MultiSearchResponse(Item[] items, long tookInMillis) {
this.items = items;
this.tookInMillis = tookInMillis;
}

@Override
Expand All @@ -131,13 +136,23 @@ public Item[] getResponses() {
return this.items;
}

/**
* How long the msearch took.
*/
public TimeValue getTook() {
return new TimeValue(tookInMillis);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
items = new Item[in.readVInt()];
for (int i = 0; i < items.length; i++) {
items[i] = Item.readItem(in);
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
tookInMillis = in.readVLong();
}
}

@Override
Expand All @@ -147,11 +162,15 @@ public void writeTo(StreamOutput out) throws IOException {
for (Item item : items) {
item.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(tookInMillis);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("took", tookInMillis);
builder.startArray(Fields.RESPONSES);
for (Item item : items) {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {

private final int availableProcessors;
private final ClusterService clusterService;
private final TransportAction<SearchRequest, SearchResponse> searchAction;
private final LongSupplier relativeTimeProvider;

@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
Expand All @@ -53,19 +55,23 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
this.relativeTimeProvider = System::nanoTime;
}

TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
IndexNameExpressionResolver resolver, int availableProcessors) {
IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
this.relativeTimeProvider = relativeTimeProvider;
}

@Override
protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
final long relativeStartTime = relativeTimeProvider.getAsLong();

ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);

Expand All @@ -85,7 +91,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchR
final AtomicInteger responseCounter = new AtomicInteger(numRequests);
int numConcurrentSearches = Math.min(numRequests, maxConcurrentSearches);
for (int i = 0; i < numConcurrentSearches; i++) {
executeSearch(searchRequestSlots, responses, responseCounter, listener);
executeSearch(searchRequestSlots, responses, responseCounter, listener, relativeStartTime);
}
}

Expand All @@ -111,11 +117,12 @@ static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState st
* @param responseCounter incremented on each response
* @param listener the listener attached to the multi-search request
*/
private void executeSearch(
void executeSearch(
final Queue<SearchRequestSlot> requests,
final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter,
final ActionListener<MultiSearchResponse> listener) {
final ActionListener<MultiSearchResponse> listener,
final long relativeStartTime) {
SearchRequestSlot request = requests.poll();
if (request == null) {
/*
Expand Down Expand Up @@ -155,16 +162,25 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It
} else {
if (thread == Thread.currentThread()) {
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener));
threadPool.generic()
.execute(() -> executeSearch(requests, responses, responseCounter, listener, relativeStartTime));
} else {
// we are on a different thread (we went asynchronous), it's safe to recurse
executeSearch(requests, responses, responseCounter, listener);
executeSearch(requests, responses, responseCounter, listener, relativeStartTime);
}
}
}

private void finish() {
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]),
buildTookInMillis()));
}

/**
* Builds how long it took to execute the msearch.
*/
private long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - relativeStartTime);
}
});
}
Expand All @@ -178,7 +194,5 @@ static final class SearchRequestSlot {
this.request = request;
this.responseSlot = responseSlot;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change.

}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change.

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
mSearchResponses.add(new MultiSearchResponse.Item(response, null));
}

listener.onResponse(new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0])));
listener.onResponse(
new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]), randomIntBetween(1, 10000)));
}
};

Expand Down Expand Up @@ -153,10 +154,11 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
new MultiSearchResponse.Item(response, null)
}));
listener.onResponse(new MultiSearchResponse(
new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
new MultiSearchResponse.Item(response, null)
}, randomIntBetween(1, 10000)));
}
};

Expand Down
Loading