From a09a5292c51c38695eb6c0652f90faa72b9146d1 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 8 May 2019 12:05:04 +0200 Subject: [PATCH 1/3] Cleanup SnapshotsInProgress * Added separate enum for the state of each shard, it was really confusing that we used the same enum for the state of the snapshot overall and the state of each individual shard * relates https://github.com/elastic/elasticsearch/pull/40943#issuecomment-488664150 * Moved the contents of the class around a little so fields, constructors and nested classes/enums aren't all over the place especially now that we have yet another nested enum here * Shortened some obvious spots in equals method and saved a few lines via `computeIfAbsent` to make up for adding 50 new lines to this class --- .../cluster/SnapshotsInProgress.java | 414 ++++++++++-------- .../snapshots/SnapshotShardsService.java | 12 +- .../snapshots/SnapshotsService.java | 21 +- .../cluster/SnapshotsInProgressTests.java | 11 +- .../SharedClusterSnapshotRestoreIT.java | 7 +- ...SnapshotsInProgressSerializationTests.java | 3 +- 6 files changed, 255 insertions(+), 213 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index b5827dd01a1d1..9aabb4e56257e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -42,6 +42,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Meta data about snapshots that are currently executing @@ -49,16 +50,70 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implements Custom { public static final String TYPE = "snapshots"; + private final List entries; + + public SnapshotsInProgress(List entries) { + this.entries = entries; + } + + public SnapshotsInProgress(Entry... entries) { + this.entries = Arrays.asList(entries); + } + + public SnapshotsInProgress(StreamInput in) throws IOException { + Entry[] entries = new Entry[in.readVInt()]; + for (int i = 0; i < entries.length; i++) { + Snapshot snapshot = new Snapshot(in); + boolean includeGlobalState = in.readBoolean(); + boolean partial = in.readBoolean(); + State state = State.fromValue(in.readByte()); + int indices = in.readVInt(); + List indexBuilder = new ArrayList<>(); + for (int j = 0; j < indices; j++) { + indexBuilder.add(new IndexId(in.readString(), in.readString())); + } + long startTime = in.readLong(); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + int shards = in.readVInt(); + for (int j = 0; j < shards; j++) { + ShardId shardId = ShardId.readShardId(in); + builder.put(shardId, new ShardSnapshotStatus(in)); + } + long repositoryStateId = in.readLong(); + final String failure = in.readOptionalString(); + entries[i] = new Entry(snapshot, + includeGlobalState, + partial, + state, + Collections.unmodifiableList(indexBuilder), + startTime, + repositoryStateId, + builder.build(), + failure); + } + this.entries = Arrays.asList(entries); + } + + public List entries() { + return this.entries; + } + + public Entry snapshot(final Snapshot snapshot) { + for (Entry entry : entries) { + final Snapshot curr = entry.snapshot(); + if (curr.equals(snapshot)) { + return entry; + } + } + return null; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; + return entries.equals(((SnapshotsInProgress) o).entries); - SnapshotsInProgress that = (SnapshotsInProgress) o; - - if (!entries.equals(that.entries)) return false; - - return true; } @Override @@ -78,6 +133,120 @@ public String toString() { return builder.append("]").toString(); } + /** + * Checks if all shards in the list have completed + * + * @param shards list of shard statuses + * @return true if all shards have completed (either successfully or failed), false otherwise + */ + public static boolean completed(ObjectContainer shards) { + for (ObjectCursor status : shards) { + if (status.value.state().completed() == false) { + return false; + } + } + return true; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Custom.class, TYPE, in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(entries.size()); + for (Entry entry : entries) { + entry.snapshot().writeTo(out); + out.writeBoolean(entry.includeGlobalState()); + out.writeBoolean(entry.partial()); + out.writeByte(entry.state().value()); + out.writeVInt(entry.indices().size()); + for (IndexId index : entry.indices()) { + index.writeTo(out); + } + out.writeLong(entry.startTime()); + out.writeVInt(entry.shards().size()); + for (ObjectObjectCursor shardEntry : entry.shards()) { + shardEntry.key.writeTo(out); + shardEntry.value.writeTo(out); + } + out.writeLong(entry.repositoryStateId); + out.writeOptionalString(entry.failure); + } + } + + private static final String REPOSITORY = "repository"; + private static final String SNAPSHOTS = "snapshots"; + private static final String SNAPSHOT = "snapshot"; + private static final String UUID = "uuid"; + private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; + private static final String PARTIAL = "partial"; + private static final String STATE = "state"; + private static final String INDICES = "indices"; + private static final String START_TIME_MILLIS = "start_time_millis"; + private static final String START_TIME = "start_time"; + private static final String REPOSITORY_STATE_ID = "repository_state_id"; + private static final String SHARDS = "shards"; + private static final String INDEX = "index"; + private static final String SHARD = "shard"; + private static final String NODE = "node"; + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startArray(SNAPSHOTS); + for (Entry entry : entries) { + toXContent(entry, builder, params); + } + builder.endArray(); + return builder; + } + + public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(REPOSITORY, entry.snapshot().getRepository()); + builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName()); + builder.field(UUID, entry.snapshot().getSnapshotId().getUUID()); + builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState()); + builder.field(PARTIAL, entry.partial()); + builder.field(STATE, entry.state()); + builder.startArray(INDICES); + { + for (IndexId index : entry.indices()) { + index.toXContent(builder, params); + } + } + builder.endArray(); + builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime())); + builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId()); + builder.startArray(SHARDS); + { + for (ObjectObjectCursor shardEntry : entry.shards) { + ShardId shardId = shardEntry.key; + ShardSnapshotStatus status = shardEntry.value; + builder.startObject(); + { + builder.field(INDEX, shardId.getIndex()); + builder.field(SHARD, shardId.getId()); + builder.field(STATE, status.state()); + builder.field(NODE, status.nodeId()); + } + builder.endObject(); + } + } + builder.endArray(); + builder.endObject(); + } + public static class Entry { private final State state; private final Snapshot snapshot; @@ -91,8 +260,8 @@ public static class Entry { @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, long repositoryStateId, ImmutableOpenMap shards, - String failure) { + long startTime, long repositoryStateId, ImmutableOpenMap shards, + String failure) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -122,7 +291,7 @@ public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, failure); + entry.repositoryStateId, shards, failure); } public Entry(Entry entry, ImmutableOpenMap shards) { @@ -208,18 +377,11 @@ public String toString() { return snapshot.toString(); } - // package private for testing - ImmutableOpenMap> findWaitingIndices(ImmutableOpenMap shards) { + private ImmutableOpenMap> findWaitingIndices(ImmutableOpenMap shards) { Map> waitingIndicesMap = new HashMap<>(); for (ObjectObjectCursor entry : shards) { - if (entry.value.state() == State.WAITING) { - final String indexName = entry.key.getIndexName(); - List waitingShards = waitingIndicesMap.get(indexName); - if (waitingShards == null) { - waitingShards = new ArrayList<>(); - waitingIndicesMap.put(indexName, waitingShards); - } - waitingShards.add(entry.key); + if (entry.value.state() == ShardState.WAITING) { + waitingIndicesMap.computeIfAbsent(entry.key.getIndexName(), k -> new ArrayList<>()).add(entry.key); } } if (waitingIndicesMap.isEmpty()) { @@ -233,36 +395,20 @@ ImmutableOpenMap> findWaitingIndices(ImmutableOpenMap shards) { - for (ObjectCursor status : shards) { - if (status.value.state().completed() == false) { - return false; - } - } - return true; - } - - public static class ShardSnapshotStatus { - private final State state; + private final ShardState state; private final String nodeId; private final String reason; public ShardSnapshotStatus(String nodeId) { - this(nodeId, State.INIT); + this(nodeId, ShardState.INIT); } - public ShardSnapshotStatus(String nodeId, State state) { + public ShardSnapshotStatus(String nodeId, ShardState state) { this(nodeId, state, null); } - public ShardSnapshotStatus(String nodeId, State state, String reason) { + public ShardSnapshotStatus(String nodeId, ShardState state, String reason) { this.nodeId = nodeId; this.state = state; this.reason = reason; @@ -272,11 +418,11 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) { public ShardSnapshotStatus(StreamInput in) throws IOException { nodeId = in.readOptionalString(); - state = State.fromValue(in.readByte()); + state = ShardState.fromValue(in.readByte()); reason = in.readOptionalString(); } - public State state() { + public ShardState state() { return state; } @@ -298,14 +444,9 @@ public void writeTo(StreamOutput out) throws IOException { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - ShardSnapshotStatus status = (ShardSnapshotStatus) o; + return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state; - if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false; - if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false; - if (state != status.state) return false; - - return true; } @Override @@ -377,161 +518,54 @@ public static State fromValue(byte value) { } } - private final List entries; - - - public SnapshotsInProgress(List entries) { - this.entries = entries; - } - - public SnapshotsInProgress(Entry... entries) { - this.entries = Arrays.asList(entries); - } - - public List entries() { - return this.entries; - } - - public Entry snapshot(final Snapshot snapshot) { - for (Entry entry : entries) { - final Snapshot curr = entry.snapshot(); - if (curr.equals(snapshot)) { - return entry; - } - } - return null; - } + public enum ShardState { + INIT((byte) 0, false, false), + STARTED((byte) 1, false, false), + SUCCESS((byte) 2, true, false), + FAILED((byte) 3, true, true), + ABORTED((byte) 4, false, true), + MISSING((byte) 5, true, true), + WAITING((byte) 6, false, false); - @Override - public String getWriteableName() { - return TYPE; - } + private byte value; - @Override - public Version getMinimalSupportedVersion() { - return Version.CURRENT.minimumCompatibilityVersion(); - } + private boolean completed; - public static NamedDiff readDiffFrom(StreamInput in) throws IOException { - return readDiffFrom(Custom.class, TYPE, in); - } + private boolean failed; - public SnapshotsInProgress(StreamInput in) throws IOException { - Entry[] entries = new Entry[in.readVInt()]; - for (int i = 0; i < entries.length; i++) { - Snapshot snapshot = new Snapshot(in); - boolean includeGlobalState = in.readBoolean(); - boolean partial = in.readBoolean(); - State state = State.fromValue(in.readByte()); - int indices = in.readVInt(); - List indexBuilder = new ArrayList<>(); - for (int j = 0; j < indices; j++) { - indexBuilder.add(new IndexId(in.readString(), in.readString())); - } - long startTime = in.readLong(); - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); - int shards = in.readVInt(); - for (int j = 0; j < shards; j++) { - ShardId shardId = ShardId.readShardId(in); - builder.put(shardId, new ShardSnapshotStatus(in)); - } - long repositoryStateId = in.readLong(); - final String failure = in.readOptionalString(); - entries[i] = new Entry(snapshot, - includeGlobalState, - partial, - state, - Collections.unmodifiableList(indexBuilder), - startTime, - repositoryStateId, - builder.build(), - failure); + ShardState(byte value, boolean completed, boolean failed) { + this.value = value; + this.completed = completed; + this.failed = failed; } - this.entries = Arrays.asList(entries); - } - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(entries.size()); - for (Entry entry : entries) { - entry.snapshot().writeTo(out); - out.writeBoolean(entry.includeGlobalState()); - out.writeBoolean(entry.partial()); - out.writeByte(entry.state().value()); - out.writeVInt(entry.indices().size()); - for (IndexId index : entry.indices()) { - index.writeTo(out); - } - out.writeLong(entry.startTime()); - out.writeVInt(entry.shards().size()); - for (ObjectObjectCursor shardEntry : entry.shards()) { - shardEntry.key.writeTo(out); - shardEntry.value.writeTo(out); - } - out.writeLong(entry.repositoryStateId); - out.writeOptionalString(entry.failure); + public boolean completed() { + return completed; } - } - private static final String REPOSITORY = "repository"; - private static final String SNAPSHOTS = "snapshots"; - private static final String SNAPSHOT = "snapshot"; - private static final String UUID = "uuid"; - private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; - private static final String PARTIAL = "partial"; - private static final String STATE = "state"; - private static final String INDICES = "indices"; - private static final String START_TIME_MILLIS = "start_time_millis"; - private static final String START_TIME = "start_time"; - private static final String REPOSITORY_STATE_ID = "repository_state_id"; - private static final String SHARDS = "shards"; - private static final String INDEX = "index"; - private static final String SHARD = "shard"; - private static final String NODE = "node"; - - @Override - public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startArray(SNAPSHOTS); - for (Entry entry : entries) { - toXContent(entry, builder, params); + public boolean failed() { + return failed; } - builder.endArray(); - return builder; - } - public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject(); - builder.field(REPOSITORY, entry.snapshot().getRepository()); - builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName()); - builder.field(UUID, entry.snapshot().getSnapshotId().getUUID()); - builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState()); - builder.field(PARTIAL, entry.partial()); - builder.field(STATE, entry.state()); - builder.startArray(INDICES); - { - for (IndexId index : entry.indices()) { - index.toXContent(builder, params); - } - } - builder.endArray(); - builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime())); - builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId()); - builder.startArray(SHARDS); - { - for (ObjectObjectCursor shardEntry : entry.shards) { - ShardId shardId = shardEntry.key; - ShardSnapshotStatus status = shardEntry.value; - builder.startObject(); - { - builder.field(INDEX, shardId.getIndex()); - builder.field(SHARD, shardId.getId()); - builder.field(STATE, status.state()); - builder.field(NODE, status.nodeId()); - } - builder.endObject(); + public static ShardState fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return STARTED; + case 2: + return SUCCESS; + case 3: + return FAILED; + case 4: + return ABORTED; + case 5: + return MISSING; + case 6: + return WAITING; + default: + throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]"); } } - builder.endArray(); - builder.endObject(); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 9b6ab76a98745..87d5d267c0446 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -248,7 +249,8 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { // Add all new shards to start processing on final ShardId shardId = shard.key; final ShardSnapshotStatus shardSnapshotStatus = shard.value; - if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT + if (localNodeId.equals(shardSnapshotStatus.nodeId()) + && shardSnapshotStatus.state() == ShardState.INIT && snapshotShards.containsKey(shardId) == false) { logger.trace("[{}] - Adding shard to the queue", shardId); if (startedShards == null) { @@ -286,7 +288,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { } else { // due to CS batching we might have missed the INIT state and straight went into ABORTED // notify master that abort has completed by moving to FAILED - if (shard.value.state() == State.ABORTED) { + if (shard.value.state() == ShardState.ABORTED) { notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason()); } } @@ -480,12 +482,14 @@ public String toString() { /** Notify the master node that the given shard has been successfully snapshotted **/ private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) { - sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS)); + sendSnapshotShardUpdate(snapshot, shardId, + new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS)); } /** Notify the master node that the given shard failed to be snapshotted **/ private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) { - sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure)); + sendSnapshotShardUpdate(snapshot, shardId, + new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure)); } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 1559bae8259b0..e606bff0cb9e4 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -776,7 +777,7 @@ public ClusterState execute(ClusterState currentState) { logger.warn("failing snapshot of shard [{}] on closed node [{}]", shardEntry.key, shardStatus.nodeId()); shards.put(shardEntry.key, - new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown")); + new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown")); } } } @@ -872,7 +873,7 @@ private static ImmutableOpenMap processWaitingShar for (ObjectObjectCursor shardEntry : snapshotShards) { ShardSnapshotStatus shardStatus = shardEntry.value; ShardId shardId = shardEntry.key; - if (shardStatus.state() == State.WAITING) { + if (shardStatus.state() == ShardState.WAITING) { IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); if (indexShardRoutingTable != null) { IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id()); @@ -893,7 +894,7 @@ private static ImmutableOpenMap processWaitingShar // Shard that we were waiting for went into unassigned state or disappeared - giving up snapshotChanged = true; logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId()); - shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "shard is unassigned")); + shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned")); } else { shards.put(shardId, shardStatus); } @@ -943,7 +944,7 @@ private static Tuple, Set> indicesWithMissingShards( Set missing = new HashSet<>(); Set closed = new HashSet<>(); for (ObjectObjectCursor entry : shards) { - if (entry.value.state() == State.MISSING) { + if (entry.value.state() == ShardState.MISSING) { if (metaData.hasIndex(entry.key.getIndex().getName()) && metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) { closed.add(entry.key.getIndex().getName()); @@ -1195,7 +1196,7 @@ public ClusterState execute(ClusterState currentState) { for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { ShardSnapshotStatus status = shardEntry.value; if (status.state().completed() == false) { - status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion"); + status = new ShardSnapshotStatus(status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion"); } shardsBuilder.put(shardEntry.key, status); } @@ -1385,7 +1386,7 @@ private static ImmutableOpenMap shards = ImmutableOpenMap.builder(); // test more than one waiting shard in an index - shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING)); - shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING)); + shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); + shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); // test exactly one waiting shard in an index - shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING)); + shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); // test no waiting shards in an index shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); @@ -72,7 +73,7 @@ public void testWaitingIndices() { assertFalse(waitingIndices.containsKey(idx3Name)); } - private State randomNonWaitingState() { - return randomFrom(Arrays.stream(State.values()).filter(s -> s != State.WAITING).collect(Collectors.toSet())); + private ShardState randomNonWaitingState() { + return randomFrom(Arrays.stream(ShardState.values()).filter(s -> s != ShardState.WAITING).collect(Collectors.toSet())); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 3a78b4786fc5c..0aa9fe1a9e2a6 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -53,6 +53,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.Entry; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -2701,9 +2702,9 @@ public void testDeleteOrphanSnapshot() throws Exception { public ClusterState execute(ClusterState currentState) { // Simulate orphan snapshot ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted")); - shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted")); - shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted")); + shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); + shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); + shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); return ClusterState.builder(currentState) .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(List.of(new Entry( new Snapshot(repositoryName, createSnapshotResponse.getSnapshotInfo().snapshotId()), diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 3f23c8f0a2ded..6c8ddfb56c1cf 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.Entry; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -65,7 +66,7 @@ private Entry randomSnapshot() { for (int j = 0; j < shardsCount; j++) { ShardId shardId = new ShardId(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10)), randomIntBetween(0, 10)); String nodeId = randomAlphaOfLength(10); - State shardState = randomFrom(State.values()); + ShardState shardState = randomFrom(ShardState.values()); builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState, shardState.failed() ? randomAlphaOfLength(10) : null)); } From a29abbdf94d571d4944f13e6a409c4710eab6ca8 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 9 May 2019 14:36:00 +0200 Subject: [PATCH 2/3] deshuffle --- .../cluster/SnapshotsInProgress.java | 351 +++++++++--------- 1 file changed, 175 insertions(+), 176 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 689f3a60c6784..4f30a81cffc01 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -50,70 +50,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implements Custom { public static final String TYPE = "snapshots"; - private final List entries; - - public SnapshotsInProgress(List entries) { - this.entries = entries; - } - - public SnapshotsInProgress(Entry... entries) { - this.entries = Arrays.asList(entries); - } - - public SnapshotsInProgress(StreamInput in) throws IOException { - Entry[] entries = new Entry[in.readVInt()]; - for (int i = 0; i < entries.length; i++) { - Snapshot snapshot = new Snapshot(in); - boolean includeGlobalState = in.readBoolean(); - boolean partial = in.readBoolean(); - State state = State.fromValue(in.readByte()); - int indices = in.readVInt(); - List indexBuilder = new ArrayList<>(); - for (int j = 0; j < indices; j++) { - indexBuilder.add(new IndexId(in.readString(), in.readString())); - } - long startTime = in.readLong(); - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); - int shards = in.readVInt(); - for (int j = 0; j < shards; j++) { - ShardId shardId = new ShardId(in); - builder.put(shardId, new ShardSnapshotStatus(in)); - } - long repositoryStateId = in.readLong(); - final String failure = in.readOptionalString(); - entries[i] = new Entry(snapshot, - includeGlobalState, - partial, - state, - Collections.unmodifiableList(indexBuilder), - startTime, - repositoryStateId, - builder.build(), - failure); - } - this.entries = Arrays.asList(entries); - } - - public List entries() { - return this.entries; - } - - public Entry snapshot(final Snapshot snapshot) { - for (Entry entry : entries) { - final Snapshot curr = entry.snapshot(); - if (curr.equals(snapshot)) { - return entry; - } - } - return null; - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; return entries.equals(((SnapshotsInProgress) o).entries); - } @Override @@ -133,120 +74,6 @@ public String toString() { return builder.append("]").toString(); } - /** - * Checks if all shards in the list have completed - * - * @param shards list of shard statuses - * @return true if all shards have completed (either successfully or failed), false otherwise - */ - public static boolean completed(ObjectContainer shards) { - for (ObjectCursor status : shards) { - if (status.value.state().completed == false) { - return false; - } - } - return true; - } - - @Override - public String getWriteableName() { - return TYPE; - } - - @Override - public Version getMinimalSupportedVersion() { - return Version.CURRENT.minimumCompatibilityVersion(); - } - - public static NamedDiff readDiffFrom(StreamInput in) throws IOException { - return readDiffFrom(Custom.class, TYPE, in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(entries.size()); - for (Entry entry : entries) { - entry.snapshot().writeTo(out); - out.writeBoolean(entry.includeGlobalState()); - out.writeBoolean(entry.partial()); - out.writeByte(entry.state().value()); - out.writeVInt(entry.indices().size()); - for (IndexId index : entry.indices()) { - index.writeTo(out); - } - out.writeLong(entry.startTime()); - out.writeVInt(entry.shards().size()); - for (ObjectObjectCursor shardEntry : entry.shards()) { - shardEntry.key.writeTo(out); - shardEntry.value.writeTo(out); - } - out.writeLong(entry.repositoryStateId); - out.writeOptionalString(entry.failure); - } - } - - private static final String REPOSITORY = "repository"; - private static final String SNAPSHOTS = "snapshots"; - private static final String SNAPSHOT = "snapshot"; - private static final String UUID = "uuid"; - private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; - private static final String PARTIAL = "partial"; - private static final String STATE = "state"; - private static final String INDICES = "indices"; - private static final String START_TIME_MILLIS = "start_time_millis"; - private static final String START_TIME = "start_time"; - private static final String REPOSITORY_STATE_ID = "repository_state_id"; - private static final String SHARDS = "shards"; - private static final String INDEX = "index"; - private static final String SHARD = "shard"; - private static final String NODE = "node"; - - @Override - public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startArray(SNAPSHOTS); - for (Entry entry : entries) { - toXContent(entry, builder, params); - } - builder.endArray(); - return builder; - } - - public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject(); - builder.field(REPOSITORY, entry.snapshot().getRepository()); - builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName()); - builder.field(UUID, entry.snapshot().getSnapshotId().getUUID()); - builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState()); - builder.field(PARTIAL, entry.partial()); - builder.field(STATE, entry.state()); - builder.startArray(INDICES); - { - for (IndexId index : entry.indices()) { - index.toXContent(builder, params); - } - } - builder.endArray(); - builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime())); - builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId()); - builder.startArray(SHARDS); - { - for (ObjectObjectCursor shardEntry : entry.shards) { - ShardId shardId = shardEntry.key; - ShardSnapshotStatus status = shardEntry.value; - builder.startObject(); - { - builder.field(INDEX, shardId.getIndex()); - builder.field(SHARD, shardId.getId()); - builder.field(STATE, status.state()); - builder.field(NODE, status.nodeId()); - } - builder.endObject(); - } - } - builder.endArray(); - builder.endObject(); - } - public static class Entry { private final State state; private final Snapshot snapshot; @@ -260,8 +87,8 @@ public static class Entry { @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, long repositoryStateId, ImmutableOpenMap shards, - String failure) { + long startTime, long repositoryStateId, ImmutableOpenMap shards, + String failure) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -291,7 +118,7 @@ public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, failure); + entry.repositoryStateId, shards, failure); } public Entry(Entry entry, ImmutableOpenMap shards) { @@ -395,6 +222,21 @@ private ImmutableOpenMap> findWaitingIndices(ImmutableOpen } } + /** + * Checks if all shards in the list have completed + * + * @param shards list of shard statuses + * @return true if all shards have completed (either successfully or failed), false otherwise + */ + public static boolean completed(ObjectContainer shards) { + for (ObjectCursor status : shards) { + if (status.value.state().completed == false) { + return false; + } + } + return true; + } + public static class ShardSnapshotStatus { private final ShardState state; private final String nodeId; @@ -518,6 +360,163 @@ public static State fromValue(byte value) { } } + private final List entries; + + public SnapshotsInProgress(List entries) { + this.entries = entries; + } + + public SnapshotsInProgress(Entry... entries) { + this.entries = Arrays.asList(entries); + } + + public List entries() { + return this.entries; + } + + public Entry snapshot(final Snapshot snapshot) { + for (Entry entry : entries) { + final Snapshot curr = entry.snapshot(); + if (curr.equals(snapshot)) { + return entry; + } + } + return null; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Custom.class, TYPE, in); + } + + public SnapshotsInProgress(StreamInput in) throws IOException { + Entry[] entries = new Entry[in.readVInt()]; + for (int i = 0; i < entries.length; i++) { + Snapshot snapshot = new Snapshot(in); + boolean includeGlobalState = in.readBoolean(); + boolean partial = in.readBoolean(); + State state = State.fromValue(in.readByte()); + int indices = in.readVInt(); + List indexBuilder = new ArrayList<>(); + for (int j = 0; j < indices; j++) { + indexBuilder.add(new IndexId(in.readString(), in.readString())); + } + long startTime = in.readLong(); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + int shards = in.readVInt(); + for (int j = 0; j < shards; j++) { + ShardId shardId = new ShardId(in); + builder.put(shardId, new ShardSnapshotStatus(in)); + } + long repositoryStateId = in.readLong(); + final String failure = in.readOptionalString(); + entries[i] = new Entry(snapshot, + includeGlobalState, + partial, + state, + Collections.unmodifiableList(indexBuilder), + startTime, + repositoryStateId, + builder.build(), + failure); + } + this.entries = Arrays.asList(entries); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(entries.size()); + for (Entry entry : entries) { + entry.snapshot().writeTo(out); + out.writeBoolean(entry.includeGlobalState()); + out.writeBoolean(entry.partial()); + out.writeByte(entry.state().value()); + out.writeVInt(entry.indices().size()); + for (IndexId index : entry.indices()) { + index.writeTo(out); + } + out.writeLong(entry.startTime()); + out.writeVInt(entry.shards().size()); + for (ObjectObjectCursor shardEntry : entry.shards()) { + shardEntry.key.writeTo(out); + shardEntry.value.writeTo(out); + } + out.writeLong(entry.repositoryStateId); + out.writeOptionalString(entry.failure); + } + } + + private static final String REPOSITORY = "repository"; + private static final String SNAPSHOTS = "snapshots"; + private static final String SNAPSHOT = "snapshot"; + private static final String UUID = "uuid"; + private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; + private static final String PARTIAL = "partial"; + private static final String STATE = "state"; + private static final String INDICES = "indices"; + private static final String START_TIME_MILLIS = "start_time_millis"; + private static final String START_TIME = "start_time"; + private static final String REPOSITORY_STATE_ID = "repository_state_id"; + private static final String SHARDS = "shards"; + private static final String INDEX = "index"; + private static final String SHARD = "shard"; + private static final String NODE = "node"; + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startArray(SNAPSHOTS); + for (Entry entry : entries) { + toXContent(entry, builder, params); + } + builder.endArray(); + return builder; + } + + public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(REPOSITORY, entry.snapshot().getRepository()); + builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName()); + builder.field(UUID, entry.snapshot().getSnapshotId().getUUID()); + builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState()); + builder.field(PARTIAL, entry.partial()); + builder.field(STATE, entry.state()); + builder.startArray(INDICES); + { + for (IndexId index : entry.indices()) { + index.toXContent(builder, params); + } + } + builder.endArray(); + builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime())); + builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId()); + builder.startArray(SHARDS); + { + for (ObjectObjectCursor shardEntry : entry.shards) { + ShardId shardId = shardEntry.key; + ShardSnapshotStatus status = shardEntry.value; + builder.startObject(); + { + builder.field(INDEX, shardId.getIndex()); + builder.field(SHARD, shardId.getId()); + builder.field(STATE, status.state()); + builder.field(NODE, status.nodeId()); + } + builder.endObject(); + } + } + builder.endArray(); + builder.endObject(); + } + public enum ShardState { INIT((byte) 0, false, false), STARTED((byte) 1, false, false), From b2d159fee5e99c03f62c1c847fcf11d7d06734d1 Mon Sep 17 00:00:00 2001 From: Armin Date: Sun, 12 May 2019 20:50:38 +0200 Subject: [PATCH 3/3] remove unused state from shard state enum --- .../status/TransportSnapshotsStatusAction.java | 1 - .../cluster/SnapshotsInProgress.java | 15 ++++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 5dfc24d1e280e..2baab96c2c8fe 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -186,7 +186,6 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li break; case INIT: case WAITING: - case STARTED: stage = SnapshotIndexShardStage.STARTED; break; case SUCCESS: diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 4f30a81cffc01..ae9506706e36a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -314,11 +314,11 @@ public enum State { MISSING((byte) 5, true, true), WAITING((byte) 6, false, false); - private byte value; + private final byte value; - private boolean completed; + private final boolean completed; - private boolean failed; + private final boolean failed; State(byte value, boolean completed, boolean failed) { this.value = value; @@ -519,18 +519,17 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p public enum ShardState { INIT((byte) 0, false, false), - STARTED((byte) 1, false, false), SUCCESS((byte) 2, true, false), FAILED((byte) 3, true, true), ABORTED((byte) 4, false, true), MISSING((byte) 5, true, true), WAITING((byte) 6, false, false); - private byte value; + private final byte value; - private boolean completed; + private final boolean completed; - private boolean failed; + private final boolean failed; ShardState(byte value, boolean completed, boolean failed) { this.value = value; @@ -550,8 +549,6 @@ public static ShardState fromValue(byte value) { switch (value) { case 0: return INIT; - case 1: - return STARTED; case 2: return SUCCESS; case 3: