99
1010import org .apache .logging .log4j .LogManager ;
1111import org .apache .logging .log4j .Logger ;
12+ import org .apache .lucene .util .Accountable ;
13+ import org .apache .lucene .util .RamUsageEstimator ;
1214import org .elasticsearch .action .ActionListener ;
1315import org .elasticsearch .action .search .MultiSearchResponse ;
1416import org .elasticsearch .action .search .SearchRequest ;
17+ import org .elasticsearch .common .breaker .CircuitBreaker ;
1518import org .elasticsearch .core .TimeValue ;
1619import org .elasticsearch .search .SearchHit ;
1720import org .elasticsearch .search .aggregations .Aggregation ;
3033import org .elasticsearch .xpack .ql .util .ActionListeners ;
3134
3235import java .util .ArrayList ;
36+ import java .util .Collection ;
3337import java .util .Iterator ;
3438import java .util .List ;
3539import java .util .Map ;
@@ -47,19 +51,45 @@ public class SampleIterator implements Executable {
4751
4852 private final QueryClient client ;
4953 private final List <SampleCriterion > criteria ;
50- private final Stack <Page > stack = new Stack <>();
54+ final Stack <Page > stack = new Stack <>();
5155 private final int maxCriteria ;
52- private final List <Sample > samples ;
56+ final List <Sample > samples ;
5357 private final int fetchSize ;
5458
5559 private long startTime ;
5660
57- public SampleIterator (QueryClient client , List <SampleCriterion > criteria , int fetchSize ) {
61+ // ---------- CIRCUIT BREAKER -----------
62+
63+ /**
64+ * Memory consumption is recalculated each time CB_STACK_SIZE_PRECISION more hits have been added to the stack,
65+ * i.e. it is driven by the cumulative sum of the sizes of the pages pushed to the stack
66+ * (stack.pop() is not taken into account, so the number of hits added to the stack can differ
67+ * from the number of hits currently present in the stack)
68+ */
69+ protected static final int CB_STACK_SIZE_PRECISION = 1000 ;
70+ private static final String CB_COMPLETED_LABEL = "sample_completed" ;
71+ private static final String CB_INFLIGHT_LABEL = "sample_inflight" ;
72+ private final CircuitBreaker circuitBreaker ;
73+ private long samplesRamBytesUsed = 0 ;
74+ private long stackRamBytesUsed = 0 ;
75+ private long totalRamBytesUsed = 0 ;
76+ /**
77+ * total number of hits (i.e. the sum of page sizes) added to the stack so far
78+ * (stack.pop() is not taken into account, so this can differ from the current stack size)
79+ */
80+ private long totalPageSize = 0 ;
81+ /**
82+ * total number of hits (i.e. the sum of page sizes) added to the stack when the last memory check was executed
83+ */
84+ private long previousTotalPageSize = 0 ;
85+
/**
 * Creates an iterator over the samples matching the given criteria.
 *
 * @param client         client used to run the queries
 * @param criteria       the sample criteria; their count is cached as {@code maxCriteria}
 * @param fetchSize      page size that a fetched page is compared against to decide whether to keep paging
 * @param circuitBreaker breaker on which the estimated memory of the collected samples and of the page stack is reserved
 */
public SampleIterator(QueryClient client, List<SampleCriterion> criteria, int fetchSize, CircuitBreaker circuitBreaker) {
    // collaborators
    this.client = client;
    this.circuitBreaker = circuitBreaker;

    // query definition
    this.criteria = criteria;
    this.maxCriteria = criteria.size();
    this.fetchSize = fetchSize;

    // results accumulated while iterating
    this.samples = new ArrayList<>();
}
6494
6595 @ Override
@@ -69,6 +99,7 @@ public void execute(ActionListener<Payload> listener) {
6999 advance (runAfter (listener , () -> {
70100 stack .clear ();
71101 samples .clear ();
102+ clearCircuitBreaker ();
72103 client .close (listener .delegateFailure ((l , r ) -> {}));
73104 }));
74105 }
@@ -114,8 +145,8 @@ private void queryForCompositeAggPage(ActionListener<Payload> listener, final Sa
114145 InternalComposite composite = (InternalComposite ) a ;
115146 log .trace ("Found [{}] composite buckets" , composite .getBuckets ().size ());
116147 Page nextPage = new Page (composite , request );
117- if (nextPage .size () > 0 ) {
118- stack . push (nextPage );
148+ if (nextPage .size > 0 ) {
149+ pushToStack (nextPage );
119150 advance (listener );
120151 } else {
121152 if (stack .size () > 0 ) {
@@ -127,6 +158,15 @@ private void queryForCompositeAggPage(ActionListener<Payload> listener, final Sa
127158 }, listener ::onFailure ));
128159 }
129160
161+ protected void pushToStack (Page nextPage ) {
162+ stack .push (nextPage );
163+ totalPageSize += nextPage .size ;
164+ if (totalPageSize - previousTotalPageSize >= CB_STACK_SIZE_PRECISION ) {
165+ updateMemoryUsage ();
166+ previousTotalPageSize = totalPageSize ;
167+ }
168+ }
169+
130170 /*
131171 * Creates a _msearch request containing maxCriteria (number of filters in the query) * number_of_join_keys queries.
132172 * For a query with three filters
@@ -160,7 +200,6 @@ private void finalStep(ActionListener<Payload> listener) {
160200
161201 int initialSize = samples .size ();
162202 client .multiQuery (searches , ActionListener .wrap (r -> {
163- List <List <SearchHit >> finalSamples = new ArrayList <>();
164203 List <List <SearchHit >> sample = new ArrayList <>(maxCriteria );
165204 MultiSearchResponse .Item [] response = r .getResponses ();
166205 int docGroupsCounter = 1 ;
@@ -174,7 +213,6 @@ private void finalStep(ActionListener<Payload> listener) {
174213 if (docGroupsCounter == maxCriteria ) {
175214 List <SearchHit > match = matchSample (sample , maxCriteria );
176215 if (match != null ) {
177- finalSamples .add (match );
178216 samples .add (new Sample (sampleKeys .get (responseIndex / maxCriteria ), match ));
179217 }
180218 docGroupsCounter = 1 ;
@@ -187,20 +225,30 @@ private void finalStep(ActionListener<Payload> listener) {
187225 log .trace ("Final step... found [{}] new Samples" , samples .size () - initialSize );
188226 // if this final page is max_page_size in size it means: either it's the last page and it happens to have max_page_size elements
189227 // or it's just not the last page and we should advance
190- var next = page .size () == fetchSize ? page : stack .pop ();
228+ var next = page .size == fetchSize ? page : stack .pop ();
191229 log .trace ("Final step... getting next page of the " + (next == page ? "current" : "previous" ) + " page" );
192230 nextPage (listener , next );
193231 }, listener ::onFailure ));
194232 }
195233
234+ private void updateMemoryUsage () {
235+ long newSamplesRamSize = RamUsageEstimator .sizeOfCollection (samples );
236+ addMemory (newSamplesRamSize - samplesRamBytesUsed , CB_COMPLETED_LABEL );
237+ samplesRamBytesUsed = newSamplesRamSize ;
238+
239+ long newStackRamSize = RamUsageEstimator .sizeOfCollection (stack );
240+ addMemory (newStackRamSize - stackRamBytesUsed , CB_INFLIGHT_LABEL );
241+ stackRamBytesUsed = newStackRamSize ;
242+ }
243+
196244 /*
197245 * Finds the next set of results using the after_key of the previous set of buckets.
198246 * It can go back on previous page(s) until either there are no more results, or it finds a page with an after_key to use.
199247 */
200248 private void nextPage (ActionListener <Payload > listener , Page page ) {
201- page .request () .nextAfter (page .afterKey () );
202- log .trace ("Getting next page for page [{}] with afterkey [{}]" , page , page .afterKey () );
203- queryForCompositeAggPage (listener , page .request () );
249+ page .request .nextAfter (page .afterKey );
250+ log .trace ("Getting next page for page [{}] with afterkey [{}]" , page , page .afterKey );
251+ queryForCompositeAggPage (listener , page .request );
204252 }
205253
206254 /*
@@ -288,19 +336,66 @@ private static boolean match(int currentCriterion, List<List<SearchHit>> hits, L
288336 return false ;
289337 }
290338
339+ private void addMemory (long bytes , String label ) {
340+ totalRamBytesUsed += bytes ;
341+ circuitBreaker .addEstimateBytesAndMaybeBreak (bytes , label );
342+ }
343+
344+ private void clearCircuitBreaker () {
345+ circuitBreaker .addWithoutBreaking (-totalRamBytesUsed );
346+ stackRamBytesUsed = 0 ;
347+ samplesRamBytesUsed = 0 ;
348+ totalRamBytesUsed = 0 ;
349+ totalPageSize = 0 ;
350+ previousTotalPageSize = 0 ;
351+ }
352+
291353 private TimeValue timeTook () {
292354 return new TimeValue (System .currentTimeMillis () - startTime );
293355 }
294356
295- private record Page (
296- List <InternalComposite .InternalBucket > hits ,
297- int size ,
298- Map <String , Object > afterKey ,
299- List <String > keys ,
300- SampleQueryRequest request
301- ) {
302- Page (InternalComposite compositeAgg , SampleQueryRequest request ) {
303- this (compositeAgg .getBuckets (), compositeAgg .getBuckets ().size (), compositeAgg .afterKey (), request .keys (), request );
357+ protected static class Page implements Accountable {
358+ final List <InternalComposite .InternalBucket > hits ;
359+ final int size ;
360+ final Map <String , Object > afterKey ;
361+ final List <String > keys ;
362+ final SampleQueryRequest request ;
363+
364+ long ramBytesUsed = 0 ;
365+
366+ private static final long SHALLOW_SIZE = RamUsageEstimator .shallowSizeOfInstance (Page .class );
367+
368+ // for test purposes only
369+ protected Page (int size ) {
370+ hits = null ;
371+ this .size = size ;
372+ afterKey = null ;
373+ keys = null ;
374+ request = null ;
375+ }
376+
377+ protected Page (InternalComposite compositeAgg , SampleQueryRequest request ) {
378+ hits = compositeAgg .getBuckets ();
379+ size = compositeAgg .getBuckets ().size ();
380+ afterKey = compositeAgg .afterKey ();
381+ keys = request .keys ();
382+ this .request = request ;
383+ }
384+
385+ @ Override
386+ public long ramBytesUsed () {
387+ if (ramBytesUsed == 0 ) {
388+ ramBytesUsed = SHALLOW_SIZE ;
389+ ramBytesUsed += RamUsageEstimator .sizeOfCollection (hits );
390+ ramBytesUsed += RamUsageEstimator .sizeOfCollection (keys );
391+ ramBytesUsed += RamUsageEstimator .sizeOfMap (afterKey );
392+ }
393+ return ramBytesUsed ;
394+ }
395+
396+ @ Override
397+ public Collection <Accountable > getChildResources () {
398+ return Accountable .super .getChildResources ();
304399 }
305400 }
306401}
0 commit comments