diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
index 602a31288ef46..dbf8bd3e0658f 100644
--- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
+++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
@@ -26,6 +26,7 @@
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -50,6 +51,7 @@
 import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
@@ -442,6 +444,43 @@ public void testCloseIndexDuringRollingUpgrade() throws Exception {
         }
     }
+
+    /**
+     * Tests that replicas of a closed index are allocated with no-op recoveries only.
+     */
+    public void testClosedIndexReplicaAllocation() throws Exception {
+        final String indexName = "closed_index_replica_allocation";
+        if (CLUSTER_TYPE == ClusterType.OLD) {
+            createIndex(indexName, Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
+                .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "120s")
+                .put("index.routing.allocation.include._name", "node-0")
+                .build());
+            indexDocs(indexName, 0, randomInt(10));
+            // allocate replica to node-2
+            updateIndexSettings(indexName,
+                Settings.builder().put("index.routing.allocation.include._name", "node-0,node-2,upgraded-node-*"));
+            ensureGreen(indexName);
+            closeIndex(indexName);
+        }
+
+        final Version indexVersionCreated = indexVersionCreated(indexName);
+        if (indexVersionCreated.onOrAfter(Version.V_7_2_0)) {
+            // index was created on a version that supports the replication of closed indices,
+            // so we expect the index to be closed and replicated
+            ensureGreen(indexName);
+            assertClosedIndex(indexName, true);
+            // todo: change to 7_X once backported.
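+            // descriptive note: the no-op assertion below only holds once every node runs a version with the
+            // seqno based replica allocation, hence the minimumNodeVersion() gate on top of the cluster type check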
+            if (CLUSTER_TYPE != ClusterType.OLD && minimumNodeVersion().onOrAfter(Version.V_8_0_0)) {
+                assertNoFileBasedRecovery(indexName, s -> s.startsWith("upgraded"));
+            }
+        } else {
+            assertClosedIndex(indexName, false);
+        }
+
+    }
 
     /**
      * Returns the version in which the given index has been created
      */
@@ -585,4 +624,35 @@ public void testUpdateDoc() throws Exception {
             client().performRequest(update);
         }
     }
+
+    private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
+        Map<String, Object> recoveries = entityAsMap(client()
+            .performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
+
+        @SuppressWarnings("unchecked")
+        List<Map<String, Object>> shards = (List<Map<String, Object>>) XContentMapValues.extractValue(indexName + ".shards", recoveries);
+        assertNotNull(shards);
+        boolean foundReplica = false;
+        for (Map<String, Object> shard : shards) {
+            if (shard.get("primary") == Boolean.FALSE
+                && targetNode.test((String) XContentMapValues.extractValue("target.name", shard))) {
+                List<?> details = (List<?>) XContentMapValues.extractValue("index.files.details", shard);
+                // once detailed recoveries work, remove this if.
+                if (details == null) {
+                    long totalFiles = ((Number) XContentMapValues.extractValue("index.files.total", shard)).longValue();
+                    long reusedFiles = ((Number) XContentMapValues.extractValue("index.files.reused", shard)).longValue();
+                    assertEquals("must reuse all files, recoveries [" + recoveries + "]", totalFiles, reusedFiles);
+                } else {
+                    assertNotNull(details);
+                    assertThat(details, empty());
+                }
+
+                long translogRecovered = ((Number) XContentMapValues.extractValue("translog.recovered", shard)).longValue();
+                assertEquals("must be noop, recoveries [" + recoveries + "]", 0, translogRecovered);
+                foundReplica = true;
+            }
+        }
+
+        assertTrue("must find replica", foundReplica);
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java
index 8b97f1357fa00..7014d08b03f69 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java
@@ -228,6 +228,8 @@ public String getAllocationId() {
      * matching sync ids are irrelevant.
      */
     public boolean hasMatchingSyncId() {
+        // TODO: this method needs a rename, leaving it for now to not make too many iterations on that until we have full seqno
+        // based recovery.
         return matchingBytes == Long.MAX_VALUE;
     }
@@ -274,6 +276,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             builder.field("allocation_id", allocationId);
         }
         if (matchingBytes >= 0) {
+            // TODO: we should eventually either distinguish between sync-id and non sync-id equivalent shard allocation or
+            // rename this to synced_match
+            // left this for now, since it changes the API and should preferably be handled together with seqno based
+            // replica shard allocation, consisting of whether this will be ops based and how many ops to recover.
if (hasMatchingSyncId()) { builder.field("matching_sync_id", true); } else { diff --git a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java index d03f6abf7d9bd..0374884c05f2b 100644 --- a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java @@ -231,6 +231,13 @@ protected synchronized void processAsyncFetch(List responses, List> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); + // contains ephemeralIds + private volatile Set lastDataNodes = Collections.emptySet(); + @Inject public GatewayAllocator(RoutingService routingService, TransportNodesListGatewayStartedShards startedAction, @@ -101,6 +110,7 @@ public void applyFailedShards(final RoutingAllocation allocation, final List clearCacheForPrimary(fetch, allocation)); + // recalc to also (lazily) clear out old nodes. + Set newDataNodes = new HashSet<>(nodes.getDataNodes().size()); + for (Iterator iterator = nodes.getDataNodes().valuesIt(); iterator.hasNext(); ) { + newDataNodes.add(iterator.next().getEphemeralId()); + } + this.lastDataNodes = newDataNodes; + } + } + + private void clearCacheForPrimary(AsyncShardFetch fetch, + RoutingAllocation allocation) { + ShardRouting primary = allocation.routingNodes().activePrimary(fetch.shardId); + if (primary != null) { + fetch.clearCacheForNode(primary.currentNodeId()); + } + } + + private boolean hasNewNodes(DiscoveryNodes nodes, Set lastDataNodes) { + for (Iterator iterator = nodes.getDataNodes().valuesIt(); iterator.hasNext(); ) { + DiscoveryNode node = iterator.next(); + if (lastDataNodes.contains(node.getEphemeralId()) == false) { + logger.trace("new node {} found, clearing primary async-fetch-store cache", node); + return true; + } + } + + return false; + } + class InternalAsyncFetch extends AsyncShardFetch { InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister, T> action) { diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index ce3cde3e6db71..59aa02527c675 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -49,7 +49,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -101,17 +100,16 @@ public void processExistingRecoveries(RoutingAllocation allocation) { DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider - final String currentSyncId; + final TransportNodesListShardStoreMetaData.StoreFilesMetaData currentStore; if (shardStores.getData().containsKey(currentNode)) { - currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId(); + currentStore = shardStores.getData().get(currentNode).storeFilesMetaData(); } else { - currentSyncId = null; + currentStore = null; } if (currentNode.equals(nodeWithHighestMatch) == false - && Objects.equals(currentSyncId, primaryStore.syncId()) == false - && matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) { - // we found a better match that has a full 
sync id match, the existing allocation is not fully synced - // so we found a better one, cancel this one + && isNoopRecovery(primaryStore, currentStore) == false + && matchingNodes.isNoopRecovery(nodeWithHighestMatch)) { + // we found a better match that can do a fast recovery, cancel current recovery logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]", currentNode, nodeWithHighestMatch); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, @@ -363,10 +361,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) { - String primarySyncId = primaryStore.syncId(); - String replicaSyncId = storeFilesMetaData.syncId(); - // see if we have a sync id we can make use of - if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) { + if (isNoopRecovery(primaryStore, storeFilesMetaData)) { return Long.MAX_VALUE; } else { long sizeMatched = 0; @@ -380,6 +375,34 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St } } + /** + * Is a "noop recovery", which means expecting no operations to recover (though with sync-id, we could in principle still + * have a few). + */ + private static boolean isNoopRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) { + // keeping syncIdMatch for 7.x to remain backwards compatible with pre-7.3 versions, but will remove for 8.0. + return syncIdMatch(primaryStore, candidateStore) + || noopMatch(primaryStore, candidateStore); + } + + private static boolean syncIdMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) { + String primarySyncId = primaryStore.syncId(); + String replicaSyncId = candidateStore.syncId(); + return (replicaSyncId != null && replicaSyncId.equals(primarySyncId)); + } + + private static boolean noopMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) { + // We need the maxSeqNo conditions until we support non-noop recovery for closed indices (and preferably also have + // retention leases in place to ensure ops based recovery will actually be performed). + return primaryStore.hasSeqNoInfo() + && primaryStore.maxSeqNo() == candidateStore.maxSeqNo() + && primaryStore.provideRecoverySeqNo() <= candidateStore.requireRecoverySeqNo() + && candidateStore.requireRecoverySeqNo() == primaryStore.maxSeqNo() + 1; + } + protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); /** @@ -418,7 +441,10 @@ public DiscoveryNode getNodeWithHighestMatch() { return this.nodeWithHighestMatch; } - public boolean isNodeMatchBySyncID(DiscoveryNode node) { + /** + * Is supplied node a no-operations recovery, either sync-id match or sequence number match. 
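+         * Both kinds of match are encoded by computeMatchingBytes as a matching size of Long.MAX_VALUE.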
+ */ + public boolean isNoopRecovery(DiscoveryNode node) { return nodesToSize.get(node) == Long.MAX_VALUE; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fdd95614756b7..7689df2d81e63 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -23,7 +23,10 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CheckIndex; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; import org.apache.lucene.search.Query; @@ -50,6 +53,7 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; @@ -222,6 +226,7 @@ Runnable getGlobalCheckpointSyncer() { private RecoveryState recoveryState; private final RecoveryStats recoveryStats = new RecoveryStats(); + private final MeanMetric refreshMetric = new MeanMetric(); private final MeanMetric externalRefreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); @@ -1182,7 +1187,25 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException { * @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present. */ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { - Engine.IndexCommitRef indexCommit = null; + return snapshot(this::snapshotStoreMetadataFromEngine, recoverySource -> store.getMetadata(null, true)); + } + + /** + * Similar to snapshotStoreMetadata, but extended with additional info about sequence numbers for recovery. + * + * @see #snapshotStoreMetadata() for info on lifecycle and exceptions. + */ + public Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadata() throws IOException { + return snapshot(this::snapshotStoreRecoveryMetadataFromEngine, this::snapshotStoreRecoveryMetadataFromStore); + } + + /** + * snapshot data from either engine if available or else from store + * @param engineSnapper snap data from engine, must be safe towards concurrent close. + * @param storeSnapper snap data from store, will be run under mutex. Recovery source will be null if shard is closed. + */ + private T snapshot(CheckedFunction engineSnapper, + CheckedFunction storeSnapper) throws IOException { store.incRef(); try { Engine engine; @@ -1192,14 +1215,59 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { // That can be done out of mutex, since the engine can be closed half way. engine = getEngineOrNull(); if (engine == null) { - return store.getMetadata(null, true); + // take recovery source from shard routing since we are then sure it is available from the beginning of the IndexShard + // lifecycle. 
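+                    // a null recovery source denotes a closed shard; the store files are still safe to read here
+                    // because we hold the mutex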
+                    assert shardRouting.initializing() || state == IndexShardState.CLOSED;
+                    if (shardRouting.initializing()) {
+                        return storeSnapper.apply(shardRouting.recoverySource());
+                    } else {
+                        return storeSnapper.apply(null);
+                    }
+                }
             }
-            indexCommit = engine.acquireLastIndexCommit(false);
-            return store.getMetadata(indexCommit.getIndexCommit());
+            // engineSnapper must be safe towards concurrent engine close.
+            return engineSnapper.apply(engine);
         } finally {
             store.decRef();
-            IOUtils.close(indexCommit);
+        }
+    }
+
+    private Store.MetadataSnapshot snapshotStoreMetadataFromEngine(Engine engine) throws IOException {
+        try (Engine.IndexCommitRef indexCommit = engine.acquireLastIndexCommit(false)) {
+            return store.getMetadata(indexCommit.getIndexCommit());
+        }
+    }
+
+    private Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadataFromEngine(Engine engine) throws IOException {
+        try (Engine.IndexCommitRef lastCommit = engine.acquireLastIndexCommit(false)) {
+            SeqNoStats seqNoStats = engine.getSeqNoStats(-1);
+            MetadataSnapshot metadata = store.getMetadata(lastCommit.getIndexCommit());
+            try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
+                SequenceNumbers.CommitInfo safeCommitSeqNoInfo = Store.loadSeqNoInfo(safeCommit.getIndexCommit());
+                // before shard is started, we cannot provide anything so use Long.MAX_VALUE
+                long provideSeqNo = state() == IndexShardState.POST_RECOVERY || state() == IndexShardState.STARTED
+                    ? safeCommitSeqNoInfo.localCheckpoint + 1
+                    : Long.MAX_VALUE;
+                return new Store.RecoveryMetadataSnapshot(metadata, provideSeqNo, safeCommitSeqNoInfo.localCheckpoint + 1,
+                    seqNoStats.getMaxSeqNo());
+            }
+        }
+    }
+
+    private Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadataFromStore(RecoverySource recoverySource) throws IOException {
+        assert Thread.holdsLock(mutex);
+        // if closed, PEER recovery or existing store, we know files are intact/not concurrently modified while holding mutex (except for
+        // smaller modifications done under store writeLock).
+        // "shouldBootstrapNewHistory" could really go in both buckets (not important since replica shard allocator will re-fetch data
+        // from primary after it is started).
+        if (recoverySource == null
+            || recoverySource.getType() == RecoverySource.Type.PEER
+            || recoverySource.getType() == RecoverySource.Type.EXISTING_STORE) {
+            return store.getRecoveryMetadata(path, true);
+        } else {
+            // During restore from snapshot we have to wait until recovery is done before serving any meaningful data.
+            // Likewise for local shards, though in principle, we could be smarter about those in the future.
+            return Store.RecoveryMetadataSnapshot.EMPTY;
+        }
+    }
@@ -1291,6 +1359,59 @@ public void prepareForIndexRecovery() {
         assert currentEngineReference.get() == null;
     }
 
+    /**
+     * Finalizes index recovery: moves recovered store files into place, cleans up old files, generates a new empty
+     * translog and does other housekeeping for retention leases.
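+     *
+     * @param manipulateStore callback that moves the recovered files into place; runs under the shard mutex
+     * @param globalCheckpoint the global checkpoint used to seed the newly created empty translog
+     * @param sourceMetaData metadata of the recovery source, used to verify and clean up the local store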
+ */ + public void finalizeIndexRecovery(CheckedRunnable manipulateStore, long globalCheckpoint, + MetadataSnapshot sourceMetaData) throws IOException { + assert getEngineOrNull() == null; + + final Store store = store(); + store.incRef(); + try { + // protect snapshotRecoveryMetadata readers from seeing invalid/half copied index data + synchronized (mutex) { + manipulateStore.run(); + try { + store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); + final String translogUUID = Translog.createEmptyTranslog( + shardPath().resolveTranslog(), globalCheckpoint, shardId, this.getPendingPrimaryTerm()); + store.associateIndexWithNewTranslog(translogUUID); + } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { + // this is a fatal exception at this stage. + // this means we transferred files from the remote that have not be checksummed and they are + // broken. We have to clean up this shard entirely, remove all files and bubble it up to the + // source shard since this index might be broken there as well? The Source can handle this and checks + // its content on disk if possible. + try { + try { + store.removeCorruptionMarker(); + } finally { + Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files + } + } catch (Exception e) { + logger.debug("Failed to clean lucene index", e); + ex.addSuppressed(e); + } + throw ex; + } + } + + if (getRetentionLeases().leases().isEmpty()) { + // if empty, may be a fresh IndexShard, so write an empty leases file to disk + persistRetentionLeases(); + assert loadRetentionLeases().leases().isEmpty(); + } else { + assert assertRetentionLeasesPersisted(); + } + } catch (RuntimeException | IOException e) { + throw e; + } finally { + store.decRef(); + } + } + public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } @@ -2025,7 +2146,7 @@ public void persistRetentionLeases() throws WriteStateException { replicationTracker.persistRetentionLeases(path.getShardStatePath()); } - public boolean assertRetentionLeasesPersisted() throws IOException { + private boolean assertRetentionLeasesPersisted() throws IOException { return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath()); } diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 5f1f7d23a8c6a..3a821225ae328 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -81,6 +81,7 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.translog.Translog; import java.io.Closeable; @@ -247,7 +248,7 @@ final void ensureOpen() { * Note that this method requires the caller verify it has the right to access the store and * no concurrent file changes are happening. 
If in doubt, you probably want to use one of the following: * - * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking + * {@link #readRecoveryMetadataSnapshot(ShardPath, NodeEnvironment.ShardLocker, Logger)} to read meta data while locking * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * @param commit the index commit to read the snapshot from or null if the latest snapshot should be read from the @@ -271,7 +272,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException { * Note that this method requires the caller verify it has the right to access the store and * no concurrent file changes are happening. If in doubt, you probably want to use one of the following: * - * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking + * {@link #readRecoveryMetadataSnapshot(ShardPath, NodeEnvironment.ShardLocker, Logger)} to read meta data while locking * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * @@ -305,6 +306,55 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t } } + /** + * Returns a new RecoveryMetadataSnapshot for the shard store. + * + * Note that this method requires the caller verify it has the right to access the store and + * no concurrent file changes are happening. If in doubt, you probably want to use one of the following: + * + * {@link #readRecoveryMetadataSnapshot(ShardPath, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking + * {@link IndexShard#snapshotStoreRecoveryMetadata()} to safely read from an existing shard + * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed + * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an + * unexpected exception when opening the index reading the segments file. + * @throws IndexFormatTooOldException if the lucene index is too old to be opened. + * @throws IndexFormatTooNewException if the lucene index is too new to be opened. + * @throws FileNotFoundException if one or more files referenced by a commit are not present. + * @throws NoSuchFileException if one or more files referenced by a commit are not present. + * @throws IndexNotFoundException if the commit point can't be found in this store + */ + public RecoveryMetadataSnapshot getRecoveryMetadata(ShardPath shardPath, boolean lockDirectory) throws IOException { + ensureOpen(); + failIfCorrupted(); + // if we lock the directory we also acquire the write lock since that makes sure that nobody else tries to lock the IW + // on this store at the same time. + java.util.concurrent.locks.Lock lock = lockDirectory ? metadataLock.writeLock() : metadataLock.readLock(); + lock.lock(); + try (Closeable ignored = lockDirectory ? 
directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : () -> {} ) { + return readRecoveryMetadataNoLock(shardPath, directory, logger); + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + markStoreCorrupted(ex); + throw ex; + } finally { + lock.unlock(); + } + } + + private static RecoveryMetadataSnapshot readRecoveryMetadataNoLock(ShardPath shardPath, Directory directory, + Logger logger) throws IOException { + MetadataSnapshot lastCommit = new MetadataSnapshot(null, directory, logger); + final String translogUUID = lastCommit.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(shardPath.resolveTranslog(), translogUUID); + final List existingCommits = DirectoryReader.listCommits(directory); + IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint); + final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit); + + return new RecoveryMetadataSnapshot(lastCommit, + Long.MAX_VALUE, // non-started shard cannot provide anything yet. + seqNoStats.localCheckpoint + 1, + Math.max(Translog.readMaxSeqNo(shardPath.resolveTranslog(), translogUUID), seqNoStats.maxSeqNo)); + } + /** * Renames all the given files from the key of the map to the * value of the map. All successfully renamed files are removed from the map in-place. @@ -442,24 +492,27 @@ private void closeInternal() { } /** - * Reads a MetadataSnapshot from the given index locations or returns an empty snapshot if it can't be read. + * Reads a RecoveryMetadataSnapshot from the given index locations or returns an empty snapshot if it can't be read. * * @throws IOException if the index we try to read is corrupted */ - public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, - Logger logger) throws IOException { + public static RecoveryMetadataSnapshot readRecoveryMetadataSnapshot(ShardPath shardPath, + NodeEnvironment.ShardLocker shardLocker, + Logger logger) throws IOException { + ShardId shardId = shardPath.getShardId(); try (ShardLock lock = shardLocker.lock(shardId, "read metadata snapshot", TimeUnit.SECONDS.toMillis(5)); - Directory dir = new SimpleFSDirectory(indexLocation)) { + Directory dir = new SimpleFSDirectory(shardPath.resolveIndex())) { failIfCorrupted(dir, shardId); - return new MetadataSnapshot(null, dir, logger); + return readRecoveryMetadataNoLock(shardPath, dir, logger); } catch (IndexNotFoundException ex) { // that's fine - happens all the time no need to log + logger.trace("{} node reported index not found, responding with empty", shardId); } catch (FileNotFoundException | NoSuchFileException ex) { logger.info("Failed to open / find files while reading metadata snapshot", ex); } catch (ShardLockObtainFailedException ex) { logger.info(() -> new ParameterizedMessage("{}: failed to obtain shard lock", shardId), ex); } - return MetadataSnapshot.EMPTY; + return RecoveryMetadataSnapshot.EMPTY; } /** @@ -1112,6 +1165,77 @@ public String getSyncId() { } } + /** + * Extended meta data snapshot including sequence number information: + *
+     * <ul>
+     * <li>provideRecoverySeqNo: the sequence number from which this copy can provide all operations (currently)</li>
+     * <li>requireRecoverySeqNo: the sequence number from which all operations are required by this copy in order to perform
+     *     operations based recovery</li>
+     * <li>maxSeqNo: the maximum sequence number this copy knows of</li>
+     * </ul>
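+     *
+     * For example, a started copy whose safe commit has local checkpoint 10 and maxSeqNo 10 can provide operations
+     * from sequence number 11 onwards and requires operations from 11 onwards; against a primary that also has
+     * maxSeqNo 10, this satisfies the no-op recovery conditions.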
+ * @see MetadataSnapshot + */ + public static final class RecoveryMetadataSnapshot implements Writeable { + public static final RecoveryMetadataSnapshot EMPTY = new RecoveryMetadataSnapshot(); + private final MetadataSnapshot lastCommit; + private final long provideRecoverySeqNo; + private final long requireRecoverySeqNo; + private final long maxSeqNo; + + public RecoveryMetadataSnapshot(MetadataSnapshot lastCommit, long provideRecoverySeqNo, long requireRecoverySeqNo, long maxSeqNo) { + this.lastCommit = lastCommit; + this.provideRecoverySeqNo = provideRecoverySeqNo; + this.requireRecoverySeqNo = requireRecoverySeqNo; + this.maxSeqNo = maxSeqNo; + } + + public RecoveryMetadataSnapshot(StreamInput in) throws IOException { + this.lastCommit = new MetadataSnapshot(in); + if (in.getVersion().onOrAfter(org.elasticsearch.Version.V_8_0_0)) { // todo: change to 7_X when backported + this.provideRecoverySeqNo = in.readLong(); + this.requireRecoverySeqNo = in.readLong(); + this.maxSeqNo = in.readLong(); + } else { + this.provideRecoverySeqNo = Long.MAX_VALUE; + this.requireRecoverySeqNo = SequenceNumbers.NO_OPS_PERFORMED; + this.maxSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + } + } + + private RecoveryMetadataSnapshot() { + this(MetadataSnapshot.EMPTY, Long.MAX_VALUE, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.UNASSIGNED_SEQ_NO); + } + + public MetadataSnapshot lastCommit() { + return lastCommit; + } + + public long provideRecoverySeqNo() { + return provideRecoverySeqNo; + } + + public long requireRecoverySeqNo() { + return requireRecoverySeqNo; + } + + /** + * @return max sequence number or UNASSIGNED_SEQ_NO if not available. + */ + public long maxSeqNo() { + return maxSeqNo; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + lastCommit.writeTo(out); + if (out.getVersion().onOrAfter(org.elasticsearch.Version.V_8_0_0)) { // todo: change to 7_X when backported + out.writeLong(provideRecoverySeqNo); + out.writeLong(requireRecoverySeqNo); + out.writeLong(maxSeqNo); + } + } + } + /** * A class representing the diff between a recovery source and recovery target * diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 7626270b6cdc5..59bbcf727701a 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1791,6 +1791,30 @@ public static long readGlobalCheckpoint(final Path location, final String expect return checkpoint.globalCheckpoint; } + /** + * Reads the maximum sequence number from all active generations of the translog. + * Checks that the translogUUID matches + * + * Notice that trimOperations calls are not taken into account and therefore, the maxSeqNo returned can be higher than the max(seqNo) + * of all operations in translog + * + * @param location the location of the translog + * @return the maximum sequence number in translog or -1 if no operations. 
+ * @throws IOException if an I/O exception occurred reading the checkpoint + * @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid + */ + public static long readMaxSeqNo(final Path location, final String expectedTranslogUUID) throws IOException { + final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID); + + assert checkpoint.minTranslogGeneration >= 0 : "missing minTranslogGeneration"; + long maxSeqNo = checkpoint.maxSeqNo; + for (long i = checkpoint.generation - 1; i >= checkpoint.minTranslogGeneration; i--) { + Checkpoint previous = Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))); + maxSeqNo = Math.max(maxSeqNo, previous.maxSeqNo); + } + return maxSeqNo; + } + private static Checkpoint readCheckpoint(Path location, String expectedTranslogUUID) throws IOException { final Checkpoint checkpoint = readCheckpoint(location); // We need to open at least one translog header to validate the translogUUID. @@ -1829,6 +1853,9 @@ public String getTranslogUUID() { /** * Returns the max seq_no of translog operations found in this translog. Since this value is calculated based on the current * existing readers, this value is not necessary to be the max seq_no of all operations have been stored in this translog. + * + * Notice that trimOperations calls are not taken into account and therefore, the maxSeqNo returned can be higher than the max(seqNo) + * of all operations in translog */ public long getMaxSeqNo() { try (ReleasableLock ignored = readLock.acquire()) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index b3c6d12ab96e3..945956df7242f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -20,9 +20,6 @@ package org.elasticsearch.indices.recovery; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -32,7 +29,6 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.index.engine.Engine; @@ -393,51 +389,14 @@ public void receiveFileInfo(List phase1FileNames, @Override public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); - // first, we go and move files that were created with the recovery id suffix to - // the actual names, its ok if we have a corrupted index here, since we have replicas - // to recover from in case of a full cluster shutdown just when this code executes... 
- multiFileWriter.renameAllTempFiles(); - final Store store = store(); - store.incRef(); try { - store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); - final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); - store.associateIndexWithNewTranslog(translogUUID); - - if (indexShard.getRetentionLeases().leases().isEmpty()) { - // if empty, may be a fresh IndexShard, so write an empty leases file to disk - indexShard.persistRetentionLeases(); - assert indexShard.loadRetentionLeases().leases().isEmpty(); - } else { - assert indexShard.assertRetentionLeasesPersisted(); - } - - } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { - // this is a fatal exception at this stage. - // this means we transferred files from the remote that have not be checksummed and they are - // broken. We have to clean up this shard entirely, remove all files and bubble it up to the - // source shard since this index might be broken there as well? The Source can handle this and checks - // its content on disk if possible. - try { - try { - store.removeCorruptionMarker(); - } finally { - Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files - } - } catch (Exception e) { - logger.debug("Failed to clean lucene index", e); - ex.addSuppressed(e); - } - RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex); - fail(rfe, true); - throw rfe; + // rename files under lock to ensure that we do not concurrently try to read same files from store. + // rename is not atomic, but in case this fails/stops halfway through, a subsequent future recovery will repair. 
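+            // store cleanup, translog creation and retention lease persistence all moved into
+            // IndexShard#finalizeIndexRecovery so they can run under the shard mutex together with the rename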
+ indexShard.finalizeIndexRecovery(multiFileWriter::renameAllTempFiles, globalCheckpoint, sourceMetaData); } catch (Exception ex) { RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex); fail(rfe, true); throw rfe; - } finally { - store.decRef(); } } diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 20307af32f4ed..cef1674da5677 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -44,6 +44,7 @@ import org.elasticsearch.gateway.AsyncShardFetch; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -125,15 +126,16 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException IndexShard indexShard = indexService.getShardOrNull(shardId.id()); if (indexShard != null) { try { - final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata()); + final StoreFilesMetaData storeFilesMetaData = + new StoreFilesMetaData(shardId, indexShard.snapshotStoreRecoveryMetadata()); exists = true; return storeFilesMetaData; } catch (org.apache.lucene.index.IndexNotFoundException e) { logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.RecoveryMetadataSnapshot.EMPTY); } catch (IOException e) { logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.RecoveryMetadataSnapshot.EMPTY); } } } @@ -148,20 +150,20 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException } if (metaData == null) { logger.trace("{} node doesn't have meta data for the requests index, responding with empty", shardId); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.RecoveryMetadataSnapshot.EMPTY); } final IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(metaData, settings); final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); if (shardPath == null) { - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + logger.trace("{} node doesn't have shard path for the requested shard, responding with empty", shardId); + return new StoreFilesMetaData(shardId, Store.RecoveryMetadataSnapshot.EMPTY); } // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: // 1) a shard is being constructed, which means the master will not use a copy of this replica // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the master may not // reuse local resources. 
- return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, - nodeEnv::shardLock, logger)); + return new StoreFilesMetaData(shardId, Store.readRecoveryMetadataSnapshot(shardPath, nodeEnv::shardLock, logger)); } finally { TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); if (exists) { @@ -174,12 +176,12 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException public static class StoreFilesMetaData implements Iterable, Streamable { private ShardId shardId; - Store.MetadataSnapshot metadataSnapshot; + private Store.RecoveryMetadataSnapshot metadataSnapshot; StoreFilesMetaData() { } - public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot) { + public StoreFilesMetaData(ShardId shardId, Store.RecoveryMetadataSnapshot metadataSnapshot) { this.shardId = shardId; this.metadataSnapshot = metadataSnapshot; } @@ -189,20 +191,20 @@ public ShardId shardId() { } public boolean isEmpty() { - return metadataSnapshot.size() == 0; + return metadataSnapshot.lastCommit().size() == 0; } @Override public Iterator iterator() { - return metadataSnapshot.iterator(); + return metadataSnapshot.lastCommit().iterator(); } public boolean fileExists(String name) { - return metadataSnapshot.asMap().containsKey(name); + return metadataSnapshot.lastCommit().asMap().containsKey(name); } public StoreFileMetaData file(String name) { - return metadataSnapshot.asMap().get(name); + return metadataSnapshot.lastCommit().asMap().get(name); } public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws IOException { @@ -214,7 +216,7 @@ public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws I @Override public void readFrom(StreamInput in) throws IOException { shardId = new ShardId(in); - this.metadataSnapshot = new Store.MetadataSnapshot(in); + this.metadataSnapshot = new Store.RecoveryMetadataSnapshot(in); } @Override @@ -227,14 +229,38 @@ public void writeTo(StreamOutput out) throws IOException { * @return commit sync id if exists, else null */ public String syncId() { - return metadataSnapshot.getSyncId(); + return metadataSnapshot.lastCommit().getSyncId(); + } + + public long provideRecoverySeqNo() { + return metadataSnapshot.provideRecoverySeqNo(); + } + + public long requireRecoverySeqNo() { + return metadataSnapshot.requireRecoverySeqNo(); + } + + /** + * @return max sequence number or UNASSIGNED_SEQ_NO if not available. 
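+         * @see #hasSeqNoInfo()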
+ */ + public long maxSeqNo() { + return metadataSnapshot.maxSeqNo(); + } + + public boolean hasSeqNoInfo() { + return maxSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO; } @Override public String toString() { return "StoreFilesMetaData{" + ", shardId=" + shardId + - ", metadataSnapshot{size=" + metadataSnapshot.size() + ", syncId=" + metadataSnapshot.getSyncId() + "}" + + ", metadataSnapshot{size=" + metadataSnapshot.lastCommit().size() + + ", syncId=" + syncId() + + ", requireSeqNo=" + requireRecoverySeqNo() + + ", provideSeqNo=" + provideRecoverySeqNo() + + ", maxSeqNo=" + maxSeqNo() + + "}" + '}'; } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index 25765ab1ee667..00bf2e13d314d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -263,7 +263,7 @@ public void testUnassignedReplicaDelayedAllocation() throws Exception { nodes.put(primaryNodeName, AllocationDecision.NO); String[] currentNodes = internalCluster().getNodeNames(); nodes.put(currentNodes[0].equals(primaryNodeName) ? currentNodes[1] : currentNodes[0], AllocationDecision.YES); - verifyNodeDecisions(parser, nodes, includeYesDecisions, true); + verifyNodeDecisions(parser, nodes, includeYesDecisions, true, true); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -385,7 +385,7 @@ public void testUnassignedReplicaWithPriorCopy() throws Exception { for (String nodeName : internalCluster().getNodeNames()) { nodeDecisions.put(nodeName, AllocationDecision.NO); } - verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, true); + verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, true, true); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -478,7 +478,7 @@ public void testAllocationFilteringOnIndexCreation() throws Exception { for (String nodeName : internalCluster().getNodeNames()) { nodeDecisions.put(nodeName, AllocationDecision.NO); } - verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, false); + verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -586,7 +586,7 @@ public void testAllocationFilteringPreventsShardMove() throws Exception { assertEquals("move_explanation", parser.currentName()); parser.nextToken(); assertEquals("cannot move shard to another node, even though it is not allowed to remain on its current node", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -698,7 +698,7 @@ public void testRebalancingNotAllowed() throws Exception { parser.nextToken(); assertEquals("rebalancing is not allowed, even though there is at least one node on which the shard can be allocated", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.YES, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.YES, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -801,7 +801,7 @@ public void testWorseBalance() 
throws Exception { parser.nextToken(); assertEquals("cannot rebalance as no target node exists that can both allocate this shard and improve the cluster balance", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.WORSE_BALANCE, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.WORSE_BALANCE, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -911,7 +911,7 @@ public void testBetterBalanceButCannotAllocate() throws Exception { parser.nextToken(); assertEquals("cannot rebalance as no target node exists that can both allocate this shard and improve the cluster balance", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -1008,7 +1008,7 @@ public void testAssignedReplicaOnSpecificNode() throws Exception { assertEquals("rebalance_explanation", parser.currentName()); parser.nextToken(); assertEquals("rebalancing is not allowed", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, false), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, false), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -1329,7 +1329,7 @@ private void verifyStaleShardCopyNodeDecisions(XContentParser parser, int numNod } private void verifyNodeDecisions(XContentParser parser, Map expectedNodeDecisions, - boolean includeYesDecisions, boolean reuseStore) throws IOException { + boolean includeYesDecisions, boolean reuseStore, boolean synced) throws IOException { parser.nextToken(); assertEquals("node_allocation_decisions", parser.currentName()); assertEquals(Token.START_ARRAY, parser.nextToken()); @@ -1349,9 +1349,15 @@ private void verifyNodeDecisions(XContentParser parser, Map fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(false)); + assertThat(test.reroute.get(), equalTo(0)); + + test.fireSimulationAndWait(node1.getId()); + assertThat(test.reroute.get(), equalTo(1)); + + // verify we get back right data from node + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(true)); + assertThat(fetchData.getData().size(), equalTo(1)); + assertThat(fetchData.getData().get(node1), sameInstance(response1)); + + // second fetch gets same data + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(true)); + assertThat(fetchData.getData().size(), equalTo(1)); + assertThat(fetchData.getData().get(node1), sameInstance(response1)); + + test.clearCacheForNode(node1.getId()); + + // prepare next request + test.addSimulation(node1.getId(), response1_2); + + // no fetched data, new request on going + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(false)); + + test.fireSimulationAndWait(node1.getId()); + assertThat(test.reroute.get(), equalTo(2)); + + // verify we get new data back + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(true)); + assertThat(fetchData.getData().size(), equalTo(1)); + assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); + } + + public void testConcurrentRequestAndClearCache() throws Exception { + DiscoveryNodes 
nodes = DiscoveryNodes.builder().add(node1).build(); + test.addSimulation(node1.getId(), response1); + + // no fetched data, request still on going + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(false)); + assertThat(test.reroute.get(), equalTo(0)); + + // clear cache while request is still on going, before it is processed + test.clearCacheForNode(node1.getId()); + + test.fireSimulationAndWait(node1.getId()); + assertThat(test.reroute.get(), equalTo(1)); + + // prepare next request + test.addSimulation(node1.getId(), response1_2); + + // verify still no fetched data, request still on going + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(false)); + + test.fireSimulationAndWait(node1.getId()); + assertThat(test.reroute.get(), equalTo(2)); + + // verify we get new data back + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(true)); + assertThat(fetchData.getData().size(), equalTo(1)); + assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); + + } static class TestFetch extends AsyncShardFetch { static class Entry { diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java new file mode 100644 index 0000000000000..8c8334f36942b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -0,0 +1,145 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.gateway; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.InternalTestCluster; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class ReplicaShardAllocatorIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(InternalSettingsPlugin.class); + } + + public void testRecentPrimaryData() throws Exception { + final String indexName = "test"; + internalCluster().startMasterOnlyNode(); + String primary = internalCluster().startDataOnlyNode(); + + assertAcked(client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") + // disable merges to keep segments the same + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false")) + .get()); + + String replica1 = internalCluster().startDataOnlyNode(); + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(100, 200)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + flush(indexName); + + final String initialReplica = allocatedToReplica(indexName); + + String replicaWithFileOverlap = internalCluster().startDataOnlyNode(); + ensureGreen(); + + // this extra node ensures allocation deciders say yes. But due to delayed allocation, it will not be used until we are + // done. + internalCluster().startDataOnlyNode(); + + internalCluster().restartNode(replicaWithFileOverlap, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + // disabled below for now, since it makes test succeed. Problem is that when a node leaves the cluster, + // GatewayAllocator.applyFailedShards is not called. Likewise, when TransportWriteAction marks stale, it does not call + // it because it is no longer in the routing table anyway. + // without below the test triggers the cache issue half the time (since it picks one of the nodes with matching seqno + // randomly). + + // ensure replicaWithFileOverlap is outdated seqno-wise compared to primary +// indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 5)) +// .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + // this should fix above, but cannot do, since it picks wrong node to start (data folder issue). 
+ // clear cache, since this is not done when replica1 dies below (bug?). +// internalCluster().restartNode(primary, new InternalTestCluster.RestartCallback()); + + internalCluster().restartNode(replica1, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")).get()); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)) + .get()); + + // todo: can we wait for first reroute being done, including async-fetching and subsequent processing? + Thread.sleep(100); + + // invalidate primary data compared to cache on master. + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 5)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + return super.onNodeStopped(nodeName); + } + }); + return super.onNodeStopped(nodeName); + } + }); + + assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.YELLOW)); + + logger.info("--> Re-enabling allocation"); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null)).get()); + ensureGreen(); + + String after = allocatedToReplica(indexName); + logger.info("--> Now allocated to {}, was {}", after, initialReplica); + assertThat(after, not(equalTo(initialReplica))); + } + + private String allocatedToReplica(String indexName) { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + List replicas = + clusterState.routingTable().allShards(indexName).stream() + .filter(r -> r.primary() == false).filter(ShardRouting::started).map(ShardRouting::currentNodeId).collect(toList()); + assertEquals(1, replicas.size()); + return replicas.get(0); + } +} diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index d30f7eafce4a8..17cd9b6bb1df5 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,11 +48,11 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; -import org.elasticsearch.cluster.ESAllocationTestCase; import org.junit.Before; import java.util.Arrays; @@ -60,6 +61,7 @@ import java.util.HashMap; import 
java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.Matchers.equalTo; @@ -74,6 +76,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { private TestAllocator testAllocator; + private AtomicLong sequenceGenerator = new AtomicLong(); + @Before public void buildTestAllocator() { this.testAllocator = new TestAllocator(); @@ -145,6 +149,34 @@ public void testSyncIdMatch() { equalTo(nodeToMatch.getId())); } + /** + * Verifies that if sequence numbers allow a noop recovery, this is preferred over a file-based match. + */ + public void testSequenceNumberNoopMatch() { + RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders()); + DiscoveryNode nodeToMatch = randomBoolean() ? node2 : node3; + DiscoveryNode nodeNotToMatch = nodeToMatch == node2 ? node3 : node2; + long maxSeqNo = randomNonNegativeLong(); + testAllocator + .addData(node1, "MATCH", randomLongBetween(0, maxSeqNo + 1), randomLongBetween(0, maxSeqNo + 1), maxSeqNo, + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) + .addData(nodeToMatch, "NO_MATCH", randomLongBetween(0, maxSeqNo + 1), maxSeqNo + 1, maxSeqNo, + new StoreFileMetaData("file1", 10, "NO_MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + if (randomBoolean()) { + testAllocator.addData(nodeNotToMatch, "NO_MATCH", randomLongBetween(0, maxSeqNo + 1), randomLongBetween(0, maxSeqNo), maxSeqNo, + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + } else { + testAllocator + .addData(nodeNotToMatch, "NO_MATCH", + randomLongBetween(0, maxSeqNo + 1), maxSeqNo + 1, randomLongBetween(maxSeqNo + 1, Long.MAX_VALUE), + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + } + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(nodeToMatch.getId())); + } + /** * Verifies that when there is no sync id match but files match, we allocate it to matching node. */ @@ -298,10 +330,13 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide } private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) { + IndexMetaData.State indexState = randomFrom(IndexMetaData.State.values()); + ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT).put(settings)) .numberOfShards(1).numberOfReplicas(1) + .state(indexState) .putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId()))) .build(); // mark shard as delayed if reason is NODE_LEFT @@ -369,6 +404,21 @@ public boolean getFetchDataCalledAndClean() { } public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaData... files) { + long requireSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + long provideSeqNo = Long.MAX_VALUE; + long maxSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + if (randomBoolean()) { + // generate unique sequence numbers, validating that if replica.requireSeqNo != primary.maxSeqNo + 1, it has no effect.
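+ // For example, a copy assigned seqNo 7 reports maxSeqNo = 7 and provideSeqNo = requireSeqNo = 8: it holds all ops + // up to 7 and needs ops from 8 onwards, so it can only noop-match a copy whose maxSeqNo is also 7.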
+ long seqNo = sequenceGenerator.incrementAndGet(); + requireSeqNo = seqNo + 1; + provideSeqNo = seqNo + 1; + maxSeqNo = seqNo; + } + return addData(node, syncId, provideSeqNo, requireSeqNo, maxSeqNo, files); + } + + public TestAllocator addData(DiscoveryNode node, String syncId, long provideSeqNo, long requireSeqNo, long maxSeqNo, + StoreFileMetaData... files) { if (data == null) { data = new HashMap<>(); } @@ -381,7 +431,8 @@ public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaDat commitData.put(Engine.SYNC_COMMIT_ID, syncId); } data.put(node, new TransportNodesListShardStoreMetaData.StoreFilesMetaData(shardId, - new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()))); + new Store.RecoveryMetadataSnapshot(new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), + unmodifiableMap(commitData), randomInt()), provideSeqNo, requireSeqNo, maxSeqNo))); return this; } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5187ef37fcdf8..07b758ff1563a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -30,6 +30,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.elasticsearch.Assertions; @@ -63,6 +64,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -107,6 +109,7 @@ import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.store.StoreUtils; import org.elasticsearch.index.translog.TestTranslog; @@ -116,6 +119,7 @@ import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.IndexId; @@ -1357,26 +1361,38 @@ public void testSnapshotStore() throws IOException { Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + Store.RecoveryMetadataSnapshot cannotProvideRecoveryMetadata = new Store.RecoveryMetadataSnapshot(snapshot, Long.MAX_VALUE, 1, 0); + Store.RecoveryMetadataSnapshot canProvideRecoveryMetadata = new Store.RecoveryMetadataSnapshot(snapshot, 1, 1, 0); + assertIdenticalRecoveryMetadataSnapshot(cannotProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); snapshot = newShard.snapshotStoreMetadata(); 
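+ // still recovering (no engine opened yet): the last commit is readable, but the shard keeps reporting that it + // cannot provide operations for recovery, i.e. provideRecoverySeqNo stays Long.MAX_VALUE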
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + assertIdenticalRecoveryMetadataSnapshot(cannotProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); assertTrue(newShard.recoverFromStore()); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + assertIdenticalRecoveryMetadataSnapshot(canProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + assertIdenticalRecoveryMetadataSnapshot(canProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); newShard.close("test", false); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + assertIdenticalRecoveryMetadataSnapshot(cannotProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); + + Store.RecoveryMetadataSnapshot recoveryMetadataSnapshot = Store.readRecoveryMetadataSnapshot(shard.shardPath(), + (id, l, d) -> new DummyShardLock(id), logger); + assertThat(recoveryMetadataSnapshot.lastCommit().getSegmentsFile().name(), equalTo("segments_3")); + assertIdenticalRecoveryMetadataSnapshot(cannotProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); closeShards(newShard); } @@ -2027,7 +2043,9 @@ public void testRecoverFromStore() throws IOException { translogOps = 0; } String historyUUID = shard.getHistoryUUID(); + Store.RecoveryMetadataSnapshot beforeSnapshot = shard.snapshotStoreRecoveryMetadata(); IndexShard newShard = reinitShard(shard); + verifySnapshotRecoveryStoreMetadata(beforeSnapshot, false, newShard); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(newShard.recoverFromStore()); @@ -2035,6 +2053,9 @@ public void testRecoverFromStore() throws IOException { assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + verifySnapshotRecoveryStoreMetadata(newShard.snapshotStoreMetadata(), + beforeSnapshot.maxSeqNo() + 1, beforeSnapshot.maxSeqNo() + 1, beforeSnapshot.maxSeqNo(), + newShard.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); // check that local checkpoint of new primary is properly tracked after recovery assertThat(newShard.getLocalCheckpoint(), equalTo(totalOps - 1L)); @@ -2149,6 +2170,7 @@ public void testRecoverFromCleanStore() throws IOException { IndexShard newShard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shardRouting, RecoverySource.EmptyStoreRecoverySource.INSTANCE) ); + assertEquals(Store.RecoveryMetadataSnapshot.EMPTY, newShard.snapshotStoreRecoveryMetadata()); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); @@ -2157,6 +2179,8 @@ public void testRecoverFromCleanStore() throws IOException { assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); 
assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + verifySnapshotRecoveryStoreMetadata(newShard.snapshotStoreMetadata(), 0, 0, -1, + newShard.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 0); closeShards(newShard); @@ -2293,6 +2317,7 @@ public void testRestoreShard() throws IOException { routing = ShardRoutingHelper.newWithRestoreSource(routing, new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); target = reinitShard(target, routing); + assertEquals(Store.RecoveryMetadataSnapshot.EMPTY, target.snapshotStoreRecoveryMetadata()); Store sourceStore = source.store(); Store targetStore = target.store(); @@ -2318,12 +2343,14 @@ public void restoreShard(Store store, SnapshotId snapshotId, assertThat(target.getLocalCheckpoint(), equalTo(2L)); assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); + verifySnapshotRecoveryStoreMetadata(target.snapshotStoreMetadata(), 1, 1, 2, target.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard( target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L)); assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L)); assertDocs(target, "0", "2"); + verifySnapshotRecoveryStoreMetadata(target.snapshotStoreMetadata(), 1, 1, 2, target.snapshotStoreRecoveryMetadata()); closeShard(source, false); closeShards(target); @@ -2806,6 +2833,7 @@ public void testRecoverFromLocalShard() throws IOException { Map requestedMappingUpdates = ConcurrentCollections.newConcurrentMap(); { targetShard = newShard(targetRouting); + assertEquals(Store.RecoveryMetadataSnapshot.EMPTY, targetShard.snapshotStoreRecoveryMetadata()); targetShard.markAsRecovering("store", new RecoveryState(targetShard.routingEntry(), localNode, null)); BiConsumer mappingConsumer = (type, mapping) -> { @@ -2833,6 +2861,11 @@ public void testRecoverFromLocalShard() throws IOException { // check that local checkpoint of new primary is properly tracked after recovery assertThat(targetShard.getLocalCheckpoint(), equalTo(1L)); assertThat(targetShard.getReplicationTracker().getGlobalCheckpoint(), equalTo(1L)); + + long maxSeqNo = sourceShard.seqNoStats().getMaxSeqNo(); + verifySnapshotRecoveryStoreMetadata(targetShard.snapshotStoreMetadata(), maxSeqNo + 1, maxSeqNo + 1, maxSeqNo, + targetShard.snapshotStoreRecoveryMetadata()); + IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry())); assertThat(targetShard.getReplicationTracker().getTrackedLocalCheckpointForShard( targetShard.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(1L)); @@ -3033,12 +3066,17 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept indexShard.refresh("test"); } indexDoc(indexShard, "_doc", "1", "{}"); + updateGlobalCheckpointOnReplica(indexShard); indexShard.flush(new FlushRequest()); + Store.RecoveryMetadataSnapshot before = indexShard.snapshotStoreRecoveryMetadata(); closeShards(indexShard); final IndexShard newShard = reinitShard(indexShard); Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); assertTrue("at least 2 
files, commit and data: " +storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); + Store.RecoveryMetadataSnapshot expectedRecoveryMetadata = + new Store.RecoveryMetadataSnapshot(storeFileMetaDatas, Long.MAX_VALUE, before.requireRecoverySeqNo(), before.maxSeqNo()); + assertIdenticalRecoveryMetadataSnapshot(expectedRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); AtomicBoolean stop = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); expectThrows(AlreadyClosedException.class, () -> newShard.getEngine()); // no engine @@ -3050,6 +3088,7 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept assertEquals(0, storeFileMetaDatas.recoveryDiff(readMeta).different.size()); assertEquals(0, storeFileMetaDatas.recoveryDiff(readMeta).missing.size()); assertEquals(storeFileMetaDatas.size(), storeFileMetaDatas.recoveryDiff(readMeta).identical.size()); + assertIdenticalRecoveryMetadataSnapshot(expectedRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); } catch (IOException e) { throw new AssertionError(e); } @@ -3200,7 +3239,9 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { indexShard.refresh("test"); } } + updateGlobalCheckpointOnReplica(indexShard); indexShard.flush(new FlushRequest()); + Store.RecoveryMetadataSnapshot before = indexShard.snapshotStoreRecoveryMetadata(); closeShards(indexShard); final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), @@ -3216,6 +3257,8 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); + Store.RecoveryMetadataSnapshot expectedRecoveryMetadata = + new Store.RecoveryMetadataSnapshot(storeFileMetaDatas, Long.MAX_VALUE, before.requireRecoverySeqNo(), before.maxSeqNo()); AtomicBoolean stop = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); Thread snapshotter = new Thread(() -> { @@ -3227,6 +3270,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { assertThat(storeFileMetaDatas.recoveryDiff(readMeta).different.size(), equalTo(0)); assertThat(storeFileMetaDatas.recoveryDiff(readMeta).missing.size(), equalTo(0)); assertThat(storeFileMetaDatas.recoveryDiff(readMeta).identical.size(), equalTo(storeFileMetaDatas.size())); + assertIdenticalRecoveryMetadataSnapshot(expectedRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); } catch (IOException e) { throw new AssertionError(e); } @@ -3877,6 +3921,7 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover try { readyToSnapshotLatch.await(); shard.snapshotStoreMetadata(); + shard.snapshotStoreRecoveryMetadata(); try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(randomBoolean())) { shard.store().getMetadata(indexCommitRef.getIndexCommit()); } @@ -3970,6 +4015,100 @@ public void onFailure(final Exception e) { closeShard(replica, false); } + /** + * Go through the steps of peer recovery and check that snapshotRecoveryMetadata returns correct answers at each step. 
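+ * Steps covered: mark as recovering, prepare for index recovery, phase-1 file copy via MultiFileWriter, finalize + * index recovery, open the engine skipping translog recovery, phase-2 translog replay, finalize and post recovery.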
+ */ + public void testSnapshotRecoveryMetadataDuringFileBasedPeerRecovery() throws IOException { + final IndexShard primary = newStartedShard(true); + int numDocs = randomInt(10); + for (int i = 0; i < numDocs; ++i) { + indexDoc(primary, "_doc", "x" + i); + } + + if (randomBoolean()) { + flushShard(primary); + } + + IndexShard replica = newShard(primary.shardId(), false); + recoverReplica(replica, primary, true); + Store.RecoveryMetadataSnapshot originalReplicaMetadata = replica.snapshotStoreRecoveryMetadata(); + long maxSeqNo = primary.seqNoStats().getMaxSeqNo(); + verifySnapshotRecoveryStoreMetadata(replica.snapshotStoreMetadata(), maxSeqNo + 1, maxSeqNo + 1, maxSeqNo, originalReplicaMetadata); + replica = reinitShard(replica); + verifySnapshotRecoveryStoreMetadata(originalReplicaMetadata, false, replica); + + numDocs = randomInt(10); + for (int i = 0; i < numDocs; ++i) { + indexDoc(primary, "_doc", "y" + i); + } + primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null, 1000L, + Collections.singleton(primary.routingEntry().allocationId().getId()), + new IndexShardRoutingTable.Builder(primary.shardId()).addShard(primary.routingEntry()).build()); + flushShard(primary); + Store.RecoveryMetadataSnapshot flushedMetadata = primary.snapshotStoreRecoveryMetadata(); + + numDocs = randomInt(10); + for (int i = 0; i < numDocs; ++i) { + indexDoc(primary, "_doc", "z" + i); + } + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + replica.markAsRecovering("testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); + verifySnapshotRecoveryStoreMetadata(originalReplicaMetadata, false, replica); + + replica.prepareForIndexRecovery(); + verifySnapshotRecoveryStoreMetadata(originalReplicaMetadata, false, replica); + + RecoveryState.Index indexRecoveryState = new RecoveryState.Index(); + MultiFileWriter writer = new MultiFileWriter(replica.store(), indexRecoveryState, "tmp_", logger, () -> {}); + try (Engine.IndexCommitRef phase1Snapshot = primary.acquireSafeIndexCommit()) { + byte[] buffer = new byte[8192]; + Store.MetadataSnapshot metadata = primary.store().getMetadata(phase1Snapshot.getIndexCommit()); + for (StoreFileMetaData md : metadata) { + indexRecoveryState.addFileDetail(md.name(), md.length(), false); + try (IndexInput input = primary.store().directory().openInput(md.name(), IOContext.DEFAULT)) { + InputStreamIndexInput in = new InputStreamIndexInput(input, md.length()); + long position = 0; + int bytesRead; + while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) { + final boolean lastChunk = position + bytesRead == md.length(); + writer.writeFileChunk(md, position, new BytesArray(buffer, 0, bytesRead), lastChunk); + position += bytesRead; + } + } + } + verifySnapshotRecoveryStoreMetadata(originalReplicaMetadata, false, replica); + + replica.finalizeIndexRecovery(writer::renameAllTempFiles, primary.getGlobalCheckpoint(), metadata); + } + + Store.RecoveryMetadataSnapshot actualAfterCleanFiles = replica.snapshotStoreRecoveryMetadata(); + Store.RecoveryMetadataSnapshot expectedAfterCleanFiles = new Store.RecoveryMetadataSnapshot(replica.snapshotStoreMetadata(), + Long.MAX_VALUE, flushedMetadata.requireRecoverySeqNo(), flushedMetadata.maxSeqNo()); + assertIdenticalRecoveryMetadataSnapshot(expectedAfterCleanFiles, actualAfterCleanFiles); + + replica.openEngineAndSkipTranslogRecovery(); + assertIdenticalRecoveryMetadataSnapshot(expectedAfterCleanFiles, 
replica.snapshotStoreRecoveryMetadata()); + + try (Translog.Snapshot phase2Snapshot = primary.getHistoryOperations("peer-recovery", 0)) { + Translog.Operation operation; + while ((operation = phase2Snapshot.next()) != null) { + replica.applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); + } + } + Store.RecoveryMetadataSnapshot primarySnapshot = primary.snapshotStoreRecoveryMetadata(); + Store.RecoveryMetadataSnapshot expectedAfterTranslog = + new Store.RecoveryMetadataSnapshot(replica.snapshotStoreMetadata(), + primarySnapshot.provideRecoverySeqNo(), primarySnapshot.requireRecoverySeqNo(), primarySnapshot.maxSeqNo()); + verifySnapshotRecoveryStoreMetadata(expectedAfterTranslog, false, replica); + replica.finalizeRecovery(); + verifySnapshotRecoveryStoreMetadata(expectedAfterTranslog, false, replica); + + replica.postRecovery("testing"); + verifySnapshotRecoveryStoreMetadata(expectedAfterTranslog, true, replica); + + closeShards(replica, primary); + } + @Override public Settings threadPoolSettings() { return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build(); @@ -4083,4 +4222,33 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs)); closeShards(readonlyShard); } + + private void updateGlobalCheckpointOnReplica(IndexShard indexShard) { + if (indexShard.routingEntry().primary() == false) { + indexShard.updateGlobalCheckpointOnReplica(indexShard.seqNoStats().getLocalCheckpoint(), "test"); + } + } + + private void verifySnapshotRecoveryStoreMetadata(Store.MetadataSnapshot snapshot, long provide, long require, long maxSeqNo, + Store.RecoveryMetadataSnapshot actual) { + assertIdenticalRecoveryMetadataSnapshot(new Store.RecoveryMetadataSnapshot(snapshot, provide, require, maxSeqNo), + actual); + } + + private void verifySnapshotRecoveryStoreMetadata(Store.RecoveryMetadataSnapshot expected, boolean canProvide, + IndexShard verifyShard) throws IOException { + if (canProvide == false) { + expected = new Store.RecoveryMetadataSnapshot(expected.lastCommit(), + Long.MAX_VALUE, expected.requireRecoverySeqNo(), expected.maxSeqNo()); + } + assertIdenticalRecoveryMetadataSnapshot(expected, verifyShard.snapshotStoreRecoveryMetadata()); + } + + private void assertIdenticalRecoveryMetadataSnapshot(Store.RecoveryMetadataSnapshot expected, Store.RecoveryMetadataSnapshot actual) { + assertEquals(expected.lastCommit().asMap().keySet(), actual.lastCommit().asMap().keySet()); + assertEquals(expected.lastCommit().getCommitUserData(), actual.lastCommit().getCommitUserData()); + assertEquals(expected.requireRecoverySeqNo(), actual.requireRecoverySeqNo()); + assertEquals(expected.provideRecoverySeqNo(), actual.provideRecoverySeqNo()); + assertEquals(expected.maxSeqNo(), actual.maxSeqNo()); + } } diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 9e8fae209dd81..2513e0925a01c 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -58,6 +58,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.Lucene; import 
org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -72,6 +73,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.VersionUtils; import org.hamcrest.Matchers; import java.io.ByteArrayInputStream; @@ -89,9 +91,12 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -861,10 +866,11 @@ public void testStreamStoreFilesMetaData() throws Exception { Store.MetadataSnapshot metadataSnapshot = createMetaDataSnapshot(); TransportNodesListShardStoreMetaData.StoreFilesMetaData outStoreFileMetaData = new TransportNodesListShardStoreMetaData.StoreFilesMetaData(new ShardId("test", "_na_", 0), - metadataSnapshot); + new Store.RecoveryMetadataSnapshot(metadataSnapshot, randomLong(), randomLong(), randomLong())); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); - org.elasticsearch.Version targetNodeVersion = randomVersion(random()); + org.elasticsearch.Version targetNodeVersion = + randomVersionBetween(random(), org.elasticsearch.Version.V_8_0_0, org.elasticsearch.Version.CURRENT); out.setVersion(targetNodeVersion); outStoreFileMetaData.writeTo(out); ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); @@ -877,6 +883,39 @@ public void testStreamStoreFilesMetaData() throws Exception { assertThat(inFile.name(), equalTo(outFiles.next().name())); } assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId())); + assertEquals(outStoreFileMetaData.maxSeqNo(), inStoreFileMetaData.maxSeqNo()); + assertEquals(outStoreFileMetaData.provideRecoverySeqNo(), inStoreFileMetaData.provideRecoverySeqNo()); + assertEquals(outStoreFileMetaData.requireRecoverySeqNo(), inStoreFileMetaData.requireRecoverySeqNo()); + } + + public void testStreamCompatibilityForRecoveryMetaDataSnapshot() throws IOException { + Store.MetadataSnapshot metadataSnapshot = createMetaDataSnapshot(); + Store.RecoveryMetadataSnapshot recoverySnapshot = new Store.RecoveryMetadataSnapshot(metadataSnapshot, randomLong(), randomLong(), + randomLong()); + + verifyStreamCompatibility(metadataSnapshot, Function.identity(), in -> new Store.RecoveryMetadataSnapshot(in).lastCommit()); + verifyStreamCompatibility(recoverySnapshot, Store.RecoveryMetadataSnapshot::lastCommit, Store.MetadataSnapshot::new); + } + + private <S extends Writeable> void verifyStreamCompatibility(S source, + Function<S, Store.MetadataSnapshot> toMetaData, + Writeable.Reader<Store.MetadataSnapshot> reader) throws IOException { + // todo: change to V_7_X once backported.
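+ // round-trip at a pre-8.0 wire version and read the bytes back as the other representation; only the fields both + // versions understand (file metadata and commit user data) can be compared afterwards.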
+ org.elasticsearch.Version targetNodeVersion = + randomFrom(VersionUtils.allVersions().stream().filter(org.elasticsearch.Version.V_8_0_0::after).collect(Collectors.toList())); + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(targetNodeVersion); + source.writeTo(out); + + ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); + in.setVersion(targetNodeVersion); + + Store.MetadataSnapshot inData = reader.read(in); + Store.MetadataSnapshot outData = toMetaData.apply(source); + assertEquals(outData.asMap().keySet(), inData.asMap().keySet()); + assertEquals(outData.getCommitUserData(), inData.getCommitUserData()); } public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index ceca8a811a67f..5c0db9ce92690 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.translog; import com.carrotsearch.randomizedtesting.generators.RandomPicks; - import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.document.Field; @@ -3137,6 +3136,7 @@ void callCloseOnTragicEvent() { } public void testMaxSeqNo() throws Exception { + final String translogUUID = translog.getTranslogUUID(); Map maxSeqNoPerGeneration = new HashMap<>(); for (int iterations = between(1, 10), i = 0; i < iterations; i++) { long startSeqNo = randomLongBetween(0, Integer.MAX_VALUE); @@ -3152,13 +3152,21 @@ public void testMaxSeqNo() throws Exception { translog.rollGeneration(); } translog.sync(); - assertThat(translog.getMaxSeqNo(), - equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); + + long expectedMaxSeqNo = maxSeqNoPerGeneration.isEmpty() + ? 
SequenceNumbers.NO_OPS_PERFORMED + : Collections.max(maxSeqNoPerGeneration.values()); + assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); + assertThat(Translog.readMaxSeqNo(translogDir, translogUUID), equalTo(expectedMaxSeqNo)); + long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration()); - long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() + expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() .filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue()) .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); + + assertThat(Translog.readMaxSeqNo(translogDir, translogUUID), equalTo(expectedMaxSeqNo)); + expectThrows(TranslogCorruptedException.class, () -> Translog.readMaxSeqNo(translogDir, UUIDs.randomBase64UUID())); } static class SortedSnapshot implements Translog.Snapshot { diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index b39a008de5f4f..9a13f1308388d 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -488,6 +489,56 @@ private static void closeIndices(final CloseIndexRequestBuilder requestBuilder) } } + /** + * Verify that if we have two shard copies around, we prefer one with identical sequence numbers and do + * a noop recovery. + */ + public void testClosedIndexRecoversFast() throws Exception { + final String indexName = "closed-index-fast-recovery"; + internalCluster().ensureAtLeastNumDataNodes(3); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureGreen(indexName); + if (randomBoolean()) { + flush(indexName); + } + + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", "Extra" + i).setSource("num", i)).collect(toList())); + ensureGreen(); + + assertAcked(client().admin().indices().prepareClose(indexName).get()); + ensureGreen(); + + // disable replica allocation to ensure ReplicaShardAllocator sees both options at the same time. 
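+ // "primaries" still lets the closed index's primary come back after the restart below, while the replica stays + // unassigned until allocation is re-enabled at the end of the test.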
+ assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")).get()); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ensureYellow(); + return super.onNodeStopped(nodeName); + } + }); + return super.onNodeStopped(nodeName); + } + }); + + assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.YELLOW)); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null)).get()); + ensureGreen(); + assertNoFileBasedRecovery(indexName); + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { @@ -534,7 +585,7 @@ static void assertException(final Throwable throwable, final String indexName) { } } - void assertNoFileBasedRecovery(String indexName) { + private void assertNoFileBasedRecovery(String indexName) { for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { if (recovery.getPrimary() == false) { assertThat(recovery.getIndex().fileDetails(), empty()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 4780bc4fba8bd..61ddc324dc208 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -546,9 +546,14 @@ private NodeAndClient getRandomNodeAndClient() { return getRandomNodeAndClient(nc -> true); } - private synchronized NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) { + private NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) { + return getRandomNodeAndClientIncludingClosed(((Predicate<NodeAndClient>) nc -> nc.isClosed() == false).and(predicate)); + } + + private synchronized NodeAndClient getRandomNodeAndClientIncludingClosed(Predicate<NodeAndClient> predicate) { ensureOpen(); - List<NodeAndClient> values = nodes.values().stream().filter(predicate).collect(Collectors.toList()); + List<NodeAndClient> values = nodes.values().stream().filter(predicate) + .collect(Collectors.toList()); if (values.isEmpty() == false) { return randomFrom(random, values); } @@ -954,6 +959,10 @@ public void close() throws IOException { } } + public boolean isClosed() { + return closed.get(); + } + private void markNodeDataDirsAsPendingForWipe(Node node) { assert Thread.holdsLock(InternalTestCluster.this); NodeEnvironment nodeEnv = node.getNodeEnvironment(); @@ -1092,10 +1101,11 @@ public synchronized void validateClusterFormed() { /** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */ private synchronized void validateClusterFormed(String viaNode) { - Set<DiscoveryNode> expectedNodes = new HashSet<>(); - for (NodeAndClient nodeAndClient : nodes.values()) { - expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode()); - } + Set<DiscoveryNode> expectedNodes = + nodes.values().stream() + .filter(nc -> nc.isClosed() == false) + .map(nc -> getInstanceFromNode(ClusterService.class, nc.node()).localNode()) +
.collect(Collectors.toSet()); logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes); final Client client = client(viaNode); try { @@ -1429,7 +1439,7 @@ public <T> T getMasterNodeInstance(Class<T> clazz) { } private synchronized <T> T getInstance(Class<T> clazz, Predicate<NodeAndClient> predicate) { - NodeAndClient randomNodeAndClient = getRandomNodeAndClient(predicate); + NodeAndClient randomNodeAndClient = getRandomNodeAndClientIncludingClosed(predicate); assert randomNodeAndClient != null; return getInstanceFromNode(clazz, randomNodeAndClient.node); } @@ -1453,7 +1463,7 @@ public Settings dataPathSettings(String node) { @Override public int size() { - return nodes.size(); + return Math.toIntExact(nodes.values().stream().filter(nc -> nc.isClosed() == false).count()); } @Override @@ -1984,7 +1994,10 @@ public List<String> startDataOnlyNodes(int numNodes) { } private int getMasterNodesCount() { - return (int) nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); + return (int) nodes.values().stream() + .filter(n -> n.isClosed() == false) + .filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())) + .count(); } public String startMasterOnlyNode() {