Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ public String getAllocationId() {
* matching sync ids are irrelevant.
*/
public boolean hasMatchingSyncId() {
    // TODO: eventually either distinguish a sync-id match from a non-sync-id "equivalent closed shard" match,
    // or rename this method to isSynced().
    // Deferred for now because it changes the API; it should preferably be handled together with
    // seqno-based replica shard allocation.
    // Long.MAX_VALUE is the marker value compared against here.
    final boolean markedAsSynced = Long.MAX_VALUE == matchingBytes;
    return markedAsSynced;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
Expand All @@ -49,7 +50,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

Expand Down Expand Up @@ -101,17 +101,16 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
final String currentSyncId;
final TransportNodesListShardStoreMetaData.StoreFilesMetaData currentStore;
if (shardStores.getData().containsKey(currentNode)) {
currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId();
currentStore = shardStores.getData().get(currentNode).storeFilesMetaData();
} else {
currentSyncId = null;
currentStore = null;
}
if (currentNode.equals(nodeWithHighestMatch) == false
&& Objects.equals(currentSyncId, primaryStore.syncId()) == false
&& matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) {
// we found a better match that has a full sync id match, the existing allocation is not fully synced
// so we found a better one, cancel this one
&& isSyncedRecovery(primaryStore, currentStore, isClosedIndex(shard, allocation)) == false
&& matchingNodes.isSyncedRecovery(nodeWithHighestMatch)) {
// we found a better match that can do a fast recovery, cancel current recovery
logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]",
currentNode, nodeWithHighestMatch);
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
Expand Down Expand Up @@ -315,6 +314,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
boolean explain) {
ObjectLongMap<DiscoveryNode> nodesToSize = new ObjectLongHashMap<>();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
final boolean closedIndex = isClosedIndex(shard, allocation);
for (Map.Entry<DiscoveryNode, NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
Expand All @@ -335,7 +335,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al

long matchingBytes = -1;
if (explain) {
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData);
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData, closedIndex);
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingBytes);
nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
}
Expand All @@ -345,7 +345,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
}

if (matchingBytes < 0) {
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData);
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData, closedIndex);
}
nodesToSize.put(discoNode, matchingBytes);
if (logger.isTraceEnabled()) {
Expand All @@ -362,11 +362,9 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
}

private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) {
String primarySyncId = primaryStore.syncId();
String replicaSyncId = storeFilesMetaData.syncId();
// see if we have a sync id we can make use of
if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData,
boolean closedIndex) {
if (isSyncedRecovery(primaryStore, storeFilesMetaData, closedIndex)) {
return Long.MAX_VALUE;
} else {
long sizeMatched = 0;
Expand All @@ -380,6 +378,37 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St
}
}

/**
 * Tells whether recovering onto the candidate copy would be a "synced recovery": either the candidate's
 * sync id matches the primary's, or the index is closed and both stores have equivalent last commits.
 *
 * @param primaryStore   store metadata of the primary copy
 * @param candidateStore store metadata of the candidate copy; may be {@code null} when no store data is
 *                       available for the node (the existing-recovery check passes {@code null} in that case)
 * @param closedIndex    whether the shard belongs to a closed index
 * @return {@code true} if the candidate qualifies for a synced recovery, {@code false} otherwise
 *         (always {@code false} for a {@code null} candidate)
 */
private static boolean isSyncedRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
                                        TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore,
                                        boolean closedIndex) {
    if (candidateStore == null) {
        // Without store metadata there is nothing to compare, so the copy cannot be considered synced.
        // Guarding here avoids a NullPointerException when dereferencing the candidate below.
        return false;
    }
    return syncIdMatch(primaryStore, candidateStore)
        || (closedIndex && equivalentStores(primaryStore, candidateStore));
}

/**
 * Checks whether the candidate store carries a sync id and that sync id equals the primary's.
 * A candidate without a sync id never matches.
 */
private static boolean syncIdMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
                                   TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
    final String expectedSyncId = primaryStore.syncId();
    final String actualSyncId = candidateStore.syncId();
    if (actualSyncId == null) {
        return false;
    }
    return actualSyncId.equals(expectedSyncId);
}

/**
 * Compares the sequence number info of the two stores' commits. The stores count as equivalent when the
 * primary has performed operations, both copies have a local checkpoint equal to their max sequence number,
 * and both copies share the same max sequence number.
 */
private static boolean equivalentStores(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
                                        TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
    final SequenceNumbers.CommitInfo primary = primaryStore.seqNoInfo();
    final SequenceNumbers.CommitInfo candidate = candidateStore.seqNoInfo();

    if (primary.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED) {
        // disregard empty or upgraded without ops
        return false;
    }
    if (primary.maxSeqNo != primary.localCheckpoint) {
        return false;
    }
    if (candidate.maxSeqNo != candidate.localCheckpoint) {
        return false;
    }
    return primary.maxSeqNo == candidate.maxSeqNo;
}

