Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand All @@ -50,6 +51,7 @@
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -442,6 +444,43 @@ public void testCloseIndexDuringRollingUpgrade() throws Exception {
}
}


/**
* We test that a closed index makes no-op replica allocation only.
*/
public void testClosedIndexReplicaAllocation() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test passed with the current behaviour. Can we make a small PR for this test only?

final String indexName = "closed_index_replica_allocation";
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "120s")
.put("index.routing.allocation.include._name", "node-0")
.build());
indexDocs(indexName, 0, randomInt(10));
// allocate replica to node-2
updateIndexSettings(indexName,
Settings.builder().put("index.routing.allocation.include._name", "node-0,node-2,upgraded-node-*"));
ensureGreen(indexName);
closeIndex(indexName);
}

final Version indexVersionCreated = indexVersionCreated(indexName);
if (indexVersionCreated.onOrAfter(Version.V_7_2_0)) {
// index was created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
// todo: change to 7_X once backported.
if (CLUSTER_TYPE != ClusterType.OLD && minimumNodeVersion().onOrAfter(Version.V_8_0_0)) {
assertNoFileBasedRecovery(indexName, s-> s.startsWith("upgraded"));
}
} else {
assertClosedIndex(indexName, false);
}

}
/**
* Returns the version in which the given index has been created
*/
Expand Down Expand Up @@ -585,4 +624,35 @@ public void testUpdateDoc() throws Exception {
client().performRequest(update);
}
}

private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
Map<String, Object> recoveries = entityAsMap(client()
.performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));

@SuppressWarnings("unchecked")
List<Map<String, ?>> shards = (List<Map<String,?>>) XContentMapValues.extractValue(indexName + ".shards", recoveries);
assertNotNull(shards);
boolean foundReplica = false;
for (Map<String, ?> shard : shards) {
if (shard.get("primary") == Boolean.FALSE
&& targetNode.test((String) XContentMapValues.extractValue("target.name", shard))) {
List<?> details = (List<?>) XContentMapValues.extractValue("index.files.details", shard);
// once detailed recoveries works, remove this if.
if (details == null) {
long totalFiles = ((Number) XContentMapValues.extractValue("index.files.total", shard)).longValue();
long reusedFiles = ((Number) XContentMapValues.extractValue("index.files.reused", shard)).longValue();
assertEquals("must reuse all files, recoveries [" + recoveries + "]", totalFiles, reusedFiles);
} else {
assertNotNull(details);
assertThat(details, empty());
}

long translogRecovered = ((Number) XContentMapValues.extractValue("translog.recovered", shard)).longValue();
assertEquals("must be noop, recoveries [" + recoveries + "]", 0, translogRecovered);
foundReplica = true;
}
}

