Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
protected volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
protected volatile long operationPrimaryTerm;
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
private volatile long operationPrimaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,18 +956,18 @@ private void finish() {
// our operation should be blocked until the previous operations complete
assertFalse(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm));
assertThat(indexShard.getOperationPrimaryTerm(), equalTo(primaryTerm));
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm));
Releasables.close(operation1);
// our operation should still be blocked
assertFalse(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm));
assertThat(indexShard.getOperationPrimaryTerm(), equalTo(primaryTerm));
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm));
Releasables.close(operation2);
barrier.await();
// now lock acquisition should have succeeded
assertThat(indexShard.operationPrimaryTerm, equalTo(newPrimaryTerm));
assertThat(indexShard.getOperationPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(indexShard.getPendingPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
if (engineClosed) {
Expand Down Expand Up @@ -1008,7 +1008,7 @@ public void onFailure(Exception e) {
}
};

final long oldPrimaryTerm = indexShard.pendingPrimaryTerm - 1;
final long oldPrimaryTerm = indexShard.getPendingPrimaryTerm() - 1;
randomReplicaOperationPermitAcquisition(indexShard, oldPrimaryTerm, indexShard.getGlobalCheckpoint(),
randomNonNegativeLong(), onLockAcquired, "");
latch.await();
Expand All @@ -1030,7 +1030,7 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception

long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
randomReplicaOperationPermitAcquisition(replica, replica.operationPrimaryTerm, replica.getGlobalCheckpoint(),
randomReplicaOperationPermitAcquisition(replica, replica.getOperationPrimaryTerm(), replica.getGlobalCheckpoint(),
newMaxSeqNoOfUpdates, fut, "");
try (Releasable ignored = fut.actionGet()) {
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates)));
Expand Down Expand Up @@ -1181,7 +1181,7 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt
final Engine beforeRollbackEngine = indexShard.getEngine();
final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE);
randomReplicaOperationPermitAcquisition(indexShard,
indexShard.pendingPrimaryTerm + 1,
indexShard.getPendingPrimaryTerm() + 1,
globalCheckpoint,
newMaxSeqNoOfUpdates,
new ActionListener<Releasable>() {
Expand Down Expand Up @@ -2105,10 +2105,6 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
new SourceToParse(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
// Here we try to increase term (i.e. a new primary is promoted) without rolling back a replica so we can keep stale operations
// in the index commit; then verify that a recovery from store (started with the safe commit) will remove all stale operations.
shard.pendingPrimaryTerm++;
shard.operationPrimaryTerm++;
shard.getEngine().rollTranslogGeneration();
shard.markSeqNoAsNoop(1, "test");
shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
Expand All @@ -2118,11 +2114,20 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
closeShard(shard, false);
// Recovering from store should discard doc #1
final ShardRouting replicaRouting = shard.routingEntry();
IndexShard newShard = reinitShard(shard,
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
newShard.pendingPrimaryTerm++;
newShard.operationPrimaryTerm++;
final IndexMetaData newShardIndexMetadata = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
.primaryTerm(replicaRouting.shardId().id(), shard.getOperationPrimaryTerm() + 1)
.build();
closeShards(shard);
IndexShard newShard = newShard(
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.ExistingStoreRecoverySource.INSTANCE),
shard.shardPath(),
newShardIndexMetadata,
null,
null,
shard.getEngineFactory(),
shard.getGlobalCheckpointSyncer(),
EMPTY_EVENT_LISTENER);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testGetForUpdate() throws IOException {
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());

final long primaryTerm = primary.operationPrimaryTerm;
final long primaryTerm = primary.getOperationPrimaryTerm();
testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");

Expand Down