MlNativeAutodetectIntegTestCase.java
@@ -5,19 +5,10 @@
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -26,23 +17,10 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@@ -70,10 +48,8 @@
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -85,71 +61,31 @@
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.security.SecurityField;
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
* Base class of ML integration tests that use a native autodetect process
*/
abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {

private List<Job.Builder> jobs = new ArrayList<>();
private List<DatafeedConfig> datafeeds = new ArrayList<>();
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
}

@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
}

@Override
protected Settings externalClusterClientSettings() {
Path key;
Path certificate;
try {
key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
} catch (URISyntaxException e) {
throw new IllegalStateException("error trying to get keystore path", e);
}
Settings.Builder builder = Settings.builder();
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
builder.put("xpack.security.transport.ssl.enabled", true);
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
return builder.build();
}

protected void cleanUp() {
protected void cleanUpResources() {
cleanUpDatafeeds();
cleanUpJobs();
waitForPendingTasks();
}

private void cleanUpDatafeeds() {
@@ -182,18 +118,6 @@ private void cleanUpJobs() {
}
}

private void waitForPendingTasks() {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setWaitForCompletion(true);
listTasksRequest.setDetailed(true);
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
try {
admin().cluster().listTasks(listTasksRequest).get();
} catch (Exception e) {
throw new AssertionError("Failed to wait for pending tasks to complete", e);
}
}

protected void registerJob(Job.Builder job) {
if (jobs.add(job) == false) {
throw new IllegalArgumentException("job [" + job.getId() + "] is already registered");
@@ -441,56 +365,6 @@ protected PersistJobAction.Response persistJob(String jobId) {
return client().execute(PersistJobAction.INSTANCE, request).actionGet();
}

@Override
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
StartDatafeedAction.DatafeedParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
OpenJobAction.JobParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
// remove local node reference
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
String masterId = masterClusterState.nodes().getMasterNodeId();
for (Client client : cluster().getClients()) {
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
// remove local node reference
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
final Map<String, Object> localStateMap = convertToMap(localClusterState);
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version() &&
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull("clusterstate JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
} catch (AssertionError error) {
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(), localClusterState.toString());
throw error;
}
}
}
}
}

protected List<String> generateData(long timestamp, TimeValue bucketSpan, int bucketCount,
Function<Integer, Integer> timeToCountFunction) throws IOException {
List<String> data = new ArrayList<>();
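Taken together, the hunks above reduce the autodetect base class to roughly the shape sketched below. This is an abridged reconstruction for readability, not the full file: the registration helpers, result accessors and generateData(...) are unchanged and only summarized in comments, and the empty cleanUpDatafeeds()/cleanUpJobs() bodies stand in for the unchanged methods shown in the diff context.

package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.ArrayList;
import java.util.List;

/**
 * Base class of ML integration tests that use a native autodetect process.
 * Plugin/transport/security setup, waitForPendingTasks() and
 * ensureClusterStateConsistency() now live in MlNativeIntegTestCase.
 */
abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {

    private List<Job.Builder> jobs = new ArrayList<>();
    private List<DatafeedConfig> datafeeds = new ArrayList<>();

    @Override
    protected void cleanUpResources() {
        cleanUpDatafeeds();
        cleanUpJobs();
    }

    private void cleanUpDatafeeds() { /* unchanged, see diff context above */ }

    private void cleanUpJobs() { /* unchanged, see diff context above */ }

    // registerJob(...), persistJob(...), generateData(...) and the other
    // helpers are untouched by this change and omitted here
}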
MlNativeIntegTestCase.java (new file)
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.security.SecurityField;
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;

/**
* Base class of ML integration tests that use a native process
*/
abstract class MlNativeIntegTestCase extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
}

@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
}

@Override
protected Settings externalClusterClientSettings() {
Path key;
Path certificate;
try {
key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
} catch (URISyntaxException e) {
throw new IllegalStateException("error trying to get keystore path", e);
}
Settings.Builder builder = Settings.builder();
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
builder.put("xpack.security.transport.ssl.enabled", true);
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
return builder.build();
}

protected void cleanUp() {
cleanUpResources();
waitForPendingTasks();
}

protected abstract void cleanUpResources();

private void waitForPendingTasks() {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setWaitForCompletion(true);
listTasksRequest.setDetailed(true);
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
try {
admin().cluster().listTasks(listTasksRequest).get();
} catch (Exception e) {
throw new AssertionError("Failed to wait for pending tasks to complete", e);
}
}

@Override
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
StartDatafeedAction.DatafeedParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
OpenJobAction.JobParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
// remove local node reference
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
String masterId = masterClusterState.nodes().getMasterNodeId();
for (Client client : cluster().getClients()) {
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
// remove local node reference
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
final Map<String, Object> localStateMap = convertToMap(localClusterState);
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version() &&
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull("clusterstate JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
} catch (AssertionError error) {
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(), localClusterState.toString());
throw error;
}
}
}
}
}
}
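
With the split, MlNativeIntegTestCase exposes a single abstract hook, cleanUpResources(), and keeps the transport/security settings, waitForPendingTasks() and the cluster-state consistency override in one place. A minimal sketch of how another native ML suite could build on it is shown below; the class name, the resource tracking and the @After wiring are hypothetical illustrations, not part of this change.

package org.elasticsearch.xpack.ml.integration;

import org.junit.After;

import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical suite reusing the extracted base class. Only the overridden
 * cleanUpResources() hook and the cleanUp() call mirror what
 * MlNativeIntegTestCase actually provides; everything else is illustrative.
 */
abstract class MlNativeExampleIntegTestCase extends MlNativeIntegTestCase {

    // ids of resources (e.g. filters or calendars) created by concrete tests
    private final List<String> createdResourceIds = new ArrayList<>();

    protected void register(String resourceId) {
        createdResourceIds.add(resourceId);
    }

    @Override
    protected void cleanUpResources() {
        // delete whatever the concrete tests created, then forget about it
        createdResourceIds.clear();
    }

    @After
    public void cleanUpAfterTest() {
        // cleanUp() runs cleanUpResources() followed by waitForPendingTasks()
        cleanUp();
    }
}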