Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
d687f51
Introduce primary context
jasontedor Jun 2, 2017
8d871cf
Fix test
jasontedor Jun 8, 2017
a197a64
Add assertion messages
jasontedor Jun 8, 2017
1e4255c
Javadocs
jasontedor Jun 8, 2017
c1588bb
Barrier between marking a shard in sync and relocating
jasontedor Jun 9, 2017
0b47386
Fix misplaced call
jasontedor Jun 9, 2017
73bc0d1
Paranoia
jasontedor Jun 9, 2017
76162e4
Better latch countdown
jasontedor Jun 9, 2017
08bb6fa
Merge branch 'master' into primary-context
jasontedor Jun 10, 2017
a799e69
Catch any exception
jasontedor Jun 10, 2017
58adfa0
Fix comment
jasontedor Jun 10, 2017
03b863b
Fix wait for cluster state relocation test
jasontedor Jun 10, 2017
40a3fcc
Merge branch 'master' into primary-context
jasontedor Jun 12, 2017
8504b82
Update knowledge via upate local checkpoint API
jasontedor Jun 13, 2017
5f05d92
toString
jasontedor Jun 14, 2017
a35e497
Visibility
jasontedor Jun 14, 2017
b54a8d6
Refactor permit
jasontedor Jun 14, 2017
48157cd
Push down
jasontedor Jun 14, 2017
9a58ff4
Imports
jasontedor Jun 14, 2017
62966c5
Docs
jasontedor Jun 14, 2017
da4c9aa
Merge branch 'master' into primary-context
jasontedor Jun 14, 2017
a4dda93
Fix compilation
jasontedor Jun 14, 2017
ef1d345
Remove assertion
jasontedor Jun 14, 2017
51ad65b
Fix compilation
jasontedor Jun 14, 2017
e3f1886
Merge branch 'master' into primary-context
jasontedor Jun 15, 2017
4ba8d5c
Remove context wrapper
jasontedor Jun 15, 2017
48572a6
Move PrimaryContext to new package
jasontedor Jun 15, 2017
c9bd0d7
Piping for cluster state version
jasontedor Jun 15, 2017
acca222
Remove unused import
jasontedor Jun 15, 2017
d459f16
Implement versioning in tracker
jasontedor Jun 15, 2017
4471dc2
Fix test
jasontedor Jun 15, 2017
56f3c17
Unneeded public
jasontedor Jun 15, 2017
f97fd92
Imports
jasontedor Jun 15, 2017
29ca82a
Promote on our own
jasontedor Jun 15, 2017
d7a8021
Add tests
jasontedor Jun 16, 2017
9683757
Import
jasontedor Jun 16, 2017
d9dda28
Merge branch 'master' of github.com:elastic/elasticsearch into primar…
jasontedor Jun 16, 2017
5422f6e
Newline
jasontedor Jun 16, 2017
d04f14a
Update comment
jasontedor Jun 16, 2017
99597d1
Merge branch 'master' into primary-context
jasontedor Jun 22, 2017
52af334
Serialization
jasontedor Jun 22, 2017
e1de296
Assertion message
jasontedor Jun 22, 2017
643fa8a
Update stale comment
jasontedor Jun 22, 2017
4e1fa9c
Remove newline
jasontedor Jun 22, 2017
a275167
Less verbose
jasontedor Jun 22, 2017
00e4083
Remove redundant assertion
jasontedor Jun 22, 2017
5f16884
Tracking -> in-sync
jasontedor Jun 22, 2017
3a53fd3
Assertions
jasontedor Jun 22, 2017
ccde798
Just say no
jasontedor Jun 23, 2017
e6bbe8b
Extra newline
jasontedor Jun 23, 2017
ff54eec
Add allocation ID to assertion
jasontedor Jun 23, 2017
6ea61ef
Rename method
jasontedor Jun 23, 2017
f6f6acb
Another rename
jasontedor Jun 23, 2017
de862cd
Introduce sealing
jasontedor Jun 23, 2017
dc377fa
Merge branch 'master' into primary-context
jasontedor Jun 23, 2017
077b3f4
Sealing tests
jasontedor Jun 23, 2017
4e05714
One more assertion
jasontedor Jun 23, 2017
b30dbcc
Merge branch 'master' into primary-context
jasontedor Jun 26, 2017
744b03a
Fix imports
jasontedor Jun 26, 2017
3c2731f
Safer sealing
jasontedor Jun 26, 2017
e19c8b3
Remove check
jasontedor Jun 26, 2017
0d33a88
Remove another sealed check
jasontedor Jun 26, 2017
7bd3c1b
Merge branch 'master' into primary-context
jasontedor Jun 26, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions core/src/main/java/org/elasticsearch/common/collect/LongTuple.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.collect;

