|
24 | 24 | import org.elasticsearch.common.io.stream.StreamInput; |
25 | 25 | import org.elasticsearch.common.io.stream.StreamOutput; |
26 | 26 | import org.elasticsearch.common.settings.Settings; |
| 27 | +import org.elasticsearch.index.IndexSettings; |
27 | 28 | import org.elasticsearch.index.shard.ShardId; |
28 | 29 | import org.elasticsearch.persistent.PersistentTasksCustomMetaData; |
29 | 30 | import org.elasticsearch.persistent.PersistentTasksService; |
@@ -223,73 +224,78 @@ protected void doExecute(Request request, ActionListener<Response> listener) { |
223 | 224 | */ |
224 | 225 | void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, |
225 | 226 | ActionListener<Response> handler) { |
226 | | - if (leaderIndexMetadata == null) { |
227 | | - handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist")); |
228 | | - return; |
229 | | - } |
230 | | - |
231 | | - if (followIndexMetadata == null) { |
232 | | - handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist")); |
233 | | - return; |
234 | | - } |
235 | | - |
236 | | - if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) { |
237 | | - handler.onFailure(new IllegalArgumentException("leader index primary shards [" + |
238 | | - leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " + |
239 | | - "shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]")); |
240 | | - // TODO: other validation checks |
241 | | - } else { |
242 | | - final int numShards = followIndexMetadata.getNumberOfShards(); |
243 | | - final AtomicInteger counter = new AtomicInteger(numShards); |
244 | | - final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); |
245 | | - for (int i = 0; i < numShards; i++) { |
246 | | - final int shardId = i; |
247 | | - String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; |
248 | | - ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, |
249 | | - new ShardId(followIndexMetadata.getIndex(), shardId), |
250 | | - new ShardId(leaderIndexMetadata.getIndex(), shardId), |
251 | | - request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes); |
252 | | - persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask, |
253 | | - new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() { |
254 | | - @Override |
255 | | - public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) { |
256 | | - responses.set(shardId, task); |
257 | | - finalizeResponse(); |
258 | | - } |
| 227 | + validate(leaderIndexMetadata, followIndexMetadata, request); |
| 228 | + final int numShards = followIndexMetadata.getNumberOfShards(); |
| 229 | + final AtomicInteger counter = new AtomicInteger(numShards); |
| 230 | + final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); |
| 231 | + for (int i = 0; i < numShards; i++) { |
| 232 | + final int shardId = i; |
| 233 | + String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; |
| 234 | + ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, |
| 235 | + new ShardId(followIndexMetadata.getIndex(), shardId), |
| 236 | + new ShardId(leaderIndexMetadata.getIndex(), shardId), |
| 237 | + request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes); |
| 238 | + persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask, |
| 239 | + new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() { |
| 240 | + @Override |
| 241 | + public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) { |
| 242 | + responses.set(shardId, task); |
| 243 | + finalizeResponse(); |
| 244 | + } |
259 | 245 |
|
260 | | - @Override |
261 | | - public void onFailure(Exception e) { |
262 | | - responses.set(shardId, e); |
263 | | - finalizeResponse(); |
264 | | - } |
| 246 | + @Override |
| 247 | + public void onFailure(Exception e) { |
| 248 | + responses.set(shardId, e); |
| 249 | + finalizeResponse(); |
| 250 | + } |
265 | 251 |
|
266 | | - void finalizeResponse() { |
267 | | - Exception error = null; |
268 | | - if (counter.decrementAndGet() == 0) { |
269 | | - for (int j = 0; j < responses.length(); j++) { |
270 | | - Object response = responses.get(j); |
271 | | - if (response instanceof Exception) { |
272 | | - if (error == null) { |
273 | | - error = (Exception) response; |
274 | | - } else { |
275 | | - error.addSuppressed((Throwable) response); |
276 | | - } |
| 252 | + void finalizeResponse() { |
| 253 | + Exception error = null; |
| 254 | + if (counter.decrementAndGet() == 0) { |
| 255 | + for (int j = 0; j < responses.length(); j++) { |
| 256 | + Object response = responses.get(j); |
| 257 | + if (response instanceof Exception) { |
| 258 | + if (error == null) { |
| 259 | + error = (Exception) response; |
| 260 | + } else { |
| 261 | + error.addSuppressed((Throwable) response); |
277 | 262 | } |
278 | 263 | } |
| 264 | + } |
279 | 265 |
|
280 | | - if (error == null) { |
281 | | - // include task ids? |
282 | | - handler.onResponse(new Response(true)); |
283 | | - } else { |
284 | | - // TODO: cancel all started tasks |
285 | | - handler.onFailure(error); |
286 | | - } |
| 266 | + if (error == null) { |
| 267 | + // include task ids? |
| 268 | + handler.onResponse(new Response(true)); |
| 269 | + } else { |
| 270 | + // TODO: cancel all started tasks |
| 271 | + handler.onFailure(error); |
287 | 272 | } |
288 | 273 | } |
289 | 274 | } |
290 | | - ); |
291 | | - } |
| 275 | + } |
| 276 | + ); |
292 | 277 | } |
293 | 278 | } |
294 | 279 | } |
| 280 | + |
| 281 | + |
| 282 | + static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) { |
| 283 | + if (leaderIndex == null) { |
| 284 | + throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"); |
| 285 | + } |
| 286 | + |
| 287 | + if (followIndex == null) { |
| 288 | + throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"); |
| 289 | + } |
| 290 | + if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) { |
| 291 | + throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled"); |
| 292 | + } |
| 293 | + |
| 294 | + if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) { |
| 295 | + throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() + |
| 296 | + "] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]"); |
| 297 | + } |
| 298 | + // TODO: other validation checks |
| 299 | + } |
| 300 | + |
295 | 301 | } |
0 commit comments