diff --git a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 8f535bfed2676..c723a175ad731 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -93,7 +93,7 @@ protected void sendReplicaRequest( if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { super.sendReplicaRequest(replicaRequest, node, listener); } else { - listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO)); + listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO)); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 5623d9bbc1174..fb04e0f262588 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -187,7 +187,7 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep public void onResponse(ReplicaResponse response) { successfulShards.incrementAndGet(); try { - primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint()); + primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint()); } catch (final AlreadyClosedException e) { // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally } catch (final Exception e) { @@ -429,9 +429,6 @@ public interface ReplicaResponse { /** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */ long localCheckpoint(); - - /** the allocation id of the replica shard */ - String allocationId(); } public static class RetryOnPrimaryException extends ElasticsearchException { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index b364870e23ad6..71594dc1ec6cf 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -53,6 +53,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; @@ -523,8 +524,7 @@ public void onResponse(Releasable releasable) { try { final ReplicaResult replicaResult = shardOperationOnReplica(request, replica); releasable.close(); // release shard operation lock before responding to caller - final TransportReplicationAction.ReplicaResponse response = - new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()); + final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint()); replicaResult.respond(new ResponseListener(response)); } catch (final Exception e) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller @@ -1011,14 +1011,12 @@ public long globalCheckpoint() { public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse { private long localCheckpoint; - private String allocationId; ReplicaResponse() { } - public ReplicaResponse(String allocationId, long localCheckpoint) { - this.allocationId = allocationId; + public ReplicaResponse(long localCheckpoint) { this.localCheckpoint = localCheckpoint; } @@ -1027,9 +1025,9 @@ public void readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { super.readFrom(in); localCheckpoint = in.readZLong(); - allocationId = in.readString(); } else { // 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing. + localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; } } @@ -1038,7 +1036,6 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { super.writeTo(out); out.writeZLong(localCheckpoint); - out.writeString(allocationId); } else { // we use to write empty responses Empty.INSTANCE.writeTo(out); @@ -1049,11 +1046,6 @@ public void writeTo(StreamOutput out) throws IOException { public long localCheckpoint() { return localCheckpoint; } - - @Override - public String allocationId() { - return allocationId; - } } /** diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 7baed972fcba8..897dc9beb32e5 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -89,7 +89,7 @@ protected void sendReplicaRequest( if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { super.sendReplicaRequest(replicaRequest, node, listener); } else { - listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO)); + listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO)); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 88cf5769a4850..dc33be6e4d4de 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -464,11 +464,9 @@ public long globalCheckpoint() { } static class ReplicaResponse implements ReplicationOperation.ReplicaResponse { - final String allocationId; final long localCheckpoint; - ReplicaResponse(String allocationId, long localCheckpoint) { - this.allocationId = allocationId; + ReplicaResponse(long localCheckpoint) { this.localCheckpoint = localCheckpoint; } @@ -476,11 +474,6 @@ static class ReplicaResponse implements ReplicationOperation.ReplicaResponse { public long localCheckpoint() { return localCheckpoint; } - - @Override - public String allocationId() { - return allocationId; - } } static class TestReplicaProxy implements ReplicationOperation.Replicas { @@ -515,7 +508,7 @@ public void performOn( final String allocationId = replica.allocationId().getId(); Long existing = generatedLocalCheckpoints.put(allocationId, checkpoint); assertNull(existing); - listener.onResponse(new ReplicaResponse(allocationId, checkpoint)); + listener.onResponse(new ReplicaResponse(checkpoint)); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index a4a34b7002c94..8bfc31f87111d 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -636,8 +636,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); assertThat(captures, arrayWithSize(1)); if (randomBoolean()) { - final TransportReplicationAction.ReplicaResponse response = - new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong()); + final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong()); transport.handleResponse(captures[0].requestId, response); assertTrue(listener.isDone()); assertThat(listener.get(), equalTo(response)); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 7e1ff9e1ca06a..a1d33960f7b13 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -286,8 +286,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); assertThat(captures, arrayWithSize(1)); if (randomBoolean()) { - final TransportReplicationAction.ReplicaResponse response = - new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong()); + final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong()); transport.handleResponse(captures[0].requestId, response); assertTrue(listener.isDone()); assertThat(listener.get(), equalTo(response)); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 75a59a36d7c98..be1b4661c5329 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -537,9 +537,7 @@ public void onResponse(Releasable releasable) { try { performOnReplica(request, replica); releasable.close(); - listener.onResponse( - new ReplicaResponse( - replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint())); + listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint())); } catch (final Exception e) { Releasables.closeWhileHandlingException(releasable); listener.onFailure(e);