public class LongTuple<T> {

public static <T> LongTuple<T> tuple(final T v1, final long v2) {
return new LongTuple<>(v1, v2);
}

private final T v1;
private final long v2;

private LongTuple(final T v1, final long v2) {
this.v1 = v1;
this.v2 = v2;
}

public T v1() {
return v1;
}

public long v2() {
return v2;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

LongTuple tuple = (LongTuple) o;

return (v1 == null ? tuple.v1 == null : v1.equals(tuple.v1)) && (v2 == tuple.v2);
}

@Override
public int hashCode() {
int result = v1 != null ? v1.hashCode() : 0;
result = 31 * result + Long.hashCode(v2);
return result;
}

@Override
public String toString() {
return "Tuple [v1=" + v1 + ", v2=" + v2 + "]";
}

}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;

import java.util.Set;
Expand Down Expand Up @@ -165,13 +166,24 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {

/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
public void updateAllocationIdsFromMaster(
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
}

/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
* @param primaryContext the sequence number context
*/
public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext);
}

/**
Expand All @@ -183,4 +195,20 @@ public boolean pendingInSync() {
return globalCheckpointTracker.pendingInSync();
}

/**
* Get the primary context for the shard. This includes the state of the global checkpoint tracker.
*
* @return the primary context
*/
public PrimaryContext primaryContext() {
return globalCheckpointTracker.primaryContext();
}

/**
* Releases a previously acquired primary context.
*/
public void releasePrimaryContext() {
globalCheckpointTracker.releasePrimaryContext();
}

}
100 changes: 71 additions & 29 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,31 +515,37 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta

private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();

public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
/**
* Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided
* {@link Runnable} is executed after all operations are successfully blocked.
*
* @param reason the reason for the relocation
* @param consumer a {@link Runnable} that is executed after operations are blocked
* @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
* @throws InterruptedException if blocking operations is interrupted
*/
public void relocated(
final String reason, final Consumer<PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
"in-flight operations in progress while moving shard state to relocated";
synchronized (mutex) {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
// if the master cancelled the recovery, the target will be removed
// and the recovery will stopped.
// However, it is still possible that we concurrently end up here
// and therefore have to protect we don't mark the shard as relocated when
// its shard routing says otherwise.
if (shardRouting.relocating() == false) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}
if (primaryReplicaResyncInProgress.get()) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
"in-flight operations in progress while moving shard state to relocated";
/*
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
*/
verifyRelocatingState();
final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext();
try {
consumer.accept(primaryContext);
synchronized (mutex) {
verifyRelocatingState();
changeState(IndexShardState.RELOCATED, reason);
}
changeState(IndexShardState.RELOCATED, reason);
} catch (final Exception e) {
getEngine().seqNoService().releasePrimaryContext();
}
});
} catch (TimeoutException e) {
Expand All @@ -551,6 +557,26 @@ public void relocated(String reason) throws IllegalIndexShardStateException, Int
}
}

