2424import org .elasticsearch .common .io .stream .StreamInput ;
2525import org .elasticsearch .common .io .stream .StreamOutput ;
2626import org .elasticsearch .common .settings .Settings ;
27+ import org .elasticsearch .index .IndexSettings ;
2728import org .elasticsearch .index .shard .ShardId ;
2829import org .elasticsearch .persistent .PersistentTasksCustomMetaData ;
2930import org .elasticsearch .persistent .PersistentTasksService ;
@@ -224,29 +225,13 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
224225 */
225226 void start (Request request , String clusterNameAlias , IndexMetaData leaderIndexMetadata , IndexMetaData followIndexMetadata ,
226227 ActionListener <Response > handler ) {
227- if (leaderIndexMetadata == null ) {
228- handler .onFailure (new IllegalArgumentException ("leader index [" + request .leaderIndex + "] does not exist" ));
229- return ;
230- }
231-
232- if (followIndexMetadata == null ) {
233- handler .onFailure (new IllegalArgumentException ("follow index [" + request .followIndex + "] does not exist" ));
234- return ;
235- }
236-
237- if (leaderIndexMetadata .getNumberOfShards () != followIndexMetadata .getNumberOfShards ()) {
238- handler .onFailure (new IllegalArgumentException ("leader index primary shards [" +
239- leaderIndexMetadata .getNumberOfShards () + "] does not match with the number of " +
240- "shards of the follow index [" + followIndexMetadata .getNumberOfShards () + "]" ));
241- // TODO: other validation checks
242- } else {
228+ validate (leaderIndexMetadata ,followIndexMetadata , request );
243229 final int numShards = followIndexMetadata .getNumberOfShards ();
244230 final AtomicInteger counter = new AtomicInteger (numShards );
245231 final AtomicReferenceArray <Object > responses = new AtomicReferenceArray <>(followIndexMetadata .getNumberOfShards ());
246232 Map <String , String > filteredHeaders = threadPool .getThreadContext ().getHeaders ().entrySet ().stream ()
247233 .filter (e -> ShardFollowTask .HEADER_FILTERS .contains (e .getKey ()))
248- .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
249- for (int i = 0 ; i < numShards ; i ++) {
234+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));for (int i = 0 ; i < numShards ; i ++) {
250235 final int shardId = i ;
251236 String taskId = followIndexMetadata .getIndexUUID () + "-" + shardId ;
252237 ShardFollowTask shardFollowTask = new ShardFollowTask (clusterNameAlias ,
@@ -261,39 +246,59 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowT
261246 finalizeResponse ();
262247 }
263248
264- @ Override
265- public void onFailure (Exception e ) {
266- responses .set (shardId , e );
267- finalizeResponse ();
268- }
249+ @ Override
250+ public void onFailure (Exception e ) {
251+ responses .set (shardId , e );
252+ finalizeResponse ();
253+ }
269254
270- void finalizeResponse () {
271- Exception error = null ;
272- if (counter .decrementAndGet () == 0 ) {
273- for (int j = 0 ; j < responses .length (); j ++) {
274- Object response = responses .get (j );
275- if (response instanceof Exception ) {
276- if (error == null ) {
277- error = (Exception ) response ;
278- } else {
279- error .addSuppressed ((Throwable ) response );
280- }
255+ void finalizeResponse () {
256+ Exception error = null ;
257+ if (counter .decrementAndGet () == 0 ) {
258+ for (int j = 0 ; j < responses .length (); j ++) {
259+ Object response = responses .get (j );
260+ if (response instanceof Exception ) {
261+ if (error == null ) {
262+ error = (Exception ) response ;
263+ } else {
264+ error .addSuppressed ((Throwable ) response );
281265 }
282266 }
267+ }
283268
284- if (error == null ) {
285- // include task ids?
286- handler .onResponse (new Response (true ));
287- } else {
288- // TODO: cancel all started tasks
289- handler .onFailure (error );
290- }
269+ if (error == null ) {
270+ // include task ids?
271+ handler .onResponse (new Response (true ));
272+ } else {
273+ // TODO: cancel all started tasks
274+ handler .onFailure (error );
291275 }
292276 }
293277 }
294- );
295- }
278+ }
279+ );
296280 }
297281 }
298282 }
283+
284+
285+ static void validate (IndexMetaData leaderIndex , IndexMetaData followIndex , Request request ) {
286+ if (leaderIndex == null ) {
287+ throw new IllegalArgumentException ("leader index [" + request .leaderIndex + "] does not exist" );
288+ }
289+
290+ if (followIndex == null ) {
291+ throw new IllegalArgumentException ("follow index [" + request .followIndex + "] does not exist" );
292+ }
293+ if (leaderIndex .getSettings ().getAsBoolean (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), false ) == false ) {
294+ throw new IllegalArgumentException ("leader index [" + request .leaderIndex + "] does not have soft deletes enabled" );
295+ }
296+
297+ if (leaderIndex .getNumberOfShards () != followIndex .getNumberOfShards ()) {
298+ throw new IllegalArgumentException ("leader index primary shards [" + leaderIndex .getNumberOfShards () +
299+ "] does not match with the number of shards of the follow index [" + followIndex .getNumberOfShards () + "]" );
300+ }
301+ // TODO: other validation checks
302+ }
303+
299304}
0 commit comments