Skip to content

Commit 3d2cdf4

Browse files
committed
ML expired data cleanup cannot run on the client thread
1 parent d554df7 commit 3d2cdf4

File tree

5 files changed

+602
-2
lines changed

5 files changed

+602
-2
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.common.settings.Settings;
1616
import org.elasticsearch.threadpool.ThreadPool;
1717
import org.elasticsearch.transport.TransportService;
18+
import org.elasticsearch.transport.Transports;
1819
import org.elasticsearch.xpack.core.ClientHelper;
1920
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
2021
import org.elasticsearch.xpack.ml.MachineLearning;
@@ -66,10 +67,12 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
6667

6768
private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
6869
ActionListener<DeleteExpiredDataAction.Response> listener) {
70+
Transports.assertNotTransportThread("ML Daily Maintenance");
6971
if (mlDataRemoversIterator.hasNext()) {
7072
MlDataRemover remover = mlDataRemoversIterator.next();
7173
remover.remove(ActionListener.wrap(
72-
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener),
74+
booleanResponse -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() ->
75+
deleteExpiredData(mlDataRemoversIterator, listener)),
7376
listener::onFailure));
7477
} else {
7578
logger.info("Completed deletion of expired data");

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.search.SearchHit;
1616
import org.elasticsearch.search.builder.SearchSourceBuilder;
1717
import org.elasticsearch.search.sort.SortBuilders;
18+
import org.elasticsearch.transport.Transports;
1819
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
1920
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
2021

@@ -89,6 +90,8 @@ public Deque<T> next() {
8990
private SearchResponse initScroll() {
9091
LOGGER.trace("ES API CALL: search index {}", index);
9192

93+
Transports.assertNotTransportThread("BatchedDocumentsIterator makes blocking calls");
94+
9295
isScrollInitialised = true;
9396

9497
SearchRequest searchRequest = new SearchRequest(index);
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import org.elasticsearch.gradle.LoggedExec
2+
3+
apply plugin: 'elasticsearch.standalone-rest-test'
4+
apply plugin: 'elasticsearch.rest-test'
5+
6+
dependencies {
7+
testCompile project(path: xpackModule('core'), configuration: 'runtime')
8+
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
9+
testCompile project(path: xpackModule('ml'), configuration: 'runtime')
10+
testCompile project(path: xpackModule('ml'), configuration: 'testArtifacts')
11+
}
12+
13+
integTestRunner {
14+
/*
15+
* We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each
16+
* other if we allow them to set the number of available processors as it's set-once in Netty.
17+
*/
18+
systemProperty 'es.set.netty.runtime.available.processors', 'false'
19+
}
20+
21+
// location of generated keystores and certificates
22+
File keystoreDir = new File(project.buildDir, 'keystore')
23+
24+
// Generate the node's keystore
25+
File nodeKeystore = new File(keystoreDir, 'test-node.jks')
26+
task createNodeKeyStore(type: LoggedExec) {
27+
doFirst {
28+
if (nodeKeystore.parentFile.exists() == false) {
29+
nodeKeystore.parentFile.mkdirs()
30+
}
31+
if (nodeKeystore.exists()) {
32+
delete nodeKeystore
33+
}
34+
}
35+
executable = new File(project.runtimeJavaHome, 'bin/keytool')
36+
standardInput = new ByteArrayInputStream('FirstName LastName\nUnit\nOrganization\nCity\nState\nNL\nyes\n\n'.getBytes('UTF-8'))
37+
args '-genkey',
38+
'-alias', 'test-node',
39+
'-keystore', nodeKeystore,
40+
'-keyalg', 'RSA',
41+
'-keysize', '2048',
42+
'-validity', '712',
43+
'-dname', 'CN=smoke-test-plugins-ssl',
44+
'-keypass', 'keypass',
45+
'-storepass', 'keypass'
46+
}
47+
48+
// Add keystores to test classpath: it expects it there
49+
sourceSets.test.resources.srcDir(keystoreDir)
50+
processTestResources.dependsOn(createNodeKeyStore)
51+
52+
integTestCluster {
53+
dependsOn createNodeKeyStore
54+
setting 'xpack.security.enabled', 'true'
55+
setting 'xpack.ml.enabled', 'true'
56+
setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE'
57+
setting 'xpack.monitoring.enabled', 'false'
58+
setting 'xpack.security.authc.token.enabled', 'true'
59+
setting 'xpack.security.transport.ssl.enabled', 'true'
60+
setting 'xpack.security.transport.ssl.keystore.path', nodeKeystore.name
61+
setting 'xpack.security.transport.ssl.verification_mode', 'certificate'
62+
setting 'xpack.security.audit.enabled', 'true'
63+
setting 'xpack.license.self_generated.type', 'trial'
64+
65+
keystoreSetting 'bootstrap.password', 'x-pack-test-password'
66+
keystoreSetting 'xpack.security.transport.ssl.keystore.secure_password', 'keypass'
67+
68+
numNodes = 3
69+
70+
setupCommand 'setupDummyUser',
71+
'bin/elasticsearch-users', 'useradd', 'x_pack_rest_user', '-p', 'x-pack-test-password', '-r', 'superuser'
72+
73+
extraConfigFile nodeKeystore.name, nodeKeystore
74+
75+
waitCondition = { node, ant ->
76+
File tmpFile = new File(node.cwd, 'wait.success')
77+
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
78+
dest: tmpFile.toString(),
79+
username: 'x_pack_rest_user',
80+
password: 'x-pack-test-password',
81+
ignoreerrors: true,
82+
retries: 10)
83+
return tmpFile.exists()
84+
}
85+
}
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.Arrays;
3838
import java.util.Collections;
3939
import java.util.List;
40-
import java.util.concurrent.ExecutionException;
4140
import java.util.concurrent.TimeUnit;
4241

4342
import static org.hamcrest.Matchers.equalTo;

0 commit comments

Comments
 (0)