2929import org .apache .lucene .search .Weight ;
3030import org .elasticsearch .Version ;
3131import org .elasticsearch .cluster .metadata .IndexMetaData ;
32+ import org .elasticsearch .common .lease .Releasable ;
33+ import org .elasticsearch .common .lease .Releasables ;
3234import org .elasticsearch .common .settings .Settings ;
3335import org .elasticsearch .common .util .MockBigArrays ;
3436import org .elasticsearch .index .IndexSettings ;
3537import org .elasticsearch .index .cache .query .DisabledQueryCache ;
3638import org .elasticsearch .index .engine .Engine ;
37- import org .elasticsearch .index .fielddata .IndexFieldData ;
3839import org .elasticsearch .index .fielddata .IndexFieldDataCache ;
3940import org .elasticsearch .index .fielddata .IndexFieldDataService ;
4041import org .elasticsearch .index .mapper .MappedFieldType ;
5657import java .util .Collections ;
5758import java .util .List ;
5859
60+ import static org .mockito .Matchers .anyObject ;
61+ import static org .mockito .Mockito .doAnswer ;
5962import static org .mockito .Mockito .mock ;
6063import static org .mockito .Mockito .when ;
6164
6568 * {@link AggregationBuilder} instance.
6669 */
6770public abstract class AggregatorTestCase extends ESTestCase {
71+ private List <Releasable > releasables = new ArrayList <>();
72+
6873 protected <A extends Aggregator , B extends AggregationBuilder > A createAggregator (B aggregationBuilder ,
6974 IndexSearcher indexSearcher ,
7075 MappedFieldType ... fieldTypes ) throws IOException {
@@ -99,6 +104,12 @@ public boolean shouldCache(Query query) throws IOException {
99104 when (searchContext .bigArrays ()).thenReturn (new MockBigArrays (Settings .EMPTY , circuitBreakerService ));
100105 when (searchContext .fetchPhase ())
101106 .thenReturn (new FetchPhase (Arrays .asList (new FetchSourceSubPhase (), new DocValueFieldsFetchSubPhase ())));
107+ doAnswer (invocation -> {
108+ /* Store the releasables so we can release them at the end of the test case. This is important because aggregations don't
109+ * close their sub-aggregations. This is fairly similar to what the production code does. */
110+ releasables .add ((Releasable ) invocation .getArguments ()[0 ]);
111+ return null ;
112+ }).when (searchContext ).addReleasable (anyObject (), anyObject ());
102113
103114 // TODO: now just needed for top_hits, this will need to be revised for other agg unit tests:
104115 MapperService mapperService = mock (MapperService .class );
@@ -110,10 +121,9 @@ public boolean shouldCache(Query query) throws IOException {
110121
111122 QueryShardContext queryShardContext = mock (QueryShardContext .class );
112123 for (MappedFieldType fieldType : fieldTypes ) {
113- IndexFieldData <?> fieldData = fieldType .fielddataBuilder ().build (indexSettings , fieldType ,
114- new IndexFieldDataCache .None (), circuitBreakerService , mock (MapperService .class ));
115124 when (queryShardContext .fieldMapper (fieldType .name ())).thenReturn (fieldType );
116- when (queryShardContext .getForField (fieldType )).thenReturn (fieldData );
125+ when (queryShardContext .getForField (fieldType )).then (invocation -> fieldType .fielddataBuilder ().build (
126+ indexSettings , fieldType , new IndexFieldDataCache .None (), circuitBreakerService , mock (MapperService .class )));
117127 when (searchContext .getQueryShardContext ()).thenReturn (queryShardContext );
118128 }
119129
@@ -126,13 +136,17 @@ protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSe
126136 Query query ,
127137 AggregationBuilder builder ,
128138 MappedFieldType ... fieldTypes ) throws IOException {
129- try (C a = createAggregator (builder , searcher , fieldTypes )) {
139+ C a = createAggregator (builder , searcher , fieldTypes );
140+ try {
130141 a .preCollection ();
131142 searcher .search (query , a );
132143 a .postCollection ();
133144 @ SuppressWarnings ("unchecked" )
134145 A internalAgg = (A ) a .buildAggregation (0L );
135146 return internalAgg ;
147+ } finally {
148+ Releasables .close (releasables );
149+ releasables .clear ();
136150 }
137151 }
138152
@@ -168,14 +182,10 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
168182 try {
169183 for (ShardSearcher subSearcher : subSearchers ) {
170184 C a = createAggregator (builder , subSearcher , fieldTypes );
171- try {
172- a .preCollection ();
173- subSearcher .search (weight , a );
174- a .postCollection ();
175- aggs .add (a .buildAggregation (0L ));
176- } finally {
177- closeAgg (a );
178- }
185+ a .preCollection ();
186+ subSearcher .search (weight , a );
187+ a .postCollection ();
188+ aggs .add (a .buildAggregation (0L ));
179189 }
180190 if (aggs .isEmpty ()) {
181191 return null ;
@@ -195,14 +205,8 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
195205 return internalAgg ;
196206 }
197207 } finally {
198- closeAgg (root );
199- }
200- }
201-
202- private void closeAgg (Aggregator agg ) {
203- agg .close ();
204- for (Aggregator sub : ((AggregatorBase ) agg ).subAggregators ) {
205- closeAgg (sub );
208+ Releasables .close (releasables );
209+ releasables .clear ();
206210 }
207211 }
208212
0 commit comments