55 */
66package org .elasticsearch .xpack .ml .datafeed ;
77
8+ import org .elasticsearch .ResourceNotFoundException ;
89import org .elasticsearch .action .ActionListener ;
910import org .elasticsearch .client .Client ;
1011import org .elasticsearch .cluster .ClusterState ;
@@ -50,17 +51,22 @@ public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) {
5051 * @param listener DatafeedConfig listener
5152 */
5253 public void datafeedConfig (String datafeedId , ClusterState state , ActionListener <DatafeedConfig > listener ) {
53- MlMetadata mlMetadata = MlMetadata .getMlMetadata (state );
54- DatafeedConfig config = mlMetadata .getDatafeed (datafeedId );
55-
56- if (config != null ) {
57- listener .onResponse (config );
58- } else {
59- datafeedConfigProvider .getDatafeedConfig (datafeedId , ActionListener .wrap (
60- builder -> listener .onResponse (builder .build ()),
61- listener ::onFailure
62- ));
63- }
54+
55+ datafeedConfigProvider .getDatafeedConfig (datafeedId , ActionListener .wrap (
56+ builder -> listener .onResponse (builder .build ()),
57+ e -> {
58+ if (e .getClass () == ResourceNotFoundException .class ) {
59+ // look in the clusterstate
60+ MlMetadata mlMetadata = MlMetadata .getMlMetadata (state );
61+ DatafeedConfig config = mlMetadata .getDatafeed (datafeedId );
62+ if (config != null ) {
63+ listener .onResponse (config );
64+ return ;
65+ }
66+ }
67+ listener .onFailure (e );
68+ }
69+ ));
6470 }
6571
6672 /**
@@ -70,23 +76,16 @@ public void datafeedConfig(String datafeedId, ClusterState state, ActionListener
7076 public void expandDatafeedIds (String expression , boolean allowNoDatafeeds , ClusterState clusterState ,
7177 ActionListener <SortedSet <String >> listener ) {
7278
73- Set <String > clusterStateDatafeedIds = MlMetadata .getMlMetadata (clusterState ).expandDatafeedIds (expression );
7479 ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher (expression , allowNoDatafeeds );
75- requiredMatches .filterMatchedIds (clusterStateDatafeedIds );
7680
7781 datafeedConfigProvider .expandDatafeedIdsWithoutMissingCheck (expression , ActionListener .wrap (
7882 expandedDatafeedIds -> {
79- // Check for duplicate Ids
80- expandedDatafeedIds .forEach (id -> {
81- if (clusterStateDatafeedIds .contains (id )) {
82- listener .onFailure (new IllegalStateException ("Datafeed [" + id + "] configuration " +
83- "exists in both clusterstate and index" ));
84- return ;
85- }
86- });
87-
8883 requiredMatches .filterMatchedIds (expandedDatafeedIds );
8984
85+ // now read from the clusterstate
86+ Set <String > clusterStateDatafeedIds = MlMetadata .getMlMetadata (clusterState ).expandDatafeedIds (expression );
87+ requiredMatches .filterMatchedIds (clusterStateDatafeedIds );
88+
9089 if (requiredMatches .hasUnmatchedIds ()) {
9190 listener .onFailure (ExceptionsHelper .missingDatafeedException (requiredMatches .unmatchedIdsString ()));
9291 } else {
@@ -105,33 +104,33 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Clust
105104 public void expandDatafeedConfigs (String expression , boolean allowNoDatafeeds , ClusterState clusterState ,
106105 ActionListener <List <DatafeedConfig >> listener ) {
107106
108- Map <String , DatafeedConfig > clusterStateConfigs = expandClusterStateDatafeeds (expression , clusterState );
109-
110107 ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher (expression , allowNoDatafeeds );
111- requiredMatches .filterMatchedIds (clusterStateConfigs .keySet ());
112108
113109 datafeedConfigProvider .expandDatafeedConfigsWithoutMissingCheck (expression , ActionListener .wrap (
114110 datafeedBuilders -> {
115- // Check for duplicate Ids
116- datafeedBuilders .forEach (datafeedBuilder -> {
117- if (clusterStateConfigs .containsKey (datafeedBuilder .getId ())) {
118- listener .onFailure (new IllegalStateException ("Datafeed [" + datafeedBuilder .getId () + "] configuration " +
119- "exists in both clusterstate and index" ));
120- return ;
121- }
122- });
123-
124111 List <DatafeedConfig > datafeedConfigs = new ArrayList <>();
125112 for (DatafeedConfig .Builder builder : datafeedBuilders ) {
126113 datafeedConfigs .add (builder .build ());
127114 }
128115
116+ Map <String , DatafeedConfig > clusterStateConfigs = expandClusterStateDatafeeds (expression , clusterState );
117+
118+ // Duplicate configs existing in both the clusterstate and index documents are ok
119+ // this may occur during migration of configs.
120+ // Prefer the index configs and filter duplicates.
121+ for (String clusterStateDatafeedId : clusterStateConfigs .keySet ()) {
122+ boolean isDuplicate = datafeedConfigs .stream ()
123+ .anyMatch (datafeed -> datafeed .getId ().equals (clusterStateDatafeedId ));
124+ if (isDuplicate == false ) {
125+ datafeedConfigs .add (clusterStateConfigs .get (clusterStateDatafeedId ));
126+ }
127+ }
128+
129129 requiredMatches .filterMatchedIds (datafeedConfigs .stream ().map (DatafeedConfig ::getId ).collect (Collectors .toList ()));
130130
131131 if (requiredMatches .hasUnmatchedIds ()) {
132132 listener .onFailure (ExceptionsHelper .missingDatafeedException (requiredMatches .unmatchedIdsString ()));
133133 } else {
134- datafeedConfigs .addAll (clusterStateConfigs .values ());
135134 Collections .sort (datafeedConfigs , Comparator .comparing (DatafeedConfig ::getId ));
136135 listener .onResponse (datafeedConfigs );
137136 }
0 commit comments