From 00c938e4e877156625411a4035e81570c4b2cf87 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Tue, 8 Dec 2020 08:34:24 +0100
Subject: [PATCH 1/2] Protect replicated data streams against local rollovers

Backporting #64710 to the 7.x branch.

When a data stream is being auto followed, a rollover in the local cluster can break auto following: if the local cluster performs a rollover, it creates a new write index, and if the remote cluster later rolls over as well, that new write index can't be replicated, because it has the same name as the write index that was created earlier in the local cluster.

If a data stream is managed by CCR, the local cluster should not roll it over. The data stream should be rolled over in the remote cluster, and that change should replicate to the local cluster; any rollover of such a data stream in the local cluster should happen only through CCR's data stream support, not through a direct rollover call.

To protect against rolling over a replicated data stream, this PR adds a replicated field to the DataStream class. The rollover API now fails with an error if the targeted data stream is a replicated data stream. When the put follow API creates a data stream in the local cluster, the replicated flag is set to true.

There should be a way to turn a replicated data stream back into a regular data stream, for example during disaster recovery. The promote data stream API added in this PR does exactly that. After a replicated data stream is promoted to a regular data stream, the local data stream can be rolled over, so that the new write index is no longer a follower index. If the put follow API then attempts to update this data stream (for example to resume auto following), that will fail, because the data stream is no longer a replicated data stream.

Today, with time-based indices behind an alias, the is_write_index property isn't replicated from the remote cluster to the local cluster, so attempting to roll over the alias in the local cluster fails because the alias doesn't have a write index. The replicated field added to the DataStream class and the added validation achieve the same kind of protection, but in a more robust way.

A followup from #61993. 
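For illustration, the operator-facing flow described above can be sketched with the REST endpoints this change touches. This is a minimal sketch, not output from a real run; the data stream name is an example taken from the new integration test, and the error text mirrors the validation message added in this PR:

[source,console]
----
# On the follower cluster, rolling over a replicated data stream is rejected:
POST /logs-tomcat-prod/_rollover
# => "data stream [logs-tomcat-prod] cannot be rolled over, because it is a replicated data stream"

# Promote the replicated data stream to a regular data stream:
POST /_data_stream/_promote/logs-tomcat-prod

# After promotion, the local rollover succeeds and the new write index is no longer a follower index:
POST /logs-tomcat-prod/_rollover
----
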
--- docs/reference/ccr/auto-follow.asciidoc | 3 + .../data-streams/data-stream-apis.asciidoc | 5 +- .../promote-data-stream-api.asciidoc | 38 ++ .../rollover/MetadataRolloverService.java | 2 + .../cluster/metadata/DataStream.java | 49 +- .../MetadataCreateDataStreamService.java | 2 +- .../snapshots/RestoreService.java | 2 +- .../MetadataRolloverServiceTests.java | 8 +- .../cluster/metadata/DataStreamTests.java | 2 +- .../IndexNameExpressionResolverTests.java | 2 +- .../cluster/DataStreamTestHelper.java | 3 +- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 438 ++++++++++++++++-- .../xpack/ccr/ESCCRRestTestCase.java | 6 +- .../ccr/action/TransportPutFollowAction.java | 11 +- .../action/TransportPutFollowActionTests.java | 20 +- .../core/action/PromoteDataStreamAction.java | 76 +++ .../xpack/datastreams/DataStreamsPlugin.java | 9 +- .../PromoteDataStreamTransportAction.java | 96 ++++ .../rest/RestPromoteDataStreamAction.java | 33 ++ .../xpack/security/operator/Constants.java | 1 + .../api/indices.promote_data_stream.json | 27 ++ 21 files changed, 756 insertions(+), 77 deletions(-) create mode 100644 docs/reference/data-streams/promote-data-stream-api.asciidoc create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/PromoteDataStreamAction.java create mode 100644 x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java create mode 100644 x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json diff --git a/docs/reference/ccr/auto-follow.asciidoc b/docs/reference/ccr/auto-follow.asciidoc index 333bb708cf1da..1fa514cb9d3e3 100644 --- a/docs/reference/ccr/auto-follow.asciidoc +++ b/docs/reference/ccr/auto-follow.asciidoc @@ -13,6 +13,9 @@ automatically followed if the data stream name matches an auto-follow pattern. If you create a data stream after creating the auto-follow pattern, all backing indices are followed automatically. +The data streams replicated from a remote cluster by CCR are protected from +local rollovers. The <> +can be used to turn these data streams into regular data streams. Auto-follow patterns are especially useful with <>, which might continually create diff --git a/docs/reference/data-streams/data-stream-apis.asciidoc b/docs/reference/data-streams/data-stream-apis.asciidoc index 04a5a281c222c..0711abaf6eb10 100644 --- a/docs/reference/data-streams/data-stream-apis.asciidoc +++ b/docs/reference/data-streams/data-stream-apis.asciidoc @@ -8,6 +8,7 @@ The following APIs are available for managing <>: * <> * <> * <> +* <> For concepts and tutorials, see <>. 
@@ -17,4 +18,6 @@ include::{es-repo-dir}/indices/delete-data-stream.asciidoc[] include::{es-repo-dir}/indices/get-data-stream.asciidoc[] -include::{es-repo-dir}/indices/data-stream-stats.asciidoc[] \ No newline at end of file +include::{es-repo-dir}/indices/data-stream-stats.asciidoc[] + +include::{es-repo-dir}/data-streams/promote-data-stream-api.asciidoc[] diff --git a/docs/reference/data-streams/promote-data-stream-api.asciidoc b/docs/reference/data-streams/promote-data-stream-api.asciidoc new file mode 100644 index 0000000000000..6a8a0afa6fadd --- /dev/null +++ b/docs/reference/data-streams/promote-data-stream-api.asciidoc @@ -0,0 +1,38 @@ +[role="xpack"] +[[promote-data-stream-api]] +=== Promote Data Stream API +++++ +Promote data stream api +++++ + +The purpose of the promote data stream api is to turn +a data stream that is replicated by CCR into a regular +data stream. + +Via CCR Auto Following, a data stream from a remote cluster +can be replicated to the local cluster. These data streams +can't be rolled over in the local cluster. Only if the upstream +data stream rolls over then these replicated data streams roll +over as well. In the event that the remote cluster is no longer +available, the data stream in the local cluster can be promoted +to a regular data stream, which allows these data streams to +be rolled over in the local cluster. + +[source,console] +---- +POST /_data_stream/_promote/my-data-stream +---- +// TEST[catch:missing] + +[[promote-data-stream-api-request]] +==== {api-request-title} + +`POST /_data_stream/_promote/` + + +[[promote-data-stream-api-path-params]] +==== {api-path-parms-title} + +``:: +(Required, string) +The name of the data stream to promote. diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index e2fbbda0c0e54..12a273a89c7e3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; @@ -150,6 +151,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra final DataStream ds = dataStream.getDataStream(); final IndexMetadata originalWriteIndex = dataStream.getWriteIndex(); final String newWriteIndexName = DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1); + ds.rollover(new Index(newWriteIndexName, "uuid")); // just for validation createIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists if (onlyValidate) { return new RolloverResult(newWriteIndexName, originalWriteIndex.getIndex().getName(), currentState); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index e07d11dda16c6..d1483f0190e0a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -44,6 +44,7 @@ public final class DataStream extends AbstractDiffable implements To public static final String 
BACKING_INDEX_PREFIX = ".ds-"; public static final Version HIDDEN_VERSION = Version.V_7_11_0; + public static final Version REPLICATED_VERSION = Version.V_7_11_0; private final String name; private final TimestampField timeStampField; @@ -51,19 +52,21 @@ public final class DataStream extends AbstractDiffable implements To private final long generation; private final Map metadata; private final boolean hidden; + private final boolean replicated; public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata) { - this(name, timeStampField, indices, generation, metadata, false); + this(name, timeStampField, indices, generation, metadata, false, false); } public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata, - boolean hidden) { + boolean hidden, boolean replicated) { this.name = name; this.timeStampField = timeStampField; this.indices = Collections.unmodifiableList(indices); this.generation = generation; this.metadata = metadata; this.hidden = hidden; + this.replicated = replicated; assert indices.size() > 0; assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation)); } @@ -97,6 +100,16 @@ public boolean isHidden() { return hidden; } + /** + * Determines whether this data stream is replicated from elsewhere, + * for example a remote cluster. + * + * @return Whether this data stream is replicated. + */ + public boolean isReplicated() { + return replicated; + } + /** * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. @@ -107,9 +120,14 @@ public boolean isHidden() { */ public DataStream rollover(Index newWriteIndex) { assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1)); + if (replicated) { + throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, " + + "because it is a replicated data stream"); + } + List backingIndices = new ArrayList<>(indices); backingIndices.add(newWriteIndex); - return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden); + return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated); } /** @@ -123,7 +141,7 @@ public DataStream removeBackingIndex(Index index) { List backingIndices = new ArrayList<>(indices); backingIndices.remove(index); assert backingIndices.size() == indices.size() - 1; - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); } /** @@ -148,7 +166,11 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki "it is the write index", existingBackingIndex.getName(), name)); } backingIndices.set(backingIndexPosition, newBackingIndex); - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); + } + + public DataStream promoteDataStream() { + return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false); } /** @@ -166,7 +188,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene public DataStream(StreamInput in) throws IOException { this(in.readString(), new TimestampField(in), in.readList(Index::new), 
in.readVLong(), in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readMap(): null, - in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean()); + in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean(), + in.getVersion().onOrAfter(REPLICATED_VERSION) && in.readBoolean()); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -185,6 +208,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(HIDDEN_VERSION)) { out.writeBoolean(hidden); } + if (out.getVersion().onOrAfter(REPLICATED_VERSION)) { + out.writeBoolean(replicated); + } } public static final ParseField NAME_FIELD = new ParseField("name"); @@ -193,11 +219,12 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField GENERATION_FIELD = new ParseField("generation"); public static final ParseField METADATA_FIELD = new ParseField("_meta"); public static final ParseField HIDDEN_FIELD = new ParseField("hidden"); + public static final ParseField REPLICATED_FIELD = new ParseField("replicated"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3], - (Map) args[4], args[5] != null && (boolean) args[5])); + (Map) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); @@ -206,6 +233,7 @@ public void writeTo(StreamOutput out) throws IOException { PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -223,6 +251,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(METADATA_FIELD.getPreferredName(), metadata); } builder.field(HIDDEN_FIELD.getPreferredName(), hidden); + builder.field(REPLICATED_FIELD.getPreferredName(), replicated); builder.endObject(); return builder; } @@ -236,12 +265,14 @@ public boolean equals(Object o) { timeStampField.equals(that.timeStampField) && indices.equals(that.indices) && generation == that.generation && - Objects.equals(metadata, that.metadata); + Objects.equals(metadata, that.metadata) && + hidden == that.hidden && + replicated == that.replicated; } @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation, metadata); + return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated); } public static final class TimestampField implements Writeable, ToXContentObject { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 514f903f25d48..589f7e91101c3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -161,7 +161,7 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn DataStream 
newDataStream = new DataStream(request.name, timestampField, Collections.singletonList(firstBackingIndex.getIndex()), 1L, - template.metadata() != null ? Collections.unmodifiableMap(new HashMap<>(template.metadata())) : null, hidden); + template.metadata() != null ? Collections.unmodifiableMap(new HashMap<>(template.metadata())) : null, hidden, false); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); logger.info("adding data stream [{}]", request.name); return ClusterState.builder(currentState).metadata(builder).build(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 6651139a23d3b..b0a55b165aca2 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -639,7 +639,7 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad .map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex()) .collect(Collectors.toList()); return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(), - dataStream.getMetadata(), dataStream.isHidden()); + dataStream.getMetadata(), dataStream.isHidden(), dataStream.isReplicated()); } public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set deletedIndices) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index e6da2484cf870..8d33d22eba371 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -537,7 +537,9 @@ public void testRolloverClusterState() throws Exception { } public void testRolloverClusterStateForDataStream() throws Exception { - final DataStream dataStream = DataStreamTestHelper.randomInstance(); + final DataStream dataStream = DataStreamTestHelper.randomInstance() + // ensure no replicate data stream + .promoteDataStream(); ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStream.getName() + "*"), null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate(), null); Metadata.Builder builder = Metadata.builder(); @@ -633,7 +635,9 @@ public void testValidation() throws Exception { final boolean useDataStream = randomBoolean(); final Metadata.Builder builder = Metadata.builder(); if (useDataStream) { - DataStream dataStream = DataStreamTestHelper.randomInstance(); + DataStream dataStream = DataStreamTestHelper.randomInstance() + // ensure no replicate data stream + .promoteDataStream(); rolloverTarget = dataStream.getName(); sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName(); defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 826d5940245cd..73309b271dcfd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -52,7 +52,7 @@ protected DataStream createTestInstance() { } public void testRollover() { - DataStream ds = DataStreamTestHelper.randomInstance(); + DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream(); Index newWriteIndex = new Index(getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random())); DataStream rolledDs = ds.rollover(newWriteIndex); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index afa40bb3b420c..d11d7b83e120d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -2134,7 +2134,7 @@ public void testHiddenDataStreams() { .put(index2, false) .put(justAnIndex, false) .put(new DataStream(dataStream1, createTimestampField("@timestamp"), - Arrays.asList(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true))).build(); + Arrays.asList(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true, false))).build(); Index[] result = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.strictExpandHidden(), true, "logs-*"); assertThat(result, arrayContainingInAnyOrder(index1.getIndex(), index2.getIndex(), justAnIndex.getIndex() )); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java index 2951199842401..ece0be8ec74ee 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java @@ -111,7 +111,8 @@ public static DataStream randomInstance() { metadata = new HashMap<>(); metadata.put("key", "value"); } - return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, randomBoolean()); + return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, + randomBoolean(), randomBoolean()); } /** diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index b84a13333058e..dc2f886979a29 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -24,11 +25,14 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static 
org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class AutoFollowIT extends ESCCRRestTestCase { @@ -185,21 +189,7 @@ public void testDataStreams() throws Exception { int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); // Create auto follow pattern - Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); - try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { - bodyBuilder.startObject(); - { - bodyBuilder.startArray("leader_index_patterns"); - { - bodyBuilder.value("logs-*"); - } - bodyBuilder.endArray(); - bodyBuilder.field("remote_cluster", "leader_cluster"); - } - bodyBuilder.endObject(); - request.setJsonEntity(Strings.toString(bodyBuilder)); - } - assertOK(client().performRequest(request)); + createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster"); // Create data stream and ensure that is is auto followed { @@ -210,12 +200,12 @@ public void testDataStreams() throws Exception { indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); } - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); verifyDocuments(leaderClient, dataStreamName, numDocs); } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs); }); @@ -226,7 +216,7 @@ public void testDataStreams() throws Exception { try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); @@ -236,7 +226,7 @@ public void testDataStreams() throws Exception { } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); - verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs + 1); }); @@ -247,8 +237,8 @@ public void testDataStreams() throws Exception { try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", "" + - ".ds-logs-mysql-error-000003"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), + 
backingIndexName(dataStreamName, 3)); Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); @@ -258,8 +248,8 @@ public void testDataStreams() throws Exception { } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3)); - verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", - ".ds-logs-mysql-error-000003"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), + backingIndexName(dataStreamName, 3)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs + 2); }); @@ -288,34 +278,18 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); } - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); verifyDocuments(leaderClient, dataStreamName, initialNumDocs); } } // Create auto follow pattern - { - Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); - try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { - bodyBuilder.startObject(); - { - bodyBuilder.startArray("leader_index_patterns"); - { - bodyBuilder.value("logs-*"); - } - bodyBuilder.endArray(); - bodyBuilder.field("remote_cluster", "leader_cluster"); - } - bodyBuilder.endObject(); - request.setJsonEntity(Strings.toString(bodyBuilder)); - } - assertOK(client().performRequest(request)); - } + createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster"); // Rollover and ensure only second backing index is replicated: { try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); @@ -325,17 +299,17 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 2)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, 1); }); } // Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly: { - followIndex(".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000001"); + followIndex(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 1)); assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), 
backingIndexName(dataStreamName, 2)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, initialNumDocs + 1); }); @@ -347,13 +321,380 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception } } + public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int numDocs = 64; + final String dataStreamName = "logs-tomcat-prod"; + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + + // Create auto follow pattern + createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster"); + + // Create data stream and ensure that is is auto followed + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); + verifyDocuments(leaderClient, dataStreamName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1)); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs); + }); + } + + // Rollover in leader cluster and ensure second backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs + 1); + }); + } + + // Try rollover in follow cluster + { + Request rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover"); + Exception e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); + assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " + + "because it is a replicated data stream")); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + + // Unfollow .ds-logs-tomcat-prod-000001 + pauseFollow(backingIndexName(dataStreamName, 1)); + closeIndex(backingIndexName(dataStreamName, 1)); + unfollow(backingIndexName(dataStreamName, 1)); + + // Try again + Request rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover"); + e = 
expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest2)); + assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " + + "because it is a replicated data stream")); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + + // Promote local data stream + Request promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName); + assertOK(client().performRequest(promoteRequest)); + + // Try again and now the rollover should be successful because local data stream is now : + Request rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(client().performRequest(rolloverRequest3)); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), + backingIndexName(dataStreamName, 3)); + + // TODO: verify that following a backing index for logs-tomcat-prod data stream in remote cluster fails, + // because local data stream isn't a replicated data stream anymore. + + // Unfollow .ds-logs-tomcat-prod-000002, + // which is now possible because this index can now be closed as it is no longer the write index. + pauseFollow(backingIndexName(dataStreamName, 2)); + closeIndex(backingIndexName(dataStreamName, 2)); + unfollow(backingIndexName(dataStreamName, 2)); + } + // Cleanup: + { + deleteAutoFollowPattern("test_pattern"); + deleteDataStream(dataStreamName); + } + } + + public void testRolloverAliasInFollowClusterForbidden() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int numDocs = 64; + final String aliasName = "log-tomcat-prod"; + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + + // Create auto follow pattern + createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster"); + + // Create leader index and write alias: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request createFirstIndexRequest = new Request("PUT", "/" + aliasName + "-000001"); + createFirstIndexRequest.setJsonEntity("{\"aliases\": {\"" + aliasName + "\":{\"is_write_index\":true}}}"); + leaderClient.performRequest(createFirstIndexRequest); + + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + aliasName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyAlias(leaderClient, aliasName, true, aliasName + "-000001"); + verifyDocuments(leaderClient, aliasName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyAlias(client(), aliasName, false, aliasName + "-000001"); + ensureYellow(aliasName); + verifyDocuments(client(), aliasName, numDocs); + }); + } + + // Rollover in leader cluster and ensure second backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + aliasName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyAlias(leaderClient, aliasName, true, aliasName + "-000002", aliasName + "-000001"); + + Request indexRequest = new Request("POST", "/" + aliasName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + 
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, aliasName, numDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001"); + ensureYellow(aliasName); + verifyDocuments(client(), aliasName, numDocs + 1); + }); + } + + // Try rollover in follow cluster, this should fail, because is_write_index property of an alias isn't + // replicated to follow cluster. + { + Request rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover"); + Exception e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); + assertThat(e.getMessage(), containsString("rollover target [" + aliasName + "] does not point to a write index")); + verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001"); + } + // Cleanup: + { + deleteAutoFollowPattern("test_pattern"); + } + } + + private static void verifyAlias(RestClient client, + String aliasName, + boolean checkWriteIndex, + String... otherIndices) throws IOException { + Request getAliasRequest = new Request("GET", "/_alias/" + aliasName); + Map responseBody = toMap(client.performRequest(getAliasRequest)); + if (checkWriteIndex) { + assertThat(ObjectPath.eval(otherIndices[0] + ".aliases." + aliasName + ".is_write_index", responseBody), is(true)); + } + for (String otherIndex : otherIndices) { + assertThat(ObjectPath.eval(otherIndex + ".aliases." + aliasName, responseBody), notNullValue()); + } + } + + public void testDataStreamsBiDirectionalReplication() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + int initialNumberOfSuccessfulFollowedIndicesInFollowCluster = getNumberOfSuccessfulFollowedIndices(); + int initialNumberOfSuccessfulFollowedIndicesInLeaderCluster; + + // Create auto follow pattern in follow cluster + createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster"); + // Create auto follow pattern in leader cluster: + try (RestClient leaderClient = buildLeaderClient()) { + initialNumberOfSuccessfulFollowedIndicesInLeaderCluster = getNumberOfSuccessfulFollowedIndices(leaderClient); + // First add remote cluster to leader cluster: + Request request = new Request("PUT", "/_cluster/settings"); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startObject("persistent"); + { + bodyBuilder.startObject("cluster"); + { + bodyBuilder.startObject("remote"); + { + bodyBuilder.startObject("follower_cluster"); + { + bodyBuilder.startArray("seeds"); + Request nodesInfoRequest = new Request("GET", "/_nodes/_local"); + Map nodesInfoResponse = toMap(client().performRequest(nodesInfoRequest)); + Map node = (Map) ((Map) nodesInfoResponse.get("nodes")).values().iterator().next(); + Map transportMetrics = (Map) node.get("transport"); + String address = (String) transportMetrics.get("publish_address"); + bodyBuilder.value(address); + bodyBuilder.endArray(); + } + bodyBuilder.endObject(); + } + bodyBuilder.endObject(); + } + bodyBuilder.endObject(); + } + bodyBuilder.endObject(); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(leaderClient.performRequest(request)); + // Then create the actual auto follow pattern: + 
createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster"); + } + + int numDocs = 128; + String leaderDataStreamName = "logs-http-eu"; + // Create data stream in leader cluster and ensure it is followed in follow cluster + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, leaderDataStreamName, backingIndexName(leaderDataStreamName, 1)); + verifyDocuments(leaderClient, leaderDataStreamName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndicesInFollowCluster + 1)); + verifyDataStream(client(), leaderDataStreamName, backingIndexName(leaderDataStreamName, 1)); + ensureYellow(leaderDataStreamName); + verifyDocuments(client(), leaderDataStreamName, numDocs); + }); + } + String followerDataStreamName = "logs-http-na"; + { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(client().performRequest(indexRequest)); + } + verifyDocuments(client(), followerDataStreamName, numDocs); + try (RestClient leaderClient = buildLeaderClient()) { + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(leaderClient), + equalTo(initialNumberOfSuccessfulFollowedIndicesInLeaderCluster + 1)); + verifyDataStream(leaderClient, followerDataStreamName, backingIndexName(followerDataStreamName, 1)); + ensureYellow(followerDataStreamName); + verifyDocuments(leaderClient, followerDataStreamName, numDocs); + }); + } + } + + // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and + // writes via 'logs-http' alias (ensuring write goes to write data stream). + // Currently aliases can't refer to data streams, so we can't fully test the bi-direction replication scenario. 
+ // See: https://github.com/elastic/elasticsearch/pull/64710#discussion_r537210322 + + // See all eu and na logs in leader and follower cluster: + verifyDocuments(client(), "logs-http*", numDocs * 2); + try (RestClient leaderClient = buildLeaderClient()) { + verifyDocuments(leaderClient, "logs-http*", numDocs * 2); + } + + int moreDocs = 48; + // Index more docs into leader cluster + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < moreDocs; i++) { + Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDocuments(leaderClient, leaderDataStreamName, numDocs + moreDocs); + } + assertBusy(() -> { + verifyDocuments(client(), leaderDataStreamName, numDocs + moreDocs); + }); + } + // Index more docs into follower cluster + { + for (int i = 0; i < moreDocs; i++) { + Request indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(client().performRequest(indexRequest)); + } + verifyDocuments(client(), followerDataStreamName, numDocs + moreDocs); + try (RestClient leaderClient = buildLeaderClient()) { + assertBusy(() -> { + verifyDocuments(leaderClient, followerDataStreamName, numDocs + moreDocs); + }); + } + } + + // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and writes via 'logs-http' + // (see previous TODO) + + // See all eu and na logs in leader and follower cluster: + verifyDocuments(client(), "logs-http*", (numDocs + moreDocs) * 2); + try (RestClient leaderClient = buildLeaderClient()) { + verifyDocuments(leaderClient, "logs-http*", (numDocs + moreDocs) * 2); + } + + // Cleanup: + { + deleteAutoFollowPattern(client(), "id1"); + deleteDataStream(client(), followerDataStreamName); + try (RestClient leaderClient = buildLeaderClient()) { + deleteDataStream(leaderClient, leaderDataStreamName); + deleteAutoFollowPattern(leaderClient, "id2"); + } + } + } + private int getNumberOfSuccessfulFollowedIndices() throws IOException { + return getNumberOfSuccessfulFollowedIndices(client()); + } + + private int getNumberOfSuccessfulFollowedIndices(RestClient client) throws IOException { Request statsRequest = new Request("GET", "/_ccr/stats"); - Map response = toMap(client().performRequest(statsRequest)); + Map response = toMap(client.performRequest(statsRequest)); response = (Map) response.get("auto_follow_stats"); return (Integer) response.get("number_of_successful_follow_indices"); } + private void createAutoFollowPattern(RestClient client, String name, String pattern, String remoteCluster) throws IOException { + Request request = new Request("PUT", "/_ccr/auto_follow/" + name); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startArray("leader_index_patterns"); + { + bodyBuilder.value(pattern); + } + bodyBuilder.endArray(); + bodyBuilder.field("remote_cluster", remoteCluster); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(client.performRequest(request)); + } + + private static String backingIndexName(String dataStreamName, int generation) { + return 
String.format(Locale.ROOT, ".ds-%s-%06d", dataStreamName, generation); + } + private static void verifyDocuments(final RestClient client, final String index, final int expectedNumDocs) throws IOException { @@ -388,4 +729,9 @@ private void deleteDataStream(String name) throws IOException { } } + private void deleteDataStream(RestClient client, String name) throws IOException { + Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name); + assertOK(client.performRequest(deleteTemplateRequest)); + } + } diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index 10841dbd7edf2..0fb22e6235bd8 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -122,8 +122,12 @@ protected static void putAutoFollowPattern(String patternName, String remoteClus } protected static void deleteAutoFollowPattern(String patternName) throws IOException { + deleteAutoFollowPattern(client(), patternName); + } + + protected static void deleteAutoFollowPattern(RestClient client, String patternName) throws IOException { Request putPatternRequest = new Request("DELETE", "/_ccr/auto_follow/" + patternName); - assertOK(client().performRequest(putPatternRequest)); + assertOK(client.performRequest(putPatternRequest)); } protected static void unfollow(String followIndex) throws IOException { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index d3e32ff7dd67c..c635a80343d27 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -268,8 +268,14 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, // The data stream and the backing indices have been created and validated in the remote cluster, // just copying the data stream is in this case safe. 
return new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(), - Collections.singletonList(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata()); + Collections.singletonList(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata(), + remoteDataStream.isHidden(), true); } else { + if (localDataStream.isReplicated() == false) { + throw new IllegalArgumentException("cannot follow backing index [" + backingIndexToFollow.getName() + + "], because local data stream [" + localDataStream.getName() + "] is no longer marked as replicated"); + } + List backingIndices = new ArrayList<>(localDataStream.getIndices()); backingIndices.add(backingIndexToFollow); @@ -280,7 +286,8 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, backingIndices.sort(Comparator.comparing(Index::getName)); return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices, - remoteDataStream.getGeneration(), remoteDataStream.getMetadata()); + remoteDataStream.getGeneration(), remoteDataStream.getMetadata(), localDataStream.isHidden(), + localDataStream.isReplicated()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java index 42819b54725c7..b81e6da1385e6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java @@ -22,7 +22,7 @@ public class TransportPutFollowActionTests extends ESTestCase { public void testCreateNewLocalDataStream() { - DataStream remoteDataStream = generateDataSteam("logs-foobar", 3); + DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false); Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream); assertThat(result.getName(), equalTo(remoteDataStream.getName())); @@ -33,8 +33,8 @@ public void testCreateNewLocalDataStream() { } public void testUpdateLocalDataStream_followNewBackingIndex() { - DataStream remoteDataStream = generateDataSteam("logs-foobar", 3); - DataStream localDataStream = generateDataSteam("logs-foobar", 2); + DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false); + DataStream localDataStream = generateDataSteam("logs-foobar", 2, true); Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); assertThat(result.getName(), equalTo(remoteDataStream.getName())); @@ -48,8 +48,8 @@ public void testUpdateLocalDataStream_followNewBackingIndex() { public void testUpdateLocalDataStream_followOlderBackingIndex() { // follow first backing index: - DataStream remoteDataStream = generateDataSteam("logs-foobar", 5); - DataStream localDataStream = generateDataSteam("logs-foobar", 5, DataStream.getDefaultBackingIndexName("logs-foobar", 5)); + DataStream remoteDataStream = generateDataSteam("logs-foobar", 5, false); + DataStream localDataStream = generateDataSteam("logs-foobar", 5, true, DataStream.getDefaultBackingIndexName("logs-foobar", 5)); Index backingIndexToFollow 
= remoteDataStream.getIndices().get(0); DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); assertThat(result.getName(), equalTo(remoteDataStream.getName())); @@ -72,19 +72,21 @@ public void testUpdateLocalDataStream_followOlderBackingIndex() { assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5))); } - static DataStream generateDataSteam(String name, int numBackingIndices) { + static DataStream generateDataSteam(String name, int numBackingIndices, boolean replicate) { List backingIndices = IntStream.range(1, numBackingIndices + 1) .mapToObj(value -> DataStream.getDefaultBackingIndexName(name, value)) .map(value -> new Index(value, "uuid")) .collect(Collectors.toList()); - return new DataStream(name, new TimestampField("@timestamp"), backingIndices); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices, backingIndices.size(), + Collections.emptyMap(), false, replicate); } - static DataStream generateDataSteam(String name, int generation, String... backingIndexNames) { + static DataStream generateDataSteam(String name, int generation, boolean replicate, String... backingIndexNames) { List backingIndices = Arrays.stream(backingIndexNames) .map(value -> new Index(value, "uuid")) .collect(Collectors.toList()); - return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Collections.emptyMap()); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, + Collections.emptyMap(), false, replicate); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/PromoteDataStreamAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/PromoteDataStreamAction.java new file mode 100644 index 0000000000000..43fe4d17a23ba --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/PromoteDataStreamAction.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */
+
+package org.elasticsearch.xpack.core.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.elasticsearch.action.ValidateActions.addValidationError;
+
+public class PromoteDataStreamAction extends ActionType<AcknowledgedResponse> {
+
+    public static final PromoteDataStreamAction INSTANCE = new PromoteDataStreamAction();
+    public static final String NAME = "indices:admin/data_stream/promote";
+
+    private PromoteDataStreamAction() {
+        super(NAME, AcknowledgedResponse::readFrom);
+    }
+
+    public static class Request extends MasterNodeRequest<Request> {
+
+        private String name;
+
+        public Request(String name) {
+            this.name = Objects.requireNonNull(name);
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            ActionRequestValidationException validationException = null;
+            if (name == null) {
+                validationException = addValidationError("no data stream specified", validationException);
+            }
+            return validationException;
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.name = in.readString();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(name);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            PromoteDataStreamAction.Request request = (PromoteDataStreamAction.Request) o;
+            return Objects.equals(name, request.name);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name);
+        }
+    }
+
+}
diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java
index 60e85de6d5718..d39ee82f0f12f 100644
--- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java
+++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java
@@ -24,7 +24,9 @@
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
 import org.elasticsearch.xpack.datastreams.action.DataStreamsStatsTransportAction;
+import org.elasticsearch.xpack.datastreams.action.PromoteDataStreamTransportAction;
 import org.elasticsearch.xpack.datastreams.rest.RestCreateDataStreamAction;
 import org.elasticsearch.xpack.datastreams.rest.RestDataStreamsStatsAction;
 import org.elasticsearch.xpack.datastreams.rest.RestDeleteDataStreamAction;
@@ -34,6 +36,7 @@
 import org.elasticsearch.xpack.datastreams.action.DeleteDataStreamTransportAction;
 import org.elasticsearch.xpack.datastreams.action.GetDataStreamsTransportAction;
 import org.elasticsearch.xpack.datastreams.mapper.DataStreamTimestampFieldMapper;
+import org.elasticsearch.xpack.datastreams.rest.RestPromoteDataStreamAction;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -62,7 +65,8 @@ public Map<String, MetadataFieldMapper.TypeParser> getMetadataMappers() {
             new ActionHandler<>(CreateDataStreamAction.INSTANCE, CreateDataStreamTransportAction.class),
             new ActionHandler<>(DeleteDataStreamAction.INSTANCE, DeleteDataStreamTransportAction.class),
             new ActionHandler<>(GetDataStreamAction.INSTANCE, GetDataStreamsTransportAction.class),
-            new ActionHandler<>(DataStreamsStatsAction.INSTANCE, DataStreamsStatsTransportAction.class)
+            new ActionHandler<>(DataStreamsStatsAction.INSTANCE, DataStreamsStatsTransportAction.class),
+            new ActionHandler<>(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class)
         );
     }
 
@@ -80,7 +84,8 @@ public List<RestHandler> getRestHandlers(
         RestHandler deleteDsAction = new RestDeleteDataStreamAction();
         RestHandler getDsAction = new RestGetDataStreamsAction();
         RestHandler dsStatsAction = new RestDataStreamsStatsAction();
-        return Arrays.asList(createDsAction, deleteDsAction, getDsAction, dsStatsAction);
+        RestHandler promoteAction = new RestPromoteDataStreamAction();
+        return Arrays.asList(createDsAction, deleteDsAction, getDsAction, dsStatsAction, promoteAction);
     }
 
     public Collection<Module> createGuiceModules() {
diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java
new file mode 100644
index 0000000000000..8041fee8a7096
--- /dev/null
+++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.datastreams.action;
+
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
+
+public class PromoteDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction<PromoteDataStreamAction.Request> {
+
+    @Inject
+    public PromoteDataStreamTransportAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver
+    ) {
+        super(
+            PromoteDataStreamAction.NAME,
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            PromoteDataStreamAction.Request::new,
+            indexNameExpressionResolver,
+            ThreadPool.Names.SAME
+        );
+    }
+
+    @Override
+    protected void masterOperation(
+        Task task,
+        PromoteDataStreamAction.Request request,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) throws Exception {
+        clusterService.submitStateUpdateTask(
+            "promote-data-stream [" + request.getName() + "]",
+            new ClusterStateUpdateTask(Priority.HIGH, request.masterNodeTimeout()) {
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    listener.onFailure(e);
+                }
+
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    return promoteDataStream(currentState, request);
+                }
+
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    listener.onResponse(AcknowledgedResponse.TRUE);
+                }
+            }
+        );
+    }
+
+    static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) {
+        DataStream dataStream = currentState.getMetadata().dataStreams().get(request.getName());
+        if (dataStream == null) {
+            throw new ResourceNotFoundException("data stream [" + request.getName() + "] does not exist");
+        }
+
+        DataStream promotedDataStream = dataStream.promoteDataStream();
+        Metadata.Builder metadata = Metadata.builder(currentState.metadata());
+        metadata.put(promotedDataStream);
+        return ClusterState.builder(currentState).metadata(metadata).build();
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(PromoteDataStreamAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}
diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java
new file mode 100644
index 0000000000000..35c3f5a9d47dc
--- /dev/null
+++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datastreams.rest;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RestPromoteDataStreamAction extends BaseRestHandler {
+    @Override
+    public String getName() {
+        return "promote_data_stream_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(RestRequest.Method.POST, "/_data_stream/_promote/{name}"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        PromoteDataStreamAction.Request request = new PromoteDataStreamAction.Request(restRequest.param("name"));
+        return channel -> client.execute(PromoteDataStreamAction.INSTANCE, request, new RestToXContentListener<>(channel));
+    }
+}
diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
index 36c26ff75c827..04b8240fe268f 100644
--- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
+++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
@@ -275,6 +275,7 @@ public class Constants {
         "indices:admin/data_stream/create",
         "indices:admin/data_stream/delete",
         "indices:admin/data_stream/get",
+        "indices:admin/data_stream/promote",
         "indices:admin/delete",
         "indices:admin/exists",
         "indices:admin/flush",
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json
new file mode 100644
index 0000000000000..4f423a78bbc5d
--- /dev/null
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json
@@ -0,0 +1,27 @@
+{
+  "indices.promote_data_stream":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
+      "description":"Promotes a data stream from a replicated data stream managed by CCR to a regular data stream"
+    },
+    "stability":"stable",
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/_promote/{name}",
+          "methods":[
+            "POST"
+          ],
+          "parts":{
+            "name":{
+              "type":"string",
+              "description":"The name of the data stream"
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+    }
+  }
+}

From c62e583ccd05a530e769b080ffc4a56e35712f02 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Tue, 8 Dec 2020 09:14:24 +0100
Subject: [PATCH 2/2] fixed compile errors

---
 .../datastreams/action/PromoteDataStreamTransportAction.java | 2 --
 .../xpack/datastreams/rest/RestPromoteDataStreamAction.java  | 3 ++-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java
index 8041fee8a7096..490a06b7d53b9 100644
--- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java
+++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java
@@ -21,7 +21,6 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
@@ -50,7 +49,6 @@ public PromoteDataStreamTransportAction(
 
     @Override
     protected void masterOperation(
-        Task task,
         PromoteDataStreamAction.Request request,
         ClusterState state,
         ActionListener<AcknowledgedResponse> listener
diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java
index 35c3f5a9d47dc..8befd53b8feab 100644
--- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java
+++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 public class RestPromoteDataStreamAction extends BaseRestHandler {
@@ -22,7 +23,7 @@ public String getName() {
 
     @Override
     public List<Route> routes() {
-        return List.of(new Route(RestRequest.Method.POST, "/_data_stream/_promote/{name}"));
+        return Collections.singletonList(new Route(RestRequest.Method.POST, "/_data_stream/_promote/{name}"));
     }
 
     @Override
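
A minimal usage sketch for reviewers (illustrative only, not part of either patch): once the promote action above is registered, it can be reached over REST as POST /_data_stream/_promote/{name}, or invoked directly from server-side code that holds a Client, which is essentially what RestPromoteDataStreamAction does. The data stream name "my-replicated-ds" and the wrapper class below are hypothetical placeholders.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;

public class PromoteDataStreamExample {

    static void promote(Client client) {
        // Request targeting the replicated data stream that should become a regular one.
        PromoteDataStreamAction.Request request = new PromoteDataStreamAction.Request("my-replicated-ds");
        // The transport action submits a cluster state update that marks the data stream as
        // no longer replicated; the response only acknowledges that update, nothing more.
        client.execute(PromoteDataStreamAction.INSTANCE, request, ActionListener.wrap(
            (AcknowledgedResponse response) -> System.out.println("promoted, acknowledged=" + response.isAcknowledged()),
            e -> { throw new RuntimeException("promote failed", e); }
        ));
    }
}

After promotion the local data stream can be rolled over again, at which point the new write index is a regular index rather than a follower index.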