diff --git a/server/src/main/java/org/elasticsearch/index/engine/SearchOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/SearchOnlyEngine.java
new file mode 100644
index 0000000000000..959f2b52b19a4
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/engine/SearchOnlyEngine.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogStats;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+
+/**
+ * A minimal engine that does not accept writes, and always points stats, searcher to the last commit.
+ * <p>
+ * Write and translog-recovery methods are either no-ops or fail with {@link UnsupportedOperationException};
+ * sequence-number and translog stats are derived from the instances passed to the constructor.
+ */
+public final class SearchOnlyEngine extends Engine {
+    private final SegmentInfos lastCommittedSegmentInfos;
+    private final SeqNoStats seqNoStats;
+    private final TranslogStats translogStats;
+    private final SearcherManager searcherManager;
+
+    /**
+     * Opens a reader on the store's directory and holds one store reference until the engine is closed.
+     *
+     * @param config        the engine configuration, passed to the {@link Engine} constructor
+     * @param seqNoStats    the sequence-number stats reported by this engine
+     * @param translogStats the translog stats reported by this engine
+     * @throws UncheckedIOException if reading the last commit or opening the reader fails with an {@link IOException}
+     */
+    public SearchOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats) {
+        super(config);
+        this.seqNoStats = seqNoStats;
+        this.translogStats = translogStats;
+        try {
+            store.incRef();
+            DirectoryReader reader = null;
+            boolean success = false;
+            try {
+                this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
+                reader = DirectoryReader.open(store.directory());
+                if (config.getIndexSettings().isSoftDeleteEnabled()) {
+                    reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
+                }
+                this.searcherManager = new SearcherManager(reader, new SearcherFactory());
+                success = true;
+            } finally {
+                if (success == false) {
+                    // release the reader and the store reference taken above, otherwise both would leak
+                    IOUtils.close(reader, store::decRef);
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    protected void closeNoLock(String reason, CountDownLatch closedLatch) {
+        if (isClosed.compareAndSet(false, true)) {
+            try {
+                IOUtils.close(searcherManager, store::decRef);
+            } catch (Exception ex) {
+                logger.warn("failed to close engine", ex);
+            } finally {
+                closedLatch.countDown();
+            }
+        }
+    }
+
+    @Override
+    public void flushAndClose() throws IOException {
+        // make a flush as a noop so that callers can close (and flush) this engine without worrying about the engine type.
+        close();
+    }
+
+    @Override
+    public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
+        // the supplied factory is not used: gets are always served from this engine's own searcher
+        return getFromSearcher(get, this::acquireSearcher, SearcherScope.INTERNAL);
+    }
+
+    @Override
+    public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
+        ensureOpen();
+        Releasable releasable = null;
+        try (ReleasableLock ignored = readLock.acquire()) {
+            store.incRef();
+            releasable = store::decRef;
+            final EngineSearcher searcher = new EngineSearcher(source, searcherManager, store, logger);
+            releasable = null; // hand over the reference to the engine searcher
+            return searcher;
+        } catch (AlreadyClosedException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            ensureOpen(ex); // throw AlreadyClosedException if it's closed
+            throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
+        } finally {
+            Releasables.close(releasable);
+        }
+    }
+
+    @Override
+    protected SegmentInfos getLastCommittedSegmentInfos() {
+        return lastCommittedSegmentInfos;
+    }
+
+    @Override
+    public String getHistoryUUID() {
+        return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY);
+    }
+
+    @Override
+    public long getWritingBytes() {
+        return 0;
+    }
+
+    @Override
+    public long getIndexThrottleTimeInMillis() {
+        return 0;
+    }
+
+    @Override
+    public boolean isThrottled() {
+        return false;
+    }
+
+    @Override
+    public IndexResult index(Index index) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public DeleteResult delete(Delete delete) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public NoOpResult noOp(NoOp noOp) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public boolean isTranslogSyncNeeded() {
+        return false;
+    }
+
+    @Override
+    public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
+        return false;
+    }
+
+    @Override
+    public void syncTranslog() {
+        // noop
+    }
+
+    @Override
+    public Closeable acquireRetentionLockForPeerRecovery() {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
+                                                long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public TranslogStats getTranslogStats() {
+        return translogStats;
+    }
+
+    @Override
+    public Translog.Location getTranslogLastWriteLocation() {
+        // noop - returns null as the caller treats null as noop.
+        return null;
+    }
+
+    @Override
+    public long getLocalCheckpoint() {
+        return seqNoStats.getLocalCheckpoint();
+    }
+
+    @Override
+    public void waitForOpsToComplete(long seqNo) {
+        ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public void resetLocalCheckpoint(long newCheckpoint) {
+        ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public SeqNoStats getSeqNoStats(long globalCheckpoint) {
+        return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);
+    }
+
+    @Override
+    public long getLastSyncedGlobalCheckpoint() {
+        return seqNoStats.getGlobalCheckpoint();
+    }
+
+    @Override
+    public long getIndexBufferRAMBytesUsed() {
+        return 0;
+    }
+
+    @Override
+    public List<Segment> segments(boolean verbose) {
+        return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose));
+    }
+
+    @Override
+    public void refresh(String source) throws EngineException {
+        // noop
+    }
+
+    @Override
+    public void writeIndexingBuffer() throws EngineException {
+        // noop
+    }
+
+    @Override
+    public boolean shouldPeriodicallyFlush() {
+        return false;
+    }
+
+    @Override
+    public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
+        return SyncedFlushResult.PENDING_OPERATIONS;
+    }
+
+    @Override
+    public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
+        return new CommitId(lastCommittedSegmentInfos.getId());
+    }
+
+    @Override
+    public CommitId flush() throws EngineException {
+        return flush(false, false);
+    }
+
+    @Override
+    public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
+                           boolean upgrade, boolean upgradeOnlyAncientSegments) {
+        ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public IndexCommitRef acquireSafeIndexCommit() {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public void activateThrottling() {
+    }
+
+    @Override
+    public void deactivateThrottling() {
+    }
+
+    @Override
+    public void trimUnreferencedTranslogFiles() {
+        ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public boolean shouldRollTranslogGeneration() {
+        return false;
+    }
+
+    @Override
+    public void rollTranslogGeneration() throws EngineException {
+        // noop
+    }
+
+    @Override
+    public void restoreLocalCheckpointFromTranslog() {
+        ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public int fillSeqNoGaps(long primaryTerm) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public Engine recoverFromTranslog(long recoverUpToSeqNo) {
+        return ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public void skipTranslogRecovery() {
+        ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
+        ensureUnsupportedMethodNeverCalled();
+    }
+
+    @Override
+    public void maybePruneDeletes() {
+        // noop
+    }
+
+    /**
+     * Signals that an unsupported method was invoked: fails the assertion, or throws when assertions are disabled.
+     *
+     * @param <T> the return type expected at the call site; this method never returns a value
+     * @throws UnsupportedOperationException when assertions are disabled
+     */
+    private <T> T ensureUnsupportedMethodNeverCalled() {
+        assert false : "invoking an unsupported method in a search-only engine";
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index f1e7dec6995a0..243267214b4a3 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -85,6 +85,7 @@
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.RefreshFailedEngineException;
+import org.elasticsearch.index.engine.SearchOnlyEngine;
 import org.elasticsearch.index.engine.Segment;
 import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.index.fielddata.FieldDataStats;
@@ -201,6 +202,7 @@ public class IndexShard
extends AbstractIndexShardComponent implements IndicesCl protected volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm protected volatile long operationPrimaryTerm; protected final AtomicReference currentEngineReference = new AtomicReference<>(); + private volatile long maxSeqNoOfResettingEngine = Long.MIN_VALUE; final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; @@ -507,8 +509,16 @@ public void updateShardState(final ShardRouting newRouting, * numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by * replaying the translog and marking any operations there are completed. */ - final Engine engine = getEngine(); - engine.restoreLocalCheckpointFromTranslog(); + Engine engine = getEngine(); + if (seqNoStats().getLocalCheckpoint() < maxSeqNoOfResettingEngine) { + // The engine was reset before but hasn't restored all existing operations yet. + // We need to reset the engine again with all the local history. 
+ engine.flush(); + resetEngineUpToSeqNo(Long.MAX_VALUE); + engine = getEngine(); // engine was swapped + } else { + engine.restoreLocalCheckpointFromTranslog(); + } /* Rolling the translog generation is not strictly needed here (as we will never have collisions between * sequence numbers in a translog generation in a new primary as it takes the last known sequence number * as a starting point), but it simplifies reasoning about the relationship between primary terms and @@ -676,20 +686,20 @@ private IndexShardState changeState(IndexShardState newState, String reason) { public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse, long autoGeneratedTimestamp, boolean isRetry) throws IOException { assert versionType.validateVersionForWrites(version); - return applyIndexOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp, + return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse) throws IOException { - return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry, + return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse); } - private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType, - long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, - SourceToParse sourceToParse) throws IOException { + private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, + @Nullable VersionType versionType, long 
autoGeneratedTimeStamp, boolean isRetry, + Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm + "]"; ensureWriteAllowed(origin); @@ -711,7 +721,7 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo); } - return index(getEngine(), operation); + return index(engine, operation); } public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo, @@ -745,17 +755,17 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc } public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException { - return markSeqNoAsNoop(seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA); + return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA); } - private Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason, + private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm + "]"; long startTime = System.nanoTime(); ensureWriteAllowed(origin); final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason); - return noOp(getEngine(), noOp); + return noOp(engine, noOp); } private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { @@ -777,15 +787,15 @@ public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) { public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType) throws IOException { assert 
versionType.validateVersionForWrites(version); - return applyDeleteOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, + return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, Engine.Operation.Origin.PRIMARY); } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException { - return applyDeleteOperation(seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA); + return applyDeleteOperation(getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA); } - private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, + private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id, @Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm + "]"; @@ -809,7 +819,7 @@ private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, final Term uid = extractUidForDelete(type, id); final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version, versionType, origin); - return delete(getEngine(), delete); + return delete(engine, delete); } private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, @@ -1245,6 +1255,10 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { } public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { + return applyTranslogOperation(getEngine(), operation, origin); + } + + private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation, 
Engine.Operation.Origin origin) throws IOException { // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type. final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null; final Engine.Result result; @@ -1253,19 +1267,19 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine final Translog.Index index = (Translog.Index) operation; // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. - result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(), + result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(), versionType, index.getAutoGeneratedIdTimestamp(), true, origin, source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())).routing(index.routing())); break; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; - result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), + result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), versionType, origin); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; - result = markSeqNoAsNoop(noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin); + result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin); break; default: throw new IllegalStateException("No operation defined for [" + operation + "]"); @@ -1275,14 +1289,17 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine // package-private for testing int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { - 
recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); - recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); + final boolean isResetting = isEngineResetting(); + if (isResetting == false) { + recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); + recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); + } int opsRecovered = 0; Translog.Operation operation; while ((operation = snapshot.next()) != null) { try { logger.trace("[translog] recover op {}", operation); - Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); + Engine.Result result = applyTranslogOperation(engine, operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); switch (result.getResultType()) { case FAILURE: throw result.getFailure(); @@ -1295,7 +1312,9 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce } opsRecovered++; - recoveryState.getTranslog().incrementRecoveredOperations(); + if (isResetting == false) { + recoveryState.getTranslog().incrementRecoveredOperations(); + } } catch (Exception e) { if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { // mainly for MapperParsingException and Failure to detect xcontent @@ -1340,12 +1359,6 @@ private void innerOpenEngineAndTranslog() throws IOException { } } recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - - final EngineConfig config = newEngineConfig(); - - // we disable deletes since we allow for operations to be executed against the shard while recovering - // but we need to make sure we don't loose deletes until we are done recovering - config.setEnableGcDeletes(false); // we have to set it before we open an engine and recover from the translog because // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in, // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled 
in. @@ -1353,17 +1366,15 @@ private void innerOpenEngineAndTranslog() throws IOException { final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); - assertMaxUnsafeAutoIdInCommit(); - - final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); - store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); - - createNewEngine(config); - verifyNotClosed(); - // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, - // we still give sync'd flush a chance to run: - active.set(true); - assertSequenceNumbersInCommit(); + final EngineConfig config = newEngineConfig(); + // we disable deletes since we allow for operations to be executed against the shard while recovering + // but we need to make sure we don't loose deletes until we are done recovering + config.setEnableGcDeletes(false); + synchronized (mutex) { + assert currentEngineReference.get() == null : "engine is initialized already"; + currentEngineReference.set(createNewEngine(config)); + } + assert assertSequenceNumbersInCommit(); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } @@ -1456,7 +1467,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn IndexShardState state = this.state; // one time volatile read if (origin.isRecovery()) { - if (state != IndexShardState.RECOVERING) { + if (state != IndexShardState.RECOVERING && isEngineResetting() == false) { throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when recovering, origin [" + origin + "]"); } } else { @@ -1472,6 +1483,10 @@ private void 
ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn } } + private boolean isEngineResetting() { + return getEngineOrNull() instanceof SearchOnlyEngine; + } + private boolean assertPrimaryMode() { assert shardRouting.primary() && replicationTracker.isPrimaryMode() : "shard " + shardRouting + " is not a primary shard in primary mode"; return true; @@ -2164,33 +2179,21 @@ public void onFailedEngine(String reason, @Nullable Exception failure) { } } - private Engine createNewEngine(EngineConfig config) { - synchronized (mutex) { - if (state == IndexShardState.CLOSED) { - throw new AlreadyClosedException(shardId + " can't create engine - shard is closed"); - } - assert this.currentEngineReference.get() == null; - Engine engine = newEngine(config); - onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen - // inside the callback are not visible. This one enforces happens-before - this.currentEngineReference.set(engine); - } - - // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which - // settings changes could possibly have happened, so here we forcefully push any config changes to the new engine: - Engine engine = getEngineOrNull(); - - // engine could perhaps be null if we were e.g. 
concurrently closed: - if (engine != null) { - engine.onSettingsChanged(); - } + private Engine createNewEngine(EngineConfig config) throws IOException { + assert Thread.holdsLock(mutex); + verifyNotClosed(); + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); + store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); + assertMaxUnsafeAutoIdInCommit(); + final Engine engine = engineFactory.newReadWriteEngine(config); + onNewEngine(engine); + engine.onSettingsChanged(); + active.set(true); return engine; } - protected Engine newEngine(EngineConfig config) { - return engineFactory.newReadWriteEngine(config); - } - private static void persistMetadata( final ShardPath shardPath, final IndexSettings indexSettings, @@ -2320,13 +2323,17 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g } else { localCheckpoint = currentGlobalCheckpoint; } - logger.trace( - "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", - opPrimaryTerm, - getLocalCheckpoint(), - localCheckpoint); - getEngine().resetLocalCheckpoint(localCheckpoint); - getEngine().rollTranslogGeneration(); + logger.info("detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", + opPrimaryTerm, getLocalCheckpoint(), localCheckpoint); + Engine engine = getEngine(); + if (localCheckpoint < engine.getSeqNoStats(localCheckpoint).getMaxSeqNo()) { + engine.flush(true, true); + engine.resetLocalCheckpoint(localCheckpoint); + resetEngineUpToSeqNo(localCheckpoint); + } else { + engine.resetLocalCheckpoint(localCheckpoint); + engine.rollTranslogGeneration(); + } }); } } 
@@ -2687,4 +2694,34 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { } }; } + + /** Rollback the current engine to the safe commit, then replay the local translog up to the given {@code upToSeqNo} (inclusive) */ + void resetEngineUpToSeqNo(long upToSeqNo) throws IOException { + assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; + assert SequenceNumbers.loadSeqNoInfoFromLuceneCommit(commitStats().getUserData().entrySet()).maxSeqNo == seqNoStats().getMaxSeqNo() + : "engine must be flushed before reset; max_seq_no[" + seqNoStats() + "] commit[" + commitStats().getUserData() + "]"; + final Engine resettingEngine; + sync(); // persist the global checkpoint which will be used to trim unsafe commits + synchronized (mutex) { + final Engine currentEngine = getEngine(); + final SeqNoStats seqNoStats = currentEngine.getSeqNoStats(getGlobalCheckpoint()); + logger.info("resetting replica engine from max_seq_no [{}] to seq_no [{}] global checkpoint [{}]", + seqNoStats.getMaxSeqNo(), upToSeqNo, seqNoStats.getGlobalCheckpoint()); + final TranslogStats translogStats = currentEngine.getTranslogStats(); + final SearchOnlyEngine searchOnlyEngine = new SearchOnlyEngine(currentEngine.config(), seqNoStats, translogStats); + IOUtils.close(currentEngineReference.getAndSet(searchOnlyEngine)); + maxSeqNoOfResettingEngine = seqNoStats.getMaxSeqNo(); + resettingEngine = createNewEngine(newEngineConfig()); + active.set(true); + } + resettingEngine.recoverFromTranslog(upToSeqNo); + // FIXME: The resetting engine might not contain all acknowledged writes in the previous engine. + // In order to not temporarily lose the visibility of any previous acknowledged writes, we should not activate + // the resetting engine until its local checkpoint is at least the max_seq_no of the previous engine. + // However, this delay will prevent the new acknowledged writes from being visible even with an + // acknowledged refresh. 
Another option is to make search and get unavailable until the resetting + // engine is ready. But this option would redirect all traffic to the primary. + // We should revisit a document-level rollback which does not suffer this limitation. + IOUtils.close(currentEngineReference.getAndSet(resettingEngine)); + } } diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index fab38a2b73b4a..984ef963fd755 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -211,6 +211,7 @@ public void testAckedIndexing() throws Exception { throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e); } } + assertSameDocIdsOnShards(); }, 30, TimeUnit.SECONDS); logger.info("done validating (iteration [{}])", iter); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d3aead9e44e16..27e10049c6e36 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5020,7 +5020,7 @@ public void testLastRefreshCheckpoint() throws Exception { assertThat(engine.lastRefreshedCheckpoint(), equalTo(engine.getLocalCheckpoint())); } - private static void trimUnsafeCommits(EngineConfig config) throws IOException { + static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); diff --git a/server/src/test/java/org/elasticsearch/index/engine/SearchOnlyEngineTests.java 
b/server/src/test/java/org/elasticsearch/index/engine/SearchOnlyEngineTests.java new file mode 100644 index 0000000000000..5fe1c9edc3167 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/SearchOnlyEngineTests.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.store.Store; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.hamcrest.Matchers.equalTo; + +public class SearchOnlyEngineTests extends EngineTestCase { + + public void testSearchOnlyEngine() throws Exception { + IOUtils.close(engine, store); + Engine searchOnlyEngine = null; + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + final SeqNoStats lastSeqNoStats; + final Set lastDocIds; + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < numDocs; i++) { + if (rarely()) { + continue; // gap in sequence number + } + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, REPLICA, + System.nanoTime(), -1, false)); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + } + engine.syncTranslog(); + engine.flush(); + searchOnlyEngine = new SearchOnlyEngine(engine.engineConfig, + engine.getSeqNoStats(globalCheckpoint.get()), engine.getTranslogStats()); + lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); + lastDocIds = getDocIds(engine, true); + assertThat(searchOnlyEngine.getLocalCheckpoint(), 
equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(searchOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); + try (Engine.Searcher searcher = searchOnlyEngine.acquireSearcher("test")) { + assertThat(getDocIds(searcher), equalTo(lastDocIds)); + } + for (int i = 0; i < numDocs; i++) { + if (randomBoolean()) { + String delId = Integer.toString(i); + engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get())); + } + if (rarely()) { + engine.flush(); + } + } + // the locked down engine should still point to the previous commit + assertThat(searchOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(searchOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); + try (Engine.Searcher searcher = searchOnlyEngine.acquireSearcher("test")) { + assertThat(getDocIds(searcher), equalTo(lastDocIds)); + } + } + // Close and reopen the main engine + InternalEngineTests.trimUnsafeCommits(config); + try (InternalEngine recoveringEngine = new InternalEngine(config)) { + recoveringEngine.recoverFromTranslog(Long.MAX_VALUE); + // the locked down engine should still point to the previous commit + assertThat(searchOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(searchOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); + assertThat(getDocIds(searchOnlyEngine, false), equalTo(lastDocIds)); + } + } finally { + IOUtils.close(searchOnlyEngine); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index fba71dd1e5296..b525323438c7d 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ 
b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -526,11 +526,7 @@ public void testSeqNoCollision() throws Exception { assertThat(snapshot, containsOperationsInAnyOrder(expectedOps)); assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); } - // TODO: We should assert the content of shards in the ReplicationGroup. - // Without rollback replicas(current implementation), we don't have the same content across shards: - // - replica1 has {doc1} - // - replica2 has {doc1, doc2} - // - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery + shards.assertAllEqual(initDocs + 1); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 28122665e9bb6..8e9f92bf75b80 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -22,6 +22,8 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; @@ -38,9 +40,11 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; +import 
org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; @@ -55,7 +59,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -68,8 +71,10 @@ import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -306,14 +311,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary)); } - - // roll back the extra ops in the replica - shards.removeReplica(replica); - replica.close("resync", false); - replica.store().close(); - newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); - shards.recoverReplica(newReplica); - shards.assertAllEqual(totalDocs); // Make sure that flushing on a recovering shard is ok. 
shards.flush(); shards.assertAllEqual(totalDocs); @@ -370,7 +367,6 @@ public void testResyncAfterPrimaryPromotion() throws Exception { try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) { shards.startAll(); int initialDocs = randomInt(10); - for (int i = 0; i < initialDocs; i++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "initial_doc_" + i) .source("{ \"f\": \"normal\"}", XContentType.JSON); @@ -406,31 +402,15 @@ public void testResyncAfterPrimaryPromotion() throws Exception { indexOnReplica(bulkShardRequest, shards, justReplica); } - logger.info("--> seqNo primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats()); - - logger.info("--> resyncing replicas"); - PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get(); + logger.info("--> resyncing replicas seqno_stats primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats()); + PrimaryReplicaSyncer.ResyncTask resyncTask = shards.promoteReplicaToPrimary(newPrimary).get(); if (syncedGlobalCheckPoint) { - assertEquals(extraDocs, task.getResyncedOperations()); + assertEquals(extraDocs, resyncTask.getResyncedOperations()); } else { - assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs)); - } - List replicas = shards.getReplicas(); - - // check all docs on primary are available on replica - Set primaryIds = getShardDocUIDs(newPrimary); - assertThat(primaryIds.size(), equalTo(initialDocs + extraDocs)); - for (IndexShard replica : replicas) { - Set replicaIds = getShardDocUIDs(replica); - Set temp = new HashSet<>(primaryIds); - temp.removeAll(replicaIds); - assertThat(replica.routingEntry() + " is missing docs", temp, empty()); - temp = new HashSet<>(replicaIds); - temp.removeAll(primaryIds); - // yeah, replica has more docs as there is no Lucene roll back on it - assertThat(replica.routingEntry() + " has to have extra docs", temp, - extraDocsToBeTrimmed > 0 ? 
not(empty()) : empty()); + assertThat(resyncTask.getResyncedOperations(), greaterThanOrEqualTo(extraDocs)); } + // documents on replicas should be aligned with primary + shards.assertAllEqual(initialDocs + extraDocs); // check translog on replica is trimmed int translogOperations = 0; @@ -658,6 +638,85 @@ public long indexTranslogOperations(final List operations, f } } + public void testRollbackOnPromotion() throws Exception { + final int numberOfReplicas = between(2, 3); + final AtomicBoolean isDone = new AtomicBoolean(); + final List threads = new ArrayList<>(); + try (ReplicationGroup shards = createGroup(numberOfReplicas)) { + shards.startAll(); + IndexShard newPrimary = randomFrom(shards.getReplicas()); + int initDocs = shards.indexDocs(randomInt(100)); + Set ackedDocIds = getShardDocUIDs(shards.getPrimary(), true); + int inFlightOps = scaledRandomIntBetween(10, 200); + int extraDocsOnNewPrimary = 0; + for (int i = 0; i < inFlightOps; i++) { + String id = "extra-" + i; + IndexRequest primaryRequest = new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON); + BulkShardRequest replicationRequest = indexOnPrimary(primaryRequest, shards.getPrimary()); + int indexed = 0; + for (IndexShard replica : shards.getReplicas()) { + if (randomBoolean()) { + indexOnReplica(replicationRequest, shards, replica); + if (replica == newPrimary) { + extraDocsOnNewPrimary++; + } + indexed++; + } + } + if (indexed == numberOfReplicas) { + // TODO: the current rollback impl only guarantees the visibility for acknowledged writes before global checkpoint. 
+ // ackedDocIds.add(id); + } + if (randomBoolean()) { + shards.syncGlobalCheckpoint(); + } + if (rarely()) { + shards.flush(); + } + } + shards.refresh("test"); + CountDownLatch latch = new CountDownLatch(numberOfReplicas); + for (int i = 0; i < numberOfReplicas; i++) { + IndexShard replica = shards.getReplicas().get(i); + Thread thread = new Thread(() -> { + latch.countDown(); + int hitClosedException = 0; + while (isDone.get() == false) { + try { + try (Engine.Searcher searcher = replica.acquireSearcher("test")) { + Set docIds = EngineTestCase.getDocIds(searcher); + assertThat(ackedDocIds, everyItem(isIn(docIds))); + } + for (String id : randomSubsetOf(ackedDocIds)) { + try (Engine.GetResult getResult = replica.get( + new Engine.Get(randomBoolean(), randomBoolean(), "type", id, new Term("_id", Uid.encodeId(id))))) { + assertThat("doc [" + id + "] not found", getResult.exists(), equalTo(true)); + } + } + assertThat(replica.getLocalCheckpoint(), greaterThanOrEqualTo(initDocs - 1L)); + } catch (AlreadyClosedException e) { + hitClosedException++; + } catch (Exception e) { + throw new AssertionError(e); + } + } + assertThat(hitClosedException, lessThanOrEqualTo(2)); + }); + threads.add(thread); + thread.start(); + } + latch.await(); + shards.promoteReplicaToPrimary(newPrimary).get(); + shards.assertAllEqual(initDocs + extraDocsOnNewPrimary); + int moreDocs = shards.indexDocs(scaledRandomIntBetween(1, 10)); + shards.assertAllEqual(initDocs + extraDocsOnNewPrimary + moreDocs); + isDone.set(true); // stop before closing shards to have an accurate "AlreadyClosedException" count + for (Thread thread : threads) { + thread.join(); + } + } + } + public static class BlockingTarget extends RecoveryTarget { private final CountDownLatch recoveryBlocked; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 584ed7085a8ae..e8da87f69a23e 100644 --- 
a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -170,12 +170,14 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -822,7 +824,7 @@ private void finish() { } else { assertTrue(onResponse.get()); assertNull(onFailure.get()); - assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, greaterThanOrEqualTo(translogGen + 1)); assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); } @@ -894,24 +896,18 @@ public void testGlobalCheckpointSync() throws IOException { closeShards(replicaShard, primaryShard); } - public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException { + public void testRestoreHistoryFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - final long 
globalCheckpointOnReplica = SequenceNumbers.UNASSIGNED_SEQ_NO; - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); - - final int globalCheckpoint = - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); final CountDownLatch latch = new CountDownLatch(1); + Set docIds = getShardDocUIDs(indexShard, true); indexShard.acquireReplicaOperationPermit( indexShard.getPendingPrimaryTerm() + 1, globalCheckpoint, @@ -944,27 +940,18 @@ public void onFailure(Exception e) { resyncLatch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); - + assertThat(getShardDocUIDs(indexShard, true), equalTo(docIds)); closeShards(indexShard); } - public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException { + public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); - - // most of the time this is large enough that most of the time there will be at least one gap - final int operations = 1024 - scaledRandomIntBetween(0, 1024); - indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); - - final long globalCheckpointOnReplica = - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + indexOnReplicaWithGaps(indexShard, scaledRandomIntBetween(0, 1024), Math.toIntExact(indexShard.getLocalCheckpoint())); + final 
long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); - - final int globalCheckpoint = - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + Set docsBelowGlobalCheckpoint = getShardDocUIDs(indexShard).stream() + .filter(id -> Long.parseLong(id) <= Math.max(globalCheckpointOnReplica, globalCheckpoint)).collect(Collectors.toSet()); final CountDownLatch latch = new CountDownLatch(1); indexShard.acquireReplicaOperationPermit( indexShard.pendingPrimaryTerm + 1, @@ -990,9 +977,9 @@ public void onFailure(final Exception e) { } else { assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); } - + assertThat(getShardDocUIDs(indexShard), equalTo(docsBelowGlobalCheckpoint)); // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances - final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); + final Result result = indexOnReplicaWithGaps(indexShard, between(0, 100), Math.toIntExact(indexShard.getLocalCheckpoint())); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); closeShards(indexShard); @@ -1854,8 +1841,9 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); - // Simulate resync (without rollback): Noop #1, index #2 - acquireReplicaOperationPermitBlockingly(shard, shard.pendingPrimaryTerm + 1); + // This test tries to assert that a store recovery 
recovers from the safe commit then replays only valid translog operations. + // Here we try to put stale operations which is no longer possible since we roll back on a primary fail-over. + shard.pendingPrimaryTerm += 1; // primary fail-over without rollback shard.markSeqNoAsNoop(1, "test"); shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON)); @@ -1863,6 +1851,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2")); // Recovering from store should discard doc #1 final ShardRouting replicaRouting = shard.routingEntry(); + shard.close("test", false); IndexShard newShard = reinitShard(shard, newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE)); @@ -2839,6 +2828,9 @@ private Result indexOnReplicaWithGaps( } else { gap = true; } + if (rarely()) { + indexShard.flush(new FlushRequest()); + } } assert localCheckpoint == indexShard.getLocalCheckpoint(); assert !gap || (localCheckpoint != max); @@ -3376,4 +3368,64 @@ public void testSupplyTombstoneDoc() throws Exception { closeShards(shard); } + public void testResetEngine() throws Exception { + IndexShard replica = newStartedShard(false); + indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint())); + long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, replica.getLocalCheckpoint()); + replica.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); + Set allDocs = getShardDocUIDs(replica); + Thread[] searchers = new Thread[between(1, 4)]; + AtomicBoolean done = new AtomicBoolean(); + AtomicLong ackedSeqNo = new AtomicLong(globalCheckpoint); + CountDownLatch latch = new CountDownLatch(searchers.length); + for (int 
i = 0; i < searchers.length; i++) { + searchers[i] = new Thread(() -> { + latch.countDown(); + while (done.get() == false) { + Set ackedDocs = allDocs.stream() + .filter(id -> Long.parseLong(id) <= ackedSeqNo.get()).collect(Collectors.toSet()); + try (Engine.Searcher searcher = replica.acquireSearcher("test")) { + Set docIds = EngineTestCase.getDocIds(searcher); + assertThat(ackedDocs, everyItem(isIn(docIds))); + } catch (IOException e) { + throw new AssertionError(e); + } + for (String id : randomSubsetOf(ackedDocs)) { + Engine.Get get = new Engine.Get(randomBoolean(), randomBoolean(), "_doc", id, new Term("_id", Uid.encodeId(id))); + try (Engine.GetResult getResult = replica.get(get)) { + assertThat("doc [" + id + "] not found" + ackedSeqNo, getResult.exists(), equalTo(true)); + } + } + } + }); + searchers[i].start(); + } + latch.await(); + int iterations = between(1, 10); + for (int i = 0; i < iterations; i++) { + globalCheckpoint = randomLongBetween(replica.getGlobalCheckpoint(), replica.getLocalCheckpoint()); + replica.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); + long upToSeqNo = randomLongBetween(globalCheckpoint, replica.seqNoStats().getMaxSeqNo()); + logger.debug("resetting from {} to {}", replica.seqNoStats().getMaxSeqNo(), upToSeqNo); + replica.flush(new FlushRequest()); // we flush before reset in the production. 
+ replica.resetEngineUpToSeqNo(upToSeqNo); + ackedSeqNo.set(globalCheckpoint); // expose to the background threads + Set expectedDocs = getShardDocUIDs(replica).stream() + .filter(id -> Long.parseLong(id) <= upToSeqNo).collect(Collectors.toSet()); + assertThat(getShardDocUIDs(replica), equalTo(expectedDocs)); + if (randomBoolean()) { + // simulate a primary promotion + replica.resetEngineUpToSeqNo(Long.MAX_VALUE); + replica.getEngine().fillSeqNoGaps(replica.pendingPrimaryTerm); + if (randomBoolean()) { + indexOnReplicaWithGaps(replica, between(0, 100), Math.toIntExact(replica.getLocalCheckpoint())); + } + } + } + done.set(true); + for (Thread searcher : searchers) { + searcher.join(); + } + closeShards(replica); + } } diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index cb93d803bb7c6..8d0f1845be60d 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -103,6 +103,7 @@ protected Collection> nodePlugins() { protected void beforeIndexDeletion() throws Exception { super.beforeIndexDeletion(); assertSeqNos(); + assertSameDocIdsOnShards(); } public void testSimpleRelocationNoIndexing() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index b558cd1ba9000..809362ba0b753 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -785,20 +785,24 @@ public static Set getDocIds(Engine engine, boolean refresh) throws IOExc engine.refresh("test_get_doc_ids"); } try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) { - Set ids = new HashSet<>(); - for (LeafReaderContext leafContext : searcher.reader().leaves()) { - 
LeafReader reader = leafContext.reader(); - Bits liveDocs = reader.getLiveDocs(); - for (int i = 0; i < reader.maxDoc(); i++) { - if (liveDocs == null || liveDocs.get(i)) { - Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME)); - BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME); - ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length))); - } + return getDocIds(searcher); + } + } + + public static Set getDocIds(Engine.Searcher searcher) throws IOException { + Set ids = new HashSet<>(); + for (LeafReaderContext leafContext : searcher.reader().leaves()) { + LeafReader reader = leafContext.reader(); + Bits liveDocs = reader.getLiveDocs(); + for (int i = 0; i < reader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME)); + BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME); + ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length))); } } - return ids; } + return ids; } /** diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 375e74f6cca3e..0e67ee4613491 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -644,8 +644,17 @@ private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) th return result; } - protected Set getShardDocUIDs(final IndexShard shard) throws IOException { - return EngineTestCase.getDocIds(shard.getEngine(), true); + public static Set getShardDocUIDs(final IndexShard shard, boolean refresh) throws IOException { + if (refresh) { + shard.refresh("test"); + } + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + return 
EngineTestCase.getDocIds(searcher); + } + } + + public static Set getShardDocUIDs(final IndexShard shard) throws IOException { + return getShardDocUIDs(shard, true); } protected void assertDocCount(IndexShard shard, int docDount) throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index be9e40ab42098..db4b7f0514cc0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -132,6 +132,7 @@ import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -2131,6 +2132,48 @@ public Set assertAllShardsOnNodes(String index, String... pattern) { return nodes; } + /** + * Asserts that all shards with the same shardId have the same document Ids. 
+ */ + public void assertSameDocIdsOnShards() throws Exception { + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + for (ObjectObjectCursor indexRoutingTable : state.routingTable().indicesRouting()) { + for (IntObjectCursor indexShardRoutingTable : indexRoutingTable.value.shards()) { + ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard(); + if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) { + continue; + } + DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId()); + IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName()) + .indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id()); + final Set docsOnPrimary; + try { + docsOnPrimary = IndexShardTestCase.getShardDocUIDs(primaryShard, true); + } catch (AlreadyClosedException ex) { + continue; + } + for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) { + if (replicaShardRouting.assignedToNode() == false) { + continue; + } + DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId()); + IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName()) + .indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id()); + final Set docsOnReplica; + try { + docsOnReplica = IndexShardTestCase.getShardDocUIDs(replicaShard, true); + } catch (AlreadyClosedException ex) { + continue; + } + assertThat("out of sync shards: primary=[" + primaryShardRouting + "] num_docs_on_primary=[" + docsOnPrimary.size() + + "] vs replica=[" + replicaShardRouting + "] num_docs_on_replica=[" + docsOnReplica.size() + "]", + docsOnReplica, equalTo(docsOnPrimary)); + } + } + } + }); + } /** * Asserts that all segments are sorted with the provided {@link Sort}.