2323 * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
2424 * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
2525 * Only one background job can run simultaneously and {@link #onFinish} is called when the job
26- * finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
27- * to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
26+ * finishes before state is persisted. The indexer can be stopped early by a call to {@link #stop()} which will
27+ * trigger the {@link #onStopping()} and {@link #onStopped()} methods.
28+ * {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
2829 * is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()}
2930 * to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
30- * {@link #stop()} can be used to stop the background job without aborting the indexer.
3131 *
3232 * In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query,
3333 * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
@@ -87,10 +87,10 @@ public synchronized IndexerState start() {
8787
8888 /**
8989 * Sets the internal state to {@link IndexerState#STOPPING} if an async job is
90- * running in the background, {@link #onStop ()} will be called when the background job
90+ * running in the background, {@link #onStopped ()} will be called when the background job
9191 * detects that the indexer is stopped.
9292 * If there is no job running when this function is called the returned
93- * state is {@link IndexerState#STOPPED} and {@link #onStop ()} will not be called.
93+ * state is {@link IndexerState#STOPPED} and {@link #onStopped ()} will not be called.
9494 *
9595 * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
9696 */
@@ -249,20 +249,30 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
249249
250250 /**
251251 * Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to
252- * {@link IndexerState#STARTED}.
252+ * {@link IndexerState#STARTED} and before {@link #doSaveState(IndexerState, Object, Runnable)} is called
253253 *
254254 * @param listener listener to call after done
255255 */
256256 protected abstract void onFinish (ActionListener <Void > listener );
257257
258+
258259 /**
259- * Called when the indexer is stopped. This is only called when the indexer is stopped
260- * via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called
261- * when the indexer's work is done.
260+ * Called when a background job stops after internal state has changed from {@link IndexerState#STOPPING}
261+ * to {@link IndexerState#STOPPED} and before state is persisted via {@link #doSaveState(IndexerState, Object, Runnable)}.
262+ * This is only called when the indexer is stopped due to a call to {@link #stop()}
262263 */
263- protected void onStop () {
264+ protected void onStopping () {
264265 }
265266
267+ /**
268+ * Called when the indexer is stopped after {@link #onStopping()} and {@link #doSaveState(IndexerState, Object, Runnable)}
269+ * have been called.
270+ */
271+ protected void onStopped () {
272+ }
273+
274+
275+
266276 /**
267277 * Called when a background job detects that the indexer is aborted causing the
268278 * async execution to stop.
@@ -280,16 +290,18 @@ private void finishWithIndexingFailure(Exception exc) {
280290 }
281291
282292 private IndexerState finishAndSetState () {
283- AtomicBoolean callOnStop = new AtomicBoolean (false );
284- AtomicBoolean callOnAbort = new AtomicBoolean (false );
293+ AtomicBoolean callOnStopping = new AtomicBoolean ();
294+ AtomicBoolean callOnAbort = new AtomicBoolean ();
285295 IndexerState updatedState = state .updateAndGet (prev -> {
296+ callOnAbort .set (false );
297+ callOnStopping .set (false );
286298 switch (prev ) {
287299 case INDEXING :
288300 // ready for another job
289301 return IndexerState .STARTED ;
290302
291303 case STOPPING :
292- callOnStop .set (true );
304+ callOnStopping .set (true );
293305 // must be started again
294306 return IndexerState .STOPPED ;
295307
@@ -311,8 +323,8 @@ private IndexerState finishAndSetState() {
311323 }
312324 });
313325
314- if (callOnStop .get ()) {
315- onStop ();
326+ if (callOnStopping .get ()) {
327+ onStopping ();
316328 } else if (callOnAbort .get ()) {
317329 onAbort ();
318330 }
@@ -412,7 +424,7 @@ private boolean checkState(IndexerState currentState) {
412424
413425 case STOPPING :
414426 logger .info ("Indexer job encountered [" + IndexerState .STOPPING + "] state, halting indexer." );
415- doSaveState (finishAndSetState (), getPosition (), () -> {} );
427+ doSaveState (finishAndSetState (), getPosition (), this :: onStopped );
416428 return false ;
417429
418430 case STOPPED :
0 commit comments