Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public void messageReceived(ConcreteShardRequest<Request> request, TransportChan
}
}

class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<PrimaryShardReference> {
class AsyncPrimaryAction extends AbstractRunnable {

private final Request request;
// targetAllocationID of the shard this request is meant for
Expand All @@ -334,11 +334,33 @@ class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<Prim

@Override
protected void doRun() throws Exception {
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
final ShardId shardId = request.shardId();
final IndexShard indexShard = getIndexShard(shardId);
final ShardRouting shardRouting = indexShard.routingEntry();
// we may end up here if the cluster state used to route the primary is so stale that the underlying
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
// the replica will take over and a replica will be assigned to the first node.
if (shardRouting.primary() == false) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
}
final String actualAllocationId = shardRouting.allocationId().getId();
if (actualAllocationId.equals(targetAllocationID) == false) {
throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
final long actualTerm = indexShard.getPendingPrimaryTerm();
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID,
primaryTerm, actualTerm);
}

acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap(
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
this::onFailure
));
}

@Override
public void onResponse(PrimaryShardReference primaryShardReference) {
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
try {
final ClusterState clusterState = clusterService.state();
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
Expand Down Expand Up @@ -660,10 +682,10 @@ protected void doRun() throws Exception {
setPhase(task, "replica");
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
if (actualAllocationId.equals(targetAllocationID) == false) {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
}

/**
Expand Down Expand Up @@ -697,7 +719,7 @@ public void onFailure(Exception e) {
}
}

protected IndexShard getIndexShard(ShardId shardId) {
protected IndexShard getIndexShard(final ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
}
Expand Down Expand Up @@ -938,42 +960,26 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
}

/**
* Tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
* Executes the logic for acquiring one or more operation permit on a primary shard. The default is to acquire a single permit but this
* method can be overridden to acquire more.
*/
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
IndexShard indexShard = getIndexShard(shardId);
// we may end up here if the cluster state used to route the primary is so stale that the underlying
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
// the replica will take over and a replica will be assigned to the first node.
if (indexShard.routingEntry().primary() == false) {
throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
"actual shard is not a primary " + indexShard.routingEntry());
}
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}
final long actualTerm = indexShard.getPendingPrimaryTerm();
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
primaryTerm, actualTerm);
}

ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
}

@Override
public void onFailure(Exception e) {
onReferenceAcquired.onFailure(e);
}
};
protected void acquirePrimaryOperationPermit(final IndexShard primary,
final Request request,
final ActionListener<Releasable> onAcquired) {
primary.acquirePrimaryOperationPermit(onAcquired, executor, request);
}

indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
/**
* Executes the logic for acquiring one or more operation permit on a replica shard. The default is to acquire a single permit but this
* method can be overridden to acquire more.
*/
protected void acquireReplicaOperationPermit(final IndexShard replica,
final ReplicaRequest request,
final ActionListener<Releasable> onAcquired,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes) {
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
}

class ShardReference implements Releasable {
Expand Down
130 changes: 84 additions & 46 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2302,7 +2302,18 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
}

private <E extends Exception> void bumpPrimaryTerm(long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
/**
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
* It is the responsibility of the caller to close the {@link Releasable}.
*/
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
}

private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
assert Thread.holdsLock(mutex);
assert newPrimaryTerm > pendingPrimaryTerm;
assert operationPrimaryTerm <= pendingPrimaryTerm;
Expand Down Expand Up @@ -2357,11 +2368,42 @@ public void onResponse(final Releasable releasable) {
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
}

/**
* Acquire all replica operation permits whenever the shard is ready for indexing (see
* {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}.
*
* @param opPrimaryTerm the operation primary term
* @param globalCheckpoint the global checkpoint associated with the request
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()}
* @param onPermitAcquired the listener for permit acquisition
* @param timeout the maximum time to wait for the in-flight operations block
*/
public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
}

private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final Consumer<ActionListener<Releasable>> consumer) {
verifyNotClosed();
if (opPrimaryTerm > pendingPrimaryTerm) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
IndexShardState shardState = state();
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
Expand All @@ -2373,58 +2415,54 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g

if (opPrimaryTerm > pendingPrimaryTerm) {
bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
});
}
}
}
}

assert opPrimaryTerm <= pendingPrimaryTerm
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
indexShardOperationPermits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (opPrimaryTerm < operationPrimaryTerm) {
releasable.close();
final String message = String.format(
Locale.ROOT,
"%s operation primary term [%d] is too old (current [%d])",
shardId,
opPrimaryTerm,
operationPrimaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
return;
}
onPermitAcquired.onResponse(releasable);
}
}

@Override
public void onFailure(final Exception e) {
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
consumer.accept(new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (opPrimaryTerm < operationPrimaryTerm) {
releasable.close();
final String message = String.format(
Locale.ROOT,
"%s operation primary term [%d] is too old (current [%d])",
shardId,
opPrimaryTerm,
operationPrimaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
return;
}
},
executorOnDelay,
true, debugInfo);
onPermitAcquired.onResponse(releasable);
}
}

@Override
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
});
}

public int getActiveOperationsCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,11 +949,11 @@ action.new PrimaryOperationTransportHandler().messageReceived(
logger.debug("got exception:" , throwable);
assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable));
if (wrongAllocationId) {
assertThat(throwable.getMessage(), containsString("expected aID [_not_a_valid_aid_] but found [" +
assertThat(throwable.getMessage(), containsString("expected allocation id [_not_a_valid_aid_] but found [" +
primary.allocationId().getId() + "]"));
} else {
assertThat(throwable.getMessage(), containsString("expected aID [" + primary.allocationId().getId() + "] with term [" +
requestTerm + "] but found [" + primaryTerm + "]"));
assertThat(throwable.getMessage(), containsString("expected allocation id [" + primary.allocationId().getId()
+ "] with term [" + requestTerm + "] but found [" + primaryTerm + "]"));
}
}
}
Expand Down
Loading