Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -19,6 +21,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -36,8 +39,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public class ShardChangesAction extends Action<ShardChangesAction.Response> {

Expand All @@ -59,6 +64,7 @@ public static class Request extends SingleShardRequest<Request> {
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;

public Request(ShardId shardId, String expectedHistoryUUID) {
Expand Down Expand Up @@ -102,6 +108,14 @@ public String getExpectedHistoryUUID() {
return expectedHistoryUUID;
}

public TimeValue getPollTimeout() {
return pollTimeout;
}

public void setPollTimeout(final TimeValue pollTimeout) {
this.pollTimeout = Objects.requireNonNull(pollTimeout, "pollTimeout");
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -126,6 +140,7 @@ public void readFrom(StreamInput in) throws IOException {
maxOperationCount = in.readVInt();
shardId = ShardId.readShardId(in);
expectedHistoryUUID = in.readString();
pollTimeout = in.readTimeValue();
maxOperationSizeInBytes = in.readVLong();
}

Expand All @@ -136,6 +151,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(maxOperationCount);
shardId.writeTo(out);
out.writeString(expectedHistoryUUID);
out.writeTimeValue(pollTimeout);
out.writeVLong(maxOperationSizeInBytes);
}

Expand All @@ -149,12 +165,13 @@ public boolean equals(final Object o) {
maxOperationCount == request.maxOperationCount &&
Objects.equals(shardId, request.shardId) &&
Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) &&
Objects.equals(pollTimeout, request.pollTimeout) &&
maxOperationSizeInBytes == request.maxOperationSizeInBytes;
}

@Override
public int hashCode() {
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes);
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, pollTimeout, maxOperationSizeInBytes);
}

@Override
Expand All @@ -164,6 +181,7 @@ public String toString() {
", maxOperationCount=" + maxOperationCount +
", shardId=" + shardId +
", expectedHistoryUUID=" + expectedHistoryUUID +
", pollTimeout=" + pollTimeout +
", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
'}';
}
Expand Down Expand Up @@ -265,19 +283,90 @@ public TransportAction(Settings settings,

@Override
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();

final Translog.Operation[] operations = getOperations(
indexShard,
seqNoStats.getGlobalCheckpoint(),
request.fromSeqNo,
request.maxOperationCount,
request.expectedHistoryUUID,
request.maxOperationSizeInBytes);
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
request.getFromSeqNo(),
request.getMaxOperationCount(),
request.getExpectedHistoryUUID(),
request.getMaxOperationSizeInBytes());
return getResponse(mappingVersion, seqNoStats, operations);
}

@Override
protected void asyncShardOperation(
final Request request,
final ShardId shardId,
final ActionListener<Response> listener) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();

if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) {
logger.trace(
"{} waiting for global checkpoint advancement from [{}] to [{}]",
shardId,
seqNoStats.getGlobalCheckpoint(),
request.getFromSeqNo());
indexShard.addGlobalCheckpointListener(
request.getFromSeqNo(),
(g, e) -> {
if (g != UNASSIGNED_SEQ_NO) {
assert request.getFromSeqNo() <= g
: shardId + " only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]";
globalCheckpointAdvanced(shardId, g, request, listener);
} else {
assert e != null;
globalCheckpointAdvancementFailure(shardId, e, request, listener, indexShard);
}
},
request.getPollTimeout());
} else {
super.asyncShardOperation(request, shardId, listener);
}
}

private void globalCheckpointAdvanced(
final ShardId shardId,
final long globalCheckpoint,
final Request request,
final ActionListener<Response> listener) {
logger.trace("{} global checkpoint advanced to [{}] after waiting for [{}]", shardId, globalCheckpoint, request.getFromSeqNo());
try {
super.asyncShardOperation(request, shardId, listener);
} catch (final IOException caught) {
listener.onFailure(caught);
}
}

private void globalCheckpointAdvancementFailure(
final ShardId shardId,
final Exception e,
final Request request,
final ActionListener<Response> listener,
final IndexShard indexShard) {
logger.trace(
() -> new ParameterizedMessage(
"{} exception waiting for global checkpoint advancement to [{}]", shardId, request.getFromSeqNo()),
e);
if (e instanceof TimeoutException) {
try {
final long mappingVersion =
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY));
} catch (final Exception caught) {
caught.addSuppressed(e);
listener.onFailure(caught);
}
} else {
listener.onFailure(e);
}
}

@Override
Expand All @@ -300,7 +389,7 @@ protected Response newResponse() {

}

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

/**
* Returns at most maxOperationCount operations from the specified from sequence number.
Expand All @@ -324,7 +413,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
historyUUID + "]");
}
if (fromSeqNo > globalCheckpoint) {
return EMPTY_OPERATIONS_ARRAY;
throw new IllegalStateException(
"not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]");
}
int seenBytes = 0;
// - 1 is needed, because toSeqNo is inclusive
Expand All @@ -344,4 +434,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}

static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) {
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {

private final String leaderIndex;
private final ShardFollowTask params;
private final TimeValue pollTimeout;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardChangesRequestDelay;
private final BiConsumer<TimeValue, Runnable> scheduler;
private final LongSupplier relativeTimeProvider;

Expand Down Expand Up @@ -82,8 +82,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.params = params;
this.scheduler = scheduler;
this.relativeTimeProvider = relativeTimeProvider;
this.pollTimeout = params.getPollTimeout();
this.maxRetryDelay = params.getMaxRetryDelay();
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
/*
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
* concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
Expand Down Expand Up @@ -229,12 +229,16 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
}
innerSendShardChangesRequest(from, maxOperationCount,
response -> {
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
fetchExceptions.remove(from);
operationsReceived += response.getOperations().length;
totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
fetchExceptions.remove(from);
operationsReceived += response.getOperations().length;
totalTransferredBytes +=
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}
}
handleReadResponse(from, maxRequiredSeqNo, response);
},
Expand Down Expand Up @@ -286,15 +290,7 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
} else {
// read is completed, decrement
numConcurrentReads--;
if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqNo) {
// we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay
// future requests
LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads",
params.getFollowShardId());
scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads);
} else {
coordinateReads();
}
coordinateReads();
}
}

Expand Down
Loading