88
99import org .apache .logging .log4j .LogManager ;
1010import org .apache .logging .log4j .Logger ;
11+ import org .elasticsearch .ExceptionsHelper ;
1112import org .elasticsearch .action .ActionListener ;
1213import org .elasticsearch .action .index .IndexRequest ;
14+ import org .elasticsearch .action .search .SearchPhaseExecutionException ;
1315import org .elasticsearch .action .search .SearchRequest ;
1416import org .elasticsearch .action .search .SearchResponse ;
17+ import org .elasticsearch .action .search .ShardSearchFailure ;
18+ import org .elasticsearch .common .breaker .CircuitBreakingException ;
1519import org .elasticsearch .common .xcontent .XContentBuilder ;
1620import org .elasticsearch .index .query .QueryBuilder ;
1721import org .elasticsearch .search .aggregations .bucket .composite .CompositeAggregation ;
22+ import org .elasticsearch .xpack .core .common .notifications .Auditor ;
1823import org .elasticsearch .xpack .core .dataframe .DataFrameField ;
24+ import org .elasticsearch .xpack .core .dataframe .DataFrameMessages ;
25+ import org .elasticsearch .xpack .core .dataframe .notifications .DataFrameAuditMessage ;
1926import org .elasticsearch .xpack .core .dataframe .transforms .DataFrameIndexerTransformStats ;
2027import org .elasticsearch .xpack .core .dataframe .transforms .DataFrameTransformConfig ;
2128import org .elasticsearch .xpack .core .indexing .AsyncTwoPhaseIndexer ;
2633import java .io .IOException ;
2734import java .io .UncheckedIOException ;
2835import java .util .Map ;
36+ import java .util .Objects ;
2937import java .util .concurrent .Executor ;
3038import java .util .concurrent .atomic .AtomicReference ;
3139import java .util .stream .Collectors ;
3543
3644public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer <Map <String , Object >, DataFrameIndexerTransformStats > {
3745
/**
 * Lower bound for the page size: when a reduction after a circuit breaking exception would
 * drop the page size below this value, the indexer is failed instead of being retried.
 */
46+ public static final int MINIMUM_PAGE_SIZE = 10 ;
/** Name under which the composite aggregation is looked up in the search response. */
3847 public static final String COMPOSITE_AGGREGATION_NAME = "_data_frame" ;
3948 private static final Logger logger = LogManager .getLogger (DataFrameIndexer .class );
4049
/** Receives audit messages, e.g. when the page size gets reduced; never null (enforced in the constructor). */
50+ protected final Auditor <DataFrameAuditMessage > auditor ;
51+
/** Pivot built from the transform configuration when the indexer starts. */
4152 private Pivot pivot ;
/**
 * Page size passed to the pivot when building search requests. The value 0 means "not set":
 * it is then initialized from the pivot on start, and it is reset to 0 again on finish.
 */
53+ private int pageSize = 0 ;
4254
/**
 * Creates the indexer.
 *
 * @param executor executor handed to the parent indexer
 * @param auditor sink for audit messages, must not be {@code null}
 * @param initialState initial indexer state handed to the parent indexer
 * @param initialPosition initial position handed to the parent indexer
 * @param jobStats statistics object handed to the parent indexer
 * @throws NullPointerException if {@code auditor} is {@code null}
 */
public DataFrameIndexer(Executor executor, Auditor<DataFrameAuditMessage> auditor, AtomicReference<IndexerState> initialState,
        Map<String, Object> initialPosition, DataFrameIndexerTransformStats jobStats) {
    super(executor, initialState, initialPosition, jobStats);
    // validate first, assign afterwards
    Objects.requireNonNull(auditor);
    this.auditor = auditor;
}
4963
/**
 * @return the transform configuration; on start it supplies the source index, the source query
 *         and the pivot configuration used to build the pivot
 */
5064 protected abstract DataFrameTransformConfig getConfig ();
5165
/**
 * @return a map of field mappings
 *         NOTE(review): presumably field name to mapping type of the destination; not used in the visible code, confirm against callers
 */
5266 protected abstract Map <String , String > getFieldMappings ();
5367
/**
 * Fail the indexer; called when the page size cannot be reduced any further after a
 * circuit breaking exception.
 *
 * @param message description of the failure
 */
68+ protected abstract void failIndexer (String message );
69+
70+ public int getPageSize () {
71+ return pageSize ;
72+ }
73+
5474 /**
5575 * Request a checkpoint
5676 */
@@ -62,6 +82,11 @@ protected void onStart(long now, ActionListener<Void> listener) {
6282 QueryBuilder queryBuilder = getConfig ().getSource ().getQueryConfig ().getQuery ();
6383 pivot = new Pivot (getConfig ().getSource ().getIndex (), queryBuilder , getConfig ().getPivotConfig ());
6484
85+ // if we haven't set the page size yet, if it is set we might have reduced it after running into an out of memory
86+ if (pageSize == 0 ) {
87+ pageSize = pivot .getInitialPageSize ();
88+ }
89+
6590 // if run for the 1st time, create checkpoint
6691 if (getPosition () == null ) {
6792 createCheckpoint (listener );
@@ -73,6 +98,12 @@ protected void onStart(long now, ActionListener<Void> listener) {
7398 }
7499 }
75100
101+ @ Override
102+ protected void onFinish (ActionListener <Void > listener ) {
103+ // reset the page size, so we do not memorize a low page size forever, the pagesize will be re-calculated on start
104+ pageSize = 0 ;
105+ }
106+
76107 @ Override
77108 protected IterationResult <Map <String , Object >> doProcess (SearchResponse searchResponse ) {
78109 final CompositeAggregation agg = searchResponse .getAggregations ().get (COMPOSITE_AGGREGATION_NAME );
@@ -121,6 +152,70 @@ private Stream<IndexRequest> processBucketsToIndexRequests(CompositeAggregation
121152
122153 @ Override
123154 protected SearchRequest buildSearchRequest () {
124- return pivot .buildSearchRequest (getPosition ());
155+ return pivot .buildSearchRequest (getPosition (), pageSize );
156+ }
157+
158+ /**
159+ * Handle the circuit breaking case: A search consumed to much memory and got aborted.
160+ *
161+ * Going out of memory we smoothly reduce the page size which reduces memory consumption.
162+ *
163+ * Implementation details: We take the values from the circuit breaker as a hint, but
164+ * note that it breaks early, that's why we also reduce using
165+ *
166+ * @param e Exception thrown, only {@link CircuitBreakingException} are handled
167+ * @return true if exception was handled, false if not
168+ */
169+ protected boolean handleCircuitBreakingException (Exception e ) {
170+ CircuitBreakingException circuitBreakingException = getCircuitBreakingException (e );
171+
172+ if (circuitBreakingException == null ) {
173+ return false ;
174+ }
175+
176+ double reducingFactor = Math .min ((double ) circuitBreakingException .getByteLimit () / circuitBreakingException .getBytesWanted (),
177+ 1 - (Math .log10 (pageSize ) * 0.1 ));
178+
179+ int newPageSize = (int ) Math .round (reducingFactor * pageSize );
180+
181+ if (newPageSize < MINIMUM_PAGE_SIZE ) {
182+ String message = DataFrameMessages .getMessage (DataFrameMessages .LOG_DATA_FRAME_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE , pageSize );
183+ failIndexer (message );
184+ return true ;
185+ }
186+
187+ String message = DataFrameMessages .getMessage (DataFrameMessages .LOG_DATA_FRAME_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE , pageSize ,
188+ newPageSize );
189+ auditor .info (getJobId (), message );
190+ logger .info ("Data frame transform [" + getJobId () + "]:" + message );
191+
192+ pageSize = newPageSize ;
193+ return true ;
194+ }
195+
196+ /**
197+ * Inspect exception for circuit breaking exception and return the first one it can find.
198+ *
199+ * @param e Exception
200+ * @return CircuitBreakingException instance if found, null otherwise
201+ */
202+ private static CircuitBreakingException getCircuitBreakingException (Exception e ) {
203+ // circuit breaking exceptions are at the bottom
204+ Throwable unwrappedThrowable = ExceptionsHelper .unwrapCause (e );
205+
206+ if (unwrappedThrowable instanceof CircuitBreakingException ) {
207+ return (CircuitBreakingException ) unwrappedThrowable ;
208+ } else if (unwrappedThrowable instanceof SearchPhaseExecutionException ) {
209+ SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException ) e ;
210+ for (ShardSearchFailure shardFailure : searchPhaseException .shardFailures ()) {
211+ Throwable unwrappedShardFailure = ExceptionsHelper .unwrapCause (shardFailure .getCause ());
212+
213+ if (unwrappedShardFailure instanceof CircuitBreakingException ) {
214+ return (CircuitBreakingException ) unwrappedShardFailure ;
215+ }
216+ }
217+ }
218+
219+ return null ;
125220 }
126221}
0 commit comments