AsyncShardFetch.java
@@ -230,6 +230,13 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
*/
protected abstract void reroute(ShardId shardId, String reason);

/**
* Clear the cache for the given node, ensuring that the next fetch will retrieve a fresh copy.
*/
synchronized void clearCacheForNode(String nodeId) {
cache.remove(nodeId);
}
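// A hedged usage sketch, illustration only (not part of this change): "fetch", "staleNodeId",
// "nodes" and "ignoredNodes" are hypothetical stand-ins. Invalidating a single node's entry
// forces the next fetchData() round to re-request data from that node only; the cached entries
// for all other nodes remain valid.
//
//     fetch.clearCacheForNode(staleNodeId);   // drop only this node's cached entry
//     fetch.fetchData(nodes, ignoredNodes);   // schedules a fresh async fetch for staleNodeId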

/**
* Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes
* nodes that are no longer part of the state.
GatewayAllocator.java
@@ -19,6 +19,7 @@

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
@@ -27,6 +28,7 @@
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -37,14 +39,19 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.AsyncShardFetch.Lister;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class GatewayAllocator {

@@ -59,6 +66,7 @@ public class GatewayAllocator {
asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, AsyncShardFetch<NodeStoreFilesMetaData>>
asyncFetchStore = ConcurrentCollections.newConcurrentMap();
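// ephemeral IDs of the data nodes seen in the previous allocation round; used to detect newly joined (or restarted) nodes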
private Set<String> lastSeenEphemeralIds = Collections.emptySet();

@Inject
public GatewayAllocator(RerouteService rerouteService, NodeClient client) {
@@ -109,6 +117,7 @@ public void applyFailedShards(final RoutingAllocation allocation, final List<Fai
public void allocateUnassigned(final RoutingAllocation allocation) {
assert primaryShardAllocator != null;
assert replicaShardAllocator != null;
ensureAsyncFetchStorePrimaryRecency(allocation);
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
}

@@ -138,6 +147,43 @@ public AllocateUnassignedDecision decideUnassignedShardAllocation(ShardRouting u
}
}

/**
* Clear the fetched data for the primary to ensure we do not cancel recoveries based on excessively stale data.
*/
private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {
DiscoveryNodes nodes = allocation.nodes();
if (hasNewNodes(nodes)) {
final Set<String> newEphemeralIds = StreamSupport.stream(nodes.getDataNodes().spliterator(), false)
.map(node -> node.value.getEphemeralId()).collect(Collectors.toSet());
// Invalidate the cache if a data node has been added to the cluster. This ensures that we do not cancel a recovery in the
// scenario where a node drops out, we fetch its shard data, some indexing happens, and the node then rejoins the cluster.
// There are other ways we could decide to cancel a recovery based on stale data (e.g. changing allocation filters or a
// primary failure), but making the wrong decision here is not catastrophic, so we only need to cover the common case.
logger.trace(() -> new ParameterizedMessage(
"new nodes {} found, clearing primary async-fetch-store cache", Sets.difference(newEphemeralIds, lastSeenEphemeralIds)));
asyncFetchStore.values().forEach(fetch -> clearCacheForPrimary(fetch, allocation));
// recompute the set, which also (lazily) clears out the ephemeral IDs of nodes that have left.
this.lastSeenEphemeralIds = newEphemeralIds;
}
}

private static void clearCacheForPrimary(AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch,
RoutingAllocation allocation) {
ShardRouting primary = allocation.routingNodes().activePrimary(fetch.shardId);
if (primary != null) {
fetch.clearCacheForNode(primary.currentNodeId());
}
}

private boolean hasNewNodes(DiscoveryNodes nodes) {
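// ephemeral IDs are regenerated each time a node process starts, so a restarted data node
// also counts as new here and triggers the same primary cache invalidation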
for (ObjectObjectCursor<String, DiscoveryNode> node : nodes.getDataNodes()) {
if (lastSeenEphemeralIds.contains(node.value.getEphemeralId()) == false) {
return true;
}
}
return false;
}

class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {

InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
AsyncShardFetchTests.java
@@ -46,6 +46,7 @@ public class AsyncShardFetchTests extends ESTestCase {
private final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
private final Response response1 = new Response(node1);
private final Response response1_2 = new Response(node1);
private final Throwable failure1 = new Throwable("simulated failure 1");
private final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
@@ -274,6 +275,85 @@ public void testTwoNodesAddedInBetween() throws Exception {
assertThat(fetchData.getData().get(node2), sameInstance(response2));
}

public void testClearCache() throws Exception {
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
test.addSimulation(node1.getId(), response1);

// must also work when no data has been fetched yet
test.clearCacheForNode(node1.getId());

// no fetched data, request still ongoing
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));

// verify we get back the right data from the node
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));

// second fetch gets same data
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));

test.clearCacheForNode(node1.getId());

// prepare next request
test.addSimulation(node1.getId(), response1_2);

// no fetched data, new request ongoing
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));

// verify we get new data back
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
}

public void testConcurrentRequestAndClearCache() throws Exception {
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
test.addSimulation(node1.getId(), response1);

// no fetched data, request still ongoing
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

// clear the cache while the request is still ongoing, before it is processed
test.clearCacheForNode(node1.getId());

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));

// prepare next request
test.addSimulation(node1.getId(), response1_2);

// verify still no fetched data, request still ongoing
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));

// verify we get new data back
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
}

static class TestFetch extends AsyncShardFetch<Response> {

static class Entry {
ReplicaShardAllocatorIT.java (new file)
@@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.gateway;

import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ReplicaShardAllocatorIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
}

public void testRecentPrimaryInformation() throws Exception {
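// Outline: index into a one-replica shard and synced-flush so both copies share a sync_id; stop
// the replica's node and (maybe) index more so that copy goes stale; block the file-based recovery
// to a fresh node and destroy the sync_id with a forced flush; then bring the old replica node
// back. Because the primary's async-fetch-store cache is invalidated when a node joins, the
// allocator must not cancel the ongoing recovery in favor of the rejoined node's stale sync_id.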
String indexName = "test";
String nodeWithPrimary = internalCluster().startNode();
assertAcked(
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1ms")));
String nodeWithReplica = internalCluster().startDataOnlyNode();
Settings nodeWithReplicaSettings = internalCluster().dataPathSettings(nodeWithReplica);
ensureGreen(indexName);
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
assertBusy(() -> {
SyncedFlushResponse syncedFlushResponse = client().admin().indices().prepareSyncedFlush(indexName).get();
assertThat(syncedFlushResponse.successfulShards(), equalTo(2));
});
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithReplica));
if (randomBoolean()) {
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
}
CountDownLatch blockRecovery = new CountDownLatch(1);
CountDownLatch recoveryStarted = new CountDownLatch(1);
MockTransportService transportServiceOnPrimary
= (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary);
transportServiceOnPrimary.addSendBehavior((connection, requestId, action, request, options) -> {
if (PeerRecoveryTargetService.Actions.FILES_INFO.equals(action)) {
recoveryStarted.countDown();
try {
blockRecovery.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
connection.sendRequest(requestId, action, request, options);
});
String newNode = internalCluster().startDataOnlyNode();
recoveryStarted.await();
// destroy sync_id after the recovery on the new node has started
client().admin().indices().prepareFlush(indexName).setForce(true).get();
// AllocationService only calls GatewayAllocator if there are unassigned shards
assertAcked(client().admin().indices().prepareCreate("dummy-index").setWaitForActiveShards(0)
.setSettings(Settings.builder().put("index.routing.allocation.require.attr", "not-found")));
internalCluster().startDataOnlyNode(nodeWithReplicaSettings);
// need to wait for events to ensure the reroute has happened, since we perform it asynchronously when a new node joins.
client().admin().cluster().prepareHealth(indexName).setWaitForYellowStatus().setWaitForEvents(Priority.LANGUID).get();
blockRecovery.countDown();
ensureGreen(indexName);
assertThat(internalCluster().nodesInclude(indexName), hasItem(newNode));
transportServiceOnPrimary.clearAllRules();
}
}