From b0b06e5204164cafc65488649eda9596804550dd Mon Sep 17 00:00:00 2001 From: olcbean Date: Mon, 27 Mar 2017 19:13:16 +0200 Subject: [PATCH 1/8] Introducing "took" time (in ms) for _msearch #23131 --- .../action/search/MultiSearchResponse.java | 23 +- .../search/TransportMultiSearchAction.java | 34 ++- .../action/search/ExpandSearchPhaseTests.java | 4 +- .../search/MultiSearchActionTookTests.java | 213 ++++++++++++++++++ .../action/search/MultiSearchIT.java | 60 +++++ .../search/MultiSearchRequestTests.java | 8 +- .../TransportMultiSearchActionTests.java | 7 +- 7 files changed, 334 insertions(+), 15 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java create mode 100644 core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index 4d42ad334a9f0..6ff32e2109f84 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -111,11 +112,14 @@ public Exception getFailure() { } private Item[] items; + + private long tookInMillis; MultiSearchResponse() { } - public MultiSearchResponse(Item[] items) { + public MultiSearchResponse(long tookInMillis, Item[] items) { + this.tookInMillis = tookInMillis; this.items = items; } @@ -130,6 +134,20 @@ public Iterator iterator() { public Item[] getResponses() { return this.items; } + + /** + * How long the msearch took. + */ + public TimeValue getTook() { + return new TimeValue(tookInMillis); + } + + /** + * How long the msearch took in milliseconds. + */ + public long getTookInMillis() { + return tookInMillis; + } @Override public void readFrom(StreamInput in) throws IOException { @@ -138,6 +156,7 @@ public void readFrom(StreamInput in) throws IOException { for (int i = 0; i < items.length; i++) { items[i] = Item.readItem(in); } + tookInMillis = in.readVLong(); } @Override @@ -147,11 +166,13 @@ public void writeTo(StreamOutput out) throws IOException { for (Item item : items) { item.writeTo(out); } + out.writeVLong(tookInMillis); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field("took", tookInMillis); builder.startArray(Fields.RESPONSES); for (Item item : items) { builder.startObject(); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index b65cd4d55516a..56b225064aff2 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -37,13 +37,16 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; public class TransportMultiSearchAction extends HandledTransportAction { private final int availableProcessors; private final ClusterService clusterService; private final TransportAction searchAction; + private final LongSupplier relativeTimeProvider; @Inject public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, @@ -53,19 +56,23 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran this.clusterService = clusterService; this.searchAction = searchAction; this.availableProcessors = EsExecutors.numberOfProcessors(settings); + relativeTimeProvider = System::nanoTime; } TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService, ClusterService clusterService, TransportAction searchAction, - IndexNameExpressionResolver resolver, int availableProcessors) { + IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) { super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new); this.clusterService = clusterService; this.searchAction = searchAction; this.availableProcessors = availableProcessors; + this.relativeTimeProvider = relativeTimeProvider; } @Override protected void doExecute(MultiSearchRequest request, ActionListener listener) { + final long startTimeInNanos = getRelativeTime(); + ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -85,7 +92,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener requests, final AtomicArray responses, final AtomicInteger responseCounter, + long startTimeInNanos, final ActionListener listener) { SearchRequestSlot request = requests.poll(); if (request == null) { @@ -155,20 +163,32 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It } else { if (thread == Thread.currentThread()) { // we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread - threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener)); + threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, startTimeInNanos, listener)); } else { // we are on a different thread (we went asynchronous), it's safe to recurse - executeSearch(requests, responses, responseCounter, listener); + executeSearch(requests, responses, responseCounter, startTimeInNanos, listener); } } } private void finish() { - listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); + listener.onResponse(new MultiSearchResponse(getExecTimeInMillis(startTimeInNanos), responses.toArray(new MultiSearchResponse.Item[responses.length()]))); } + + /** + * Builds how long it took to execute the msearch. + */ + private long getExecTimeInMillis(long startTimeNanos) { + return TimeUnit.NANOSECONDS.toMillis(getRelativeTime() - startTimeNanos); + } + }); } + private long getRelativeTime() { + return relativeTimeProvider.getAsLong(); + } + static final class SearchRequestSlot { final SearchRequest request; @@ -178,7 +198,5 @@ static final class SearchRequestSlot { this.request = request; this.responseSlot = responseSlot; } - } - } diff --git a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java index 81a6359997d7a..146bb19ecaaeb 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java @@ -101,7 +101,6 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL } listener.onResponse(new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]))); - } }; SearchHits hits = new SearchHits(new SearchHit[]{new SearchHit(1, "ID", new Text("type"), @@ -152,7 +151,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits, null, null, null, false, null, 1); SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null); - listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{ + listener.onResponse(new MultiSearchResponse( + randomIntBetween(1, 10000), new MultiSearchResponse.Item[]{ new MultiSearchResponse.Item(null, new RuntimeException("boom")), new MultiSearchResponse.Item(response, null) })); diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java new file mode 100644 index 0000000000000..079b451a349ac --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -0,0 +1,213 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * MultiSearch took time tests + */ +public class MultiSearchActionTookTests extends ESTestCase { + + private static ThreadPool threadPool; + private ClusterService clusterService; + + @BeforeClass + public static void beforeClass() { + threadPool = new TestThreadPool("MultiSearchActionTookTests"); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + @Before + public void setUp() throws Exception { + super.setUp(); + clusterService = createClusterService(threadPool); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + private enum Clock { + CONTROLLED, + SYSTEM; + } + + //Unit conversion using a controller clock + public void testTookWithControlledClock() throws Exception { + runTestTook(Clock.CONTROLLED); + } + + //Using {@link System#nanoTime()} + public void testTookWithRealClock() throws Exception { + runTestTook(Clock.SYSTEM); + } + + private void runTestTook(Clock clock) throws Exception { + MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest()); + AtomicLong expected = new AtomicLong(); + + TransportMultiSearchAction action = createTransportMultiSearchAction(clock, expected); + + action.doExecute(multiSearchRequest, new ActionListener() { + @Override + public void onResponse(MultiSearchResponse multiSearchResponse) { + if (clock == Clock.CONTROLLED) { + assertThat( + TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS), + equalTo(multiSearchResponse.getTook().getMillis())); + } else { + assertThat( + multiSearchResponse.getTook().getMillis(), + greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(e); + } + }); + } + + private TransportMultiSearchAction createTransportMultiSearchAction(Clock clock, AtomicLong expected) { + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + TaskManager taskManager = mock(TaskManager.class); + TransportService transportService = new TransportService(Settings.EMPTY, null, null, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), + UUIDs.randomBase64UUID()), + null) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + ActionFilters actionFilters = new ActionFilters(new HashSet<>()); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); + + final int availableProcessors = Runtime.getRuntime().availableProcessors(); + AtomicInteger counter = new AtomicInteger(); + final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME); + Randomness.shuffle(threadPoolNames); + final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); + final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); + + TransportAction searchAction = new TransportAction( + Settings.EMPTY, "action", threadPool, actionFilters, resolver, taskManager) { + @Override + protected void doExecute(SearchRequest request, ActionListener listener) { + requests.add(request); + commonExecutor.execute(() -> { + counter.decrementAndGet(); + listener.onResponse(new SearchResponse()); + }); + } + }; + + if (clock == Clock.CONTROLLED) { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, + clusterService, searchAction, resolver, availableProcessors, expected::get) { + @Override + void executeSearch(final Queue requests, + final AtomicArray responses, + final AtomicInteger responseCounter, long startTimeInNanos, + final ActionListener listener) { + expected.set(1000000); + super.executeSearch(requests, responses, responseCounter, startTimeInNanos, listener); + } + }; + } else { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, + clusterService, searchAction, resolver, availableProcessors, System::nanoTime) { + + @Override + void executeSearch(final Queue requests, + final AtomicArray responses, + final AtomicInteger responseCounter, long startTimeInNanos, + final ActionListener listener) { + long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(0, 10)); + expected.set(elapsed); + super.executeSearch(requests, responses, responseCounter, startTimeInNanos, listener); + } + }; + } + } + + static class Resolver extends IndexNameExpressionResolver { + + Resolver(Settings settings) { + super(settings); + } + + @Override + public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { + return request.indices(); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java new file mode 100644 index 0000000000000..cf7fcc4c9091d --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.search.MultiSearchResponse.Item; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +/** + * MultiSearch IT + */ +public class MultiSearchIT extends ESSingleNodeTestCase { + //Verifies that the MultiSearch took time is ge to any of the individual search took times + public void testMultiSearchTookTimeAgainstSearchTookTime () throws Exception { + createIndex("test", Settings.EMPTY, "test", "title", "type=text"); + client().prepareIndex("test", "test", "1").setSource("title", "foo bar baz").get(); + client().prepareIndex("test", "test", "2").setSource("title", "foo foo foo").get(); + client().prepareIndex("test", "test", "3").setSource("title", "bar baz bax").get(); + client().admin().indices().prepareRefresh("test").get(); + + SearchRequest searchRequestAll = new SearchRequest().indices("test").source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); + SearchRequest searchRequestBar = new SearchRequest().indices("test").source(new SearchSourceBuilder().query(new MatchQueryBuilder("title", "bar"))); + + MultiSearchRequest multiSeachRequest = new MultiSearchRequest(); + multiSeachRequest.add(searchRequestAll); + multiSeachRequest.add(searchRequestBar); + + MultiSearchResponse multiSearchResponse = client().multiSearch(multiSeachRequest).get(); + + long maxSearchResponseTookTime = 0; + for (Item response : multiSearchResponse.getResponses()) { + maxSearchResponseTookTime = Math.max(maxSearchResponseTookTime, response.getResponse().getTookInMillis()); + } + + long multiSearchResponseTime = multiSearchResponse.getTookInMillis(); + + assertThat(multiSearchResponseTime, greaterThanOrEqualTo(maxSearchResponseTookTime)); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index 3a162f302bc3b..f5bde86eb2941 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -146,13 +146,17 @@ public void testSimpleAdd4() throws Exception { } public void testResponseErrorToXContent() throws IOException { + long tookInMillis = randomIntBetween(1, 1000); MultiSearchResponse response = new MultiSearchResponse( + tookInMillis, new MultiSearchResponse.Item[]{ new MultiSearchResponse.Item(null, new IllegalStateException("foobar")), new MultiSearchResponse.Item(null, new IllegalStateException("baaaaaazzzz")) }); - - assertEquals("{\"responses\":[" + + assertEquals("{\"took\":" + + tookInMillis + + ",\"responses\":[" + "{" + "\"error\":{\"root_cause\":[{\"type\":\"illegal_state_exception\",\"reason\":\"foobar\"}]," + "\"type\":\"illegal_state_exception\",\"reason\":\"foobar\"},\"status\":500" diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index e811da82c47a8..115867cfcbbe6 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -45,6 +45,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; @@ -102,8 +103,10 @@ protected void doExecute(SearchRequest request, ActionListener l }); } }; - TransportMultiSearchAction action = - new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10); + + TransportMultiSearchAction action = + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10, + System::nanoTime); // Execute the multi search api and fail if we find an error after executing: try { From 8317497a22adefdc2145590c0c42a6d308720a7f Mon Sep 17 00:00:00 2001 From: olcbean Date: Tue, 28 Mar 2017 00:08:07 +0200 Subject: [PATCH 2/8] Enforce line width 100 --- .../search/MultiSearchActionTookTests.java | 38 ++++++++----- .../action/search/MultiSearchIT.java | 11 ++-- .../TransportMultiSearchActionTests.java | 57 +++++++++++-------- 3 files changed, 64 insertions(+), 42 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 079b451a349ac..318375a63bf1a 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -120,9 +120,8 @@ public void onResponse(MultiSearchResponse multiSearchResponse) { TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS), equalTo(multiSearchResponse.getTook().getMillis())); } else { - assertThat( - multiSearchResponse.getTook().getMillis(), - greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); + assertThat(multiSearchResponse.getTook().getMillis(), greaterThanOrEqualTo( + TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); } } @@ -133,8 +132,10 @@ public void onFailure(Exception e) { }); } - private TransportMultiSearchAction createTransportMultiSearchAction(Clock clock, AtomicLong expected) { - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + private TransportMultiSearchAction createTransportMultiSearchAction(Clock clock, + AtomicLong expected) { + Settings settings = Settings.builder() + .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); TaskManager taskManager = mock(TaskManager.class); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, @@ -148,20 +149,25 @@ public TaskManager getTaskManager() { }; ActionFilters actionFilters = new ActionFilters(new HashSet<>()); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + when(clusterService.state()) + .thenReturn(ClusterState.builder(new ClusterName("test")).build()); IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); final int availableProcessors = Runtime.getRuntime().availableProcessors(); AtomicInteger counter = new AtomicInteger(); - final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME); + final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, + ThreadPool.Names.SAME); Randomness.shuffle(threadPoolNames); final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); - final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); + final Set requests = Collections + .newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); - TransportAction searchAction = new TransportAction( - Settings.EMPTY, "action", threadPool, actionFilters, resolver, taskManager) { + TransportAction searchAction = + new TransportAction(Settings.EMPTY, "action", + threadPool, actionFilters, resolver, taskManager) { @Override - protected void doExecute(SearchRequest request, ActionListener listener) { + protected void doExecute(SearchRequest request, + ActionListener listener) { requests.add(request); commonExecutor.execute(() -> { counter.decrementAndGet(); @@ -178,8 +184,9 @@ void executeSearch(final Queue requests, final AtomicArray responses, final AtomicInteger responseCounter, long startTimeInNanos, final ActionListener listener) { - expected.set(1000000); - super.executeSearch(requests, responses, responseCounter, startTimeInNanos, listener); + expected.set(1000000); + super.executeSearch(requests, responses, responseCounter, startTimeInNanos, + listener); } }; } else { @@ -193,14 +200,15 @@ void executeSearch(final Queue requests, final ActionListener listener) { long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(0, 10)); expected.set(elapsed); - super.executeSearch(requests, responses, responseCounter, startTimeInNanos, listener); + super.executeSearch(requests, responses, responseCounter, startTimeInNanos, + listener); } }; } } static class Resolver extends IndexNameExpressionResolver { - + Resolver(Settings settings) { super(settings); } diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java index cf7fcc4c9091d..55f3583676b59 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java @@ -39,9 +39,11 @@ public void testMultiSearchTookTimeAgainstSearchTookTime () throws Exception { client().prepareIndex("test", "test", "3").setSource("title", "bar baz bax").get(); client().admin().indices().prepareRefresh("test").get(); - SearchRequest searchRequestAll = new SearchRequest().indices("test").source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); - SearchRequest searchRequestBar = new SearchRequest().indices("test").source(new SearchSourceBuilder().query(new MatchQueryBuilder("title", "bar"))); - + SearchRequest searchRequestAll = new SearchRequest().indices("test") + .source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); + SearchRequest searchRequestBar = new SearchRequest().indices("test") + .source(new SearchSourceBuilder().query(new MatchQueryBuilder("title", "bar"))); + MultiSearchRequest multiSeachRequest = new MultiSearchRequest(); multiSeachRequest.add(searchRequestAll); multiSeachRequest.add(searchRequestBar); @@ -50,7 +52,8 @@ public void testMultiSearchTookTimeAgainstSearchTookTime () throws Exception { long maxSearchResponseTookTime = 0; for (Item response : multiSearchResponse.getResponses()) { - maxSearchResponseTookTime = Math.max(maxSearchResponseTookTime, response.getResponse().getTookInMillis()); + maxSearchResponseTookTime = Math.max(maxSearchResponseTookTime, + response.getResponse().getTookInMillis()); } long multiSearchResponseTime = multiSearchResponse.getTookInMillis(); diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index 115867cfcbbe6..fb08b21b44feb 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -45,7 +45,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; @@ -64,15 +63,18 @@ public void testBatchExecute() throws Exception { when(actionFilters.filters()).thenReturn(new ActionFilter[0]); ThreadPool threadPool = new ThreadPool(settings); TaskManager taskManager = mock(TaskManager.class); - TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { + TransportService transportService = new TransportService(Settings.EMPTY, null, null, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), + UUIDs.randomBase64UUID()), null) { @Override public TaskManager getTaskManager() { return taskManager; } }; ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + when(clusterService.state()) + .thenReturn(ClusterState.builder(new ClusterName("test")).build()); IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); // Keep track of the number of concurrent searches started by multi search api, @@ -81,20 +83,26 @@ public TaskManager getTaskManager() { AtomicInteger counter = new AtomicInteger(); AtomicReference errorHolder = new AtomicReference<>(); // randomize whether or not requests are executed asynchronously - final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME); + final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, + ThreadPool.Names.SAME); Randomness.shuffle(threadPoolNames); final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); final ExecutorService rarelyExecutor = threadPool.executor(threadPoolNames.get(1)); - final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); - TransportAction searchAction = new TransportAction - (Settings.EMPTY, "action", threadPool, actionFilters, resolver, taskManager) { + final Set requests = Collections + .newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); + TransportAction searchAction = + new TransportAction( + Settings.EMPTY, "action", threadPool, actionFilters, resolver, + taskManager) { @Override - protected void doExecute(SearchRequest request, ActionListener listener) { + protected void doExecute(SearchRequest request, + ActionListener listener) { requests.add(request); int currentConcurrentSearches = counter.incrementAndGet(); if (currentConcurrentSearches > maxAllowedConcurrentSearches) { - errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches + - "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); + errorHolder.set(new AssertionError("Current concurrent search [" + + currentConcurrentSearches + "] is higher than is allowed [" + + maxAllowedConcurrentSearches + "]")); } final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor; executorService.execute(() -> { @@ -103,16 +111,17 @@ protected void doExecute(SearchRequest request, ActionListener l }); } }; - - TransportMultiSearchAction action = - new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10, + + TransportMultiSearchAction action = new TransportMultiSearchAction(threadPool, + actionFilters, transportService, clusterService, searchAction, resolver, 10, System::nanoTime); // Execute the multi search api and fail if we find an error after executing: try { /* - * Allow for a large number of search requests in a single batch as previous implementations could stack overflow if the number - * of requests in a single batch was large + * Allow for a large number of search requests in a single batch as + * previous implementations could stack overflow if the number of + * requests in a single batch was large */ int numSearchRequests = scaledRandomIntBetween(1, 8192); MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); @@ -134,13 +143,16 @@ public void testDefaultMaxConcurrentSearches() { int numDataNodes = randomIntBetween(1, 10); DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); for (int i = 0; i < numDataNodes; i++) { - builder.add(new DiscoveryNode("_id" + i, buildNewFakeTransportAddress(), Collections.emptyMap(), - Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT)); + builder.add(new DiscoveryNode("_id" + i, buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.singleton(DiscoveryNode.Role.DATA), + Version.CURRENT)); } - builder.add(new DiscoveryNode("master", buildNewFakeTransportAddress(), Collections.emptyMap(), - Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT)); - builder.add(new DiscoveryNode("ingest", buildNewFakeTransportAddress(), Collections.emptyMap(), - Collections.singleton(DiscoveryNode.Role.INGEST), Version.CURRENT)); + builder.add( + new DiscoveryNode("master", buildNewFakeTransportAddress(), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT)); + builder.add( + new DiscoveryNode("ingest", buildNewFakeTransportAddress(), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.INGEST), Version.CURRENT)); ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(builder).build(); int result = TransportMultiSearchAction.defaultMaxConcurrentSearches(10, state); @@ -150,5 +162,4 @@ public void testDefaultMaxConcurrentSearches() { result = TransportMultiSearchAction.defaultMaxConcurrentSearches(10, state); assertThat(result, equalTo(1)); } - } From b7c41ce5f9f80792c2297709aafa00b128c555e2 Mon Sep 17 00:00:00 2001 From: olcbean Date: Tue, 28 Mar 2017 11:34:30 +0200 Subject: [PATCH 3/8] Modifications to address the reviewer's remarks --- .../search/MultiSearchActionTookTests.java | 20 +++---- .../TransportMultiSearchActionTests.java | 56 ++++++++----------- 2 files changed, 30 insertions(+), 46 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 318375a63bf1a..bf705345590dc 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -91,31 +91,27 @@ public void tearDown() throws Exception { clusterService.close(); } - private enum Clock { - CONTROLLED, - SYSTEM; - } - //Unit conversion using a controller clock public void testTookWithControlledClock() throws Exception { - runTestTook(Clock.CONTROLLED); + runTestTook(true); } //Using {@link System#nanoTime()} public void testTookWithRealClock() throws Exception { - runTestTook(Clock.SYSTEM); + runTestTook(false); } - private void runTestTook(Clock clock) throws Exception { + private void runTestTook(boolean controlledClock) throws Exception { MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest()); AtomicLong expected = new AtomicLong(); - TransportMultiSearchAction action = createTransportMultiSearchAction(clock, expected); + TransportMultiSearchAction action = + createTransportMultiSearchAction(controlledClock, expected); action.doExecute(multiSearchRequest, new ActionListener() { @Override public void onResponse(MultiSearchResponse multiSearchResponse) { - if (clock == Clock.CONTROLLED) { + if (controlledClock) { assertThat( TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS), equalTo(multiSearchResponse.getTook().getMillis())); @@ -132,7 +128,7 @@ public void onFailure(Exception e) { }); } - private TransportMultiSearchAction createTransportMultiSearchAction(Clock clock, + private TransportMultiSearchAction createTransportMultiSearchAction(boolean controlledClock, AtomicLong expected) { Settings settings = Settings.builder() .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); @@ -176,7 +172,7 @@ protected void doExecute(SearchRequest request, } }; - if (clock == Clock.CONTROLLED) { + if (controlledClock) { return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, availableProcessors, expected::get) { @Override diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index fb08b21b44feb..4410507eef92e 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -63,18 +63,15 @@ public void testBatchExecute() throws Exception { when(actionFilters.filters()).thenReturn(new ActionFilter[0]); ThreadPool threadPool = new ThreadPool(settings); TaskManager taskManager = mock(TaskManager.class); - TransportService transportService = new TransportService(Settings.EMPTY, null, null, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), - UUIDs.randomBase64UUID()), null) { + TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { @Override public TaskManager getTaskManager() { return taskManager; } }; ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()) - .thenReturn(ClusterState.builder(new ClusterName("test")).build()); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); // Keep track of the number of concurrent searches started by multi search api, @@ -83,26 +80,20 @@ public TaskManager getTaskManager() { AtomicInteger counter = new AtomicInteger(); AtomicReference errorHolder = new AtomicReference<>(); // randomize whether or not requests are executed asynchronously - final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, - ThreadPool.Names.SAME); + final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME); Randomness.shuffle(threadPoolNames); final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); final ExecutorService rarelyExecutor = threadPool.executor(threadPoolNames.get(1)); - final Set requests = Collections - .newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); - TransportAction searchAction = - new TransportAction( - Settings.EMPTY, "action", threadPool, actionFilters, resolver, - taskManager) { + final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); + TransportAction searchAction = new TransportAction + (Settings.EMPTY, "action", threadPool, actionFilters, resolver, taskManager) { @Override - protected void doExecute(SearchRequest request, - ActionListener listener) { + protected void doExecute(SearchRequest request, ActionListener listener) { requests.add(request); int currentConcurrentSearches = counter.incrementAndGet(); if (currentConcurrentSearches > maxAllowedConcurrentSearches) { - errorHolder.set(new AssertionError("Current concurrent search [" - + currentConcurrentSearches + "] is higher than is allowed [" - + maxAllowedConcurrentSearches + "]")); + errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches + + "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); } final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor; executorService.execute(() -> { @@ -111,17 +102,16 @@ protected void doExecute(SearchRequest request, }); } }; - - TransportMultiSearchAction action = new TransportMultiSearchAction(threadPool, - actionFilters, transportService, clusterService, searchAction, resolver, 10, + + TransportMultiSearchAction action = + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10, System::nanoTime); // Execute the multi search api and fail if we find an error after executing: try { /* - * Allow for a large number of search requests in a single batch as - * previous implementations could stack overflow if the number of - * requests in a single batch was large + * Allow for a large number of search requests in a single batch as previous implementations could stack overflow if the number + * of requests in a single batch was large */ int numSearchRequests = scaledRandomIntBetween(1, 8192); MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); @@ -143,16 +133,13 @@ public void testDefaultMaxConcurrentSearches() { int numDataNodes = randomIntBetween(1, 10); DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); for (int i = 0; i < numDataNodes; i++) { - builder.add(new DiscoveryNode("_id" + i, buildNewFakeTransportAddress(), - Collections.emptyMap(), Collections.singleton(DiscoveryNode.Role.DATA), - Version.CURRENT)); + builder.add(new DiscoveryNode("_id" + i, buildNewFakeTransportAddress(), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT)); } - builder.add( - new DiscoveryNode("master", buildNewFakeTransportAddress(), Collections.emptyMap(), - Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT)); - builder.add( - new DiscoveryNode("ingest", buildNewFakeTransportAddress(), Collections.emptyMap(), - Collections.singleton(DiscoveryNode.Role.INGEST), Version.CURRENT)); + builder.add(new DiscoveryNode("master", buildNewFakeTransportAddress(), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT)); + builder.add(new DiscoveryNode("ingest", buildNewFakeTransportAddress(), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.INGEST), Version.CURRENT)); ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(builder).build(); int result = TransportMultiSearchAction.defaultMaxConcurrentSearches(10, state); @@ -162,4 +149,5 @@ public void testDefaultMaxConcurrentSearches() { result = TransportMultiSearchAction.defaultMaxConcurrentSearches(10, state); assertThat(result, equalTo(1)); } + } From 83f77ea97ebc31be389e679225cf8a47428575bf Mon Sep 17 00:00:00 2001 From: olcbean Date: Tue, 4 Apr 2017 19:46:16 +0200 Subject: [PATCH 4/8] Modifications to address the reviewer's remarks --- .../action/search/MultiSearchResponse.java | 25 ++++++++----------- .../search/TransportMultiSearchAction.java | 14 ++++++----- .../search/MultiSearchActionTookTests.java | 19 +++++++------- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index 6ff32e2109f84..14b103d0a24ca 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -21,13 +21,13 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -112,15 +112,15 @@ public Exception getFailure() { } private Item[] items; - + private long tookInMillis; MultiSearchResponse() { } - public MultiSearchResponse(long tookInMillis, Item[] items) { - this.tookInMillis = tookInMillis; + public MultiSearchResponse(Item[] items, long tookInMillis) { this.items = items; + this.tookInMillis = tookInMillis; } @Override @@ -134,14 +134,7 @@ public Iterator iterator() { public Item[] getResponses() { return this.items; } - - /** - * How long the msearch took. - */ - public TimeValue getTook() { - return new TimeValue(tookInMillis); - } - + /** * How long the msearch took in milliseconds. */ @@ -156,7 +149,9 @@ public void readFrom(StreamInput in) throws IOException { for (int i = 0; i < items.length; i++) { items[i] = Item.readItem(in); } - tookInMillis = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { + tookInMillis = in.readVLong(); + } } @Override @@ -166,7 +161,9 @@ public void writeTo(StreamOutput out) throws IOException { for (Item item : items) { item.writeTo(out); } - out.writeVLong(tookInMillis); + if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { + out.writeVLong(tookInMillis); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index 56b225064aff2..3e420f077e89b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -56,7 +56,7 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran this.clusterService = clusterService; this.searchAction = searchAction; this.availableProcessors = EsExecutors.numberOfProcessors(settings); - relativeTimeProvider = System::nanoTime; + this.relativeTimeProvider = System::nanoTime; } TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService, @@ -71,7 +71,7 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran @Override protected void doExecute(MultiSearchRequest request, ActionListener listener) { - final long startTimeInNanos = getRelativeTime(); + final long startTimeInNanos = relativeTime(); ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -172,20 +172,20 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It } private void finish() { - listener.onResponse(new MultiSearchResponse(getExecTimeInMillis(startTimeInNanos), responses.toArray(new MultiSearchResponse.Item[responses.length()]))); + listener.onResponse(new MultiSearchResponse(buildTookInMillis(startTimeInNanos), responses.toArray(new MultiSearchResponse.Item[responses.length()]))); } /** * Builds how long it took to execute the msearch. */ - private long getExecTimeInMillis(long startTimeNanos) { - return TimeUnit.NANOSECONDS.toMillis(getRelativeTime() - startTimeNanos); + private long buildTookInMillis(long startTimeNanos) { + return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos); } }); } - private long getRelativeTime() { + private long relativeTime() { return relativeTimeProvider.getAsLong(); } @@ -198,5 +198,7 @@ static final class SearchRequestSlot { this.request = request; this.responseSlot = responseSlot; } + } + } diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index bf705345590dc..0aeb8ee6ce4eb 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -65,38 +65,37 @@ */ public class MultiSearchActionTookTests extends ESTestCase { - private static ThreadPool threadPool; + private ThreadPool threadPool; private ClusterService clusterService; @BeforeClass public static void beforeClass() { - threadPool = new TestThreadPool("MultiSearchActionTookTests"); } @AfterClass public static void afterClass() { - ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); - threadPool = null; } @Before public void setUp() throws Exception { super.setUp(); + threadPool = new TestThreadPool("MultiSearchActionTookTests"); clusterService = createClusterService(threadPool); } @After public void tearDown() throws Exception { - super.tearDown(); clusterService.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + super.tearDown(); } - //Unit conversion using a controller clock + // test unit conversion using a controller clock public void testTookWithControlledClock() throws Exception { runTestTook(true); } - //Using {@link System#nanoTime()} + // test using System#nanoTime public void testTookWithRealClock() throws Exception { runTestTook(false); } @@ -114,16 +113,16 @@ public void onResponse(MultiSearchResponse multiSearchResponse) { if (controlledClock) { assertThat( TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS), - equalTo(multiSearchResponse.getTook().getMillis())); + equalTo(multiSearchResponse.getTookInMillis())); } else { - assertThat(multiSearchResponse.getTook().getMillis(), greaterThanOrEqualTo( + assertThat(multiSearchResponse.getTookInMillis(), greaterThanOrEqualTo( TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); } } @Override public void onFailure(Exception e) { - logger.warn(e); + throw new RuntimeException(e); } }); } From c6ad0cc144efbbc512c2cf9340f16fc5a85aff21 Mon Sep 17 00:00:00 2001 From: olcbean Date: Tue, 22 Aug 2017 18:14:17 +0200 Subject: [PATCH 5/8] rebase and set the bwc to 7.0 removing MultiSearchIT adding a note to the migration docs --- .../action/search/MultiSearchResponse.java | 13 ++-- .../search/TransportMultiSearchAction.java | 24 +++--- .../action/search/ExpandSearchPhaseTests.java | 12 +-- .../search/MultiSearchActionTookTests.java | 74 +++++++------------ .../action/search/MultiSearchIT.java | 63 ---------------- .../search/MultiSearchRequestTests.java | 11 ++- .../migration/migrate_7_0/java.asciidoc | 9 +++ 7 files changed, 67 insertions(+), 139 deletions(-) delete mode 100644 core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java create mode 100644 docs/reference/migration/migrate_7_0/java.asciidoc diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index 14b103d0a24ca..560379a6ce2f6 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -134,12 +135,12 @@ public Iterator iterator() { public Item[] getResponses() { return this.items; } - + /** - * How long the msearch took in milliseconds. + * How long the msearch took. */ - public long getTookInMillis() { - return tookInMillis; + public TimeValue getTook() { + return new TimeValue(tookInMillis); } @Override @@ -149,7 +150,7 @@ public void readFrom(StreamInput in) throws IOException { for (int i = 0; i < items.length; i++) { items[i] = Item.readItem(in); } - if (in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { tookInMillis = in.readVLong(); } } @@ -161,7 +162,7 @@ public void writeTo(StreamOutput out) throws IOException { for (Item item : items) { item.writeTo(out); } - if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { out.writeVLong(tookInMillis); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index 3e420f077e89b..f9d81ad2b682c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -34,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -47,6 +47,7 @@ public class TransportMultiSearchAction extends HandledTransportAction searchAction; private final LongSupplier relativeTimeProvider; + private SetOnce startTimeInNanos; @Inject public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, @@ -71,7 +72,7 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran @Override protected void doExecute(MultiSearchRequest request, ActionListener listener) { - final long startTimeInNanos = relativeTime(); + startTimeInNanos = new SetOnce<>(relativeTime()); ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -92,7 +93,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener requests, final AtomicArray responses, final AtomicInteger responseCounter, - long startTimeInNanos, final ActionListener listener) { SearchRequestSlot request = requests.poll(); if (request == null) { @@ -163,25 +163,25 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It } else { if (thread == Thread.currentThread()) { // we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread - threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, startTimeInNanos, listener)); + threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener)); } else { // we are on a different thread (we went asynchronous), it's safe to recurse - executeSearch(requests, responses, responseCounter, startTimeInNanos, listener); + executeSearch(requests, responses, responseCounter, listener); } } } private void finish() { - listener.onResponse(new MultiSearchResponse(buildTookInMillis(startTimeInNanos), responses.toArray(new MultiSearchResponse.Item[responses.length()]))); + listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]), + buildTookInMillis())); } - + /** * Builds how long it took to execute the msearch. */ - private long buildTookInMillis(long startTimeNanos) { - return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos); + private long buildTookInMillis() { + return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeInNanos.get()); } - }); } @@ -198,7 +198,5 @@ static final class SearchRequestSlot { this.request = request; this.responseSlot = responseSlot; } - } - } diff --git a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java index 146bb19ecaaeb..755d6965b7bae 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java @@ -100,7 +100,9 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL mSearchResponses.add(new MultiSearchResponse.Item(response, null)); } - listener.onResponse(new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]))); + listener.onResponse( + new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]), randomIntBetween(1, 10000))); + } }; SearchHits hits = new SearchHits(new SearchHit[]{new SearchHit(1, "ID", new Text("type"), @@ -152,10 +154,10 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL null, null, null, false, null, 1); SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null); listener.onResponse(new MultiSearchResponse( - randomIntBetween(1, 10000), new MultiSearchResponse.Item[]{ - new MultiSearchResponse.Item(null, new RuntimeException("boom")), - new MultiSearchResponse.Item(response, null) - })); + new MultiSearchResponse.Item[]{ + new MultiSearchResponse.Item(null, new RuntimeException("boom")), + new MultiSearchResponse.Item(response, null) + }, randomIntBetween(1, 10000))); } }; diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 0aeb8ee6ce4eb..85f5dec2f35a1 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -89,7 +89,7 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); super.tearDown(); } - + // test unit conversion using a controller clock public void testTookWithControlledClock() throws Exception { runTestTook(true); @@ -103,20 +103,18 @@ public void testTookWithRealClock() throws Exception { private void runTestTook(boolean controlledClock) throws Exception { MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest()); AtomicLong expected = new AtomicLong(); - - TransportMultiSearchAction action = - createTransportMultiSearchAction(controlledClock, expected); - + + TransportMultiSearchAction action = createTransportMultiSearchAction(controlledClock, expected); + action.doExecute(multiSearchRequest, new ActionListener() { @Override public void onResponse(MultiSearchResponse multiSearchResponse) { if (controlledClock) { - assertThat( - TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS), - equalTo(multiSearchResponse.getTookInMillis())); + assertThat(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS), + equalTo(multiSearchResponse.getTook().getMillis())); } else { - assertThat(multiSearchResponse.getTookInMillis(), greaterThanOrEqualTo( - TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); + assertThat(multiSearchResponse.getTook().getMillis(), + greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); } } @@ -127,16 +125,11 @@ public void onFailure(Exception e) { }); } - private TransportMultiSearchAction createTransportMultiSearchAction(boolean controlledClock, - AtomicLong expected) { - Settings settings = Settings.builder() - .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + private TransportMultiSearchAction createTransportMultiSearchAction(boolean controlledClock, AtomicLong expected) { + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); TaskManager taskManager = mock(TaskManager.class); - TransportService transportService = new TransportService(Settings.EMPTY, null, null, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), - UUIDs.randomBase64UUID()), - null) { + TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { @Override public TaskManager getTaskManager() { return taskManager; @@ -144,25 +137,20 @@ public TaskManager getTaskManager() { }; ActionFilters actionFilters = new ActionFilters(new HashSet<>()); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()) - .thenReturn(ClusterState.builder(new ClusterName("test")).build()); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); final int availableProcessors = Runtime.getRuntime().availableProcessors(); AtomicInteger counter = new AtomicInteger(); - final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, - ThreadPool.Names.SAME); + final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME); Randomness.shuffle(threadPoolNames); final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); - final Set requests = Collections - .newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); + final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); - TransportAction searchAction = - new TransportAction(Settings.EMPTY, "action", - threadPool, actionFilters, resolver, taskManager) { + TransportAction searchAction = new TransportAction(Settings.EMPTY, + "action", threadPool, actionFilters, resolver, taskManager) { @Override - protected void doExecute(SearchRequest request, - ActionListener listener) { + protected void doExecute(SearchRequest request, ActionListener listener) { requests.add(request); commonExecutor.execute(() -> { counter.decrementAndGet(); @@ -172,31 +160,25 @@ protected void doExecute(SearchRequest request, }; if (controlledClock) { - return new TransportMultiSearchAction(threadPool, actionFilters, transportService, - clusterService, searchAction, resolver, availableProcessors, expected::get) { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, + availableProcessors, expected::get) { @Override - void executeSearch(final Queue requests, - final AtomicArray responses, - final AtomicInteger responseCounter, long startTimeInNanos, - final ActionListener listener) { + void executeSearch(final Queue requests, final AtomicArray responses, + final AtomicInteger responseCounter, final ActionListener listener) { expected.set(1000000); - super.executeSearch(requests, responses, responseCounter, startTimeInNanos, - listener); + super.executeSearch(requests, responses, responseCounter, listener); } }; } else { - return new TransportMultiSearchAction(threadPool, actionFilters, transportService, - clusterService, searchAction, resolver, availableProcessors, System::nanoTime) { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, + availableProcessors, System::nanoTime) { @Override - void executeSearch(final Queue requests, - final AtomicArray responses, - final AtomicInteger responseCounter, long startTimeInNanos, - final ActionListener listener) { + void executeSearch(final Queue requests, final AtomicArray responses, + final AtomicInteger responseCounter, final ActionListener listener) { long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(0, 10)); expected.set(elapsed); - super.executeSearch(requests, responses, responseCounter, startTimeInNanos, - listener); + super.executeSearch(requests, responses, responseCounter, listener); } }; } diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java deleted file mode 100644 index 55f3583676b59..0000000000000 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchIT.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search; - -import org.elasticsearch.action.search.MultiSearchResponse.Item; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.query.MatchQueryBuilder; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.test.ESSingleNodeTestCase; - -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -/** - * MultiSearch IT - */ -public class MultiSearchIT extends ESSingleNodeTestCase { - //Verifies that the MultiSearch took time is ge to any of the individual search took times - public void testMultiSearchTookTimeAgainstSearchTookTime () throws Exception { - createIndex("test", Settings.EMPTY, "test", "title", "type=text"); - client().prepareIndex("test", "test", "1").setSource("title", "foo bar baz").get(); - client().prepareIndex("test", "test", "2").setSource("title", "foo foo foo").get(); - client().prepareIndex("test", "test", "3").setSource("title", "bar baz bax").get(); - client().admin().indices().prepareRefresh("test").get(); - - SearchRequest searchRequestAll = new SearchRequest().indices("test") - .source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); - SearchRequest searchRequestBar = new SearchRequest().indices("test") - .source(new SearchSourceBuilder().query(new MatchQueryBuilder("title", "bar"))); - - MultiSearchRequest multiSeachRequest = new MultiSearchRequest(); - multiSeachRequest.add(searchRequestAll); - multiSeachRequest.add(searchRequestBar); - - MultiSearchResponse multiSearchResponse = client().multiSearch(multiSeachRequest).get(); - - long maxSearchResponseTookTime = 0; - for (Item response : multiSearchResponse.getResponses()) { - maxSearchResponseTookTime = Math.max(maxSearchResponseTookTime, - response.getResponse().getTookInMillis()); - } - - long multiSearchResponseTime = multiSearchResponse.getTookInMillis(); - - assertThat(multiSearchResponseTime, greaterThanOrEqualTo(maxSearchResponseTookTime)); - } -} diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index f5bde86eb2941..e6de1d859d867 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -148,12 +148,11 @@ public void testSimpleAdd4() throws Exception { public void testResponseErrorToXContent() throws IOException { long tookInMillis = randomIntBetween(1, 1000); MultiSearchResponse response = new MultiSearchResponse( - tookInMillis, - new MultiSearchResponse.Item[]{ - new MultiSearchResponse.Item(null, new IllegalStateException("foobar")), - new MultiSearchResponse.Item(null, new IllegalStateException("baaaaaazzzz")) - }); - + new MultiSearchResponse.Item[] { + new MultiSearchResponse.Item(null, new IllegalStateException("foobar")), + new MultiSearchResponse.Item(null, new IllegalStateException("baaaaaazzzz")) + }, tookInMillis); + assertEquals("{\"took\":" + tookInMillis + ",\"responses\":[" diff --git a/docs/reference/migration/migrate_7_0/java.asciidoc b/docs/reference/migration/migrate_7_0/java.asciidoc new file mode 100644 index 0000000000000..2afcf98b81364 --- /dev/null +++ b/docs/reference/migration/migrate_7_0/java.asciidoc @@ -0,0 +1,9 @@ +[[breaking_70_java_changes]] +=== Java API changes + +=== `took` time added to `MultiSearchResponse` + +A new `took` field has been added to the `MultiSearchResponse`. The `took` +field contains the time to process a `MultiSearchRequest`. This value can be +retrived by `getTookTime()`. For more information refer to `BulkResponse` +and `SearchResponse` documentation. \ No newline at end of file From ccae5b4c9f3d37eae8ba17a947d0a67a86235836 Mon Sep 17 00:00:00 2001 From: olcbean Date: Mon, 16 Oct 2017 11:47:39 +0200 Subject: [PATCH 6/8] pushing the start time all the way down to finish --- .../action/search/TransportMultiSearchAction.java | 15 +++++++-------- .../action/search/MultiSearchActionTookTests.java | 9 +++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index f9d81ad2b682c..4f850dbf9e1c1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -47,7 +46,6 @@ public class TransportMultiSearchAction extends HandledTransportAction searchAction; private final LongSupplier relativeTimeProvider; - private SetOnce startTimeInNanos; @Inject public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, @@ -72,7 +70,7 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran @Override protected void doExecute(MultiSearchRequest request, ActionListener listener) { - startTimeInNanos = new SetOnce<>(relativeTime()); + long startTimeInNanos = relativeTime(); ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -93,7 +91,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener requests, final AtomicArray responses, final AtomicInteger responseCounter, - final ActionListener listener) { + final ActionListener listener, + long startTimeInNanos) { SearchRequestSlot request = requests.poll(); if (request == null) { /* @@ -163,10 +162,10 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It } else { if (thread == Thread.currentThread()) { // we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread - threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener)); + threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener, startTimeInNanos)); } else { // we are on a different thread (we went asynchronous), it's safe to recurse - executeSearch(requests, responses, responseCounter, listener); + executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); } } } @@ -180,7 +179,7 @@ private void finish() { * Builds how long it took to execute the msearch. */ private long buildTookInMillis() { - return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeInNanos.get()); + return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeInNanos); } }); } diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 85f5dec2f35a1..73743230d1a14 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.CoreMatchers.equalTo; @@ -164,9 +165,9 @@ protected void doExecute(SearchRequest request, ActionListener l availableProcessors, expected::get) { @Override void executeSearch(final Queue requests, final AtomicArray responses, - final AtomicInteger responseCounter, final ActionListener listener) { + final AtomicInteger responseCounter, final ActionListener listener, long startTimeInNanos) { expected.set(1000000); - super.executeSearch(requests, responses, responseCounter, listener); + super.executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); } }; } else { @@ -175,10 +176,10 @@ void executeSearch(final Queue requests, final AtomicArray requests, final AtomicArray responses, - final AtomicInteger responseCounter, final ActionListener listener) { + final AtomicInteger responseCounter, final ActionListener listener, long startTimeInNanos) { long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(0, 10)); expected.set(elapsed); - super.executeSearch(requests, responses, responseCounter, listener); + super.executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); } }; } From 516937af47e2529bf6649200af72ac22c3f09719 Mon Sep 17 00:00:00 2001 From: olcbean Date: Mon, 16 Oct 2017 17:24:54 +0200 Subject: [PATCH 7/8] integrating reviewers comments --- .../search/TransportMultiSearchAction.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index 4f850dbf9e1c1..9dec3be5c1b11 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -70,7 +70,7 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran @Override protected void doExecute(MultiSearchRequest request, ActionListener listener) { - long startTimeInNanos = relativeTime(); + final long relativeStartTime = relativeTimeProvider.getAsLong(); ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -91,7 +91,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener responses, final AtomicInteger responseCounter, final ActionListener listener, - long startTimeInNanos) { + final long relativeStartTime) { SearchRequestSlot request = requests.poll(); if (request == null) { /* @@ -162,10 +162,11 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It } else { if (thread == Thread.currentThread()) { // we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread - threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener, startTimeInNanos)); + threadPool.generic() + .execute(() -> executeSearch(requests, responses, responseCounter, listener, relativeStartTime)); } else { // we are on a different thread (we went asynchronous), it's safe to recurse - executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); + executeSearch(requests, responses, responseCounter, listener, relativeStartTime); } } } @@ -179,15 +180,11 @@ private void finish() { * Builds how long it took to execute the msearch. */ private long buildTookInMillis() { - return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeInNanos); + return TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - relativeStartTime); } }); } - private long relativeTime() { - return relativeTimeProvider.getAsLong(); - } - static final class SearchRequestSlot { final SearchRequest request; From 7ced928cadb83e46b73d1609f4f38045ba317449 Mon Sep 17 00:00:00 2001 From: olcbean Date: Mon, 30 Oct 2017 13:51:38 +0100 Subject: [PATCH 8/8] removing java.asciidoc --- docs/reference/migration/migrate_7_0/java.asciidoc | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 docs/reference/migration/migrate_7_0/java.asciidoc diff --git a/docs/reference/migration/migrate_7_0/java.asciidoc b/docs/reference/migration/migrate_7_0/java.asciidoc deleted file mode 100644 index 2afcf98b81364..0000000000000 --- a/docs/reference/migration/migrate_7_0/java.asciidoc +++ /dev/null @@ -1,9 +0,0 @@ -[[breaking_70_java_changes]] -=== Java API changes - -=== `took` time added to `MultiSearchResponse` - -A new `took` field has been added to the `MultiSearchResponse`. The `took` -field contains the time to process a `MultiSearchRequest`. This value can be -retrived by `getTookTime()`. For more information refer to `BulkResponse` -and `SearchResponse` documentation. \ No newline at end of file