|
23 | 23 | import com.carrotsearch.hppc.ObjectLongMap; |
24 | 24 | import com.carrotsearch.hppc.cursors.ObjectLongCursor; |
25 | 25 | import org.elasticsearch.common.SuppressForbidden; |
| 26 | +import org.elasticsearch.common.collect.HppcMaps; |
26 | 27 | import org.elasticsearch.index.IndexSettings; |
27 | 28 | import org.elasticsearch.index.shard.AbstractIndexShardComponent; |
28 | 29 | import org.elasticsearch.index.shard.ShardId; |
@@ -255,14 +256,68 @@ public synchronized void updateAllocationIdsFromMaster( |
255 | 256 | * @param seqNoPrimaryContext the sequence number context |
256 | 257 | */ |
257 | 258 | synchronized void updateAllocationIdsFromPrimaryContext(final SeqNoPrimaryContext seqNoPrimaryContext) { |
258 | | - final Set<String> inSyncAllocationIds = |
259 | | - new HashSet<>(Arrays.asList(seqNoPrimaryContext.inSyncLocalCheckpoints().keys().toArray(String.class))); |
260 | | - final Set<String> trackingAllocationIds = |
261 | | - new HashSet<>(Arrays.asList(seqNoPrimaryContext.trackingLocalCheckpoints().keys().toArray(String.class))); |
262 | | - updateAllocationIdsFromMaster(inSyncAllocationIds, trackingAllocationIds); |
| 259 | + /* |
| 260 | + * We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation |
| 261 | + * target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary |
| 262 | + * context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define |
| 263 | + * the following values: |
| 264 | + * - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the |
| 265 | + * relocation target |
| 266 | + * - version(context) = the cluster state version on the relocation source when the primary context was sampled |
| 267 | + * - version(target) = the current cluster state version on the relocation target |
| 268 | + * |
| 269 | + * We know that version(source) <= version(target) and version(context) < version(target), version(context) = version(target), and |
| 270 | + * version(target) < version(context) are all possibilities. |
| 271 | + * |
| 272 | + * The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing |
| 273 | + * shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target |
| 274 | + * receives from the relocation source via the primary context. |
| 275 | + * |
| 276 | + * In the case when version(context) < version(target) or version(target) < version(context), we first consider shards that could be |
| 277 | + * contained in the primary context but not contained in the cluster state applied on the target. |
| 278 | + * |
| 279 | + * Suppose there is such a shard and that it is an in-sync shard. However, marking a shard as in-sync requires an operation permit |
| 280 | + * on the primary shard. Such a permit can not be obtained after the relocation handoff has started as the relocation handoff blocks |
| 281 | + * all operations. Therefore, there can not be such a shard that is marked in-sync. |
| 282 | + * |
| 283 | + * Now consider the case of an initializing shard that is contained in the primary context but not contained in the cluster state |
| 284 | + * applied on the target. |
| 285 | + * |
| 286 | + * If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already |
| 287 | + * applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to |
| 288 | + * GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe. |
| 289 | + * |
| 290 | + * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update has not |
| 291 | + * yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target). |
| 292 | + * Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such, |
| 293 | + * these shards are not problematic. |
| 294 | + * |
| 295 | + * Now we consider shards that are contained in the cluster state applied on the target but not contained in the primary context. |
| 296 | + * |
| 297 | + * If version(context) < version(target) it means that the target has learned of an initializing shard that the source is not aware |
| 298 | + * of. As explained above, this initialization can only succeed after the relocation is complete, and only with the target as the |
| 299 | + * source of the recovery. |
| 300 | + * |
| 301 | + * Otherwise, if version(target) < version(context) it only means that the global checkpoint on the target will be held back until a |
| 302 | + * later cluster state update arrives because the target will not learn of the removal until later. |
| 303 | + * |
| 304 | + * In both cases, no calls to update the local checkpoint for such shards will be made. This case is safe too. |
| 305 | + */ |
263 | 306 | for (final ObjectLongCursor<String> cursor : seqNoPrimaryContext.inSyncLocalCheckpoints()) { |
264 | 307 | updateLocalCheckpoint(cursor.key, cursor.value); |
| 308 | + assert cursor.value >= globalCheckpoint |
| 309 | + : "local checkpoint [" + cursor.value + "] violates being at least the global checkpoint [" + globalCheckpoint + "]"; |
| 310 | + try { |
| 311 | + markAllocationIdAsInSync(cursor.key, cursor.value); |
| 312 | + } catch (final InterruptedException e) { |
| 313 | + /* |
| 314 | + * Since the local checkpoint already exceeds the global checkpoint here, we never blocking waiting for advancement. This |
| 315 | + * means that we can never be interrupted. If we are bail, something is catastrophically wrong. |
| 316 | + */ |
| 317 | + throw new AssertionError(e); |
| 318 | + } |
265 | 319 | } |
| 320 | + |
266 | 321 | for (final ObjectLongCursor<String> cursor : seqNoPrimaryContext.trackingLocalCheckpoints()) { |
267 | 322 | updateLocalCheckpoint(cursor.key, cursor.value); |
268 | 323 | } |
|
0 commit comments