From 98645137d8b84f62e55ba9bb152673115b113cbb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 13 Nov 2018 16:02:08 +0100 Subject: [PATCH 1/4] [RCI] Expose all permits acquisition in IndexShard and TransportReplicationAction This commit adds the acquirePrimaryAllOperationsPermits() and the acquireReplicaAllOperationsPermits() methods to the IndexShard class. These methods allow to acquire all operations permits on a primary or a replica shard and can be used in future transport replication actions to acquire all permits instead of a single one. Related to elastic #33888 --- .../TransportReplicationAction.java | 93 +-- .../elasticsearch/index/shard/IndexShard.java | 188 +++--- .../TransportReplicationActionTests.java | 8 +- ...ReplicationAllPermitsAcquisitionTests.java | 534 ++++++++++++++++++ .../index/shard/IndexShardTests.java | 235 ++++++-- 5 files changed, 892 insertions(+), 166 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 38972a7f77462..691d2e4939b30 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -334,7 +334,38 @@ class AsyncPrimaryAction extends AbstractRunnable implements ActionListener onReferenceAcquired = this; + acquirePrimaryOperationPermit(indexShard, request, new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable)); + } + + @Override + public void onFailure(Exception e) { + onReferenceAcquired.onFailure(e); + } + }); } @Override @@ -587,7 +618,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; final ShardId shardId = request.shardId(); assert shardId != null : "request shardId must be set"; - this.replica = getIndexShard(shardId); + this.replica = getIndexShard(shardId, targetAllocationID); } @Override @@ -660,10 +691,10 @@ protected void doRun() throws Exception { setPhase(task, "replica"); final String actualAllocationId = this.replica.routingEntry().allocationId().getId(); if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID, + throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID, actualAllocationId); } - replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request); + acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); } /** @@ -697,7 +728,7 @@ public void onFailure(Exception e) { } } - protected IndexShard getIndexShard(ShardId shardId) { + protected IndexShard getIndexShard(final ShardId shardId, final String targetAllocationID) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); } @@ -938,42 +969,26 @@ void retryBecauseUnavailable(ShardId shardId, String message) { } /** - * Tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally - * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}). + * Executes the logic for acquiring one or more operation permit on a primary shard. The default is to acquire a single permit but this + * method can be overridden to acquire more. */ - private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm, - ActionListener onReferenceAcquired, Object debugInfo) { - IndexShard indexShard = getIndexShard(shardId); - // we may end up here if the cluster state used to route the primary is so stale that the underlying - // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails - // the replica will take over and a replica will be assigned to the first node. - if (indexShard.routingEntry().primary() == false) { - throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(), - "actual shard is not a primary " + indexShard.routingEntry()); - } - final String actualAllocationId = indexShard.routingEntry().allocationId().getId(); - if (actualAllocationId.equals(allocationId) == false) { - throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId); - } - final long actualTerm = indexShard.getPendingPrimaryTerm(); - if (actualTerm != primaryTerm) { - throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId, - primaryTerm, actualTerm); - } - - ActionListener onAcquired = new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable)); - } - - @Override - public void onFailure(Exception e) { - onReferenceAcquired.onFailure(e); - } - }; + protected void acquirePrimaryOperationPermit(final IndexShard primary, + final Request request, + final ActionListener onAcquired) { + primary.acquirePrimaryOperationPermit(onAcquired, executor, request); + } - indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo); + /** + * Executes the logic for acquiring one or more operation permit on a replica shard. The default is to acquire a single permit but this + * method can be overridden to acquire more. + */ + protected void acquireReplicaOperationPermit(final IndexShard replica, + final ReplicaRequest request, + final ActionListener onAcquired, + final long primaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes) { + replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request); } class ShardReference implements Releasable { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0638ce32a147c..b6594d33c8c26 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2302,7 +2302,18 @@ public void acquirePrimaryOperationPermit(ActionListener onPermitAcq indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo); } - private void bumpPrimaryTerm(long newPrimaryTerm, final CheckedRunnable onBlocked) { + /** + * Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called. + * It is the responsibility of the caller to close the {@link Releasable}. + */ + public void acquirePrimaryAllOperationsPermits(final ActionListener onPermitAcquired, final TimeValue timeout) { + verifyNotClosed(); + assert shardRouting.primary() : "acquirePrimaryAllOperationsPermits should only be called on primary shard: " + shardRouting; + + indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); + } + + private void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable onBlocked) { assert Thread.holdsLock(mutex); assert newPrimaryTerm > pendingPrimaryTerm; assert operationPrimaryTerm <= pendingPrimaryTerm; @@ -2337,31 +2348,11 @@ public void onResponse(final Releasable releasable) { termUpdated.countDown(); } - /** - * Acquire a replica operation permit whenever the shard is ready for indexing (see - * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in - * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an - * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified - * name. - * - * @param opPrimaryTerm the operation primary term - * @param globalCheckpoint the global checkpoint associated with the request - * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary - * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} - * @param onPermitAcquired the listener for permit acquisition - * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed - * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are - * enabled the tracing will capture the supplied object's {@link Object#toString()} value. - * Otherwise the object isn't used - */ - public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, - final ActionListener onPermitAcquired, final String executorOnDelay, - final Object debugInfo) { - verifyNotClosed(); + private void updatePrimaryTermIfNeeded(final long opPrimaryTerm, final long globalCheckpoint) { if (opPrimaryTerm > pendingPrimaryTerm) { synchronized (mutex) { if (opPrimaryTerm > pendingPrimaryTerm) { - IndexShardState shardState = state(); + final IndexShardState shardState = state(); // only roll translog and update primary term if shard has made it past recovery // Having a new primary term here means that the old primary failed and that there is a new primary, which again // means that the master will fail this shard as all initializing shards are failed when a primary is selected @@ -2373,58 +2364,121 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g if (opPrimaryTerm > pendingPrimaryTerm) { bumpPrimaryTerm(opPrimaryTerm, () -> { - updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); - final long currentGlobalCheckpoint = getGlobalCheckpoint(); - final long maxSeqNo = seqNoStats().getMaxSeqNo(); - logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", - opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); - if (currentGlobalCheckpoint < maxSeqNo) { - resetEngineToGlobalCheckpoint(); - } else { - getEngine().rollTranslogGeneration(); - } + updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); + final long currentGlobalCheckpoint = getGlobalCheckpoint(); + final long maxSeqNo = seqNoStats().getMaxSeqNo(); + logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", + opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); + if (currentGlobalCheckpoint < maxSeqNo) { + resetEngineToGlobalCheckpoint(); + } else { + getEngine().rollTranslogGeneration(); + } }); } } } } - assert opPrimaryTerm <= pendingPrimaryTerm - : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]"; - indexShardOperationPermits.acquire( - new ActionListener() { - @Override - public void onResponse(final Releasable releasable) { - if (opPrimaryTerm < operationPrimaryTerm) { - releasable.close(); - final String message = String.format( - Locale.ROOT, - "%s operation primary term [%d] is too old (current [%d])", - shardId, - opPrimaryTerm, - operationPrimaryTerm); - onPermitAcquired.onFailure(new IllegalStateException(message)); - } else { - assert assertReplicationTarget(); - try { - updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); - advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); - } catch (Exception e) { - releasable.close(); - onPermitAcquired.onFailure(e); - return; - } - onPermitAcquired.onResponse(releasable); - } - } + : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]"; + } - @Override - public void onFailure(final Exception e) { - onPermitAcquired.onFailure(e); + /** + * Creates a new action listener which verifies that the operation primary term is not too old. If the given primary + * term is lower than the current one, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with + * an {@link IllegalStateException}. Otherwise the global checkpoint and the max_seq_no_of_updates marker of the replica are updated + * before the invocation of the {@link ActionListener#onResponse(Object)}} method of the provided listener. + */ + private ActionListener createListener(final ActionListener listener, + final long opPrimaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes) { + return new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + if (opPrimaryTerm < operationPrimaryTerm) { + releasable.close(); + final String message = String.format( + Locale.ROOT, + "%s operation primary term [%d] is too old (current [%d])", + shardId, + opPrimaryTerm, + operationPrimaryTerm); + listener.onFailure(new IllegalStateException(message)); + } else { + assert assertReplicationTarget(); + try { + updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); + advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); + } catch (final Exception e) { + releasable.close(); + listener.onFailure(e); + return; } - }, - executorOnDelay, - true, debugInfo); + listener.onResponse(releasable); + } + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + }; + } + + /** + * Acquire a replica operation permit whenever the shard is ready for indexing (see + * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in + * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an + * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified + * name. + * + * @param opPrimaryTerm the operation primary term + * @param globalCheckpoint the global checkpoint associated with the request + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary + * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} + * @param onPermitAcquired the listener for permit acquisition + * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are + * enabled the tracing will capture the supplied object's {@link Object#toString()} value. + * Otherwise the object isn't used + */ + public void acquireReplicaOperationPermit(final long opPrimaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener onPermitAcquired, + final String executorOnDelay, + final Object debugInfo) { + verifyNotClosed(); + updatePrimaryTermIfNeeded(opPrimaryTerm, globalCheckpoint); + + ActionListener listener = createListener(onPermitAcquired, opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); + indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo); + } + + /** + * Acquire all replica operation permits whenever the shard is ready for indexing (see + * {@link #acquirePrimaryAllOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in + * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an + * {@link IllegalStateException}. + * + * @param opPrimaryTerm the operation primary term + * @param globalCheckpoint the global checkpoint associated with the request + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary + * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} + * @param onPermitAcquired the listener for permit acquisition + * @param timeout the maximum time to wait for the in-flight operations block + */ + public void acquireReplicaAllOperationsPermits(final long opPrimaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener onPermitAcquired, + final TimeValue timeout) { + verifyNotClosed(); + updatePrimaryTermIfNeeded(opPrimaryTerm, globalCheckpoint); + + ActionListener listener = createListener(onPermitAcquired, opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); + indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()); } public int getActiveOperationsCount() { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index c8c40a7f5841a..0b07b8bba338f 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -784,7 +784,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception { new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool) { @Override - protected IndexShard getIndexShard(ShardId shardId) { + protected IndexShard getIndexShard(ShardId shardId, String targetAllocationId) { return shard; } }; @@ -949,11 +949,11 @@ action.new PrimaryOperationTransportHandler().messageReceived( logger.debug("got exception:" , throwable); assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable)); if (wrongAllocationId) { - assertThat(throwable.getMessage(), containsString("expected aID [_not_a_valid_aid_] but found [" + + assertThat(throwable.getMessage(), containsString("expected allocation id [_not_a_valid_aid_] but found [" + primary.allocationId().getId() + "]")); } else { - assertThat(throwable.getMessage(), containsString("expected aID [" + primary.allocationId().getId() + "] with term [" + - requestTerm + "] but found [" + primaryTerm + "]")); + assertThat(throwable.getMessage(), containsString("expected allocation id [" + primary.allocationId().getId() + + "] with term [" + requestTerm + "] but found [" + primaryTerm + "]")); } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java new file mode 100644 index 0000000000000..0696c5e4e0b15 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -0,0 +1,534 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.support.replication; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_INDEX_UUID; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTestCase { + + private ClusterService clusterService; + private TransportService transportService; + private ShardStateAction shardStateAction; + private ShardId shardId; + private IndexShard primary; + private IndexShard replica; + private boolean globalBlock; + private ClusterBlock block; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + globalBlock = randomBoolean(); + RestStatus restStatus = randomFrom(RestStatus.values()); + block = new ClusterBlock(randomIntBetween(1, 10), randomAlphaOfLength(5), false, true, false, restStatus, ClusterBlockLevel.ALL); + clusterService = createClusterService(threadPool); + + final ClusterState.Builder state = ClusterState.builder(clusterService.state()); + Set roles = new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())); + DiscoveryNode node1 = new DiscoveryNode("_name1", "_node1", buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("_name2", "_node2", buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); + state.nodes(DiscoveryNodes.builder() + .add(node1) + .add(node2) + .localNodeId(node1.getId()) + .masterNodeId(node1.getId())); + + shardId = new ShardId("index", UUID.randomUUID().toString(), 0); + ShardRouting shardRouting = + newShardRouting(shardId, node1.getId(), true, ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE); + + Settings indexSettings = Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_INDEX_UUID, shardId.getIndex().getUUID()) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + .build(); + + primary = newStartedShard(p -> newShard(shardRouting, indexSettings, new InternalEngineFactory()), true); + for (int i = 0; i < 10; i++) { + final String id = Integer.toString(i); + indexDoc(primary, "_doc", id, "{\"value\":" + id + "}"); + } + + IndexMetaData indexMetaData = IndexMetaData.builder(shardId.getIndexName()) + .settings(indexSettings) + .primaryTerm(shardId.id(), primary.getOperationPrimaryTerm()) + .putMapping("_doc","{ \"properties\": { \"value\": { \"type\": \"short\"}}}") + .build(); + state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded()); + + replica = newShard(primary.shardId(), false, node2.getId(), indexMetaData, null); + recoverReplica(replica, primary, true); + + IndexRoutingTable.Builder routing = IndexRoutingTable.builder(indexMetaData.getIndex()); + routing.addIndexShard(new IndexShardRoutingTable.Builder(shardId) + .addShard(primary.routingEntry()) + .build()); + state.routingTable(RoutingTable.builder().add(routing.build()).build()); + + setState(clusterService, state.build()); + + final Settings transportSettings = Settings.builder().put("node.name", node1.getId()).build(); + transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, threadPool, null); + transportService.start(); + transportService.acceptIncomingRequests(); + shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); + } + + @Override + @After + public void tearDown() throws Exception { + closeShards(primary, replica); + transportService.stop(); + clusterService.close(); + super.tearDown(); + } + + public void testTransportReplicationActionWithAllPermits() throws Exception { + final int numOperations = scaledRandomIntBetween(4, 32); + final int delayedOperations = randomIntBetween(1, numOperations); + logger.trace("starting [{}] operations, among which the first [{}] started ops should be blocked by [{}]", + numOperations, delayedOperations, block); + + final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(delayedOperations + 1); + final List threads = new ArrayList<>(delayedOperationsBarrier.getParties()); + + @SuppressWarnings("unchecked") + final PlainActionFuture[] futures = new PlainActionFuture[numOperations]; + final TestAction[] actions = new TestAction[numOperations]; + + for (int i = 0; i < numOperations; i++) { + final int threadId = i; + final boolean delayed = (threadId < delayedOperations); + + final PlainActionFuture listener = new PlainActionFuture<>(); + futures[threadId] = listener; + + // An action with blocks which acquires a single operation permit during execution + final TestAction singlePermitAction = new TestAction(Settings.EMPTY, "internal:singlePermitWithBlocks[" + threadId + "]", + transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, false, Optional.of(globalBlock)); + actions[threadId] = singlePermitAction; + + Thread thread = new Thread(() -> { + TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = + singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(listener), null) { + @Override + protected void doRun() throws Exception { + if (delayed) { + logger.trace("op [{}] has started and will resume execution once allPermitsAction is terminated", threadId); + delayedOperationsBarrier.await(); + } + super.doRun(); + } + + @Override + public void onResponse(final TransportReplicationAction.PrimaryShardReference reference) { + assertThat(reference.indexShard.getActiveOperationsCount(), greaterThan(0)); + assertSame(primary, reference.indexShard); + assertBlockIsPresentForDelayedOp(); + super.onResponse(reference); + } + + @Override + public void onFailure(Exception e) { + assertBlockIsPresentForDelayedOp(); + super.onFailure(e); + } + + private void assertBlockIsPresentForDelayedOp() { + if (delayed) { + final ClusterState clusterState = clusterService.state(); + if (globalBlock) { + assertTrue("Global block must exist", clusterState.blocks().hasGlobalBlock(block)); + } else { + String indexName = primary.shardId().getIndexName(); + assertTrue("Index block must exist", clusterState.blocks().hasIndexBlock(indexName, block)); + } + } + } + }; + asyncPrimaryAction.run(); + }); + threads.add(thread); + thread.start(); + } + + logger.trace("now starting the operation that acquires all permits and sets the block in the cluster state"); + + // An action which acquires all operation permits during execution and set a block + final TestAction allPermitsAction = new TestAction(Settings.EMPTY, "internal:allPermits", transportService, clusterService, + shardStateAction, threadPool, shardId, primary, replica, true, Optional.empty()); + + final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); + Thread thread = new Thread(() -> { + TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = + allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) { + @Override + public void onResponse(TransportReplicationAction.PrimaryShardReference reference) { + assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); + assertSame(primary, reference.indexShard); + + final ClusterState clusterState = clusterService.state(); + final ClusterBlocks.Builder blocks = ClusterBlocks.builder(); + if (globalBlock) { + assertFalse("Global block must not exist yet", clusterState.blocks().hasGlobalBlock(block)); + blocks.addGlobalBlock(block); + } else { + String indexName = reference.indexShard.shardId().getIndexName(); + assertFalse("Index block must not exist yet", clusterState.blocks().hasIndexBlock(indexName, block)); + blocks.addIndexBlock(indexName, block); + } + + logger.trace("adding test block to cluster state {}", block); + setState(clusterService, ClusterState.builder(clusterState).blocks(blocks)); + + try { + logger.trace("releasing delayed operations"); + delayedOperationsBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + onFailure(e); + } + super.onResponse(reference); + } + }; + asyncPrimaryAction.run(); + }); + threads.add(thread); + thread.start(); + + logger.trace("waiting for all operations to terminate"); + for (Thread t : threads) { + t.join(); + } + + final Response allPermitsResponse = allPermitFuture.get(); + assertSuccessfulOperation(allPermitsAction, allPermitsResponse); + + for (int i = 0; i < numOperations; i++) { + final PlainActionFuture future = futures[i]; + final TestAction action = actions[i]; + + if (i < delayedOperations) { + ExecutionException exception = expectThrows(ExecutionException.class, "delayed operation should have failed", future::get); + assertFailedOperation(action, exception); + } else { + // non delayed operation might fail depending on the order they were executed + try { + assertSuccessfulOperation(action, futures[i].get()); + } catch (final ExecutionException e) { + assertFailedOperation(action, e); + } + } + } + } + + private void assertSuccessfulOperation(final TestAction action, final Response response) { + final String name = action.getActionName(); + assertThat(name + " operation should have been executed on primary", action.executedOnPrimary.get(), is(true)); + assertThat(name + " operation should have been executed on replica", action.executedOnReplica.get(), is(true)); + assertThat(name + " operation must have a non null result", response, notNullValue()); + assertThat(name + " operation should have been successful on 2 shards", response.getShardInfo().getSuccessful(), equalTo(2)); + } + + private void assertFailedOperation(final TestAction action,final ExecutionException exception) { + final String name = action.getActionName(); + assertThat(name + " operation should not have been executed on primary", action.executedOnPrimary.get(), nullValue()); + assertThat(name + " operation should not have been executed on replica", action.executedOnReplica.get(), nullValue()); + assertThat(exception.getCause(), instanceOf(ClusterBlockException.class)); + ClusterBlockException clusterBlockException = (ClusterBlockException) exception.getCause(); + assertThat(clusterBlockException.blocks(), hasItem(equalTo(block))); + } + + private long primaryTerm() { + return primary.getOperationPrimaryTerm(); + } + + private String allocationId() { + return primary.routingEntry().allocationId().getId(); + } + + private Request request() { + return new Request().setShardId(primary.shardId()); + } + + + private class TestAction extends TransportReplicationAction { + + private final ShardId shardId; + private final IndexShard primary; + private final IndexShard replica; + private final boolean acquireAllPermits; + private final Optional globalBlock; + private final TimeValue timeout = TimeValue.timeValueSeconds(30L); + + private final SetOnce executedOnPrimary = new SetOnce<>(); + private final SetOnce executedOnReplica = new SetOnce<>(); + + TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ShardStateAction shardStateAction, ThreadPool threadPool, + ShardId shardId, IndexShard primary, IndexShard replica, + boolean acquireAllPermits, Optional globalBlock) { + super(settings, actionName, transportService, clusterService, null, threadPool, shardStateAction, + new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME); + this.shardId = Objects.requireNonNull(shardId); + this.primary = Objects.requireNonNull(primary); + this.replica = Objects.requireNonNull(replica); + this.acquireAllPermits = acquireAllPermits; + this.globalBlock = globalBlock; + } + + public String getActionName() { + return this.actionName; + } + + @Override + protected ClusterBlockLevel globalBlockLevel() { + if (globalBlock.isPresent()) { + return globalBlock.get() ? ClusterBlockLevel.WRITE : super.globalBlockLevel(); + } + return null; + } + + @Override + protected ClusterBlockLevel indexBlockLevel() { + if (globalBlock.isPresent()) { + return globalBlock.get() == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel(); + } + return null; + } + + @Override + protected IndexShard getIndexShard(final ShardId indexShardId, final String targetAllocationId) { + if (shardId.equals(indexShardId) == false) { + throw new AssertionError("shard id differs from " + shardId); + } + if (Objects.equals(primary.routingEntry().allocationId().getId(), targetAllocationId)) { + return primary; + } else if (Objects.equals(replica.routingEntry().allocationId().getId(), targetAllocationId)) { + return replica; + } + throw new ShardNotFoundException(shardId, "something went wrong"); + } + + @Override + protected void sendReplicaRequest(final ConcreteReplicaRequest replicaRequest, + final DiscoveryNode node, + final ActionListener listener) { + assertEquals(clusterService.state().nodes().get("_node2"), node); + ReplicaOperationTransportHandler replicaOperationTransportHandler = this.new ReplicaOperationTransportHandler(); + try { + replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() { + @Override + public String getProfileName() { + return null; + } + + @Override + public String getChannelType() { + return null; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + listener.onResponse((ReplicationOperation.ReplicaResponse) response); + } + + @Override + public void sendResponse(Exception exception) throws IOException { + listener.onFailure(exception); + } + }, null); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + protected void acquirePrimaryOperationPermit(IndexShard shard, Request request, ActionListener onAcquired) { + assertTrue(shard.routingEntry().primary()); + assertSame(primary, shard); + if (acquireAllPermits) { + shard.acquirePrimaryAllOperationsPermits(onAcquired, timeout); + } else { + super.acquirePrimaryOperationPermit(shard, request, onAcquired); + } + } + + @Override + protected void acquireReplicaOperationPermit(IndexShard shard, Request request, ActionListener onAcquired, + long primaryTerm, long globalCheckpoint, long maxSeqNo) { + assertFalse(shard.routingEntry().primary()); + assertSame(replica, shard); + if (acquireAllPermits) { + shard.acquireReplicaAllOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNo, onAcquired, timeout); + } else { + super.acquireReplicaOperationPermit(shard, request, onAcquired, primaryTerm, globalCheckpoint, maxSeqNo); + } + } + + @Override + protected Response newResponseInstance() { + return new Response(); + } + + @Override + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + assertSame(primary, shard); + if (acquireAllPermits) { + assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); + } else { + assertThat(shard.getActiveOperationsCount(), greaterThan(0)); + } + assertNoBlockOnSinglePermitOps(); + executedOnPrimary.set(true); + return new PrimaryResult<>(shardRequest, new Response()); + } + + @Override + protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + assertSame(replica, shard); + if (acquireAllPermits) { + assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); + } else { + assertThat(shard.getActiveOperationsCount(), greaterThan(0)); + } + assertNoBlockOnSinglePermitOps(); + executedOnReplica.set(true); + return new ReplicaResult(); + } + + private void assertNoBlockOnSinglePermitOps() { + // When a single permit operation is executed on primary/replica shard we must be sure that the block is not here, + // otherwise something went wrong. + if (acquireAllPermits == false) { + final ClusterState clusterState = clusterService.state(); + assertFalse("Global block must not exist", clusterState.blocks().hasGlobalBlock(block)); + assertFalse("Index block must not exist", clusterState.blocks().hasIndexBlock(shardId.getIndexName(), block)); + } + } + } + + static class Request extends ReplicationRequest { + @Override + public String toString() { + return getTestClass().getName() + ".Request"; + } + } + + static class Response extends ReplicationResponse { + } + + /** + * Transport channel that is needed for replica operation testing. + */ + public TransportChannel transportChannel(final PlainActionFuture listener) { + return new TransportChannel() { + + @Override + public String getProfileName() { + return ""; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + listener.onResponse(((Response) response)); + } + + @Override + public void sendResponse(Exception exception) throws IOException { + listener.onFailure(exception); + } + + @Override + public String getChannelType() { + return "replica_test"; + } + }; + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8eede7542bd75..9ce9dad0df5f2 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.elasticsearch.Assertions; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -60,6 +61,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -69,6 +71,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -124,7 +127,6 @@ import org.elasticsearch.test.FieldMaskingReader; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.ElasticsearchException; import java.io.IOException; import java.nio.charset.Charset; @@ -304,30 +306,27 @@ public void testShardStateMetaHashCodeEquals() { } - public void testClosesPreventsNewOperations() throws InterruptedException, ExecutionException, IOException { + public void testClosesPreventsNewOperations() throws Exception { IndexShard indexShard = newStartedShard(); closeShards(indexShard); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); - try { - indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, ""); - fail("we should not be able to increment anymore"); - } catch (IndexShardClosedException e) { - // expected - } - try { - indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, - randomNonNegativeLong(), null, ThreadPool.Names.WRITE, ""); - fail("we should not be able to increment anymore"); - } catch (IndexShardClosedException e) { - // expected - } + expectThrows(IndexShardClosedException.class, + () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "")); + expectThrows(IndexShardClosedException.class, + () -> indexShard.acquirePrimaryAllOperationsPermits(null, TimeValue.timeValueSeconds(30L))); + expectThrows(IndexShardClosedException.class, + () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, + randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "")); + expectThrows(IndexShardClosedException.class, + () -> indexShard.acquireReplicaAllOperationsPermits(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, + randomNonNegativeLong(), null, TimeValue.timeValueSeconds(30L))); } public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException { IndexShard indexShard = newShard(false); expectThrows(IndexShardNotStartedException.class, () -> - indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "")); + randomReplicaOperationPermitAcquisition(indexShard, indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100), + SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, "")); closeShards(indexShard); } @@ -620,6 +619,106 @@ public void onFailure(Exception e) { closeShards(indexShard); } + public void testAcquirePrimaryAllOperationsPermits() throws Exception { + final IndexShard indexShard = newStartedShard(true); + assertEquals(0, indexShard.getActiveOperationsCount()); + + final CountDownLatch allPermitsAcquired = new CountDownLatch(1); + + final Thread[] threads = new Thread[randomIntBetween(2, 5)]; + final List> futures = new ArrayList<>(threads.length); + final AtomicArray> results = new AtomicArray<>(threads.length); + final CountDownLatch allOperationsDone = new CountDownLatch(threads.length); + + for (int i = 0; i < threads.length; i++) { + final int threadId = i; + final boolean singlePermit = randomBoolean(); + + final PlainActionFuture future = new PlainActionFuture() { + @Override + public void onResponse(final Releasable releasable) { + if (singlePermit) { + assertThat(indexShard.getActiveOperationsCount(), greaterThan(0)); + } else { + assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); + } + releasable.close(); + super.onResponse(releasable); + results.setOnce(threadId, Tuple.tuple(Boolean.TRUE, null)); + allOperationsDone.countDown(); + } + + @Override + public void onFailure(final Exception e) { + results.setOnce(threadId, Tuple.tuple(Boolean.FALSE, e)); + allOperationsDone.countDown(); + } + }; + futures.add(threadId, future); + + threads[threadId] = new Thread(() -> { + try { + allPermitsAcquired.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + if (singlePermit) { + indexShard.acquirePrimaryOperationPermit(future, ThreadPool.Names.WRITE, ""); + } else { + indexShard.acquirePrimaryAllOperationsPermits(future, TimeValue.timeValueHours(1L)); + } + assertEquals(0, indexShard.getActiveOperationsCount()); + }); + threads[threadId].start(); + } + + final AtomicBoolean blocked = new AtomicBoolean(); + final CountDownLatch allPermitsTerminated = new CountDownLatch(1); + + final PlainActionFuture futureAllPermits = new PlainActionFuture() { + @Override + public void onResponse(final Releasable releasable) { + try { + blocked.set(true); + allPermitsAcquired.countDown(); + super.onResponse(releasable); + allPermitsTerminated.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + indexShard.acquirePrimaryAllOperationsPermits(futureAllPermits, TimeValue.timeValueSeconds(30L)); + allPermitsAcquired.await(); + assertTrue(blocked.get()); + assertEquals(0, indexShard.getActiveOperationsCount()); + assertTrue("Expected no results, operations are blocked", results.asList().isEmpty()); + futures.forEach(future -> assertFalse(future.isDone())); + + allPermitsTerminated.countDown(); + + final Releasable allPermits = futureAllPermits.get(); + assertTrue(futureAllPermits.isDone()); + + assertTrue("Expected no results, operations are blocked", results.asList().isEmpty()); + futures.forEach(future -> assertFalse(future.isDone())); + + Releasables.close(allPermits); + allOperationsDone.await(); + for (Thread thread : threads) { + thread.join(); + } + + futures.forEach(future -> assertTrue(future.isDone())); + assertEquals(threads.length, results.asList().size()); + results.asList().forEach(result -> { + assertTrue(result.v1()); + assertNull(result.v2()); + }); + + closeShards(indexShard); + } + private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); @@ -676,10 +775,14 @@ public void testOperationPermitOnReplicaShards() throws Exception { assertEquals(0, indexShard.getActiveOperationsCount()); if (shardRouting.primary() == false && Assertions.ENABLED) { - final AssertionError e = + AssertionError e = expectThrows(AssertionError.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "")); assertThat(e, hasToString(containsString("acquirePrimaryOperationPermit should only be called on primary shard"))); + + e = expectThrows(AssertionError.class, + () -> indexShard.acquirePrimaryAllOperationsPermits(null, TimeValue.timeValueSeconds(30L))); + assertThat(e, hasToString(containsString("acquirePrimaryAllOperationsPermits should only be called on primary shard"))); } final long primaryTerm = indexShard.getPendingPrimaryTerm(); @@ -697,34 +800,6 @@ public void testOperationPermitOnReplicaShards() throws Exception { operation2 = null; } - { - final AtomicBoolean onResponse = new AtomicBoolean(); - final AtomicBoolean onFailure = new AtomicBoolean(); - final AtomicReference onFailureException = new AtomicReference<>(); - ActionListener onLockAcquired = new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - onResponse.set(true); - } - - @Override - public void onFailure(Exception e) { - onFailure.set(true); - onFailureException.set(e); - } - }; - - indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, - randomNonNegativeLong(), onLockAcquired, ThreadPool.Names.WRITE, ""); - - assertFalse(onResponse.get()); - assertTrue(onFailure.get()); - assertThat(onFailureException.get(), instanceOf(IllegalStateException.class)); - assertThat( - onFailureException.get(), - hasToString(containsString("operation primary term [" + (primaryTerm - 1) + "] is too old"))); - } - { final AtomicBoolean onResponse = new AtomicBoolean(); final AtomicReference onFailure = new AtomicReference<>(); @@ -785,12 +860,12 @@ private void finish() { } }; try { - indexShard.acquireReplicaOperationPermit( + randomReplicaOperationPermitAcquisition(indexShard, newPrimaryTerm, newGlobalCheckPoint, randomNonNegativeLong(), listener, - ThreadPool.Names.SAME, ""); + ""); } catch (Exception e) { listener.onFailure(e); } @@ -837,6 +912,37 @@ private void finish() { assertEquals(0, indexShard.getActiveOperationsCount()); } + { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean onResponse = new AtomicBoolean(); + final AtomicBoolean onFailure = new AtomicBoolean(); + final AtomicReference onFailureException = new AtomicReference<>(); + ActionListener onLockAcquired = new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + onResponse.set(true); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + onFailure.set(true); + onFailureException.set(e); + latch.countDown(); + } + }; + + final long oldPrimaryTerm = indexShard.pendingPrimaryTerm - 1; + randomReplicaOperationPermitAcquisition(indexShard, oldPrimaryTerm, indexShard.getGlobalCheckpoint(), + randomNonNegativeLong(), onLockAcquired, ""); + latch.await(); + assertFalse(onResponse.get()); + assertTrue(onFailure.get()); + assertThat(onFailureException.get(), instanceOf(IllegalStateException.class)); + assertThat( + onFailureException.get(), hasToString(containsString("operation primary term [" + oldPrimaryTerm + "] is too old"))); + } + closeShards(indexShard); } @@ -848,8 +954,8 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); PlainActionFuture fut = new PlainActionFuture<>(); - replica.acquireReplicaOperationPermit(replica.operationPrimaryTerm, replica.getGlobalCheckpoint(), - newMaxSeqNoOfUpdates, fut, ThreadPool.Names.WRITE, ""); + randomReplicaOperationPermitAcquisition(replica, replica.operationPrimaryTerm, replica.getGlobalCheckpoint(), + newMaxSeqNoOfUpdates, fut, ""); try (Releasable ignored = fut.actionGet()) { assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates))); } @@ -932,7 +1038,7 @@ public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, final Set docsBeforeRollback = getShardDocUIDs(indexShard); final CountDownLatch latch = new CountDownLatch(1); final boolean shouldRollback = Math.max(globalCheckpointOnReplica, globalCheckpoint) < maxSeqNo; - indexShard.acquireReplicaOperationPermit( + randomReplicaOperationPermitAcquisition(indexShard, indexShard.getPendingPrimaryTerm() + 1, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, @@ -947,8 +1053,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { } - }, - ThreadPool.Names.SAME, ""); + }, ""); latch.await(); if (shouldRollback) { @@ -999,7 +1104,7 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt && indexShard.seqNoStats().getMaxSeqNo() != SequenceNumbers.NO_OPS_PERFORMED; final Engine beforeRollbackEngine = indexShard.getEngine(); final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE); - indexShard.acquireReplicaOperationPermit( + randomReplicaOperationPermitAcquisition(indexShard, indexShard.pendingPrimaryTerm + 1, globalCheckpoint, newMaxSeqNoOfUpdates, @@ -1014,8 +1119,7 @@ public void onResponse(final Releasable releasable) { public void onFailure(final Exception e) { } - }, - ThreadPool.Names.SAME, ""); + }, ""); latch.await(); if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { @@ -3497,4 +3601,23 @@ public void testResetEngine() throws Exception { public Settings threadPoolSettings() { return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build(); } + + /** + * Randomizes the usage of {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)} and + * {@link IndexShard#acquireReplicaAllOperationsPermits(long, long, long, ActionListener, TimeValue)} in order to acquire a permit. + */ + private void randomReplicaOperationPermitAcquisition(final IndexShard indexShard, + final long opPrimaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener listener, + final String info) { + if (randomBoolean()) { + final String executor = ThreadPool.Names.WRITE; + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, executor, info); + } else { + final TimeValue timeout = TimeValue.timeValueSeconds(30L); + indexShard.acquireReplicaAllOperationsPermits(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, timeout); + } + } } From c43177ed9fba75ca0d811d54b81ad39d017a7cba Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 14 Nov 2018 18:14:40 +0100 Subject: [PATCH 2/4] Get rid of onReferenceAcquired --- .../TransportReplicationAction.java | 21 ++++++------------- ...ReplicationAllPermitsAcquisitionTests.java | 8 +++---- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 691d2e4939b30..84b750608c4dd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -313,7 +313,7 @@ public void messageReceived(ConcreteShardRequest request, TransportChan } } - class AsyncPrimaryAction extends AbstractRunnable implements ActionListener { + class AsyncPrimaryAction extends AbstractRunnable { private final Request request; // targetAllocationID of the shard this request is meant for @@ -354,22 +354,13 @@ protected void doRun() throws Exception { primaryTerm, actualTerm); } - final ActionListener onReferenceAcquired = this; - acquirePrimaryOperationPermit(indexShard, request, new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable)); - } - - @Override - public void onFailure(Exception e) { - onReferenceAcquired.onFailure(e); - } - }); + acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap( + releasable -> runWithReleasable(new PrimaryShardReference(indexShard, releasable)), + this::onFailure + )); } - @Override - public void onResponse(PrimaryShardReference primaryShardReference) { + void runWithReleasable(final PrimaryShardReference primaryShardReference) { try { final ClusterState clusterState = clusterService.state(); final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 0696c5e4e0b15..0b40158f2a7bc 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -208,11 +208,11 @@ protected void doRun() throws Exception { } @Override - public void onResponse(final TransportReplicationAction.PrimaryShardReference reference) { + void runWithReleasable(final TransportReplicationAction.PrimaryShardReference reference) { assertThat(reference.indexShard.getActiveOperationsCount(), greaterThan(0)); assertSame(primary, reference.indexShard); assertBlockIsPresentForDelayedOp(); - super.onResponse(reference); + super.runWithReleasable(reference); } @Override @@ -250,7 +250,7 @@ private void assertBlockIsPresentForDelayedOp() { TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) { @Override - public void onResponse(TransportReplicationAction.PrimaryShardReference reference) { + void runWithReleasable(final TransportReplicationAction.PrimaryShardReference reference) { assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); assertSame(primary, reference.indexShard); @@ -274,7 +274,7 @@ public void onResponse(TransportReplicationAction.Pr } catch (InterruptedException | BrokenBarrierException e) { onFailure(e); } - super.onResponse(reference); + super.runWithReleasable(reference); } }; asyncPrimaryAction.run(); From 493cd1655d72799f66a5c6fc30e048b6fb875c01 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 15 Nov 2018 11:20:34 +0100 Subject: [PATCH 3/4] Rename methods --- .../replication/TransportReplicationAction.java | 4 ++-- .../java/org/elasticsearch/index/shard/IndexShard.java | 6 +++--- ...TransportReplicationAllPermitsAcquisitionTests.java | 10 +++++----- .../org/elasticsearch/index/shard/IndexShardTests.java | 8 ++++---- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 84b750608c4dd..571d7e208624b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -355,12 +355,12 @@ protected void doRun() throws Exception { } acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap( - releasable -> runWithReleasable(new PrimaryShardReference(indexShard, releasable)), + releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), this::onFailure )); } - void runWithReleasable(final PrimaryShardReference primaryShardReference) { + void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) { try { final ClusterState clusterState = clusterService.state(); final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b6594d33c8c26..11e001c3fdaf9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2306,9 +2306,9 @@ public void acquirePrimaryOperationPermit(ActionListener onPermitAcq * Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called. * It is the responsibility of the caller to close the {@link Releasable}. */ - public void acquirePrimaryAllOperationsPermits(final ActionListener onPermitAcquired, final TimeValue timeout) { + public void acquireAllPrimaryOperationsPermits(final ActionListener onPermitAcquired, final TimeValue timeout) { verifyNotClosed(); - assert shardRouting.primary() : "acquirePrimaryAllOperationsPermits should only be called on primary shard: " + shardRouting; + assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting; indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); } @@ -2458,7 +2458,7 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, /** * Acquire all replica operation permits whenever the shard is ready for indexing (see - * {@link #acquirePrimaryAllOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in + * {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an * {@link IllegalStateException}. * diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 0b40158f2a7bc..4f03f0bc6813e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -208,11 +208,11 @@ protected void doRun() throws Exception { } @Override - void runWithReleasable(final TransportReplicationAction.PrimaryShardReference reference) { + void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { assertThat(reference.indexShard.getActiveOperationsCount(), greaterThan(0)); assertSame(primary, reference.indexShard); assertBlockIsPresentForDelayedOp(); - super.runWithReleasable(reference); + super.runWithPrimaryShardReference(reference); } @Override @@ -250,7 +250,7 @@ private void assertBlockIsPresentForDelayedOp() { TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) { @Override - void runWithReleasable(final TransportReplicationAction.PrimaryShardReference reference) { + void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); assertSame(primary, reference.indexShard); @@ -274,7 +274,7 @@ void runWithReleasable(final TransportReplicationAction.PrimaryShardReference re } catch (InterruptedException | BrokenBarrierException e) { onFailure(e); } - super.runWithReleasable(reference); + super.runWithPrimaryShardReference(reference); } }; asyncPrimaryAction.run(); @@ -434,7 +434,7 @@ protected void acquirePrimaryOperationPermit(IndexShard shard, Request request, assertTrue(shard.routingEntry().primary()); assertSame(primary, shard); if (acquireAllPermits) { - shard.acquirePrimaryAllOperationsPermits(onAcquired, timeout); + shard.acquireAllPrimaryOperationsPermits(onAcquired, timeout); } else { super.acquirePrimaryOperationPermit(shard, request, onAcquired); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9ce9dad0df5f2..3c8f7c9eab712 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -313,7 +313,7 @@ public void testClosesPreventsNewOperations() throws Exception { expectThrows(IndexShardClosedException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "")); expectThrows(IndexShardClosedException.class, - () -> indexShard.acquirePrimaryAllOperationsPermits(null, TimeValue.timeValueSeconds(30L))); + () -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L))); expectThrows(IndexShardClosedException.class, () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "")); @@ -665,7 +665,7 @@ public void onFailure(final Exception e) { if (singlePermit) { indexShard.acquirePrimaryOperationPermit(future, ThreadPool.Names.WRITE, ""); } else { - indexShard.acquirePrimaryAllOperationsPermits(future, TimeValue.timeValueHours(1L)); + indexShard.acquireAllPrimaryOperationsPermits(future, TimeValue.timeValueHours(1L)); } assertEquals(0, indexShard.getActiveOperationsCount()); }); @@ -688,7 +688,7 @@ public void onResponse(final Releasable releasable) { } } }; - indexShard.acquirePrimaryAllOperationsPermits(futureAllPermits, TimeValue.timeValueSeconds(30L)); + indexShard.acquireAllPrimaryOperationsPermits(futureAllPermits, TimeValue.timeValueSeconds(30L)); allPermitsAcquired.await(); assertTrue(blocked.get()); assertEquals(0, indexShard.getActiveOperationsCount()); @@ -781,7 +781,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { assertThat(e, hasToString(containsString("acquirePrimaryOperationPermit should only be called on primary shard"))); e = expectThrows(AssertionError.class, - () -> indexShard.acquirePrimaryAllOperationsPermits(null, TimeValue.timeValueSeconds(30L))); + () -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L))); assertThat(e, hasToString(containsString("acquirePrimaryAllOperationsPermits should only be called on primary shard"))); } From ef255a2be13004f1553683103c77c50afde8af4c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 15 Nov 2018 14:06:06 +0100 Subject: [PATCH 4/4] Apply feedback --- .../TransportReplicationAction.java | 6 +- .../elasticsearch/index/shard/IndexShard.java | 134 +++++------- .../TransportReplicationActionTests.java | 2 +- ...ReplicationAllPermitsAcquisitionTests.java | 205 ++++++++++-------- .../index/shard/IndexShardTests.java | 8 +- 5 files changed, 183 insertions(+), 172 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 571d7e208624b..2938e5edb950b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -335,7 +335,7 @@ class AsyncPrimaryAction extends AbstractRunnable { @Override protected void doRun() throws Exception { final ShardId shardId = request.shardId(); - final IndexShard indexShard = getIndexShard(shardId, targetAllocationID); + final IndexShard indexShard = getIndexShard(shardId); final ShardRouting shardRouting = indexShard.routingEntry(); // we may end up here if the cluster state used to route the primary is so stale that the underlying // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails @@ -609,7 +609,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; final ShardId shardId = request.shardId(); assert shardId != null : "request shardId must be set"; - this.replica = getIndexShard(shardId, targetAllocationID); + this.replica = getIndexShard(shardId); } @Override @@ -719,7 +719,7 @@ public void onFailure(Exception e) { } } - protected IndexShard getIndexShard(final ShardId shardId, final String targetAllocationID) { + protected IndexShard getIndexShard(final ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 11e001c3fdaf9..05ead45cd128d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2348,7 +2348,58 @@ public void onResponse(final Releasable releasable) { termUpdated.countDown(); } - private void updatePrimaryTermIfNeeded(final long opPrimaryTerm, final long globalCheckpoint) { + /** + * Acquire a replica operation permit whenever the shard is ready for indexing (see + * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in + * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an + * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified + * name. + * + * @param opPrimaryTerm the operation primary term + * @param globalCheckpoint the global checkpoint associated with the request + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary + * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} + * @param onPermitAcquired the listener for permit acquisition + * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are + * enabled the tracing will capture the supplied object's {@link Object#toString()} value. + * Otherwise the object isn't used + */ + public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener onPermitAcquired, final String executorOnDelay, + final Object debugInfo) { + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, + (listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo)); + } + + /** + * Acquire all replica operation permits whenever the shard is ready for indexing (see + * {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in + * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an + * {@link IllegalStateException}. + * + * @param opPrimaryTerm the operation primary term + * @param globalCheckpoint the global checkpoint associated with the request + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary + * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} + * @param onPermitAcquired the listener for permit acquisition + * @param timeout the maximum time to wait for the in-flight operations block + */ + public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener onPermitAcquired, + final TimeValue timeout) { + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, + (listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())); + } + + private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener onPermitAcquired, + final Consumer> consumer) { + verifyNotClosed(); if (opPrimaryTerm > pendingPrimaryTerm) { synchronized (mutex) { if (opPrimaryTerm > pendingPrimaryTerm) { @@ -2381,19 +2432,7 @@ private void updatePrimaryTermIfNeeded(final long opPrimaryTerm, final long glob } assert opPrimaryTerm <= pendingPrimaryTerm : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]"; - } - - /** - * Creates a new action listener which verifies that the operation primary term is not too old. If the given primary - * term is lower than the current one, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with - * an {@link IllegalStateException}. Otherwise the global checkpoint and the max_seq_no_of_updates marker of the replica are updated - * before the invocation of the {@link ActionListener#onResponse(Object)}} method of the provided listener. - */ - private ActionListener createListener(final ActionListener listener, - final long opPrimaryTerm, - final long globalCheckpoint, - final long maxSeqNoOfUpdatesOrDeletes) { - return new ActionListener() { + consumer.accept(new ActionListener() { @Override public void onResponse(final Releasable releasable) { if (opPrimaryTerm < operationPrimaryTerm) { @@ -2404,81 +2443,26 @@ public void onResponse(final Releasable releasable) { shardId, opPrimaryTerm, operationPrimaryTerm); - listener.onFailure(new IllegalStateException(message)); + onPermitAcquired.onFailure(new IllegalStateException(message)); } else { assert assertReplicationTarget(); try { updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); - } catch (final Exception e) { + } catch (Exception e) { releasable.close(); - listener.onFailure(e); + onPermitAcquired.onFailure(e); return; } - listener.onResponse(releasable); + onPermitAcquired.onResponse(releasable); } } @Override public void onFailure(final Exception e) { - listener.onFailure(e); + onPermitAcquired.onFailure(e); } - }; - } - - /** - * Acquire a replica operation permit whenever the shard is ready for indexing (see - * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in - * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an - * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified - * name. - * - * @param opPrimaryTerm the operation primary term - * @param globalCheckpoint the global checkpoint associated with the request - * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary - * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} - * @param onPermitAcquired the listener for permit acquisition - * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed - * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are - * enabled the tracing will capture the supplied object's {@link Object#toString()} value. - * Otherwise the object isn't used - */ - public void acquireReplicaOperationPermit(final long opPrimaryTerm, - final long globalCheckpoint, - final long maxSeqNoOfUpdatesOrDeletes, - final ActionListener onPermitAcquired, - final String executorOnDelay, - final Object debugInfo) { - verifyNotClosed(); - updatePrimaryTermIfNeeded(opPrimaryTerm, globalCheckpoint); - - ActionListener listener = createListener(onPermitAcquired, opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); - indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo); - } - - /** - * Acquire all replica operation permits whenever the shard is ready for indexing (see - * {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in - * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an - * {@link IllegalStateException}. - * - * @param opPrimaryTerm the operation primary term - * @param globalCheckpoint the global checkpoint associated with the request - * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary - * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} - * @param onPermitAcquired the listener for permit acquisition - * @param timeout the maximum time to wait for the in-flight operations block - */ - public void acquireReplicaAllOperationsPermits(final long opPrimaryTerm, - final long globalCheckpoint, - final long maxSeqNoOfUpdatesOrDeletes, - final ActionListener onPermitAcquired, - final TimeValue timeout) { - verifyNotClosed(); - updatePrimaryTermIfNeeded(opPrimaryTerm, globalCheckpoint); - - ActionListener listener = createListener(onPermitAcquired, opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); - indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()); + }); } public int getActiveOperationsCount() { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 0b07b8bba338f..c1991a8f3a17a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -784,7 +784,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception { new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool) { @Override - protected IndexShard getIndexShard(ShardId shardId, String targetAllocationId) { + protected IndexShard getIndexShard(ShardId shardId) { return shard; } }; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 4f03f0bc6813e..8cad76bcdfe5e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -48,7 +48,6 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; @@ -64,7 +63,6 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.BrokenBarrierException; @@ -88,6 +86,16 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; + +/** + * This test tests the concurrent execution of several transport replication actions. All of these actions (except one) acquire a single + * permit during their execution on shards and are expected to fail if a global level or index level block is present in the cluster state. + * These actions are all started at the same time, but some are delayed until one last action. + * + * This last action is special because it acquires all the permits on shards, adds the block to the cluster state and then "releases" the + * previously delayed single permit actions. This way, there is a clear transition between the single permit actions executed before the + * all permit action that sets the block and those executed afterwards that are doomed to fail because of the block. + */ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTestCase { private ClusterService clusterService; @@ -190,9 +198,8 @@ public void testTransportReplicationActionWithAllPermits() throws Exception { final PlainActionFuture listener = new PlainActionFuture<>(); futures[threadId] = listener; - // An action with blocks which acquires a single operation permit during execution - final TestAction singlePermitAction = new TestAction(Settings.EMPTY, "internal:singlePermitWithBlocks[" + threadId + "]", - transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, false, Optional.of(globalBlock)); + final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY, "internalSinglePermit[" + threadId + "]", + transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, globalBlock); actions[threadId] = singlePermitAction; Thread thread = new Thread(() -> { @@ -242,8 +249,8 @@ private void assertBlockIsPresentForDelayedOp() { logger.trace("now starting the operation that acquires all permits and sets the block in the cluster state"); // An action which acquires all operation permits during execution and set a block - final TestAction allPermitsAction = new TestAction(Settings.EMPTY, "internal:allPermits", transportService, clusterService, - shardStateAction, threadPool, shardId, primary, replica, true, Optional.empty()); + final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "internalAllPermits", transportService, + clusterService, shardStateAction, threadPool, shardId, primary, replica); final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { @@ -337,30 +344,32 @@ private Request request() { return new Request().setShardId(primary.shardId()); } + /** + * A type of {@link TransportReplicationAction} that allows to use the primary and replica shards passed to the constructor for the + * execution of the replication action. Also records if the operation is executed on the primary and the replica. + */ + private abstract class TestAction extends TransportReplicationAction { - private class TestAction extends TransportReplicationAction { - - private final ShardId shardId; - private final IndexShard primary; - private final IndexShard replica; - private final boolean acquireAllPermits; - private final Optional globalBlock; - private final TimeValue timeout = TimeValue.timeValueSeconds(30L); - - private final SetOnce executedOnPrimary = new SetOnce<>(); - private final SetOnce executedOnReplica = new SetOnce<>(); + protected final ShardId shardId; + protected final IndexShard primary; + protected final IndexShard replica; + protected final SetOnce executedOnPrimary = new SetOnce<>(); + protected final SetOnce executedOnReplica = new SetOnce<>(); TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, - ShardStateAction shardStateAction, ThreadPool threadPool, - ShardId shardId, IndexShard primary, IndexShard replica, - boolean acquireAllPermits, Optional globalBlock) { + ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica) { super(settings, actionName, transportService, clusterService, null, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME); this.shardId = Objects.requireNonNull(shardId); this.primary = Objects.requireNonNull(primary); + assertEquals(shardId, primary.shardId()); this.replica = Objects.requireNonNull(replica); - this.acquireAllPermits = acquireAllPermits; - this.globalBlock = globalBlock; + assertEquals(shardId, replica.shardId()); + } + + @Override + protected Response newResponseInstance() { + return new Response(); } public String getActionName() { @@ -368,40 +377,37 @@ public String getActionName() { } @Override - protected ClusterBlockLevel globalBlockLevel() { - if (globalBlock.isPresent()) { - return globalBlock.get() ? ClusterBlockLevel.WRITE : super.globalBlockLevel(); - } - return null; + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + executedOnPrimary.set(true); + // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here + // that the permit has been acquired on the primary shard + assertSame(primary, shard); + return new PrimaryResult<>(shardRequest, new Response()); } @Override - protected ClusterBlockLevel indexBlockLevel() { - if (globalBlock.isPresent()) { - return globalBlock.get() == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel(); - } - return null; + protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + executedOnReplica.set(true); + // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here + // that the permit has been acquired on the replica shard + assertSame(replica, shard); + return new ReplicaResult(); } @Override - protected IndexShard getIndexShard(final ShardId indexShardId, final String targetAllocationId) { - if (shardId.equals(indexShardId) == false) { + protected IndexShard getIndexShard(final ShardId shardId) { + if (this.shardId.equals(shardId) == false) { throw new AssertionError("shard id differs from " + shardId); } - if (Objects.equals(primary.routingEntry().allocationId().getId(), targetAllocationId)) { - return primary; - } else if (Objects.equals(replica.routingEntry().allocationId().getId(), targetAllocationId)) { - return replica; - } - throw new ShardNotFoundException(shardId, "something went wrong"); + return (executedOnPrimary.get() == null) ? primary : replica; } @Override protected void sendReplicaRequest(final ConcreteReplicaRequest replicaRequest, final DiscoveryNode node, final ActionListener listener) { - assertEquals(clusterService.state().nodes().get("_node2"), node); - ReplicaOperationTransportHandler replicaOperationTransportHandler = this.new ReplicaOperationTransportHandler(); + assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node); + ReplicaOperationTransportHandler replicaOperationTransportHandler = new ReplicaOperationTransportHandler(); try { replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() { @Override @@ -428,69 +434,90 @@ public void sendResponse(Exception exception) throws IOException { listener.onFailure(e); } } + } - @Override - protected void acquirePrimaryOperationPermit(IndexShard shard, Request request, ActionListener onAcquired) { - assertTrue(shard.routingEntry().primary()); - assertSame(primary, shard); - if (acquireAllPermits) { - shard.acquireAllPrimaryOperationsPermits(onAcquired, timeout); - } else { - super.acquirePrimaryOperationPermit(shard, request, onAcquired); - } + /** + * A type of {@link TransportReplicationAction} that acquires a single permit during execution and that blocks + * on {@link ClusterBlockLevel#WRITE}. The block can be a global level or an index level block depending of the + * value of the {@code globalBlock} parameter in the constructor. When the operation is executed on shards it + * verifies that at least 1 permit is acquired and that there is no blocks in the cluster state. + */ + private class SinglePermitWithBlocksAction extends TestAction { + + private final boolean globalBlock; + + SinglePermitWithBlocksAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ShardStateAction shardStateAction, ThreadPool threadPool, + ShardId shardId, IndexShard primary, IndexShard replica, boolean globalBlock) { + super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); + this.globalBlock = globalBlock; } @Override - protected void acquireReplicaOperationPermit(IndexShard shard, Request request, ActionListener onAcquired, - long primaryTerm, long globalCheckpoint, long maxSeqNo) { - assertFalse(shard.routingEntry().primary()); - assertSame(replica, shard); - if (acquireAllPermits) { - shard.acquireReplicaAllOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNo, onAcquired, timeout); - } else { - super.acquireReplicaOperationPermit(shard, request, onAcquired, primaryTerm, globalCheckpoint, maxSeqNo); - } + protected ClusterBlockLevel globalBlockLevel() { + return globalBlock ? ClusterBlockLevel.WRITE : super.globalBlockLevel(); } @Override - protected Response newResponseInstance() { - return new Response(); + protected ClusterBlockLevel indexBlockLevel() { + return globalBlock == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel(); } @Override protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { - assertSame(primary, shard); - if (acquireAllPermits) { - assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); - } else { - assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - } - assertNoBlockOnSinglePermitOps(); - executedOnPrimary.set(true); - return new PrimaryResult<>(shardRequest, new Response()); + assertNoBlocks("block must not exist when executing the operation on primary shard: it should have been blocked before"); + assertThat(shard.getActiveOperationsCount(), greaterThan(0)); + return super.shardOperationOnPrimary(shardRequest, shard); } @Override protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { - assertSame(replica, shard); - if (acquireAllPermits) { - assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); - } else { - assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - } - assertNoBlockOnSinglePermitOps(); - executedOnReplica.set(true); - return new ReplicaResult(); + assertNoBlocks("block must not exist when executing the operation on replica shard: it should have been blocked before"); + assertThat(shard.getActiveOperationsCount(), greaterThan(0)); + return super.shardOperationOnReplica(shardRequest, shard); } - private void assertNoBlockOnSinglePermitOps() { - // When a single permit operation is executed on primary/replica shard we must be sure that the block is not here, - // otherwise something went wrong. - if (acquireAllPermits == false) { - final ClusterState clusterState = clusterService.state(); - assertFalse("Global block must not exist", clusterState.blocks().hasGlobalBlock(block)); - assertFalse("Index block must not exist", clusterState.blocks().hasIndexBlock(shardId.getIndexName(), block)); - } + private void assertNoBlocks(final String error) { + final ClusterState clusterState = clusterService.state(); + assertFalse("Global level " + error, clusterState.blocks().hasGlobalBlock(block)); + assertFalse("Index level " + error, clusterState.blocks().hasIndexBlock(shardId.getIndexName(), block)); + } + } + + /** + * A type of {@link TransportReplicationAction} that acquires all permits during execution. + */ + private class AllPermitsThenBlockAction extends TestAction { + + private final TimeValue timeout = TimeValue.timeValueSeconds(30L); + + AllPermitsThenBlockAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ShardStateAction shardStateAction, ThreadPool threadPool, + ShardId shardId, IndexShard primary, IndexShard replica) { + super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); + } + + @Override + protected void acquirePrimaryOperationPermit(IndexShard shard, Request request, ActionListener onAcquired) { + shard.acquireAllPrimaryOperationsPermits(onAcquired, timeout); + } + + @Override + protected void acquireReplicaOperationPermit(IndexShard shard, Request request, ActionListener onAcquired, + long primaryTerm, long globalCheckpoint, long maxSeqNo) { + shard.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNo, onAcquired, timeout); + } + + @Override + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); + return super.shardOperationOnPrimary(shardRequest, shard); + } + + @Override + protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); + return super.shardOperationOnReplica(shardRequest, shard); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3c8f7c9eab712..1baa61e144b73 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -318,7 +318,7 @@ public void testClosesPreventsNewOperations() throws Exception { () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "")); expectThrows(IndexShardClosedException.class, - () -> indexShard.acquireReplicaAllOperationsPermits(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, + () -> indexShard.acquireAllReplicaOperationsPermits(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, TimeValue.timeValueSeconds(30L))); } @@ -782,7 +782,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { e = expectThrows(AssertionError.class, () -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L))); - assertThat(e, hasToString(containsString("acquirePrimaryAllOperationsPermits should only be called on primary shard"))); + assertThat(e, hasToString(containsString("acquireAllPrimaryOperationsPermits should only be called on primary shard"))); } final long primaryTerm = indexShard.getPendingPrimaryTerm(); @@ -3604,7 +3604,7 @@ public Settings threadPoolSettings() { /** * Randomizes the usage of {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)} and - * {@link IndexShard#acquireReplicaAllOperationsPermits(long, long, long, ActionListener, TimeValue)} in order to acquire a permit. + * {@link IndexShard#acquireAllReplicaOperationsPermits(long, long, long, ActionListener, TimeValue)} in order to acquire a permit. */ private void randomReplicaOperationPermitAcquisition(final IndexShard indexShard, final long opPrimaryTerm, @@ -3617,7 +3617,7 @@ private void randomReplicaOperationPermitAcquisition(final IndexShard indexShard indexShard.acquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, executor, info); } else { final TimeValue timeout = TimeValue.timeValueSeconds(30L); - indexShard.acquireReplicaAllOperationsPermits(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, timeout); + indexShard.acquireAllReplicaOperationsPermits(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, timeout); } } }