/**
 * Looks up the index of the given shard in the allocation's metadata and reports whether its state is
 * {@link IndexMetaData.State#CLOSE}.
 */
private boolean isClosedIndex(ShardRouting shard, RoutingAllocation allocation) {
    final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shard.index());
    final IndexMetaData.State indexState = indexMetaData.getState();
    return IndexMetaData.State.CLOSE == indexState;
}

/**
 * Supplies the fetch result holding the {@link NodeStoreFilesMetaData} for the given shard.
 * Subclasses provide the implementation.
 *
 * @param shard      the shard whose store metadata is requested
 * @param allocation the current routing allocation
 * @return the fetch result with the store files metadata
 */
protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);

/**
Expand Down Expand Up @@ -418,7 +447,10 @@ public DiscoveryNode getNodeWithHighestMatch() {
return this.nodeWithHighestMatch;
}

public boolean isNodeMatchBySyncID(DiscoveryNode node) {
/**
 * Reports whether the supplied node qualifies for a synced recovery, i.e. a sync-id match or a closed
 * index with an identical last commit. Such nodes are recorded with a size of {@link Long#MAX_VALUE}.
 */
public boolean isSyncedRecovery(DiscoveryNode node) {
    final long recordedSize = nodesToSize.get(node);
    return Long.MAX_VALUE == recordedSize;
}

Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,13 @@ public long getNumDocs() {
return numDocs;
}

/**
 * Loads the sequence number info from the entries of this snapshot's commit user data.
 *
 * @return sequence number info from lucene commit
 */
public SequenceNumbers.CommitInfo seqNoInfo() {
    final SequenceNumbers.CommitInfo commitInfo =
        SequenceNumbers.loadSeqNoInfoFromLuceneCommit(commitUserData.entrySet());
    return commitInfo;
}

static class LoadedMetadata {
final Map<String, StoreFileMetaData> fileMetadata;
final Map<String, String> userData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.gateway.AsyncShardFetch;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
Expand Down Expand Up @@ -220,6 +221,13 @@ public String syncId() {
return metadataSnapshot.getSyncId();
}

/**
 * Delegates to the underlying metadata snapshot.
 *
 * @return sequence number info from lucene commit
 */
public SequenceNumbers.CommitInfo seqNoInfo() {
    final SequenceNumbers.CommitInfo commitInfo = metadataSnapshot.seqNoInfo();
    return commitInfo;
}

@Override
public String toString() {
return "StoreFilesMetaData{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testUnassignedReplicaDelayedAllocation() throws Exception {
logger.info("--> starting 3 nodes");
internalCluster().startNodes(3);

prepareIndex(1, 1);
IndexMetaData.State state = prepareIndex(1, 1);
logger.info("--> stopping the node with the replica");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode().getName()));
ensureStableCluster(2);
Expand Down Expand Up @@ -263,7 +263,7 @@ public void testUnassignedReplicaDelayedAllocation() throws Exception {
nodes.put(primaryNodeName, AllocationDecision.NO);
String[] currentNodes = internalCluster().getNodeNames();
nodes.put(currentNodes[0].equals(primaryNodeName) ? currentNodes[1] : currentNodes[0], AllocationDecision.YES);
verifyNodeDecisions(parser, nodes, includeYesDecisions, true);
verifyNodeDecisions(parser, nodes, includeYesDecisions, true, state == IndexMetaData.State.CLOSE);
assertEquals(Token.END_OBJECT, parser.nextToken());
}
}
Expand All @@ -272,7 +272,7 @@ public void testUnassignedReplicaWithPriorCopy() throws Exception {
logger.info("--> starting 3 nodes");
List<String> nodes = internalCluster().startNodes(3);

prepareIndex(1, 1);
IndexMetaData.State indexState = prepareIndex(1, 1);
String primaryNodeName = primaryNodeName();
nodes.remove(primaryNodeName);

Expand Down Expand Up @@ -383,7 +383,7 @@ public void testUnassignedReplicaWithPriorCopy() throws Exception {
for (String nodeName : internalCluster().getNodeNames()) {
nodeDecisions.put(nodeName, AllocationDecision.NO);
}
verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, true);
verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, true, indexState == IndexMetaData.State.CLOSE);
assertEquals(Token.END_OBJECT, parser.nextToken());
}
}
Expand Down Expand Up @@ -476,7 +476,7 @@ public void testAllocationFilteringOnIndexCreation() throws Exception {
for (String nodeName : internalCluster().getNodeNames()) {
nodeDecisions.put(nodeName, AllocationDecision.NO);
}
verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, false);
verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, false, false);
assertEquals(Token.END_OBJECT, parser.nextToken());
}
}
Expand Down Expand Up @@ -584,7 +584,7 @@ public void testAllocationFilteringPreventsShardMove() throws Exception {
assertEquals("move_explanation", parser.currentName());
parser.nextToken();
assertEquals("cannot move shard to another node, even though it is not allowed to remain on its current node", parser.text());
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false);
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false, false);
assertEquals(Token.END_OBJECT, parser.nextToken());
}
}
Expand Down Expand Up @@ -696,7 +696,7 @@ public void testRebalancingNotAllowed() throws Exception {
parser.nextToken();
assertEquals("rebalancing is not allowed, even though there is at least one node on which the shard can be allocated",
parser.text());
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.YES, true), includeYesDecisions, false);
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.YES, true), includeYesDecisions, false, false);
assertEquals(Token.END_OBJECT, parser.nextToken());
}
}
Expand Down Expand Up @@ -799,7 +799,7 @@ public void testWorseBalance() throws Exception {
parser.nextToken();
assertEquals("cannot rebalance as no target node exists that can both allocate this shard and improve the cluster balance",
parser.text());
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.WORSE_BALANCE, true), includeYesDecisions, false);
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.WORSE_BALANCE, true), includeYesDecisions, false, false);
assertEquals(Token.END_OBJECT, parser.nextToken());
}
}
Expand Down Expand Up @@ -909,7 +909,7 @@ public void testBetterBalanceButCannotAllocate() throws Exception {
parser.nextToken();
assertEquals("cannot rebalance as no target node exists that can both allocate this shard and improve the cluster balance",
parser.text());
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false);
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false, false);
assertEquals(Token.END_OBJECT, parser.nextToken());
}
}
Expand Down Expand Up @@ -1006,7 +1006,7 @@ public void testAssignedReplicaOnSpecificNode() throws Exception {
assertEquals("rebalance_explanation", parser.currentName());
parser.nextToken();
assertEquals("rebalancing is not allowed", parser.text());
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, false), includeYesDecisions, false);
verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, false), includeYesDecisions, false, false);
assertEquals(Token.END_OBJECT, parser.nextToken());
}
}
Expand Down Expand Up @@ -1160,8 +1160,10 @@ private ClusterAllocationExplanation runExplain(boolean primary, String nodeId,
return explanation;
}

