diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 66f09dbd18bd9..6db48b394ddbd 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -19,8 +19,17 @@ package org.elasticsearch.action.search; -import com.carrotsearch.hppc.IntArrayList; -import com.carrotsearch.hppc.ObjectObjectHashMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.lucene.index.Term; import org.apache.lucene.search.CollectionStatistics; @@ -58,16 +67,8 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.function.IntFunction; -import java.util.stream.Collectors; +import com.carrotsearch.hppc.IntArrayList; +import com.carrotsearch.hppc.ObjectObjectHashMap; public final class SearchPhaseController { private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0]; @@ -429,7 +430,7 @@ public ReducedQueryPhase reducedQueryPhase(Collection queryResults, - List bufferedAggs, List bufferedTopDocs, + List> bufferedAggs, List bufferedTopDocs, TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest, InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, boolean performFinalReduce) { @@ -453,7 +454,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection aggregationsList; + final List> aggregationsList; if (bufferedAggs != null) { consumeAggs = false; // we already have results from intermediate reduces and just need to perform the final reduce @@ -492,7 +493,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection> aggregationsList + ) { + /* + * Parse the aggregations, clearing the list as we go so bits backing + * the DelayedWriteable can be collected immediately. + */ + List toReduce = new ArrayList<>(aggregationsList.size()); + for (int i = 0; i < aggregationsList.size(); i++) { + toReduce.add(aggregationsList.get(i).get()); + aggregationsList.set(i, null); + } + return aggregationsList.isEmpty() ? null : InternalAggregations.topLevelReduce(toReduce, + performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()); + } + /* * Returns the size of the requested top documents (from + size) */ @@ -600,7 +618,7 @@ public InternalSearchResponse buildResponse(SearchHits hits) { */ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults { private final SearchShardTarget[] processedShards; - private final InternalAggregations[] aggsBuffer; + private final Supplier[] aggsBuffer; private final TopDocs[] topDocsBuffer; private final boolean hasAggs; private final boolean hasTopDocs; @@ -642,7 +660,9 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search this.progressListener = progressListener; this.processedShards = new SearchShardTarget[expectedResultSize]; // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time. - this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0]; + @SuppressWarnings("unchecked") + Supplier[] aggsBuffer = new Supplier[hasAggs ? bufferSize : 0]; + this.aggsBuffer = aggsBuffer; this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0]; this.hasTopDocs = hasTopDocs; this.hasAggs = hasAggs; @@ -665,10 +685,14 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (querySearchResult.isNull() == false) { if (index == bufferSize) { if (hasAggs) { - ReduceContext reduceContext = aggReduceContextBuilder.forPartialReduction(); - InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); - Arrays.fill(aggsBuffer, null); - aggsBuffer[0] = reducedAggs; + List aggs = new ArrayList<>(aggsBuffer.length); + for (int i = 0; i < aggsBuffer.length; i++) { + aggs.add(aggsBuffer[i].get()); + aggsBuffer[i] = null; // null the buffer so it can be GCed now. + } + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce( + aggs, aggReduceContextBuilder.forPartialReduction()); + aggsBuffer[0] = () -> reducedAggs; } if (hasTopDocs) { TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), @@ -681,12 +705,12 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { index = 1; if (hasAggs || hasTopDocs) { progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards), - topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases); + topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0].get() : null, numReducePhases); } } final int i = index++; if (hasAggs) { - aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); + aggsBuffer[i] = querySearchResult.consumeAggs(); } if (hasTopDocs) { final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null @@ -698,7 +722,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget(); } - private synchronized List getRemainingAggs() { + private synchronized List> getRemainingAggs() { return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null; } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java new file mode 100644 index 0000000000000..e75b81fff4ef4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -0,0 +1,131 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesReference; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * A holder for {@link Writeable}s that can delays reading the underlying + * {@linkplain Writeable} when it is read from a remote node. + */ +public abstract class DelayableWriteable implements Supplier, Writeable { + /** + * Build a {@linkplain DelayableWriteable} that wraps an existing object + * but is serialized so that deserializing it can be delayed. + */ + public static DelayableWriteable referencing(T reference) { + return new Referencing<>(reference); + } + /** + * Build a {@linkplain DelayableWriteable} that copies a buffer from + * the provided {@linkplain StreamInput} and deserializes the buffer + * when {@link Supplier#get()} is called. + */ + public static DelayableWriteable delayed(Writeable.Reader reader, StreamInput in) throws IOException { + return new Delayed<>(reader, in); + } + + private DelayableWriteable() {} + + public abstract boolean isDelayed(); + + private static class Referencing extends DelayableWriteable { + private T reference; + + Referencing(T reference) { + this.reference = reference; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + try (BytesStreamOutput buffer = new BytesStreamOutput()) { + reference.writeTo(buffer); + out.writeBytesReference(buffer.bytes()); + } + } + + @Override + public T get() { + return reference; + } + + @Override + public boolean isDelayed() { + return false; + } + } + + private static class Delayed extends DelayableWriteable { + private final Writeable.Reader reader; + private final Version remoteVersion; + private final BytesReference serialized; + private final NamedWriteableRegistry registry; + + Delayed(Writeable.Reader reader, StreamInput in) throws IOException { + this.reader = reader; + remoteVersion = in.getVersion(); + serialized = in.readBytesReference(); + registry = in.namedWriteableRegistry(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion() == remoteVersion) { + /* + * If the version *does* line up we can just copy the bytes + * which is good because this is how shard request caching + * works. + */ + out.writeBytesReference(serialized); + } else { + /* + * If the version doesn't line up then we have to deserialize + * into the Writeable and re-serialize it against the new + * output stream so it can apply any backwards compatibility + * differences in the wire protocol. This ain't efficient but + * it should be quite rare. + */ + referencing(get()).writeTo(out); + } + } + + @Override + public T get() { + try { + try (StreamInput in = registry == null ? + serialized.streamInput() : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) { + in.setVersion(remoteVersion); + return reader.read(in); + } + } catch (IOException e) { + throw new RuntimeException("unexpected error expanding aggregations", e); + } + } + + @Override + public boolean isDelayed() { + return true; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java index 2fa700634b851..6d7d9fd6952fd 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java @@ -94,4 +94,9 @@ public void setVersion(Version version) { protected void ensureCanReadBytes(int length) throws EOFException { delegate.ensureCanReadBytes(length); } + + @Override + public NamedWriteableRegistry namedWriteableRegistry() { + return delegate.namedWriteableRegistry(); + } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java index 5db80a711eeb9..149231b1fe433 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java @@ -52,4 +52,9 @@ public C readNamedWriteable(@SuppressWarnings("unused + "] than it was read from [" + name + "]."; return c; } + + @Override + public NamedWriteableRegistry namedWriteableRegistry() { + return namedWriteableRegistry; + } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 93e578200c37a..1546886b4107c 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -1097,6 +1097,14 @@ public T readException() throws IOException { return null; } + /** + * Get the registry of named writeables is his stream has one, + * {@code null} otherwise. + */ + public NamedWriteableRegistry namedWriteableRegistry() { + return null; + } + /** * Reads a {@link NamedWriteable} from the current stream, by first reading its name and then looking for * the corresponding entry in the registry by name, so that the proper object can be read and returned. diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 6e6319795b885..32be8aba9827c 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -19,16 +19,24 @@ package org.elasticsearch.search.query; +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toList; +import static org.elasticsearch.common.lucene.Lucene.readTopDocs; +import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; + +import java.io.IOException; +import java.util.List; + import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -37,14 +45,6 @@ import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.suggest.Suggest; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import static org.elasticsearch.common.lucene.Lucene.readTopDocs; -import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; - public final class QuerySearchResult extends SearchPhaseResult { private int from; @@ -54,7 +54,14 @@ public final class QuerySearchResult extends SearchPhaseResult { private TotalHits totalHits; private float maxScore = Float.NaN; private DocValueFormat[] sortValueFormats; - private InternalAggregations aggregations; + /** + * Aggregation results. We wrap them in + * {@linkplain DelayableWriteable} because + * {@link InternalAggregation} is usually made up of many small objects + * which have a fairly high overhead in the JVM. So we delay deserializing + * them until just before we need them. + */ + private DelayableWriteable aggregations; private boolean hasAggs; private Suggest suggest; private boolean searchTimedOut; @@ -196,21 +203,21 @@ public boolean hasAggs() { * Returns and nulls out the aggregation for this search results. This allows to free up memory once the aggregation is consumed. * @throws IllegalStateException if the aggregations have already been consumed. */ - public Aggregations consumeAggs() { + public DelayableWriteable consumeAggs() { if (aggregations == null) { throw new IllegalStateException("aggs already consumed"); } - Aggregations aggs = aggregations; + DelayableWriteable aggs = aggregations; aggregations = null; return aggs; } public void aggregations(InternalAggregations aggregations) { - this.aggregations = aggregations; + this.aggregations = aggregations == null ? null : DelayableWriteable.referencing(aggregations); hasAggs = aggregations != null; } - public InternalAggregations aggregations() { + public DelayableWriteable aggregations() { return aggregations; } @@ -313,19 +320,32 @@ public void readFromWithId(SearchContextId id, StreamInput in) throws IOExceptio } } setTopDocs(readTopDocs(in)); - if (hasAggs = in.readBoolean()) { - aggregations = new InternalAggregations(in); - } - if (in.getVersion().before(Version.V_7_2_0)) { - List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() - .map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList()); - if (hasAggs && pipelineAggregators.isEmpty() == false) { - List internalAggs = aggregations.asList().stream() - .map(agg -> (InternalAggregation) agg).collect(Collectors.toList()); - //Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while - //later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of - //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1. - this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators); + if (in.getVersion().before(Version.V_7_7_0)) { + InternalAggregations readAggs = null; + if (hasAggs = in.readBoolean()) { + readAggs = new InternalAggregations(in); + } + if (in.getVersion().before(Version.V_7_2_0)) { + List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() + .map(a -> (SiblingPipelineAggregator) a).collect(toList()); + if (hasAggs && pipelineAggregators.isEmpty() == false) { + List internalAggs = readAggs.copyResults(); + /* + * Earlier versions serialize sibling pipeline aggs + * separately as they used to be set to QuerySearchResult + * directly, while later versions include them in + * InternalAggregations. Note that despite serializing + * sibling pipeline aggs as part of nternalAggregations is + * supported since 6.7.0, the shards set sibling pipeline + * aggs to InternalAggregations only from 7.1. + */ + readAggs = new InternalAggregations(internalAggs, pipelineAggregators); + } + } + aggregations = DelayableWriteable.referencing(readAggs); + } else { + if (hasAggs = in.readBoolean()) { + aggregations = DelayableWriteable.delayed(InternalAggregations::new, in); } } if (in.readBoolean()) { @@ -369,18 +389,37 @@ public void writeToNoId(StreamOutput out) throws IOException { writeTopDocs(out, topDocsAndMaxScore); if (aggregations == null) { out.writeBoolean(false); + if (out.getVersion().before(Version.V_7_2_0)) { + /* + * Earlier versions expect sibling pipeline aggs separately + * as they used to be set to QuerySearchResult directly, while + * later versions expect them in InternalAggregations. Note + * that despite serializing sibling pipeline aggs as part of + * InternalAggregations is supported since 6.7.0, the shards + * set sibling pipeline aggs to InternalAggregations only from + * 7.1 on. + */ + out.writeNamedWriteableList(emptyList()); + } } else { out.writeBoolean(true); - aggregations.writeTo(out); - } - if (out.getVersion().before(Version.V_7_2_0)) { - //Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, - //while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of - //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on. - if (aggregations == null) { - out.writeNamedWriteableList(Collections.emptyList()); + if (out.getVersion().before(Version.V_7_7_0)) { + InternalAggregations aggs = aggregations.get(); + aggs.writeTo(out); + if (out.getVersion().before(Version.V_7_2_0)) { + /* + * Earlier versions expect sibling pipeline aggs separately + * as they used to be set to QuerySearchResult directly, while + * later versions expect them in InternalAggregations. Note + * that despite serializing sibling pipeline aggs as part of + * InternalAggregations is supported since 6.7.0, the shards + * set sibling pipeline aggs to InternalAggregations only from + * 7.1 on. + */ + out.writeNamedWriteableList(aggs.getTopLevelPipelineAggregators()); + } } else { - out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators()); + aggregations.writeTo(out); } } if (suggest == null) { diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java new file mode 100644 index 0000000000000..fbd8702608ead --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import java.io.IOException; + +import static java.util.Collections.singletonList; +import static org.hamcrest.Matchers.equalTo; + +public class DelayableWriteableTests extends ESTestCase { + // NOTE: we don't use AbstractWireSerializingTestCase because we don't implement equals and hashCode. + public static class Example implements NamedWriteable { + private final String s; + + public Example(String s) { + this.s = s; + } + + public Example(StreamInput in) throws IOException { + s = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(s); + } + + @Override + public String getWriteableName() { + return "example"; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Example other = (Example) obj; + return s.equals(other.s); + } + + @Override + public int hashCode() { + return s.hashCode(); + } + } + + public static class NamedHolder implements Writeable { + private final Example e; + + public NamedHolder(Example e) { + this.e = e; + } + + public NamedHolder(StreamInput in) throws IOException { + e = in.readNamedWriteable(Example.class); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(e); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + NamedHolder other = (NamedHolder) obj; + return e.equals(other.e); + } + + @Override + public int hashCode() { + return e.hashCode(); + } + } + + public void testRoundTripFromReferencing() throws IOException { + Example e = new Example(randomAlphaOfLength(5)); + DelayableWriteable original = DelayableWriteable.referencing(e); + assertFalse(original.isDelayed()); + roundTripTestCase(original, Example::new); + } + + public void testRoundTripFromReferencingWithNamedWriteable() throws IOException { + NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); + DelayableWriteable original = DelayableWriteable.referencing(n); + assertFalse(original.isDelayed()); + roundTripTestCase(original, NamedHolder::new); + } + + public void testRoundTripFromDelayed() throws IOException { + Example e = new Example(randomAlphaOfLength(5)); + DelayableWriteable original = roundTrip(DelayableWriteable.referencing(e), Example::new, Version.CURRENT); + assertTrue(original.isDelayed()); + roundTripTestCase(original, Example::new); + } + + public void testRoundTripFromDelayedWithNamedWriteable() throws IOException { + NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); + DelayableWriteable original = roundTrip(DelayableWriteable.referencing(n), NamedHolder::new, Version.CURRENT); + assertTrue(original.isDelayed()); + roundTripTestCase(original, NamedHolder::new); + } + + public void testRoundTripFromDelayedFromOldVersion() throws IOException { + Example e = new Example(randomAlphaOfLength(5)); + DelayableWriteable original = roundTrip(DelayableWriteable.referencing(e), Example::new, randomOldVersion()); + assertTrue(original.isDelayed()); + roundTripTestCase(original, Example::new); + } + + public void testRoundTripFromDelayedFromOldVersionWithNamedWriteable() throws IOException { + NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); + DelayableWriteable original = roundTrip(DelayableWriteable.referencing(n), NamedHolder::new, randomOldVersion()); + assertTrue(original.isDelayed()); + roundTripTestCase(original, NamedHolder::new); + } + + private void roundTripTestCase(DelayableWriteable original, Writeable.Reader reader) throws IOException { + DelayableWriteable roundTripped = roundTrip(original, reader, Version.CURRENT); + assertTrue(roundTripped.isDelayed()); + assertThat(roundTripped.get(), equalTo(original.get())); + } + + private DelayableWriteable roundTrip(DelayableWriteable original, + Writeable.Reader reader, Version version) throws IOException { + return copyInstance(original, writableRegistry(), (out, d) -> d.writeTo(out), + in -> DelayableWriteable.delayed(reader, in), version); + } + + @Override + protected NamedWriteableRegistry writableRegistry() { + return new NamedWriteableRegistry(singletonList( + new NamedWriteableRegistry.Entry(Example.class, "example", Example::new))); + } + + private static Version randomOldVersion() { + return randomValueOtherThanMany(Version.CURRENT::before, () -> VersionUtils.randomCompatibleVersion(random(), Version.CURRENT)); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index 3042077fa0e3a..d5401242e9f8e 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -89,8 +89,8 @@ public void testSerialization() throws Exception { assertEquals(querySearchResult.size(), deserialized.size()); assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs()); if (deserialized.hasAggs()) { - Aggregations aggs = querySearchResult.consumeAggs(); - Aggregations deserializedAggs = deserialized.consumeAggs(); + Aggregations aggs = querySearchResult.consumeAggs().get(); + Aggregations deserializedAggs = deserialized.consumeAggs().get(); assertEquals(aggs.asList(), deserializedAggs.asList()); List pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators(); List deserializedPipelineAggs = @@ -126,7 +126,7 @@ public void testReadFromPre_7_1_0() throws IOException { QuerySearchResult querySearchResult = new QuerySearchResult(in); assertEquals(100, querySearchResult.getContextId().getId()); assertTrue(querySearchResult.hasAggs()); - InternalAggregations aggs = (InternalAggregations) querySearchResult.consumeAggs(); + InternalAggregations aggs = querySearchResult.consumeAggs().get(); assertEquals(1, aggs.asList().size()); //top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately assertEquals(1, aggs.getTopLevelPipelineAggregators().size());