Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,29 +188,28 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res

synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
final long newMinRequiredSeqNo;
final long newFromSeqNo;
if (response.getOperations().length == 0) {
newMinRequiredSeqNo = from;
newFromSeqNo = from;
} else {
assert response.getOperations()[0].seqNo() == from :
"first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0];
buffer.addAll(Arrays.asList(response.getOperations()));
final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo();
assert maxSeqNo ==
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong();
newMinRequiredSeqNo = maxSeqNo + 1;
newFromSeqNo = maxSeqNo + 1;
// update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo);
assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno +
"] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]";
coordinateWrites();
}

if (newMinRequiredSeqNo < maxRequiredSeqNo && isStopped() == false) {
int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1;
if (newFromSeqNo <= maxRequiredSeqNo && isStopped() == false) {
int newSize = Math.toIntExact(maxRequiredSeqNo - newFromSeqNo + 1);
LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...",
params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo);
sendShardChangesRequest(newMinRequiredSeqNo, newSize, maxRequiredSeqNo);
params.getFollowShardId(), response.getOperations().length, newFromSeqNo, maxRequiredSeqNo);
sendShardChangesRequest(newFromSeqNo, newSize, maxRequiredSeqNo);
} else {
// read is completed, decrement
numConcurrentReads--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testCoordinateReads() {
public void testWriteBuffer() {
// Need to set concurrentWrites to 0, other the write buffer gets flushed immediately:
ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
Expand Down Expand Up @@ -234,11 +234,11 @@ public void testReceiveNonRetryableError() {

public void testHandleReadResponse() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
task.innerHandleReadResponse(0L, 64L, response);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
task.innerHandleReadResponse(0L, 63L, response);

assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
Expand All @@ -248,8 +248,8 @@ public void testHandleReadResponse() {
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}

Expand All @@ -263,7 +263,7 @@ public void testReceiveLessThanRequested() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
task.innerHandleReadResponse(0L, 64L, response);

assertThat(shardChangesRequests.size(), equalTo(1));
Expand All @@ -288,7 +288,7 @@ public void testCancelAndReceiveLessThanRequested() {

shardChangesRequests.clear();
task.markAsCompleted();
ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
task.innerHandleReadResponse(0L, 64L, response);

assertThat(shardChangesRequests.size(), equalTo(0));
Expand All @@ -310,8 +310,8 @@ public void testReceiveNothingExpectedSomething() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 0, 0L, 0L);
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 64L,
new ShardChangesAction.Response(0, 0, new Translog.Operation[0]));

assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
Expand All @@ -331,30 +331,30 @@ public void testDelayCoordinatesRead() {
task.run();
};
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 65, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinateReads()
task.innerHandleReadResponse(0L, 64L, response);
response = generateShardChangesResponse(0, 0, 0L, 64L);
task.innerHandleReadResponse(65L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);
task.innerHandleReadResponse(64L, 63L,
new ShardChangesAction.Response(0, 63L, new Translog.Operation[0]));
assertThat(counter[0], equalTo(1));
}

public void testMappingUpdate() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

imdVersions.add(1L);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
task.handleReadResponse(0L, 63L, response);

assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
Expand All @@ -363,23 +363,23 @@ public void testMappingUpdate() {
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}

public void testMappingUpdateRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

int max = randomIntBetween(1, 10);
for (int i = 0; i < max; i++) {
mappingUpdateFailures.add(new ConnectException());
}
imdVersions.add(1L);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
task.handleReadResponse(0L, 63L, response);

assertThat(mappingUpdateFailures.size(), equalTo(0));
assertThat(bulkShardOperationRequests.size(), equalTo(1));
Expand All @@ -388,8 +388,8 @@ public void testMappingUpdateRetryableError() {
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));

}

Expand Down Expand Up @@ -439,25 +439,25 @@ public void testMappingUpdateNonRetryableError() {

public void testCoordinateWrites() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);

assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));

ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
}

Expand Down Expand Up @@ -507,7 +507,7 @@ public void testMaxBatchOperationCount() {

public void testRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
Expand All @@ -518,9 +518,9 @@ public void testRetryableError() {
for (int i = 0; i < max; i++) {
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);

// Number of requests is equal to initial request + retried attempts:
assertThat(bulkShardOperationRequests.size(), equalTo(max + 1));
Expand All @@ -535,7 +535,7 @@ public void testRetryableError() {

public void testRetryableErrorRetriedTooManyTimes() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
Expand All @@ -546,9 +546,9 @@ public void testRetryableErrorRetriedTooManyTimes() {
for (int i = 0; i < max; i++) {
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 643);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);

// Number of requests is equal to initial request + retried attempts:
assertThat(bulkShardOperationRequests.size(), equalTo(11));
Expand All @@ -563,17 +563,17 @@ public void testRetryableErrorRetriedTooManyTimes() {

public void testNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);
startTask(task, 63, -1);

task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

writeFailures.add(new RuntimeException());
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
task.innerHandleReadResponse(0L, 63L, response);

assertThat(bulkShardOperationRequests.size(), equalTo(1));
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
Expand All @@ -592,7 +592,7 @@ public void testMaxBatchBytesLimit() {
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 64L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);

Expand All @@ -610,7 +610,7 @@ public void testHandleWriteResponse() {

shardChangesRequests.clear();
followerGlobalCheckpoints.add(63L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 63L);
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 63L, response);

Expand Down Expand Up @@ -702,10 +702,10 @@ public void markAsFailed(Exception e) {
};
}

private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, int size, long imdVersion,
private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long imdVersion,
long leaderGlobalCheckPoint) {
List<Translog.Operation> ops = new ArrayList<>();
for (long seqNo = fromSeqNo; seqNo < size; seqNo++) {
for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
String id = UUIDs.randomBase64UUID();
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.function.Consumer;
import java.util.function.LongConsumer;

import static org.hamcrest.Matchers.equalTo;

public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {

public void testSimpleCcrReplication() throws Exception {
Expand All @@ -51,7 +53,10 @@ public void testSimpleCcrReplication() throws Exception {

leaderGroup.assertAllEqual(docCount);
int expectedCount = docCount;
assertBusy(() -> followerGroup.assertAllEqual(expectedCount));
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(expectedCount);
});
shardFollowTask.markAsCompleted();
}
}
Expand Down