private void prepareIndex(final int numPrimaries, final int numReplicas) {
prepareIndex(randomIndexState(), numPrimaries, numReplicas, Settings.EMPTY, ActiveShardCount.ALL);
/**
 * Prepares an index in a randomly chosen state with empty settings, waiting for all shard copies,
 * and returns the chosen state so callers can adapt their expectations to it.
 */
private IndexMetaData.State prepareIndex(final int numPrimaries, final int numReplicas) {
    final IndexMetaData.State chosenState = randomIndexState();
    prepareIndex(chosenState, numPrimaries, numReplicas, Settings.EMPTY, ActiveShardCount.ALL);
    return chosenState;
}

private void prepareIndex(final IndexMetaData.State state, final int numPrimaries, final int numReplicas,
Expand Down Expand Up @@ -1326,7 +1328,7 @@ private void verifyStaleShardCopyNodeDecisions(XContentParser parser, int numNod
}

private void verifyNodeDecisions(XContentParser parser, Map<String, AllocationDecision> expectedNodeDecisions,
boolean includeYesDecisions, boolean reuseStore) throws IOException {
boolean includeYesDecisions, boolean reuseStore, boolean synced) throws IOException {
parser.nextToken();
assertEquals("node_allocation_decisions", parser.currentName());
assertEquals(Token.START_ARRAY, parser.nextToken());
Expand All @@ -1346,9 +1348,15 @@ private void verifyNodeDecisions(XContentParser parser, Map<String, AllocationDe
assertTrue("store info should not be present", reuseStore);
assertEquals(Token.START_OBJECT, parser.nextToken());
parser.nextToken();
assertEquals("matching_size_in_bytes", parser.currentName());
parser.nextToken();
assertThat(parser.longValue(), greaterThan(0L));
if (synced) {
assertEquals("matching_sync_id", parser.currentName());
parser.nextToken();
assertTrue(parser.booleanValue());
} else {
assertEquals("matching_size_in_bytes", parser.currentName());
parser.nextToken();
assertThat(parser.longValue(), greaterThan(0L));
}
assertEquals(Token.END_OBJECT, parser.nextToken());
parser.nextToken();
}
Expand Down
Loading