Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlDailyMaintenanceService;
import org.elasticsearch.xpack.ml.MlInitializationService;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Stream;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_HIDDEN;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MlInitializationServiceIT extends MlNativeAutodetectIntegTestCase {

private ThreadPool threadPool;
private MlInitializationService mlInitializationService;

@Before
public void setUpMocks() {
threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
MlDailyMaintenanceService mlDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
ClusterService clusterService = mock(ClusterService.class);
mlInitializationService = new MlInitializationService(client(), threadPool, mlDailyMaintenanceService, clusterService);
}

public void testThatMlIndicesBecomeHiddenWhenTheNodeBecomesMaster() throws Exception {
String[] mlHiddenIndexNames = {
".ml-anomalies-7",
".ml-state-000001",
".ml-stats-000001",
".ml-notifications-000002",
".ml-annotations-6"
};
String[] otherIndexNames = { "some-index-1", "some-other-index-2" };
String[] allIndexNames = Stream.concat(Arrays.stream(mlHiddenIndexNames), Arrays.stream(otherIndexNames)).toArray(String[]::new);

for (String indexName : mlHiddenIndexNames) {
try {
assertAcked(prepareCreate(indexName).setSettings(Collections.singletonMap(SETTING_INDEX_HIDDEN, randomBoolean())));
} catch (ResourceAlreadyExistsException e) {
logger.info("Index " + indexName + "already exists: {}", e.getDetailedMessage());
}
}
createIndex(otherIndexNames);

GetSettingsResponse settingsResponse =
client().admin().indices().prepareGetSettings(allIndexNames)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.get();
assertThat(settingsResponse, is(notNullValue()));
for (String indexName : mlHiddenIndexNames) {
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
assertThat(settings, is(notNullValue()));
}
for (String indexName : otherIndexNames) {
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
assertThat(settings, is(notNullValue()));
assertThat(
"Index " + indexName + " expected not to be hidden but was",
settings.getAsBoolean(SETTING_INDEX_HIDDEN, false), is(equalTo(false)));
}

mlInitializationService.onMaster();
assertBusy(() -> assertTrue(mlInitializationService.areMlInternalIndicesHidden()));

settingsResponse =
client().admin().indices().prepareGetSettings(allIndexNames)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.get();
assertThat(settingsResponse, is(notNullValue()));
for (String indexName : mlHiddenIndexNames) {
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
assertThat(settings, is(notNullValue()));
assertThat(
"Index " + indexName + " expected to be hidden but wasn't, settings = " + settings,
settings.getAsBoolean(SETTING_INDEX_HIDDEN, false), is(equalTo(true)));
}
for (String indexName : otherIndexNames) {
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
assertThat(settings, is(notNullValue()));
assertThat(
"Index " + indexName + " expected not to be hidden but was, settings = " + settings,
settings.getAsBoolean(SETTING_INDEX_HIDDEN, false), is(equalTo(false)));
}
}

@Override
public Settings indexSettings() {
return Settings.builder().put(super.indexSettings())
.put(IndexMetadata.SETTING_DATA_PATH, (String) null)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1419,15 +1419,29 @@ public static SystemIndexDescriptor getInferenceIndexSecurityDescriptor() {
.build();
}

@Override
public Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {
return List.of(
/**
* These are the ML hidden indices. They are "associated" in the sense that if the ML system indices
* are backed up or deleted then these hidden indices should also be backed up or deleted.
*/
private static Collection<AssociatedIndexDescriptor> ASSOCIATED_INDEX_DESCRIPTORS =
List.of(
new AssociatedIndexDescriptor(RESULTS_INDEX_PREFIX + "*", "Results indices"),
new AssociatedIndexDescriptor(STATE_INDEX_PREFIX + "*", "State indices"),
new AssociatedIndexDescriptor(MlStatsIndex.indexPattern(), "ML stats index"),
new AssociatedIndexDescriptor(".ml-notifications*", "ML notifications indices"),
new AssociatedIndexDescriptor(".ml-annotations*", "Ml annotations indices")
new AssociatedIndexDescriptor(".ml-annotations*", "ML annotations indices")
);

@Override
public Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {
return ASSOCIATED_INDEX_DESCRIPTORS;
}

public static String[] getMlHiddenIndexPatterns() {
return ASSOCIATED_INDEX_DESCRIPTORS
.stream()
.map(AssociatedIndexDescriptor::getIndexPattern)
.toArray(String[]::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,54 @@
*/
package org.elasticsearch.xpack.ml;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_HIDDEN;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

class MlInitializationService implements ClusterStateListener {
public class MlInitializationService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(MlInitializationService.class);

private final Client client;
private final ThreadPool threadPool;
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
private final AtomicBoolean mlInternalIndicesHidden = new AtomicBoolean(false);

private final MlDailyMaintenanceService mlDailyMaintenanceService;

Expand All @@ -37,6 +62,7 @@ class MlInitializationService implements ClusterStateListener {
MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client,
MlAssignmentNotifier mlAssignmentNotifier) {
this(client,
threadPool,
new MlDailyMaintenanceService(
settings,
Objects.requireNonNull(clusterService).getClusterName(),
Expand All @@ -49,8 +75,10 @@ class MlInitializationService implements ClusterStateListener {
}

// For testing
MlInitializationService(Client client, MlDailyMaintenanceService dailyMaintenanceService, ClusterService clusterService) {
public MlInitializationService(Client client, ThreadPool threadPool, MlDailyMaintenanceService dailyMaintenanceService,
ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.threadPool = threadPool;
this.mlDailyMaintenanceService = dailyMaintenanceService;
clusterService.addListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
Expand All @@ -71,6 +99,7 @@ public void beforeStop() {

public void onMaster() {
mlDailyMaintenanceService.start();
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(this::makeMlInternalIndicesHidden);
}

public void offMaster() {
Expand Down Expand Up @@ -112,5 +141,106 @@ MlDailyMaintenanceService getDailyMaintenanceService() {
return mlDailyMaintenanceService;
}

/** For testing */
public boolean areMlInternalIndicesHidden() {
return mlInternalIndicesHidden.get();
}

private void makeMlInternalIndicesHidden() {
String[] mlHiddenIndexPatterns = MachineLearning.getMlHiddenIndexPatterns();

// Step 5: Handle errors encountered on the way.
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
updateAliasesResponse -> {
if (updateAliasesResponse.isAcknowledged() == false) {
logger.error("One or more of the ML internal aliases could not be made hidden.");
return;
}
mlInternalIndicesHidden.set(true);
},
e -> logger.error("An error occurred while making ML internal indices and aliases hidden", e)
);

// Step 4: Extract ML internal aliases that are not hidden and make them hidden.
ActionListener<GetAliasesResponse> getAliasesResponseListener = ActionListener.wrap(
getAliasesResponse -> {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
for (ObjectObjectCursor<String, List<AliasMetadata>> entry : getAliasesResponse.getAliases()) {
String index = entry.key;
String[] nonHiddenAliases = entry.value.stream()
.filter(metadata -> metadata.isHidden() == null || metadata.isHidden() == false)
.map(AliasMetadata::alias)
.toArray(String[]::new);
if (nonHiddenAliases.length == 0) {
continue;
}
indicesAliasesRequest.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.index(index)
.aliases(entry.value.stream().map(AliasMetadata::alias).toArray(String[]::new))
.isHidden(true));
}
if (indicesAliasesRequest.getAliasActions().isEmpty()) {
logger.debug("There are no ML internal aliases that need to be made hidden, [{}]", getAliasesResponse.getAliases());
finalListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
String indicesWithNonHiddenAliasesString =
indicesAliasesRequest.getAliasActions().stream()
.map(aliasAction -> aliasAction.indices()[0] + ": " + String.join(",", aliasAction.aliases()))
.collect(Collectors.joining("; "));
logger.debug("The following ML internal aliases will now be made hidden: [{}]", indicesWithNonHiddenAliasesString);
executeAsyncWithOrigin(client, ML_ORIGIN, IndicesAliasesAction.INSTANCE, indicesAliasesRequest, finalListener);
},
finalListener::onFailure
);

// Step 3: Once indices are hidden, fetch ML internal aliases to find out whether the aliases are hidden or not.
ActionListener<AcknowledgedResponse> updateSettingsListener = ActionListener.wrap(
updateSettingsResponse -> {
if (updateSettingsResponse.isAcknowledged() == false) {
logger.error("One or more of the ML internal indices could not be made hidden.");
return;
}
GetAliasesRequest getAliasesRequest = new GetAliasesRequest()
.indices(mlHiddenIndexPatterns)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
executeAsyncWithOrigin(client, ML_ORIGIN, GetAliasesAction.INSTANCE, getAliasesRequest, getAliasesResponseListener);
},
finalListener::onFailure
);

// Step 2: Extract ML internal indices that are not hidden and make them hidden.
ActionListener<GetSettingsResponse> getSettingsListener = ActionListener.wrap(
getSettingsResponse -> {
String[] nonHiddenIndices =
getSettingsResponse.getIndexToSettings().stream()
.filter(e -> e.getValue().getAsBoolean(SETTING_INDEX_HIDDEN, false) == false)
.map(Map.Entry::getKey)
.toArray(String[]::new);
if (nonHiddenIndices.length == 0) {
logger.debug("There are no ML internal indices that need to be made hidden, [{}]", getSettingsResponse);
updateSettingsListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
String nonHiddenIndicesString = Arrays.stream(nonHiddenIndices).collect(Collectors.joining(", "));
logger.debug("The following ML internal indices will now be made hidden: [{}]", nonHiddenIndicesString);
UpdateSettingsRequest updateSettingsRequest =
new UpdateSettingsRequest()
.indices(nonHiddenIndices)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.settings(Collections.singletonMap(SETTING_INDEX_HIDDEN, true));
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateSettingsAction.INSTANCE, updateSettingsRequest, updateSettingsListener);
},
finalListener::onFailure
);

// Step 1: Fetch ML internal indices settings to find out whether they are already hidden or not.
GetSettingsRequest getSettingsRequest =
new GetSettingsRequest()
.indices(mlHiddenIndexPatterns)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
client.admin().indices().getSettings(getSettingsRequest, getSettingsListener);
}
}

Loading