Skip to content

Commit a3ce149

Browse files
authored
[ML] Jindex: Prefer index config documents to cluster state config (#35940)
1 parent 6aadcbf commit a3ce149

File tree

9 files changed

+311
-217
lines changed

9 files changed

+311
-217
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,21 @@ private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ClusterS
131131
return;
132132
}
133133

134-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
135-
if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) {
136-
deleteDatafeedFromMetadata(request, listener);
137-
} else {
138-
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
139-
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
140-
listener::onFailure
141-
));
142-
}
134+
135+
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
136+
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
137+
e -> {
138+
if (e.getClass() == ResourceNotFoundException.class) {
139+
// is the datafeed in the clusterstate
140+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
141+
if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) {
142+
deleteDatafeedFromMetadata(request, listener);
143+
return;
144+
}
145+
}
146+
listener.onFailure(e);
147+
}
148+
));
143149
}
144150

145151
private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.ElasticsearchStatusException;
1313
import org.elasticsearch.ResourceAlreadyExistsException;
14+
import org.elasticsearch.ResourceNotFoundException;
1415
import org.elasticsearch.Version;
1516
import org.elasticsearch.action.ActionListener;
1617
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
@@ -692,47 +693,53 @@ public void onTimeout(TimeValue timeout) {
692693

693694
private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
694695

695-
boolean jobIsInClusterState = ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId);
696-
if (jobIsInClusterState) {
697-
clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() {
698-
@Override
699-
public ClusterState execute(ClusterState currentState) {
700-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
701-
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
702-
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
703-
jobBuilder.setFinishedTime(null);
704-
705-
mlMetadataBuilder.putJob(jobBuilder.build(), true);
706-
ClusterState.Builder builder = ClusterState.builder(currentState);
707-
return builder.metaData(new MetaData.Builder(currentState.metaData())
708-
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
709-
.build();
710-
}
696+
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
711697

712-
@Override
713-
public void onFailure(String source, Exception e) {
714-
logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e);
698+
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
699+
job -> listener.onResponse(new AcknowledgedResponse(true)),
700+
e -> {
701+
if (e.getClass() == ResourceNotFoundException.class) {
702+
// Maybe the config is in the clusterstate
703+
if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) {
704+
clearJobFinishedTimeClusterState(jobId, listener);
705+
return;
706+
}
707+
}
708+
logger.error("[" + jobId + "] Failed to clear finished_time", e);
709+
// Not a critical error so continue
715710
listener.onResponse(new AcknowledgedResponse(true));
716711
}
712+
));
713+
}
717714

718-
@Override
719-
public void clusterStateProcessed(String source, ClusterState oldState,
720-
ClusterState newState) {
721-
listener.onResponse(new AcknowledgedResponse(true));
722-
}
723-
});
724-
} else {
725-
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
726-
727-
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
728-
job -> listener.onResponse(new AcknowledgedResponse(true)),
729-
e -> {
730-
logger.error("[" + jobId + "] Failed to clear finished_time", e);
731-
// Not a critical error so continue
732-
listener.onResponse(new AcknowledgedResponse(true));
733-
}
734-
));
735-
}
715+
private void clearJobFinishedTimeClusterState(String jobId, ActionListener<AcknowledgedResponse> listener) {
716+
clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() {
717+
@Override
718+
public ClusterState execute(ClusterState currentState) {
719+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
720+
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
721+
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
722+
jobBuilder.setFinishedTime(null);
723+
724+
mlMetadataBuilder.putJob(jobBuilder.build(), true);
725+
ClusterState.Builder builder = ClusterState.builder(currentState);
726+
return builder.metaData(new MetaData.Builder(currentState.metaData())
727+
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
728+
.build();
729+
}
730+
731+
@Override
732+
public void onFailure(String source, Exception e) {
733+
logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e);
734+
listener.onResponse(new AcknowledgedResponse(true));
735+
}
736+
737+
@Override
738+
public void clusterStateProcessed(String source, ClusterState oldState,
739+
ClusterState newState) {
740+
listener.onResponse(new AcknowledgedResponse(true));
741+
}
742+
});
736743
}
737744

