diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 78bf9f2fc722e..0da39a593a2c1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -114,9 +115,13 @@ public void execute() throws Exception { // of the sampled replication group, and advanced further than what the given replication group would allow it to. // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint. final long globalCheckpoint = primary.globalCheckpoint(); + // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of + // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on. + final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes(); + assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized"; final ReplicationGroup replicationGroup = primary.getReplicationGroup(); markUnavailableShardsAsStale(replicaRequest, replicationGroup); - performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup); + performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup); } successfulShards.incrementAndGet(); // mark primary as successful @@ -136,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica } private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint, - final ReplicationGroup replicationGroup) { + final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) { // for total stats, add number of unassigned shards and // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target) totalShards.addAndGet(replicationGroup.getSkippedShards().size()); @@ -145,19 +150,20 @@ private void performOnReplicas(final ReplicaRequest replicaRequest, final long g for (final ShardRouting shard : replicationGroup.getReplicationTargets()) { if (shard.isSameAllocation(primaryRouting) == false) { - performOnReplica(shard, replicaRequest, globalCheckpoint); + performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); } } } - private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) { + private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, + final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) { if (logger.isTraceEnabled()) { logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest); } totalShards.incrementAndGet(); pendingActions.incrementAndGet(); - replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener() { + replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener() { @Override public void onResponse(ReplicaResponse response) { successfulShards.incrementAndGet(); @@ -322,6 +328,12 @@ public interface Primary< */ long globalCheckpoint(); + /** + * Returns the maximum seq_no of updates (index operations overwrite Lucene) or deletes on the primary. + * This value must be captured after the execution of a replication request on the primary is completed. + */ + long maxSeqNoOfUpdatesOrDeletes(); + /** * Returns the current replication group on the primary shard * @@ -338,12 +350,15 @@ public interface Replicas> { /** * Performs the specified request on the specified replica. * - * @param replica the shard this request should be executed on - * @param replicaRequest the operation to perform - * @param globalCheckpoint the global checkpoint on the primary - * @param listener callback for handling the response or failure + * @param replica the shard this request should be executed on + * @param replicaRequest the operation to perform + * @param globalCheckpoint the global checkpoint on the primary + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary + * after this replication was executed on it. + * @param listener callback for handling the response or failure */ - void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener listener); + void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, + long maxSeqNoOfUpdatesOrDeletes, ActionListener listener); /** * Fail the specified shard if needed, removing it from the current set diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index dbdd5acae1fc6..dff635f6e7489 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -200,7 +200,7 @@ protected abstract PrimaryResult shardOperationOnPrima /** * Synchronously execute the specified replica operation. This is done under a permit from - * {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}. + * {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}. * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on @@ -489,6 +489,7 @@ public void messageReceived( replicaRequest.getTargetAllocationID(), replicaRequest.getPrimaryTerm(), replicaRequest.getGlobalCheckpoint(), + replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(), channel, (ReplicationTask) task).run(); } @@ -513,6 +514,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio private final String targetAllocationID; private final long primaryTerm; private final long globalCheckpoint; + private final long maxSeqNoOfUpdatesOrDeletes; private final TransportChannel channel; private final IndexShard replica; /** @@ -528,6 +530,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio String targetAllocationID, long primaryTerm, long globalCheckpoint, + long maxSeqNoOfUpdatesOrDeletes, TransportChannel channel, ReplicationTask task) { this.request = request; @@ -536,6 +539,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio this.targetAllocationID = targetAllocationID; this.primaryTerm = primaryTerm; this.globalCheckpoint = globalCheckpoint; + this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; final ShardId shardId = request.shardId(); assert shardId != null : "request shardId must be set"; this.replica = getIndexShard(shardId); @@ -575,7 +579,8 @@ public void onNewClusterState(ClusterState state) { new TransportChannelResponseHandler<>(logger, channel, extraMessage, () -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, - new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint), + new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, + globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), handler); } @@ -613,7 +618,7 @@ protected void doRun() throws Exception { throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID, actualAllocationId); } - replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request); + replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request); } /** @@ -1023,6 +1028,11 @@ public long globalCheckpoint() { return indexShard.getGlobalCheckpoint(); } + @Override + public long maxSeqNoOfUpdatesOrDeletes() { + return indexShard.getMaxSeqNoOfUpdatesOrDeletes(); + } + @Override public ReplicationGroup getReplicationGroup() { return indexShard.getReplicationGroup(); @@ -1107,6 +1117,7 @@ public void performOn( final ShardRouting replica, final ReplicaRequest request, final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, final ActionListener listener) { String nodeId = replica.currentNodeId(); final DiscoveryNode node = clusterService.state().nodes().get(nodeId); @@ -1114,8 +1125,8 @@ public void performOn( listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]")); return; } - final ConcreteReplicaRequest replicaRequest = - new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint); + final ConcreteReplicaRequest replicaRequest = new ConcreteReplicaRequest<>( + request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); sendReplicaRequest(replicaRequest, node, listener); } @@ -1263,15 +1274,17 @@ public String toString() { protected static final class ConcreteReplicaRequest extends ConcreteShardRequest { private long globalCheckpoint; + private long maxSeqNoOfUpdatesOrDeletes; public ConcreteReplicaRequest(final Supplier requestSupplier) { super(requestSupplier); } public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm, - final long globalCheckpoint) { + final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) { super(request, targetAllocationID, primaryTerm); this.globalCheckpoint = globalCheckpoint; + this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; } @Override @@ -1282,6 +1295,13 @@ public void readFrom(StreamInput in) throws IOException { } else { globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; } + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + maxSeqNoOfUpdatesOrDeletes = in.readZLong(); + } else { + // UNASSIGNED_SEQ_NO (-2) means uninitialized, and replicas will disable + // optimization using seq_no if its max_seq_no_of_updates is still uninitialized + maxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO; + } } @Override @@ -1290,12 +1310,19 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { out.writeZLong(globalCheckpoint); } + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeZLong(maxSeqNoOfUpdatesOrDeletes); + } } public long getGlobalCheckpoint() { return globalCheckpoint; } + public long getMaxSeqNoOfUpdatesOrDeletes() { + return maxSeqNoOfUpdatesOrDeletes; + } + @Override public String toString() { return "ConcreteReplicaRequest{" + @@ -1303,6 +1330,7 @@ public String toString() { ", primaryTerm='" + getPrimaryTerm() + '\'' + ", request=" + getRequest() + ", globalCheckpoint=" + globalCheckpoint + + ", maxSeqNoOfUpdatesOrDeletes=" + maxSeqNoOfUpdatesOrDeletes + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2f38562b7af06..955fd2bec41da 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -47,6 +47,7 @@ import org.apache.lucene.util.InfoStream; import org.elasticsearch.Assertions; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; @@ -976,6 +977,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) if (plan.addStaleOpToLucene) { addStaleDocs(index.docs(), indexWriter); } else if (plan.useLuceneUpdateDocument) { + assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), plan.seqNoForIndexing, true, true); updateDocs(index.uid(), index.docs(), indexWriter); } else { // document does not exists, we can optimize for create, but double check if assertions are running @@ -1275,8 +1277,8 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE return plan; } - private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) - throws IOException { + private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { + assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), plan.seqNoOfDeletion, false, false); try { if (softDeleteEnabled) { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id()); @@ -2556,6 +2558,29 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) { assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); } + private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) { + final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes(); + // If the primary is on an old version which does not replicate msu, we need to relax this assertion for that. + if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) { + assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_7_0_0_alpha1); + return true; + } + // We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument). + if (allowDeleted) { + final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes()); + if (versionValue != null && versionValue.isDelete()) { + return true; + } + } + // Operations can be processed on a replica in a different order than on the primary. If the order on the primary is index-1, + // delete-2, index-3, and the order on a replica is index-1, index-3, delete-2, then the msu of index-3 on the replica is 2 + // even though it is an update (overwrites index-1). We should relax this assertion if there is a pending gap in the seq_no. + if (relaxIfGapInSeqNo && getLocalCheckpoint() < maxSeqNoOfUpdates) { + return true; + } + assert seqNo <= maxSeqNoOfUpdates : "id=" + id + " seq_no=" + seqNo + " msu=" + maxSeqNoOfUpdates; + return true; + } @Override public void initializeMaxSeqNoOfUpdatesOrDeletes() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f5f8d70925f5f..f28db00ac8a53 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -502,6 +502,12 @@ public void updateShardState(final ShardRouting newRouting, * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes. */ final Engine engine = getEngine(); + if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { + // If the old primary was on an old version that did not replicate the msu, + // we need to bootstrap it manually from its local history. + assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); + } engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) -> runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {})); /* Rolling the translog generation is not strictly needed here (as we will never have collisions between @@ -511,12 +517,6 @@ public void updateShardState(final ShardRouting newRouting, */ engine.rollTranslogGeneration(); engine.fillSeqNoGaps(newPrimaryTerm); - if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { - // TODO: Enable this assertion after we replicate max_seq_no_updates during replication - // assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : - // indexSettings.getIndexVersionCreated(); - engine.initializeMaxSeqNoOfUpdatesOrDeletes(); - } replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @Override @@ -1955,12 +1955,11 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex - // If the old primary was on an old version, this primary (was replica before) - // does not have max_of_updates yet. Thus we need to bootstrap it manually. if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { - // TODO: Enable this assertion after we replicate max_seq_no_updates during replication - // assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated(); - getEngine().initializeMaxSeqNoOfUpdatesOrDeletes(); + // If the old primary was on an old version that did not replicate the msu, + // we need to bootstrap it manually from its local history. + assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1); + getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); } } } @@ -2316,15 +2315,17 @@ private void bumpPrimaryTerm(long newPrimaryTerm, final Ch * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified * name. * - * @param opPrimaryTerm the operation primary term - * @param globalCheckpoint the global checkpoint associated with the request - * @param onPermitAcquired the listener for permit acquisition - * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed - * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled - * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object - * isn't used - */ - public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, + * @param opPrimaryTerm the operation primary term + * @param globalCheckpoint the global checkpoint associated with the request + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary + * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} + * @param onPermitAcquired the listener for permit acquisition + * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are + * enabled the tracing will capture the supplied object's {@link Object#toString()} value. + * Otherwise the object isn't used + */ + public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, final String executorOnDelay, final Object debugInfo) { verifyNotClosed(); @@ -2378,6 +2379,7 @@ public void onResponse(final Releasable releasable) { assert assertReplicationTarget(); try { updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); + advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); } catch (Exception e) { releasable.close(); onPermitAcquired.onFailure(e); @@ -2729,12 +2731,11 @@ void resetEngineToGlobalCheckpoint() throws IOException { newEngine = createNewEngine(newEngineConfig()); active.set(true); } + newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint); final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { // TODO: add a dedicate recovery stats for the reset translog }); - // TODO: do not use init method here but use advance with the max_seq_no received from the primary - newEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); } @@ -2763,10 +2764,13 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { * These transfers guarantee that every index/delete operation when executing on a replica engine will observe this marker a value * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary. * - * @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object) - * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long) + * @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object) + * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long) */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { + assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO + || getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO : + "replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not"; getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index ba88e30727d61..f60994a4bced4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -456,7 +456,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin final RecoveryTarget recoveryTarget = recoveryRef.target(); try { recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), - request.maxSeenAutoIdTimestampOnPrimary()); + request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary()); channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint())); } catch (MapperException exception) { // in very rare cases a translog replay from primary is processed before a mapping update on this node diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 20e6d8578732d..46f98275740ae 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -215,10 +215,12 @@ public RecoveryResponse recoverToTarget() throws IOException { } final long targetLocalCheckpoint; try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) { - // We have to capture the max auto_id_timestamp after taking a snapshot of operations to guarantee - // that the auto_id_timestamp of every operation in the snapshot is at most this timestamp value. + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values + // are at least as high as the corresponding values on the primary when any of these operations were executed on it. final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); - targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp); + final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); + targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, + maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } @@ -445,16 +447,17 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new * shard. * - * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all - * ops should be sent - * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) - * @param endingSeqNo the highest sequence number that should be sent - * @param snapshot a snapshot of the translog - * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary + * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all + * ops should be sent + * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) + * @param endingSeqNo the highest sequence number that should be sent + * @param snapshot a snapshot of the translog + * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it. * @return the local checkpoint on the target */ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot, - final long maxSeenAutoIdTimestamp) + final long maxSeenAutoIdTimestamp, final long maxSeqNoOfUpdatesOrDeletes) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); @@ -468,7 +471,7 @@ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingS // send all the snapshot's translog operations to the target final SendSnapshotResult result = sendSnapshot( - startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp); + startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); stopWatch.stop(); logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); @@ -531,16 +534,18 @@ static class SendSnapshotResult { *

* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. * - * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent - * @param requiredSeqNoRangeStart the lower sequence number of the required range - * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) - * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the - * total number of operations sent - * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary + * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent + * @param requiredSeqNoRangeStart the lower sequence number of the required range + * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) + * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the + * total number of operations sent + * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it. * @throws IOException if an I/O exception occurred reading the translog snapshot */ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, - final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp) throws IOException { + final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo + 1: "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : @@ -558,8 +563,11 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require logger.trace("no translog operations to send"); } - final CancellableThreads.IOInterruptable sendBatch = () -> - targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, maxSeenAutoIdTimestamp)); + final CancellableThreads.IOInterruptable sendBatch = () -> { + final long targetCheckpoint = recoveryTarget.indexTranslogOperations( + operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); + targetLocalCheckpoint.set(targetCheckpoint); + }; // send operations in batches Translog.Operation operation; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index e2f21fe8edd2e..3a3a78941b1b7 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -386,8 +386,8 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestampOnPrimary) throws IOException { + public long indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, + long maxSeqNoOfDeletesOrUpdatesOnPrimary) throws IOException { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); @@ -401,6 +401,11 @@ public long indexTranslogOperations(List operations, int tot * replay these operations first (without timestamp), then optimize append-only requests (with timestamp). */ indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary); + /* + * Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when + * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that operation was executed on. + */ + indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 53220c5860949..b7c3de97b4e9c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -59,13 +59,17 @@ public interface RecoveryTargetHandler { /** * Index a set of translog operations on the target - * @param operations operations to index - * @param totalTranslogOps current number of total operations expected to be indexed - * @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard + * + * @param operations operations to index + * @param totalTranslogOps current number of total operations expected to be indexed + * @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard + * @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on + * the primary shard when capturing these operations. This value is at least as high as the + * max_seq_no_of_updates on the primary was when any of these ops were processed on it. * @return the local checkpoint on the target shard */ long indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestampOnPrimary) throws IOException; + long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException; /** * Notifies the target of the files it is going to receive diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index ae74673d30d80..58b5fb927b5d4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.transport.TransportRequest; @@ -37,17 +38,19 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest { private List operations; private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; private long maxSeenAutoIdTimestampOnPrimary; + private long maxSeqNoOfUpdatesOrDeletesOnPrimary; public RecoveryTranslogOperationsRequest() { } - RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, - int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) { + RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) { this.recoveryId = recoveryId; this.shardId = shardId; this.operations = operations; this.totalTranslogOps = totalTranslogOps; this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary; + this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary; } public long recoveryId() { @@ -70,6 +73,10 @@ public long maxSeenAutoIdTimestampOnPrimary() { return maxSeenAutoIdTimestampOnPrimary; } + public long maxSeqNoOfUpdatesOrDeletesOnPrimary() { + return maxSeqNoOfUpdatesOrDeletesOnPrimary; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -82,6 +89,12 @@ public void readFrom(StreamInput in) throws IOException { } else { maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; } + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + maxSeqNoOfUpdatesOrDeletesOnPrimary = in.readZLong(); + } else { + // UNASSIGNED_SEQ_NO means uninitialized and replica won't enable optimization using seq_no + maxSeqNoOfUpdatesOrDeletesOnPrimary = SequenceNumbers.UNASSIGNED_SEQ_NO; + } } @Override @@ -94,5 +107,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_5_0)) { out.writeZLong(maxSeenAutoIdTimestampOnPrimary); } + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary); + } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 3a7f28e8eb7e2..b37fefee7daff 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -110,9 +110,10 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) { - final RecoveryTranslogOperationsRequest translogOperationsRequest = - new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary); + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary) { + final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( + recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary); final TransportFuture future = transportService.submitRequest( targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index e85c03411f7e2..8fa10c4ee26d7 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -443,6 +443,7 @@ static class TestPrimary implements ReplicationOperation.Primary replicationGroupSupplier; final Map knownLocalCheckpoints = new HashMap<>(); final Map knownGlobalCheckpoints = new HashMap<>(); @@ -452,6 +453,7 @@ static class TestPrimary implements ReplicationOperation.Primary listener) { assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica)); if (opFailures.containsKey(replica)) { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 7f1b4adf8df1e..52dc585cf7ac6 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -625,6 +625,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState), new Request(), randomNonNegativeLong(), + randomNonNegativeLong(), listener); assertTrue(listener.isDone()); assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class); @@ -633,7 +634,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream() .filter(ShardRouting::assignedToNode).collect(Collectors.toList())); listener = new PlainActionFuture<>(); - proxy.performOn(replica, new Request(), randomNonNegativeLong(), listener); + proxy.performOn(replica, new Request(), randomNonNegativeLong(), randomNonNegativeLong(), listener); assertFalse(listener.isDone()); CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); @@ -805,7 +806,7 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl replicaOperationTransportHandler.messageReceived( new TransportReplicationAction.ConcreteReplicaRequest<>( new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(), - randomNonNegativeLong()), + randomNonNegativeLong(), randomNonNegativeLong()), createTransportChannel(new PlainActionFuture<>()), task); } catch (ElasticsearchException e) { assertThat(e.getMessage(), containsString("simulated")); @@ -895,7 +896,7 @@ public void testReplicaActionRejectsWrongAid() throws Exception { Request request = new Request(shardId).timeout("1ms"); action.new ReplicaOperationTransportHandler().messageReceived( new TransportReplicationAction.ConcreteReplicaRequest<>(request, "_not_a_valid_aid_", randomNonNegativeLong(), - randomNonNegativeLong()), + randomNonNegativeLong(), randomNonNegativeLong()), createTransportChannel(listener), maybeTask() ); try { @@ -939,8 +940,10 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); + final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong(); replicaOperationTransportHandler.messageReceived( - new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint), + new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), + primaryTerm, checkpoint, maxSeqNoOfUpdatesOrDeletes), createTransportChannel(listener), task); if (listener.isDone()) { listener.get(); // fail with the exception if there @@ -964,6 +967,8 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl assertThat(capturedRequest.request, instanceOf(TransportReplicationAction.ConcreteReplicaRequest.class)); assertThat(((TransportReplicationAction.ConcreteReplicaRequest) capturedRequest.request).getGlobalCheckpoint(), equalTo(checkpoint)); + assertThat(((TransportReplicationAction.ConcreteReplicaRequest) capturedRequest.request).getMaxSeqNoOfUpdatesOrDeletes(), + equalTo(maxSeqNoOfUpdatesOrDeletes)); assertConcreteShardRequest(capturedRequest.request, request, replica.allocationId()); } @@ -1004,8 +1009,10 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); + final long maxSeqNoOfUpdates = randomNonNegativeLong(); replicaOperationTransportHandler.messageReceived( - new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint), + new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), + primaryTerm, checkpoint, maxSeqNoOfUpdates), createTransportChannel(listener), task); if (listener.isDone()) { listener.get(); // fail with the exception if there @@ -1198,7 +1205,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; - ActionListener callback = (ActionListener) invocation.getArguments()[2]; + ActionListener callback = (ActionListener) invocation.getArguments()[3]; final long primaryTerm = indexShard.getPendingPrimaryTerm(); if (term < primaryTerm) { throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])", @@ -1207,7 +1214,8 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); + }).when(indexShard) + .acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 9a571f1e9d8eb..18bbe5b9593de 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -277,7 +277,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { TestShardRouting.newShardRouting(shardId, "NOT THERE", routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState), new TestRequest(), - randomNonNegativeLong(), listener); + randomNonNegativeLong(), randomNonNegativeLong(), listener); assertTrue(listener.isDone()); assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class); @@ -285,7 +285,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream() .filter(ShardRouting::assignedToNode).collect(Collectors.toList())); listener = new PlainActionFuture<>(); - proxy.performOn(replica, new TestRequest(), randomNonNegativeLong(), listener); + proxy.performOn(replica, new TestRequest(), randomNonNegativeLong(), randomNonNegativeLong(), listener); assertFalse(listener.isDone()); CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); @@ -462,7 +462,8 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); + }).when(indexShard) + .acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 2f38ef709d1c5..22576f8818975 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -338,7 +338,8 @@ public void testReplicaTermIncrementWithConcurrentPrimaryPromotion() throws Exce barrier.await(); indexOnReplica(replicationRequest, shards, replica2, newReplica1Term); } catch (IllegalStateException ise) { - assertThat(ise.getMessage(), either(containsString("is too old")).or(containsString("cannot be a replication target"))); + assertThat(ise.getMessage(), either(containsString("is too old")) + .or(containsString("cannot be a replication target")).or(containsString("engine is closed"))); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index c38fb8a495650..e32161af7fe0c 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -405,6 +405,10 @@ public void testResyncAfterPrimaryPromotion() throws Exception { assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs)); } shards.assertAllEqual(initialDocs + extraDocs); + for (IndexShard replica : shards.getReplicas()) { + assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), + greaterThanOrEqualTo(shards.getPrimary().getMaxSeqNoOfUpdatesOrDeletes())); + } // check translog on replica is trimmed int translogOperations = 0; @@ -490,9 +494,9 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { }) { @Override public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestampOnPrimary) throws IOException { + long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException { opsSent.set(true); - return super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary); + return super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdates); } }; }); @@ -560,7 +564,7 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override public long indexTranslogOperations(final List operations, final int totalTranslogOps, - final long maxAutoIdTimestamp) + final long maxAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException { // index a doc which is not part of the snapshot, but also does not complete on replica replicaEngineFactory.latchIndexers(1); @@ -588,7 +592,7 @@ public long indexTranslogOperations(final List operations, f } catch (InterruptedException e) { throw new AssertionError(e); } - return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); + return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates); } }); pendingDocActiveWithExtraDocIndexed.await(); @@ -718,11 +722,11 @@ private void blockIfNeeded(RecoveryState.Stage currentStage) { @Override public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp) throws IOException { + long maxAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } - return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); + return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e8715f9e8ecef..4d41e2b41b8b4 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -315,8 +315,8 @@ public void testClosesPreventsNewOperations() throws InterruptedException, Execu // expected } try { - indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null, - ThreadPool.Names.WRITE, ""); + indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, + randomNonNegativeLong(), null, ThreadPool.Names.WRITE, ""); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected @@ -327,7 +327,7 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc IndexShard indexShard = newShard(false); expectThrows(IndexShardNotStartedException.class, () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.WRITE, "")); + SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "")); closeShards(indexShard); } @@ -351,6 +351,7 @@ public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBar indexShard.acquireReplicaOperationPermit( indexShard.getPendingPrimaryTerm(), indexShard.getGlobalCheckpoint(), + indexShard.getMaxSeqNoOfUpdatesOrDeletes(), new ActionListener() { @Override public void onResponse(Releasable releasable) { @@ -602,7 +603,7 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) { assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm, - indexShard.getGlobalCheckpoint(), new ActionListener() { + indexShard.getGlobalCheckpoint(), indexShard.getMaxSeqNoOfUpdatesOrDeletes(), new ActionListener() { @Override public void onResponse(Releasable releasable) { fail(); @@ -628,7 +629,8 @@ private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.WRITE, ""); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), + randomNonNegativeLong(), fut, ThreadPool.Names.WRITE, ""); return fut.get(); } @@ -712,8 +714,8 @@ public void onFailure(Exception e) { } }; - indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired, - ThreadPool.Names.WRITE, ""); + indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, + randomNonNegativeLong(), onLockAcquired, ThreadPool.Names.WRITE, ""); assertFalse(onResponse.get()); assertTrue(onFailure.get()); @@ -785,6 +787,7 @@ private void finish() { indexShard.acquireReplicaOperationPermit( newPrimaryTerm, newGlobalCheckPoint, + randomNonNegativeLong(), listener, ThreadPool.Names.SAME, ""); } catch (Exception e) { @@ -836,6 +839,22 @@ private void finish() { closeShards(indexShard); } + public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception { + IndexShard replica = newStartedShard(false); + assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + long currentMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + replica.advanceMaxSeqNoOfUpdatesOrDeletes(currentMaxSeqNoOfUpdates); + + long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + PlainActionFuture fut = new PlainActionFuture<>(); + replica.acquireReplicaOperationPermit(replica.operationPrimaryTerm, replica.getGlobalCheckpoint(), + newMaxSeqNoOfUpdates, fut, ThreadPool.Names.WRITE, ""); + try (Releasable ignored = fut.actionGet()) { + assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates))); + } + closeShards(replica); + } + public void testGlobalCheckpointSync() throws IOException { // create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked final ShardId shardId = new ShardId("index", "_na_", 0); @@ -906,11 +925,14 @@ public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + final long currentMaxSeqNoOfUpdates = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); + final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo); final Set docsBeforeRollback = getShardDocUIDs(indexShard); final CountDownLatch latch = new CountDownLatch(1); indexShard.acquireReplicaOperationPermit( indexShard.getPendingPrimaryTerm() + 1, globalCheckpoint, + maxSeqNoOfUpdatesOrDeletes, new ActionListener() { @Override public void onResponse(Releasable releasable) { @@ -926,6 +948,9 @@ public void onFailure(Exception e) { ThreadPool.Names.SAME, ""); latch.await(); + assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max( + Arrays.asList(maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica)) + )); final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary(); final CountDownLatch resyncLatch = new CountDownLatch(1); @@ -941,7 +966,9 @@ public void onFailure(Exception e) { assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback)); - assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo)); + assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max( + Arrays.asList(currentMaxSeqNoOfUpdates, maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica)) + )); closeShard(indexShard, false); } @@ -961,9 +988,11 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt final boolean shouldRollback = Math.max(globalCheckpoint, globalCheckpointOnReplica) < indexShard.seqNoStats().getMaxSeqNo() && indexShard.seqNoStats().getMaxSeqNo() != SequenceNumbers.NO_OPS_PERFORMED; final Engine beforeRollbackEngine = indexShard.getEngine(); + final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE); indexShard.acquireReplicaOperationPermit( indexShard.pendingPrimaryTerm + 1, globalCheckpoint, + newMaxSeqNoOfUpdates, new ActionListener() { @Override public void onResponse(final Releasable releasable) { @@ -990,6 +1019,7 @@ public void onFailure(final Exception e) { } else { assertThat(indexShard.getEngine(), sameInstance(beforeRollbackEngine)); } + assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(newMaxSeqNoOfUpdates)); // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); @@ -1016,6 +1046,7 @@ public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierExcep indexShard.acquireReplicaOperationPermit( primaryTerm + increment, indexShard.getGlobalCheckpoint(), + randomNonNegativeLong(), new ActionListener() { @Override public void onResponse(Releasable releasable) { @@ -1628,6 +1659,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { * - If flush and then recover from the existing store, delete #1 will be removed while index #0 is still retained and replayed. */ final IndexShard shard = newStartedShard(false); + shard.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id"); shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, @@ -2193,9 +2225,10 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestamp) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp); + public long indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes) throws IOException { + final long localCheckpoint = super.indexTranslogOperations( + operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); assertFalse(replica.isSyncNeeded()); return localCheckpoint; } @@ -2302,8 +2335,9 @@ public void testShardActiveDuringPeerRecovery() throws IOException { }) { @Override public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); + long maxAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException { + final long localCheckpoint = super.indexTranslogOperations( + operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); // Shard should now be active since we did recover: assertTrue(replica.isActive()); return localCheckpoint; @@ -2350,8 +2384,9 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra @Override public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); + long maxAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException { + final long localCheckpoint = super.indexTranslogOperations( + operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); assertListenerCalled.accept(replica); return localCheckpoint; } @@ -3434,6 +3469,7 @@ public void testResetEngine() throws Exception { assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint)); assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint)); assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations())); + assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(globalCheckpoint)); closeShard(shard, false); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 9b17962f91b1f..7791e51445a34 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -207,11 +207,12 @@ public int totalOperations() { public Translog.Operation next() throws IOException { return operations.get(counter++); } - }, randomNonNegativeLong()); + }, randomNonNegativeLong(), randomNonNegativeLong()); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); assertThat(result.totalOperations, equalTo(expectedOps)); final ArgumentCaptor shippedOpsCaptor = ArgumentCaptor.forClass(List.class); - verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(), ArgumentCaptor.forClass(Long.class).capture()); + verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(), + ArgumentCaptor.forClass(Long.class).capture(), ArgumentCaptor.forClass(Long.class).capture()); List shippedOps = new ArrayList<>(); for (List list: shippedOpsCaptor.getAllValues()) { shippedOps.addAll(list); @@ -249,7 +250,7 @@ public Translog.Operation next() throws IOException { } while (op != null && opsToSkip.contains(op)); return op; } - }, randomNonNegativeLong())); + }, randomNonNegativeLong(), randomNonNegativeLong())); } } @@ -421,7 +422,7 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr @Override long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, - long maxSeenAutoIdTimestamp) { + long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) { phase2Called.set(true); return SequenceNumbers.UNASSIGNED_SEQ_NO; } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 45535e19672c7..2a53c79448d15 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -127,6 +127,7 @@ public void testRecoveryWithOutOfOrderDeleteWithTranslog() throws Exception { final String indexName = orgReplica.shardId().getIndexName(); // delete #1 + orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id"); getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation // index #0 @@ -190,6 +191,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { final String indexName = orgReplica.shardId().getIndexName(); // delete #1 + orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id"); orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment // index #0 diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 69a9f51ab698e..1bbfb6fa73de3 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -503,7 +503,7 @@ public static InternalEngine createInternalEngine( @Nullable final ToLongBiFunction seqNoForOperation, final EngineConfig config) { if (localCheckpointTrackerSupplier == null) { - return new InternalEngine(config) { + return new InternalTestEngine(config) { @Override IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { return (indexWriterFactory != null) ? @@ -519,7 +519,7 @@ protected long doGenerateSeqNoForOperation(final Operation operation) { } }; } else { - return new InternalEngine(config, localCheckpointTrackerSupplier) { + return new InternalTestEngine(config, localCheckpointTrackerSupplier) { @Override IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { return (indexWriterFactory != null) ? diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java b/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java new file mode 100644 index 0000000000000..8c52d57aabc39 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.SequenceNumbers; + +import java.io.IOException; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * An alternative of {@link InternalEngine} that allows tweaking internals to reduce noise in engine tests. + */ +class InternalTestEngine extends InternalEngine { + private final Map idToMaxSeqNo = ConcurrentCollections.newConcurrentMap(); + + InternalTestEngine(EngineConfig engineConfig) { + super(engineConfig); + } + + InternalTestEngine(EngineConfig engineConfig, BiFunction localCheckpointTrackerSupplier) { + super(engineConfig, localCheckpointTrackerSupplier); + } + + @Override + public IndexResult index(Index index) throws IOException { + if (index.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + idToMaxSeqNo.compute(index.id(), (id, existing) -> { + if (existing == null) { + return index.seqNo(); + } else { + long maxSeqNo = Math.max(index.seqNo(), existing); + advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo); + return maxSeqNo; + } + }); + } + return super.index(index); + } + + @Override + public DeleteResult delete(Delete delete) throws IOException { + if (delete.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + final long maxSeqNo = idToMaxSeqNo.compute(delete.id(), (id, existing) -> { + if (existing == null) { + return delete.seqNo(); + } else { + return Math.max(delete.seqNo(), existing); + } + }); + advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo); + } + return super.delete(delete); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index f590b99b48172..9021fd1efbb30 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -97,6 +97,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase { @@ -444,6 +445,7 @@ public synchronized void close() throws Exception { for (IndexShard replica : replicas) { try { assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp())); + assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(primary.getMaxSeqNoOfUpdatesOrDeletes())); } catch (AlreadyClosedException ignored) { } } @@ -563,6 +565,11 @@ public long globalCheckpoint() { return replicationGroup.getPrimary().getGlobalCheckpoint(); } + @Override + public long maxSeqNoOfUpdatesOrDeletes() { + return replicationGroup.getPrimary().getMaxSeqNoOfUpdatesOrDeletes(); + } + @Override public org.elasticsearch.index.shard.ReplicationGroup getReplicationGroup() { return replicationGroup.primary.getReplicationGroup(); @@ -577,12 +584,14 @@ public void performOn( final ShardRouting replicaRouting, final ReplicaRequest request, final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, final ActionListener listener) { IndexShard replica = replicationGroup.replicas.stream() .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); replica.acquireReplicaOperationPermit( replicationGroup.primary.getPendingPrimaryTerm(), globalCheckpoint, + maxSeqNoOfUpdatesOrDeletes, new ActionListener() { @Override public void onResponse(Releasable releasable) { @@ -659,7 +668,8 @@ protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardRequest re @Override protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { - executeShardBulkOnReplica(request, replica, getPrimaryShard().getPendingPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint()); + executeShardBulkOnReplica(request, replica, getPrimaryShard().getPendingPrimaryTerm(), + getPrimaryShard().getGlobalCheckpoint(), getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes()); } } @@ -690,10 +700,10 @@ BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request } private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm, - long globalCheckpointOnPrimary) throws Exception { + long globalCheckpointOnPrimary, long maxSeqNoOfUpdatesOrDeletes) throws Exception { final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); - replica.acquireReplicaOperationPermit( - operationPrimaryTerm, globalCheckpointOnPrimary, permitAcquiredFuture, ThreadPool.Names.SAME, request); + replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary, + maxSeqNoOfUpdatesOrDeletes, permitAcquiredFuture, ThreadPool.Names.SAME, request); final Translog.Location location; try (Releasable ignored = permitAcquiredFuture.actionGet()) { location = TransportShardBulkAction.performOnReplica(request, replica); @@ -723,14 +733,16 @@ void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard } void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica, long term) throws Exception { - executeShardBulkOnReplica(request, replica, term, group.primary.getGlobalCheckpoint()); + executeShardBulkOnReplica(request, replica, term, + group.primary.getGlobalCheckpoint(), group.primary.getMaxSeqNoOfUpdatesOrDeletes()); } /** * Executes the delete request on the given replica shard. */ void deleteOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception { - executeShardBulkOnReplica(request, replica, group.primary.getPendingPrimaryTerm(), group.primary.getGlobalCheckpoint()); + executeShardBulkOnReplica(request, replica, group.primary.getPendingPrimaryTerm(), + group.primary.getGlobalCheckpoint(), group.primary.getMaxSeqNoOfUpdatesOrDeletes()); } class GlobalCheckpointSync extends ReplicationAction< @@ -774,7 +786,8 @@ protected PrimaryResult performOnPrimary(IndexShard primary, ResyncReplicationRe @Override protected void performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { - executeResyncOnReplica(replica, request, getPrimaryShard().getPendingPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint()); + executeResyncOnReplica(replica, request, getPrimaryShard().getPendingPrimaryTerm(), + getPrimaryShard().getGlobalCheckpoint(), getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes()); } } @@ -787,12 +800,12 @@ private TransportWriteAction.WritePrimaryResult acquirePermitFuture = new PlainActionFuture<>(); - replica.acquireReplicaOperationPermit( - operationPrimaryTerm, globalCheckpointOnPrimary, acquirePermitFuture, ThreadPool.Names.SAME, request); + replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary, + maxSeqNoOfUpdatesOrDeletes, acquirePermitFuture, ThreadPool.Names.SAME, request); try (Releasable ignored = acquirePermitFuture.actionGet()) { location = TransportResyncReplicationAction.performOnReplica(request, replica); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 370c29740b1b6..c9ef79720a29e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -699,8 +699,9 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getLocalCheckpoint()); } else { - result = shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1; + shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates + result = shard.applyIndexOperationOnReplica(seqNo, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { throw new TransportReplicationAction.RetryOnReplicaException(shard.shardId, "Mappings are not available on the replica yet, triggered update: " + result.getRequiredMappingUpdate()); @@ -720,7 +721,9 @@ protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id result = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL); shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getEngine().getLocalCheckpoint()); } else { - result = shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id); + final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1; + shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates + result = shard.applyDeleteOperationOnReplica(seqNo, 0L, type, id); } return result; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 4b4bce1fcce03..14f01b7754957 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -103,6 +103,8 @@ public static WritePrimaryResult ops = EngineTestCase.generateSingleDocHistory(true, versionType, 2, 2, 20, "id"); + ops.stream().mapToLong(op -> op.seqNo()).max().ifPresent(followingEngine::advanceMaxSeqNoOfUpdatesOrDeletes); EngineTestCase.assertOpsOnReplica(ops, followingEngine, true, logger); } } @@ -160,6 +161,7 @@ public void testDeleteSeqNoIsMaintained() throws IOException { seqNo, Engine.Operation.Origin.PRIMARY, (followingEngine, delete) -> { + followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(seqNo, Long.MAX_VALUE)); final Engine.DeleteResult result = followingEngine.delete(delete); assertThat(result.getSeqNo(), equalTo(seqNo)); });