Skip to content

Commit c6a03bc

Browse files
authored
Introduce primary context (#25122)
* Introduce primary context The target of a primary relocation is not aware of the state of the replication group. In particular, it is not tracking in-sync and initializing shards and their checkpoints. This means that after the target shard is started, its knowledge of the replication group could differ from that of the relocation source. In particular, this differing view can lead to it computing a global checkpoint that moves backwards after it becomes aware of the state of the entire replication group. This commit addresses this issue by transferring a primary context during relocation handoff. * Fix test * Add assertion messages * Javadocs * Barrier between marking a shard in sync and relocating * Fix misplaced call * Paranoia * Better latch countdown * Catch any exception * Fix comment * Fix wait for cluster state relocation test * Update knowledge via upate local checkpoint API * toString * Visibility * Refactor permit * Push down * Imports * Docs * Fix compilation * Remove assertion * Fix compilation * Remove context wrapper * Move PrimaryContext to new package * Piping for cluster state version This commit adds piping for the cluster state version to the global checkpoint tracker. We do not use it yet. * Remove unused import * Implement versioning in tracker * Fix test * Unneeded public * Imports * Promote on our own * Add tests * Import * Newline * Update comment * Serialization * Assertion message * Update stale comment * Remove newline * Less verbose * Remove redundant assertion * Tracking -> in-sync * Assertions * Just say no Friends do not let friends block the cluster state update thread on network operations. * Extra newline * Add allocation ID to assertion * Rename method * Another rename * Introduce sealing * Sealing tests * One more assertion * Fix imports * Safer sealing * Remove check * Remove another sealed check
1 parent 4306315 commit c6a03bc

21 files changed

+908
-103
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.collect;
21+
22+
public class LongTuple<T> {
23+
24+
public static <T> LongTuple<T> tuple(final T v1, final long v2) {
25+
return new LongTuple<>(v1, v2);
26+
}
27+
28+
private final T v1;
29+
private final long v2;
30+
31+
private LongTuple(final T v1, final long v2) {
32+
this.v1 = v1;
33+
this.v2 = v2;
34+
}
35+
36+
public T v1() {
37+
return v1;
38+
}
39+
40+
public long v2() {
41+
return v2;
42+
}
43+
44+
@Override
45+
public boolean equals(final Object o) {
46+
if (this == o) return true;
47+
if (o == null || getClass() != o.getClass()) return false;
48+
49+
LongTuple tuple = (LongTuple) o;
50+
51+
return (v1 == null ? tuple.v1 == null : v1.equals(tuple.v1)) && (v2 == tuple.v2);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
int result = v1 != null ? v1.hashCode() : 0;
57+
result = 31 * result + Long.hashCode(v2);
58+
return result;
59+
}
60+
61+
@Override
62+
public String toString() {
63+
return "Tuple [v1=" + v1 + ", v2=" + v2 + "]";
64+
}
65+
66+
}

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java

Lines changed: 187 additions & 8 deletions
Large diffs are not rendered by default.

core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.index.IndexSettings;
2323
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
24+
import org.elasticsearch.index.shard.PrimaryContext;
2425
import org.elasticsearch.index.shard.ShardId;
2526

2627
import java.util.Set;
@@ -165,13 +166,24 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {
165166

166167
/**
167168
* Notifies the service of the current allocation IDs in the cluster state. See
168-
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
169+
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
169170
*
170-
* @param activeAllocationIds the allocation IDs of the currently active shard copies
171-
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
171+
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
172+
* @param activeAllocationIds the allocation IDs of the currently active shard copies
173+
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
172174
*/
173-
public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
174-
globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
175+
public void updateAllocationIdsFromMaster(
176+
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
177+
globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
178+
}
179+
180+
/**
181+
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
182+
*
183+
* @param primaryContext the sequence number context
184+
*/
185+
public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
186+
globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext);
175187
}
176188

177189
/**
@@ -183,4 +195,20 @@ public boolean pendingInSync() {
183195
return globalCheckpointTracker.pendingInSync();
184196
}
185197

198+
/**
199+
* Get the primary context for the shard. This includes the state of the global checkpoint tracker.
200+
*
201+
* @return the primary context
202+
*/
203+
public PrimaryContext primaryContext() {
204+
return globalCheckpointTracker.primaryContext();
205+
}
206+
207+
/**
208+
* Releases a previously acquired primary context.
209+
*/
210+
public void releasePrimaryContext() {
211+
globalCheckpointTracker.releasePrimaryContext();
212+
}
213+
186214
}

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 71 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -515,31 +515,37 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
515515

