Skip to content

Commit 6fe6204

Browse files
committed
Fix reindex from remote clearing scroll (#22525)
Reindex-from-remote had a race when it tried to clear the scroll. It first starts the request to clear the scroll and then submits a task to the generic threadpool to shutdown the client. These two things race and, in my experience, closing the scroll generally loses. That means that most of the time reindex-from-remote isn't clearing the scrolls that it uses. This isn't the end of the world because we flush old scroll contexts after a while but this isn't great. Noticed while experimenting with #22514.
1 parent d66490e commit 6fe6204

File tree

5 files changed

+184
-228
lines changed

5 files changed

+184
-228
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,7 @@ protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Cons
9090
}
9191

9292
@Override
93-
public void clearScroll(String scrollId) {
94-
/*
95-
* Fire off the clear scroll but don't wait for it it return before
96-
* we send the use their response.
97-
*/
93+
public void clearScroll(String scrollId, Runnable onCompletion) {
9894
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
9995
clearScrollRequest.addScrollId(scrollId);
10096
/*
@@ -105,15 +101,22 @@ public void clearScroll(String scrollId) {
105101
@Override
106102
public void onResponse(ClearScrollResponse response) {
107103
logger.debug("Freed [{}] contexts", response.getNumFreed());
104+
onCompletion.run();
108105
}
109106

110107
@Override
111108
public void onFailure(Exception e) {
112109
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to clear scroll [{}]", scrollId), e);
110+
onCompletion.run();
113111
}
114112
});
115113
}
116114

115+
@Override
116+
protected void cleanup() {
117+
// Nothing to do
118+
}
119+
117120
/**
118121
* Run a search action and call onResponse when a the response comes in, retrying if the action fails with an exception caused by
119122
* rejected execution.

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,24 @@ public final void startNextScroll(TimeValue extraKeepAlive, Consumer<Response> o
8383
protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse);
8484

8585
@Override
86-
public void close() {
86+
public final void close() {
8787
String scrollId = this.scrollId.get();
8888
if (Strings.hasLength(scrollId)) {
89-
clearScroll(scrollId);
89+
clearScroll(scrollId, this::cleanup);
90+
} else {
91+
cleanup();
9092
}
9193
}
92-
protected abstract void clearScroll(String scrollId);
94+
/**
95+
* Called to clear a scroll id.
96+
* @param scrollId the id to clear
97+
* @param onCompletion implementers must call this after completing the clear whether they are successful or not
98+
*/
99+
protected abstract void clearScroll(String scrollId, Runnable onCompletion);
100+
/**
101+
* Called after the process has been totally finished to clean up any resources the process needed like remote connections.
102+
*/
103+
protected abstract void cleanup();
93104

94105
/**
95106
* Set the id of the last scroll. Used for debugging.

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,6 @@ public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, Thr
8181
this.client = client;
8282
}
8383

84-
@Override
85-
public void close() {
86-
super.close();
87-
/* This might be called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail
88-
* to close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. That way we
89-
* never close the client in its own thread pool. */
90-
threadPool.generic().submit(() -> {
91-
try {
92-
client.close();
93-
} catch (IOException e) {
94-
logger.error("Failed to shutdown the remote connection", e);
95-
}
96-
});
97-
}
98-
9984
@Override
10085
protected void doStart(Consumer<? super Response> onResponse) {
10186
lookupRemoteVersion(version -> {
@@ -125,17 +110,32 @@ protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Cons
125110
}
126111

127112
@Override
128-
protected void clearScroll(String scrollId) {
129-
// Need to throw out response....
113+
protected void clearScroll(String scrollId, Runnable onCompletion) {
130114
client.performRequestAsync("DELETE", scrollPath(), emptyMap(), scrollEntity(scrollId), new ResponseListener() {
131115
@Override
132116
public void onSuccess(org.elasticsearch.client.Response response) {
133117
logger.debug("Successfully cleared [{}]", scrollId);
118+
onCompletion.run();
134119
}
135120

136121
@Override
137122
public void onFailure(Exception t) {
138123
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to clear scroll [{}]", scrollId), t);
124+
onCompletion.run();
125+
}
126+
});
127+
}
128+
129+
@Override
130+
protected void cleanup() {
131+
/* This is called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail to
132+
* close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. */
133+
threadPool.generic().submit(() -> {
134+
try {
135+
client.close();
136+
logger.info("Shut down remote connection");
137+
} catch (IOException e) {
138+
logger.error("Failed to shutdown the remote connection", e);
139139
}
140140
});
141141
}

0 commit comments

Comments
 (0)