738745
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.ml.action;
77

8+
import org.elasticsearch.ResourceNotFoundException;
89
import org.elasticsearch.action.ActionListener;
910
import org.elasticsearch.action.support.ActionFilters;
1011
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -69,19 +70,6 @@ protected PutDatafeedAction.Response newResponse() {
6970
protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state,
7071
ActionListener<PutDatafeedAction.Response> listener) throws Exception {
7172

72-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
73-
boolean datafeedConfigIsInClusterState = mlMetadata.getDatafeed(request.getUpdate().getId()) != null;
74-
if (datafeedConfigIsInClusterState) {
75-
updateDatafeedInClusterState(request, listener);
76-
} else {
77-
updateDatafeedInIndex(request, state, listener);
78-
}
79-
}
80-
81-
private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state,
82-
ActionListener<PutDatafeedAction.Response> listener) throws Exception {
83-
final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
84-
8573
// Check datafeed is stopped
8674
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
8775
if (MlTasks.getDatafeedTask(request.getUpdate().getId(), tasks) != null) {
@@ -91,14 +79,30 @@ private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, Cluster
9179
return;
9280
}
9381

82+
updateDatafeedInIndex(request, state, listener);
83+
}
84+
85+
private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state,
86+
ActionListener<PutDatafeedAction.Response> listener) throws Exception {
87+
final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
9488
String datafeedId = request.getUpdate().getId();
9589

9690
CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
9791
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
9892
jobConfigProvider::validateDatafeedJob,
9993
ActionListener.wrap(
10094
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
101-
listener::onFailure
95+
e -> {
96+
if (e.getClass() == ResourceNotFoundException.class) {
97+
// try the clusterstate
98+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
99+
if (mlMetadata.getDatafeed(request.getUpdate().getId()) != null) {
100+
updateDatafeedInClusterState(request, listener);
101+
return;
102+
}
103+
}
104+
listener.onFailure(e);
105+
}
102106
));
103107
};
104108

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.ml.datafeed;
77

8+
import org.elasticsearch.ResourceNotFoundException;
89
import org.elasticsearch.action.ActionListener;
910
import org.elasticsearch.client.Client;
1011
import org.elasticsearch.cluster.ClusterState;
@@ -50,17 +51,22 @@ public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) {
5051
* @param listener DatafeedConfig listener
5152
*/
5253
public void datafeedConfig(String datafeedId, ClusterState state, ActionListener<DatafeedConfig> listener) {
53-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
54-
DatafeedConfig config = mlMetadata.getDatafeed(datafeedId);
55-
56-
if (config != null) {
57-
listener.onResponse(config);
58-
} else {
59-
datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap(
60-
builder -> listener.onResponse(builder.build()),
61-
listener::onFailure
62-
));
63-
}
54+
55+
datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap(
56+
builder -> listener.onResponse(builder.build()),
57+
e -> {
58+
if (e.getClass() == ResourceNotFoundException.class) {
59+
// look in the clusterstate
60+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
61+
DatafeedConfig config = mlMetadata.getDatafeed(datafeedId);
62+
if (config != null) {
63+
listener.onResponse(config);
64+
return;
65+
}
66+
}
67+
listener.onFailure(e);
68+
}
69+
));
6470
}
6571

