@@ -67,6 +67,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
6767
6868 private final String localClusterAlias ;
6969 private final long absoluteStartMillis ;
70+ private final boolean finalReduce ;
7071
7172 private SearchType searchType = SearchType .DEFAULT ;
7273
@@ -102,13 +103,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
102103 public SearchRequest () {
103104 this .localClusterAlias = null ;
104105 this .absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS ;
106+ this .finalReduce = true ;
105107 }
106108
107109 /**
108110 * Constructs a new search request from the provided search request
109111 */
110112 public SearchRequest (SearchRequest searchRequest ) {
111- this (searchRequest , searchRequest .indices , searchRequest .localClusterAlias , searchRequest .absoluteStartMillis );
113+ this (searchRequest , searchRequest .indices , searchRequest .localClusterAlias ,
114+ searchRequest .absoluteStartMillis , searchRequest .finalReduce );
112115 }
113116
114117 /**
@@ -132,25 +135,30 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
132135 }
133136
134137 /**
135- * Creates a new search request by providing the search request to copy all fields from, the indices to search against,
136- * the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time.
137- * Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction
138- * on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the
139- * alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters
140- * to ensure that the same value is used.
138+ * Creates a new search request by providing the search request to copy all fields from, the indices to search against, the alias of
139+ * the cluster where it will be executed, as well as the start time in milliseconds from the epoch time and whether the reduction
140+ * should be final or not. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request
141+ * performing reduction on each cluster in order to minimize network round-trips between the coordinating node and the remote clusters.
142+ *
143+ * @param originalSearchRequest the original search request
144+ * @param indices the indices to search against
145+ * @param localClusterAlias the alias to prefix index names with in the returned search results
146+ * @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used
147+ * @param finalReduce whether the reduction should be final or not
141148 */
142149 static SearchRequest withLocalReduction (SearchRequest originalSearchRequest , String [] indices ,
143- String localClusterAlias , long absoluteStartMillis ) {
150+ String localClusterAlias , long absoluteStartMillis , boolean finalReduce ) {
144151 Objects .requireNonNull (originalSearchRequest , "search request must not be null" );
145152 validateIndices (indices );
146153 Objects .requireNonNull (localClusterAlias , "cluster alias must not be null" );
147154 if (absoluteStartMillis < 0 ) {
148155 throw new IllegalArgumentException ("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]" );
149156 }
150- return new SearchRequest (originalSearchRequest , indices , localClusterAlias , absoluteStartMillis );
157+ return new SearchRequest (originalSearchRequest , indices , localClusterAlias , absoluteStartMillis , finalReduce );
151158 }
152159
153- private SearchRequest (SearchRequest searchRequest , String [] indices , String localClusterAlias , long absoluteStartMillis ) {
160+ private SearchRequest (SearchRequest searchRequest , String [] indices , String localClusterAlias , long absoluteStartMillis ,
161+ boolean finalReduce ) {
154162 this .allowPartialSearchResults = searchRequest .allowPartialSearchResults ;
155163 this .batchedReduceSize = searchRequest .batchedReduceSize ;
156164 this .ccsMinimizeRoundtrips = searchRequest .ccsMinimizeRoundtrips ;
@@ -167,6 +175,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca
167175 this .types = searchRequest .types ;
168176 this .localClusterAlias = localClusterAlias ;
169177 this .absoluteStartMillis = absoluteStartMillis ;
178+ this .finalReduce = finalReduce ;
170179 }
171180
172181 /**
@@ -203,6 +212,12 @@ public SearchRequest(StreamInput in) throws IOException {
203212 localClusterAlias = null ;
204213 absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS ;
205214 }
215+ //TODO move to the 6_7_0 branch once backported to 6.x
216+ if (in .getVersion ().onOrAfter (Version .V_7_0_0 )) {
217+ finalReduce = in .readBoolean ();
218+ } else {
219+ finalReduce = true ;
220+ }
206221 if (in .getVersion ().onOrAfter (Version .V_7_0_0 )) {
207222 ccsMinimizeRoundtrips = in .readBoolean ();
208223 }
@@ -232,6 +247,10 @@ public void writeTo(StreamOutput out) throws IOException {
232247 out .writeVLong (absoluteStartMillis );
233248 }
234249 }
250+ //TODO move to the 6_7_0 branch once backported to 6.x
251+ if (out .getVersion ().onOrAfter (Version .V_7_0_0 )) {
252+ out .writeBoolean (finalReduce );
253+ }
235254 if (out .getVersion ().onOrAfter (Version .V_7_0_0 )) {
236255 out .writeBoolean (ccsMinimizeRoundtrips );
237256 }
@@ -277,11 +296,18 @@ String getLocalClusterAlias() {
277296 return localClusterAlias ;
278297 }
279298
299+ /**
300+ * Returns whether the reduction phase that will be performed needs to be final or not.
301+ */
302+ boolean isFinalReduce () {
303+ return finalReduce ;
304+ }
305+
280306 /**
281307 * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
282308 * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
283- * request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided
284- * current time, otherwise it will return {@link System#currentTimeMillis()}.
309+ * request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long, boolean )}, this method returns
310+ * the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
285311 *
286312 */
287313 long getOrCreateAbsoluteStartMillis () {
0 commit comments