Skip to content

Commit dc25223

Browse files
Fix an issue in the cancel logic for initializing shards
1 parent bf5248f commit dc25223

File tree

4 files changed

+3
-127
lines changed

4 files changed

+3
-127
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -395,7 +395,7 @@ private void reroute(RoutingAllocation allocation) {
395395
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
396396
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
397397
"auto-expand replicas out of sync with number of nodes in the cluster";
398-
398+
399399
removeDelayMarkers(allocation);
400400
// try to allocate existing shard copies first
401401
gatewayAllocator.allocateUnassigned(allocation);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 0 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -896,8 +896,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
896896
float minWeight = Float.POSITIVE_INFINITY;
897897
ModelNode minNode = null;
898898
Decision decision = null;
899-
ModelNode delayNode = null;
900-
Decision delayNodeDecision = null;
901899
if (throttledNodes.size() >= nodes.size() && explain == false) {
902900
// all nodes are throttled, so we know we won't be able to allocate this round,
903901
// so if we are not in explain mode, short circuit
@@ -926,11 +924,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
926924
new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0));
927925
nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight));
928926
}
929-
if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT
930-
&& shard.unassignedInfo().getDetails().indexOf(node.getNodeId()) != -1) {
931-
delayNode = node;
932-
delayNodeDecision = currentDecision;
933-
}
934927
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
935928
final boolean updateMinNode;
936929
if (currentWeight == minWeight) {
@@ -970,24 +963,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
970963
// decision was not set and a node was not assigned, so treat it as a NO decision
971964
decision = Decision.NO;
972965
}
973-
974-
// avoid allocating shard to other nodes before INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING
975-
if (decision.type() == Type.YES || decision.type() == Type.THROTTLE) {
976-
if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT
977-
&& shard.unassignedInfo().isDelayed()) {
978-
if (delayNode == null) {
979-
return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null);
980-
} else if (!minNode.getNodeId().equals(delayNode.getNodeId())) {
981-
if (delayNodeDecision.type() == Type.YES || delayNodeDecision.type() == Type.THROTTLE) {
982-
minNode = delayNode;
983-
decision = delayNodeDecision;
984-
} else {
985-
return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null);
986-
}
987-
}
988-
}
989-
}
990-
991966
List<NodeAllocationResult> nodeDecisions = null;
992967
if (explain) {
993968
nodeDecisions = new ArrayList<>();

server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java

Lines changed: 1 addition & 90 deletions
Original file line number | Diff line number | Diff line change
@@ -61,15 +61,13 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase {
6161
private TestDelayAllocationService delayedAllocationService;
6262
private MockAllocationService allocationService;
6363
private ClusterService clusterService;
64-
private DelayedShardsMockGatewayAllocator gatewayAllocator;
6564
private ThreadPool threadPool;
6665

6766
@Before
6867
public void createDelayedAllocationService() {
6968
threadPool = new TestThreadPool(getTestName());
7069
clusterService = mock(ClusterService.class);
71-
gatewayAllocator = new DelayedShardsMockGatewayAllocator();
72-
allocationService = createAllocationService(Settings.EMPTY, gatewayAllocator);
70+
allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
7371
delayedAllocationService = new TestDelayAllocationService(threadPool, clusterService, allocationService);
7472
verify(clusterService).addListener(delayedAllocationService);
7573
}
@@ -462,92 +460,6 @@ public void testDelayedUnassignedScheduleRerouteRescheduledOnShorterDelay() thro
462460
equalTo(shorterDelaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos)));
463461
}
464462

