Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ private void maybeTrimTranslog() {
continue;
case POST_RECOVERY:
case STARTED:
case PROMOTING:
case RELOCATED:
try {
shard.trimTranslog();
Expand All @@ -758,6 +759,7 @@ private void maybeSyncGlobalCheckpoints() {
assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active";
continue;
case STARTED:
case PROMOTING:
try {
shard.acquirePrimaryOperationPermit(
ActionListener.wrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.shard;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -52,7 +53,11 @@ public IllegalIndexShardStateException(StreamInput in) throws IOException{
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(currentState.id());
if (out.getVersion().before(Version.V_7_0_0_alpha1) && currentState == IndexShardState.PROMOTING) {
out.writeByte(IndexShardState.STARTED.id());
} else {
out.writeByte(currentState.id());
}
}

@Override
Expand Down
51 changes: 26 additions & 25 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ Runnable getGlobalCheckpointSyncer() {

private final IndexShardOperationPermits indexShardOperationPermits;

private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.PROMOTING, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
// for primaries, we only allow to write when actually started (so the cluster has decided we started)
// in case we have a relocation of a primary, we also allow to write after phase 2 completed, where the shard may be
// in state RECOVERING or POST_RECOVERY. After a primary has been marked as RELOCATED, we only allow writes to the relocation target
// which can be either in POST_RECOVERY or already STARTED (this prevents writing concurrently to two primaries).
public static final EnumSet<IndexShardState> writeAllowedStatesForPrimary = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED);
public static final EnumSet<IndexShardState> writeAllowedStatesForPrimary = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.PROMOTING);
// replication is also allowed while recovering, since we index also during recovery to replicas and rely on version checks to make sure its consistent
// a relocated shard can also be target of a replication if the relocation target has not been marked as active yet and is syncing it's changes back to the relocation source
private static final EnumSet<IndexShardState> writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
Expand Down Expand Up @@ -426,7 +426,7 @@ public void updateShardState(final ShardRouting newRouting,
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED ||
state == IndexShardState.CLOSED :
state == IndexShardState.PROMOTING || state == IndexShardState.CLOSED :
"routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
persistMetadata(path, indexSettings, newRouting, currentRouting, logger);
final CountDownLatch shardStateUpdated = new CountDownLatch(1);
Expand Down Expand Up @@ -460,10 +460,10 @@ public void updateShardState(final ShardRouting newRouting,
* incremented.
*/
// to prevent primary relocation handoff while resync is not completed
boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
if (resyncStarted == false) {
throw new IllegalStateException("cannot start resync while it's already in progress");
if (state != IndexShardState.STARTED) {
throw new IllegalIndexShardStateException(shardId, state, "cannot start resync while it's already in progress");
}
changeState(IndexShardState.PROMOTING, "Promoting to primary");
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
Expand Down Expand Up @@ -496,19 +496,27 @@ public void updateShardState(final ShardRouting newRouting,
public void onResponse(ResyncTask resyncTask) {
logger.info("primary-replica resync completed with {} operations",
resyncTask.getResyncedOperations());
boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
assert resyncCompleted : "primary-replica resync finished but was not started";
synchronized (mutex) {
if (state != IndexShardState.PROMOTING) {
throw new IllegalIndexShardStateException(shardId, state, "primary-replica resync finished but was not started");
}
changeState(IndexShardState.STARTED, "Resync is completed");
}
}

@Override
public void onFailure(Exception e) {
boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we have no corresponding state transition here now, does it mean that the shard can forever be stuck in PROMOTING state, allow no relocations?

assert resyncCompleted : "primary-replica resync finished but was not started";
if (state == IndexShardState.CLOSED) {
synchronized (mutex) {
// ignore, shutting down
} else {
failShard("exception during primary-replica resync", e);
if (state == IndexShardState.CLOSED) {
return;
}
if (state != IndexShardState.PROMOTING) {
throw new IllegalIndexShardStateException(shardId, state, "primary-replica resync failed but was not started");
}
changeState(IndexShardState.STARTED, "Resync is failed");
}
failShard("exception during primary-replica resync", e);
}
});
} catch (final AlreadyClosedException e) {
Expand Down Expand Up @@ -541,7 +549,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
if (state == IndexShardState.STARTED || state == IndexShardState.PROMOTING) {
throw new IndexShardStartedException(shardId);
}
if (state == IndexShardState.RELOCATED) {
Expand All @@ -558,8 +566,6 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
}
}

private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();

/**
* Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided
* {@link Runnable} is executed after all operations are successfully blocked.
Expand Down Expand Up @@ -622,11 +628,6 @@ private void verifyRelocatingState() {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}

if (primaryReplicaResyncInProgress.get()) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
}
}

@Override
Expand Down Expand Up @@ -1090,7 +1091,7 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
if (state == IndexShardState.STARTED || state == IndexShardState.PROMOTING || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return getEngine().acquireIndexCommit(flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
Expand Down Expand Up @@ -1193,7 +1194,7 @@ public IndexShard postRecovery(String reason) throws IndexShardStartedException,
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
if (state == IndexShardState.STARTED || state == IndexShardState.PROMOTING) {
throw new IndexShardStartedException(shardId);
}
if (state == IndexShardState.RELOCATED) {
Expand Down Expand Up @@ -1426,7 +1427,7 @@ public void finalizeRecovery() {
public boolean ignoreRecoveryAttempt() {
IndexShardState state = state(); // one time volatile read
return state == IndexShardState.POST_RECOVERY || state == IndexShardState.RECOVERING || state == IndexShardState.STARTED ||
state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED;
state == IndexShardState.PROMOTING || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED;
}

public void readAllowed() throws IllegalIndexShardStateException {
Expand Down Expand Up @@ -1494,7 +1495,7 @@ private void verifyNotClosed(Exception suppressed) throws IllegalIndexShardState

protected final void verifyActive() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) {
if (state != IndexShardState.STARTED && state != IndexShardState.PROMOTING && state != IndexShardState.RELOCATED) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard is active");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public enum IndexShardState {
POST_RECOVERY((byte) 2),
STARTED((byte) 3),
RELOCATED((byte) 4),
CLOSED((byte) 5);
CLOSED((byte) 5),
PROMOTING((byte) 6);

private static final IndexShardState[] IDS = new IndexShardState[IndexShardState.values().length];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public synchronized Translog.Operation next() throws IOException {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
} else {
assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state;
assert state == IndexShardState.PROMOTING : "resync should happen on a promoting shard, but state was: " + state;
}
return snapshot.next();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
private final Cancellable scheduler;

private static final EnumSet<IndexShardState> CAN_WRITE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.PROMOTING, IndexShardState.RELOCATED);

private final ShardsIndicesStatusChecker statusChecker;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,8 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
}

final IndexShardState state = shard.state();
if (shardRouting.initializing() && (state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY)) {
if (shardRouting.initializing()
&& (state == IndexShardState.STARTED || state == IndexShardState.PROMOTING || state == IndexShardState.POST_RECOVERY)) {
// the master thinks we are initializing, but we are already started or on POST_RECOVERY and waiting
// for master to confirm a shard started message (either master failover, or a cluster event before
// we managed to tell the master we started), mark us as started
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
Setting.positiveTimeSetting("indices.store.delete.shard.timeout", new TimeValue(30, TimeUnit.SECONDS),
Property.NodeScope);
public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists";
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED);
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.PROMOTING, IndexShardState.RELOCATED);
private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertVersionSerializable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -398,10 +399,15 @@ public void testSearchParseException() throws IOException {
public void testIllegalIndexShardStateException() throws IOException {
ShardId id = new ShardId("foo", "_na_", 1);
IndexShardState state = randomFrom(IndexShardState.values());
IllegalIndexShardStateException ex = serialize(new IllegalIndexShardStateException(id, state, "come back later buddy"));
final Version version = VersionUtils.randomVersion(random());
IllegalIndexShardStateException ex = serialize(new IllegalIndexShardStateException(id, state, "come back later buddy"), version);
assertEquals(id, ex.getShardId());
assertEquals("CurrentState[" + state.name() + "] come back later buddy", ex.getMessage());
assertEquals(state, ex.currentState());
if (state == IndexShardState.PROMOTING && version.before(Version.V_7_0_0_alpha1)) {
assertThat(ex.currentState(), equalTo(IndexShardState.STARTED));
} else {
assertThat(ex.currentState(), equalTo(state));
}
}

public void testConnectTransportException() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
logger.info("Total ops: {}, global checkpoint: {}", numDocs, globalCheckPoint);

PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
shard.state = IndexShardState.PROMOTING;
syncer.resync(shard, fut);
fut.get();

Expand Down Expand Up @@ -126,6 +127,7 @@ public void testSyncerOnClosingShard() throws Exception {
String allocationId = shard.routingEntry().allocationId().getId();
shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());
shard.state = IndexShardState.PROMOTING;

PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
threadPool.generic().execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ public DirectoryService newDirectoryService(ShardPath path) {
}

private static final EnumSet<IndexShardState> validCheckIndexStates = EnumSet.of(
IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY
);
IndexShardState.STARTED, IndexShardState.PROMOTING, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
private static final class Listener implements IndexEventListener {

private final Map<IndexShard, Boolean> shardSet = Collections.synchronizedMap(new IdentityHashMap<>());
Expand Down