 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;

 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
  * If there was an error in step 3 and the config is in both the clusterstate and
  * index then when the migrator retries it must not overwrite an existing job config
  * document as once the index document is present all update operations will function
- * on that rather than the clusterstate
+ * on that rather than the clusterstate.
+ *
+ * The number of configs indexed in each bulk operation is limited by {@link #MAX_BULK_WRITE_SIZE}.
+ * Pairs of datafeeds and jobs are migrated together.
  */
 public class MlConfigMigrator {

     private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class);

     public static final String MIGRATED_FROM_VERSION = "migrated from version";

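+    // Upper bound on the number of configs written in a single bulk request (see class javadoc)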
+    static final int MAX_BULK_WRITE_SIZE = 100;
+
     private final Client client;
     private final ClusterService clusterService;

@@ -111,10 +118,12 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
             return;
         }

-        Collection<DatafeedConfig> datafeedsToMigrate = stoppedDatafeedConfigs(clusterState);
-        List<Job> jobsToMigrate = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
+        Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
+        Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
                 .map(MlConfigMigrator::updateJobForMigration)
-                .collect(Collectors.toList());
+                .collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));
+
+        JobsAndDatafeeds jobsAndDatafeedsToMigrate = limitWrites(stoppedDatafeeds, eligibleJobs);

         ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
             response -> {
@@ -127,16 +136,18 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
             }
         );

-        if (datafeedsToMigrate.isEmpty() && jobsToMigrate.isEmpty()) {
+        if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
             unMarkMigrationInProgress.onResponse(Boolean.FALSE);
             return;
         }

-        writeConfigToIndex(datafeedsToMigrate, jobsToMigrate, ActionListener.wrap(
+        logger.debug("migrating ml configurations");
+
+        writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap(
                 failedDocumentIds -> {
-                    List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsToMigrate);
+                    List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
                     List<String> successfulDatafeedWrites =
-                            filterFailedDatafeedConfigWrites(failedDocumentIds, datafeedsToMigrate);
+                            filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
                     removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress);
                 },
                 unMarkMigrationInProgress::onFailure
@@ -341,6 +352,69 @@ public static List<DatafeedConfig> stoppedDatafeedConfigs(ClusterState clusterSt
                 .collect(Collectors.toList());
     }

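+    // Simple holder for the jobs and datafeeds chosen for a single batched bulk write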
+    public static class JobsAndDatafeeds {
+        List<Job> jobs;
+        List<DatafeedConfig> datafeedConfigs;
+
+        private JobsAndDatafeeds() {
+            jobs = new ArrayList<>();
+            datafeedConfigs = new ArrayList<>();
+        }
+
+        public int totalCount() {
+            return jobs.size() + datafeedConfigs.size();
+        }
+    }
+
+    /**
+     * Return at most {@link #MAX_BULK_WRITE_SIZE} configs, favouring
+     * datafeed and job pairs so that if a datafeed is chosen its job is chosen too.
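+     *
+     * For example, with a limit of 100, given 70 datafeeds each paired with a
+     * job plus 30 unpaired jobs, the first 50 datafeed/job pairs fill the
+     * batch and the remaining 70 configs wait for a subsequent migration pass.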
+     *
+     * @param datafeedsToMigrate Datafeed configs
+     * @param jobsToMigrate      Job configs
+     * @return Job and datafeed configs
+     */
+    public static JobsAndDatafeeds limitWrites(Collection<DatafeedConfig> datafeedsToMigrate, Map<String, Job> jobsToMigrate) {
+        JobsAndDatafeeds jobsAndDatafeeds = new JobsAndDatafeeds();
+
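+        // everything fits in a single bulk write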
+        if (datafeedsToMigrate.size() + jobsToMigrate.size() <= MAX_BULK_WRITE_SIZE) {
+            jobsAndDatafeeds.jobs.addAll(jobsToMigrate.values());
+            jobsAndDatafeeds.datafeedConfigs.addAll(datafeedsToMigrate);
+            return jobsAndDatafeeds;
+        }
+
+        int count = 0;
+
+        // prioritise datafeed and job pairs
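+        // note: taking a pair when only one slot remains can push the batch one config over the limit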
+        for (DatafeedConfig datafeedConfig : datafeedsToMigrate) {
+            if (count < MAX_BULK_WRITE_SIZE) {
+                jobsAndDatafeeds.datafeedConfigs.add(datafeedConfig);
+                count++;
+                Job datafeedsJob = jobsToMigrate.remove(datafeedConfig.getJobId());
+                if (datafeedsJob != null) {
+                    jobsAndDatafeeds.jobs.add(datafeedsJob);
+                    count++;
+                }
+            }
+        }
+
+        // fill any remaining capacity with jobs that were not paired with a chosen datafeed
+        Iterator<Job> iter = jobsToMigrate.values().iterator();
+        while (iter.hasNext() && count < MAX_BULK_WRITE_SIZE) {
+            jobsAndDatafeeds.jobs.add(iter.next());
+            count++;
+        }
+
+        return jobsAndDatafeeds;
+    }
+
     /**
      * Check for failures in the bulk response and return the
      * Ids of any documents not written to the index