Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -114,9 +115,13 @@ public void execute() throws Exception {
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on.
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}

successfulShards.incrementAndGet(); // mark primary as successful
Expand All @@ -136,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
}

private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
final ReplicationGroup replicationGroup) {
final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
// for total stats, add number of unassigned shards and
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
totalShards.addAndGet(replicationGroup.getSkippedShards().size());
Expand All @@ -145,19 +150,20 @@ private void performOnReplicas(final ReplicaRequest replicaRequest, final long g

for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint);
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
}
}
}

private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}

totalShards.incrementAndGet();
pendingActions.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
Expand Down Expand Up @@ -322,6 +328,12 @@ public interface Primary<
*/
long globalCheckpoint();

/**
* Returns the maximum seq_no of updates (index operations overwrite Lucene) or deletes on the primary.
* This value must be captured after the execution of a replication request on the primary is completed.
*/
long maxSeqNoOfUpdatesOrDeletes();

/**
* Returns the current replication group on the primary shard
*
Expand All @@ -338,12 +350,15 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
/**
* Performs the specified request on the specified replica.
*
* @param replica the shard this request should be executed on
* @param replicaRequest the operation to perform
* @param globalCheckpoint the global checkpoint on the primary
* @param listener callback for handling the response or failure
* @param replica the shard this request should be executed on
* @param replicaRequest the operation to perform
* @param globalCheckpoint the global checkpoint on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary
* after this replication was executed on it.
* @param listener callback for handling the response or failure
*/
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener<ReplicaResponse> listener);
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);

/**
* Fail the specified shard if needed, removing it from the current set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima

/**
* Synchronously execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
Expand Down Expand Up @@ -489,6 +489,7 @@ public void messageReceived(
replicaRequest.getTargetAllocationID(),
replicaRequest.getPrimaryTerm(),
replicaRequest.getGlobalCheckpoint(),
replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(),
channel,
(ReplicationTask) task).run();
}
Expand All @@ -513,6 +514,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
private final String targetAllocationID;
private final long primaryTerm;
private final long globalCheckpoint;
private final long maxSeqNoOfUpdatesOrDeletes;
private final TransportChannel channel;
private final IndexShard replica;
/**
Expand All @@ -528,6 +530,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
String targetAllocationID,
long primaryTerm,
long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes,
TransportChannel channel,
ReplicationTask task) {
this.request = request;
Expand All @@ -536,6 +539,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
this.targetAllocationID = targetAllocationID;
this.primaryTerm = primaryTerm;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
final ShardId shardId = request.shardId();
assert shardId != null : "request shardId must be set";
this.replica = getIndexShard(shardId);
Expand Down Expand Up @@ -575,7 +579,8 @@ public void onNewClusterState(ClusterState state) {
new TransportChannelResponseHandler<>(logger, channel, extraMessage,
() -> TransportResponse.Empty.INSTANCE);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint),
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
handler);
}

Expand Down Expand Up @@ -613,7 +618,7 @@ protected void doRun() throws Exception {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
}

/**
Expand Down Expand Up @@ -1023,6 +1028,11 @@ public long globalCheckpoint() {
return indexShard.getGlobalCheckpoint();
}

@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return indexShard.getMaxSeqNoOfUpdatesOrDeletes();
}

@Override
public ReplicationGroup getReplicationGroup() {
return indexShard.getReplicationGroup();
Expand Down Expand Up @@ -1107,15 +1117,16 @@ public void performOn(
final ShardRouting replica,
final ReplicaRequest request,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
String nodeId = replica.currentNodeId();
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
if (node == null) {
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
return;
}
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest =
new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint);
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
sendReplicaRequest(replicaRequest, node, listener);
}

Expand Down Expand Up @@ -1263,15 +1274,17 @@ public String toString() {
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {

private long globalCheckpoint;
private long maxSeqNoOfUpdatesOrDeletes;

public ConcreteReplicaRequest(final Supplier<R> requestSupplier) {
super(requestSupplier);
}

public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm,
final long globalCheckpoint) {
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
super(request, targetAllocationID, primaryTerm);
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
}

@Override
Expand All @@ -1282,6 +1295,13 @@ public void readFrom(StreamInput in) throws IOException {
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
} else {
// UNASSIGNED_SEQ_NO (-2) means uninitialized, and replicas will disable
// optimization using seq_no if its max_seq_no_of_updates is still uninitialized
maxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

@Override
Expand All @@ -1290,19 +1310,27 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(globalCheckpoint);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
}
}

public long getGlobalCheckpoint() {
return globalCheckpoint;
}

public long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes;
}

@Override
public String toString() {
return "ConcreteReplicaRequest{" +
"targetAllocationID='" + getTargetAllocationID() + '\'' +
", primaryTerm='" + getPrimaryTerm() + '\'' +
", request=" + getRequest() +
", globalCheckpoint=" + globalCheckpoint +
", maxSeqNoOfUpdatesOrDeletes=" + maxSeqNoOfUpdatesOrDeletes +
'}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
Expand Down Expand Up @@ -976,6 +977,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
if (plan.addStaleOpToLucene) {
addStaleDocs(index.docs(), indexWriter);
} else if (plan.useLuceneUpdateDocument) {
assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), plan.seqNoForIndexing, true, true);
updateDocs(index.uid(), index.docs(), indexWriter);
} else {
// document does not exists, we can optimize for create, but double check if assertions are running
Expand Down Expand Up @@ -1275,8 +1277,8 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
return plan;
}

private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
throws IOException {
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException {
assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), plan.seqNoOfDeletion, false, false);
try {
if (softDeleteEnabled) {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
Expand Down Expand Up @@ -2556,6 +2558,29 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}

private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) {
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
// If the primary is on an old version which does not replicate msu, we need to relax this assertion for that.
if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_7_0_0_alpha1);
return true;
}
// We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument).
if (allowDeleted) {
final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes());
if (versionValue != null && versionValue.isDelete()) {
return true;
}
}
// Operations can be processed on a replica in a different order than on the primary. If the order on the primary is index-1,
// delete-2, index-3, and the order on a replica is index-1, index-3, delete-2, then the msu of index-3 on the replica is 2
// even though it is an update (overwrites index-1). We should relax this assertion if there is a pending gap in the seq_no.
if (relaxIfGapInSeqNo && getLocalCheckpoint() < maxSeqNoOfUpdates) {
return true;
}
assert seqNo <= maxSeqNoOfUpdates : "id=" + id + " seq_no=" + seqNo + " msu=" + maxSeqNoOfUpdates;
return true;
}

@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
Expand Down
Loading