diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 2d5a1dda46460..b6e15708ef730 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -732,6 +732,7 @@ private void maybeTrimTranslog() { continue; case POST_RECOVERY: case STARTED: + case PROMOTING: case RELOCATED: try { shard.trimTranslog(); @@ -758,6 +759,7 @@ private void maybeSyncGlobalCheckpoints() { assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active"; continue; case STARTED: + case PROMOTING: try { shard.acquirePrimaryOperationPermit( ActionListener.wrap( diff --git a/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java b/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java index 1260a3829d4a9..32325b5ffbe98 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.shard; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.rest.RestStatus; @@ -52,7 +53,11 @@ public IllegalIndexShardStateException(StreamInput in) throws IOException{ @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(currentState.id()); + if (out.getVersion().before(Version.V_7_0_0_alpha1) && currentState == IndexShardState.PROMOTING) { + out.writeByte(IndexShardState.STARTED.id()); + } else { + out.writeByte(currentState.id()); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f5eba1b4f62b4..5145f87ea69e7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -217,12 +217,12 @@ Runnable getGlobalCheckpointSyncer() { private final IndexShardOperationPermits indexShardOperationPermits; - private static final EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); + private static final EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.PROMOTING, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); // for primaries, we only allow to write when actually started (so the cluster has decided we started) // in case we have a relocation of a primary, we also allow to write after phase 2 completed, where the shard may be // in state RECOVERING or POST_RECOVERY. After a primary has been marked as RELOCATED, we only allow writes to the relocation target // which can be either in POST_RECOVERY or already STARTED (this prevents writing concurrently to two primaries). - public static final EnumSet writeAllowedStatesForPrimary = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED); + public static final EnumSet writeAllowedStatesForPrimary = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.PROMOTING); // replication is also allowed while recovering, since we index also during recovery to replicas and rely on version checks to make sure its consistent // a relocated shard can also be target of a replication if the relocation target has not been marked as active yet and is syncing it's changes back to the relocation source private static final EnumSet writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); @@ -426,7 +426,7 @@ public void updateShardState(final ShardRouting newRouting, throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); } assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || - state == IndexShardState.CLOSED : + state == IndexShardState.PROMOTING || state == IndexShardState.CLOSED : "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state; persistMetadata(path, indexSettings, newRouting, currentRouting, logger); final CountDownLatch shardStateUpdated = new CountDownLatch(1); @@ -460,10 +460,10 @@ public void updateShardState(final ShardRouting newRouting, * incremented. */ // to prevent primary relocation handoff while resync is not completed - boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true); - if (resyncStarted == false) { - throw new IllegalStateException("cannot start resync while it's already in progress"); + if (state != IndexShardState.STARTED) { + throw new IllegalIndexShardStateException(shardId, state, "cannot start resync while it's already in progress"); } + changeState(IndexShardState.PROMOTING, "Promoting to primary"); indexShardOperationPermits.asyncBlockOperations( 30, TimeUnit.MINUTES, @@ -496,19 +496,27 @@ public void updateShardState(final ShardRouting newRouting, public void onResponse(ResyncTask resyncTask) { logger.info("primary-replica resync completed with {} operations", resyncTask.getResyncedOperations()); - boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); - assert resyncCompleted : "primary-replica resync finished but was not started"; + synchronized (mutex) { + if (state != IndexShardState.PROMOTING) { + throw new IllegalIndexShardStateException(shardId, state, "primary-replica resync finished but was not started"); + } + changeState(IndexShardState.STARTED, "Resync is completed"); + } } @Override public void onFailure(Exception e) { - boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); - assert resyncCompleted : "primary-replica resync finished but was not started"; - if (state == IndexShardState.CLOSED) { + synchronized (mutex) { // ignore, shutting down - } else { - failShard("exception during primary-replica resync", e); + if (state == IndexShardState.CLOSED) { + return; + } + if (state != IndexShardState.PROMOTING) { + throw new IllegalIndexShardStateException(shardId, state, "primary-replica resync failed but was not started"); + } + changeState(IndexShardState.STARTED, "Resync is failed"); } + failShard("exception during primary-replica resync", e); } }); } catch (final AlreadyClosedException e) { @@ -541,7 +549,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); } - if (state == IndexShardState.STARTED) { + if (state == IndexShardState.STARTED || state == IndexShardState.PROMOTING) { throw new IndexShardStartedException(shardId); } if (state == IndexShardState.RELOCATED) { @@ -558,8 +566,6 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta } } - private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean(); - /** * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided * {@link Runnable} is executed after all operations are successfully blocked. @@ -622,11 +628,6 @@ private void verifyRelocatingState() { throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, ": shard is no longer relocating " + shardRouting); } - - if (primaryReplicaResyncInProgress.get()) { - throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, - ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); - } } @Override @@ -1090,7 +1091,7 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() { public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException { IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine - if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { + if (state == IndexShardState.STARTED || state == IndexShardState.PROMOTING || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { return getEngine().acquireIndexCommit(flushFirst); } else { throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); @@ -1193,7 +1194,7 @@ public IndexShard postRecovery(String reason) throws IndexShardStartedException, if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); } - if (state == IndexShardState.STARTED) { + if (state == IndexShardState.STARTED || state == IndexShardState.PROMOTING) { throw new IndexShardStartedException(shardId); } if (state == IndexShardState.RELOCATED) { @@ -1426,7 +1427,7 @@ public void finalizeRecovery() { public boolean ignoreRecoveryAttempt() { IndexShardState state = state(); // one time volatile read return state == IndexShardState.POST_RECOVERY || state == IndexShardState.RECOVERING || state == IndexShardState.STARTED || - state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED; + state == IndexShardState.PROMOTING || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED; } public void readAllowed() throws IllegalIndexShardStateException { @@ -1494,7 +1495,7 @@ private void verifyNotClosed(Exception suppressed) throws IllegalIndexShardState protected final void verifyActive() throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read - if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) { + if (state != IndexShardState.STARTED && state != IndexShardState.PROMOTING && state != IndexShardState.RELOCATED) { throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard is active"); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardState.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardState.java index d3c6de7136c11..006fe51c2f9bd 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardState.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardState.java @@ -26,7 +26,8 @@ public enum IndexShardState { POST_RECOVERY((byte) 2), STARTED((byte) 3), RELOCATED((byte) 4), - CLOSED((byte) 5); + CLOSED((byte) 5), + PROMOTING((byte) 6); private static final IndexShardState[] IDS = new IndexShardState[IndexShardState.values().length]; diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index b1bd1c5b3138e..8935cbac0858d 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -128,7 +128,7 @@ public synchronized Translog.Operation next() throws IOException { if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); } else { - assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; + assert state == IndexShardState.PROMOTING : "resync should happen on a promoting shard, but state was: " + state; } return snapshot.next(); } diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index 73ba9342175d4..0f06884a7d64e 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -90,7 +90,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index private final Cancellable scheduler; private static final EnumSet CAN_WRITE_INDEX_BUFFER_STATES = EnumSet.of( - IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); + IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.PROMOTING, IndexShardState.RELOCATED); private final ShardsIndicesStatusChecker statusChecker; diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 5aa8b5f3ee1b3..65ec4b94e047d 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -589,7 +589,8 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard } final IndexShardState state = shard.state(); - if (shardRouting.initializing() && (state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY)) { + if (shardRouting.initializing() + && (state == IndexShardState.STARTED || state == IndexShardState.PROMOTING || state == IndexShardState.POST_RECOVERY)) { // the master thinks we are initializing, but we are already started or on POST_RECOVERY and waiting // for master to confirm a shard started message (either master failover, or a cluster event before // we managed to tell the master we started), mark us as started diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 2ae8d12a9fe9f..c33ba2a80b393 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -77,7 +77,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe Setting.positiveTimeSetting("indices.store.delete.shard.timeout", new TimeValue(30, TimeUnit.SECONDS), Property.NodeScope); public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists"; - private static final EnumSet ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED); + private static final EnumSet ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.PROMOTING, IndexShardState.RELOCATED); private final IndicesService indicesService; private final ClusterService clusterService; private final TransportService transportService; diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index d143ebe329341..c97bdfc009630 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -116,6 +116,7 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertVersionSerializable; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -398,10 +399,15 @@ public void testSearchParseException() throws IOException { public void testIllegalIndexShardStateException() throws IOException { ShardId id = new ShardId("foo", "_na_", 1); IndexShardState state = randomFrom(IndexShardState.values()); - IllegalIndexShardStateException ex = serialize(new IllegalIndexShardStateException(id, state, "come back later buddy")); + final Version version = VersionUtils.randomVersion(random()); + IllegalIndexShardStateException ex = serialize(new IllegalIndexShardStateException(id, state, "come back later buddy"), version); assertEquals(id, ex.getShardId()); assertEquals("CurrentState[" + state.name() + "] come back later buddy", ex.getMessage()); - assertEquals(state, ex.currentState()); + if (state == IndexShardState.PROMOTING && version.before(Version.V_7_0_0_alpha1)) { + assertThat(ex.currentState(), equalTo(IndexShardState.STARTED)); + } else { + assertThat(ex.currentState(), equalTo(state)); + } } public void testConnectTransportException() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 1b9a0ff629066..fa2fa254ac6c3 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -82,6 +82,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { logger.info("Total ops: {}, global checkpoint: {}", numDocs, globalCheckPoint); PlainActionFuture fut = new PlainActionFuture<>(); + shard.state = IndexShardState.PROMOTING; syncer.resync(shard, fut); fut.get(); @@ -126,6 +127,7 @@ public void testSyncerOnClosingShard() throws Exception { String allocationId = shard.routingEntry().allocationId().getId(); shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet()); + shard.state = IndexShardState.PROMOTING; PlainActionFuture fut = new PlainActionFuture<>(); threadPool.generic().execute(() -> { diff --git a/test/framework/src/main/java/org/elasticsearch/test/store/MockFSIndexStore.java b/test/framework/src/main/java/org/elasticsearch/test/store/MockFSIndexStore.java index 1efd210b110c8..03febd6457dd8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/store/MockFSIndexStore.java +++ b/test/framework/src/main/java/org/elasticsearch/test/store/MockFSIndexStore.java @@ -85,8 +85,7 @@ public DirectoryService newDirectoryService(ShardPath path) { } private static final EnumSet validCheckIndexStates = EnumSet.of( - IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY - ); + IndexShardState.STARTED, IndexShardState.PROMOTING, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); private static final class Listener implements IndexEventListener { private final Map shardSet = Collections.synchronizedMap(new IdentityHashMap<>());