From 39d604cff634a2f92206f0c4a9be264a4a73e7aa Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Sat, 14 Mar 2020 08:45:56 -0400 Subject: [PATCH 1/7] Try to save memory on aggregations This delays deserializing the aggregation response try until *right* before we merge the objects. --- .../action/search/SearchPhaseController.java | 33 +++-- .../common/io/stream/DelayableWriteable.java | 115 ++++++++++++++++ .../common/io/stream/FilterStreamInput.java | 5 + .../NamedWriteableAwareStreamInput.java | 5 + .../common/io/stream/StreamInput.java | 8 ++ .../search/query/QuerySearchResult.java | 31 +++-- .../io/stream/DelayableWriteableTests.java | 127 ++++++++++++++++++ .../search/query/QuerySearchResultTests.java | 4 +- 8 files changed, 302 insertions(+), 26 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java create mode 100644 server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index f2cf6b199dcbe..674119b2fbe25 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -67,8 +67,11 @@ import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; +import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.stream.Collectors.toList; + public final class SearchPhaseController { private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0]; @@ -429,7 +432,7 @@ public ReducedQueryPhase reducedQueryPhase(Collection queryResults, - List bufferedAggs, List bufferedTopDocs, + List> bufferedAggs, List bufferedTopDocs, TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest, InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, boolean performFinalReduce) { @@ -453,7 +456,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection aggregationsList; + final List> aggregationsList; if (bufferedAggs != null) { consumeAggs = false; // we already have results from intermediate reduces and just need to perform the final reduce @@ -492,7 +495,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection { private final SearchShardTarget[] processedShards; - private final InternalAggregations[] aggsBuffer; + private final Supplier[] aggsBuffer; private final TopDocs[] topDocsBuffer; private final boolean hasAggs; private final boolean hasTopDocs; @@ -642,7 +646,9 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search this.progressListener = progressListener; this.processedShards = new SearchShardTarget[expectedResultSize]; // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time. - this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0]; + @SuppressWarnings("unchecked") + Supplier[] aggsBuffer = new Supplier[hasAggs ? bufferSize : 0]; + this.aggsBuffer = aggsBuffer; this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0]; this.hasTopDocs = hasTopDocs; this.hasAggs = hasAggs; @@ -665,10 +671,11 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (querySearchResult.isNull() == false) { if (index == bufferSize) { if (hasAggs) { - ReduceContext reduceContext = aggReduceContextBuilder.forPartialReduction(); - InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce( + Arrays.stream(aggsBuffer).map(Supplier::get).collect(toList()), + aggReduceContextBuilder.forPartialReduction()); Arrays.fill(aggsBuffer, null); - aggsBuffer[0] = reducedAggs; + aggsBuffer[0] = () -> reducedAggs; } if (hasTopDocs) { TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), @@ -681,12 +688,12 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { index = 1; if (hasAggs || hasTopDocs) { progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards), - topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases); + topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0].get() : null, numReducePhases); } } final int i = index++; if (hasAggs) { - aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); + aggsBuffer[i] = querySearchResult.consumeAggs(); } if (hasTopDocs) { final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null @@ -698,7 +705,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget(); } - private synchronized List getRemainingAggs() { + private synchronized List> getRemainingAggs() { return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null; } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java new file mode 100644 index 0000000000000..9f87fde0490d8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesReference; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * A holder for {@link Writeable}s that can delays reading the underlying + * {@linkplain Writeable} when it is read from a remote node. + */ +public abstract class DelayableWriteable implements Supplier, Writeable { + /** + * Build a {@linkplain DelayableWriteable} that wraps an existing object + * but is serialized so that deserializing it can be delayed. + */ + public static DelayableWriteable referencing(T reference) { + return new Referencing<>(reference); + } + /** + * Build a {@linkplain DelayableWriteable} that copies a buffer from + * the provided {@linkplain StreamInput} and deserializes the buffer + * when {@link Supplier#get()} is called. + */ + public static DelayableWriteable delayed(Writeable.Reader reader, StreamInput in) throws IOException { + return new Delayed<>(reader, in); + } + + private DelayableWriteable() {} + + public abstract boolean isDelayed(); + + private static class Referencing extends DelayableWriteable { + private T reference; + + Referencing(T reference) { + this.reference = reference; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + try (BytesStreamOutput buffer = new BytesStreamOutput()) { + reference.writeTo(buffer); + out.writeBytesReference(buffer.bytes()); + } + } + + @Override + public T get() { + return reference; + } + + @Override + public boolean isDelayed() { + return false; + } + } + + private static class Delayed extends DelayableWriteable { + private final Writeable.Reader reader; + private final Version remoteVersion; + private final BytesReference serialized; + private final NamedWriteableRegistry registry; + + Delayed(Writeable.Reader reader, StreamInput in) throws IOException { + this.reader = reader; + remoteVersion = in.getVersion(); + serialized = in.readBytesReference(); + registry = in.namedWriteableRegistry(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public T get() { + try { + try (StreamInput in = registry == null ? + serialized.streamInput() : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) { + in.setVersion(remoteVersion); + return reader.read(in); + } + } catch (IOException e) { + throw new RuntimeException("unexpected error expanding aggregations", e); + } + } + + @Override + public boolean isDelayed() { + return true; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java index 2fa700634b851..6d7d9fd6952fd 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java @@ -94,4 +94,9 @@ public void setVersion(Version version) { protected void ensureCanReadBytes(int length) throws EOFException { delegate.ensureCanReadBytes(length); } + + @Override + public NamedWriteableRegistry namedWriteableRegistry() { + return delegate.namedWriteableRegistry(); + } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java index 5db80a711eeb9..149231b1fe433 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java @@ -52,4 +52,9 @@ public C readNamedWriteable(@SuppressWarnings("unused + "] than it was read from [" + name + "]."; return c; } + + @Override + public NamedWriteableRegistry namedWriteableRegistry() { + return namedWriteableRegistry; + } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 300b42153dc47..4caf1702872af 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -1093,6 +1093,14 @@ public T readException() throws IOException { return null; } + /** + * Get the registry of named writeables is his stream has one, + * {@code null} otherwise. + */ + public NamedWriteableRegistry namedWriteableRegistry() { + return null; + } + /** * Reads a {@link NamedWriteable} from the current stream, by first reading its name and then looking for * the corresponding entry in the registry by name, so that the proper object can be read and returned. diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 18af04add2082..d065a74d4f667 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -22,13 +22,13 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; @@ -54,7 +55,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private TotalHits totalHits; private float maxScore = Float.NaN; private DocValueFormat[] sortValueFormats; - private InternalAggregations aggregations; + private DelayableWriteable aggregations; private boolean hasAggs; private Suggest suggest; private boolean searchTimedOut; @@ -196,21 +197,21 @@ public boolean hasAggs() { * Returns and nulls out the aggregation for this search results. This allows to free up memory once the aggregation is consumed. * @throws IllegalStateException if the aggregations have already been consumed. */ - public Aggregations consumeAggs() { + public Supplier consumeAggs() { if (aggregations == null) { throw new IllegalStateException("aggs already consumed"); } - Aggregations aggs = aggregations; + Supplier aggs = aggregations; aggregations = null; return aggs; } public void aggregations(InternalAggregations aggregations) { - this.aggregations = aggregations; + this.aggregations = aggregations == null ? null : DelayableWriteable.referencing(aggregations); hasAggs = aggregations != null; } - public InternalAggregations aggregations() { + public DelayableWriteable aggregations() { return aggregations; } @@ -314,18 +315,22 @@ public void readFromWithId(SearchContextId id, StreamInput in) throws IOExceptio } setTopDocs(readTopDocs(in)); if (hasAggs = in.readBoolean()) { - aggregations = new InternalAggregations(in); + if (in.getVersion().before(Version.V_8_0_0)) { + aggregations = DelayableWriteable.referencing(new InternalAggregations(in)); + } else { + aggregations = DelayableWriteable.delayed(InternalAggregations::new, in); + } } if (in.getVersion().before(Version.V_7_2_0)) { List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() .map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList()); if (hasAggs && pipelineAggregators.isEmpty() == false) { - List internalAggs = aggregations.asList().stream() + List internalAggs = aggregations.get().asList().stream() .map(agg -> (InternalAggregation) agg).collect(Collectors.toList()); //Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while //later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1. - this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators); + this.aggregations = DelayableWriteable.referencing(new InternalAggregations(internalAggs, pipelineAggregators)); } } if (in.readBoolean()) { @@ -366,7 +371,11 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(false); } else { out.writeBoolean(true); - aggregations.writeTo(out); + if (out.getVersion().before(Version.V_8_0_0)) { + aggregations.get().writeTo(out); + } else { + aggregations.writeTo(out); + } } if (out.getVersion().before(Version.V_7_2_0)) { //Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, @@ -375,7 +384,7 @@ public void writeToNoId(StreamOutput out) throws IOException { if (aggregations == null) { out.writeNamedWriteableList(Collections.emptyList()); } else { - out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators()); + out.writeNamedWriteableList(aggregations.get().getTopLevelPipelineAggregators()); } } if (suggest == null) { diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java new file mode 100644 index 0000000000000..5ca54a84b9c9a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java @@ -0,0 +1,127 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static java.util.Collections.singletonList; +import static org.hamcrest.Matchers.equalTo; + +public class DelayableWriteableTests extends ESTestCase { + // NOTE: we don't use AbstractWireSerializingTestCase because we don't implement equals and hashCode. + public static class Example implements NamedWriteable { + private final String s; + + public Example(String s) { + this.s = s; + } + + public Example(StreamInput in) throws IOException { + s = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(s); + } + + @Override + public String getWriteableName() { + return "example"; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Example other = (Example) obj; + return s.equals(other.s); + } + + @Override + public int hashCode() { + return s.hashCode(); + } + } + + public static class NamedHolder implements Writeable { + private final Example e; + + public NamedHolder(Example e) { + this.e = e; + } + + public NamedHolder(StreamInput in) throws IOException { + e = in.readNamedWriteable(Example.class); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(e); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + NamedHolder other = (NamedHolder) obj; + return e.equals(other.e); + } + + @Override + public int hashCode() { + return e.hashCode(); + } + } + + public void testRoundTrip() throws IOException { + Example e = new Example(randomAlphaOfLength(5)); + roundTripTestCase(DelayableWriteable.referencing(e), Example::new); + } + + public void testRoundTripWithNamedWriteable() throws IOException { + NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); + roundTripTestCase(DelayableWriteable.referencing(n), NamedHolder::new); + } + + private void roundTripTestCase(DelayableWriteable original, Writeable.Reader reader) throws IOException { + assertFalse(original.isDelayed()); + DelayableWriteable roundTripped = roundTrip(original, reader, Version.CURRENT); + assertTrue(roundTripped.isDelayed()); + assertThat(roundTripped.get(), equalTo(original.get())); + } + + private DelayableWriteable roundTrip(DelayableWriteable original, + Writeable.Reader reader, Version version) throws IOException { + return copyInstance(original, writableRegistry(), (out, d) -> d.writeTo(out), + in -> DelayableWriteable.delayed(reader, in), version); + } + + @Override + protected NamedWriteableRegistry writableRegistry() { + return new NamedWriteableRegistry(singletonList( + new NamedWriteableRegistry.Entry(Example.class, "example", Example::new))); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index 04a0d71b650a9..3174c2c4fe8f0 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -85,8 +85,8 @@ public void testSerialization() throws Exception { assertEquals(querySearchResult.size(), deserialized.size()); assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs()); if (deserialized.hasAggs()) { - Aggregations aggs = querySearchResult.consumeAggs(); - Aggregations deserializedAggs = deserialized.consumeAggs(); + Aggregations aggs = querySearchResult.consumeAggs().get(); + Aggregations deserializedAggs = deserialized.consumeAggs().get(); assertEquals(aggs.asList(), deserializedAggs.asList()); List pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators(); List deserializedPipelineAggs = From 8cddcab12893bf65530a33f4e59c89cc89bdbfdb Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 19 Mar 2020 10:27:48 -0400 Subject: [PATCH 2/7] Fix request cache --- .../common/io/stream/DelayableWriteable.java | 18 +++++++- .../io/stream/DelayableWriteableTests.java | 46 +++++++++++++++++-- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java index 9f87fde0490d8..e75b81fff4ef4 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -91,7 +91,23 @@ private static class Delayed extends DelayableWriteable @Override public void writeTo(StreamOutput out) throws IOException { - throw new UnsupportedOperationException(); + if (out.getVersion() == remoteVersion) { + /* + * If the version *does* line up we can just copy the bytes + * which is good because this is how shard request caching + * works. + */ + out.writeBytesReference(serialized); + } else { + /* + * If the version doesn't line up then we have to deserialize + * into the Writeable and re-serialize it against the new + * output stream so it can apply any backwards compatibility + * differences in the wire protocol. This ain't efficient but + * it should be quite rare. + */ + referencing(get()).writeTo(out); + } } @Override diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java index 5ca54a84b9c9a..fbd8702608ead 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; @@ -96,18 +97,49 @@ public int hashCode() { } } - public void testRoundTrip() throws IOException { + public void testRoundTripFromReferencing() throws IOException { Example e = new Example(randomAlphaOfLength(5)); - roundTripTestCase(DelayableWriteable.referencing(e), Example::new); + DelayableWriteable original = DelayableWriteable.referencing(e); + assertFalse(original.isDelayed()); + roundTripTestCase(original, Example::new); } - public void testRoundTripWithNamedWriteable() throws IOException { + public void testRoundTripFromReferencingWithNamedWriteable() throws IOException { NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); - roundTripTestCase(DelayableWriteable.referencing(n), NamedHolder::new); + DelayableWriteable original = DelayableWriteable.referencing(n); + assertFalse(original.isDelayed()); + roundTripTestCase(original, NamedHolder::new); + } + + public void testRoundTripFromDelayed() throws IOException { + Example e = new Example(randomAlphaOfLength(5)); + DelayableWriteable original = roundTrip(DelayableWriteable.referencing(e), Example::new, Version.CURRENT); + assertTrue(original.isDelayed()); + roundTripTestCase(original, Example::new); + } + + public void testRoundTripFromDelayedWithNamedWriteable() throws IOException { + NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); + DelayableWriteable original = roundTrip(DelayableWriteable.referencing(n), NamedHolder::new, Version.CURRENT); + assertTrue(original.isDelayed()); + roundTripTestCase(original, NamedHolder::new); + } + + public void testRoundTripFromDelayedFromOldVersion() throws IOException { + Example e = new Example(randomAlphaOfLength(5)); + DelayableWriteable original = roundTrip(DelayableWriteable.referencing(e), Example::new, randomOldVersion()); + assertTrue(original.isDelayed()); + roundTripTestCase(original, Example::new); + } + + public void testRoundTripFromDelayedFromOldVersionWithNamedWriteable() throws IOException { + NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); + DelayableWriteable original = roundTrip(DelayableWriteable.referencing(n), NamedHolder::new, randomOldVersion()); + assertTrue(original.isDelayed()); + roundTripTestCase(original, NamedHolder::new); } private void roundTripTestCase(DelayableWriteable original, Writeable.Reader reader) throws IOException { - assertFalse(original.isDelayed()); DelayableWriteable roundTripped = roundTrip(original, reader, Version.CURRENT); assertTrue(roundTripped.isDelayed()); assertThat(roundTripped.get(), equalTo(original.get())); @@ -124,4 +156,8 @@ protected NamedWriteableRegistry writableRegistry() { return new NamedWriteableRegistry(singletonList( new NamedWriteableRegistry.Entry(Example.class, "example", Example::new))); } + + private static Version randomOldVersion() { + return randomValueOtherThanMany(Version.CURRENT::before, () -> VersionUtils.randomCompatibleVersion(random(), Version.CURRENT)); + } } From 6921c8ca514d26fb94b97f1e02bd69c4264f30c1 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 19 Mar 2020 18:18:40 -0400 Subject: [PATCH 3/7] Explain --- .../org/elasticsearch/search/query/QuerySearchResult.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index d065a74d4f667..334ea7fc6920f 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -55,6 +55,13 @@ public final class QuerySearchResult extends SearchPhaseResult { private TotalHits totalHits; private float maxScore = Float.NaN; private DocValueFormat[] sortValueFormats; + /** + * Aggregation results. We wrap them in + * {@linkplain DelayableWriteable} because + * {@link InternalAggregation} is usually made up of many small objects + * which have a fairly high overhead in the JVM. So we delay deserializing + * them until just before we need them. + */ private DelayableWriteable aggregations; private boolean hasAggs; private Suggest suggest; From 76b077db6cf90e12147eefd38110ad78616aa59a Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 19 Mar 2020 18:28:44 -0400 Subject: [PATCH 4/7] Rework read --- .../search/query/QuerySearchResult.java | 59 +++++++++++-------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 334ea7fc6920f..3060d3b576bfd 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -19,6 +19,15 @@ package org.elasticsearch.search.query; +import static java.util.stream.Collectors.toList; +import static org.elasticsearch.common.lucene.Lucene.readTopDocs; +import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; @@ -37,15 +46,6 @@ import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.suggest.Suggest; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import static org.elasticsearch.common.lucene.Lucene.readTopDocs; -import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; - public final class QuerySearchResult extends SearchPhaseResult { private int from; @@ -321,23 +321,32 @@ public void readFromWithId(SearchContextId id, StreamInput in) throws IOExceptio } } setTopDocs(readTopDocs(in)); - if (hasAggs = in.readBoolean()) { - if (in.getVersion().before(Version.V_8_0_0)) { - aggregations = DelayableWriteable.referencing(new InternalAggregations(in)); - } else { - aggregations = DelayableWriteable.delayed(InternalAggregations::new, in); + if (in.getVersion().before(Version.V_8_0_0)) { + InternalAggregations readAggs = null; + if (hasAggs = in.readBoolean()) { + readAggs = new InternalAggregations(in); } - } - if (in.getVersion().before(Version.V_7_2_0)) { - List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() - .map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList()); - if (hasAggs && pipelineAggregators.isEmpty() == false) { - List internalAggs = aggregations.get().asList().stream() - .map(agg -> (InternalAggregation) agg).collect(Collectors.toList()); - //Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while - //later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of - //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1. - this.aggregations = DelayableWriteable.referencing(new InternalAggregations(internalAggs, pipelineAggregators)); + if (in.getVersion().before(Version.V_7_2_0)) { + List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() + .map(a -> (SiblingPipelineAggregator) a).collect(toList()); + if (hasAggs && pipelineAggregators.isEmpty() == false) { + List internalAggs = readAggs.copyResults(); + /* + * Earlier versions serialize sibling pipeline aggs + * separately as they used to be set to QuerySearchResult + * directly, while later versions include them in + * InternalAggregations. Note that despite serializing + * sibling pipeline aggs as part of nternalAggregations is + * supported since 6.7.0, the shards set sibling pipeline + * aggs to InternalAggregations only from 7.1. + */ + readAggs = new InternalAggregations(internalAggs, pipelineAggregators); + } + } + aggregations = DelayableWriteable.referencing(readAggs); + } else { + if (hasAggs = in.readBoolean()) { + aggregations = DelayableWriteable.delayed(InternalAggregations::new, in); } } if (in.readBoolean()) { From eb37343d15675e1941a240befc21a85d356694ea Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 20 Mar 2020 12:41:13 -0400 Subject: [PATCH 5/7] clear asap --- .../action/search/SearchPhaseController.java | 61 ++++++++++++------- .../search/query/QuerySearchResult.java | 5 +- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 674119b2fbe25..1f4ce71cacd8f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -19,8 +19,19 @@ package org.elasticsearch.action.search; -import com.carrotsearch.hppc.IntArrayList; -import com.carrotsearch.hppc.ObjectObjectHashMap; +import static java.util.stream.Collectors.toList; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.lucene.index.Term; import org.apache.lucene.search.CollectionStatistics; @@ -58,19 +69,8 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.function.IntFunction; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import static java.util.stream.Collectors.toList; +import com.carrotsearch.hppc.IntArrayList; +import com.carrotsearch.hppc.ObjectObjectHashMap; public final class SearchPhaseController { private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0]; @@ -511,9 +511,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection> aggregationsList + ) { + /* + * Parse the aggregations, clearing the list as we go so bits backing + * the DelayedWriteable can be collected immediately. + */ + List toReduce = new ArrayList<>(aggregationsList.size()); + for (int i = 0; i < aggregationsList.size(); i++) { + toReduce.add(aggregationsList.get(i).get()); + aggregationsList.set(i, null); + } + return aggregationsList.isEmpty() ? null : InternalAggregations.topLevelReduce(toReduce, + performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()); + } + /* * Returns the size of the requested top documents (from + size) */ @@ -671,10 +687,13 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (querySearchResult.isNull() == false) { if (index == bufferSize) { if (hasAggs) { + List aggs = new ArrayList<>(aggsBuffer.length); + for (int i = 0; i < aggsBuffer.length; i++) { + aggs.add(aggsBuffer[i].get()); + aggsBuffer[i] = null; // null the buffer so it can be GCed now. + } InternalAggregations reducedAggs = InternalAggregations.topLevelReduce( - Arrays.stream(aggsBuffer).map(Supplier::get).collect(toList()), - aggReduceContextBuilder.forPartialReduction()); - Arrays.fill(aggsBuffer, null); + aggs, aggReduceContextBuilder.forPartialReduction()); aggsBuffer[0] = () -> reducedAggs; } if (hasTopDocs) { diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 3060d3b576bfd..ddd3987296d95 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.function.Supplier; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; @@ -204,11 +203,11 @@ public boolean hasAggs() { * Returns and nulls out the aggregation for this search results. This allows to free up memory once the aggregation is consumed. * @throws IllegalStateException if the aggregations have already been consumed. */ - public Supplier consumeAggs() { + public DelayableWriteable consumeAggs() { if (aggregations == null) { throw new IllegalStateException("aggs already consumed"); } - Supplier aggs = aggregations; + DelayableWriteable aggs = aggregations; aggregations = null; return aggs; } From 80e31ad697584c9126a0e55b061fb9a5d5b91b9f Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 20 Mar 2020 16:10:10 -0400 Subject: [PATCH 6/7] Checkstyle --- .../org/elasticsearch/action/search/SearchPhaseController.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 1f4ce71cacd8f..9648f74e96b27 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -19,8 +19,6 @@ package org.elasticsearch.action.search; -import static java.util.stream.Collectors.toList; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; From c6162ff446abe15844130d23e7f74f8e3f241018 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 23 Mar 2020 09:32:50 -0400 Subject: [PATCH 7/7] Twist --- .../search/query/QuerySearchResult.java | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index ddd3987296d95..2863f45de85f2 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -19,12 +19,12 @@ package org.elasticsearch.search.query; +import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; import java.io.IOException; -import java.util.Collections; import java.util.List; import org.apache.lucene.search.FieldDoc; @@ -384,24 +384,39 @@ public void writeToNoId(StreamOutput out) throws IOException { writeTopDocs(out, topDocsAndMaxScore); if (aggregations == null) { out.writeBoolean(false); + if (out.getVersion().before(Version.V_7_2_0)) { + /* + * Earlier versions expect sibling pipeline aggs separately + * as they used to be set to QuerySearchResult directly, while + * later versions expect them in InternalAggregations. Note + * that despite serializing sibling pipeline aggs as part of + * InternalAggregations is supported since 6.7.0, the shards + * set sibling pipeline aggs to InternalAggregations only from + * 7.1 on. + */ + out.writeNamedWriteableList(emptyList()); + } } else { out.writeBoolean(true); if (out.getVersion().before(Version.V_8_0_0)) { - aggregations.get().writeTo(out); + InternalAggregations aggs = aggregations.get(); + aggs.writeTo(out); + if (out.getVersion().before(Version.V_7_2_0)) { + /* + * Earlier versions expect sibling pipeline aggs separately + * as they used to be set to QuerySearchResult directly, while + * later versions expect them in InternalAggregations. Note + * that despite serializing sibling pipeline aggs as part of + * InternalAggregations is supported since 6.7.0, the shards + * set sibling pipeline aggs to InternalAggregations only from + * 7.1 on. + */ + out.writeNamedWriteableList(aggs.getTopLevelPipelineAggregators()); + } } else { aggregations.writeTo(out); } } - if (out.getVersion().before(Version.V_7_2_0)) { - //Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, - //while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of - //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on. - if (aggregations == null) { - out.writeNamedWriteableList(Collections.emptyList()); - } else { - out.writeNamedWriteableList(aggregations.get().getTopLevelPipelineAggregators()); - } - } if (suggest == null) { out.writeBoolean(false); } else {