From 3c215d9ac520276252974df084a1d5c57f45670e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 1 May 2019 13:40:49 +0100 Subject: [PATCH 01/10] Complete P task on stop --- .../core/indexing/AsyncTwoPhaseIndexer.java | 17 +++++- .../integration/DataFrameRestTestCase.java | 6 ++ ...TransportStopDataFrameTransformAction.java | 59 ++++++++----------- .../transforms/DataFrameTransformTask.java | 51 ++++------------ .../test/data_frame/transforms_start_stop.yml | 1 + 5 files changed, 57 insertions(+), 77 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index ec7e0de9e34fc..4d2553d27ad7f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -22,9 +22,11 @@ * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). * Only one background job can run simultaneously and {@link #onFinish} is called when the job - * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is - * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when - * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer. + * finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call + * to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} + * is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()} + * to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called. + * {@link #stop()} can be used to stop the background job without aborting the indexer. * * In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query, * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input. @@ -251,6 +253,14 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { */ protected abstract void onFinish(ActionListener listener); + /** + * Called when the indexer is stopped. This is only called when the indexer is stopped + * via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called + * when the indexer's work is done. + */ + protected void onStop() { + } + /** * Called when a background job detects that the indexer is aborted causing the * async execution to stop. @@ -276,6 +286,7 @@ private IndexerState finishAndSetState() { case STOPPING: // must be started again + onStop(); return IndexerState.STOPPED; case ABORTING: diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 85c0ac44a69af..6747eab94e835 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -21,6 +21,7 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; +import org.junit.After; import org.junit.AfterClass; import java.io.IOException; @@ -272,6 +273,11 @@ protected static void deleteDataFrameTransform(String transformId) throws IOExce adminClient().performRequest(request); } + @After + public static void waitForDataFrame() throws Exception { + waitForPendingDataFrameTasks(); + } + @AfterClass public static void removeIndices() throws Exception { wipeDataFrameTransforms(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 120f1ef77596b..343b18882b35b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -84,41 +84,34 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF RestStatus.CONFLICT)); return; } - if (request.waitForCompletion() == false) { - transformTask.stop(listener); - } else { - ActionListener blockingListener = ActionListener.wrap(response -> { - if (response.isStopped()) { - // The Task acknowledged that it is stopped/stopping... wait until the status actually - // changes over before returning. Switch over to Generic threadpool so - // we don't block the network thread - threadPool.generic().execute(() -> { - try { - long untilInNanos = System.nanoTime() + request.getTimeout().getNanos(); - - while (System.nanoTime() - untilInNanos < 0) { - if (transformTask.isStopped()) { - listener.onResponse(response); - return; - } - Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis()); - } - // ran out of time - listener.onFailure(new ElasticsearchTimeoutException( - DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT, - request.getTimeout().getStringRep(), request.getId()))); - } catch (InterruptedException e) { - listener.onFailure(new ElasticsearchException(DataFrameMessages.getMessage( - DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e)); + + transformTask.stop(); + + if (request.waitForCompletion()) { + // Wait until the task status changes before returning. + // Switch over to Generic threadpool so we don't block the network thread + threadPool.generic().execute(() -> { + try { + long untilInNanos = System.nanoTime() + request.getTimeout().getNanos(); + + while (System.nanoTime() - untilInNanos < 0) { + if (transformTask.isStopped()) { + listener.onResponse(new StopDataFrameTransformAction.Response(true)); + return; } - }); - } else { - // Did not acknowledge stop, just return the response - listener.onResponse(response); + Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis()); + } + // ran out of time + listener.onFailure(new ElasticsearchTimeoutException( + DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT, + request.getTimeout().getStringRep(), request.getId()))); + } catch (InterruptedException e) { + listener.onFailure(new ElasticsearchException(DataFrameMessages.getMessage( + DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e)); } - }, listener::onFailure); - - transformTask.stop(blockingListener); + }); + } else { + listener.onResponse(new StopDataFrameTransformAction.Response(true)); } } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 2020300a0cf77..0cdb31231cddf 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -27,7 +27,6 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; -import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; @@ -218,51 +217,16 @@ public synchronized void start(ActionListener listener) { )); } - public synchronized void stop(ActionListener listener) { + public synchronized void stop() { if (getIndexer() == null) { - listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later", - getTransformId())); return; } // taskState is initialized as STOPPED and is updated in tandem with the indexerState // Consequently, if it is STOPPED, we consider the whole task STOPPED. if (taskState.get() == DataFrameTransformTaskState.STOPPED) { - listener.onResponse(new StopDataFrameTransformAction.Response(true)); return; } - final IndexerState newState = getIndexer().stop(); - switch (newState) { - case STOPPED: - // Fall through to `STOPPING` as the behavior is the same for both, we should persist for both - case STOPPING: - // update the persistent state to STOPPED. There are two scenarios and both are safe: - // 1. we persist STOPPED now, indexer continues a bit then sees the flag and checkpoints another STOPPED with the more recent - // position. - // 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up at last checkpoint, - // overwrite some docs and eventually checkpoint. - taskState.set(DataFrameTransformTaskState.STOPPED); - DataFrameTransformState state = new DataFrameTransformState( - DataFrameTransformTaskState.STOPPED, - IndexerState.STOPPED, - getIndexer().getPosition(), - currentCheckpoint.get(), - stateReason.get(), - getIndexer().getProgress()); - persistStateToClusterState(state, ActionListener.wrap( - task -> { - auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]"); - listener.onResponse(new StopDataFrameTransformAction.Response(true)); - }, - exc -> listener.onFailure(new ElasticsearchException( - "Error while updating state for data frame transform [{}] to [{}]", exc, - transform.getId(), - state.getIndexerState())))); - break; - default: - listener.onFailure(new ElasticsearchException("Cannot stop task for data frame transform [{}], because state was [{}]", - transform.getId(), newState)); - break; - } + getIndexer().stop(); } @Override @@ -280,12 +244,10 @@ public synchronized void triggered(Event event) { /** * Attempt to gracefully cleanup the data frame transform so it can be terminated. - * This tries to remove the job from the scheduler, and potentially any other - * cleanup operations in the future + * This tries to remove the job from the scheduler and completes the persistent task */ synchronized void shutdown() { try { - logger.info("Data frame indexer [" + transform.getId() + "] received abort request, stopping indexer."); schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId()); schedulerEngine.unregister(this); } catch (Exception e) { @@ -612,6 +574,13 @@ protected void onFinish(ActionListener listener) { } } + @Override + protected void onStop() { + auditor.info(transformConfig.getId(), "Received stop request, stopping indexer"); + logger.info("Data frame transform [{}] received stop request, stopping indexer", transformConfig.getId()); + transformTask.shutdown(); + } + @Override protected void onAbort() { auditor.info(transformConfig.getId(), "Received abort request, stopping indexer"); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index f1ac07b72340c..ad5a6a9fc5d36 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -106,6 +106,7 @@ teardown: - do: data_frame.stop_data_frame_transform: transform_id: "airline-transform-start-stop" + wait_for_completion: true - match: { stopped: true } - do: From 18840a0a3366780c82cab8e9dc61486e34de745c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 1 May 2019 20:02:06 +0100 Subject: [PATCH 02/10] onStop tests --- .../core/indexing/AsyncTwoPhaseIndexer.java | 7 +- .../indexing/AsyncTwoPhaseIndexerTests.java | 75 ++++++++++++++++--- ...portGetDataFrameTransformsStatsAction.java | 1 - .../transforms/DataFrameTransformTask.java | 4 +- 4 files changed, 72 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 4d2553d27ad7f..ccf075b13ae5a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -86,8 +86,10 @@ public synchronized IndexerState start() { /** * Sets the internal state to {@link IndexerState#STOPPING} if an async job is - * running in the background. If there is no job running when this function is - * called, the state is directly set to {@link IndexerState#STOPPED}. + * running in the background, {@link #onStop()} will be called when the background job + * detects that the indexer is stopped. + * If there is no job running when this function is called + * the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly. * * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). */ @@ -96,6 +98,7 @@ public synchronized IndexerState stop() { if (previousState == IndexerState.INDEXING) { return IndexerState.STOPPING; } else if (previousState == IndexerState.STARTED) { + onStop(); return IndexerState.STOPPED; } else { return previousState; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index b39c4f1a25a76..e56491bdb5764 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.io.IOException; import java.util.Collections; @@ -34,17 +35,26 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { AtomicBoolean isFinished = new AtomicBoolean(false); + AtomicBoolean isStopped = new AtomicBoolean(false); + + @Before + public void reset() { + isFinished.set(false); + isStopped.set(false); + } private class MockIndexer extends AsyncTwoPhaseIndexer { private final CountDownLatch latch; // test the execution order private volatile int step; + private final boolean stoppedBeforeFinished; protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition, - CountDownLatch latch) { + CountDownLatch latch, boolean stoppedBeforeFinished) { super(executor, initialState, initialPosition, new MockJobStats()); this.latch = latch; + this.stoppedBeforeFinished = stoppedBeforeFinished; } @Override @@ -57,7 +67,7 @@ protected IterationResult doProcess(SearchResponse searchResponse) { awaitForLatch(); assertThat(step, equalTo(3)); ++step; - return new IterationResult(Collections.emptyList(), 3, true); + return new IterationResult<>(Collections.emptyList(), 3, true); } private void awaitForLatch() { @@ -99,7 +109,8 @@ protected void doNextBulk(BulkRequest request, ActionListener next @Override protected void doSaveState(IndexerState state, Integer position, Runnable next) { - assertThat(step, equalTo(5)); + int expectedStep = stoppedBeforeFinished ? 3 : 5; + assertThat(step, equalTo(expectedStep)); ++step; next.run(); } @@ -114,7 +125,12 @@ protected void onFinish(ActionListener listener) { assertThat(step, equalTo(4)); ++step; listener.onResponse(null); - isFinished.set(true); + assertTrue(isFinished.compareAndSet(false, true)); + } + + @Override + protected void onStop() { + assertTrue(isStopped.compareAndSet(false, true)); } @Override @@ -180,7 +196,7 @@ protected void doSaveState(IndexerState state, Integer position, Runnable next) protected void onFailure(Exception exc) { assertThat(step, equalTo(2)); ++step; - isFinished.set(true); + assertTrue(isFinished.compareAndSet(false, true)); } @Override @@ -209,10 +225,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public void testStateMachine() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); - isFinished.set(false); try { CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch); + MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); @@ -220,7 +235,8 @@ public void testStateMachine() throws Exception { countDownLatch.countDown(); assertThat(indexer.getPosition(), equalTo(2)); - ESTestCase.awaitBusy(() -> isFinished.get()); + assertTrue(awaitBusy(() -> isFinished.get())); + assertFalse(isStopped.get()); assertThat(indexer.getStep(), equalTo(6)); assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); assertThat(indexer.getStats().getNumPages(), equalTo(1L)); @@ -234,18 +250,57 @@ public void testStateMachine() throws Exception { public void testStateMachineBrokenSearch() throws InterruptedException { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); - isFinished.set(false); try { MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS)); + assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS)); assertThat(indexer.getStep(), equalTo(3)); } finally { executor.shutdownNow(); } } + + public void testStop_AfterIndexerIsFinished() throws InterruptedException { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + final ExecutorService executor = Executors.newFixedThreadPool(1); + try { + CountDownLatch countDownLatch = new CountDownLatch(1); + MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); + indexer.start(); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + countDownLatch.countDown(); + assertTrue(awaitBusy(() -> isFinished.get())); + + indexer.stop(); + assertTrue(isStopped.get()); + assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); + } finally { + executor.shutdownNow(); + } + } + + public void testStop_WhileIndexing() throws InterruptedException { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + final ExecutorService executor = Executors.newFixedThreadPool(1); + try { + CountDownLatch countDownLatch = new CountDownLatch(1); + MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + indexer.stop(); + countDownLatch.countDown(); + + assertThat(indexer.getPosition(), equalTo(2)); + assertTrue(awaitBusy(() -> isStopped.get())); + assertFalse(isFinished.get()); + } finally { + executor.shutdownNow(); + } + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index 7ab5f28001407..bb01da4c7e50a 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -132,7 +132,6 @@ protected void doExecute(Task task, Request request, ActionListener fi }, e -> { // If the index to search, or the individual config is not there, just return empty - logger.error("failed to expand ids", e); if (e instanceof ResourceNotFoundException) { finalListener.onResponse(new Response(Collections.emptyList())); } else { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 0cdb31231cddf..636b8d555a536 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -576,8 +576,8 @@ protected void onFinish(ActionListener listener) { @Override protected void onStop() { - auditor.info(transformConfig.getId(), "Received stop request, stopping indexer"); - logger.info("Data frame transform [{}] received stop request, stopping indexer", transformConfig.getId()); + auditor.info(transformConfig.getId(), "Indexer has stopped"); + logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); transformTask.shutdown(); } From 1ceacf15dfaf83f06cb93f49528b6d2651f60eac Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 1 May 2019 20:02:27 +0100 Subject: [PATCH 03/10] Wait for p task stopped in action --- ...TransportStopDataFrameTransformAction.java | 88 ++++++++++++------- .../transforms/DataFrameTransformTask.java | 6 +- .../test/data_frame/transforms_start_stop.yml | 2 + 3 files changed, 61 insertions(+), 35 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 343b18882b35b..e2b1c6e6670fc 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -5,9 +5,7 @@ */ package org.elasticsearch.xpack.dataframe.action; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; @@ -16,50 +14,58 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; -import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; - public class TransportStopDataFrameTransformAction extends TransportTasksAction { - private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); private final ThreadPool threadPool; private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; + private final PersistentTasksService persistentTasksService; @Inject public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService, ThreadPool threadPool, + PersistentTasksService persistentTasksService, DataFrameTransformsConfigManager dataFrameTransformsConfigManager) { super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new, StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME); this.threadPool = threadPool; this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; + this.persistentTasksService = persistentTasksService; } @Override protected void doExecute(Task task, StopDataFrameTransformAction.Request request, ActionListener listener) { + final ActionListener finalListener; + if (request.waitForCompletion()) { + finalListener = waitForStopListener(request, listener); + } else { + finalListener = listener; + } + dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap( expandedIds -> { request.setExpandedIds(new HashSet<>(expandedIds)); request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state())); - super.doExecute(task, request, listener); + super.doExecute(task, request, finalListener); }, listener::onFailure )); @@ -86,33 +92,6 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF } transformTask.stop(); - - if (request.waitForCompletion()) { - // Wait until the task status changes before returning. - // Switch over to Generic threadpool so we don't block the network thread - threadPool.generic().execute(() -> { - try { - long untilInNanos = System.nanoTime() + request.getTimeout().getNanos(); - - while (System.nanoTime() - untilInNanos < 0) { - if (transformTask.isStopped()) { - listener.onResponse(new StopDataFrameTransformAction.Response(true)); - return; - } - Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis()); - } - // ran out of time - listener.onFailure(new ElasticsearchTimeoutException( - DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT, - request.getTimeout().getStringRep(), request.getId()))); - } catch (InterruptedException e) { - listener.onFailure(new ElasticsearchException(DataFrameMessages.getMessage( - DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e)); - } - }); - } else { - listener.onResponse(new StopDataFrameTransformAction.Response(true)); - } } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); @@ -132,4 +111,45 @@ protected StopDataFrameTransformAction.Response newResponse(StopDataFrameTransfo boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped); return new StopDataFrameTransformAction.Response(allStopped); } + + private ActionListener + waitForStopListener(StopDataFrameTransformAction.Request request, + ActionListener listener) { + + return ActionListener.wrap( + response -> { + // Wait until the persistent task is stopped + // Switch over to Generic threadpool so we don't block the network thread + threadPool.generic().execute(() -> + waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), listener)); + }, + listener::onFailure + ); + } + + private void waitForDataFrameStopped(Collection persistentTaskIds, TimeValue timeout, + ActionListener listener) { + persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> { + + logger.error("PTasks: " + persistentTasksCustomMetaData.toString()); + + for (String persistentTaskId: persistentTaskIds) { + if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { + return false; + } + } + logger.error("task gone"); + return true; + }, timeout, new ActionListener<>() { + @Override + public void onResponse(Boolean result) { + listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 636b8d555a536..ba55918d4cdde 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -84,7 +84,7 @@ public DataFrameTransformTask(long id, String type, String action, TaskId parent String initialReason = null; long initialGeneration = 0; Map initialPosition = null; - logger.info("[{}] init, got state: [{}]", transform.getId(), state != null); + logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null); if (state != null) { initialTaskState = state.getTaskState(); initialReason = state.getReason(); @@ -218,6 +218,7 @@ public synchronized void start(ActionListener listener) { } public synchronized void stop() { + logger.info("stop task"); if (getIndexer() == null) { return; } @@ -226,6 +227,7 @@ public synchronized void stop() { if (taskState.get() == DataFrameTransformTaskState.STOPPED) { return; } + getIndexer().stop(); } @@ -247,6 +249,7 @@ public synchronized void triggered(Event event) { * This tries to remove the job from the scheduler and completes the persistent task */ synchronized void shutdown() { + logger.info("Shutting down"); try { schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId()); schedulerEngine.unregister(this); @@ -255,6 +258,7 @@ synchronized void shutdown() { return; } markAsCompleted(); + logger.info("task marked as completed"); } public DataFrameTransformProgress getProgress() { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index ad5a6a9fc5d36..9fe814574b012 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -233,6 +233,8 @@ teardown: - do: data_frame.stop_data_frame_transform: transform_id: "_all" + wait_for_completion: true + - match: { stopped: true } - do: From b53f71184907a9d1cca215cc594ae7d4ebd31abf Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 18 Apr 2019 10:15:07 +0100 Subject: [PATCH 04/10] Delete data frame just removes the config This changes the behaviour of delete to simply remove the config after checking the DF does not have a running P task --- .../DeleteDataFrameTransformAction.java | 85 ++--------------- ...DataFrameTransformActionResponseTests.java | 22 ----- ...ansportDeleteDataFrameTransformAction.java | 95 ++++++++----------- .../TransportPutDataFrameTransformAction.java | 4 - .../RestDeleteDataFrameTransformAction.java | 3 +- 5 files changed, 45 insertions(+), 164 deletions(-) delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java index 6b7de0ab80f3a..82a6a4c81d263 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java @@ -7,25 +7,17 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.TaskOperationFailure; -import org.elasticsearch.action.support.tasks.BaseTasksRequest; -import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper; import java.io.IOException; -import java.util.Collections; -import java.util.List; import java.util.Objects; -public class DeleteDataFrameTransformAction extends Action { +public class DeleteDataFrameTransformAction extends Action { public static final DeleteDataFrameTransformAction INSTANCE = new DeleteDataFrameTransformAction(); public static final String NAME = "cluster:admin/data_frame/delete"; @@ -35,17 +27,12 @@ private DeleteDataFrameTransformAction() { } @Override - public Response newResponse() { + public AcknowledgedResponse newResponse() { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - @Override - public Writeable.Reader getResponseReader() { - return Response::new; - } - - public static class Request extends BaseTasksRequest { - private final String id; + public static class Request extends MasterNodeRequest { + private String id; public Request(String id) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); @@ -60,11 +47,6 @@ public String getId() { return id; } - @Override - public boolean match(Task task) { - return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -94,59 +76,4 @@ public boolean equals(Object obj) { return Objects.equals(id, other.id); } } - - public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { - - private final boolean acknowledged; - - public Response(StreamInput in) throws IOException { - super(in); - acknowledged = in.readBoolean(); - } - - public Response(boolean acknowledged, List taskFailures, List nodeFailures) { - super(taskFailures, nodeFailures); - this.acknowledged = acknowledged; - } - - public Response(boolean acknowledged) { - this(acknowledged, Collections.emptyList(), Collections.emptyList()); - } - - public boolean isDeleted() { - return acknowledged; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(acknowledged); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - { - toXContentCommon(builder, params); - builder.field("acknowledged", acknowledged); - } - builder.endObject(); - return builder; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - DeleteDataFrameTransformAction.Response response = (DeleteDataFrameTransformAction.Response) o; - return super.equals(o) && acknowledged == response.acknowledged; - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), acknowledged); - } - } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java deleted file mode 100644 index 54501fde5cfe8..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.core.dataframe.action; - -import org.elasticsearch.common.io.stream.Writeable.Reader; -import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response; - -public class DeleteDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase { - @Override - protected Response createTestInstance() { - return new Response(randomBoolean()); - } - - @Override - protected Reader instanceReader() { - return Response::new; - } -} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java index 2cdc4009e785b..2dcb4d247f8e7 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java @@ -5,93 +5,72 @@ */ package org.elasticsearch.xpack.dataframe.action; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionListenerResponseHandler; -import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.tasks.Task; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request; -import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response; -import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; -import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; -import java.util.List; +import java.io.IOException; -public class TransportDeleteDataFrameTransformAction extends TransportTasksAction { +public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction { private final DataFrameTransformsConfigManager transformsConfigManager; @Inject - public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, - ClusterService clusterService, DataFrameTransformsConfigManager transformsConfigManager) { - super(DeleteDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new, - Response::new, ThreadPool.Names.SAME); + public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + DataFrameTransformsConfigManager transformsConfigManager) { + super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, + Request::new, indexNameExpressionResolver); this.transformsConfigManager = transformsConfigManager; } @Override - protected Response newResponse(Request request, List tasks, List taskOperationFailures, - List failedNodeExceptions) { - assert tasks.size() + taskOperationFailures.size() == 1; - boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(Response::isDeleted); + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } - return new Response(cancelled, taskOperationFailures, failedNodeExceptions); + protected AcknowledgedResponse read(StreamInput in) throws IOException { + AcknowledgedResponse response = new AcknowledgedResponse(); + response.readFrom(in); + return response; } @Override - protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener listener) { - assert task.getTransformId().equals(request.getId()); - IndexerState state = task.getState().getIndexerState(); - if (state.equals(IndexerState.STOPPED)) { - task.onCancelled(); - transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> { - listener.onResponse(new Response(true)); - }, listener::onFailure)); + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) { + listener.onFailure(new ElasticsearchStatusException("Cannot delete data frame [" + request.getId() + + "] as the task is running. Stop the task first", RestStatus.CONFLICT)); } else { - listener.onFailure(new IllegalStateException("Could not delete transform [" + request.getId() + "] because " - + "indexer state is [" + state + "]. Transform must be [" + IndexerState.STOPPED + "] before deletion.")); + // Task is not runing, delete the configuration document + transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap( + r -> listener.onResponse(new AcknowledgedResponse(r)), + listener::onFailure)); } } @Override - protected void doExecute(Task task, Request request, ActionListener listener) { - final ClusterState state = clusterService.state(); - final DiscoveryNodes nodes = state.nodes(); - if (nodes.isLocalNodeElectedMaster()) { - PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) { - super.doExecute(task, request, listener); - } else { - // we couldn't find the transform in the persistent task CS, but maybe the transform exists in the configuration index, - // if so delete the orphaned document and do not throw (for the normal case we want to stop the task first, - // than delete the configuration document if and only if the data frame transform is in stopped state) - transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> { - listener.onResponse(new Response(true)); - return; - }, listener::onFailure)); - } - } else { - // Delegates DeleteTransform to elected master node, so it becomes the coordinating node. - // Non-master nodes may have a stale cluster state that shows transforms which are cancelled - // on the master, which makes testing difficult. - if (nodes.getMasterNode() == null) { - listener.onFailure(new MasterNotDiscoveredException("no known master nodes")); - } else { - transportService.sendRequest(nodes.getMasterNode(), actionName, request, - new ActionListenerResponseHandler<>(listener, Response::new)); - } - } + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return null; } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index 0b8ef692cdd8c..997739b2407a7 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -6,8 +6,6 @@ package org.elasticsearch.xpack.dataframe.action; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; @@ -63,8 +61,6 @@ public class TransportPutDataFrameTransformAction extends TransportMasterNodeAction { - private static final Logger logger = LogManager.getLogger(TransportPutDataFrameTransformAction.class); - private final XPackLicenseState licenseState; private final Client client; private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java index 183952e060338..125e61b5021e4 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; @@ -33,7 +34,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id); return channel -> client.execute(DeleteDataFrameTransformAction.INSTANCE, request, - new BaseTasksResponseToXContentListener<>(channel)); + new RestToXContentListener<>(channel)); } @Override From 4a76a94ddbfb181d0cb37192b52463d019ea3098 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 2 May 2019 14:17:48 +0100 Subject: [PATCH 05/10] Fix the tests --- .../action/DeleteDataFrameTransformAction.java | 10 ++++++++++ .../dataframe/integration/DataFrameIntegTestCase.java | 6 +++--- .../dataframe/integration/DataFrameRestTestCase.java | 7 +++---- .../action/TransportStopDataFrameTransformAction.java | 5 +++++ .../test/data_frame/transforms_start_stop.yml | 1 + 5 files changed, 22 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java index 82a6a4c81d263..715fa0f5dc78b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper; @@ -31,6 +32,15 @@ public AcknowledgedResponse newResponse() { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } + @Override + public Writeable.Reader getResponseReader() { + return in -> { + AcknowledgedResponse response = new AcknowledgedResponse(); + response.readFrom(in); + return response; + }; + } + public static class Request extends MasterNodeRequest { private String id; diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index 84f3e05de5cd1..ba6a6137789a3 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -93,11 +93,11 @@ protected StartDataFrameTransformAction.Response startDataFrameTransform(String new StartDataFrameTransformAction.Request(id, false)).actionGet(); } - protected DeleteDataFrameTransformAction.Response deleteDataFrameTransform(String id) { - DeleteDataFrameTransformAction.Response response = client().execute(DeleteDataFrameTransformAction.INSTANCE, + protected AcknowledgedResponse deleteDataFrameTransform(String id) { + AcknowledgedResponse response = client().execute(DeleteDataFrameTransformAction.INSTANCE, new DeleteDataFrameTransformAction.Request(id)) .actionGet(); - if (response.isDeleted()) { + if (response.isAcknowledged()) { transformConfigs.remove(id); } return response; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 6747eab94e835..4344aa823b4cc 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -274,20 +274,19 @@ protected static void deleteDataFrameTransform(String transformId) throws IOExce } @After - public static void waitForDataFrame() throws Exception { + public void waitForDataFrame() throws Exception { + wipeDataFrameTransforms(); waitForPendingDataFrameTasks(); } @AfterClass public static void removeIndices() throws Exception { - wipeDataFrameTransforms(); - waitForPendingDataFrameTasks(); // we might have disabled wiping indices, but now its time to get rid of them // note: can not use super.cleanUpCluster() as this method must be static wipeIndices(); } - protected static void wipeDataFrameTransforms() throws IOException, InterruptedException { + public void wipeDataFrameTransforms() throws IOException, InterruptedException { List> transformConfigs = getDataFrameTransforms(); for (Map transformConfig : transformConfigs) { String transformId = (String) transformConfig.get("id"); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index e2b1c6e6670fc..45c1c1bf07053 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -92,6 +92,7 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF } transformTask.stop(); + listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); @@ -131,6 +132,10 @@ private void waitForDataFrameStopped(Collection persistentTaskIds, TimeV ActionListener listener) { persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> { + logger.error("Waiting for tasks " + persistentTaskIds); + if (persistentTasksCustomMetaData == null) { + return true; + } logger.error("PTasks: " + persistentTasksCustomMetaData.toString()); for (String persistentTaskId: persistentTaskIds) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 9fe814574b012..1e9223b79f201 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -200,6 +200,7 @@ teardown: - do: data_frame.stop_data_frame_transform: transform_id: "airline-transform-start-later" + wait_for_completion: true - match: { stopped: true } - do: From 22d7cc69285a62ec93ddc998c0d5c4ddb831972e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 2 May 2019 14:20:06 +0100 Subject: [PATCH 06/10] Clean up logging --- .../action/TransportStopDataFrameTransformAction.java | 8 +++----- .../dataframe/transforms/DataFrameTransformTask.java | 3 --- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 45c1c1bf07053..314831c97db1f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -132,19 +132,17 @@ private void waitForDataFrameStopped(Collection persistentTaskIds, TimeV ActionListener listener) { persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> { - logger.error("Waiting for tasks " + persistentTaskIds); if (persistentTasksCustomMetaData == null) { return true; } - logger.error("PTasks: " + persistentTasksCustomMetaData.toString()); - for (String persistentTaskId: persistentTaskIds) { + for (String persistentTaskId : persistentTaskIds) { if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { return false; } } - logger.error("task gone"); - return true; + return true; + }, timeout, new ActionListener<>() { @Override public void onResponse(Boolean result) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index ba55918d4cdde..c332d29945aaf 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -218,7 +218,6 @@ public synchronized void start(ActionListener listener) { } public synchronized void stop() { - logger.info("stop task"); if (getIndexer() == null) { return; } @@ -249,7 +248,6 @@ public synchronized void triggered(Event event) { * This tries to remove the job from the scheduler and completes the persistent task */ synchronized void shutdown() { - logger.info("Shutting down"); try { schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId()); schedulerEngine.unregister(this); @@ -258,7 +256,6 @@ synchronized void shutdown() { return; } markAsCompleted(); - logger.info("task marked as completed"); } public DataFrameTransformProgress getProgress() { From b6aadd7953fc812d3358ee91ac5c18bb4248278e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 3 May 2019 10:08:37 +0100 Subject: [PATCH 07/10] Address review comments --- .../action/TransportDeleteDataFrameTransformAction.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java index 2dcb4d247f8e7..ac40334dfb443 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -62,7 +63,7 @@ protected void masterOperation(Request request, ClusterState state, ActionListen listener.onFailure(new ElasticsearchStatusException("Cannot delete data frame [" + request.getId() + "] as the task is running. Stop the task first", RestStatus.CONFLICT)); } else { - // Task is not runing, delete the configuration document + // Task is not running, delete the configuration document transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap( r -> listener.onResponse(new AcknowledgedResponse(r)), listener::onFailure)); @@ -71,6 +72,6 @@ protected void masterOperation(Request request, ClusterState state, ActionListen @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return null; + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); } } From f775aeeaba7c9f5f2d2fd977f7d9eee93d74cc51 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 3 May 2019 11:24:28 +0100 Subject: [PATCH 08/10] Make master the coordinating node for stop --- ...TransportStopDataFrameTransformAction.java | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 314831c97db1f..26f5259c69dc8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -7,13 +7,17 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; @@ -53,22 +57,33 @@ public TransportStopDataFrameTransformAction(TransportService transportService, @Override protected void doExecute(Task task, StopDataFrameTransformAction.Request request, ActionListener listener) { - - final ActionListener finalListener; - if (request.waitForCompletion()) { - finalListener = waitForStopListener(request, listener); + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + if (nodes.isLocalNodeElectedMaster() == false) { + // Delegates stop data frame to elected master node so it becomes the coordinating node. + if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master node")); + } else { + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, StopDataFrameTransformAction.Response::new)); + } } else { - finalListener = listener; - } + final ActionListener finalListener; + if (request.waitForCompletion()) { + finalListener = waitForStopListener(request, listener); + } else { + finalListener = listener; + } - dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap( - expandedIds -> { - request.setExpandedIds(new HashSet<>(expandedIds)); - request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state())); - super.doExecute(task, request, finalListener); - }, - listener::onFailure - )); + dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap( + expandedIds -> { + request.setExpandedIds(new HashSet<>(expandedIds)); + request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state())); + super.doExecute(task, request, finalListener); + }, + listener::onFailure + )); + } } @Override From fb88d248a349968830dd7bd010c1bfb6ced6379e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 3 May 2019 16:48:59 +0100 Subject: [PATCH 09/10] Wait for DF stopped in client tests --- .../documentation/DataFrameTransformDocumentationIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 3c5059279b44d..07713d5371460 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -76,7 +76,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest @After public void cleanUpTransforms() throws IOException { for (String transformId : transformsToClean) { - highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT); + highLevelClient().dataFrame().stopDataFrameTransform( + new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT); } for (String transformId : transformsToClean) { From 51a17f0d945fff9db4d35b3120b1cc956c166d41 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 7 May 2019 09:58:25 +0100 Subject: [PATCH 10/10] Wait for DF stopped in client tests cleanup --- .../java/org/elasticsearch/client/DataFrameTransformIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index f01db621bc2e0..1bd49154ee548 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -141,7 +141,8 @@ private void indexData(String indexName) throws IOException { @After public void cleanUpTransforms() throws IOException { for (String transformId : transformsToClean) { - highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT); + highLevelClient().dataFrame().stopDataFrameTransform( + new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT); } for (String transformId : transformsToClean) { @@ -265,7 +266,7 @@ public void testStartStop() throws IOException { assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1)); assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState()); - StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id); + StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null); StopDataFrameTransformResponse stopResponse = execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync); assertTrue(stopResponse.isStopped());