From 3495f070ee79624b838979507e7937455fed5ded Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 21 Mar 2017 11:11:05 -0400 Subject: [PATCH 1/2] Reindex: wait for cleanup before responding Changes reindex and friends to wait until the entire request has been "cleaned up" before responding. "Clean up" in this context is clearing the scroll and (for reindex-from-remote) shutting down the client. Failures to clean up are still only logged, not returned to the user. Closes #23653 --- .../AbstractAsyncBulkByScrollAction.java | 20 ++++++++++------- .../byscroll/ClientScrollableHitSource.java | 3 ++- .../bulk/byscroll/ScrollableHitSource.java | 22 ++++++++++++------- .../remote/RemoteScrollableHitSource.java | 9 +++++--- .../RemoteScrollableHitSourceTests.java | 21 ++++++++++++++++++ 5 files changed, 55 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java index 834321f1798ec..72d39c038ed01 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java @@ -465,14 +465,18 @@ protected void finishHim(Exception failure) { * @param searchFailures any search failures accumulated during the request * @param timedOut have any of the sub-requests timed out? */ - protected void finishHim(Exception failure, List indexingFailures, List searchFailures, boolean timedOut) { - scrollSource.close(); - if (failure == null) { - listener.onResponse( - buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures, timedOut)); - } else { - listener.onFailure(failure); - } + protected void finishHim(Exception failure, List indexingFailures, + List searchFailures, boolean timedOut) { + scrollSource.close(() -> { + if (failure == null) { + BulkByScrollResponse response = buildResponse( + timeValueNanos(System.nanoTime() - startTime.get()), + indexingFailures, searchFailures, timedOut); + listener.onResponse(response); + } else { + listener.onFailure(failure); + } + }); } /** diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java index 9fc02e29e62c9..d477021e0d352 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java @@ -113,8 +113,9 @@ public void onFailure(Exception e) { } @Override - protected void cleanup() { + protected void cleanup(Runnable onCompletion) { // Nothing to do + onCompletion.run(); } /** diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java index 73aa653698611..6426bad592f3e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java @@ -47,7 +47,7 @@ /** * A scrollable source of results. */ -public abstract class ScrollableHitSource implements Closeable { +public abstract class ScrollableHitSource { private final AtomicReference scrollId = new AtomicReference<>(); protected final Logger logger; @@ -82,25 +82,31 @@ public final void startNextScroll(TimeValue extraKeepAlive, Consumer o } protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer onResponse); - @Override - public final void close() { + public final void close(Runnable onCompletion) { String scrollId = this.scrollId.get(); if (Strings.hasLength(scrollId)) { - clearScroll(scrollId, this::cleanup); + clearScroll(scrollId, () -> cleanup(onCompletion)); } else { - cleanup(); + cleanup(onCompletion); } } + /** * Called to clear a scroll id. + * * @param scrollId the id to clear - * @param onCompletion implementers must call this after completing the clear whether they are successful or not + * @param onCompletion implementers must call this after completing the clear whether they are + * successful or not */ protected abstract void clearScroll(String scrollId, Runnable onCompletion); /** - * Called after the process has been totally finished to clean up any resources the process needed like remote connections. + * Called after the process has been totally finished to clean up any resources the process + * needed like remote connections. + * + * @param onCompletion implementers must call this after completing the cleanup whether they are + * successful or not */ - protected abstract void cleanup(); + protected abstract void cleanup(Runnable onCompletion); /** * Set the id of the last scroll. Used for debugging. diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java index 796106c269e50..6781da6497201 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -141,15 +141,18 @@ private void logFailure(Exception e) { } @Override - protected void cleanup() { - /* This is called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail to - * close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. */ + protected void cleanup(Runnable onCompletion) { + /* This is called on the RestClient's thread pool and attempting to close the client on its + * own threadpool causes it to fail to close. So we always shutdown the RestClient + * asynchronously on a thread in Elasticsearch's generic thread pool. */ threadPool.generic().submit(() -> { try { client.close(); logger.debug("Shut down remote connection"); } catch (IOException e) { logger.error("Failed to shutdown the remote connection", e); + } finally { + onCompletion.run(); } }); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index 7376ed543490a..eb7abea6af5d1 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -80,7 +80,9 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteScrollableHitSourceTests extends ESTestCase { @@ -478,6 +480,25 @@ public void testUnexpectedJsonThinksRemoveIsNotES() throws IOException { e.getCause().getCause().getCause().getMessage()); } + public void testCleanupSuccessful() throws Exception { + AtomicBoolean cleanupCallbackCalled = new AtomicBoolean(); + RestClient client = mock(RestClient.class); + TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client); + hitSource.cleanup(() -> cleanupCallbackCalled.set(true)); + verify(client).close(); + assertTrue(cleanupCallbackCalled.get()); + } + + public void testCleanupFailure() throws Exception { + AtomicBoolean cleanupCallbackCalled = new AtomicBoolean(); + RestClient client = mock(RestClient.class); + doThrow(new RuntimeException("test")).when(client).close(); + TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client); + hitSource.cleanup(() -> cleanupCallbackCalled.set(true)); + verify(client).close(); + assertTrue(cleanupCallbackCalled.get()); + } + private RemoteScrollableHitSource sourceWithMockedRemoteCall(String... paths) throws Exception { return sourceWithMockedRemoteCall(true, ContentType.APPLICATION_JSON, paths); } From b25de4a507ec937b08bf6874a242d48d167e7b94 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 21 Mar 2017 14:48:16 -0400 Subject: [PATCH 2/2] Remove out of date comment --- .../action/bulk/byscroll/ClientScrollableHitSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java index d477021e0d352..3bacc187ebb6d 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java @@ -114,7 +114,6 @@ public void onFailure(Exception e) { @Override protected void cleanup(Runnable onCompletion) { - // Nothing to do onCompletion.run(); }