Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ protected void doExecute(Task task, MultiSearchRequest request, ActionListener<M
Queue<SearchRequestSlot> searchRequestSlots = new ConcurrentLinkedQueue<>();
for (int i = 0; i < request.requests().size(); i++) {
SearchRequest searchRequest = request.requests().get(i);
searchRequest.setParentTask(client.getLocalNodeId(), task.getId());
searchRequestSlots.add(new SearchRequestSlot(searchRequest, i));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public void search(final SearchRequest request, final ActionListener<SearchRespo
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
});
}

@Override
public String getLocalNodeId() {
return "local_node_id";
}
};

if (controlledClock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -35,6 +36,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand All @@ -53,6 +55,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
Expand All @@ -76,7 +79,62 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testBatchExecute() throws Exception {
public void testParentTaskId() throws Exception {
// Initialize dependencies of TransportMultiSearchAction
Settings settings = Settings.builder()
.put("node.name", TransportMultiSearchActionTests.class.getSimpleName())
.build();
ActionFilters actionFilters = mock(ActionFilters.class);
when(actionFilters.filters()).thenReturn(new ActionFilter[0]);
ThreadPool threadPool = new ThreadPool(settings);
try {
TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null,
Collections.emptySet()) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build());

String localNodeId = randomAlphaOfLengthBetween(3, 10);
int numSearchRequests = randomIntBetween(1, 100);
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
for (int i = 0; i < numSearchRequests; i++) {
multiSearchRequest.add(new SearchRequest());
}
AtomicInteger counter = new AtomicInteger(0);
Task task = multiSearchRequest.createTask(randomLong(), "type", "action", null, Collections.emptyMap());
NodeClient client = new NodeClient(settings, threadPool) {
@Override
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
assertEquals(task.getId(), request.getParentTask().getId());
assertEquals(localNodeId, request.getParentTask().getNodeId());
counter.incrementAndGet();
listener.onResponse(SearchResponse.empty(() -> 1L, SearchResponse.Clusters.EMPTY));
}

@Override
public String getLocalNodeId() {
return localNodeId;
}
};
TransportMultiSearchAction action =
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, 10, System::nanoTime, client);

PlainActionFuture<MultiSearchResponse> future = newFuture();
action.execute(task, multiSearchRequest, future);
future.get();
assertEquals(numSearchRequests, counter.get());
} finally {
assertTrue(ESTestCase.terminate(threadPool));
}
}

public void testBatchExecute() {
// Initialize dependencies of TransportMultiSearchAction
Settings settings = Settings.builder()
.put("node.name", TransportMultiSearchActionTests.class.getSimpleName())
Expand Down Expand Up @@ -123,6 +181,11 @@ public void search(final SearchRequest request, final ActionListener<SearchRespo
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
});
}

@Override
public String getLocalNodeId() {
return "local_node_id";
}
};

TransportMultiSearchAction action =
Expand Down