From a3f39741108ca619aa5daa8a0400d38f8f2b86b2 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Tue, 16 Apr 2019 16:52:25 -0600 Subject: [PATCH 01/22] Add user-defined metadata to snapshots Adds a metadata field to snapshots which can be used to store arbitrary key-value information. This may be useful for attaching a description of why a snapshot was taken, tagging snapshots to make categorization easier, or identifying the source of automatically-created snapshots. --- .../create/CreateSnapshotRequest.java | 25 +++++++- .../cluster/SnapshotsInProgress.java | 23 ++++++-- .../repositories/FilterRepository.java | 6 +- .../repositories/Repository.java | 4 +- .../blobstore/BlobStoreRepository.java | 5 +- .../elasticsearch/snapshots/SnapshotInfo.java | 57 ++++++++++++++----- .../snapshots/SnapshotsService.java | 11 ++-- .../create/CreateSnapshotResponseTests.java | 2 +- .../get/GetSnapshotsResponseTests.java | 2 +- .../cluster/ClusterStateDiffIT.java | 13 +++-- .../cluster/SnapshotsInProgressTests.java | 2 +- .../MetaDataDeleteIndexServiceTests.java | 2 +- .../MetaDataIndexStateServiceTests.java | 2 +- .../RepositoriesServiceTests.java | 3 +- .../SharedClusterSnapshotRestoreIT.java | 3 +- ...SnapshotsInProgressSerializationTests.java | 2 +- .../index/shard/RestoreOnlyRepository.java | 2 +- .../xpack/ccr/repository/CcrRepository.java | 3 +- 18 files changed, 120 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java index 8f83da053b215..a53cd763775a6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java @@ -80,6 +80,8 @@ public class CreateSnapshotRequest extends MasterNodeRequest userMetadata; + public CreateSnapshotRequest() { } @@ -104,6 +106,7 @@ public CreateSnapshotRequest(StreamInput in) throws IOException { includeGlobalState = in.readBoolean(); waitForCompletion = in.readBoolean(); partial = in.readBoolean(); + userMetadata = in.readMap(); // NOCOMMIT version checking? } @Override @@ -117,6 +120,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(includeGlobalState); out.writeBoolean(waitForCompletion); out.writeBoolean(partial); + out.writeMap(userMetadata); // NOCOMMIT version checking? } @Override @@ -378,6 +382,15 @@ public boolean includeGlobalState() { return includeGlobalState; } + public Map userMetadata() { + return userMetadata; + } + + public CreateSnapshotRequest userMetadata(Map userMetadata) { + this.userMetadata = userMetadata; + return this; + } + /** * Parses snapshot definition. * @@ -405,6 +418,11 @@ public CreateSnapshotRequest source(Map source) { settings((Map) entry.getValue()); } else if (name.equals("include_global_state")) { includeGlobalState = nodeBooleanValue(entry.getValue(), "include_global_state"); + } else if (name.equals("_meta")) { + if (!(entry.getValue() instanceof Map)) { + throw new IllegalArgumentException("malformed _meta, should be an object"); + } + userMetadata((Map) entry.getValue()); } } indicesOptions(IndicesOptions.fromMap(source, indicesOptions)); @@ -433,6 +451,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (indicesOptions != null) { indicesOptions.toXContent(builder, params); } + builder.field("_meta", userMetadata); builder.endObject(); return builder; } @@ -460,12 +479,13 @@ public boolean equals(Object o) { Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(settings, that.settings) && - Objects.equals(masterNodeTimeout, that.masterNodeTimeout); + Objects.equals(masterNodeTimeout, that.masterNodeTimeout) && + Objects.equals(userMetadata, that.userMetadata); } @Override public int hashCode() { - int result = Objects.hash(snapshot, repository, indicesOptions, partial, settings, includeGlobalState, waitForCompletion); + int result = Objects.hash(snapshot, repository, indicesOptions, partial, settings, includeGlobalState, waitForCompletion, userMetadata); result = 31 * result + Arrays.hashCode(indices); return result; } @@ -482,6 +502,7 @@ public String toString() { ", includeGlobalState=" + includeGlobalState + ", waitForCompletion=" + waitForCompletion + ", masterNodeTimeout=" + masterNodeTimeout + + ", _meta=" + userMetadata + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index b5827dd01a1d1..615d784f4e4f8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -88,11 +88,12 @@ public static class Entry { private final ImmutableOpenMap> waitingIndices; private final long startTime; private final long repositoryStateId; + @Nullable private final Map userMetadata; @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, long repositoryStateId, ImmutableOpenMap shards, - String failure) { + String failure, Map userMetadata) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -108,21 +109,23 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta } this.repositoryStateId = repositoryStateId; this.failure = failure; + this.userMetadata = userMetadata; } public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, long repositoryStateId, ImmutableOpenMap shards) { - this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null); + long startTime, long repositoryStateId, ImmutableOpenMap shards, + Map userMetadata) { + this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata); } public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, entry.failure); + entry.repositoryStateId, shards, entry.failure, entry.userMetadata); } public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, failure); + entry.repositoryStateId, shards, failure, entry.userMetadata); } public Entry(Entry entry, ImmutableOpenMap shards) { @@ -153,6 +156,10 @@ public boolean includeGlobalState() { return includeGlobalState; } + public Map userMetadata() { + return userMetadata; + } + public boolean partial() { return partial; } @@ -437,6 +444,7 @@ public SnapshotsInProgress(StreamInput in) throws IOException { } long repositoryStateId = in.readLong(); final String failure = in.readOptionalString(); + final Map userMetadata = in.readMap(); // NOCOMMIT version checking? entries[i] = new Entry(snapshot, includeGlobalState, partial, @@ -445,7 +453,9 @@ public SnapshotsInProgress(StreamInput in) throws IOException { startTime, repositoryStateId, builder.build(), - failure); + failure, + userMetadata + ); } this.entries = Arrays.asList(entries); } @@ -470,6 +480,7 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeLong(entry.repositoryStateId); out.writeOptionalString(entry.failure); + out.writeMap(entry.userMetadata); // NOCOMMIT version checking? } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index afc38bda86c5b..403983d5d4059 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; public class FilterRepository implements Repository { @@ -79,9 +80,10 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState) { + List shardFailures, long repositoryStateId, boolean includeGlobalState, + Map userMetadata) { return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState); + includeGlobalState, userMetadata); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 20f7c42cb21dd..7e4a95c7af35a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.function.Function; /** @@ -134,7 +135,8 @@ default Repository create(RepositoryMetaData metaData, Function indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState); + List shardFailures, long repositoryStateId, boolean includeGlobalState, + Map userMetadata); /** * Deletes snapshot diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9351c5bf84e87..69245fc25a151 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -527,11 +527,12 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, final int totalShards, final List shardFailures, final long repositoryStateId, - final boolean includeGlobalState) { + final boolean includeGlobalState, + final Map userMetadata) { SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId, indices.stream().map(IndexId::getName).collect(Collectors.toList()), startTime, failure, System.currentTimeMillis(), totalShards, shardFailures, - includeGlobalState); + includeGlobalState, userMetadata); try { snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID()); final RepositoryData repositoryData = getRepositoryData(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java index 38aa945bcca47..2475f639de401 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -74,6 +75,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, private static final String TOTAL_SHARDS = "total_shards"; private static final String SUCCESSFUL_SHARDS = "successful_shards"; private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; + private static final String USER_METADATA = "_meta"; private static final Comparator COMPARATOR = Comparator.comparing(SnapshotInfo::startTime).thenComparing(SnapshotInfo::snapshotId); @@ -88,6 +90,7 @@ public static final class SnapshotInfoBuilder { private long endTime = 0L; private ShardStatsBuilder shardStatsBuilder = null; private Boolean includeGlobalState = null; + private Map userMetadata = null; private int version = -1; private List shardFailures = null; @@ -127,6 +130,10 @@ private void setIncludeGlobalState(Boolean includeGlobalState) { this.includeGlobalState = includeGlobalState; } + private void setUserMetadata(Map userMetadata) { + this.userMetadata = userMetadata; + } + private void setVersion(int version) { this.version = version; } @@ -153,7 +160,7 @@ public SnapshotInfo build() { } return new SnapshotInfo(snapshotId, indices, snapshotState, reason, version, startTime, endTime, - totalShards, successfulShards, shardFailures, includeGlobalState); + totalShards, successfulShards, shardFailures, includeGlobalState, userMetadata); } } @@ -194,6 +201,7 @@ int getSuccessfulShards() { SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setEndTime, new ParseField(END_TIME_IN_MILLIS)); SNAPSHOT_INFO_PARSER.declareObject(SnapshotInfoBuilder::setShardStatsBuilder, SHARD_STATS_PARSER, new ParseField(SHARDS)); SNAPSHOT_INFO_PARSER.declareBoolean(SnapshotInfoBuilder::setIncludeGlobalState, new ParseField(INCLUDE_GLOBAL_STATE)); + SNAPSHOT_INFO_PARSER.declareObject(SnapshotInfoBuilder::setUserMetadata, (p, c) -> p.map() , new ParseField(USER_METADATA)); SNAPSHOT_INFO_PARSER.declareInt(SnapshotInfoBuilder::setVersion, new ParseField(VERSION_ID)); SNAPSHOT_INFO_PARSER.declareObjectArray(SnapshotInfoBuilder::setShardFailures, SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER, new ParseField(FAILURES)); @@ -223,35 +231,42 @@ int getSuccessfulShards() { @Nullable private final Boolean includeGlobalState; + @Nullable + private final Map userMetadata; + @Nullable private final Version version; private final List shardFailures; + // NOCOMMIT check uses of this constructor & plumb userMetadata public SnapshotInfo(SnapshotId snapshotId, List indices, SnapshotState state) { this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0, - Collections.emptyList(), null); + Collections.emptyList(), null, null); } + // TODO double check this but I **think** this is fine just providing a null userMetadata public SnapshotInfo(SnapshotId snapshotId, List indices, SnapshotState state, Version version) { this(snapshotId, indices, state, null, version, 0L, 0L, 0, 0, - Collections.emptyList(), null); + Collections.emptyList(), null, null); } - public SnapshotInfo(SnapshotId snapshotId, List indices, long startTime, Boolean includeGlobalState) { + public SnapshotInfo(SnapshotId snapshotId, List indices, long startTime, Boolean includeGlobalState, + Map userMetadata) { this(snapshotId, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L, - 0, 0, Collections.emptyList(), includeGlobalState); + 0, 0, Collections.emptyList(), includeGlobalState, userMetadata); } public SnapshotInfo(SnapshotId snapshotId, List indices, long startTime, String reason, long endTime, - int totalShards, List shardFailures, Boolean includeGlobalState) { + int totalShards, List shardFailures, Boolean includeGlobalState, + Map userMetadata) { this(snapshotId, indices, snapshotState(reason, shardFailures), reason, Version.CURRENT, - startTime, endTime, totalShards, totalShards - shardFailures.size(), shardFailures, includeGlobalState); + startTime, endTime, totalShards, totalShards - shardFailures.size(), shardFailures, includeGlobalState, userMetadata); } private SnapshotInfo(SnapshotId snapshotId, List indices, SnapshotState state, String reason, Version version, long startTime, long endTime, int totalShards, int successfulShards, List shardFailures, - Boolean includeGlobalState) { + Boolean includeGlobalState, Map userMetadata) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices)); this.state = state; @@ -263,6 +278,7 @@ private SnapshotInfo(SnapshotId snapshotId, List indices, SnapshotState this.successfulShards = successfulShards; this.shardFailures = Objects.requireNonNull(shardFailures); this.includeGlobalState = includeGlobalState; + this.userMetadata = userMetadata; } /** @@ -294,6 +310,7 @@ public SnapshotInfo(final StreamInput in) throws IOException { } version = in.readBoolean() ? Version.readVersion(in) : null; includeGlobalState = in.readOptionalBoolean(); + userMetadata = in.readMap(); // TODO NOCOMMIT this should check the version we're talking to } /** @@ -304,7 +321,7 @@ public static SnapshotInfo incompatible(SnapshotId snapshotId) { return new SnapshotInfo(snapshotId, Collections.emptyList(), SnapshotState.INCOMPATIBLE, "the snapshot is incompatible with the current version of Elasticsearch and its exact version is unknown", null, 0L, 0L, 0, 0, - Collections.emptyList(), null); + Collections.emptyList(), null, null); } /** @@ -492,6 +509,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (includeGlobalState != null) { builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState); } + if (userMetadata != null) { + builder.field(USER_METADATA, userMetadata); + } if (verbose || state != null) { builder.field(STATE, state); } @@ -543,6 +563,7 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final if (includeGlobalState != null) { builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState); } + builder.field(USER_METADATA, userMetadata); builder.field(START_TIME, startTime); builder.field(END_TIME, endTime); builder.field(TOTAL_SHARDS, totalShards); @@ -573,6 +594,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr int totalShards = 0; int successfulShards = 0; Boolean includeGlobalState = null; + Map userMetadata = null; List shardFailures = Collections.emptyList(); if (parser.currentToken() == null) { // fresh parser? move to the first token parser.nextToken(); @@ -628,8 +650,12 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr parser.skipChildren(); } } else if (token == XContentParser.Token.START_OBJECT) { - // It was probably created by newer version - ignoring - parser.skipChildren(); + if (USER_METADATA.equals(currentFieldName)) { + userMetadata = parser.map(); + } else { + // It was probably created by newer version - ignoring + parser.skipChildren(); + } } } } @@ -651,7 +677,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr totalShards, successfulShards, shardFailures, - includeGlobalState); + includeGlobalState, + userMetadata); } @Override @@ -683,6 +710,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeBoolean(false); } out.writeOptionalBoolean(includeGlobalState); + out.writeMap(userMetadata); // TODO NOCOMMIT this should check the version we're talking to } private static SnapshotState snapshotState(final String reason, final List shardFailures) { @@ -712,13 +740,14 @@ public boolean equals(Object o) { Objects.equals(indices, that.indices) && Objects.equals(includeGlobalState, that.includeGlobalState) && Objects.equals(version, that.version) && - Objects.equals(shardFailures, that.shardFailures); + Objects.equals(shardFailures, that.shardFailures) && + Objects.equals(userMetadata, that.userMetadata); } @Override public int hashCode() { return Objects.hash(snapshotId, state, reason, indices, startTime, endTime, - totalShards, successfulShards, includeGlobalState, version, shardFailures); + totalShards, successfulShards, includeGlobalState, version, shardFailures, userMetadata); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 1559bae8259b0..6ca8b9410447f 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -288,7 +288,8 @@ public ClusterState execute(ClusterState currentState) { snapshotIndices, System.currentTimeMillis(), repositoryData.getGenId(), - null); + null, + request.userMetadata()); initializingSnapshots.add(newSnapshot.snapshot()); snapshots = new SnapshotsInProgress(newSnapshot); } else { @@ -558,7 +559,8 @@ private void cleanupAfterError(Exception exception) { 0, Collections.emptyList(), snapshot.getRepositoryStateId(), - snapshot.includeGlobalState()); + snapshot.includeGlobalState(), + snapshot.userMetadata()); } catch (Exception inner) { inner.addSuppressed(exception); logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", @@ -573,7 +575,7 @@ private void cleanupAfterError(Exception exception) { private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { return new SnapshotInfo(entry.snapshot().getSnapshotId(), entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), - entry.startTime(), entry.includeGlobalState()); + entry.startTime(), entry.includeGlobalState(), entry.userMetadata()); } /** @@ -989,7 +991,8 @@ protected void doRun() { entry.shards().size(), unmodifiableList(shardFailures), entry.getRepositoryStateId(), - entry.includeGlobalState()); + entry.includeGlobalState(), + entry.userMetadata()); removeSnapshotFromClusterState(snapshot, snapshotInfo, null); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java index bbb11fc6feef0..edfba07607641 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java @@ -64,6 +64,6 @@ protected CreateSnapshotResponse createTestInstance() { boolean globalState = randomBoolean(); return new CreateSnapshotResponse( - new SnapshotInfo(snapshotId, indices, startTime, reason, endTime, totalShards, shardFailures, globalState)); + new SnapshotInfo(snapshotId, indices, startTime, reason, endTime, totalShards, shardFailures, globalState, null)); // NOCOMMIT generate actual test data maybe? } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java index c5bd7d9f38ac1..6623b07a89c2f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java @@ -54,7 +54,7 @@ protected GetSnapshotsResponse createTestInstance() { ShardId shardId = new ShardId("index", UUIDs.base64UUID(), 2); List shardFailures = Collections.singletonList(new SnapshotShardFailure("node-id", shardId, "reason")); snapshots.add(new SnapshotInfo(snapshotId, Arrays.asList("indice1", "indice2"), System.currentTimeMillis(), reason, - System.currentTimeMillis(), randomIntBetween(2, 3), shardFailures, randomBoolean())); + System.currentTimeMillis(), randomIntBetween(2, 3), shardFailures, randomBoolean(), null)); // NOCOMMIT generate actual test data maybe? } return new GetSnapshotsResponse(snapshots); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index 313bf1d47c771..dde47cd798004 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -32,11 +32,6 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoriesMetaData; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -47,15 +42,20 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.ESIntegTestCase; import java.util.Collections; @@ -719,7 +719,8 @@ public ClusterState.Custom randomCreate(String name) { Collections.emptyList(), Math.abs(randomLong()), (long) randomIntBetween(0, 1000), - ImmutableOpenMap.of())); + ImmutableOpenMap.of(), + null)); // NOCOMMIT probably generate actual test data case 1: return new RestoreInProgress.Builder().add( new RestoreInProgress.Entry( diff --git a/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java b/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java index fcf70909b31d6..ec71462fe84f3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java @@ -64,7 +64,7 @@ public void testWaitingIndices() { // test no waiting shards in an index shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT, - indices, System.currentTimeMillis(), randomLong(), shards.build()); + indices, System.currentTimeMillis(), randomLong(), shards.build(), null); // NOCOMMIT generate test data maybe ImmutableOpenMap> waitingIndices = entry.waitingIndices(); assertEquals(2, waitingIndices.get(idx1Name).size()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java index 5905d528ff43f..4fec5ae97f984 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java @@ -60,7 +60,7 @@ public void testDeleteSnapshotting() { Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid")); SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false, SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")), - System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of())); + System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(), null)); // NOCOMMIT generate actual test data maybe? ClusterState state = ClusterState.builder(clusterState(index)) .putCustom(SnapshotsInProgress.TYPE, snaps) .build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 8189e0b2b047d..4e5b4af0943e1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -392,7 +392,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))); final SnapshotsInProgress.Entry entry = new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT, - Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build()); + Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build(), null); // NOCOMMIT generate actual test data maybe? return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build(); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 981004f48efe8..5b7242b49d6c3 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.mockito.Mockito.mock; @@ -145,7 +146,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState) { + boolean includeGlobalState, Map userMetadata) { return null; } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index ffdbaea36f2df..22717f1caa790 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2713,7 +2713,8 @@ public ClusterState execute(ClusterState currentState) { Collections.singletonList(indexId), System.currentTimeMillis(), repositoryData.getGenId(), - shards.build())); + shards.build(), + null)); // NOCOMMIT generate test data maybe? return ClusterState.builder(currentState) .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))) .build(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 3f23c8f0a2ded..f05a07d3f83c0 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -70,7 +70,7 @@ private Entry randomSnapshot() { shardState.failed() ? randomAlphaOfLength(10) : null)); } ImmutableOpenMap shards = builder.build(); - return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards); + return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null); // NOCOMMIT generate actual test data maybe? } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index bc60b4c194622..03f77d52939e5 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -99,7 +99,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState) { + boolean includeGlobalState, Map userMetadata) { return null; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 5a0472339c192..97d0d1048a778 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -255,7 +255,8 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState) { + List shardFailures, long repositoryStateId, boolean includeGlobalState, + Map userMetadata) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } From 26b1457979e19d4b5147de29588b80d4bf26f2f4 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 17 Apr 2019 14:24:51 -0600 Subject: [PATCH 02/22] Check versions in stream methods --- .../snapshots/create/CreateSnapshotRequest.java | 9 +++++++-- .../elasticsearch/cluster/SnapshotsInProgress.java | 11 +++++++++-- .../org/elasticsearch/snapshots/SnapshotInfo.java | 12 +++++++++--- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java index a53cd763775a6..1a0c46e551fea 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java @@ -46,6 +46,7 @@ import static org.elasticsearch.common.settings.Settings.readSettingsFromStream; import static org.elasticsearch.common.settings.Settings.writeSettingsToStream; import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue; +import static org.elasticsearch.snapshots.SnapshotInfo.METADATA_ADDED_VERSION; /** * Create snapshot request @@ -106,7 +107,9 @@ public CreateSnapshotRequest(StreamInput in) throws IOException { includeGlobalState = in.readBoolean(); waitForCompletion = in.readBoolean(); partial = in.readBoolean(); - userMetadata = in.readMap(); // NOCOMMIT version checking? + if (in.getVersion().onOrAfter(METADATA_ADDED_VERSION)) { + userMetadata = in.readMap(); + } } @Override @@ -120,7 +123,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(includeGlobalState); out.writeBoolean(waitForCompletion); out.writeBoolean(partial); - out.writeMap(userMetadata); // NOCOMMIT version checking? + if (out.getVersion().onOrAfter(METADATA_ADDED_VERSION)) { + out.writeMap(userMetadata); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 615d784f4e4f8..847ba0cd41c5f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -43,6 +43,8 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.snapshots.SnapshotInfo.METADATA_ADDED_VERSION; + /** * Meta data about snapshots that are currently executing */ @@ -444,7 +446,10 @@ public SnapshotsInProgress(StreamInput in) throws IOException { } long repositoryStateId = in.readLong(); final String failure = in.readOptionalString(); - final Map userMetadata = in.readMap(); // NOCOMMIT version checking? + Map userMetadata = null; + if (in.getVersion().onOrAfter(METADATA_ADDED_VERSION)) { + userMetadata = in.readMap(); + } entries[i] = new Entry(snapshot, includeGlobalState, partial, @@ -480,7 +485,9 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeLong(entry.repositoryStateId); out.writeOptionalString(entry.failure); - out.writeMap(entry.userMetadata); // NOCOMMIT version checking? + if (out.getVersion().onOrAfter(METADATA_ADDED_VERSION)) { + out.writeMap(entry.userMetadata); + } } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java index 2475f639de401..78126102e4210 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java @@ -52,6 +52,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, public static final String CONTEXT_MODE_PARAM = "context_mode"; public static final String CONTEXT_MODE_SNAPSHOT = "SNAPSHOT"; + public static final Version METADATA_ADDED_VERSION = Version.V_7_1_0; private static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("strictDateOptionalTime"); private static final String SNAPSHOT = "snapshot"; private static final String UUID = "uuid"; @@ -239,7 +240,6 @@ int getSuccessfulShards() { private final List shardFailures; - // NOCOMMIT check uses of this constructor & plumb userMetadata public SnapshotInfo(SnapshotId snapshotId, List indices, SnapshotState state) { this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null); @@ -310,7 +310,11 @@ public SnapshotInfo(final StreamInput in) throws IOException { } version = in.readBoolean() ? Version.readVersion(in) : null; includeGlobalState = in.readOptionalBoolean(); - userMetadata = in.readMap(); // TODO NOCOMMIT this should check the version we're talking to + if (in.getVersion().onOrAfter(METADATA_ADDED_VERSION)) { + userMetadata = in.readMap(); + } else { + userMetadata = null; + } } /** @@ -710,7 +714,9 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeBoolean(false); } out.writeOptionalBoolean(includeGlobalState); - out.writeMap(userMetadata); // TODO NOCOMMIT this should check the version we're talking to + if (out.getVersion().onOrAfter(METADATA_ADDED_VERSION)) { + out.writeMap(userMetadata); + } } private static SnapshotState snapshotState(final String reason, final List shardFailures) { From 62d2c039dbdb86cd3a052a0f76f8b1d9f07bbc96 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 17 Apr 2019 15:52:45 -0600 Subject: [PATCH 03/22] Handle null metadata --- .../admin/cluster/snapshots/create/CreateSnapshotRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java index 1a0c46e551fea..6e3dd1009636e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java @@ -424,7 +424,7 @@ public CreateSnapshotRequest source(Map source) { } else if (name.equals("include_global_state")) { includeGlobalState = nodeBooleanValue(entry.getValue(), "include_global_state"); } else if (name.equals("_meta")) { - if (!(entry.getValue() instanceof Map)) { + if (entry.getValue() != null && !(entry.getValue() instanceof Map)) { throw new IllegalArgumentException("malformed _meta, should be an object"); } userMetadata((Map) entry.getValue()); From 4d6ea785a01ba6544e5f68c4bbf650a62672f68d Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 17 Apr 2019 15:53:02 -0600 Subject: [PATCH 04/22] Add metadata getter --- .../org/elasticsearch/snapshots/SnapshotInfo.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java index 78126102e4210..3560a5165781e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java @@ -449,6 +449,18 @@ public Version version() { return version; } + /** + * Returns the custom metadata that was attached to this snapshot at creation time. + * @return custom metadata + */ + @Nullable + public Map userMetadata() { + if (userMetadata != null) { + return Collections.unmodifiableMap(userMetadata); + } + return null; + } + /** * Compares two snapshots by their start time; if the start times are the same, then * compares the two snapshots by their snapshot ids. From 7ce8974b3c3d3dc298d96a97657006b3f0dd982c Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Wed, 17 Apr 2019 15:53:21 -0600 Subject: [PATCH 05/22] Add tests --- .../create/CreateSnapshotRequestTests.java | 4 + .../snapshots/SnapshotInfoTests.java | 118 ++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTests.java diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequestTests.java index 0b598be6849cb..a768d7b94537a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequestTests.java @@ -80,6 +80,10 @@ public void testToXContent() throws IOException { original.includeGlobalState(randomBoolean()); } + if (randomBoolean()) { + original.userMetadata(); + } + if (randomBoolean()) { Collection wildcardStates = randomSubsetOf(Arrays.asList(WildcardStates.values())); Collection