 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.Callback;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.index.IndexService;
@@ -93,26 +91,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
 
     private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {};
 
-    // a map of mappings type we have seen per index due to cluster state
-    // we need this so we won't remove types automatically created as part of the indexing process
-    private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
-
     // a list of shards that failed during recovery
     // we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
-    private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
+    private final ConcurrentMap<ShardId, ShardRouting> failedShards = ConcurrentCollections.newConcurrentMap();
     private final RestoreService restoreService;
     private final RepositoriesService repositoriesService;
 
-    static class FailedShard {
-        public final long version;
-        public final long timestamp;
-
-        FailedShard(long version) {
-            this.version = version;
-            this.timestamp = System.currentTimeMillis();
-        }
-    }
-
     private final Object mutex = new Object();
     private final FailedShardHandler failedShardHandler = new FailedShardHandler();
 
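Note on this hunk: the private `FailedShard` value class identified a failed shard by routing `version` plus a wall-clock timestamp (with a 60-minute expiry as a safety net, see the removed code in `cleanFailedShards` below). Storing the failed `ShardRouting` itself lets the service match entries by allocation identity instead. A minimal sketch of that idea, using a hypothetical `Routing` record as a stand-in for the real `ShardRouting` (which carries far more state); `isSameAllocation` here compares allocation ids, which is my reading of what the real method does:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ShardRouting: identity is the allocation id,
// the unique token minted each time the master allocates a shard copy.
record Routing(String shardId, String allocationId) {
    boolean isSameAllocation(Routing other) {
        return Objects.equals(allocationId, other.allocationId);
    }
}

public class FailedShardsSketch {
    // Mirrors the new field: failed shard id -> the exact routing that failed.
    private final Map<String, Routing> failedShards = new ConcurrentHashMap<>();

    // Mirrors sendFailShard: remember the routing itself, not (version, timestamp).
    void onShardFailed(Routing routing) {
        failedShards.put(routing.shardId(), routing);
    }

    // A failed entry matches only the *same* allocation; once the master
    // assigns a fresh allocation id, the entry no longer matches and
    // recovery can be retried.
    boolean isKnownFailure(Routing current) {
        Routing failed = failedShards.get(current.shardId());
        return failed != null && failed.isSameAllocation(current);
    }

    public static void main(String[] args) {
        FailedShardsSketch sketch = new FailedShardsSketch();
        sketch.onShardFailed(new Routing("[index][0]", "alloc-1"));
        System.out.println(sketch.isKnownFailure(new Routing("[index][0]", "alloc-1"))); // true
        System.out.println(sketch.isKnownFailure(new Routing("[index][0]", "alloc-2"))); // false
    }
}
```

Because an allocation id changes whenever a new copy of the shard is assigned, "same failed shard" becomes an exact identity check, and the version comparison plus timestamp heuristic become unnecessary.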
@@ -431,11 +415,6 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) {
 
         RoutingTable routingTable = event.state().routingTable();
         RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
-
-        if (routingNode == null) {
-            failedShards.clear();
-            return;
-        }
         DiscoveryNodes nodes = event.state().nodes();
 
         for (final ShardRouting shardRouting : routingNode) {
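Side note: dropping this early-out does not lose the safety net. `cleanFailedShards` (next hunk) keeps the identical `routingNode == null` check and still clears `failedShards`, so that piece of bookkeeping now has a single owner.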
@@ -507,40 +486,29 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) {
     }
 
     private void cleanFailedShards(final ClusterChangedEvent event) {
-        RoutingTable routingTable = event.state().routingTable();
         RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
         if (routingNode == null) {
             failedShards.clear();
             return;
         }
-        DiscoveryNodes nodes = event.state().nodes();
-        long now = System.currentTimeMillis();
-        String localNodeId = nodes.localNodeId();
-        Iterator<Map.Entry<ShardId, FailedShard>> iterator = failedShards.entrySet().iterator();
-        shards:
-        while (iterator.hasNext()) {
-            Map.Entry<ShardId, FailedShard> entry = iterator.next();
-            FailedShard failedShard = entry.getValue();
-            IndexRoutingTable indexRoutingTable = routingTable.index(entry.getKey().getIndex());
-            if (indexRoutingTable != null) {
-                IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(entry.getKey().id());
-                if (shardRoutingTable != null) {
-                    for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
-                        if (localNodeId.equals(shardRouting.currentNodeId())) {
-                            // we have a timeout here just to make sure we don't have dangled failed shards for some reason
-                            // its just another safely layer
-                            if (shardRouting.version() == failedShard.version && ((now - failedShard.timestamp) < TimeValue.timeValueMinutes(60).millis())) {
-                                // It's the same failed shard - keep it if it hasn't timed out
-                                continue shards;
-                            } else {
-                                // Different version or expired, remove it
-                                break;
-                            }
-                        }
-                    }
-                }
+        RoutingTable routingTable = event.state().routingTable();
+        for (Iterator<Map.Entry<ShardId, ShardRouting>> iterator = failedShards.entrySet().iterator(); iterator.hasNext(); ) {
+            Map.Entry<ShardId, ShardRouting> entry = iterator.next();
+            ShardId failedShardId = entry.getKey();
+            ShardRouting failedShardRouting = entry.getValue();
+            IndexRoutingTable indexRoutingTable = routingTable.index(failedShardId.getIndex());
+            if (indexRoutingTable == null) {
+                iterator.remove();
+                continue;
+            }
+            IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(failedShardId.id());
+            if (shardRoutingTable == null) {
+                iterator.remove();
+                continue;
+            }
+            if (shardRoutingTable.assignedShards().stream().noneMatch(shr -> shr.isSameAllocation(failedShardRouting))) {
+                iterator.remove();
             }
-            iterator.remove();
         }
     }
 
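The rewrite replaces the labeled `shards:` loop, the `continue shards`/`break` control flow, and the 60-minute expiry with flat guard clauses: an entry survives only while some copy assigned in the routing table is the same allocation as the routing that failed. A compressed sketch of that loop shape, assuming a hypothetical `Map<String, Set<String>>` stands in for the routing table (shard id to currently assigned allocation ids) and plain strings stand in for `ShardId`/`ShardRouting`:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CleanFailedShardsSketch {
    // shard id -> allocation id of the copy that failed on this node
    private final Map<String, String> failedShards = new ConcurrentHashMap<>();

    void cleanFailedShards(Map<String, Set<String>> assigned) {
        for (Iterator<Map.Entry<String, String>> it = failedShards.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, String> entry = it.next();
            Set<String> allocations = assigned.get(entry.getKey());
            // Guard clauses, as in the real method: drop the entry when the
            // shard is gone from the table, or when no assigned copy is the
            // same allocation that failed.
            if (allocations == null || !allocations.contains(entry.getValue())) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        CleanFailedShardsSketch sketch = new CleanFailedShardsSketch();
        sketch.failedShards.put("[index][0]", "alloc-1");
        sketch.cleanFailedShards(Map.of("[index][0]", Set.of("alloc-2"))); // shard was reassigned
        System.out.println(sketch.failedShards); // {} -- stale entry removed
    }
}
```

Design-wise, this also makes the cleanup monotone: each entry is kept or removed on its own merits, with no cross-iteration state like `now` or the label to reason about.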
@@ -788,7 +756,7 @@ private void failAndRemoveShard(ShardRouting shardRouting, String indexUUID, @Nu
     private void sendFailShard(ShardRouting shardRouting, String indexUUID, String message, @Nullable Throwable failure) {
         try {
             logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
-            failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
+            failedShards.put(shardRouting.shardId(), shardRouting);
             shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
         } catch (Throwable e1) {
             logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndexName(), shardRouting.getId(), message);