diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 4a614d8874aff..bc51bc7b67164 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -92,6 +93,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ volatile boolean primaryMode; + /** + * The current operation primary term. Management of this value is done through {@link IndexShard} and must only be done when safe. See + * {@link #setOperationPrimaryTerm(long)}. + */ + private volatile long operationPrimaryTerm; + /** * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff} * and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the @@ -408,6 +415,25 @@ public boolean isPrimaryMode() { return primaryMode; } + /** + * Returns the current operation primary term. + * + * @return the primary term + */ + public long getOperationPrimaryTerm() { + return operationPrimaryTerm; + } + + /** + * Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That + * is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance. + * + * @param operationPrimaryTerm the new operation primary term + */ + public void setOperationPrimaryTerm(final long operationPrimaryTerm) { + this.operationPrimaryTerm = operationPrimaryTerm; + } + /** * Returns whether the replication tracker has relocated away to another shard copy. */ @@ -527,6 +553,7 @@ private static long inSyncCheckpointStates( * @param shardId the shard ID * @param allocationId the allocation ID * @param indexSettings the index settings + * @param operationPrimaryTerm the current primary term * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires */ @@ -534,6 +561,7 @@ public ReplicationTracker( final ShardId shardId, final String allocationId, final IndexSettings indexSettings, + final long operationPrimaryTerm, final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, @@ -542,6 +570,7 @@ public ReplicationTracker( assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; this.primaryMode = false; + this.operationPrimaryTerm = operationPrimaryTerm; this.handoffInProgress = false; this.appliedClusterStateVersion = -1L; this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 446b21269d5a5..42822942e3adf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -200,7 +200,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private volatile long operationPrimaryTerm; protected final AtomicReference currentEngineReference = new AtomicReference<>(); final EngineFactory engineFactory; @@ -307,17 +306,21 @@ public IndexShard( this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); final String aId = shardRouting.allocationId().getId(); + final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); + this.pendingPrimaryTerm = primaryTerm; this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger); - this.replicationTracker = + final ReplicationTracker replicationTracker = new ReplicationTracker( shardId, aId, indexSettings, + primaryTerm, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, retentionLeaseSyncer); + this.replicationTracker = replicationTracker; // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -337,8 +340,6 @@ public boolean shouldCache(Query query) { } indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool); searcherWrapper = indexSearcherWrapper; - pendingPrimaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); - operationPrimaryTerm = pendingPrimaryTerm; refreshListeners = buildRefreshListeners(); lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); @@ -400,7 +401,7 @@ public long getPendingPrimaryTerm() { /** Returns the primary term that is currently being used to assign to operations */ public long getOperationPrimaryTerm() { - return this.operationPrimaryTerm; + return replicationTracker.getOperationPrimaryTerm(); } /** @@ -509,7 +510,7 @@ public void updateShardState(final ShardRouting newRouting, assert pendingPrimaryTerm == newPrimaryTerm : "shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" + ", current routing: " + currentRouting + ", new routing: " + newRouting; - assert operationPrimaryTerm == newPrimaryTerm; + assert getOperationPrimaryTerm() == newPrimaryTerm; try { replicationTracker.activatePrimaryMode(getLocalCheckpoint()); /* @@ -705,14 +706,14 @@ public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType boolean isRetry) throws IOException { assert versionType.validateVersionForWrites(version); - return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNo, + return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, versionType, ifSeqNo, ifPrimaryTerm, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse) throws IOException { - return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, UNASSIGNED_SEQ_NO, 0, + return applyIndexOperation(getEngine(), seqNo, getOperationPrimaryTerm(), version, null, UNASSIGNED_SEQ_NO, 0, autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse); } @@ -720,8 +721,8 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o @Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm, long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { - assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm - + "]"; + assert opPrimaryTerm <= getOperationPrimaryTerm() + : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]"; ensureWriteAllowed(origin); Engine.Index operation; try { @@ -784,13 +785,13 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc } public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException { - return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA); + return markSeqNoAsNoop(getEngine(), seqNo, getOperationPrimaryTerm(), reason, Engine.Operation.Origin.REPLICA); } private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason, Engine.Operation.Origin origin) throws IOException { - assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm - + "]"; + assert opPrimaryTerm <= getOperationPrimaryTerm() + : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]"; long startTime = System.nanoTime(); ensureWriteAllowed(origin); final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason); @@ -806,31 +807,31 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { } public Engine.IndexResult getFailedIndexResult(Exception e, long version) { - return new Engine.IndexResult(e, version, operationPrimaryTerm); + return new Engine.IndexResult(e, version, getOperationPrimaryTerm()); } public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) { - return new Engine.DeleteResult(e, version, operationPrimaryTerm); + return new Engine.DeleteResult(e, version, getOperationPrimaryTerm()); } public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType, long ifSeqNo, long ifPrimaryTerm) throws IOException { assert versionType.validateVersionForWrites(version); - return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, + return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, type, id, versionType, ifSeqNo, ifPrimaryTerm, Engine.Operation.Origin.PRIMARY); } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException { return applyDeleteOperation( - getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA); + getEngine(), seqNo, getOperationPrimaryTerm(), version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA); } private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id, @Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm, Engine.Operation.Origin origin) throws IOException { - assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm - + "]"; + assert opPrimaryTerm <= getOperationPrimaryTerm() + : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]"; ensureWriteAllowed(origin); // When there is a single type, the unique identifier is only composed of the _id, // so there is no way to differentiate foo#1 from bar#1. This is especially an issue @@ -846,7 +847,7 @@ private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long return new Engine.DeleteResult(update); } } catch (MapperParsingException | IllegalArgumentException | TypeMissingException e) { - return new Engine.DeleteResult(e, version, operationPrimaryTerm, seqNo, false); + return new Engine.DeleteResult(e, version, getOperationPrimaryTerm(), seqNo, false); } if (mapperService.resolveDocumentType(type).equals(mapperService.documentMapper().type()) == false) { // We should never get there due to the fact that we generate mapping updates on deletes, @@ -1273,7 +1274,7 @@ public void prepareForIndexRecovery() { } public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { - getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo); + getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } /** @@ -2388,7 +2389,7 @@ private EngineConfig newEngineConfig() { Collections.singletonList(refreshListeners), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases, - () -> operationPrimaryTerm, tombstoneDocSupplier()); + () -> getOperationPrimaryTerm(), tombstoneDocSupplier()); } /** @@ -2468,7 +2469,7 @@ private void bumpPrimaryTerm(final long newPrimaryTerm, @Nullable ActionListener combineWithAction) { assert Thread.holdsLock(mutex); assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null); - assert operationPrimaryTerm <= pendingPrimaryTerm; + assert getOperationPrimaryTerm() <= pendingPrimaryTerm; final CountDownLatch termUpdated = new CountDownLatch(1); asyncBlockOperations(new ActionListener() { @Override @@ -2494,12 +2495,12 @@ private void innerFail(final Exception e) { public void onResponse(final Releasable releasable) { final RunOnce releaseOnce = new RunOnce(releasable::close); try { - assert operationPrimaryTerm <= pendingPrimaryTerm; + assert getOperationPrimaryTerm() <= pendingPrimaryTerm; termUpdated.await(); // indexShardOperationPermits doesn't guarantee that async submissions are executed // in the order submitted. We need to guard against another term bump - if (operationPrimaryTerm < newPrimaryTerm) { - operationPrimaryTerm = newPrimaryTerm; + if (getOperationPrimaryTerm() < newPrimaryTerm) { + replicationTracker.setOperationPrimaryTerm(newPrimaryTerm); onBlocked.run(); } } catch (final Exception e) { @@ -2585,14 +2586,14 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, final ActionListener operationListener = new ActionListener() { @Override public void onResponse(final Releasable releasable) { - if (opPrimaryTerm < operationPrimaryTerm) { + if (opPrimaryTerm < getOperationPrimaryTerm()) { releasable.close(); final String message = String.format( Locale.ROOT, "%s operation primary term [%d] is too old (current [%d])", shardId, opPrimaryTerm, - operationPrimaryTerm); + getOperationPrimaryTerm()); onPermitAcquired.onFailure(new IllegalStateException(message)); } else { assert assertReplicationTarget(); @@ -2653,7 +2654,7 @@ public void onFailure(final Exception e) { } private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) { - return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm); + return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > getOperationPrimaryTerm()); } public int getActiveOperationsCount() { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 90eb162374469..d7f135ffe4816 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -53,6 +53,7 @@ public void testAddOrRenewRetentionLease() { new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, @@ -88,6 +89,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, @@ -143,6 +145,7 @@ private void runExpirationTest(final boolean primaryMode) { new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", settings), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, value -> {}, currentTimeMillis::get, @@ -215,6 +218,7 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", settings), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, value -> {}, currentTimeMillis::get, diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java index a36006a5fc4c1..5165f2e8dc9e4 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java @@ -45,6 +45,7 @@ ReplicationTracker newTracker( new ShardId("test", "_na_", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, updatedGlobalCheckpoint, currentTimeMillisSupplier, diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index b61e3f647b9d2..7731f3cbf1d5f 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -683,15 +683,16 @@ public void testPrimaryContextHandoff() throws IOException { final ShardId shardId = new ShardId("test", "_na_", 0); FakeClusterState clusterState = initialState(); - final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); + final AllocationId aId = clusterState.routingTable.primaryShard().allocationId(); final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; + final long primaryTerm = randomNonNegativeLong(); final long globalCheckpoint = UNASSIGNED_SEQ_NO; final BiConsumer, ActionListener> onNewRetentionLease = (leases, listener) -> {}; ReplicationTracker oldPrimary = new ReplicationTracker( - shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); + shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); ReplicationTracker newPrimary = new ReplicationTracker( - shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); + shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 35667b0f87a1c..d893168b08205 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -655,6 +655,7 @@ public EngineConfig config( shardId, allocationId.getId(), indexSettings, + randomNonNegativeLong(), SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L,