diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 9fd46d53049f2..9b304de6077fc 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -97,6 +97,7 @@ public abstract class Engine implements Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; protected final ShardId shardId; + protected final String allocationId; protected final Logger logger; protected final EngineConfig engineConfig; protected final Store store; @@ -126,6 +127,7 @@ protected Engine(EngineConfig engineConfig) { this.engineConfig = engineConfig; this.shardId = engineConfig.getShardId(); + this.allocationId = engineConfig.getAllocationId(); this.store = engineConfig.getStore(); this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name engineConfig.getIndexSettings().getSettings(), engineConfig.getShardId()); diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index d7019c77321da..66911ab80c723 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -51,6 +51,7 @@ */ public final class EngineConfig { private final ShardId shardId; + private final String allocationId; private final IndexSettings indexSettings; private final ByteSizeValue indexingBufferSize; private volatile boolean enableGcDeletes = true; @@ -109,7 +110,7 @@ public final class EngineConfig { /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ - public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, + public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, ThreadPool threadPool, IndexSettings indexSettings, Engine.Warmer warmer, Store store, MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, @@ -120,6 +121,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, throw new IllegalArgumentException("openMode must not be null"); } this.shardId = shardId; + this.allocationId = allocationId; this.indexSettings = indexSettings; this.threadPool = threadPool; this.warmer = warmer == null ? (a) -> {} : warmer; @@ -240,6 +242,15 @@ public IndexSettings getIndexSettings() { */ public ShardId getShardId() { return shardId; } + /** + * Returns the allocation ID for the shard. + * + * @return the allocation ID + */ + public String getAllocationId() { + return allocationId; + } + /** * Returns the analyzer as the default analyzer in the engines {@link org.apache.lucene.index.IndexWriter} */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 4bd4634a8cb09..e1bf949f50eab 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -192,7 +192,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { throw new IllegalArgumentException(openMode.toString()); } logger.trace("recovered [{}]", seqNoStats); - seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); + seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); indexWriter = writer; translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); @@ -283,10 +283,12 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { private static SequenceNumbersService sequenceNumberService( final ShardId shardId, + final String allocationId, final IndexSettings indexSettings, final SeqNoStats seqNoStats) { return new SequenceNumbersService( shardId, + allocationId, indexSettings, seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java index 4df58bcab4459..4d9c493540280 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java @@ -50,6 +50,8 @@ */ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { + private final String allocationId; + /** * The global checkpoint tracker can operate in two modes: * - primary: this shard is in charge of collecting local checkpoint information from all shard copies and computing the global @@ -245,12 +247,18 @@ private boolean invariant() { * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. * * @param shardId the shard ID + * @param allocationId the allocation ID * @param indexSettings the index settings * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} */ - GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { + GlobalCheckpointTracker( + final ShardId shardId, + final String allocationId, + final IndexSettings indexSettings, + final long globalCheckpoint) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; + this.allocationId = allocationId; this.primaryMode = false; this.handoffInProgress = false; this.appliedClusterStateVersion = -1L; @@ -310,7 +318,7 @@ public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckp /** * Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary activation or promotion. */ - public synchronized void activatePrimaryMode(final String allocationId, final long localCheckpoint) { + public synchronized void activatePrimaryMode(final long localCheckpoint) { assert invariant(); assert primaryMode == false; assert localCheckpoints.get(allocationId) != null && localCheckpoints.get(allocationId).inSync && diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 44ad8db39a2a6..2c4286e6e5798 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -54,13 +54,14 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { */ public SequenceNumbersService( final ShardId shardId, + final String allocationId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint, final long globalCheckpoint) { super(shardId, indexSettings); localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint); - globalCheckpointTracker = new GlobalCheckpointTracker(shardId, indexSettings, globalCheckpoint); + globalCheckpointTracker = new GlobalCheckpointTracker(shardId, allocationId, indexSettings, globalCheckpoint); } /** @@ -201,7 +202,7 @@ public synchronized long getTrackedLocalCheckpointForShard(final String allocati * Called on primary activation or promotion. */ public void activatePrimaryMode(final String allocationId, final long localCheckpoint) { - globalCheckpointTracker.activatePrimaryMode(allocationId, localCheckpoint); + globalCheckpointTracker.activatePrimaryMode(localCheckpoint); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 22d6ba20be2cd..9845064c4df4a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2074,7 +2074,7 @@ private DocumentMapperForType docMapper(String type) { private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { Sort indexSort = indexSortSupplier.get(); - return new EngineConfig(openMode, shardId, + return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, indexCache.query(), cachingPolicy, translogConfig, diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c75926bbc0171..8d948846fbece 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -73,6 +73,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -80,6 +81,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -198,6 +200,7 @@ public class InternalEngineTests extends ESTestCase { protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); + protected final AllocationId allocationId = AllocationId.newInitializing(); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); protected ThreadPool threadPool; @@ -264,11 +267,11 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) { } public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) { - return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), - config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), - config.getQueryCachingPolicy(), config.getTranslogConfig(), - config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), config.getTranslogRecoveryRunner()); + return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), + config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), + config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), + config.getTranslogRecoveryRunner()); } @Override @@ -447,7 +450,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { indexSettings.getSettings())); final List refreshListenerList = refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); - EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, + EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); @@ -728,6 +731,7 @@ public void testCommitStats() throws IOException { Store store = createStore(); InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService( config.getShardId(), + config.getAllocationId(), config.getIndexSettings(), maxSeqNo.get(), localCheckpoint.get(), @@ -901,6 +905,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { initialEngine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService( config.getShardId(), + config.getAllocationId(), config.getIndexSettings(), SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, @@ -2028,7 +2033,7 @@ public void testSeqNoAndCheckpoints() throws IOException { try { initialEngine = engine; - final ShardRouting primary = TestShardRouting.newShardRouting(shardId, "node1", true, ShardRoutingState.STARTED); + final ShardRouting primary = TestShardRouting.newShardRouting("test", shardId.id(), "node1", null, true, ShardRoutingState.STARTED, allocationId); final ShardRouting replica = TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.STARTED); initialEngine.seqNoService().updateAllocationIdsFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(), replica.allocationId().getId())), @@ -2788,12 +2793,11 @@ public void testRecoverFromForeignTranslog() throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE); - EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool, - config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), - config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null, - config.getTranslogRecoveryRunner()); + EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(), + threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(), + null, config.getTranslogRecoveryRunner()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -3628,6 +3632,7 @@ private SequenceNumbersService getStallingSeqNoService( final AtomicLong expectedLocalCheckpoint) { return new SequenceNumbersService( shardId, + allocationId.getId(), defaultSettings, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, @@ -3839,7 +3844,7 @@ public void testNoOps() throws IOException { final int globalCheckpoint = randomIntBetween(0, localCheckpoint); try { final SequenceNumbersService seqNoService = - new SequenceNumbersService(shardId, defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) { + new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) { @Override public long generateSeqNo() { throw new UnsupportedOperationException(); @@ -3986,6 +3991,7 @@ public void testRestoreLocalCheckpointFromTranslog() throws IOException { final SequenceNumbersService seqNoService = new SequenceNumbersService( shardId, + allocationId.getId(), defaultSettings, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java index 8d53c69e2713e..db686248ff2cc 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java @@ -25,11 +25,13 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -46,7 +48,6 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -61,25 +62,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { - GlobalCheckpointTracker tracker; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - tracker = - new GlobalCheckpointTracker( - new ShardId("test", "_na_", 0), - IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), - UNASSIGNED_SEQ_NO); - } - public void testEmptyShards() { + final GlobalCheckpointTracker tracker = newTracker(AllocationId.newInitializing()); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); } - private final AtomicInteger aIdGenerator = new AtomicInteger(); - private Map randomAllocationsWithLocalCheckpoints(int min, int max) { Map allocations = new HashMap<>(); for (int i = randomIntBetween(min, max); i > 0; i--) { @@ -117,6 +104,7 @@ public void testGlobalCheckpointUpdate() { // it is however nice not to assume this on this level and check we do the right thing. final long minLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO); + final GlobalCheckpointTracker tracker = newTracker(active.iterator().next()); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); logger.info("--> using allocations"); @@ -133,7 +121,7 @@ public void testGlobalCheckpointUpdate() { }); tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing), emptySet()); - tracker.activatePrimaryMode(active.iterator().next().getId(), NO_OPS_PERFORMED); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId))); @@ -179,9 +167,10 @@ public void testMissingActiveIdsPreventAdvance() { final Map assigned = new HashMap<>(); assigned.putAll(active); assigned.putAll(initializing); - tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet()); AllocationId primary = active.keySet().iterator().next(); - tracker.activatePrimaryMode(primary.getId(), NO_OPS_PERFORMED); + final GlobalCheckpointTracker tracker = newTracker(primary); + tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet()); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); final AllocationId missingActiveID = randomFrom(active.keySet()); assigned @@ -202,9 +191,11 @@ public void testMissingInSyncIdsPreventAdvance() { final Map active = randomAllocationsWithLocalCheckpoints(1, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(2, 5); logger.info("active: {}, initializing: {}", active, initializing); - tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet()); + AllocationId primary = active.keySet().iterator().next(); - tracker.activatePrimaryMode(primary.getId(), NO_OPS_PERFORMED); + final GlobalCheckpointTracker tracker = newTracker(primary); + tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet()); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); randomSubsetOf(randomIntBetween(1, initializing.size() - 1), initializing.keySet()).forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); @@ -221,8 +212,9 @@ public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { final Map active = randomAllocationsWithLocalCheckpoints(1, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); final Map nonApproved = randomAllocationsWithLocalCheckpoints(1, 5); + final GlobalCheckpointTracker tracker = newTracker(active.keySet().iterator().next()); tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet()); - tracker.activatePrimaryMode(active.keySet().iterator().next().getId(), NO_OPS_PERFORMED); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); nonApproved.keySet().forEach(k -> expectThrows(IllegalStateException.class, () -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED))); @@ -251,8 +243,9 @@ public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { if (randomBoolean()) { allocations.putAll(initializingToBeRemoved); } + final GlobalCheckpointTracker tracker = newTracker(active.iterator().next()); tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing), emptySet()); - tracker.activatePrimaryMode(active.iterator().next().getId(), NO_OPS_PERFORMED); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); if (randomBoolean()) { initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); } else { @@ -286,9 +279,10 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { final AtomicBoolean complete = new AtomicBoolean(); final AllocationId inSyncAllocationId = AllocationId.newInitializing(); final AllocationId trackingAllocationId = AllocationId.newInitializing(); + final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId); tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId)), emptySet()); - tracker.activatePrimaryMode(inSyncAllocationId.getId(), globalCheckpoint); + tracker.activatePrimaryMode(globalCheckpoint); final Thread thread = new Thread(() -> { try { // synchronize starting with the test thread @@ -326,6 +320,14 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { thread.join(); } + private GlobalCheckpointTracker newTracker(final AllocationId allocationId) { + return new GlobalCheckpointTracker( + new ShardId("test", "_na_", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + UNASSIGNED_SEQ_NO); + } + public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException { final int localCheckpoint = randomIntBetween(1, 32); final int globalCheckpoint = randomIntBetween(localCheckpoint + 1, 64); @@ -333,9 +335,10 @@ public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBar final AtomicBoolean interrupted = new AtomicBoolean(); final AllocationId inSyncAllocationId = AllocationId.newInitializing(); final AllocationId trackingAllocationId = AllocationId.newInitializing(); + final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId); tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId)), emptySet()); - tracker.activatePrimaryMode(inSyncAllocationId.getId(), globalCheckpoint); + tracker.activatePrimaryMode(globalCheckpoint); final Thread thread = new Thread(() -> { try { // synchronize starting with the test thread @@ -380,9 +383,10 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); final Set initializingIds = activeAndInitializingAllocationIds.v2(); IndexShardRoutingTable routingTable = routingTable(initializingIds); - tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet()); AllocationId primaryId = activeAllocationIds.iterator().next(); - tracker.activatePrimaryMode(primaryId.getId(), NO_OPS_PERFORMED); + final GlobalCheckpointTracker tracker = newTracker(primaryId); + tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet()); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds))); assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable)); @@ -529,9 +533,10 @@ public void testRaceUpdatingGlobalCheckpoint() throws InterruptedException, Brok final CyclicBarrier barrier = new CyclicBarrier(4); final int activeLocalCheckpoint = randomIntBetween(0, Integer.MAX_VALUE - 1); + final GlobalCheckpointTracker tracker = newTracker(active); tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()), routingTable(Collections.singleton(initializing)), emptySet()); - tracker.activatePrimaryMode(active.getId(), activeLocalCheckpoint); + tracker.activatePrimaryMode(activeLocalCheckpoint); final int nextActiveLocalCheckpoint = randomIntBetween(activeLocalCheckpoint + 1, Integer.MAX_VALUE); final Thread activeThread = new Thread(() -> { try { @@ -574,12 +579,15 @@ public void testRaceUpdatingGlobalCheckpoint() throws InterruptedException, Brok } public void testPrimaryContextHandoff() throws IOException { - GlobalCheckpointTracker oldPrimary = new GlobalCheckpointTracker(new ShardId("test", "_na_", 0), - IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO); - GlobalCheckpointTracker newPrimary = new GlobalCheckpointTracker(new ShardId("test", "_na_", 0), - IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); + final ShardId shardId = new ShardId("test", "_na_", 0); FakeClusterState clusterState = initialState(); + GlobalCheckpointTracker oldPrimary = + new GlobalCheckpointTracker(shardId, randomFrom(ids(clusterState.inSyncIds)), indexSettings, UNASSIGNED_SEQ_NO); + GlobalCheckpointTracker newPrimary = + new GlobalCheckpointTracker(shardId, UUIDs.randomBase64UUID(random()), indexSettings, UNASSIGNED_SEQ_NO); + clusterState.apply(oldPrimary); clusterState.apply(newPrimary); @@ -686,9 +694,10 @@ public void testPrimaryContextHandoff() throws IOException { public void testIllegalStateExceptionIfUnknownAllocationId() { final AllocationId active = AllocationId.newInitializing(); final AllocationId initializing = AllocationId.newInitializing(); + final GlobalCheckpointTracker tracker = newTracker(active); tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()), routingTable(Collections.singleton(initializing)), emptySet()); - tracker.activatePrimaryMode(active.getId(), NO_OPS_PERFORMED); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(randomAlphaOfLength(10))); expectThrows(IllegalStateException.class, () -> tracker.markAllocationIdAsInSync(randomAlphaOfLength(10), randomNonNegativeLong())); @@ -731,7 +740,7 @@ private static FakeClusterState initialState() { } private static void activatePrimary(FakeClusterState clusterState, GlobalCheckpointTracker gcp) { - gcp.activatePrimaryMode(randomFrom(ids(clusterState.inSyncIds)), randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10)); + gcp.activatePrimaryMode(randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10)); } private static void randomLocalCheckpointUpdate(GlobalCheckpointTracker gcp) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 6b5bd57aed9c2..01893a99ae4e3 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -28,6 +28,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; @@ -98,6 +99,7 @@ public void setupListeners() throws Exception { threadPool = new TestThreadPool(getTestName()); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); ShardId shardId = new ShardId(new Index("index", "_na_"), 1); + String allocationId = UUIDs.randomBase64UUID(random()); Directory directory = newDirectory(); DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { @Override @@ -115,10 +117,9 @@ public void onFailedEngine(String reason, @Nullable Exception e) { // we don't need to notify anybody in this test } }; - EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, threadPool, indexSettings, null, - store, newMergePolicy(), iwc.getAnalyzer(), - iwc.getSimilarity(), new CodecService(null, logger), eventListener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool, + indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), + eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog());