Skip to content

Commit 1572b6f

Browse files
committed
CCR: Expose the operation primary term
Relates #32442
1 parent e4a919f commit 1572b6f

File tree

4 files changed

+11
-6
lines changed

4 files changed

+11
-6
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,11 @@ public long getPendingPrimaryTerm() {
377377
return this.pendingPrimaryTerm;
378378
}
379379

380+
/** Returns the primary term that is currently being used to assign to operations */
381+
public long getOperationPrimaryTerm() {
382+
return this.operationPrimaryTerm;
383+
}
384+
380385
/**
381386
* Returns the latest cluster routing entry received with this shard.
382387
*/

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperations
7878
index.type(),
7979
index.id(),
8080
index.seqNo(),
81-
primary.getPrimaryTerm(),
81+
primary.getOperationPrimaryTerm(),
8282
index.version(),
8383
index.versionType(),
8484
BytesReference.toBytes(index.source()),
@@ -93,13 +93,13 @@ public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperations
9393
delete.id(),
9494
delete.uid(),
9595
delete.seqNo(),
96-
primary.getPrimaryTerm(),
96+
primary.getOperationPrimaryTerm(),
9797
delete.version(),
9898
delete.versionType());
9999
break;
100100
case NO_OP:
101101
final Translog.NoOp noOp = (Translog.NoOp) operation;
102-
operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getPrimaryTerm(), noOp.reason());
102+
operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getOperationPrimaryTerm(), noOp.reason());
103103
break;
104104
default:
105105
throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ public void testPrimaryTermFromFollower() throws IOException {
6666
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
6767
Translog.Operation operation;
6868
while ((operation = snapshot.next()) != null) {
69-
assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm()));
69+
assertThat(operation.primaryTerm(), equalTo(followerPrimary.getOperationPrimaryTerm()));
7070
}
7171
}
7272

7373
for (final Translog.Operation operation : result.replicaRequest().getOperations()) {
74-
assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm()));
74+
assertThat(operation.primaryTerm(), equalTo(followerPrimary.getOperationPrimaryTerm()));
7575
}
7676

7777
closeShards(followerPrimary);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void testDoNotFillGaps() throws Exception {
6060
true,
6161
ShardRoutingState.STARTED,
6262
replicaRouting.allocationId());
63-
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
63+
indexShard.updateShardState(primaryRouting, indexShard.getOperationPrimaryTerm() + 1, (shard, listener) -> {},
6464
0L, Collections.singleton(primaryRouting.allocationId().getId()),
6565
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
6666

0 commit comments

Comments
 (0)