@@ -18,29 +18,40 @@
*/
package org.elasticsearch.indices.state;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Rebalance;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static java.util.Collections.singletonList;
import static org.elasticsearch.indices.state.CloseIndexIT.assertException;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
@@ -50,36 +61,52 @@
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return singletonList(MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 10)
.put(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
.put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
.build();
}

@Override
protected int numberOfReplicas() {
return 1;
protected int maximumNumberOfShards() {
return 3;
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37274")
@TestLogging("org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG")
public void testCloseWhileRelocatingShards() throws Exception {
final String[] indices = new String[randomIntBetween(1, 3)];
final String[] indices = new String[randomIntBetween(3, 5)];
final Map<String, Long> docsPerIndex = new HashMap<>();
final Map<String, BackgroundIndexer> indexers = new HashMap<>();

for (int i = 0; i < indices.length; i++) {
final String indexName = "index-" + i;
createIndex(indexName);

final String indexName = "index-" + i;
int nbDocs = 0;
if (randomBoolean()) {
nbDocs = randomIntBetween(1, 20);
for (int j = 0; j < nbDocs; j++) {
IndexResponse indexResponse = client().prepareIndex(indexName, "_doc").setSource("num", j).get();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
switch (i) {
case 0:
logger.debug("creating empty index {}", indexName);
createIndex(indexName);
break;
case 1:
nbDocs = scaledRandomIntBetween(1, 100);
logger.debug("creating index {} with {} documents", indexName, nbDocs);
createIndex(indexName);
indexRandom(randomBoolean(), IntStream.range(0, nbDocs)
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n))
.collect(Collectors.toList()));
break;
default:
logger.debug("creating index {} with background indexing", indexName);
final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), -1, 1);
indexers.put(indexName, indexer);
waitForDocs(1, indexer);
}
docsPerIndex.put(indexName, (long) nbDocs);
indices[i] = indexName;
@@ -88,60 +115,72 @@ public void testCloseWhileRelocatingShards() throws Exception {
ensureGreen(indices);
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.toString())));
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString())));

// start some concurrent indexing threads
final Map<String, BackgroundIndexer> indexers = new HashMap<>();
for (final String index : indices) {
if (randomBoolean()) {
final BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), -1, scaledRandomIntBetween(1, 3));
waitForDocs(1, indexer);
indexers.put(index, indexer);
}
}
final String targetNode = internalCluster().startDataOnlyNode();
ensureClusterSizeConsistency(); // wait for the master to finish processing join.

final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
final String newNode = internalCluster().startDataOnlyNode();
try {
final CountDownLatch latch = new CountDownLatch(1);
final List<Thread> threads = new ArrayList<>();

// start shards relocating threads
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (final String indexToRelocate : indices) {
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexToRelocate);
for (int i = 0; i < getNumShards(indexToRelocate).numPrimaries; i++) {
final int shardId = i;
ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
assertTrue(primary.started());
ShardRouting replica = indexRoutingTable.shard(shardId).replicaShards().iterator().next();
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
final CountDownLatch latch = new CountDownLatch(indices.length);
final CountDownLatch release = new CountDownLatch(1);

// relocate one shard for every index to be closed
final AllocationCommands commands = new AllocationCommands();
for (final String index : indices) {
final NumShards numShards = getNumShards(index);
final int shardId = numShards.numPrimaries == 1 ? 0 : randomIntBetween(0, numShards.numPrimaries - 1);
final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);

final ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
assertTrue(primary.started());

String currentNodeId = primary.currentNodeId();
if (numShards.numReplicas > 0) {
final ShardRouting replica = indexRoutingTable.shard(shardId).replicaShards().iterator().next();
assertTrue(replica.started());
if (randomBoolean()) {
currentNodeId = replica.currentNodeId();
}
}

final String currentNodeId = randomBoolean() ? primary.currentNodeId() : replica.currentNodeId();
assertNotNull(currentNodeId);

final Thread thread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId());
((MockTransportService) internalCluster().getInstance(TransportService.class, targetNode))
.addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()),
(connection, requestId, action, request, options) -> {
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
Contributor:
can we just avoid relocating primaries for now until we have a fix?

Member Author:
The behavior is the same for primary or replica (since we don't fail replicas that cannot be verified by the verify shard before close action), so I don't see why we should only relocate replicas in this test.

I think we could keep the current test and relocate primaries and replicas even if some close operations are not acknowledged, and if we fix this we could simply change the test to ensure that all closes are acknowledged.

Contributor:
ok
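
To make the point above concrete, here is a minimal, hypothetical sketch of the acknowledgment behavior being described (the names are invented and are not the real TransportVerifyShardBeforeCloseAction types): a shard copy that cannot be verified before the close only leaves the close unacknowledged; the copy itself is not failed, so it makes no difference whether the relocating copy is a primary or a replica.

import java.util.List;

// Illustrative sketch only: invented names, not the Elasticsearch implementation.
final class CloseAcknowledgementSketch {

    // One entry per shard copy (primary or replica): true if that copy was verified before close.
    static boolean isCloseAcknowledged(List<Boolean> perCopyVerifications) {
        // An unverifiable copy only prevents the close from being acknowledged;
        // the copy is not failed, so primaries and replicas behave the same way.
        return perCopyVerifications.stream().allMatch(Boolean::booleanValue);
    }
}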

logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId());
latch.countDown();
try {
release.await();
logger.debug("releasing recovery of shard {}", ((StartRecoveryRequest) request).shardId());
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
connection.sendRequest(requestId, action, request, options);
}
assertAcked(client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(indexToRelocate, shardId, currentNodeId, newNode)));
});
threads.add(thread);
thread.start();
}
);
commands.add(new MoveAllocationCommand(index, shardId, currentNodeId, targetNode));
}

assertAcked(client().admin().cluster().reroute(new ClusterRerouteRequest().commands(commands)).get());

// start index closing threads
final List<Thread> threads = new ArrayList<>();
for (final String indexToClose : indices) {
final Thread thread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
} finally {
release.countDown();
}
// Closing is not always acknowledged when shards are relocating: this is the case when the target shard is initializing
Contributor:
we need to improve this, can you add it as an item to the meta issue?

I wonder if we can fix this by acquiring an operation permit for each batch of operations that we send as part of phase 2 during peer recovery, and then also check whether there's a read-only block under the permit.

Member Author:
> we need to improve this, can you add it as an item to the meta issue?

Done.

> I wonder if we can fix this by acquiring an operation permit for each batch of operations that we send as part of phase 2 during peer recovery, and then also check whether there's a read-only block under the permit.

I'm not sure how it would fix the issue: acquiring an operation permit for a batch of operations does not ensure that all operations have been recovered by the time the verify shard before close action is executed. Or are you thinking of failing the recovery because of the block detected under the permit?

Contributor:
Yes, I thought about failing the recovery in the case where the block suddenly appears during the recovery. This has some other adverse consequences though. Needs more thought.
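
As a rough illustration of the idea floated above, the following hedged sketch (all names are placeholders, not Elasticsearch APIs) shows what acquiring an operation permit per phase-2 batch and re-checking for a close/read-only block under that permit might look like; if the block has appeared, the recovery is failed rather than allowed to race with the close.

import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch only: every name below is a placeholder, not an Elasticsearch API.
final class Phase2BatchSender {

    interface OperationPermits {
        // assumed helper: acquire a permit, run the action, then release the permit
        void runUnderPermit(Runnable action) throws InterruptedException;
    }

    private final OperationPermits permits;
    private final Supplier<Boolean> indexHasCloseBlock; // e.g. looked up from the cluster state

    Phase2BatchSender(OperationPermits permits, Supplier<Boolean> indexHasCloseBlock) {
        this.permits = permits;
        this.indexHasCloseBlock = indexHasCloseBlock;
    }

    void sendBatch(List<String> translogOperations) throws InterruptedException {
        permits.runUnderPermit(() -> {
            // Re-check the block while holding the permit: if a close block appeared after
            // the recovery started, fail the recovery instead of racing with the close.
            if (indexHasCloseBlock.get()) {
                throw new IllegalStateException("index is being closed, failing recovery");
            }
            replicate(translogOperations); // placeholder for the real transport call
        });
    }

    private void replicate(List<String> translogOperations) {
        // send the batch to the recovering target shard (omitted)
    }
}

As the reviewer notes, failing the recovery this way has other consequences, so this is only a sketch of the direction discussed, not a settled design.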

// or is catching up operations. In these cases the TransportVerifyShardBeforeCloseAction will detect that the global
// and max sequence number don't match and will not ack the close.
AcknowledgedResponse closeResponse = client().admin().indices().prepareClose(indexToClose).get();
if (closeResponse.isAcknowledged()) {
assertTrue("Index closing should not be acknowledged twice", acknowledgedCloses.add(indexToClose));
@@ -155,6 +194,7 @@ public void testCloseWhileRelocatingShards() throws Exception {
for (Thread thread : threads) {
thread.join();
}

for (Map.Entry<String, BackgroundIndexer> entry : indexers.entrySet()) {
final BackgroundIndexer indexer = entry.getValue();
indexer.setAssertNoFailuresOnStop(false);
@@ -172,7 +212,8 @@ public void testCloseWhileRelocatingShards() throws Exception {
}
} finally {
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
.setTransientSettings(Settings.builder()
.putNull(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
}

for (String index : indices) {