From 7d3791969cb7a3e48531140714fb2be2a8e402b9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Feb 2019 08:46:52 -0500 Subject: [PATCH 1/5] Integrate retention leases with recovery This commit integrates retention leases with recovery. With this change, we copy the current retention leases on primary to the replica during phase two of recovery. At this point, the replica will have been added to the replication group and so is already receiving retention lease sync requests from the primary. This means that if any retention lease syncs are triggered on the primary after we sample the retention leases here during phase two, that sync request will also arrive on the replica ensuring that the replica is from this point on up to date with the retention leases on the primary. We have to copy these during phase two since we will be applying indexing operations, potentially triggering merges, and therefore must ensure the correct retention leases are in place beforehand. --- .../elasticsearch/index/shard/IndexShard.java | 3 +- .../recovery/PeerRecoveryTargetService.java | 28 +++--- .../recovery/RecoverySourceHandler.java | 75 +++++++++++---- .../indices/recovery/RecoveryTarget.java | 17 +++- .../recovery/RecoveryTargetHandler.java | 20 ++-- .../RecoveryTranslogOperationsRequest.java | 23 ++++- .../recovery/RemoteRecoveryTargetHandler.java | 18 +++- .../RecoveryDuringReplicationTests.java | 48 +++++++--- .../index/seqno/RetentionLeaseSyncIT.java | 92 ++++++++++++++++++- .../index/shard/IndexShardTests.java | 77 ++++++++++++---- .../recovery/RecoverySourceHandlerTests.java | 28 ++++-- 11 files changed, 341 insertions(+), 88 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c3d653e2fde06..8d4383e9ed7bf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -132,6 +132,7 @@ import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; @@ -3064,7 +3065,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary. 
* * @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object) - * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener) + * @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, ActionListener) */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { assert seqNo != UNASSIGNED_SEQ_NO diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 74585dcc261c2..906728e927a5c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -447,7 +447,7 @@ public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportCh try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final ActionListener listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request); recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), - ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); + ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); } } } @@ -518,17 +518,21 @@ public void onTimeout(TimeValue timeout) { } }); }; - recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), - request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(), - ActionListener.wrap( - checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)), - e -> { - if (e instanceof MapperException) { - retryOnMappingException.accept(e); - } else { - listener.onFailure(e); - } - }) + recoveryTarget.indexTranslogOperations( + request.operations(), + request.totalTranslogOps(), + request.maxSeenAutoIdTimestampOnPrimary(), + request.maxSeqNoOfUpdatesOrDeletesOnPrimary(), + request.retentionLeases(), + ActionListener.wrap( + checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)), + e -> { + if (e instanceof MapperException) { + retryOnMappingException.accept(e); + } else { + listener.onFailure(e); + } + }) ); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index f360a68b7a83c..a6973a46926a7 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -231,8 +232,16 @@ public void recoverToTarget(ActionListener listener) { // are at least as high as the corresponding values on the primary when any of these operations were executed on it. 
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep); + final RetentionLeases retentionLeases = shard.getRetentionLeases(); + phase2( + startingSeqNo, + requiredSeqNoRangeStart, + endingSeqNo, + phase2Snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + sendSnapshotStep); sendSnapshotStep.whenComplete( r -> IOUtils.close(phase2Snapshot), e -> { @@ -517,8 +526,15 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it. * @param listener a listener which will be notified with the local checkpoint on the target. */ - void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) throws IOException { + void phase2( + final long startingSeqNo, + final long requiredSeqNoRangeStart, + final long endingSeqNo, + final Translog.Snapshot snapshot, + final long maxSeenAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo + 1: "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : @@ -584,25 +600,50 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, listener::onFailure ); - sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, snapshot.totalOperations(), - maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener); + sendBatch( + readNextBatch, + true, + SequenceNumbers.UNASSIGNED_SEQ_NO, + snapshot.totalOperations(), + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + batchedListener); } - private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch, - long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) throws IOException { + private void sendBatch( + final CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, + final boolean firstBatch, + final long targetLocalCheckpoint, + final int totalTranslogOps, + final long maxSeenAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener) throws IOException { final List<Translog.Operation> operations = nextBatch.get(); // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint if (operations.isEmpty() == false || firstBatch) { cancellableThreads.execute(() -> { - recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, - ActionListener.wrap( - newCheckpoint -> { - sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), - totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener); - }, - listener::onFailure - )); + recoveryTarget.indexTranslogOperations( + operations, + totalTranslogOps, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, +
ActionListener.wrap( + newCheckpoint -> { + sendBatch( + nextBatch, + false, + SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), + totalTranslogOps, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + listener); + }, + listener::onFailure + )); }); } else { listener.onResponse(targetLocalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index b300490542968..682cf7782b7a5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; @@ -375,7 +376,7 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra } @Override - public void finalizeRecovery(final long globalCheckpoint, ActionListener listener) { + public void finalizeRecovery(final long globalCheckpoint, final ActionListener listener) { ActionListener.completeWith(listener, () -> { final IndexShard indexShard = indexShard(); indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery"); @@ -397,8 +398,13 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, - long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestampOnPrimary, + final long maxSeqNoOfDeletesOrUpdatesOnPrimary, + final RetentionLeases retentionLeases, + final ActionListener listener) { ActionListener.completeWith(listener, () -> { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); @@ -418,6 +424,11 @@ public void indexTranslogOperations(List operations, int tot * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on. */ indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary); + /* + * We have to update the retention leases before we start applying translog operations to ensure we are retaining according to + * the policy. 
+ */ + indexShard().updateRetentionLeasesOnReplica(retentionLeases); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 0fa5e94c63cd2..be7c00d52c94d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -32,14 +33,15 @@ public interface RecoveryTargetHandler { /** * Prepares the target to receive translog operations, after all file have been copied - * @param fileBasedRecovery whether or not this call is part of an file based recovery - * @param totalTranslogOps total translog operations expected to be sent + * + * @param fileBasedRecovery whether or not this call is part of an file based recovery + * @param totalTranslogOps total translog operations expected to be sent */ void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener); /** - * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and - * updates the global checkpoint. + * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates + * the global checkpoint. * * @param globalCheckpoint the global checkpoint on the recovery source * @param listener the listener which will be notified when this method is completed @@ -67,11 +69,17 @@ public interface RecoveryTargetHandler { * @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on * the primary shard when capturing these operations. This value is at least as high as the * max_seq_no_of_updates on the primary was when any of these ops were processed on it. + * @param retentionLeases the retention leases on the primary * @param listener a listener which will be notified with the local checkpoint on the target * after these operations are successfully indexed on the target. 
*/ - void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, - long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener listener); + void indexTranslogOperations( + List operations, + int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary, + long maxSeqNoOfUpdatesOrDeletesOnPrimary, + RetentionLeases retentionLeases, + ActionListener listener); /** * Notifies the target of the files it is going to receive diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index 0ae5d507eb357..e727ee6d1af62 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -39,18 +40,26 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest { private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; private long maxSeenAutoIdTimestampOnPrimary; private long maxSeqNoOfUpdatesOrDeletesOnPrimary; + private RetentionLeases retentionLeases; public RecoveryTranslogOperationsRequest() { } - RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, int totalTranslogOps, - long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) { + RecoveryTranslogOperationsRequest( + final long recoveryId, + final ShardId shardId, + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestampOnPrimary, + final long maxSeqNoOfUpdatesOrDeletesOnPrimary, + final RetentionLeases retentionLeases) { this.recoveryId = recoveryId; this.shardId = shardId; this.operations = operations; this.totalTranslogOps = totalTranslogOps; this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary; this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary; + this.retentionLeases = retentionLeases; } public long recoveryId() { @@ -77,6 +86,10 @@ public long maxSeqNoOfUpdatesOrDeletesOnPrimary() { return maxSeqNoOfUpdatesOrDeletesOnPrimary; } + public RetentionLeases retentionLeases() { + return retentionLeases; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -95,6 +108,9 @@ public void readFrom(StreamInput in) throws IOException { // UNASSIGNED_SEQ_NO means uninitialized and replica won't enable optimization using seq_no maxSeqNoOfUpdatesOrDeletesOnPrimary = SequenceNumbers.UNASSIGNED_SEQ_NO; } + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + retentionLeases = new RetentionLeases(in); + } } @Override @@ -110,5 +126,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_5_0)) { out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary); } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + retentionLeases.writeTo(out); + } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java 
b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 0799cc6595189..436d59dba85ad 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -113,10 +114,21 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, - long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestampOnPrimary, + final long maxSeqNoOfDeletesOrUpdatesOnPrimary, + final RetentionLeases retentionLeases, + final ActionListener listener) { final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest( - recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary); + recoveryId, + shardId, + operations, + totalTranslogOps, + maxSeenAutoIdTimestampOnPrimary, + maxSeqNoOfDeletesOrUpdatesOnPrimary, + retentionLeases); transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions, new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure), RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC)); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 53db2b7dd8a9e..5d893cd560e1b 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -502,13 +503,17 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { AtomicBoolean opsSent = new AtomicBoolean(false); final Future recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> { recoveryStart.countDown(); - return new RecoveryTarget(indexShard, node, recoveryListener, l -> { - }) { + return new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestamp, long msu, ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestamp, + final long msu, + final RetentionLeases 
retentionLeases, + final ActionListener listener) { opsSent.set(true); - super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, msu, listener); + super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, msu, retentionLeases, listener); } }; }); @@ -575,9 +580,13 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { replica, (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public void indexTranslogOperations(final List operations, final int totalTranslogOps, - final long maxAutoIdTimestamp, long maxSeqNoOfUpdates, - ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxAutoIdTimestamp, + final long maxSeqNoOfUpdates, + final RetentionLeases retentionLeases, + final ActionListener listener) { // index a doc which is not part of the snapshot, but also does not complete on replica replicaEngineFactory.latchIndexers(1); threadPool.generic().submit(() -> { @@ -604,7 +613,13 @@ public void indexTranslogOperations(final List operations, f } catch (InterruptedException e) { throw new AssertionError(e); } - super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener); + super.indexTranslogOperations( + operations, + totalTranslogOps, + maxAutoIdTimestamp, + maxSeqNoOfUpdates, + retentionLeases, + listener); } }); pendingDocActiveWithExtraDocIndexed.await(); @@ -846,12 +861,17 @@ private void blockIfNeeded(RecoveryState.Stage currentStage) { } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp, long maxSeqNoOfUpdates, ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxAutoIdTimestamp, + final long maxSeqNoOfUpdates, + final RetentionLeases retentionLeases, + final ActionListener listener) { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } - super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener); + super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, listener); } @Override @@ -861,7 +881,9 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa } @Override - public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { + public void finalizeRecovery( + final long globalCheckpoint, + final ActionListener listener) { if (hasBlocked() == false) { // it maybe that not ops have been transferred, block now blockIfNeeded(RecoveryState.Stage.TRANSLOG); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index 3e69c84e3cde3..f75846528fb3c 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -20,23 +20,32 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; 
import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; @@ -47,6 +56,23 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) public class RetentionLeaseSyncIT extends ESIntegTestCase { + public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { + + @Override + public List<Setting<?>> getSettings() { + return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING); + } + + } + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(RetentionLeaseBackgroundSyncIT.RetentionLeaseSyncIntervalSettingPlugin.class)) + .collect(Collectors.toList()); + } + public void testRetentionLeasesSyncedOnAdd() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); @@ -70,7 +96,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final String source = randomAlphaOfLength(8); final CountDownLatch latch = new CountDownLatch(1); final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); - // simulate a peer-recovery which locks the soft-deletes policy on the primary. + // simulate a peer recovery which locks the soft deletes policy on the primary final Closeable retentionLock = randomBoolean() ?
primary.acquireRetentionLockForPeerRecovery() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); @@ -169,4 +195,68 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { } } + public void testRetentionLeasesSyncOnRecovery() throws Exception { + final int numberOfReplicas = 1; + // + /* + * We effectively disable the background sync to ensure that the retention leases are not synced in the background so that the only + * + */ + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueHours(24)) + .build(); + createIndex("index", settings); + ensureYellow("index"); + // exclude the replicas from being allocated + allowNodes("index", 1); + final AcknowledgedResponse response = client().admin() + .indices() + .prepareUpdateSettings("index").setSettings(Settings.builder().put("index.number_of_replicas", numberOfReplicas).build()) + .get(); + assertTrue(response.isAcknowledged()); + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final int length = randomIntBetween(1, 8); + final Map currentRetentionLeases = new HashMap<>(); + for (int i = 0; i < length; i++) { + final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); + final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); + latch.await(); + /* + * Now renew the leases; since we do not flush immediately on renewal, this means that the latest retention leases will not be + * in the latest commit point and therefore not transferred during the file-copy phase of recovery. 
+ */ + currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source)); + } + + // now allow the replicas to be allocated and wait for recovery to finalize + allowNodes("index", 1 + numberOfReplicas); + ensureGreen("index"); + + // check current retention leases have been synced to all replicas + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final Map retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); + assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); + + // check retention leases have been committed on the replica + final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( + replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases))); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 12a7fad466e29..dc1666916869b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -105,6 +105,7 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -2432,10 +2433,20 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener){ - super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, - ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded()))); + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener){ + super.indexTranslogOperations( + operations, + totalTranslogOps, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded()))); } }, true, true); @@ -2539,14 +2550,26 @@ public void testShardActiveDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener){ - super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, 
maxSeqNoOfUpdatesOrDeletes, - ActionListener.wrap(checkpoint -> { - listener.onResponse(checkpoint); - // Shard should now be active since we did recover: - assertTrue(replica.isActive()); - }, listener::onFailure)); + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener){ + super.indexTranslogOperations( + operations, + totalTranslogOps, + maxAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + ActionListener.wrap( + checkpoint -> { + listener.onResponse(checkpoint); + // Shard should now be active since we did recover: + assertTrue(replica.isActive()); + }, + listener::onFailure)); } }, false, true); @@ -2589,18 +2612,32 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) { - super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, - ActionListener.wrap(checkpoint -> { - assertListenerCalled.accept(replica); - listener.onResponse(checkpoint); - }, listener::onFailure)); + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener) { + super.indexTranslogOperations( + operations, + totalTranslogOps, + maxAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + ActionListener.wrap( + checkpoint -> { + assertListenerCalled.accept(replica); + listener.onResponse(checkpoint); + }, + listener::onFailure)); } @Override public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { - super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica))); + super.finalizeRecovery( + globalCheckpoint, + ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica))); } }, false, true); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index cd495b7e17bee..1e4503c7ae569 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -238,7 +239,7 @@ public void testSendSnapshotSendsOps() throws IOException { RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @Override public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, - ActionListener listener) { + RetentionLeases retentionLeases, ActionListener listener) { shippedOps.addAll(operations); checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE)); maybeExecuteAsync(() -> 
listener.onResponse(checkpointOnTarget.get())); @@ -247,7 +248,7 @@ public void indexTranslogOperations(List operations, int tot RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), - randomNonNegativeLong(), randomNonNegativeLong(), future); + randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); RecoverySourceHandler.SendSnapshotResult result = future.actionGet(); assertThat(result.totalOperations, equalTo(expectedOps)); @@ -265,7 +266,7 @@ public void indexTranslogOperations(List operations, int tot PlainActionFuture failedFuture = new PlainActionFuture<>(); expectThrows(IllegalStateException.class, () -> { handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip), - randomNonNegativeLong(), randomNonNegativeLong(), failedFuture); + randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture); failedFuture.actionGet(); }); } @@ -285,7 +286,7 @@ public void testSendSnapshotStopOnError() throws Exception { RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @Override public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, - long msu, ActionListener listener) { + long msu, RetentionLeases retentionLeases, ActionListener listener) { if (randomBoolean()) { maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED)); } else { @@ -299,7 +300,7 @@ public void indexTranslogOperations(List operations, int tot final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()), - randomNonNegativeLong(), randomNonNegativeLong(), future); + randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); if (wasFailed.get()) { assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index")); } @@ -498,11 +499,11 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A @Override void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, - long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, + long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, ActionListener listener) throws IOException { phase2Called.set(true); super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, - maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener); + maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener); } }; @@ -704,7 +705,9 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra } @Override - public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { + public void finalizeRecovery( + final long globalCheckpoint, + final ActionListener listener) { } @Override @@ -716,8 +719,13 @@ public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryConte } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long 
timestamp, long msu, - ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long timestamp, + final long msu, + final RetentionLeases retentionLeases, + final ActionListener listener) { } @Override From 5ffb91c225e5f44e7111a71811e82fad98b778e4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Feb 2019 10:34:33 -0500 Subject: [PATCH 2/5] Revert some formatting changes --- .../indices/recovery/PeerRecoveryTargetService.java | 2 +- .../org/elasticsearch/indices/recovery/RecoveryTarget.java | 2 +- .../index/replication/RecoveryDuringReplicationTests.java | 4 +--- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 4 +--- .../indices/recovery/RecoverySourceHandlerTests.java | 4 +--- 5 files changed, 5 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 906728e927a5c..068b92991db09 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -447,7 +447,7 @@ public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportCh try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final ActionListener listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request); recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), - ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); + ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 682cf7782b7a5..e63b9ba8fd5ea 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -376,7 +376,7 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra } @Override - public void finalizeRecovery(final long globalCheckpoint, final ActionListener listener) { + public void finalizeRecovery(final long globalCheckpoint, ActionListener listener) { ActionListener.completeWith(listener, () -> { final IndexShard indexShard = indexShard(); indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery"); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 5d893cd560e1b..725225773a682 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -881,9 +881,7 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa } @Override - public void finalizeRecovery( - final long globalCheckpoint, - final ActionListener listener) { + public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { if (hasBlocked() == false) { // it maybe that not ops have been transferred, block now blockIfNeeded(RecoveryState.Stage.TRANSLOG); 
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index dc1666916869b..161850169862b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2635,9 +2635,7 @@ public void indexTranslogOperations( @Override public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { - super.finalizeRecovery( - globalCheckpoint, - ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica))); + super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica))); } }, false, true); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 1e4503c7ae569..8391827b2f83c 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -705,9 +705,7 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra } @Override - public void finalizeRecovery( - final long globalCheckpoint, - final ActionListener listener) { + public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { } @Override From 439c1d01e836b4a0f0281b36a8fc11e98f0ea956 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Feb 2019 11:11:41 -0500 Subject: [PATCH 3/5] Fix comment --- .../org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index f75846528fb3c..8bf76e08ec788 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -197,10 +197,9 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { public void testRetentionLeasesSyncOnRecovery() throws Exception { final int numberOfReplicas = 1; - // /* * We effectively disable the background sync to ensure that the retention leases are not synced in the background so that the only - * + * source of retention leases on the replicas would be from the commit point and recovery. 
*/ final Settings settings = Settings.builder() .put("index.number_of_shards", 1) From 4d9fa1acddfe0b6e0ab8a91c10866ccbfa530e17 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Feb 2019 14:46:18 -0500 Subject: [PATCH 4/5] Initialize --- .../indices/recovery/RecoveryTranslogOperationsRequest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index e727ee6d1af62..7d5f8c07c1602 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; @@ -110,6 +111,8 @@ public void readFrom(StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_7_0_0)) { retentionLeases = new RetentionLeases(in); + } else { + retentionLeases = RetentionLeases.EMPTY; } } From 8612a2619ea44d42df56283ad163e5cd26943275 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Feb 2019 15:20:59 -0500 Subject: [PATCH 5/5] Fix precommit --- .../indices/recovery/RecoveryTranslogOperationsRequest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index 7d5f8c07c1602..239e423b3aaf9 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId;
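The version-gated handling of the new retentionLeases field in RecoveryTranslogOperationsRequest, which patches 4 and 5 settle (write the field only to 7.0.0+ peers, default to RetentionLeases.EMPTY when reading from an older sender), follows a standard wire-compatibility pattern. The stand-alone sketch below condenses that pattern for illustration only; it uses plain java.io streams and a String payload as stand-ins for Elasticsearch's StreamInput/StreamOutput and RetentionLeases types, and the version constant is a hypothetical placeholder rather than the real Version id.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class VersionGatedFieldSketch {

        // Hypothetical numeric version id standing in for Version.V_7_0_0.
        static final int V_7_0_0 = 7_00_00_99;

        // Write side: serialize the optional field only when the receiving node understands it.
        static void writeRetentionLeases(DataOutputStream out, int peerVersion, String retentionLeases) throws IOException {
            if (peerVersion >= V_7_0_0) {
                out.writeUTF(retentionLeases);
            }
        }

        // Read side: an older sender never wrote the field, so fall back to an empty default
        // (mirroring retentionLeases = RetentionLeases.EMPTY in patch 4).
        static String readRetentionLeases(DataInputStream in, int senderVersion) throws IOException {
            if (senderVersion >= V_7_0_0) {
                return in.readUTF();
            }
            return ""; // stand-in for RetentionLeases.EMPTY
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeRetentionLeases(out, V_7_0_0, "peer_recovery/1;retaining_seq_no=42");
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                System.out.println(readRetentionLeases(in, V_7_0_0));
            }
        }
    }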