Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected void sendReplicaRequest(
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint());
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
Expand Down Expand Up @@ -429,9 +429,6 @@ public interface ReplicaResponse {

/** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
long localCheckpoint();

/** the allocation id of the replica shard */
String allocationId();
}

public static class RetryOnPrimaryException extends ElasticsearchException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -523,8 +524,7 @@ public void onResponse(Releasable releasable) {
try {
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint());
final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint());
replicaResult.respond(new ResponseListener(response));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
Expand Down Expand Up @@ -1011,14 +1011,12 @@ public long globalCheckpoint() {

public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
private String allocationId;

ReplicaResponse() {

}

public ReplicaResponse(String allocationId, long localCheckpoint) {
this.allocationId = allocationId;
public ReplicaResponse(long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}

Expand All @@ -1027,9 +1025,9 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.readFrom(in);
localCheckpoint = in.readZLong();
allocationId = in.readString();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

Expand All @@ -1038,7 +1036,6 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeString(allocationId);
} else {
// we use to write empty responses
Empty.INSTANCE.writeTo(out);
Expand All @@ -1049,11 +1046,6 @@ public void writeTo(StreamOutput out) throws IOException {
public long localCheckpoint() {
return localCheckpoint;
}

@Override
public String allocationId() {
return allocationId;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected void sendReplicaRequest(
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,23 +464,16 @@ public long globalCheckpoint() {
}

static class ReplicaResponse implements ReplicationOperation.ReplicaResponse {
final String allocationId;
final long localCheckpoint;

ReplicaResponse(String allocationId, long localCheckpoint) {
this.allocationId = allocationId;
ReplicaResponse(long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}

@Override
public long localCheckpoint() {
return localCheckpoint;
}

@Override
public String allocationId() {
return allocationId;
}
}

static class TestReplicaProxy implements ReplicationOperation.Replicas<Request> {
Expand Down Expand Up @@ -515,7 +508,7 @@ public void performOn(
final String allocationId = replica.allocationId().getId();
Long existing = generatedLocalCheckpoints.put(allocationId, checkpoint);
assertNull(existing);
listener.onResponse(new ReplicaResponse(allocationId, checkpoint));
listener.onResponse(new ReplicaResponse(checkpoint));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
assertThat(captures, arrayWithSize(1));
if (randomBoolean()) {
final TransportReplicationAction.ReplicaResponse response =
new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong());
final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong());
transport.handleResponse(captures[0].requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
assertThat(captures, arrayWithSize(1));
if (randomBoolean()) {
final TransportReplicationAction.ReplicaResponse response =
new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong());
final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong());
transport.handleResponse(captures[0].requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,9 +537,7 @@ public void onResponse(Releasable releasable) {
try {
performOnReplica(request, replica);
releasable.close();
listener.onResponse(
new ReplicaResponse(
replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()));
listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint()));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable);
listener.onFailure(e);
Expand Down