From 01ad532cd9bfe353681030183efd2df8f1d054b2 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Thu, 13 Jun 2019 09:58:08 +0100
Subject: [PATCH 01/15] Create peer-recovery retention leases

This creates a peer-recovery retention lease for every shard during
recovery, ensuring that the replication group retains history for future
peer recoveries. It also ensures that leases for active shard copies do
not expire, and leases for inactive shard copies expire immediately if
the shard is fully-allocated.

Relates #41536
---
 .../index/seqno/ReplicationTracker.java       | 136 +++++++++++--
 .../index/seqno/RetentionLease.java           |   3 +
 .../index/seqno/RetentionLeaseSyncer.java     |   2 +-
 .../index/seqno/RetentionLeases.java          |  13 +-
 .../elasticsearch/index/shard/IndexShard.java |  14 ++
 .../recovery/RecoverySourceHandler.java       |  27 ++-
 .../index/engine/InternalEngineTests.java     |  27 ++-
 .../RetentionLeasesReplicationTests.java      |   8 +-
 ...PeerRecoveryRetentionLeaseExpiryTests.java | 179 ++++++++++++++++++
 ...ReplicationTrackerRetentionLeaseTests.java |  37 ++--
 .../seqno/ReplicationTrackerTestCase.java     |  10 +-
 .../index/seqno/ReplicationTrackerTests.java  |  51 +++--
 .../seqno/RetentionLeaseActionsTests.java     |  47 +++--
 .../index/seqno/RetentionLeaseIT.java         |  40 ++--
 .../index/seqno/RetentionLeaseStatsTests.java |   3 +-
 .../shard/IndexShardRetentionLeaseTests.java  |  91 +++++----
 .../index/shard/IndexShardTests.java          |  28 ++-
 .../indices/stats/IndexStatsIT.java           |   5 +
 .../index/engine/EngineTestCase.java          |   3 +-
 .../ESIndexLevelReplicationTestCase.java      |   1 +
 .../index/shard/IndexShardTestCase.java       |   7 +-
 21 files changed, 588 insertions(+), 144 deletions(-)
 create mode 100644 server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java

diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index 1a67eb55e0576..a33b791a666b0 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -217,10 +218,22 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
         // the primary calculates the non-expired retention leases and syncs them to replicas
         final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
         final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
+        final Set<String> leaseIdsForCurrentPeers
+            = routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet());
         final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
                 .leases()
                 .stream()
-                .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
+                .collect(Collectors.groupingBy(lease -> {
+                    if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) {
+                        if (leaseIdsForCurrentPeers.contains(lease.id())) {
+                            return false;
+                        }
+                        if (routingTable.allShardsStarted()) {
+                            return true;
+                        }
+                    }
+                    return currentTimeMillis - lease.timestamp() > retentionLeaseMillis;
+                }));
         final Collection<RetentionLease> expiredLeases = partitionByExpiration.get(true);
         if (expiredLeases == null) {
             // early out as no retention leases have expired
@@ -242,7 +255,7 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
      * @param source                  the source of the retention lease
      * @param listener                the callback when the retention lease is successfully added and synced to replicas
      * @return the new retention lease
-     * @throws IllegalArgumentException if the specified retention lease already exists
+     * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
      */
     public RetentionLease addRetentionLease(
             final String id,
@@ -253,22 +266,38 @@ public RetentionLease addRetentionLease(
         final RetentionLease retentionLease;
         final RetentionLeases currentRetentionLeases;
         synchronized (this) {
-            assert primaryMode;
-            if (retentionLeases.contains(id)) {
-                throw new RetentionLeaseAlreadyExistsException(id);
-            }
-            retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
-            logger.debug("adding new retention lease [{}] to current retention leases [{}]", retentionLease, retentionLeases);
-            retentionLeases = new RetentionLeases(
-                    operationPrimaryTerm,
-                    retentionLeases.version() + 1,
-                    Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
+            retentionLease = innerAddRetentionLease(id, retainingSequenceNumber, source);
             currentRetentionLeases = retentionLeases;
         }
         onSyncRetentionLeases.accept(currentRetentionLeases, listener);
         return retentionLease;
     }
 
+    /**
+     * Adds a new retention lease, but does not synchronise it with the rest of the replication group.
+     *
+     * @param id                      the identifier of the retention lease
+     * @param retainingSequenceNumber the retaining sequence number
+     * @param source                  the source of the retention lease
+     * @return the new retention lease
+     * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
+     */
+    private RetentionLease innerAddRetentionLease(String id, long retainingSequenceNumber, String source) {
+        assert Thread.holdsLock(this);
+        assert primaryMode : id + "/" + retainingSequenceNumber + "/" + source;
+        if (retentionLeases.contains(id)) {
+            throw new RetentionLeaseAlreadyExistsException(id);
+        }
+        final RetentionLease retentionLease
+            = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
+        logger.debug("adding new retention lease [{}] to current retention leases [{}]", retentionLease, retentionLeases);
+        retentionLeases = new RetentionLeases(
+                operationPrimaryTerm,
+                retentionLeases.version() + 1,
+                Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
+        return retentionLease;
+    }
+
     /**
      * Renews an existing retention lease.
      *
@@ -276,7 +305,7 @@ public RetentionLease addRetentionLease(
      * @param retainingSequenceNumber the retaining sequence number
      * @param source                  the source of the retention lease
      * @return the renewed retention lease
-     * @throws IllegalArgumentException if the specified retention lease does not exist
+     * @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
      */
     public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
         assert primaryMode;
@@ -390,6 +419,45 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio
         return true;
     }
 
+
+    /**
+     * Retention leases for peer recovery have source {@link ReplicationTracker#PEER_RECOVERY_RETENTION_LEASE_SOURCE}, a lease ID
+     * containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
+     * with sequence numbers strictly greater than the given global checkpoint.
+     */
+    public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
+        addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
+    }
+
+    /**
+     * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
+     */
+    public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery";
+
+    /**
+     * Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
+     */
+    static String getPeerRecoveryRetentionLeaseId(String nodeId) {
+        return "peer_recovery/" + nodeId;
+    }
+
+    /**
+     * Id for a peer recovery retention lease for the given {@link ShardRouting}.
+     * See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
+     */
+    public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting) {
+        return getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId());
+    }
+
+    /**
+     * Renew the peer-recovery retention lease for the given shard, advancing the retained sequence number to discard operations up to the
+     * given global checkpoint. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
+     */
+    public void renewPeerRecoveryRetentionLease(ShardRouting shardRouting, long globalCheckpoint) {
+        assert primaryMode;
+        renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+    }
+
     public static class CheckpointState implements Writeable {
 
         /**
@@ -616,6 +684,22 @@ private boolean invariant() {
             assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
         }
 
+        if (primaryMode
+            && indexSettings.isSoftDeleteEnabled()
+            && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
+            && indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)) {
+            // all tracked shard copies have a corresponding peer-recovery retention lease
+            for (final ShardRouting shardRouting : routingTable.assignedShards()) {
+                assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
+                    || retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) :
+                    "no retention lease for tracked shard " + shardRouting + " in " + retentionLeases;
+                assert shardRouting.relocating() == false
+                    || checkpoints.get(shardRouting.allocationId().getRelocationId()).tracked == false
+                    || retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting.getTargetRelocatingShard()))
+                    : "no retention lease for relocation target " + shardRouting + " in " + retentionLeases;
+            }
+        }
+
         return true;
     }
 
@@ -669,6 +753,7 @@ public ReplicationTracker(
         this.pendingInSync = new HashSet<>();
         this.routingTable = null;
         this.replicationGroup = null;
+        assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
         assert invariant();
     }
 
@@ -772,6 +857,31 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
         primaryMode = true;
         updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint);
         updateGlobalCheckpointOnPrimary();
+
+        if (indexSettings.isSoftDeleteEnabled()) {
+            final ShardRouting primaryShard = routingTable.primaryShard();
+            final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
+            if (retentionLeases.get(leaseId) == null) {
+                /*
+                 * We might have got here via a rolling upgrade from an older version that doesn't create peer recovery retention
+                 * leases for every shard copy, but in this case we do not expect any leases to exist.
+                 */
+                if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)) {
+                    // We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then
+                    // this copy must already be in-sync and active and therefore holds a retention lease for itself.
+                    assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards();
+                    assert primaryShard.allocationId().getId().equals(shardAllocationId)
+                        : routingTable.activeShards() + " vs " + shardAllocationId;
+                    assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard));
+
+                    // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
+                    // group.
+                    innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
+                        PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                }
+            }
+        }
+
         assert invariant();
     }
 
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java
index 9cfad7c36ea06..ec67448341fa3 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java
@@ -211,4 +211,7 @@ public String toString() {
                 '}';
     }
 
+    public boolean isNotPeerRecoveryRetentionLease() {
+        return ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source) == false;
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java
index 927d2ec499960..7de6bad3f1102 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java
@@ -44,7 +44,7 @@ public interface RetentionLeaseSyncer {
     RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() {
         @Override
         public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener<ReplicationResponse> listener) {
-
+            listener.onResponse(new ReplicationResponse());
         }
 
         @Override
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java
index 81fd7e2fce047..8148d12fbb8aa 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java
@@ -275,13 +275,20 @@ private static Map<String, RetentionLease> toMap(final Collection<RetentionLeas
-    static Map<String, RetentionLease> toMap(final RetentionLeases retentionLeases) {
-        return retentionLeases.leases;
+    public static Map<String, RetentionLease> toMapExcludingPeerRecoveryRetentionLeases(final RetentionLeases retentionLeases) {
+        return retentionLeases.leases.values().stream()
+            .filter(l -> ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(l.source()) == false)
+            .collect(Collectors.toMap(RetentionLease::id, Function.identity(),
+                (o1, o2) -> {
+                    throw new AssertionError("unexpectedly merging " + o1 + " and " + o2);
+                },
+                LinkedHashMap::new));
     }
 
 }
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index fdd95614756b7..4d7c2e880cef6 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2415,6 +2415,20 @@ public boolean isRelocatedPrimary() {
         return replicationTracker.isRelocated();
     }
 
+    public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
+        assert assertPrimaryMode();
+        replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
+    }
+
+    /**
+     * Test-only method to advance the primary's peer-recovery retention lease so that operations up to the global checkpoint can be
+     * discarded. TODO Remove this when retention leases are advanced by other mechanisms.
+     */
+    public void advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint() {
+        assert assertPrimaryMode();
+        replicationTracker.renewPeerRecoveryRetentionLease(routingEntry(), getGlobalCheckpoint());
+    }
+
     class ShardEventListener implements Engine.EventListener {
         private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
 
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 4b89e75691a76..0bde05d536a06 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -32,6 +32,8 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.CheckedSupplier;
@@ -49,6 +51,7 @@
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
+import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
@@ -188,10 +191,28 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             }
 
             assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
 
+            final StepListener<ReplicationResponse> establishRetentionLeaseStep = new StepListener<>();
+            if (shard.indexSettings().isSoftDeleteEnabled()
+                && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
+                runUnderPrimaryPermit(() -> {
+                    try {
+                        // blindly create the lease. TODO integrate this with the recovery process
+                        shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), startingSeqNo - 1, establishRetentionLeaseStep);
+                    } catch (RetentionLeaseAlreadyExistsException e) {
+                        logger.debug("peer-recovery retention lease already exists", e);
+                        establishRetentionLeaseStep.onResponse(null);
+                    }
+                }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger);
+            } else {
+                establishRetentionLeaseStep.onResponse(null);
+            }
+
             final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
-            // For a sequence based recovery, the target can keep its local translog
-            prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+            establishRetentionLeaseStep.whenComplete(r -> {
+                // For a sequence based recovery, the target can keep its local translog
+                prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+            }, onFailure);
 
             final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
             prepareEngineStep.whenComplete(prepareEngineTime -> {
                 /*
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index f4e1ecd2514b3..00438868cdb4f 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -74,6 +74,7 @@
 import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -2324,7 +2325,7 @@ public void testIndexWriterInfoStream() throws IllegalAccessException, IOExcepti
         }
     }
 
-    public void testSeqNoAndCheckpoints() throws IOException {
+    public void testSeqNoAndCheckpoints() throws IOException, InterruptedException {
         final int opCount = randomIntBetween(1, 256);
         long primarySeqNo = SequenceNumbers.NO_OPS_PERFORMED;
         final String[] ids = new String[]{"1", "2", "3"};
@@ -2342,13 +2343,27 @@ public void testSeqNoAndCheckpoints() throws IOException {
         final ShardRouting primary =
             TestShardRouting.newShardRouting("test", shardId.id(), "node1", null, true, ShardRoutingState.STARTED, allocationId);
-        final ShardRouting replica =
-            TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.STARTED);
+        final ShardRouting initializingReplica =
+            TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.INITIALIZING);
+
         ReplicationTracker gcpTracker = (ReplicationTracker) initialEngine.config().getGlobalCheckpointSupplier();
-        gcpTracker.updateFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(),
-            replica.allocationId().getId())),
-            new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build());
+        gcpTracker.updateFromMaster(1L, new HashSet<>(Collections.singletonList(primary.allocationId().getId())),
+            new IndexShardRoutingTable.Builder(shardId).addShard(primary).build());
         gcpTracker.activatePrimaryMode(primarySeqNo);
+        if (defaultSettings.isSoftDeleteEnabled()) {
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
gcpTracker.addPeerRecoveryRetentionLease(initializingReplica.currentNodeId(), + SequenceNumbers.NO_OPS_PERFORMED, ActionListener.wrap(countDownLatch::countDown)); + countDownLatch.await(); + } + gcpTracker.updateFromMaster(2L, new HashSet<>(Collections.singletonList(primary.allocationId().getId())), + new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(initializingReplica).build()); + gcpTracker.initiateTracking(initializingReplica.allocationId().getId()); + gcpTracker.markAllocationIdAsInSync(initializingReplica.allocationId().getId(), replicaLocalCheckpoint); + final ShardRouting replica = initializingReplica.moveToStarted(); + gcpTracker.updateFromMaster(3L, new HashSet<>(Arrays.asList(primary.allocationId().getId(), replica.allocationId().getId())), + new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build()); + for (int op = 0; op < opCount; op++) { final String id; // mostly index, sometimes delete diff --git a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java index ce3986f0a2517..bc71fcc63510d 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java @@ -61,9 +61,10 @@ public void testSimpleSyncRetentionLeases() throws Exception { } } RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases(); - assertThat(leasesOnPrimary.version(), equalTo((long) iterations)); + assertThat(leasesOnPrimary.version(), equalTo(iterations + group.getReplicas().size() + 1L)); assertThat(leasesOnPrimary.primaryTerm(), equalTo(group.getPrimary().getOperationPrimaryTerm())); - assertThat(leasesOnPrimary.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0]))); + assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(leasesOnPrimary).values(), + containsInAnyOrder(leases.toArray(new RetentionLease[0]))); latch.await(); for (IndexShard replica : group.getReplicas()) { assertThat(replica.getRetentionLeases(), equalTo(leasesOnPrimary)); @@ -109,6 +110,9 @@ protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, Acti } }) { group.startAll(); + for (IndexShard replica : group.getReplicas()) { + replica.updateRetentionLeasesOnReplica(group.getPrimary().getRetentionLeases()); + } int numLeases = between(1, 100); IndexShard newPrimary = randomFrom(group.getReplicas()); RetentionLeases latestRetentionLeasesOnNewPrimary = RetentionLeases.EMPTY; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java new file mode 100644 index 0000000000000..22d4f5e86f964 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.IndexSettingsModule; +import org.junit.Before; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTestCase { + + private static final ActionListener EMPTY_LISTENER = ActionListener.wrap(() -> { }); + + private ReplicationTracker replicationTracker; + private AtomicLong currentTimeMillis; + private Settings settings; + + @Before + public void setUpReplicationTracker() throws InterruptedException { + final AllocationId primaryAllocationId = AllocationId.newInitializing(); + currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); + + if (randomBoolean()) { + settings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), + TimeValue.timeValueMillis(randomLongBetween(1, TimeValue.timeValueHours(12).millis()))) + .build(); + } else { + settings = Settings.EMPTY; + } + + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + primaryAllocationId.getId(), + IndexSettingsModule.newIndexSettings("test", settings), + primaryTerm, + UNASSIGNED_SEQ_NO, + value -> { }, + currentTimeMillis::get, + (leases, listener) -> { }); + replicationTracker.updateFromMaster(1L, Collections.singleton(primaryAllocationId.getId()), + routingTable(Collections.emptySet(), primaryAllocationId)); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final AllocationId replicaAllocationId = AllocationId.newInitializing(); + final IndexShardRoutingTable routingTableWithReplica + = routingTable(Collections.singleton(replicaAllocationId), primaryAllocationId); + replicationTracker.updateFromMaster(2L, Collections.singleton(primaryAllocationId.getId()), routingTableWithReplica); + replicationTracker.addPeerRecoveryRetentionLease( + routingTableWithReplica.getByAllocationId(replicaAllocationId.getId()).currentNodeId(), randomCheckpoint(), + EMPTY_LISTENER); + + replicationTracker.initiateTracking(replicaAllocationId.getId()); + replicationTracker.markAllocationIdAsInSync(replicaAllocationId.getId(), randomCheckpoint()); + } + + private long randomCheckpoint() { 
+ return randomBoolean() ? SequenceNumbers.NO_OPS_PERFORMED : randomNonNegativeLong(); + } + + private void startReplica() { + final ShardRouting replicaShardRouting = replicationTracker.routingTable.replicaShards().get(0); + final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(replicationTracker.routingTable); + builder.removeShard(replicaShardRouting); + builder.addShard(replicaShardRouting.moveToStarted()); + replicationTracker.updateFromMaster(replicationTracker.appliedClusterStateVersion + 1, + replicationTracker.routingTable.shards().stream().map(sr -> sr.allocationId().getId()).collect(Collectors.toSet()), + builder.build()); + } + + public void testPeerRecoveryRetentionLeasesForAssignedCopiesDoNotEverExpire() { + if (randomBoolean()) { + startReplica(); + } + + currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get())); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertFalse(retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(2)); + assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); + } + + public void testPeerRecoveryRetentionLeasesForUnassignedCopiesDoNotExpireImmediatelyIfShardsNotAllStarted() { + final String unknownNodeId = randomAlphaOfLength(10); + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + + currentTimeMillis.set(currentTimeMillis.get() + + randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis())); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertFalse("should not have expired anything", retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(3)); + assertThat(leaseIds, equalTo(Stream.concat(Stream.of(ReplicationTracker.getPeerRecoveryRetentionLeaseId(unknownNodeId)), + replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)).collect(Collectors.toSet()))); + } + + public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireEventually() { + if (randomBoolean()) { + startReplica(); + } + + final String unknownNodeId = randomAlphaOfLength(10); + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + + currentTimeMillis.set(randomLongBetween( + currentTimeMillis.get() + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis() + 1, + Long.MAX_VALUE)); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertTrue("should have expired something", retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(2)); + assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); + } + + public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireImmediatelyIfShardsAllStarted() { + final String unknownNodeId = randomAlphaOfLength(10); + 
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + + startReplica(); + + currentTimeMillis.set(currentTimeMillis.get() + + (usually() + ? randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis()) + : randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()))); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertTrue(retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(2)); + assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 2334cb4330887..8dc91db7d3350 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -83,7 +84,7 @@ public void testAddOrRenewRetentionLease() { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); replicationTracker.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); + assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 2 + i, true, false); } for (int i = 0; i < length; i++) { @@ -93,7 +94,7 @@ public void testAddOrRenewRetentionLease() { } minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, primaryTerm, 1 + length + i, true, false); + assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, primaryTerm, 2 + length + i, true, false); } } @@ -178,6 +179,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { Collections.singleton(allocationId.getId()), routingTable(Collections.emptySet(), allocationId)); replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + retainingSequenceNumbers.put(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)), 0L); final int length = randomIntBetween(0, 8); for (int i = 0; i < length; i++) { @@ -239,7 +241,7 @@ public void testRemoveRetentionLease() { length - i - 1, minimumRetainingSequenceNumbers, primaryTerm, - 1 + length + i, + 2 + length + i, true, false); } @@ -298,6 +300,7 @@ public void testRemoveRetentionLeaseCausesRetentionLeaseSync() { Collections.singleton(allocationId.getId()), routingTable(Collections.emptySet(), allocationId)); 
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + retainingSequenceNumbers.put(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)), 0L); final int length = randomIntBetween(0, 8); for (int i = 0; i < length; i++) { @@ -365,11 +368,12 @@ private void runExpirationTest(final boolean primaryMode) { { final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); - assertThat(retentionLeases.version(), equalTo(1L)); - assertThat(retentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); + final long expectedVersion = primaryMode ? 2L : 1L; + assertThat(retentionLeases.version(), equalTo(expectedVersion)); + assertThat(retentionLeases.leases(), hasSize(primaryMode ? 2 : 1)); + final RetentionLease retentionLease = retentionLeases.get("0"); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 1, primaryMode, false); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, expectedVersion, primaryMode, false); } // renew the lease @@ -387,18 +391,19 @@ private void runExpirationTest(final boolean primaryMode) { { final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); - assertThat(retentionLeases.version(), equalTo(2L)); - assertThat(retentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); + final long expectedVersion = primaryMode ? 3L : 2L; + assertThat(retentionLeases.version(), equalTo(expectedVersion)); + assertThat(retentionLeases.leases(), hasSize(primaryMode ? 2 : 1)); + final RetentionLease retentionLease = retentionLeases.get("0"); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, primaryMode, false); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, expectedVersion, primaryMode, false); } // now force the lease to expire currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primaryMode) { - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, true, false); - assertRetentionLeases(replicationTracker, 0, new long[0], primaryTerm, 3, true, true); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 3, true, false); + assertRetentionLeases(replicationTracker, 0, new long[0], primaryTerm, 4, true, true); } else { // leases do not expire on replicas until synced from the primary assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, false, false); @@ -625,10 +630,8 @@ private void assertRetentionLeases( } assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); assertThat(retentionLeases.version(), equalTo(version)); - final Map idToRetentionLease = new HashMap<>(); - for (final RetentionLease retentionLease : retentionLeases.leases()) { - idToRetentionLease.put(retentionLease.id(), retentionLease); - } + final Map idToRetentionLease = retentionLeases.leases().stream() + .filter(RetentionLease::isNotPeerRecoveryRetentionLease).collect(Collectors.toMap(RetentionLease::id, Function.identity())); assertThat(idToRetentionLease.entrySet(), hasSize(size)); for (int i = 0; i < size; i++) { diff --git 
a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java index 5165f2e8dc9e4..5f035a3604f41 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java @@ -52,10 +52,14 @@ ReplicationTracker newTracker( (leases, listener) -> {}); } + static String nodeIdFromAllocationId(final AllocationId allocationId) { + return "n-" + allocationId.getId().substring(0, 8); + } + static IndexShardRoutingTable routingTable(final Set initializingIds, final AllocationId primaryId) { final ShardId shardId = new ShardId("test", "_na_", 0); - final ShardRouting primaryShard = - TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(10), null, true, ShardRoutingState.STARTED, primaryId); + final ShardRouting primaryShard = TestShardRouting.newShardRouting( + shardId, nodeIdFromAllocationId(primaryId), null, true, ShardRoutingState.STARTED, primaryId); return routingTable(initializingIds, primaryShard); } @@ -65,7 +69,7 @@ static IndexShardRoutingTable routingTable(final Set initializingI final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId); for (final AllocationId initializingId : initializingIds) { builder.addShard(TestShardRouting.newShardRouting( - shardId, randomAlphaOfLength(10), null, false, ShardRoutingState.INITIALIZING, initializingId)); + shardId, nodeIdFromAllocationId(initializingId), null, false, ShardRoutingState.INITIALIZING, initializingId)); } builder.addShard(primaryShard); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index 05ca0a5ea3006..802deab8e5234 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -149,6 +149,7 @@ public void testGlobalCheckpointUpdate() { newInitializing.add(extraId); tracker.updateFromMaster(initialClusterStateVersion + 1, ids(active), routingTable(newInitializing, primaryId)); + addPeerRecoveryRetentionLease(tracker, extraId); tracker.initiateTracking(extraId.getId()); // now notify for the new id @@ -190,6 +191,7 @@ public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, Interr tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId)); final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1); tracker.activatePrimaryMode(localCheckpoint); + addPeerRecoveryRetentionLease(tracker, replicaId); tracker.initiateTracking(replicaId.getId()); final CyclicBarrier barrier = new CyclicBarrier(2); final Thread thread = new Thread(() -> { @@ -357,6 +359,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { tracker.updateFromMaster(clusterStateVersion, Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId)); tracker.activatePrimaryMode(globalCheckpoint); + addPeerRecoveryRetentionLease(tracker, trackingAllocationId); final Thread thread = new Thread(() -> { try { // synchronize starting with the test thread @@ -421,6 +424,7 @@ public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBar tracker.updateFromMaster(randomNonNegativeLong(), 
Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId)); tracker.activatePrimaryMode(globalCheckpoint); + addPeerRecoveryRetentionLease(tracker, trackingAllocationId); final Thread thread = new Thread(() -> { try { // synchronize starting with the test thread @@ -563,6 +567,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { initialClusterStateVersion + 3, ids(newActiveAllocationIds), routingTable(newInitializingAllocationIds, primaryId)); + addPeerRecoveryRetentionLease(tracker, newSyncingAllocationId); final CyclicBarrier barrier = new CyclicBarrier(2); final Thread thread = new Thread(() -> { try { @@ -610,7 +615,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { * allocation ID to the in-sync set and removing it from pending, the local checkpoint update that freed the thread waiting for the * local checkpoint to advance could miss updating the global checkpoint in a race if the waiting thread did not add the allocation * ID to the in-sync set and remove it from the pending set before the local checkpoint updating thread executed the global checkpoint - * update. This test fails without an additional call to {@link ReplicationTracker#updateGlobalCheckpointOnPrimary()} after + * update. This test fails without an additional call to {@code ReplicationTracker#updateGlobalCheckpointOnPrimary()} after * removing the allocation ID from the pending set in {@link ReplicationTracker#markAllocationIdAsInSync(String, long)} (even if a * call is added after notifying all waiters in {@link ReplicationTracker#updateLocalCheckpoint(String, long)}). * @@ -630,6 +635,7 @@ public void testRaceUpdatingGlobalCheckpoint() throws InterruptedException, Brok Collections.singleton(active.getId()), routingTable(Collections.singleton(initializing), active)); tracker.activatePrimaryMode(activeLocalCheckpoint); + addPeerRecoveryRetentionLease(tracker, initializing); final int nextActiveLocalCheckpoint = randomIntBetween(activeLocalCheckpoint + 1, Integer.MAX_VALUE); final Thread activeThread = new Thread(() -> { try { @@ -693,7 +699,9 @@ public void testPrimaryContextHandoff() throws IOException { clusterState.apply(oldPrimary); clusterState.apply(newPrimary); - activatePrimary(oldPrimary); + oldPrimary.activatePrimaryMode(randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10)); + addPeerRecoveryRetentionLease(oldPrimary, newPrimary.shardAllocationId); + newPrimary.updateRetentionLeasesOnReplica(oldPrimary.getRetentionLeases()); final int numUpdates = randomInt(10); for (int i = 0; i < numUpdates; i++) { @@ -706,7 +714,7 @@ public void testPrimaryContextHandoff() throws IOException { randomLocalCheckpointUpdate(oldPrimary); } if (randomBoolean()) { - randomMarkInSync(oldPrimary); + randomMarkInSync(oldPrimary, newPrimary); } } @@ -738,7 +746,7 @@ public void testPrimaryContextHandoff() throws IOException { randomLocalCheckpointUpdate(oldPrimary); } if (randomBoolean()) { - randomMarkInSync(oldPrimary); + randomMarkInSync(oldPrimary, newPrimary); } // do another handoff @@ -876,7 +884,10 @@ private static FakeClusterState initialState() { final ShardId shardId = new ShardId("test", "_na_", 0); final ShardRouting primaryShard = TestShardRouting.newShardRouting( - shardId, randomAlphaOfLength(10), randomAlphaOfLength(10), true, ShardRoutingState.RELOCATING, relocatingId); + shardId, + nodeIdFromAllocationId(relocatingId), + 
nodeIdFromAllocationId(AllocationId.newInitializing(relocatingId.getRelocationId())), + true, ShardRoutingState.RELOCATING, relocatingId); return new FakeClusterState( initialClusterStateVersion, @@ -884,20 +895,17 @@ private static FakeClusterState initialState() { routingTable(initializingAllocationIds, primaryShard)); } - private static void activatePrimary(ReplicationTracker gcp) { - gcp.activatePrimaryMode(randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10)); - } - private static void randomLocalCheckpointUpdate(ReplicationTracker gcp) { String allocationId = randomFrom(gcp.checkpoints.keySet()); long currentLocalCheckpoint = gcp.checkpoints.get(allocationId).getLocalCheckpoint(); gcp.updateLocalCheckpoint(allocationId, Math.max(SequenceNumbers.NO_OPS_PERFORMED, currentLocalCheckpoint + randomInt(5))); } - private static void randomMarkInSync(ReplicationTracker gcp) { - String allocationId = randomFrom(gcp.checkpoints.keySet()); - long newLocalCheckpoint = Math.max(NO_OPS_PERFORMED, gcp.getGlobalCheckpoint() + randomInt(5)); - markAsTrackingAndInSyncQuietly(gcp, allocationId, newLocalCheckpoint); + private static void randomMarkInSync(ReplicationTracker oldPrimary, ReplicationTracker newPrimary) { + final String allocationId = randomFrom(oldPrimary.checkpoints.keySet()); + final long newLocalCheckpoint = Math.max(NO_OPS_PERFORMED, oldPrimary.getGlobalCheckpoint() + randomInt(5)); + markAsTrackingAndInSyncQuietly(oldPrimary, allocationId, newLocalCheckpoint); + newPrimary.updateRetentionLeasesOnReplica(oldPrimary.getRetentionLeases()); } private static FakeClusterState randomUpdateClusterState(Set allocationIds, FakeClusterState clusterState) { @@ -908,11 +916,14 @@ private static FakeClusterState randomUpdateClusterState(Set allocationI final Set inSyncIdsToRemove = new HashSet<>( exclude(randomSubsetOf(randomInt(clusterState.inSyncIds.size()), clusterState.inSyncIds), allocationIds)); final Set remainingInSyncIds = Sets.difference(clusterState.inSyncIds, inSyncIdsToRemove); + final Set initializingIdsExceptRelocationTargets = exclude(clusterState.initializingIds(), + clusterState.routingTable.activeShards().stream().filter(ShardRouting::relocating) + .map(s -> s.allocationId().getRelocationId()).collect(Collectors.toSet())); return new FakeClusterState( clusterState.version + randomIntBetween(1, 5), remainingInSyncIds.isEmpty() ? 
clusterState.inSyncIds : remainingInSyncIds, routingTable( - Sets.difference(Sets.union(clusterState.initializingIds(), initializingIdsToAdd), initializingIdsToRemove), + Sets.difference(Sets.union(initializingIdsExceptRelocationTargets, initializingIdsToAdd), initializingIdsToRemove), clusterState.routingTable.primaryShard())); } @@ -945,6 +956,7 @@ private static Set randomAllocationIdsExcludingExistingIds(final S private static void markAsTrackingAndInSyncQuietly( final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) { try { + addPeerRecoveryRetentionLease(tracker, allocationId); tracker.initiateTracking(allocationId); tracker.markAllocationIdAsInSync(allocationId, localCheckpoint); } catch (final InterruptedException e) { @@ -952,4 +964,15 @@ private static void markAsTrackingAndInSyncQuietly( } } + private static void addPeerRecoveryRetentionLease(final ReplicationTracker tracker, final AllocationId allocationId) { + final String nodeId = nodeIdFromAllocationId(allocationId); + if (tracker.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeId)) == false) { + tracker.addPeerRecoveryRetentionLease(nodeId, NO_OPS_PERFORMED, ActionListener.wrap(() -> { })); + } + } + + private static void addPeerRecoveryRetentionLease(final ReplicationTracker tracker, final String allocationId) { + addPeerRecoveryRetentionLease(tracker, AllocationId.newInitializing(allocationId)); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java index bff4493321289..511a93e8268d1 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -73,11 +73,14 @@ public void testAddAction() { assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(1)); assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); - assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); - final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); assertThat(retentionLease.id(), equalTo(id)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 
0L : retainingSequenceNumber)); assertThat(retentionLease.source(), equalTo(source)); + + assertTrue(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(stats.getShards()[0].getShardRouting()))); } public void testAddAlreadyExists() { @@ -160,9 +163,11 @@ public void testRenewAction() throws InterruptedException { assertNotNull(initialStats.getShards()); assertThat(initialStats.getShards(), arrayWithSize(1)); assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats()); - assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + assertTrue(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(initialStats.getShards()[0].getShardRouting()))); final RetentionLease initialRetentionLease = - initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); final long nextRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL @@ -195,9 +200,11 @@ public void testRenewAction() throws InterruptedException { assertNotNull(renewedStats.getShards()); assertThat(renewedStats.getShards(), arrayWithSize(1)); assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats()); - assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + assertTrue(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(initialStats.getShards()[0].getShardRouting()))); final RetentionLease renewedRetentionLease = - renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); assertThat(renewedRetentionLease.id(), equalTo(id)); assertThat( renewedRetentionLease.retainingSequenceNumber(), @@ -265,7 +272,9 @@ public void testRemoveAction() { assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(1)); assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); - assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0)); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertTrue(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(stats.getShards()[0].getShardRouting()))); } public void testRemoveNotFound() { @@ -328,8 +337,10 @@ public void onFailure(final Exception e) { assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(1)); assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); - assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); - final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + assertTrue(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + 
ReplicationTracker.getPeerRecoveryRetentionLeaseId(stats.getShards()[0].getShardRouting()))); + final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); assertThat(retentionLease.id(), equalTo(id)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 0L : retainingSequenceNumber)); assertThat(retentionLease.source(), equalTo(source)); @@ -378,9 +389,10 @@ public void testRenewUnderBlock() throws InterruptedException { assertNotNull(initialStats.getShards()); assertThat(initialStats.getShards(), arrayWithSize(1)); assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats()); - assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); - final RetentionLease initialRetentionLease = - initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + assertTrue(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(initialStats.getShards()[0].getShardRouting()))); + final RetentionLease initialRetentionLease = initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); final long nextRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL @@ -427,9 +439,10 @@ public void onFailure(final Exception e) { assertNotNull(renewedStats.getShards()); assertThat(renewedStats.getShards(), arrayWithSize(1)); assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats()); - assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); - final RetentionLease renewedRetentionLease = - renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(2)); + assertTrue(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(renewedStats.getShards()[0].getShardRouting()))); + final RetentionLease renewedRetentionLease = renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().get(id); assertThat(renewedRetentionLease.id(), equalTo(id)); assertThat( renewedRetentionLease.retainingSequenceNumber(), @@ -484,7 +497,9 @@ public void onFailure(final Exception e) { assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(1)); assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); - assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0)); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertTrue(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(stats.getShards()[0].getShardRouting()))); } /* diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index debb6d219a5f1..e9faa5f8ce987 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -115,7 +115,8 @@ public void testRetentionLeasesSyncedOnAdd() throws 
Exception { retentionLock.close(); // check retention leases have been written on the primary - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases()))); + assertThat(currentRetentionLeases, + equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { @@ -124,11 +125,13 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); + final Map<String, RetentionLease> retentionLeasesOnReplica + = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been written on the replica - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases()))); + assertThat(currentRetentionLeases, + equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); } } } @@ -173,7 +176,8 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { retentionLock.close(); // check retention leases have been written on the primary - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases()))); + assertThat(currentRetentionLeases, + equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { @@ -182,11 +186,13 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); + final Map<String, RetentionLease> retentionLeasesOnReplica = + RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been written on the replica - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases()))); + assertThat(currentRetentionLeases, + equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); } } } @@ -239,7 +245,8 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(replica.getRetentionLeases().leases(), anyOf(empty(), contains(currentRetentionLease))); + assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).values(), + anyOf(empty(), contains(currentRetentionLease))); } // update the index's retention lease period to a short time, to force expiration @@ -256,7 +263,8 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { // sleep long enough that the current retention lease has
expired final long later = System.nanoTime(); Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now))); - assertBusy(() -> assertThat(primary.getRetentionLeases().leases(), empty())); + assertBusy(() -> assertThat( + RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.getRetentionLeases()).entrySet(), empty())); // now that all retention leases have expired they should have been synced to all replicas assertBusy(() -> { @@ -266,7 +274,7 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(replica.getRetentionLeases().leases(), empty()); + assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).entrySet(), empty()); } }); } @@ -432,11 +440,13 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); + final Map<String, RetentionLease> retentionLeasesOnReplica + = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been written on the replica; see RecoveryTarget#finalizeRecovery - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases()))); + assertThat(currentRetentionLeases, + equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); } } @@ -474,7 +484,9 @@ public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException { * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it * implies that the background sync was able to execute under a block. */ - assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get()))); + assertBusy(() -> assertThat( + RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()).values(), + contains(retentionLease.get()))); } catch (final Exception e) { failWithException(e); } @@ -593,7 +605,9 @@ public void testCanRenewRetentionLeaseWithoutWaitingForShards() throws Interrupt * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it * implies that the background sync was able to execute despite wait for shards being set on the index.
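 * (Peer-recovery retention leases are excluded from the assertion below, since every tracked shard copy now holds one.)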
*/ - assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get()))); + assertBusy(() -> assertThat( + RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()).values(), + contains(retentionLease.get()))); } catch (final Exception e) { failWithException(e); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java index adacf6539a80e..da22d68bf5f4d 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java @@ -63,7 +63,8 @@ public void testRetentionLeaseStats() throws InterruptedException { final IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("index").execute().actionGet(); assertThat(indicesStats.getShards(), arrayWithSize(1)); final RetentionLeaseStats retentionLeaseStats = indicesStats.getShards()[0].getRetentionLeaseStats(); - assertThat(RetentionLeases.toMap(retentionLeaseStats.retentionLeases()), equalTo(currentRetentionLeases)); + assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(retentionLeaseStats.retentionLeases()), + equalTo(currentRetentionLeases)); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index 974e060bf2520..ec3065331d275 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.RetentionLeases; @@ -35,14 +36,13 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; @@ -73,7 +73,7 @@ public void testAddOrRenewRetentionLease() throws IOException { indexShard.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); assertRetentionLeases( - indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); + indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 2 + i, true, false); } for (int i = 0; i < length; i++) { @@ -84,7 +84,7 @@ public void testAddOrRenewRetentionLease() throws IOException { length, minimumRetainingSequenceNumbers, primaryTerm, - 1 + length + i, + 2 + length + i, true, false); } @@ -105,7 +105,7 @@ public void testRemoveRetentionLease() throws IOException { indexShard.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); assertRetentionLeases( - indexShard, i + 1, 
minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); + indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 2 + i, true, false); } for (int i = 0; i < length; i++) { @@ -115,7 +115,7 @@ public void testRemoveRetentionLease() throws IOException { length - i - 1, minimumRetainingSequenceNumbers, primaryTerm, - 1 + length + i, + 2 + length + i, true, false); } @@ -132,6 +132,12 @@ public void testExpirationOnReplica() throws IOException { runExpirationTest(false); } + private RetentionLease peerRecoveryRetentionLease(IndexShard indexShard) { + return new RetentionLease( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(indexShard.routingEntry()), 0, currentTimeMillis.get(), + ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); + } + private void runExpirationTest(final boolean primary) throws IOException { final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); final Settings settings = Settings @@ -147,23 +153,28 @@ private void runExpirationTest(final boolean primary) throws IOException { try { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE); + final long initialVersion; if (primary) { + initialVersion = 2; indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); } else { + initialVersion = 3; final RetentionLeases retentionLeases = new RetentionLeases( primaryTerm, - 1, - Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + initialVersion, + Arrays.asList( + peerRecoveryRetentionLease(indexShard), + new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); indexShard.updateRetentionLeasesOnReplica(retentionLeases); } { final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); - assertThat(retentionLeases.version(), equalTo(1L)); - assertThat(retentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); + assertThat(retentionLeases.version(), equalTo(initialVersion)); + assertThat(retentionLeases.leases(), hasSize(2)); + final RetentionLease retentionLease = retentionLeases.get("0"); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 1, primary, false); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, initialVersion, primary, false); } // renew the lease @@ -174,28 +185,30 @@ private void runExpirationTest(final boolean primary) throws IOException { } else { final RetentionLeases retentionLeases = new RetentionLeases( primaryTerm, - 2, - Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + initialVersion + 1, + Arrays.asList( + peerRecoveryRetentionLease(indexShard), + new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); indexShard.updateRetentionLeasesOnReplica(retentionLeases); } { final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); - assertThat(retentionLeases.version(), equalTo(2L)); - assertThat(retentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); + assertThat(retentionLeases.version(), equalTo(initialVersion + 1)); + 
assertThat(retentionLeases.leases(), hasSize(2)); + final RetentionLease retentionLease = retentionLeases.get("0"); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, primary, false); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, initialVersion + 1, primary, false); } // now force the lease to expire currentTimeMillis.set( currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primary) { - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, true, false); - assertRetentionLeases(indexShard, 0, new long[0], primaryTerm, 3, true, true); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, initialVersion + 1, true, false); + assertRetentionLeases(indexShard, 0, new long[0], primaryTerm, initialVersion + 2, true, true); } else { - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, false, false); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, initialVersion + 1, false, false); } } finally { closeShards(indexShard); @@ -229,13 +242,8 @@ public void testPersistence() throws IOException { // the written retention leases should equal our current retention leases final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); final RetentionLeases writtenRetentionLeases = indexShard.loadRetentionLeases(); - if (retentionLeases.leases().isEmpty()) { - assertThat(writtenRetentionLeases.version(), equalTo(0L)); - assertThat(writtenRetentionLeases.leases(), empty()); - } else { - assertThat(writtenRetentionLeases.version(), equalTo((long) length)); - assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0]))); - } + assertThat(writtenRetentionLeases.version(), equalTo(1L + length)); + assertThat(writtenRetentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0]))); // when we recover, we should recover the retention leases final IndexShard recoveredShard = reinitShard( @@ -244,15 +252,10 @@ public void testPersistence() throws IOException { try { recoverShardFromStore(recoveredShard); final RetentionLeases recoveredRetentionLeases = recoveredShard.getEngine().config().retentionLeasesSupplier().get(); - if (retentionLeases.leases().isEmpty()) { - assertThat(recoveredRetentionLeases.version(), equalTo(0L)); - assertThat(recoveredRetentionLeases.leases(), empty()); - } else { - assertThat(recoveredRetentionLeases.version(), equalTo((long) length)); - assertThat( - recoveredRetentionLeases.leases(), - contains(retentionLeases.leases().toArray(new RetentionLease[0]))); - } + assertThat(recoveredRetentionLeases.version(), equalTo(1L + length)); + assertThat( + recoveredRetentionLeases.leases(), + contains(retentionLeases.leases().toArray(new RetentionLease[0]))); } finally { closeShards(recoveredShard); } @@ -265,8 +268,10 @@ public void testPersistence() throws IOException { try { recoverShardFromStore(forceRecoveredShard); final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get(); - assertThat(recoveredRetentionLeases.leases(), empty()); - assertThat(recoveredRetentionLeases.version(), equalTo(0L)); + assertThat(recoveredRetentionLeases.leases(), hasSize(1)); + assertThat(recoveredRetentionLeases.leases().iterator().next().id(), + 
equalTo(ReplicationTracker.getPeerRecoveryRetentionLeaseId(indexShard.routingEntry()))); + assertThat(recoveredRetentionLeases.version(), equalTo(1L)); } finally { closeShards(forceRecoveredShard); } @@ -291,8 +296,8 @@ public void testRetentionLeaseStats() throws IOException { stats.retentionLeases(), length, minimumRetainingSequenceNumbers, - length == 0 ? RetentionLeases.EMPTY.primaryTerm() : indexShard.getOperationPrimaryTerm(), - length); + indexShard.getOperationPrimaryTerm(), + length + 1); } finally { closeShards(indexShard); } @@ -355,7 +360,9 @@ private void assertRetentionLeases( assertThat(retentionLeases.version(), equalTo(version)); final Map<String, RetentionLease> idToRetentionLease = new HashMap<>(); for (final RetentionLease retentionLease : retentionLeases.leases()) { - idToRetentionLease.put(retentionLease.id(), retentionLease); + if (retentionLease.isNotPeerRecoveryRetentionLease()) { + idToRetentionLease.put(retentionLease.id(), retentionLease); + } } assertThat(idToRetentionLease.entrySet(), hasSize(size)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5187ef37fcdf8..be1c6254d868b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -102,6 +102,8 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; @@ -2076,7 +2078,8 @@ public void testPrimaryHandOffUpdatesLocalCheckpoint() throws IOException { } IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); - final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard()); + final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard(), Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), primarySource.indexSettings().isSoftDeleteEnabled()).build()); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData()); recoverReplica(primaryTarget, primarySource, true); @@ -2873,7 +2876,7 @@ public void testCompletionStatsMarksSearcherAccessed() throws Exception { public void testDocStats() throws Exception { IndexShard indexShard = null; try { - indexShard = newStartedShard( + indexShard = newStartedShard(false, Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0).build()); final long numDocs = randomIntBetween(2, 32); // at least two documents so we have docs to delete final long numDocsToDelete = randomLongBetween(1, numDocs); @@ -2911,13 +2914,20 @@ public void testDocStats() throws Exception { deleteDoc(indexShard, "_doc", id); indexDoc(indexShard, "_doc", id); } - // Need to update and sync the global checkpoint as the soft-deletes retention MergePolicy depends on it. + // Need to update and sync the global checkpoint and the retention leases for the soft-deletes retention MergePolicy.
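+ // (On the primary we can advance its own peer-recovery retention lease directly; on a replica the test below simulates the lease sync from the primary by rewriting the leases with the new retaining sequence number.)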
if (indexShard.indexSettings.isSoftDeleteEnabled()) { + final long newGlobalCheckpoint = indexShard.getLocalCheckpoint(); if (indexShard.routingEntry().primary()) { - indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), - indexShard.getLocalCheckpoint()); + indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), newGlobalCheckpoint); + indexShard.advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint(); } else { - indexShard.updateGlobalCheckpointOnReplica(indexShard.getLocalCheckpoint(), "test"); + indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test"); + + final RetentionLeases retentionLeases = indexShard.getRetentionLeases(); + indexShard.updateRetentionLeasesOnReplica(new RetentionLeases( + retentionLeases.primaryTerm(), retentionLeases.version() + 1, + retentionLeases.leases().stream().map(lease -> new RetentionLease(lease.id(), newGlobalCheckpoint + 1, + lease.timestamp(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)).collect(Collectors.toList()))); } indexShard.sync(); } @@ -3504,6 +3514,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception { // In order to instruct the merge policy not to keep a fully deleted segment, // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything. if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { + primary.advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint(); primary.sync(); flushShard(primary); } @@ -3983,6 +3994,7 @@ public void testTypelessDelete() throws IOException { IndexMetaData metaData = IndexMetaData.builder("index") .putMapping("some_type", "{ \"properties\": {}}") .settings(settings) + .primaryTerm(0, 1) .build(); IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); recoverShardFromStore(shard); @@ -3990,10 +4002,10 @@ public void testTypelessDelete() throws IOException { assertTrue(indexResult.isCreated()); DeleteResult deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "some_other_type", "id", VersionType.INTERNAL, - UNASSIGNED_SEQ_NO, 0); + UNASSIGNED_SEQ_NO, 1); assertFalse(deleteResult.isFound()); - deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "_doc", "id", VersionType.INTERNAL, UNASSIGNED_SEQ_NO, 0); + deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "_doc", "id", VersionType.INTERNAL, UNASSIGNED_SEQ_NO, 1); assertTrue(deleteResult.isFound()); closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 59e7c21a3e6e8..4a3b6629eeb89 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -81,6 +81,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -1052,6 +1053,10 @@ public void testFilterCacheStats() throws Exception { // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything. 
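+ // The peer-recovery retention leases now also retain history, so they must be advanced before the flush for the SoftDeletesPolicy to discard anything: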
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { persistGlobalCheckpoint("index"); + internalCluster().nodesInclude("index").stream() + .flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false)) + .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) + .forEach(IndexShard::advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint); flush("index"); } ForceMergeResponse forceMergeResponse = diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f7042448e7576..67c29004b2bea 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -48,6 +48,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; @@ -666,7 +667,7 @@ public EngineConfig config( SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> listener.onResponse(new ReplicationResponse())); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; } else { diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index b11a0f84fb84a..93d8b98d306e8 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -287,6 +287,7 @@ public synchronized int startReplicas(int numOfReplicasToStart) throws IOExcepti public void startPrimary() throws IOException { recoverPrimary(primary); + computeReplicationTargets(); HashSet<String> activeIds = new HashSet<>(); activeIds.addAll(activeIds()); activeIds.add(primary.routingEntry().allocationId().getId()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 2a2176f1c100d..2041ba63e5e9d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -222,7 +222,12 @@ protected IndexShard newShard(boolean primary, Settings settings, EngineFactory } protected IndexShard newShard(ShardRouting shardRouting, final IndexingOperationListener... listeners) throws IOException { - return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners); + return newShard(shardRouting, Settings.EMPTY, listeners); + } + + protected IndexShard newShard(ShardRouting shardRouting, final Settings settings, final IndexingOperationListener...
listeners) + throws IOException { + return newShard(shardRouting, settings, new InternalEngineFactory(), listeners); } /** From 6f446a2db67446c6ba61ef3312c07180a06ab5a9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 10:41:52 +0100 Subject: [PATCH 02/15] Fix ShardChangesTests --- .../elasticsearch/xpack/ccr/action/ShardChangesTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java index f42a50b91ff02..196a6d97ce6c1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java @@ -16,7 +16,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.ccr.Ccr; @@ -26,6 +28,7 @@ import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -111,6 +114,9 @@ public void testMissingOperations() throws Exception { client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet(); } client().admin().indices().refresh(new RefreshRequest("index")).actionGet(); + StreamSupport.stream(getInstanceFromNode(IndicesService.class).spliterator(), false) + .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) + .forEach(IndexShard::advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint); ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index"); forceMergeRequest.maxNumSegments(1); client().admin().indices().forceMerge(forceMergeRequest).actionGet(); From 4716a9ca4ff97a1b801cc83fa57b4664cded3eec Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 10:55:21 +0100 Subject: [PATCH 03/15] Fix CCRIT --- .../src/test/java/org/elasticsearch/client/CCRIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java index dbcab4d1b2ce3..149d7383f54b0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java @@ -54,6 +54,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.test.rest.yaml.ObjectPath; import org.junit.Before; @@ -260,7 +261,9 @@ public void testForgetFollower() throws IOException { final Map shardStatsAsMap = (Map) shardStats.get(0); final Map retentionLeasesStats = (Map) shardStatsAsMap.get("retention_leases"); final List leases = (List) retentionLeasesStats.get("leases"); - assertThat(leases, empty()); + for (final Object lease : leases) { + assertThat(((Map) lease).get("source"), 
equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)); + } } } From 0f2f2b71579dd9738562c17dd5e83e368a00480d Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 14:31:48 +0100 Subject: [PATCH 04/15] Advance leases on replicas too --- .../index/seqno/ReplicationTracker.java | 14 ++++++++++---- .../org/elasticsearch/index/shard/IndexShard.java | 9 +++++---- .../elasticsearch/index/shard/IndexShardTests.java | 4 ++-- .../elasticsearch/indices/stats/IndexStatsIT.java | 2 +- .../elasticsearch/xpack/ccr/IndexFollowingIT.java | 11 +++++++++++ .../xpack/ccr/action/ShardChangesTests.java | 2 +- 6 files changed, 30 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index a33b791a666b0..48db19c4cdd25 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -450,12 +450,18 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting) } /** - * Renew the peer-recovery retention lease for the given shard, advancing the retained sequence number to discard operations up to the - * given global checkpoint. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}. + * Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done + * properly. TODO remove this. */ - public void renewPeerRecoveryRetentionLease(ShardRouting shardRouting, long globalCheckpoint) { + public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() { assert primaryMode; - renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE); + for (ShardRouting shardRouting : routingTable) { + if (shardRouting.assignedToNode()) { + final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId()); + renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), checkpointState.globalCheckpoint + 1, + PEER_RECOVERY_RETENTION_LEASE_SOURCE); + } + } } public static class CheckpointState implements Writeable { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4d7c2e880cef6..eb08ad1bb7df7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2421,12 +2421,13 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, } /** - * Test-only method to advance the primary's peer-recovery retention lease so that operations up to the global checkpoint can be - * discarded. TODO Remove this when retention leases are advanced by other mechanisms. + * Test-only method to advance all shards' peer-recovery retention leases to their tracked global checkpoints so that operations + * can be discarded. TODO Remove this when retention leases are advanced by other mechanisms.
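+ * (Tests typically call this on the primary just before force-merging, so that the soft-deletes policy is free to discard the operations that these leases were retaining.)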
*/ - public void advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint() { + public void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() { assert assertPrimaryMode(); - replicationTracker.renewPeerRecoveryRetentionLease(routingEntry(), getGlobalCheckpoint()); + replicationTracker.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints(); + syncRetentionLeases(); } class ShardEventListener implements Engine.EventListener { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index be1c6254d868b..0475527df953a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2919,7 +2919,7 @@ public void testDocStats() throws Exception { final long newGlobalCheckpoint = indexShard.getLocalCheckpoint(); if (indexShard.routingEntry().primary()) { indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), newGlobalCheckpoint); - indexShard.advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint(); + indexShard.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints(); } else { indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test"); @@ -3514,7 +3514,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception { // In order to instruct the merge policy not to keep a fully deleted segment, // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything. if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { - primary.advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint(); + primary.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints(); primary.sync(); flushShard(primary); } diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 4a3b6629eeb89..7e755839401a0 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -1056,7 +1056,7 @@ public void testFilterCacheStats() throws Exception { internalCluster().nodesInclude("index").stream() .flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false)) .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) - .forEach(IndexShard::advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint); + .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints); flush("index"); } ForceMergeResponse forceMergeResponse = diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 78ecb91b2826b..f6498aab0e6a9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -68,7 +68,10 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import 
org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; @@ -107,6 +110,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -1179,6 +1183,13 @@ private void runFallBehindTest( leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } leaderClient().prepareDelete("index1", "doc", "1").get(); + getLeaderCluster().nodesInclude("index1").stream() + .flatMap(n -> StreamSupport.stream(getLeaderCluster().getInstance(IndicesService.class, n).spliterator(), false)) + .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) + .filter(indexShard -> indexShard.shardId().getIndexName().equals("index1")) + .filter(indexShard -> indexShard.routingEntry().primary()) + .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints); + leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet(); leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet(); ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index1"); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java index 196a6d97ce6c1..b85c00a635922 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java @@ -116,7 +116,7 @@ public void testMissingOperations() throws Exception { client().admin().indices().refresh(new RefreshRequest("index")).actionGet(); StreamSupport.stream(getInstanceFromNode(IndicesService.class).spliterator(), false) .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) - .forEach(IndexShard::advancePrimaryPeerRecoveryRetentionLeaseToGlobalCheckpoint); + .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints); ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index"); forceMergeRequest.maxNumSegments(1); client().admin().indices().forceMerge(forceMergeRequest).actionGet(); From 9a2f984110bd4f735f46f328d2c209cf2e40437f Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 14:38:43 +0100 Subject: [PATCH 05/15] Unnecessary method, inline it --- .../java/org/elasticsearch/index/seqno/RetentionLease.java | 3 --- .../index/seqno/ReplicationTrackerRetentionLeaseTests.java | 3 ++- .../index/shard/IndexShardRetentionLeaseTests.java | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index ec67448341fa3..9cfad7c36ea06 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -211,7 +211,4 @@ public String toString() { '}'; } - public boolean isNotPeerRecoveryRetentionLease() { - return ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source) == false; - } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 
8dc91db7d3350..393ff44ef5c66 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -631,7 +631,8 @@ private void assertRetentionLeases( assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); assertThat(retentionLeases.version(), equalTo(version)); final Map<String, RetentionLease> idToRetentionLease = retentionLeases.leases().stream() - .filter(RetentionLease::isNotPeerRecoveryRetentionLease).collect(Collectors.toMap(RetentionLease::id, Function.identity())); + .filter(retentionLease -> ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(retentionLease.source()) == false) + .collect(Collectors.toMap(RetentionLease::id, Function.identity())); assertThat(idToRetentionLease.entrySet(), hasSize(size)); for (int i = 0; i < size; i++) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index ec3065331d275..ed429bb680d7d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -360,7 +360,7 @@ private void assertRetentionLeases( assertThat(retentionLeases.version(), equalTo(version)); final Map<String, RetentionLease> idToRetentionLease = new HashMap<>(); for (final RetentionLease retentionLease : retentionLeases.leases()) { - if (retentionLease.isNotPeerRecoveryRetentionLease()) { + if (ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(retentionLease.source()) == false) { idToRetentionLease.put(retentionLease.id(), retentionLease); } } From 043771a4ab4c03dded1b179b1ea7c0ae371548d8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 14:40:58 +0100 Subject: [PATCH 06/15] Imports --- .../test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index f6498aab0e6a9..a3bf13fa593ee 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -68,7 +68,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.RetentionLeaseActions; -import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; From 32837be01d91776b4fa4787d29d3a8cc68101366 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 15:44:26 +0100 Subject: [PATCH 07/15] Fix CcrRetentionLeaseIT --- .../xpack/ccr/CcrRetentionLeaseIT.java | 86 +++++++++++-------- 1 file changed, 48 insertions(+), 38 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 2f0aed395a73c..148da675dd539 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -189,10 +189,10 @@ public void
testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception { final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); + final Map<String, RetentionLease> currentRetentionLeases + = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); + final RetentionLease retentionLease = currentRetentionLeases.values().iterator().next(); assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); } }); @@ -310,7 +310,7 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws */ assertBusy(() -> { // sample the leases after recovery - final List<RetentionLeases> retentionLeases = new ArrayList<>(); + final List<Map<String, RetentionLease>> retentionLeases = new ArrayList<>(); assertBusy(() -> { retentionLeases.clear(); final IndicesStatsResponse stats = @@ -320,13 +320,14 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.leases(), hasSize(1)); + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); + currentRetentionLeases.values().iterator().next(); final String expectedRetentionLeaseId = retentionLeaseId( getFollowerCluster().getClusterName(), new Index(followerIndex, followerUUID), @@ -353,16 +354,17 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws continue; } assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.leases(), hasSize(1)); + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); final String followerUUID =
followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); + currentRetentionLeases.values().iterator().next(); assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); // we assert that retention leases are not being renewed by an unchanged timestamp - assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).leases().iterator().next().timestamp())); + assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).values().iterator().next().timestamp())); } }); @@ -392,10 +394,10 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); final List<ShardStats> shardsStats = getShardsStats(stats); for (final ShardStats shardStats : shardsStats) { - assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); - assertThat( - shardStats.getRetentionLeaseStats().retentionLeases().leases().iterator().next().id(), - equalTo(retentionLeaseId)); + final Map<String, RetentionLease> retentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardStats.getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardStats), retentionLeases.values(), hasSize(1)); + assertThat(retentionLeases.values().iterator().next().id(), equalTo(retentionLeaseId)); } // we will sometimes fake that some of the retention leases are already removed on the leader shard @@ -454,7 +456,8 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); final List<ShardStats> afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); for (final ShardStats shardStats : afterUnfollowShardsStats) { - assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + assertThat(Strings.toString(shardStats), RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardStats.getRetentionLeaseStats().retentionLeases()).values(), empty()); } } finally { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { @@ -605,10 +608,11 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception { final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.leases(), hasSize(1)); + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); + currentRetentionLeases.values().iterator().next(); assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); // we assert that retention leases are being advanced assertThat( @@ -665,7 +669,7 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E */ assertBusy(() -> { // sample the leases after
pausing - final List<RetentionLeases> retentionLeases = new ArrayList<>(); + final List<Map<String, RetentionLease>> retentionLeases = new ArrayList<>(); assertBusy(() -> { retentionLeases.clear(); final IndicesStatsResponse stats = @@ -675,13 +679,14 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.leases(), hasSize(1)); + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); + currentRetentionLeases.values().iterator().next(); final String expectedRetentionLeaseId = retentionLeaseId( getFollowerCluster().getClusterName(), new Index(followerIndex, followerUUID), @@ -708,16 +713,17 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E continue; } assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.leases(), hasSize(1)); + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); + currentRetentionLeases.values().iterator().next(); assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); // we assert that retention leases are not being renewed by an unchanged timestamp - assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).leases().iterator().next().timestamp())); + assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).values().iterator().next().timestamp())); } }); } @@ -924,7 +930,8 @@ public void onResponseReceived( final List<ShardStats> afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); for (final ShardStats shardStats : afterUnfollowShardsStats) { assertNotNull(shardStats.getRetentionLeaseStats()); - assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + assertThat(Strings.toString(shardStats), RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardStats.getRetentionLeaseStats().retentionLeases()).values(), empty()); } } finally { for (final
DiscoveryNode senderNode : followerClusterState.getState().nodes()) { @@ -975,7 +982,8 @@ public void testForgetFollower() throws Exception { final List<ShardStats> afterForgetFollowerShardsStats = getShardsStats(afterForgetFollowerStats); for (final ShardStats shardStats : afterForgetFollowerShardsStats) { assertNotNull(shardStats.getRetentionLeaseStats()); - assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + assertThat(Strings.toString(shardStats), RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardStats.getRetentionLeaseStats().retentionLeases()).values(), empty()); } } @@ -985,7 +993,7 @@ private void assertRetentionLeaseRenewal( final String followerIndex, final String leaderIndex) throws Exception { // ensure that a retention lease has been put in place on each shard, and grab a copy of them - final List<RetentionLeases> retentionLeases = new ArrayList<>(); + final List<Map<String, RetentionLease>> retentionLeases = new ArrayList<>(); assertBusy(() -> { retentionLeases.clear(); final IndicesStatsResponse stats = @@ -995,10 +1003,11 @@ private void assertRetentionLeaseRenewal( final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.leases(), hasSize(1)); + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); + currentRetentionLeases.values().iterator().next(); assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); retentionLeases.add(currentRetentionLeases); } @@ -1013,13 +1022,14 @@ private void assertRetentionLeaseRenewal( final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.leases(), hasSize(1)); + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); + currentRetentionLeases.values().iterator().next(); assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); // we assert that retention leases are being renewed by an increase in the timestamp - assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp())); + assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).values().iterator().next().timestamp())); } }); } From 4085c74c015e7f6b3c130283f15d78c93c9a3a26 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 16:03:04 +0100 Subject: [PATCH 08/15] Line length --- .../java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java |
4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 148da675dd539..32ff961fc145e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -189,8 +189,8 @@ public void testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception { final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases - = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final RetentionLease retentionLease = currentRetentionLeases.values().iterator().next(); assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); From e89051a3fe995101977bab6efb97c8398b09e5e3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 17:01:25 +0100 Subject: [PATCH 09/15] Expect leases --- .../org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index cb54248ee3dbc..1b253874898f0 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.test.rest.yaml.ObjectPath; import java.io.IOException; @@ -228,7 +229,9 @@ public void testForgetFollower() throws IOException { final Map shardStatsAsMap = (Map) shardsStats.get(0); final Map retentionLeasesStats = (Map) shardStatsAsMap.get("retention_leases"); final List leases = (List) retentionLeasesStats.get("leases"); - assertThat(leases, empty()); + for (final Object lease : leases) { + assertThat(((Map) lease).get("source"), equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)); + } } } } From e5c46a6816de09d436d7e0fc481ca56cd644787d Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2019 17:20:55 +0100 Subject: [PATCH 10/15] Remove unused import --- .../java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 1b253874898f0..c9c74e658f4fd 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++
b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -25,7 +25,6 @@ import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; From d7fa1392e571437b589891b80d1f9342bf8d0f41 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Jun 2019 11:01:00 +0100 Subject: [PATCH 11/15] No need to check relocation targets specially since #43276 --- .../org/elasticsearch/index/seqno/ReplicationTracker.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 48db19c4cdd25..8649389d94647 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -699,10 +699,6 @@ private boolean invariant() { assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false || retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard " + shardRouting + " in " + retentionLeases; - assert shardRouting.relocating() == false - || checkpoints.get(shardRouting.allocationId().getRelocationId()).tracked == false - || retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting.getTargetRelocatingShard())) - : "no retention lease for relocation target " + shardRouting + " in " + retentionLeases; } } From 47b6b428e7218f04e9297c6e1da015d0db13b1bd Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Jun 2019 11:23:39 +0100 Subject: [PATCH 12/15] Clarify why we use startingSeqNo - 1 --- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 0bde05d536a06..6739ed48f5c07 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -196,8 +196,10 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) { && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) { runUnderPrimaryPermit(() -> { try { + // conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate to do so + final long globalCheckpoint = startingSeqNo - 1; // blindly create the lease.
TODO integrate this with the recovery process - shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), startingSeqNo - 1, establishRetentionLeaseStep); + shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint, establishRetentionLeaseStep); } catch (RetentionLeaseAlreadyExistsException e) { logger.debug("peer-recovery retention lease already exists", e); establishRetentionLeaseStep.onResponse(null); From 64481dc6bf4bdfddaeda462fefe33ab81e2e97b1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Jun 2019 12:06:13 +0100 Subject: [PATCH 13/15] Assert correct source --- .../elasticsearch/index/seqno/ReplicationTracker.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 8649389d94647..0e2f1f37e01e7 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -696,9 +696,14 @@ private boolean invariant() { && indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { - assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false - || retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : - "no retention lease for tracked shard " + shardRouting + " in " + retentionLeases; + if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { + assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) + : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases; + assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals( + retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()) + : "incorrect source [" + retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source() + + "] for [" + shardRouting + "] in " + retentionLeases; + } } } From 78dd210262e5eee4118078d37577673b8e00885b Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Jun 2019 11:37:24 +0100 Subject: [PATCH 14/15] Move toMapExcludingPeerRecoveryRetentionLeases to RetentionLeaseUtils --- .../index/seqno/RetentionLeases.java | 18 +------ .../RetentionLeasesReplicationTests.java | 3 +- .../index/seqno/RetentionLeaseIT.java | 26 +++++----- .../index/seqno/RetentionLeaseStatsTests.java | 2 +- .../index/seqno/RetentionLeaseUtils.java | 48 +++++++++++++++++++ .../xpack/ccr/CcrRetentionLeaseIT.java | 26 +++++----- 6 files changed, 78 insertions(+), 45 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java index 8148d12fbb8aa..8c5c282a72d08 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java @@ -274,21 +274,5 @@ private static Map<String, RetentionLease> toMap(final Collection<RetentionLease> leases) { - public static Map<String, RetentionLease> toMapExcludingPeerRecoveryRetentionLeases(final RetentionLeases retentionLeases) { - return retentionLeases.leases.values().stream() - .filter(l -> ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(l.source()) == false) -
.collect(Collectors.toMap(RetentionLease::id, Function.identity(), - (o1, o2) -> { - throw new AssertionError("unexpectedly merging " + o1 + " and " + o2); - }, - LinkedHashMap::new)); - } - } +

diff --git a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java index bc71fcc63510d..c1996604faeff 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseUtils; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -63,7 +64,7 @@ public void testSimpleSyncRetentionLeases() throws Exception { RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases(); assertThat(leasesOnPrimary.version(), equalTo(iterations + group.getReplicas().size() + 1L)); assertThat(leasesOnPrimary.primaryTerm(), equalTo(group.getPrimary().getOperationPrimaryTerm())); - assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(leasesOnPrimary).values(), + assertThat(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(leasesOnPrimary).values(), containsInAnyOrder(leases.toArray(new RetentionLease[0]))); latch.await(); for (IndexShard replica : group.getReplicas()) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index e9faa5f8ce987..d8decfcdebc79 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -116,7 +116,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { // check retention leases have been written on the primary assertThat(currentRetentionLeases, - equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()))); + equalTo(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { final String replicaShardNodeName = clusterService().state().nodes().get(replicaShard.currentNodeId()).getName(); final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); final Map<String, RetentionLease> retentionLeasesOnReplica - = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); + = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been written on the replica assertThat(currentRetentionLeases, - equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); + equalTo(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); } } } @@ -177,7 +177,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { // check retention leases
have been written on the primary assertThat(currentRetentionLeases, - equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()))); + equalTo(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { final String replicaShardNodeName = clusterService().state().nodes().get(replicaShard.currentNodeId()).getName(); final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); final Map<String, RetentionLease> retentionLeasesOnReplica = - RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); + RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been written on the replica assertThat(currentRetentionLeases, - equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); + equalTo(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); } } } @@ -245,7 +245,7 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).values(), + assertThat(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).values(), anyOf(empty(), contains(currentRetentionLease))); } @@ -264,7 +264,7 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final long later = System.nanoTime(); Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now))); assertBusy(() -> assertThat( - RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.getRetentionLeases()).entrySet(), empty())); + RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(primary.getRetentionLeases()).entrySet(), empty())); // now that all retention leases are expired should have been synced to all replicas assertBusy(() -> { for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { final String replicaShardNodeName = clusterService().state().nodes().get(replicaShard.currentNodeId()).getName(); final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).entrySet(), empty()); + assertThat(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).entrySet(), empty()); } }); } @@ -441,12 +441,12 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception { .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); final Map<String, RetentionLease> retentionLeasesOnReplica - = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); + = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been written on the replica; see RecoveryTarget#finalizeRecovery assertThat(currentRetentionLeases, -
equalTo(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); + equalTo(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.loadRetentionLeases()))); } } @@ -485,7 +485,7 @@ public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException { * implies that the background sync was able to execute under a block. */ assertBusy(() -> assertThat( - RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()).values(), + RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()).values(), contains(retentionLease.get()))); } catch (final Exception e) { failWithException(e); @@ -606,7 +606,7 @@ public void testCanRenewRetentionLeaseWithoutWaitingForShards() throws Interrupt * implies that the background sync was able to execute despite wait for shards being set on the index. */ assertBusy(() -> assertThat( - RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()).values(), + RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(primary.loadRetentionLeases()).values(), contains(retentionLease.get()))); } catch (final Exception e) { failWithException(e); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java index da22d68bf5f4d..a568bd728418f 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java @@ -63,7 +63,7 @@ public void testRetentionLeaseStats() throws InterruptedException { final IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("index").execute().actionGet(); assertThat(indicesStats.getShards(), arrayWithSize(1)); final RetentionLeaseStats retentionLeaseStats = indicesStats.getShards()[0].getRetentionLeaseStats(); - assertThat(RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases(retentionLeaseStats.retentionLeases()), + assertThat(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(retentionLeaseStats.retentionLeases()), equalTo(currentRetentionLeases)); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java b/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java new file mode 100644 index 0000000000000..55807161d51ad --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.index.seqno; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class RetentionLeaseUtils { + + private RetentionLeaseUtils() { + // only static methods + } + + /** + * A utility method to convert a retention lease collection to a map from retention lease ID to retention lease and exclude + * the automatically-added peer-recovery retention leases + * + * @param retentionLeases the retention lease collection + * @return the map from retention lease ID to retention lease + */ + public static Map<String, RetentionLease> toMapExcludingPeerRecoveryRetentionLeases(final RetentionLeases retentionLeases) { + return retentionLeases.leases().stream() + .filter(l -> ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(l.source()) == false) + .collect(Collectors.toMap(RetentionLease::id, Function.identity(), + (o1, o2) -> { + throw new AssertionError("unexpectedly merging " + o1 + " and " + o2); + }, + LinkedHashMap::new)); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 32ff961fc145e..f2bc649dd9686 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -36,7 +36,7 @@ import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; -import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.seqno.RetentionLeaseUtils; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.indices.IndicesService; @@ -189,7 +189,7 @@ public void testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception { final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final RetentionLease retentionLease = currentRetentionLeases.values().iterator().next(); @@ -320,7 +320,7 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = @@ -354,7 +354,7 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws continue; } assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases =
RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = @@ -394,7 +394,7 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); final List<ShardStats> shardsStats = getShardsStats(stats); for (final ShardStats shardStats : shardsStats) { - final Map<String, RetentionLease> retentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> retentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardStats.getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardStats), retentionLeases.values(), hasSize(1)); assertThat(retentionLeases.values().iterator().next().id(), equalTo(retentionLeaseId)); @@ -456,7 +456,7 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); final List<ShardStats> afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); for (final ShardStats shardStats : afterUnfollowShardsStats) { - assertThat(Strings.toString(shardStats), RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + assertThat(Strings.toString(shardStats), RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardStats.getRetentionLeaseStats().retentionLeases()).values(), empty()); } } finally { @@ -608,7 +608,7 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception { final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final RetentionLease retentionLease = @@ -679,7 +679,7 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)),
currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = @@ -713,7 +713,7 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E continue; } assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = @@ -930,7 +930,7 @@ public void onResponseReceived( final List<ShardStats> afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); for (final ShardStats shardStats : afterUnfollowShardsStats) { assertNotNull(shardStats.getRetentionLeaseStats()); - assertThat(Strings.toString(shardStats), RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + assertThat(Strings.toString(shardStats), RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardStats.getRetentionLeaseStats().retentionLeases()).values(), empty()); } } finally { @@ -982,7 +982,7 @@ public void testForgetFollower() throws Exception { final List<ShardStats> afterForgetFollowerShardsStats = getShardsStats(afterForgetFollowerStats); for (final ShardStats shardStats : afterForgetFollowerShardsStats) { assertNotNull(shardStats.getRetentionLeaseStats()); - assertThat(Strings.toString(shardStats), RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + assertThat(Strings.toString(shardStats), RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardStats.getRetentionLeaseStats().retentionLeases()).values(), empty()); } } @@ -1003,7 +1003,7 @@ private void assertRetentionLeaseRenewal( final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final RetentionLease retentionLease = @@ -1022,7 +1022,7 @@ private void assertRetentionLeaseRenewal( final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeases.toMapExcludingPeerRecoveryRetentionLeases( + final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final RetentionLease retentionLease = From 09027bc9d091c73cae9153f5e837d91e9d1a083d Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Jun 2019 12:24:37 +0100 Subject: [PATCH 15/15] Line length --- .../elasticsearch/index/seqno/RetentionLeaseIT.java | 3 ++- .../elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java | 10 ++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index d8decfcdebc79..22edba58c9446 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -274,7 +274,8 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).entrySet(), empty()); +
assertThat( + RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(replica.getRetentionLeases()).entrySet(), empty()); } }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index f2bc649dd9686..6b4cfe20a099f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -320,8 +320,9 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( - shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + final Map<String, RetentionLease> currentRetentionLeases + = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); @@ -679,8 +680,9 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E final List<ShardStats> shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { assertNotNull(shardsStats.get(i).getRetentionLeaseStats()); - final Map<String, RetentionLease> currentRetentionLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( - shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); + final Map<String, RetentionLease> currentRetentionLeases + = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( + shardsStats.get(i).getRetentionLeaseStats().retentionLeases()); assertThat(Strings.toString(shardsStats.get(i)), currentRetentionLeases.values(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
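
A note on the helper moved into the test framework in PATCH 14: because peer-recovery retention leases are now created automatically for every shard copy, any test that reasons about user-level leases must first filter the automatic ones out, which is what RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases does. The following is a minimal, self-contained sketch of that usage, not part of the patch series itself: the lease ids, sequence numbers, and sources are illustrative only (in particular the "peer_recovery/node-1" id is a stand-in for whatever getPeerRecoveryRetentionLeaseId produces), and the RetentionLease and RetentionLeases constructors are assumed from the usage visible in the patches above.

    import java.util.Arrays;
    import java.util.Map;

    import org.elasticsearch.index.seqno.ReplicationTracker;
    import org.elasticsearch.index.seqno.RetentionLease;
    import org.elasticsearch.index.seqno.RetentionLeaseUtils;
    import org.elasticsearch.index.seqno.RetentionLeases;

    public class RetentionLeaseUtilsExample {
        public static void main(String[] args) {
            final long now = System.currentTimeMillis();
            // one lease added explicitly by a client (e.g. a CCR follower) and one created
            // automatically during peer recovery; ids and values here are hypothetical
            final RetentionLeases leases = new RetentionLeases(1L, 2L, Arrays.asList(
                new RetentionLease("my-lease", 42L, now, "my-source"),
                new RetentionLease("peer_recovery/node-1", 41L, now,
                    ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)));

            // the helper drops every lease whose source is PEER_RECOVERY_RETENTION_LEASE_SOURCE
            // and keys the rest by id, preserving order (it collects into a LinkedHashMap)
            final Map<String, RetentionLease> userLeases =
                RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases(leases);

            // only the explicitly-added lease remains (run with -ea to enable asserts)
            assert userLeases.size() == 1 && userLeases.containsKey("my-lease");
        }
    }

Keeping this helper in test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java rather than on RetentionLeases itself means the production class carries no test-only filtering logic, at the cost of the import churn visible in the hunks above.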