
Commit 6647122

Prepare to make send translog of recovery non-blocking (#37458)

This commit prepares the required infrastructure to make sending a translog snapshot of the recovery source non-blocking. I'll make the send-snapshot method itself non-blocking in a follow-up. Relates #37291

1 parent 02d4d8b commit 6647122
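
In essence, the change converts recovery code paths that returned the target's local checkpoint directly into a callback style: the same methods now accept a listener that is notified with the checkpoint, or with a failure. A minimal, self-contained sketch of that shape (the ActionListener below is a simplified stand-in for Elasticsearch's interface of the same name, and replayOperations is a hypothetical placeholder for the real indexing work):

import java.util.Arrays;
import java.util.List;

// Simplified stand-in for org.elasticsearch.action.ActionListener.
interface ActionListener<T> {
    void onResponse(T result);
    void onFailure(Exception e);
}

class TranslogReplayExample {
    // Before: blocking, returns the local checkpoint (or throws).
    long indexTranslogOperationsBlocking(List<String> operations) throws Exception {
        return replayOperations(operations);
    }

    // After: non-blocking shape; the result or failure flows through the listener.
    void indexTranslogOperations(List<String> operations, ActionListener<Long> listener) {
        try {
            listener.onResponse(replayOperations(operations));
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    // Hypothetical placeholder for applying operations and returning a checkpoint.
    private long replayOperations(List<String> operations) throws Exception {
        return operations.size() - 1L;
    }

    public static void main(String[] args) {
        new TranslogReplayExample().indexTranslogOperations(
            Arrays.asList("op-0", "op-1"),
            new ActionListener<Long>() {
                @Override
                public void onResponse(Long checkpoint) {
                    System.out.println("local checkpoint = " + checkpoint);
                }

                @Override
                public void onFailure(Exception e) {
                    System.err.println("replay failed: " + e);
                }
            });
    }
}
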

10 files changed (+162, -158 lines)

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 1 deletion
@@ -2998,7 +2998,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
      * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
      *
      * @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
-     * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long)
+     * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
      */
     public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
         assert seqNo != UNASSIGNED_SEQ_NO

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 22 additions & 19 deletions
@@ -485,14 +485,12 @@ class TranslogOperationsRequestHandler implements TransportRequestHandler<Recove
         public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel,
                                     Task task) throws IOException {
             try (RecoveryRef recoveryRef =
-                     onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                    onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                 final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
                 final RecoveryTarget recoveryTarget = recoveryRef.target();
-                try {
-                    recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
-                        request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary());
-                    channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
-                } catch (MapperException exception) {
+                final ActionListener<RecoveryTranslogOperationsResponse> listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request);
+                final Consumer<Exception> retryOnMappingException = exception -> {
                     // in very rare cases a translog replay from primary is processed before a mapping update on this node
                     // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
                     logger.debug("delaying recovery due to missing mapping changes", exception);
@@ -504,31 +502,36 @@ public void onNewClusterState(ClusterState state) {
                             try {
                                 messageReceived(request, channel, task);
                             } catch (Exception e) {
-                                onFailure(e);
-                            }
-                        }
-
-                        protected void onFailure(Exception e) {
-                            try {
-                                channel.sendResponse(e);
-                            } catch (IOException e1) {
-                                logger.warn("failed to send error back to recovery source", e1);
+                                listener.onFailure(e);
                             }
                         }

                         @Override
                         public void onClusterServiceClose() {
-                            onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
+                            listener.onFailure(new ElasticsearchException(
+                                "cluster service was closed while waiting for mapping updates"));
                         }

                         @Override
                         public void onTimeout(TimeValue timeout) {
                             // note that we do not use a timeout (see comment above)
-                            onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout +
-                                "])"));
+                            listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
+                                "(timeout [" + timeout + "])"));
                         }
                     });
-                }
+                };
+                recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
+                    request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
+                    ActionListener.wrap(
+                        checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
+                        e -> {
+                            if (e instanceof MapperException) {
+                                retryOnMappingException.accept(e);
+                            } else {
+                                listener.onFailure(e);
+                            }
+                        })
+                );
            }
        }
    }
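
The essential pattern in this handler: success and failure now flow through one channel-backed listener, and only MapperException is intercepted and retried once mappings arrive rather than propagated to the source. A minimal sketch of that dispatch, assuming a simplified ActionListener whose static wrap mimics the Elasticsearch helper of the same name (MapperException here is a bare stand-in class):

import java.util.function.Consumer;

final class ListenerWrapExample {

    interface ActionListener<T> {
        void onResponse(T result);
        void onFailure(Exception e);

        // Mimics org.elasticsearch.action.ActionListener.wrap: builds a listener
        // from two lambdas for the success and failure paths.
        static <T> ActionListener<T> wrap(Consumer<T> onSuccess, Consumer<Exception> onError) {
            return new ActionListener<T>() {
                @Override
                public void onResponse(T result) {
                    onSuccess.accept(result);
                }

                @Override
                public void onFailure(Exception e) {
                    onError.accept(e);
                }
            };
        }
    }

    // Hypothetical stand-in for the retry-on-missing-mapping case.
    static class MapperException extends Exception {
    }

    static ActionListener<Long> retryingListener(ActionListener<Long> channelListener,
                                                 Consumer<Exception> retryOnMappingException) {
        return ActionListener.wrap(
            channelListener::onResponse,                 // success: forward the checkpoint
            e -> {
                if (e instanceof MapperException) {
                    retryOnMappingException.accept(e);   // retry once mappings arrive
                } else {
                    channelListener.onFailure(e);        // anything else fails the request
                }
            });
    }
}
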

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 33 additions & 21 deletions
@@ -33,6 +33,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.StopWatch;
@@ -226,25 +227,27 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             logger.trace("snapshot translog for recovery; current size is [{}]",
                 shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
         }
-        final SendSnapshotResult sendSnapshotResult;
-        try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
-            // we can release the retention lock here because the snapshot itself will retain the required operations.
-            IOUtils.close(retentionLock, () -> resources.remove(retentionLock));
-            // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
-            // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
-            final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
-            final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
-            sendSnapshotResult = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
-                maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
-        } catch (Exception e) {
-            throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
-        }

+        final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
+        resources.add(phase2Snapshot);
+        // we can release the retention lock here because the snapshot itself will retain the required operations.
+        IOUtils.close(retentionLock);
+        // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
+        // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
+        final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
+        final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
+        final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+        phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
+            maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
+        sendSnapshotStep.whenComplete(
+            r -> IOUtils.close(phase2Snapshot),
+            e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
         final StepListener<Void> finalizeStep = new StepListener<>();
-        finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
+        sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
+
         finalizeStep.whenComplete(r -> {
-            assert resources.isEmpty() : "not every resource is released [" + resources + "]";
             final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
+            final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
             final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                 sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                 sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
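
recoverToTarget now chains asynchronous stages: phase2 reports into a StepListener, and each later stage registers a continuation via whenComplete instead of consuming a return value. A minimal sketch of that chaining, assuming a simplified StepListener built on CompletableFuture (Elasticsearch's StepListener is a distinct class, but the spirit is the same):

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class StepListenerExample {

    // Simplified stand-in for org.elasticsearch.action.StepListener.
    static class StepListener<T> {
        private final CompletableFuture<T> future = new CompletableFuture<>();

        void onResponse(T result) {
            future.complete(result);
        }

        void onFailure(Exception e) {
            future.completeExceptionally(e);
        }

        // Registers the next step: one continuation for success, one for failure.
        void whenComplete(Consumer<T> onSuccess, Consumer<Exception> onError) {
            future.whenComplete((r, t) -> {
                if (t == null) {
                    onSuccess.accept(r);
                } else {
                    onError.accept((Exception) t);
                }
            });
        }
    }

    public static void main(String[] args) {
        StepListener<Long> sendSnapshotStep = new StepListener<>();
        StepListener<Void> finalizeStep = new StepListener<>();

        // Chain: finalize only runs once the snapshot step has completed.
        sendSnapshotStep.whenComplete(
            checkpoint -> {
                System.out.println("snapshot sent; target checkpoint = " + checkpoint);
                finalizeStep.onResponse(null);
            },
            e -> System.err.println("phase2 failed: " + e));
        finalizeStep.whenComplete(r -> System.out.println("recovery done"), e -> {});

        sendSnapshotStep.onResponse(42L); // simulate phase2 completing
    }
}
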
@@ -507,10 +510,17 @@ TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int to
      * @param snapshot a snapshot of the translog
      * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
      * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
-     * @return the send snapshot result
+     * @param listener a listener which will be notified with the local checkpoint on the target.
      */
-    SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
-                              long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
+    void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
+                long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
+        ActionListener.completeWith(listener, () -> sendSnapshotBlockingly(
+            startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes));
+    }
+
+    private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
+                                                      Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
+                                                      long maxSeqNoOfUpdatesOrDeletes) throws IOException {
         assert requiredSeqNoRangeStart <= endingSeqNo + 1:
             "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
         assert startingSeqNo <= requiredSeqNoRangeStart :
@@ -538,9 +548,11 @@ SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long
         }

         final CancellableThreads.IOInterruptible sendBatch = () -> {
-            final long targetCheckpoint = recoveryTarget.indexTranslogOperations(
-                operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
-            targetLocalCheckpoint.set(targetCheckpoint);
+            // TODO: Make this non-blocking
+            final PlainActionFuture<Long> future = new PlainActionFuture<>();
+            recoveryTarget.indexTranslogOperations(
+                operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future);
+            targetLocalCheckpoint.set(future.actionGet());
         };

         // send operations in batches
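
Until the promised follow-up, sendBatch bridges the new asynchronous API back to blocking code with PlainActionFuture, a future that doubles as a listener. A minimal sketch of that bridging idea, using CompletableFuture as a self-contained stand-in and a hypothetical indexTranslogOperationsAsync for the callback-style call (Elasticsearch's PlainActionFuture blocks via actionGet(), mirrored here by join()):

import java.util.concurrent.CompletableFuture;

final class BlockingBridgeExample {

    interface ActionListener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    // A future that is also a listener: async code completes it, sync code blocks on it.
    static class ListenerFuture<T> extends CompletableFuture<T> implements ActionListener<T> {
        @Override
        public void onResponse(T result) {
            complete(result);
        }

        @Override
        public void onFailure(Exception e) {
            completeExceptionally(e);
        }
    }

    // Hypothetical async API in the new callback style.
    static void indexTranslogOperationsAsync(ActionListener<Long> listener) {
        new Thread(() -> listener.onResponse(42L)).start(); // simulate async completion
    }

    public static void main(String[] args) {
        ListenerFuture<Long> future = new ListenerFuture<>();
        indexTranslogOperationsAsync(future); // pass the future as the listener
        long checkpoint = future.join();      // block until the callback fires, like actionGet()
        System.out.println("target local checkpoint = " + checkpoint);
    }
}
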

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 35 additions & 33 deletions
@@ -394,40 +394,42 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
     }

     @Override
-    public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
-                                        long maxSeqNoOfDeletesOrUpdatesOnPrimary) throws IOException {
-        final RecoveryState.Translog translog = state().getTranslog();
-        translog.totalOperations(totalTranslogOps);
-        assert indexShard().recoveryState() == state();
-        if (indexShard().state() != IndexShardState.RECOVERING) {
-            throw new IndexShardNotRecoveringException(shardId, indexShard().state());
-        }
-        /*
-         * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
-         * will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
-         * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
-         * replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
-         */
-        indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
-        /*
-         * Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
-         * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that operation was executed on.
-         */
-        indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
-        for (Translog.Operation operation : operations) {
-            Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
-            if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
-                throw new MapperException("mapping updates are not allowed [" + operation + "]");
+    public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
+                                        long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
+        ActionListener.completeWith(listener, () -> {
+            final RecoveryState.Translog translog = state().getTranslog();
+            translog.totalOperations(totalTranslogOps);
+            assert indexShard().recoveryState() == state();
+            if (indexShard().state() != IndexShardState.RECOVERING) {
+                throw new IndexShardNotRecoveringException(shardId, indexShard().state());
             }
-            assert result.getFailure() == null: "unexpected failure while replicating translog entry: " + result.getFailure();
-            ExceptionsHelper.reThrowIfNotNull(result.getFailure());
-        }
-        // update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
-        translog.incrementRecoveredOperations(operations.size());
-        indexShard().sync();
-        // roll over / flush / trim if needed
-        indexShard().afterWriteOperation();
-        return indexShard().getLocalCheckpoint();
+            /*
+             * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
+             * will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
+             * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
+             * replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
+             */
+            indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
+            /*
+             * Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
+             * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
+             */
+            indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
+            for (Translog.Operation operation : operations) {
+                Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
+                if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
+                    throw new MapperException("mapping updates are not allowed [" + operation + "]");
+                }
+                assert result.getFailure() == null : "unexpected failure while replicating translog entry: " + result.getFailure();
+                ExceptionsHelper.reThrowIfNotNull(result.getFailure());
+            }
+            // update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
+            translog.incrementRecoveredOperations(operations.size());
+            indexShard().sync();
+            // roll over / flush / trim if needed
+            indexShard().afterWriteOperation();
+            return indexShard().getLocalCheckpoint();
+        });
     }

     @Override
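
ActionListener.completeWith is what lets this still-synchronous body adopt the listener contract: the supplier's return value becomes onResponse and a thrown exception becomes onFailure. A minimal sketch of such a helper, again with a simplified ActionListener and a hypothetical CheckedSupplier functional interface (not Elasticsearch's exact types):

final class CompleteWithExample {

    interface ActionListener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    // Hypothetical equivalent of a checked supplier: a body that may throw.
    @FunctionalInterface
    interface CheckedSupplier<T> {
        T get() throws Exception;
    }

    // Mimics ActionListener.completeWith: run the body, route the outcome to the listener.
    static <T> void completeWith(ActionListener<T> listener, CheckedSupplier<T> body) {
        final T result;
        try {
            result = body.get();
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(result); // outside the try so listener failures aren't double-reported
    }

    public static void main(String[] args) {
        completeWith(new ActionListener<Long>() {
            @Override
            public void onResponse(Long checkpoint) {
                System.out.println("local checkpoint = " + checkpoint);
            }

            @Override
            public void onFailure(Exception e) {
                System.err.println("indexing failed: " + e);
            }
        }, () -> 7L); // the synchronous body; returns the checkpoint
    }
}
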

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 4 additions & 3 deletions
@@ -67,10 +67,11 @@ public interface RecoveryTargetHandler {
      * @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
      *                                            the primary shard when capturing these operations. This value is at least as high as the
      *                                            max_seq_no_of_updates on the primary was when any of these ops were processed on it.
-     * @return the local checkpoint on the target shard
+     * @param listener                            a listener which will be notified with the local checkpoint on the target
+     *                                            after these operations are successfully indexed on the target.
      */
-    long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
-                                 long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException;
+    void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
+                                 long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener<Long> listener);

     /**
      * Notifies the target of the files it is going to receive