assertTrue("must find replica", foundReplica);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ public String getAllocationId() {
* matching sync ids are irrelevant.
*/
public boolean hasMatchingSyncId() {
// TODO: this method needs a rename, leaving it for now to not make too many iterations on that until we have full seqno
// based recovery.
return matchingBytes == Long.MAX_VALUE;
}

Expand Down Expand Up @@ -274,6 +276,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("allocation_id", allocationId);
}
if (matchingBytes >= 0) {
// TODO: we should eventually either distinguish between sync-id and non sync-id equivalent shard allocation or
// rename this to synced_match
// left this for now, since it changes the API and should preferably be handled together with seqno based
// replica shard allocation, consisting of whether this will be ops based and how many ops to recover.
if (hasMatchingSyncId()) {
builder.field("matching_sync_id", true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
*/
protected abstract void reroute(ShardId shardId, String reason);

/**
* Clear cache for node, ensuring next fetch will fetch a fresh copy.
*/
public synchronized void clearCacheForNode(String nodeId) {
cache.remove(nodeId);
}

/**
* Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
* it nodes that are no longer part of the state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -35,7 +37,11 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

public class GatewayAllocator {
Expand All @@ -52,6 +58,9 @@ public class GatewayAllocator {
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>>
asyncFetchStore = ConcurrentCollections.newConcurrentMap();

// contains ephemeralIds
private volatile Set<String> lastDataNodes = Collections.emptySet();

@Inject
public GatewayAllocator(RoutingService routingService,
TransportNodesListGatewayStartedShards startedAction,
Expand Down Expand Up @@ -101,6 +110,7 @@ public void applyFailedShards(final RoutingAllocation allocation, final List<Fai
}

public void allocateUnassigned(final RoutingAllocation allocation) {
ensureAsyncFetchStorePrimaryRecency(allocation);
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
}

Expand Down Expand Up @@ -128,6 +138,43 @@ public AllocateUnassignedDecision decideUnassignedShardAllocation(ShardRouting u
}
}

/**
* Whenever we see a new data node, we clear the information we have on primary to ensure it is at least as recent as the start
* of the new node. This reduces risk of making a decision on stale information from primary.
*/
private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a separate PR for this enhancement?

DiscoveryNodes nodes = allocation.nodes();
if (hasNewNodes(nodes, lastDataNodes)) {
asyncFetchStore.values().forEach(fetch -> clearCacheForPrimary(fetch, allocation));
// recalc to also (lazily) clear out old nodes.
Set<String> newDataNodes = new HashSet<>(nodes.getDataNodes().size());
for (Iterator<DiscoveryNode> iterator = nodes.getDataNodes().valuesIt(); iterator.hasNext(); ) {
newDataNodes.add(iterator.next().getEphemeralId());
}
this.lastDataNodes = newDataNodes;
}
}

private void clearCacheForPrimary(AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch,
RoutingAllocation allocation) {
ShardRouting primary = allocation.routingNodes().activePrimary(fetch.shardId);
if (primary != null) {
fetch.clearCacheForNode(primary.currentNodeId());
}
}

private boolean hasNewNodes(DiscoveryNodes nodes, Set<String> lastDataNodes) {
for (Iterator<DiscoveryNode> iterator = nodes.getDataNodes().valuesIt(); iterator.hasNext(); ) {
DiscoveryNode node = iterator.next();
if (lastDataNodes.contains(node.getEphemeralId()) == false) {
logger.trace("new node {} found, clearing primary async-fetch-store cache", node);
return true;
}
}

return false;
}

class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {

InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

Expand Down Expand Up @@ -101,17 +100,16 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
final String currentSyncId;
final TransportNodesListShardStoreMetaData.StoreFilesMetaData currentStore;
if (shardStores.getData().containsKey(currentNode)) {
currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId();
currentStore = shardStores.getData().get(currentNode).storeFilesMetaData();
} else {
currentSyncId = null;
currentStore = null;
}
if (currentNode.equals(nodeWithHighestMatch) == false
&& Objects.equals(currentSyncId, primaryStore.syncId()) == false
&& matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) {
// we found a better match that has a full sync id match, the existing allocation is not fully synced
// so we found a better one, cancel this one
&& isNoopRecovery(primaryStore, currentStore) == false
&& matchingNodes.isNoopRecovery(nodeWithHighestMatch)) {
// we found a better match that can do a fast recovery, cancel current recovery
logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]",
currentNode, nodeWithHighestMatch);
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
Expand Down Expand Up @@ -363,10 +361,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al

private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) {
String primarySyncId = primaryStore.syncId();
String replicaSyncId = storeFilesMetaData.syncId();
// see if we have a sync id we can make use of
if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
if (isNoopRecovery(primaryStore, storeFilesMetaData)) {
return Long.MAX_VALUE;
} else {
long sizeMatched = 0;
Expand All @@ -380,6 +375,34 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St
}
}

/**
* Is a "noop recovery", which means expecting no operations to recover (though with sync-id, we could in principle still
* have a few).
*/
private static boolean isNoopRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
// keeping syncIdMatch for 7.x to remain backwards compatible with pre-7.3 versions, but will remove for 8.0.
return syncIdMatch(primaryStore, candidateStore)
|| noopMatch(primaryStore, candidateStore);
}

private static boolean syncIdMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
String primarySyncId = primaryStore.syncId();
String replicaSyncId = candidateStore.syncId();
return (replicaSyncId != null && replicaSyncId.equals(primarySyncId));
}

private static boolean noopMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
// We need the maxSeqNo conditions until we support non-noop recovery for closed indices (and preferably also have
// retention leases in place to ensure ops based recovery will actually be performed).
return primaryStore.hasSeqNoInfo()
&& primaryStore.maxSeqNo() == candidateStore.maxSeqNo()
&& primaryStore.provideRecoverySeqNo() <= candidateStore.requireRecoverySeqNo()
&& candidateStore.requireRecoverySeqNo() == primaryStore.maxSeqNo() + 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we need the last condition?

}

protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);

/**
Expand Down Expand Up @@ -418,7 +441,10 @@ public DiscoveryNode getNodeWithHighestMatch() {
return this.nodeWithHighestMatch;
}

public boolean isNodeMatchBySyncID(DiscoveryNode node) {
/**
* Is supplied node a no-operations recovery, either sync-id match or sequence number match.
*/
public boolean isNoopRecovery(DiscoveryNode node) {
return nodesToSize.get(node) == Long.MAX_VALUE;
}

Expand Down
Loading