516516
private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();
517517

518-
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
518+
/**
519+
* Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided
520+
* {@link Runnable} is executed after all operations are successfully blocked.
521+
*
522+
* @param reason the reason for the relocation
523+
* @param consumer a {@link Runnable} that is executed after operations are blocked
524+
* @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
525+
* @throws InterruptedException if blocking operations is interrupted
526+
*/
527+
public void relocated(
528+
final String reason, final Consumer<PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
519529
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
520530
try {
521531
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
522532
// no shard operation permits are being held here, move state from started to relocated
523533
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
524-
"in-flight operations in progress while moving shard state to relocated";
525-
synchronized (mutex) {
526-
if (state != IndexShardState.STARTED) {
527-
throw new IndexShardNotStartedException(shardId, state);
528-
}
529-
// if the master cancelled the recovery, the target will be removed
530-
// and the recovery will stopped.
531-
// However, it is still possible that we concurrently end up here
532-
// and therefore have to protect we don't mark the shard as relocated when
533-
// its shard routing says otherwise.
534-
if (shardRouting.relocating() == false) {
535-
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
536-
": shard is no longer relocating " + shardRouting);
537-
}
538-
if (primaryReplicaResyncInProgress.get()) {
539-
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
540-
": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
534+
"in-flight operations in progress while moving shard state to relocated";
535+
/*
536+
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
537+
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
538+
*/
539+
verifyRelocatingState();
540+
final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext();
541+
try {
542+
consumer.accept(primaryContext);
543+
synchronized (mutex) {
544+
verifyRelocatingState();
545+
changeState(IndexShardState.RELOCATED, reason);
541546
}
542-
changeState(IndexShardState.RELOCATED, reason);
547+
} catch (final Exception e) {
548+
getEngine().seqNoService().releasePrimaryContext();
543549
}
544550
});
545551
} catch (TimeoutException e) {
@@ -551,6 +557,26 @@ public void relocated(String reason) throws IllegalIndexShardStateException, Int
551557
}
552558
}
553559

560+
private void verifyRelocatingState() {
561+
if (state != IndexShardState.STARTED) {
562+
throw new IndexShardNotStartedException(shardId, state);
563+
}
564+
/*
565+
* If the master cancelled recovery, the target will be removed and the recovery will be cancelled. However, it is still possible
566+
* that we concurrently end up here and therefore have to protect that we do not mark the shard as relocated when its shard routing
567+
* says otherwise.
568+
*/
569+
570+
if (shardRouting.relocating() == false) {
571+
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
572+
": shard is no longer relocating " + shardRouting);
573+
}
574+
575+
if (primaryReplicaResyncInProgress.get()) {
576+
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
577+
": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
578+
}
579+
}
554580

555581
public IndexShardState state() {
556582
return state;
@@ -1319,16 +1345,16 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
13191345

13201346
private void verifyPrimary() {
13211347
if (shardRouting.primary() == false) {
1322-
throw new IllegalStateException("shard is not a primary " + shardRouting);
1348+
throw new IllegalStateException("shard " + shardRouting + " is not a primary");
13231349
}
13241350
}
13251351

13261352
private void verifyReplicationTarget() {
13271353
final IndexShardState state = state();
13281354
if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) {
13291355
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
1330-
throw new IllegalStateException("active primary shard cannot be a replication target before " +
1331-
" relocation hand off " + shardRouting + ", state is [" + state + "]");
1356+
throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
1357+
"relocation hand off, state is [" + state + "]");
13321358
}
13331359
}
13341360