private void verifyRelocatingState() {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
/*
* If the master cancelled recovery, the target will be removed and the recovery will be cancelled. However, it is still possible
* that we concurrently end up here and therefore have to protect that we do not mark the shard as relocated when its shard routing
* says otherwise.
*/

if (shardRouting.relocating() == false) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}

if (primaryReplicaResyncInProgress.get()) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
}
}

public IndexShardState state() {
return state;
Expand Down Expand Up @@ -1319,16 +1345,16 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn

private void verifyPrimary() {
if (shardRouting.primary() == false) {
throw new IllegalStateException("shard is not a primary " + shardRouting);
throw new IllegalStateException("shard " + shardRouting + " is not a primary");
}
}

private void verifyReplicationTarget() {
final IndexShardState state = state();
if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalStateException("active primary shard cannot be a replication target before " +
" relocation hand off " + shardRouting + ", state is [" + state + "]");
throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
"relocation hand off, state is [" + state + "]");
}
}

Expand Down Expand Up @@ -1603,8 +1629,8 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
verifyPrimary();
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
/*
* We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the
* replica; mark our self as active to force a future background sync.
* We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to
* the replica; mark our self as active to force a future background sync.
*/
active.compareAndSet(false, true);
}
Expand Down Expand Up @@ -1654,18 +1680,34 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {

/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)}
* {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)}
* for details.
*
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
public void updateAllocationIdsFromMaster(
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
verifyPrimary();
final Engine engine = getEngineOrNull();
// if the engine is not yet started, we are not ready yet and can just ignore this
if (engine != null) {
engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
}
}

/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
* @param primaryContext the sequence number context
*/
public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
verifyPrimary();
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
final Engine engine = getEngineOrNull();
if (engine != null) {
engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext);
}
}

Expand Down
105 changes: 105 additions & 0 deletions core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.shard;

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;

/**
* Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing
* shards and their local checkpoints.
*/
public class PrimaryContext implements Writeable {

private long clusterStateVersion;

public long clusterStateVersion() {
return clusterStateVersion;
}

private ObjectLongMap<String> inSyncLocalCheckpoints;

public ObjectLongMap<String> inSyncLocalCheckpoints() {
return inSyncLocalCheckpoints;
}

private ObjectLongMap<String> trackingLocalCheckpoints;

public ObjectLongMap<String> trackingLocalCheckpoints() {
return trackingLocalCheckpoints;
}

public PrimaryContext(
final long clusterStateVersion,
final ObjectLongMap<String> inSyncLocalCheckpoints,
final ObjectLongMap<String> trackingLocalCheckpoints) {
this.clusterStateVersion = clusterStateVersion;
this.inSyncLocalCheckpoints = inSyncLocalCheckpoints;
this.trackingLocalCheckpoints = trackingLocalCheckpoints;
}

public PrimaryContext(final StreamInput in) throws IOException {
clusterStateVersion = in.readVLong();
inSyncLocalCheckpoints = readMap(in);
trackingLocalCheckpoints = readMap(in);
}

private static ObjectLongMap<String> readMap(final StreamInput in) throws IOException {
final int length = in.readVInt();
final ObjectLongMap<String> map = new ObjectLongHashMap<>(length);
for (int i = 0; i < length; i++) {
final String key = in.readString();
final long value = in.readZLong();
map.addTo(key, value);
}
return map;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeVLong(clusterStateVersion);
writeMap(out, inSyncLocalCheckpoints);
writeMap(out, trackingLocalCheckpoints);
}

private static void writeMap(final StreamOutput out, final ObjectLongMap<String> map) throws IOException {
out.writeVInt(map.size());
for (ObjectLongCursor<String> cursor : map) {
out.writeString(cursor.key);
out.writeZLong(cursor.value);
}
}

@Override
public String toString() {
return "PrimaryContext{" +
"clusterStateVersion=" + clusterStateVersion +
", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints +
", trackingLocalCheckpoints=" + trackingLocalCheckpoints +
'}';
}

}
Loading