465-
public void testDelayUnassignedNoShardCopyBeforeDelay() throws Exception {
466-
TimeValue delaySetting = timeValueMillis(100);
467-
MetaData metaData = MetaData.builder()
468-
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
469-
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting))
470-
.numberOfShards(1).numberOfReplicas(1))
471-
.build();
472-
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
473-
.metaData(metaData)
474-
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
475-
clusterState = ClusterState.builder(clusterState)
476-
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).localNodeId("node1").masterNodeId("node1"))
477-
.build();
478-
final long baseTimestampNanos = System.nanoTime();
479-
allocationService.setNanoTimeOverride(baseTimestampNanos);
480-
clusterState = allocationService.reroute(clusterState, "reroute");
481-
// starting primaries
482-
clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
483-
// starting replicas
484-
clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
485-
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
486-
487-
String nodeId = null;
488-
final List<ShardRouting> allShards = clusterState.getRoutingTable().allShards("test");
489-
// we need to find the node with the replica to be removed
490-
for (ShardRouting shardRouting : allShards) {
491-
if (shardRouting.primary() == false) {
492-
nodeId = shardRouting.currentNodeId();
493-
break;
494-
}
495-
}
496-
assertNotNull(nodeId);
497-
498-
// make the gateway allocator skip its allocation step, so it does not handle shards with existing data copies
499-
gatewayAllocator.skipAllocation(true);
500-
501-
// remove node that has replica and reroute
502-
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId);
503-
clusterState = ClusterState.builder(clusterState).nodes(nodes).build();
504-
clusterState = allocationService.disassociateDeadNodes(clusterState, true, "reroute");
505-
ClusterState stateWithDelayedShard = clusterState;
506-
// make sure the replica is marked as delayed (i.e. not reallocated)
507-
assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard));
508-
ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next();
509-
assertEquals(baseTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos());
510-
511-
// mock ClusterService.submitStateUpdateTask() method
512-
CountDownLatch latch = new CountDownLatch(1);
513-
AtomicReference<ClusterStateUpdateTask> clusterStateUpdateTask = new AtomicReference<>();
514-
doAnswer(invocationOnMock -> {
515-
clusterStateUpdateTask.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]);
516-
latch.countDown();
517-
return null;
518-
}).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class));
519-
assertNull(delayedAllocationService.delayedRerouteTask.get());
520-
long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)delaySetting.nanos() - 1)).nanos();
521-
long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent;
522-
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
523-
delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState));
524-
525-
// check that delayed reroute task was created and registered with the proper settings
526-
DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
527-
assertNotNull(delayedRerouteTask);
528-
assertFalse(delayedRerouteTask.cancelScheduling.get());
529-
assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
530-
assertThat(delayedRerouteTask.nextDelay.nanos(),
531-
equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos)));
532-
533-
// check that submitStateUpdateTask() was invoked on the cluster service mock
534-
assertTrue(latch.await(30, TimeUnit.SECONDS));
535-
verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask.get()));
536-
537-
// advance the time on the allocation service to a timestamp that happened after the delayed scheduling
538-
long nanoTimeForReroute = clusterChangeEventTimestampNanos + delaySetting.nanos() + timeValueMillis(randomInt(200)).nanos();
539-
allocationService.setNanoTimeOverride(nanoTimeForReroute);
540-
// apply cluster state
541-
ClusterState stateWithRemovedDelay = clusterStateUpdateTask.get().execute(stateWithDelayedShard);
542-
// check that shard is not delayed anymore
543-
assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithRemovedDelay));
544-
// check that task is now removed
545-
assertNull(delayedAllocationService.delayedRerouteTask.get());
546-
547-
// reset
548-
gatewayAllocator.skipAllocation(false);
549-
}
550-
551463
private static class TestDelayAllocationService extends DelayedAllocationService {
552464
private volatile long nanoTimeOverride = -1L;
553465

@@ -571,4 +483,3 @@ protected long currentNanoTime() {
571483
}
572484
}
573485
}
574-

test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java

Lines changed: 1 addition & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -254,10 +254,7 @@ protected long currentNanoTime() {
254254
* Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
255255
*/
256256
protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
257-
private boolean skipAllocation = false;
258-
259-
public DelayedShardsMockGatewayAllocator() {
260-
}
257+
public DelayedShardsMockGatewayAllocator() {}
261258

262259
@Override
263260
public void applyStartedShards(RoutingAllocation allocation, List<ShardRouting> startedShards) {
@@ -271,9 +268,6 @@ public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> fa
271268

272269
@Override
273270
public void allocateUnassigned(RoutingAllocation allocation) {
274-
if (this.skipAllocation) {
275-
return;
276-
}
277271
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
278272
while (unassignedIterator.hasNext()) {
279273
ShardRouting shard = unassignedIterator.next();
@@ -285,9 +279,5 @@ public void allocateUnassigned(RoutingAllocation allocation) {
285279
}
286280
}
287281
}
288-
289-
public void skipAllocation(boolean skipAllocation) {
290-
this.skipAllocation = skipAllocation;
291-
}
292282
}
293283
}

0 commit comments

Comments
 (0)