6672
/**
@@ -70,23 +76,16 @@ public void datafeedConfig(String datafeedId, ClusterState state, ActionListener
7076
public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ClusterState clusterState,
7177
ActionListener<SortedSet<String>> listener) {
7278

73-
Set<String> clusterStateDatafeedIds = MlMetadata.getMlMetadata(clusterState).expandDatafeedIds(expression);
7479
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds);
75-
requiredMatches.filterMatchedIds(clusterStateDatafeedIds);
7680

7781
datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck(expression, ActionListener.wrap(
7882
expandedDatafeedIds -> {
79-
// Check for duplicate Ids
80-
expandedDatafeedIds.forEach(id -> {
81-
if (clusterStateDatafeedIds.contains(id)) {
82-
listener.onFailure(new IllegalStateException("Datafeed [" + id + "] configuration " +
83-
"exists in both clusterstate and index"));
84-
return;
85-
}
86-
});
87-
8883
requiredMatches.filterMatchedIds(expandedDatafeedIds);
8984

85+
// now read from the clusterstate
86+
Set<String> clusterStateDatafeedIds = MlMetadata.getMlMetadata(clusterState).expandDatafeedIds(expression);
87+
requiredMatches.filterMatchedIds(clusterStateDatafeedIds);
88+
9089
if (requiredMatches.hasUnmatchedIds()) {
9190
listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString()));
9291
} else {
@@ -105,33 +104,32 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Clust
105104
public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ClusterState clusterState,
106105
ActionListener<List<DatafeedConfig>> listener) {
107106

108-
Map<String, DatafeedConfig> clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState);
109-
110107
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds);
111-
requiredMatches.filterMatchedIds(clusterStateConfigs.keySet());
112108

113109
datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck(expression, ActionListener.wrap(
114110
datafeedBuilders -> {
115-
// Check for duplicate Ids
116-
datafeedBuilders.forEach(datafeedBuilder -> {
117-
if (clusterStateConfigs.containsKey(datafeedBuilder.getId())) {
118-
listener.onFailure(new IllegalStateException("Datafeed [" + datafeedBuilder.getId() + "] configuration " +
119-
"exists in both clusterstate and index"));
120-
return;
121-
}
122-
});
123-
124111
List<DatafeedConfig> datafeedConfigs = new ArrayList<>();
125112
for (DatafeedConfig.Builder builder : datafeedBuilders) {
126113
datafeedConfigs.add(builder.build());
127114
}
128115

116+
Map<String, DatafeedConfig> clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState);
117+
118+
// Duplicate configs existing in both the clusterstate and index documents are ok
119+
// this may occur during migration of configs.
120+
// Prefer the index configs and filter duplicates from the clusterstate configs.
121+
Set<String> indexConfigIds = datafeedConfigs.stream().map(DatafeedConfig::getId).collect(Collectors.toSet());
122+
for (String clusterStateDatafeedId : clusterStateConfigs.keySet()) {
123+
if (indexConfigIds.contains(clusterStateDatafeedId) == false) {
124+
datafeedConfigs.add(clusterStateConfigs.get(clusterStateDatafeedId));
125+
}
126+
}
127+
129128
requiredMatches.filterMatchedIds(datafeedConfigs.stream().map(DatafeedConfig::getId).collect(Collectors.toList()));
130129

131130
if (requiredMatches.hasUnmatchedIds()) {
132131
listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString()));
133132
} else {
134-
datafeedConfigs.addAll(clusterStateConfigs.values());
135133
Collections.sort(datafeedConfigs, Comparator.comparing(DatafeedConfig::getId));
136134
listener.onResponse(datafeedConfigs);
137135
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,11 @@ public void onResponse(DeleteResponse deleteResponse) {
234234
}
235235
@Override
236236
public void onFailure(Exception e) {
237-
actionListener.onFailure(e);
237+
if (e.getClass() == IndexNotFoundException.class) {
238+
actionListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId));
239+
} else {
240+
actionListener.onFailure(e);
241+
}
238242
}
239243
});
240244
}
@@ -303,7 +307,11 @@ public void onResponse(GetResponse getResponse) {
303307

304308
@Override
305309
public void onFailure(Exception e) {
306-
updatedConfigListener.onFailure(e);
310+
if (e.getClass() == IndexNotFoundException.class) {
311+
updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId));
312+
} else {
313+
updatedConfigListener.onFailure(e);
314+
}
307315
}
308316
});
309317
}

0 commit comments

Comments
 (0)