@@ -1603,8 +1629,8 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
16031629
verifyPrimary();
16041630
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
16051631
/*
1606-
* We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the
1607-
* replica; mark our self as active to force a future background sync.
1632+
* We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to
1633+
* the replica; mark our self as active to force a future background sync.
16081634
*/
16091635
active.compareAndSet(false, true);
16101636
}
@@ -1654,18 +1680,34 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {
16541680

16551681
/**
16561682
* Notifies the service of the current allocation IDs in the cluster state. See
1657-
* {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)}
1683+
* {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)}
16581684
* for details.
16591685
*
1660-
* @param activeAllocationIds the allocation IDs of the currently active shard copies
1661-
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
1686+
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
1687+
* @param activeAllocationIds the allocation IDs of the currently active shard copies
1688+
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
16621689
*/
1663-
public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
1690+
public void updateAllocationIdsFromMaster(
1691+
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
16641692
verifyPrimary();
16651693
final Engine engine = getEngineOrNull();
16661694
// if the engine is not yet started, we are not ready yet and can just ignore this
16671695
if (engine != null) {
1668-
engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
1696+
engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
1697+
}
1698+
}
1699+
1700+
/**
1701+
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
1702+
*
1703+
* @param primaryContext the sequence number context
1704+
*/
1705+
public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
1706+
verifyPrimary();
1707+
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
1708+
final Engine engine = getEngineOrNull();
1709+
if (engine != null) {
1710+
engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext);
16691711
}
16701712
}
16711713

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.shard;
21+
22+
import com.carrotsearch.hppc.ObjectLongHashMap;
23+
import com.carrotsearch.hppc.ObjectLongMap;
24+
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
25+
import org.elasticsearch.common.io.stream.StreamInput;
26+
import org.elasticsearch.common.io.stream.StreamOutput;
27+
import org.elasticsearch.common.io.stream.Writeable;
28+
29+
import java.io.IOException;
30+
31+
/**
32+
* Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing
33+
* shards and their local checkpoints.
34+
*/
35+
public class PrimaryContext implements Writeable {
36+
37+
private long clusterStateVersion;
38+
39+
public long clusterStateVersion() {
40+
return clusterStateVersion;
41+
}
42+
43+
private ObjectLongMap<String> inSyncLocalCheckpoints;
44+
45+
public ObjectLongMap<String> inSyncLocalCheckpoints() {
46+
return inSyncLocalCheckpoints;
47+
}
48+
49+
private ObjectLongMap<String> trackingLocalCheckpoints;
50+
51+
public ObjectLongMap<String> trackingLocalCheckpoints() {
52+
return trackingLocalCheckpoints;
53+
}
54+
55+
public PrimaryContext(
56+
final long clusterStateVersion,
57+
final ObjectLongMap<String> inSyncLocalCheckpoints,
58+
final ObjectLongMap<String> trackingLocalCheckpoints) {
59+
this.clusterStateVersion = clusterStateVersion;
60+
this.inSyncLocalCheckpoints = inSyncLocalCheckpoints;
61+
this.trackingLocalCheckpoints = trackingLocalCheckpoints;
62+
}
63+
64+
public PrimaryContext(final StreamInput in) throws IOException {
65+
clusterStateVersion = in.readVLong();
66+
inSyncLocalCheckpoints = readMap(in);
67+
trackingLocalCheckpoints = readMap(in);
68+
}
69+
70+
private static ObjectLongMap<String> readMap(final StreamInput in) throws IOException {
71+
final int length = in.readVInt();
72+
final ObjectLongMap<String> map = new ObjectLongHashMap<>(length);
73+
for (int i = 0; i < length; i++) {
74+
final String key = in.readString();
75+
final long value = in.readZLong();
76+
map.addTo(key, value);
77+
}
78+
return map;
79+
}
80+
81+
@Override
82+
public void writeTo(final StreamOutput out) throws IOException {
83+
out.writeVLong(clusterStateVersion);
84+
writeMap(out, inSyncLocalCheckpoints);
85+
writeMap(out, trackingLocalCheckpoints);
86+
}
87+
88+
private static void writeMap(final StreamOutput out, final ObjectLongMap<String> map) throws IOException {
89+
out.writeVInt(map.size());
90+
for (ObjectLongCursor<String> cursor : map) {
91+
out.writeString(cursor.key);
92+
out.writeZLong(cursor.value);
93+
}
94+
}
95+
96+
@Override
97+
public String toString() {
98+
return "PrimaryContext{" +
99+
"clusterStateVersion=" + clusterStateVersion +
100+
", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints +
101+
", trackingLocalCheckpoints=" + trackingLocalCheckpoints +
102+
'}';
103+
}
104+
105+
}

0 commit comments

Comments
 (0)