From f0d907627fcdb28f4e2654ae3bde2559c68402e3 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 5 Nov 2020 16:26:04 +0100 Subject: [PATCH 01/17] Added test that fails now, but tests that rolling over a data stream that follows a remote data stream fails. --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index b84a13333058e..cfc9baf022d07 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; @@ -347,6 +348,112 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception } } + public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int numDocs = 64; + final String dataStreamName = "logs-tomcat-prod"; + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + + // Create auto follow pattern + Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startArray("leader_index_patterns"); + { + bodyBuilder.value("logs-*"); + } + bodyBuilder.endArray(); + bodyBuilder.field("remote_cluster", "leader_cluster"); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(client().performRequest(request)); + + // Create data stream and ensure that is is auto followed + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-tomcat-prod-000001"); + verifyDocuments(leaderClient, dataStreamName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs); + }); + } + + // Rollover in leader cluster and ensure second backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs + 1); + }); + } + + // Try rollover in follow cluster + { + Request rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover"); + Exception e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); + assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " + + "because it is a replicated data stream")); + verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); + + // Unfollow .ds-logs-tomcat-prod-000001 + pauseFollow(".ds-logs-tomcat-prod-000001"); + closeIndex(".ds-logs-tomcat-prod-000001"); + unfollow(".ds-logs-tomcat-prod-000001"); + + // Try again + Request rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover"); + e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest2)); + assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " + + "because it is a replicated data stream")); + verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); + + // Unfollow .ds-logs-tomcat-prod-000002 + pauseFollow(".ds-logs-tomcat-prod-000002"); + closeIndex(".ds-logs-tomcat-prod-000002"); + unfollow(".ds-logs-tomcat-prod-000002"); + + // Try again and now the rollover should be successful since all backing indices are no longer follower indices: + Request rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(client().performRequest(rolloverRequest3)); + verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002", + ".ds-logs-tomcat-prod-000003"); + } + // Cleanup: + { + deleteAutoFollowPattern("test_pattern"); + deleteDataStream(dataStreamName); + } + } + private int getNumberOfSuccessfulFollowedIndices() throws IOException { Request statsRequest = new Request("GET", "/_ccr/stats"); Map response = toMap(client().performRequest(statsRequest)); From 10d7ffcb67bde9df9284f81962919f9b82e35372 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 6 Nov 2020 14:11:43 +0100 Subject: [PATCH 02/17] Added replicate flag to data stream and promote data stream api. --- .../rollover/MetadataRolloverService.java | 2 + .../cluster/metadata/DataStream.java | 48 ++++++++-- .../MetadataCreateDataStreamService.java | 2 +- .../snapshots/RestoreService.java | 2 +- .../cluster/DataStreamTestHelper.java | 3 +- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 18 +++- .../ccr/action/TransportPutFollowAction.java | 11 ++- .../ccr/action/TransportUnfollowAction.java | 2 +- .../core/action/PromoteDataStreamAction.java | 76 +++++++++++++++ .../xpack/datastreams/DataStreamsPlugin.java | 10 +- .../PromoteDataStreamTransportAction.java | 96 +++++++++++++++++++ .../rest/RestPromoteStreamsStatsAction.java | 35 +++++++ 12 files changed, 283 insertions(+), 22 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/PromoteDataStreamAction.java create mode 100644 x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java create mode 100644 x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index a8a2d431cbaf7..52772bb0008f8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import java.util.List; @@ -148,6 +149,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra final DataStream ds = dataStream.getDataStream(); final IndexMetadata originalWriteIndex = dataStream.getWriteIndex(); final String newWriteIndexName = DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1); + ds.rollover(new Index(newWriteIndexName, "uuid")); // just for validation createIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists if (onlyValidate) { return new RolloverResult(newWriteIndexName, originalWriteIndex.getIndex().getName(), currentState); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index e359d658c4d7c..b9ea8db737dd4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -44,6 +44,7 @@ public final class DataStream extends AbstractDiffable implements To public static final String BACKING_INDEX_PREFIX = ".ds-"; public static final Version HIDDEN_VERSION = Version.V_7_11_0; + public static final Version REPLICATED_VERSION = Version.V_8_0_0; private final String name; private final TimestampField timeStampField; @@ -51,19 +52,21 @@ public final class DataStream extends AbstractDiffable implements To private final long generation; private final Map metadata; private final boolean hidden; + private final boolean replicated; public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata) { - this(name, timeStampField, indices, generation, metadata, false); + this(name, timeStampField, indices, generation, metadata, false, false); } public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata, - boolean hidden) { + boolean hidden, boolean replicated) { this.name = name; this.timeStampField = timeStampField; this.indices = Collections.unmodifiableList(indices); this.generation = generation; this.metadata = metadata; this.hidden = hidden; + this.replicated = replicated; assert indices.size() > 0; } @@ -100,6 +103,16 @@ public boolean isHidden() { return hidden; } + /** + * Determines whether this data stream is replicated from elsewhere, + * for example a remote cluster. + * + * @return Whether this data stream is replicated. + */ + public boolean isReplicated() { + return replicated; + } + /** * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. @@ -110,9 +123,14 @@ public boolean isHidden() { */ public DataStream rollover(Index newWriteIndex) { assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1)); + if (replicated) { + throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, " + + "because it is a replicated data stream"); + } + List backingIndices = new ArrayList<>(indices); backingIndices.add(newWriteIndex); - return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden); + return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated); } /** @@ -126,7 +144,7 @@ public DataStream removeBackingIndex(Index index) { List backingIndices = new ArrayList<>(indices); backingIndices.remove(index); assert backingIndices.size() == indices.size() - 1; - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); } /** @@ -151,7 +169,11 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki "it is the write index", existingBackingIndex.getName(), name)); } backingIndices.set(backingIndexPosition, newBackingIndex); - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); + } + + public DataStream promoteDataStream() { + return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false); } /** @@ -169,7 +191,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene public DataStream(StreamInput in) throws IOException { this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(), in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readMap(): null, - in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean()); + in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean(), + in.getVersion().onOrAfter(REPLICATED_VERSION) && in.readBoolean()); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -188,6 +211,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(HIDDEN_VERSION)) { out.writeBoolean(hidden); } + if (out.getVersion().onOrAfter(REPLICATED_VERSION)) { + out.writeBoolean(replicated); + } } public static final ParseField NAME_FIELD = new ParseField("name"); @@ -196,11 +222,12 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField GENERATION_FIELD = new ParseField("generation"); public static final ParseField METADATA_FIELD = new ParseField("_meta"); public static final ParseField HIDDEN_FIELD = new ParseField("hidden"); + public static final ParseField REPLICATED_FIELD = new ParseField("replicated"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3], - (Map) args[4], args[5] != null && (boolean) args[5])); + (Map) args[4], args[5] != null && (boolean) args[5], (boolean) args[6])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); @@ -209,6 +236,7 @@ public void writeTo(StreamOutput out) throws IOException { PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), REPLICATED_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -226,6 +254,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(METADATA_FIELD.getPreferredName(), metadata); } builder.field(HIDDEN_FIELD.getPreferredName(), hidden); + builder.field(REPLICATED_FIELD.getPreferredName(), replicated); builder.endObject(); return builder; } @@ -239,12 +268,13 @@ public boolean equals(Object o) { timeStampField.equals(that.timeStampField) && indices.equals(that.indices) && generation == that.generation && - Objects.equals(metadata, that.metadata); + Objects.equals(metadata, that.metadata) && + replicated == that.replicated; } @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation, metadata); + return Objects.hash(name, timeStampField, indices, generation, metadata, replicated); } public static final class TimestampField implements Writeable, ToXContentObject { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index f90e7759f0b4a..38f3de3f31098 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -185,7 +185,7 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn dsBackingIndices.add(writeIndex.getIndex()); boolean hidden = template.getDataStreamTemplate().isHidden(); DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L, - template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden); + template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden, false); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName, writeIndex.getIndex().getName(), diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 74dfca8fedeed..2f8c6cb2c69c1 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -620,7 +620,7 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad .map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex()) .collect(Collectors.toList()); return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(), - dataStream.getMetadata(), dataStream.isHidden()); + dataStream.getMetadata(), dataStream.isHidden(), dataStream.isReplicated()); } public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set deletedIndices) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java index 8ba80f0466984..e9626a1daf5fc 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java @@ -109,7 +109,8 @@ public static DataStream randomInstance() { if (randomBoolean()) { metadata = Map.of("key", "value"); } - return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, randomBoolean()); + return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, + randomBoolean(), randomBoolean()); } /** diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index cfc9baf022d07..127ed2c7420ee 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -436,16 +436,24 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { "because it is a replicated data stream")); verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); - // Unfollow .ds-logs-tomcat-prod-000002 - pauseFollow(".ds-logs-tomcat-prod-000002"); - closeIndex(".ds-logs-tomcat-prod-000002"); - unfollow(".ds-logs-tomcat-prod-000002"); + // Promote local data stream + Request promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName); + assertOK(client().performRequest(promoteRequest)); - // Try again and now the rollover should be successful since all backing indices are no longer follower indices: + // Try again and now the rollover should be successful because local data stream is now : Request rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(client().performRequest(rolloverRequest3)); verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002", ".ds-logs-tomcat-prod-000003"); + + // TODO: verify that following a backing index for logs-tomcat-prod data stream in remote cluster fails, + // because local data stream isn't a replicated data stream anymore. + + // Unfollow .ds-logs-tomcat-prod-000002, + // which is now possible because this index can now be closed as it is no longer the write index. + pauseFollow(".ds-logs-tomcat-prod-000002"); + closeIndex(".ds-logs-tomcat-prod-000002"); + unfollow(".ds-logs-tomcat-prod-000002"); } // Cleanup: { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 91db932343101..e978971822fda 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -268,8 +268,14 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, // The data stream and the backing indices have been created and validated in the remote cluster, // just copying the data stream is in this case safe. return new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(), - List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata()); + List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata(), + remoteDataStream.isHidden(), true); } else { + if (localDataStream.isReplicated() == false) { + throw new IllegalArgumentException("cannot follow backing index [" + backingIndexToFollow.getName() + + "], because local data stream [" + localDataStream.getName() + "] is no longer marked as replicated"); + } + List backingIndices = new ArrayList<>(localDataStream.getIndices()); backingIndices.add(backingIndexToFollow); @@ -280,7 +286,8 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, backingIndices.sort(Comparator.comparing(Index::getName)); return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices, - remoteDataStream.getGeneration(), remoteDataStream.getMetadata()); + remoteDataStream.getGeneration(), remoteDataStream.getMetadata(), localDataStream.isHidden(), + localDataStream.isReplicated()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 11cb42a4ca7d7..fb7dd99b7f118 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -108,7 +108,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta final int numberOfShards = IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetadata.getSettings()); final GroupedActionListener groupListener = new GroupedActionListener<>( - new ActionListener>() { + new ActionListener<>() { @Override public void onResponse(final Collection responses) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/PromoteDataStreamAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/PromoteDataStreamAction.java new file mode 100644 index 0000000000000..43fe4d17a23ba --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/PromoteDataStreamAction.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class PromoteDataStreamAction extends ActionType { + + public static final PromoteDataStreamAction INSTANCE = new PromoteDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/promote"; + + private PromoteDataStreamAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + public static class Request extends MasterNodeRequest { + + private String name; + + public Request(String name) { + this.name = Objects.requireNonNull(name); + } + + public String getName() { + return name; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (name == null) { + validationException = addValidationError("no data stream specified", validationException); + } + return validationException; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PromoteDataStreamAction.Request request = (PromoteDataStreamAction.Request) o; + return Objects.equals(name, request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + } + +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java index 1c95deec13f60..960d45baa6d08 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java @@ -24,8 +24,10 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.xpack.core.action.PromoteDataStreamAction; import org.elasticsearch.xpack.datastreams.action.DataStreamsStatsTransportAction; import org.elasticsearch.xpack.datastreams.action.MigrateToDataStreamTransportAction; +import org.elasticsearch.xpack.datastreams.action.PromoteDataStreamTransportAction; import org.elasticsearch.xpack.datastreams.rest.RestCreateDataStreamAction; import org.elasticsearch.xpack.datastreams.rest.RestDataStreamsStatsAction; import org.elasticsearch.xpack.datastreams.rest.RestDeleteDataStreamAction; @@ -39,6 +41,7 @@ import org.elasticsearch.xpack.datastreams.action.GetDataStreamsTransportAction; import org.elasticsearch.xpack.datastreams.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.xpack.datastreams.rest.RestMigrateToDataStreamAction; +import org.elasticsearch.xpack.datastreams.rest.RestPromoteStreamsStatsAction; import java.util.List; import java.util.Map; @@ -60,7 +63,9 @@ public Map getMetadataMappers() { var dsUsageAction = new ActionHandler<>(XPackUsageFeatureAction.DATA_STREAMS, DataStreamUsageTransportAction.class); var dsInfoAction = new ActionHandler<>(XPackInfoFeatureAction.DATA_STREAMS, DataStreamInfoTransportAction.class); var migrateAction = new ActionHandler<>(MigrateToDataStreamAction.INSTANCE, MigrateToDataStreamTransportAction.class); - return List.of(createDsAction, deleteDsInfoAction, getDsAction, dsStatsAction, dsUsageAction, dsInfoAction, migrateAction); + var promoteAction = new ActionHandler<>(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class); + return List.of(createDsAction, deleteDsInfoAction, getDsAction, dsStatsAction, dsUsageAction, dsInfoAction, migrateAction, + promoteAction); } @Override @@ -78,6 +83,7 @@ public List getRestHandlers( var getDsAction = new RestGetDataStreamsAction(); var dsStatsAction = new RestDataStreamsStatsAction(); var migrateAction = new RestMigrateToDataStreamAction(); - return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction); + var promoteAction = new RestPromoteStreamsStatsAction(); + return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction, promoteAction); } } diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java new file mode 100644 index 0000000000000..8041fee8a7096 --- /dev/null +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/PromoteDataStreamTransportAction.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.datastreams.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.action.PromoteDataStreamAction; + +public class PromoteDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction { + + @Inject + public PromoteDataStreamTransportAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + PromoteDataStreamAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + PromoteDataStreamAction.Request::new, + indexNameExpressionResolver, + ThreadPool.Names.SAME + ); + } + + @Override + protected void masterOperation( + Task task, + PromoteDataStreamAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + clusterService.submitStateUpdateTask( + "promote-data-stream [" + request.getName() + "]", + new ClusterStateUpdateTask(Priority.HIGH, request.masterNodeTimeout()) { + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public ClusterState execute(ClusterState currentState) { + return promoteDataStream(currentState, request); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(AcknowledgedResponse.TRUE); + } + } + ); + } + + static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) { + DataStream dataStream = currentState.getMetadata().dataStreams().get(request.getName()); + if (dataStream == null) { + throw new ResourceNotFoundException("data stream [" + request.getName() + "] does not exist"); + } + + DataStream promotedDataStream = dataStream.promoteDataStream(); + Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + metadata.put(promotedDataStream); + return ClusterState.builder(currentState).metadata(metadata).build(); + } + + @Override + protected ClusterBlockException checkBlock(PromoteDataStreamAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java new file mode 100644 index 0000000000000..9b5d349dc62e9 --- /dev/null +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.datastreams.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.action.PromoteDataStreamAction; + +import java.io.IOException; +import java.util.List; + +public class RestPromoteStreamsStatsAction extends BaseRestHandler { + @Override + public String getName() { + return "promote_data_stream_action"; + } + + @Override + public List routes() { + return List.of( + new Route(RestRequest.Method.POST, "/_data_stream/_promote/{name}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + PromoteDataStreamAction.Request request = new PromoteDataStreamAction.Request(restRequest.param("name")); + return channel -> client.execute(PromoteDataStreamAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} From 461773d173af213c29ac1ff0ff8aacf3f6885295 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 9 Nov 2020 11:02:31 +0100 Subject: [PATCH 03/17] fix precommit --- .../metadata/IndexNameExpressionResolverTests.java | 2 +- .../xpack/datastreams/DataStreamsPlugin.java | 12 ++++++++++-- .../rest/RestPromoteStreamsStatsAction.java | 4 +--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index da6df7a532f77..ec94c2fda833e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -2107,7 +2107,7 @@ public void testHiddenDataStreams() { .put(index2, false) .put(justAnIndex, false) .put(new DataStream(dataStream1, createTimestampField("@timestamp"), - List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true))).build(); + List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true, false))).build(); Index[] result = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.strictExpandHidden(), true, "logs-*"); assertThat(result, arrayContainingInAnyOrder(index1.getIndex(), index2.getIndex(), justAnIndex.getIndex() )); diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java index 960d45baa6d08..38bedb0d8d5f8 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java @@ -64,8 +64,16 @@ public Map getMetadataMappers() { var dsInfoAction = new ActionHandler<>(XPackInfoFeatureAction.DATA_STREAMS, DataStreamInfoTransportAction.class); var migrateAction = new ActionHandler<>(MigrateToDataStreamAction.INSTANCE, MigrateToDataStreamTransportAction.class); var promoteAction = new ActionHandler<>(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class); - return List.of(createDsAction, deleteDsInfoAction, getDsAction, dsStatsAction, dsUsageAction, dsInfoAction, migrateAction, - promoteAction); + return List.of( + createDsAction, + deleteDsInfoAction, + getDsAction, + dsStatsAction, + dsUsageAction, + dsInfoAction, + migrateAction, + promoteAction + ); } @Override diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java index 9b5d349dc62e9..8977f2f70bed4 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java @@ -22,9 +22,7 @@ public String getName() { @Override public List routes() { - return List.of( - new Route(RestRequest.Method.POST, "/_data_stream/_promote/{name}") - ); + return List.of(new Route(RestRequest.Method.POST, "/_data_stream/_promote/{name}")); } @Override From ef71529aca1040848f71f0aefe5673749dcb40ec Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 9 Nov 2020 11:02:59 +0100 Subject: [PATCH 04/17] nit --- .../java/org/elasticsearch/cluster/metadata/DataStream.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index b9ea8db737dd4..deab1c1e8e5e0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -269,12 +269,13 @@ public boolean equals(Object o) { indices.equals(that.indices) && generation == that.generation && Objects.equals(metadata, that.metadata) && + hidden == that.hidden && replicated == that.replicated; } @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation, metadata, replicated); + return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated); } public static final class TimestampField implements Writeable, ToXContentObject { From 72379300c9cce1f7567f04521bd1bc8e75ef95e4 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 9 Nov 2020 14:25:19 +0100 Subject: [PATCH 05/17] fixed tests --- .../rollover/MetadataRolloverServiceTests.java | 4 +++- .../action/TransportPutFollowActionTests.java | 18 +++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index b56b179305d4c..8f76155aee783 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -536,7 +536,9 @@ public void testRolloverClusterState() throws Exception { } public void testRolloverClusterStateForDataStream() throws Exception { - final DataStream dataStream = DataStreamTestHelper.randomInstance(); + final DataStream dataStream = DataStreamTestHelper.randomInstance() + // ensure no replicate data stream + .promoteDataStream(); ComposableIndexTemplate template = new ComposableIndexTemplate(List.of(dataStream.getName() + "*"), null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate(), null); Metadata.Builder builder = Metadata.builder(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java index 585a8f62ace1e..d6313123af00a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java @@ -22,7 +22,7 @@ public class TransportPutFollowActionTests extends ESTestCase { public void testCreateNewLocalDataStream() { - DataStream remoteDataStream = generateDataSteam("logs-foobar", 3); + DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false); Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream); assertThat(result.getName(), equalTo(remoteDataStream.getName())); @@ -33,8 +33,8 @@ public void testCreateNewLocalDataStream() { } public void testUpdateLocalDataStream_followNewBackingIndex() { - DataStream remoteDataStream = generateDataSteam("logs-foobar", 3); - DataStream localDataStream = generateDataSteam("logs-foobar", 2); + DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false); + DataStream localDataStream = generateDataSteam("logs-foobar", 2, true); Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); assertThat(result.getName(), equalTo(remoteDataStream.getName())); @@ -48,8 +48,8 @@ public void testUpdateLocalDataStream_followNewBackingIndex() { public void testUpdateLocalDataStream_followOlderBackingIndex() { // follow first backing index: - DataStream remoteDataStream = generateDataSteam("logs-foobar", 5); - DataStream localDataStream = generateDataSteam("logs-foobar", 5, DataStream.getDefaultBackingIndexName("logs-foobar", 5)); + DataStream remoteDataStream = generateDataSteam("logs-foobar", 5, false); + DataStream localDataStream = generateDataSteam("logs-foobar", 5, true, DataStream.getDefaultBackingIndexName("logs-foobar", 5)); Index backingIndexToFollow = remoteDataStream.getIndices().get(0); DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); assertThat(result.getName(), equalTo(remoteDataStream.getName())); @@ -72,19 +72,19 @@ public void testUpdateLocalDataStream_followOlderBackingIndex() { assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5))); } - static DataStream generateDataSteam(String name, int numBackingIndices) { + static DataStream generateDataSteam(String name, int numBackingIndices, boolean replicate) { List backingIndices = IntStream.range(1, numBackingIndices + 1) .mapToObj(value -> DataStream.getDefaultBackingIndexName(name, value)) .map(value -> new Index(value, "uuid")) .collect(Collectors.toList()); - return new DataStream(name, new TimestampField("@timestamp"), backingIndices); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices, backingIndices.size(), Map.of(), false, replicate); } - static DataStream generateDataSteam(String name, int generation, String... backingIndexNames) { + static DataStream generateDataSteam(String name, int generation, boolean replicate, String... backingIndexNames) { List backingIndices = Arrays.stream(backingIndexNames) .map(value -> new Index(value, "uuid")) .collect(Collectors.toList()); - return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of()); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of(), false, replicate); } } From e9b6627aad502aa554775e5095ea6b624f2d6e3e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 9 Nov 2020 17:38:17 +0100 Subject: [PATCH 06/17] fixed tests --- .../java/org/elasticsearch/cluster/metadata/DataStream.java | 2 +- .../org/elasticsearch/cluster/metadata/DataStreamTests.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index deab1c1e8e5e0..a9f09e0fc42ff 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -236,7 +236,7 @@ public void writeTo(StreamOutput out) throws IOException { PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), REPLICATED_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 826d5940245cd..73309b271dcfd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -52,7 +52,7 @@ protected DataStream createTestInstance() { } public void testRollover() { - DataStream ds = DataStreamTestHelper.randomInstance(); + DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream(); Index newWriteIndex = new Index(getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random())); DataStream rolledDs = ds.rollover(newWriteIndex); From e031d72b214e2cda485b4dcd2c302734fcb60b86 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 9 Nov 2020 19:05:03 +0100 Subject: [PATCH 07/17] fixed npe --- .../java/org/elasticsearch/cluster/metadata/DataStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index a9f09e0fc42ff..dc9e458bbc699 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -227,7 +227,7 @@ public void writeTo(StreamOutput out) throws IOException { @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3], - (Map) args[4], args[5] != null && (boolean) args[5], (boolean) args[6])); + (Map) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); From 0c22ce149d4b8f83771c806ddd1d6a4fbfaa670d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 10 Nov 2020 10:04:12 +0100 Subject: [PATCH 08/17] fixed test --- .../admin/indices/rollover/MetadataRolloverServiceTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index 8f76155aee783..3e45cdefc2d08 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -634,7 +634,9 @@ public void testValidation() throws Exception { final boolean useDataStream = randomBoolean(); final Metadata.Builder builder = Metadata.builder(); if (useDataStream) { - DataStream dataStream = DataStreamTestHelper.randomInstance(); + DataStream dataStream = DataStreamTestHelper.randomInstance() + // ensure no replicate data stream + .promoteDataStream(); rolloverTarget = dataStream.getName(); sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName(); defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1); From 5efa94385e06dbe070be1dd0b4b8a746093750f1 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 17 Nov 2020 14:47:58 +0100 Subject: [PATCH 09/17] added ccr bi-directional test with data streams --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 281 +++++++++++++----- .../xpack/ccr/ESCCRRestTestCase.java | 6 +- 2 files changed, 211 insertions(+), 76 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 127ed2c7420ee..03df294a50b12 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -186,21 +186,7 @@ public void testDataStreams() throws Exception { int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); // Create auto follow pattern - Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); - try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { - bodyBuilder.startObject(); - { - bodyBuilder.startArray("leader_index_patterns"); - { - bodyBuilder.value("logs-*"); - } - bodyBuilder.endArray(); - bodyBuilder.field("remote_cluster", "leader_cluster"); - } - bodyBuilder.endObject(); - request.setJsonEntity(Strings.toString(bodyBuilder)); - } - assertOK(client().performRequest(request)); + createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster"); // Create data stream and ensure that is is auto followed { @@ -211,12 +197,12 @@ public void testDataStreams() throws Exception { indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); } - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); verifyDocuments(leaderClient, dataStreamName, numDocs); } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs); }); @@ -227,7 +213,7 @@ public void testDataStreams() throws Exception { try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); @@ -237,7 +223,7 @@ public void testDataStreams() throws Exception { } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); - verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs + 1); }); @@ -248,8 +234,8 @@ public void testDataStreams() throws Exception { try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", "" + - ".ds-logs-mysql-error-000003"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), + backingIndexName(dataStreamName, 3)); Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); @@ -259,8 +245,8 @@ public void testDataStreams() throws Exception { } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3)); - verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", - ".ds-logs-mysql-error-000003"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), + backingIndexName(dataStreamName, 3)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs + 2); }); @@ -289,34 +275,18 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); } - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); verifyDocuments(leaderClient, dataStreamName, initialNumDocs); } } // Create auto follow pattern - { - Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); - try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { - bodyBuilder.startObject(); - { - bodyBuilder.startArray("leader_index_patterns"); - { - bodyBuilder.value("logs-*"); - } - bodyBuilder.endArray(); - bodyBuilder.field("remote_cluster", "leader_cluster"); - } - bodyBuilder.endObject(); - request.setJsonEntity(Strings.toString(bodyBuilder)); - } - assertOK(client().performRequest(request)); - } + createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster"); // Rollover and ensure only second backing index is replicated: { try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); @@ -326,17 +296,17 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 2)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, 1); }); } // Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly: { - followIndex(".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000001"); + followIndex(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 1)); assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, initialNumDocs + 1); }); @@ -359,21 +329,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); // Create auto follow pattern - Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); - try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { - bodyBuilder.startObject(); - { - bodyBuilder.startArray("leader_index_patterns"); - { - bodyBuilder.value("logs-*"); - } - bodyBuilder.endArray(); - bodyBuilder.field("remote_cluster", "leader_cluster"); - } - bodyBuilder.endObject(); - request.setJsonEntity(Strings.toString(bodyBuilder)); - } - assertOK(client().performRequest(request)); + createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster"); // Create data stream and ensure that is is auto followed { @@ -384,12 +340,12 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); } - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-tomcat-prod-000001"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); verifyDocuments(leaderClient, dataStreamName, numDocs); } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs); }); @@ -400,7 +356,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); @@ -410,7 +366,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { } assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); - verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs + 1); }); @@ -422,19 +378,19 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { Exception e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " + "because it is a replicated data stream")); - verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); // Unfollow .ds-logs-tomcat-prod-000001 - pauseFollow(".ds-logs-tomcat-prod-000001"); - closeIndex(".ds-logs-tomcat-prod-000001"); - unfollow(".ds-logs-tomcat-prod-000001"); + pauseFollow(backingIndexName(dataStreamName, 1)); + closeIndex(backingIndexName(dataStreamName, 1)); + unfollow(backingIndexName(dataStreamName, 1)); // Try again Request rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover"); e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest2)); assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " + "because it is a replicated data stream")); - verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); // Promote local data stream Request promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName); @@ -443,17 +399,17 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { // Try again and now the rollover should be successful because local data stream is now : Request rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(client().performRequest(rolloverRequest3)); - verifyDataStream(client(), dataStreamName, ".ds-logs-tomcat-prod-000001", ".ds-logs-tomcat-prod-000002", - ".ds-logs-tomcat-prod-000003"); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), + backingIndexName(dataStreamName, 3)); // TODO: verify that following a backing index for logs-tomcat-prod data stream in remote cluster fails, // because local data stream isn't a replicated data stream anymore. // Unfollow .ds-logs-tomcat-prod-000002, // which is now possible because this index can now be closed as it is no longer the write index. - pauseFollow(".ds-logs-tomcat-prod-000002"); - closeIndex(".ds-logs-tomcat-prod-000002"); - unfollow(".ds-logs-tomcat-prod-000002"); + pauseFollow(backingIndexName(dataStreamName, 2)); + closeIndex(backingIndexName(dataStreamName, 2)); + unfollow(backingIndexName(dataStreamName, 2)); } // Cleanup: { @@ -462,13 +418,183 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { } } + public void testDataStreamsBiDirectionalReplication() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + int initialNumberOfSuccessfulFollowedIndicesInFollowCluster = getNumberOfSuccessfulFollowedIndices(); + int initialNumberOfSuccessfulFollowedIndicesInLeaderCluster; + + // Create auto follow pattern in follow cluster + createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster"); + // Create auto follow pattern in leader cluster: + try (RestClient leaderClient = buildLeaderClient()) { + initialNumberOfSuccessfulFollowedIndicesInLeaderCluster = getNumberOfSuccessfulFollowedIndices(leaderClient); + // First add remote cluster to leader cluster: + Request request = new Request("PUT", "/_cluster/settings"); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startObject("persistent"); + { + bodyBuilder.startObject("cluster"); + { + bodyBuilder.startObject("remote"); + { + bodyBuilder.startObject("follower_cluster"); + { + bodyBuilder.startArray("seeds"); + Request nodesInfoRequest = new Request("GET", "/_nodes/_local"); + Map nodesInfoResponse = toMap(client().performRequest(nodesInfoRequest)); + Map node = (Map) ((Map) nodesInfoResponse.get("nodes")).values().iterator().next(); + Map transportMetrics = (Map) node.get("transport"); + String address = (String) transportMetrics.get("publish_address"); + bodyBuilder.value(address); + bodyBuilder.endArray(); + } + bodyBuilder.endObject(); + } + bodyBuilder.endObject(); + } + bodyBuilder.endObject(); + } + bodyBuilder.endObject(); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(leaderClient.performRequest(request)); + // Then create the actual auto follow pattern: + createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster"); + } + + int numDocs = 128; + String leaderDataStreamName = "logs-http-eu"; + // Create data stream in leader cluster and ensure it is followed in follow cluster + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, leaderDataStreamName, backingIndexName(leaderDataStreamName, 1)); + verifyDocuments(leaderClient, leaderDataStreamName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndicesInFollowCluster + 1)); + verifyDataStream(client(), leaderDataStreamName, backingIndexName(leaderDataStreamName, 1)); + ensureYellow(leaderDataStreamName); + verifyDocuments(client(), leaderDataStreamName, numDocs); + }); + } + String followerDataStreamName = "logs-http-na"; + { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(client().performRequest(indexRequest)); + } + verifyDocuments(client(), followerDataStreamName, numDocs); + try (RestClient leaderClient = buildLeaderClient()) { + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(leaderClient), + equalTo(initialNumberOfSuccessfulFollowedIndicesInLeaderCluster + 1)); + verifyDataStream(leaderClient, followerDataStreamName, backingIndexName(followerDataStreamName, 1)); + ensureYellow(followerDataStreamName); + verifyDocuments(leaderClient, followerDataStreamName, numDocs); + }); + } + } + // See all eu and na logs in leader and follower cluster: + verifyDocuments(client(), "logs-http*", numDocs * 2); + try (RestClient leaderClient = buildLeaderClient()) { + verifyDocuments(leaderClient, "logs-http*", numDocs * 2); + } + int moreDocs = 48; + // Index more docs into leader cluster + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < moreDocs; i++) { + Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDocuments(leaderClient, leaderDataStreamName, numDocs + moreDocs); + } + assertBusy(() -> { + verifyDocuments(client(), leaderDataStreamName, numDocs + moreDocs); + }); + } + // Index more docs into follower cluster + { + for (int i = 0; i < moreDocs; i++) { + Request indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(client().performRequest(indexRequest)); + } + verifyDocuments(client(), followerDataStreamName, numDocs + moreDocs); + try (RestClient leaderClient = buildLeaderClient()) { + assertBusy(() -> { + verifyDocuments(leaderClient, followerDataStreamName, numDocs + moreDocs); + }); + } + } + // See all eu and na logs in leader and follower cluster: + verifyDocuments(client(), "logs-http*", (numDocs + moreDocs) * 2); + try (RestClient leaderClient = buildLeaderClient()) { + verifyDocuments(leaderClient, "logs-http*", (numDocs + moreDocs) * 2); + } + + // Cleanup: + { + deleteAutoFollowPattern(client(), "id1"); + deleteDataStream(client(), followerDataStreamName); + try (RestClient leaderClient = buildLeaderClient()) { + deleteDataStream(leaderClient, leaderDataStreamName); + deleteAutoFollowPattern(leaderClient, "id2"); + } + } + } + private int getNumberOfSuccessfulFollowedIndices() throws IOException { + return getNumberOfSuccessfulFollowedIndices(client()); + } + + private int getNumberOfSuccessfulFollowedIndices(RestClient client) throws IOException { Request statsRequest = new Request("GET", "/_ccr/stats"); - Map response = toMap(client().performRequest(statsRequest)); + Map response = toMap(client.performRequest(statsRequest)); response = (Map) response.get("auto_follow_stats"); return (Integer) response.get("number_of_successful_follow_indices"); } + private void createAutoFollowPattern(RestClient client, String name, String pattern, String remoteCluster) throws IOException { + Request request = new Request("PUT", "/_ccr/auto_follow/" + name); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startArray("leader_index_patterns"); + { + bodyBuilder.value(pattern); + } + bodyBuilder.endArray(); + bodyBuilder.field("remote_cluster", remoteCluster); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(client.performRequest(request)); + } + + private static String backingIndexName(String dataStreamName, int generation) { + return String.format(Locale.ROOT, ".ds-%s-%06d", dataStreamName, generation); + } + private static void verifyDocuments(final RestClient client, final String index, final int expectedNumDocs) throws IOException { @@ -503,4 +629,9 @@ private void deleteDataStream(String name) throws IOException { } } + private void deleteDataStream(RestClient client, String name) throws IOException { + Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name); + assertOK(client.performRequest(deleteTemplateRequest)); + } + } diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index 10841dbd7edf2..0fb22e6235bd8 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -122,8 +122,12 @@ protected static void putAutoFollowPattern(String patternName, String remoteClus } protected static void deleteAutoFollowPattern(String patternName) throws IOException { + deleteAutoFollowPattern(client(), patternName); + } + + protected static void deleteAutoFollowPattern(RestClient client, String patternName) throws IOException { Request putPatternRequest = new Request("DELETE", "/_ccr/auto_follow/" + patternName); - assertOK(client().performRequest(putPatternRequest)); + assertOK(client.performRequest(putPatternRequest)); } protected static void unfollow(String followIndex) throws IOException { From 56074c3120a7a4e32f81fd52f942b1d9ea305991 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 19 Nov 2020 11:11:32 +0100 Subject: [PATCH 10/17] added docs --- docs/reference/ccr/auto-follow.asciidoc | 3 ++ .../data-streams/data-stream-apis.asciidoc | 3 ++ .../promote-data-stream-api.asciidoc | 38 +++++++++++++++++++ 3 files changed, 44 insertions(+) create mode 100644 docs/reference/data-streams/promote-data-stream-api.asciidoc diff --git a/docs/reference/ccr/auto-follow.asciidoc b/docs/reference/ccr/auto-follow.asciidoc index 333bb708cf1da..1fa514cb9d3e3 100644 --- a/docs/reference/ccr/auto-follow.asciidoc +++ b/docs/reference/ccr/auto-follow.asciidoc @@ -13,6 +13,9 @@ automatically followed if the data stream name matches an auto-follow pattern. If you create a data stream after creating the auto-follow pattern, all backing indices are followed automatically. +The data streams replicated from a remote cluster by CCR are protected from +local rollovers. The <> +can be used to turn these data streams into regular data streams. Auto-follow patterns are especially useful with <>, which might continually create diff --git a/docs/reference/data-streams/data-stream-apis.asciidoc b/docs/reference/data-streams/data-stream-apis.asciidoc index b18965162670c..d6c64d98f5623 100644 --- a/docs/reference/data-streams/data-stream-apis.asciidoc +++ b/docs/reference/data-streams/data-stream-apis.asciidoc @@ -9,6 +9,7 @@ The following APIs are available for managing <>: * <> * <> * <> +* <> For concepts and tutorials, see <>. @@ -21,3 +22,5 @@ include::{es-repo-dir}/indices/get-data-stream.asciidoc[] include::{es-repo-dir}/indices/migrate-to-data-stream.asciidoc[] include::{es-repo-dir}/indices/data-stream-stats.asciidoc[] + +include::{es-repo-dir}/data-streams/promote-data-stream-api.asciidoc[] diff --git a/docs/reference/data-streams/promote-data-stream-api.asciidoc b/docs/reference/data-streams/promote-data-stream-api.asciidoc new file mode 100644 index 0000000000000..6a8a0afa6fadd --- /dev/null +++ b/docs/reference/data-streams/promote-data-stream-api.asciidoc @@ -0,0 +1,38 @@ +[role="xpack"] +[[promote-data-stream-api]] +=== Promote Data Stream API +++++ +Promote data stream api +++++ + +The purpose of the promote data stream api is to turn +a data stream that is replicated by CCR into a regular +data stream. + +Via CCR Auto Following, a data stream from a remote cluster +can be replicated to the local cluster. These data streams +can't be rolled over in the local cluster. Only if the upstream +data stream rolls over then these replicated data streams roll +over as well. In the event that the remote cluster is no longer +available, the data stream in the local cluster can be promoted +to a regular data stream, which allows these data streams to +be rolled over in the local cluster. + +[source,console] +---- +POST /_data_stream/_promote/my-data-stream +---- +// TEST[catch:missing] + +[[promote-data-stream-api-request]] +==== {api-request-title} + +`POST /_data_stream/_promote/` + + +[[promote-data-stream-api-path-params]] +==== {api-path-parms-title} + +``:: +(Required, string) +The name of the data stream to promote. From 1b49909dde0a427072660a9af133115426adce11 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Nov 2020 11:26:39 +0100 Subject: [PATCH 11/17] Added a test, which verifies that an alias in follow cluster can't be rolled over. --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 03df294a50b12..07976d9c6167a 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -30,6 +31,8 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class AutoFollowIT extends ESCCRRestTestCase { @@ -418,6 +421,89 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { } } + public void testRolloverAliasInFollowClusterForbidden() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int numDocs = 64; + final String aliasName = "log-tomcat-prod"; + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + + // Create auto follow pattern + createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster"); + + // Create leader index and write alias: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request createFirstIndexRequest = new Request("PUT", "/" + aliasName + "-000001"); + createFirstIndexRequest.setJsonEntity("{\"aliases\": {\"" + aliasName + "\":{\"is_write_index\":true}}}"); + leaderClient.performRequest(createFirstIndexRequest); + + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + aliasName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyAlias(leaderClient, aliasName, true, aliasName + "-000001"); + verifyDocuments(leaderClient, aliasName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyAlias(client(), aliasName, false, aliasName + "-000001"); + ensureYellow(aliasName); + verifyDocuments(client(), aliasName, numDocs); + }); + } + + // Rollover in leader cluster and ensure second backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + aliasName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyAlias(leaderClient, aliasName, true, aliasName + "-000002", aliasName + "-000001"); + + Request indexRequest = new Request("POST", "/" + aliasName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, aliasName, numDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001"); + ensureYellow(aliasName); + verifyDocuments(client(), aliasName, numDocs + 1); + }); + } + + // Try rollover in follow cluster, this should fail, because is_write_index property of an alias isn't + // replicated to follow cluster. + { + Request rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover"); + Exception e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); + assertThat(e.getMessage(), containsString("rollover target [" + aliasName + "] does not point to a write index")); + verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001"); + } + // Cleanup: + { + deleteAutoFollowPattern("test_pattern"); + } + } + + private static void verifyAlias(RestClient client, String aliasName, boolean checkWriteIndex, String... otherIndices) throws IOException { + Request getAliasRequest = new Request("GET", "/_alias/" + aliasName); + Map responseBody = toMap(client.performRequest(getAliasRequest)); + if (checkWriteIndex) { + assertThat(ObjectPath.eval(otherIndices[0] + ".aliases." + aliasName + ".is_write_index", responseBody), is(true)); + } + for (String otherIndex : otherIndices) { + assertThat(ObjectPath.eval(otherIndex + ".aliases." + aliasName, responseBody), notNullValue()); + } + } + public void testDataStreamsBiDirectionalReplication() throws Exception { if ("follow".equals(targetCluster) == false) { return; From 5528da438e4a5075e055523307353a68094f0965 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Nov 2020 11:33:23 +0100 Subject: [PATCH 12/17] fix checkstyle --- .../test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 07976d9c6167a..cef801a47b5eb 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -493,7 +493,10 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception { } } - private static void verifyAlias(RestClient client, String aliasName, boolean checkWriteIndex, String... otherIndices) throws IOException { + private static void verifyAlias(RestClient client, + String aliasName, + boolean checkWriteIndex, + String... otherIndices) throws IOException { Request getAliasRequest = new Request("GET", "/_alias/" + aliasName); Map responseBody = toMap(client.performRequest(getAliasRequest)); if (checkWriteIndex) { From a21454e2ef70c5437ae2a10d81570a7abfb72592 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Nov 2020 11:47:43 +0100 Subject: [PATCH 13/17] added rest spec and renamed rest action --- .../xpack/datastreams/DataStreamsPlugin.java | 4 +-- ....java => RestPromoteDataStreamAction.java} | 2 +- .../api/indices.promote_data_stream.json | 27 +++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) rename x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/{RestPromoteStreamsStatsAction.java => RestPromoteDataStreamAction.java} (94%) create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java index 38bedb0d8d5f8..075ca5bb8f1cb 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java @@ -41,7 +41,7 @@ import org.elasticsearch.xpack.datastreams.action.GetDataStreamsTransportAction; import org.elasticsearch.xpack.datastreams.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.xpack.datastreams.rest.RestMigrateToDataStreamAction; -import org.elasticsearch.xpack.datastreams.rest.RestPromoteStreamsStatsAction; +import org.elasticsearch.xpack.datastreams.rest.RestPromoteDataStreamAction; import java.util.List; import java.util.Map; @@ -91,7 +91,7 @@ public List getRestHandlers( var getDsAction = new RestGetDataStreamsAction(); var dsStatsAction = new RestDataStreamsStatsAction(); var migrateAction = new RestMigrateToDataStreamAction(); - var promoteAction = new RestPromoteStreamsStatsAction(); + var promoteAction = new RestPromoteDataStreamAction(); return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction, promoteAction); } } diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java similarity index 94% rename from x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java rename to x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java index 8977f2f70bed4..35c3f5a9d47dc 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteStreamsStatsAction.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestPromoteDataStreamAction.java @@ -14,7 +14,7 @@ import java.io.IOException; import java.util.List; -public class RestPromoteStreamsStatsAction extends BaseRestHandler { +public class RestPromoteDataStreamAction extends BaseRestHandler { @Override public String getName() { return "promote_data_stream_action"; diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json new file mode 100644 index 0000000000000..90c22b51957e1 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json @@ -0,0 +1,27 @@ +{ + "indices.promote_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Promotes a data stream from a replicate data stream managed by CCR to a regular data stream" + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_data_stream/_promote/{name}", + "methods":[ + "POST" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream" + } + } + } + ] + }, + "params":{ + } + } +} From 676c7a8e04a6af4bdfa3a30fc4bd2214fb3f0dec Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 Dec 2020 10:56:24 +0100 Subject: [PATCH 14/17] added TODOs --- .../org/elasticsearch/xpack/ccr/AutoFollowIT.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index cef801a47b5eb..dc2f886979a29 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -598,11 +598,18 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { }); } } + + // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and + // writes via 'logs-http' alias (ensuring write goes to write data stream). + // Currently aliases can't refer to data streams, so we can't fully test the bi-direction replication scenario. + // See: https://github.com/elastic/elasticsearch/pull/64710#discussion_r537210322 + // See all eu and na logs in leader and follower cluster: verifyDocuments(client(), "logs-http*", numDocs * 2); try (RestClient leaderClient = buildLeaderClient()) { verifyDocuments(leaderClient, "logs-http*", numDocs * 2); } + int moreDocs = 48; // Index more docs into leader cluster { @@ -634,6 +641,10 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { }); } } + + // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and writes via 'logs-http' + // (see previous TODO) + // See all eu and na logs in leader and follower cluster: verifyDocuments(client(), "logs-http*", (numDocs + moreDocs) * 2); try (RestClient leaderClient = buildLeaderClient()) { From 8e21fc79bd213a5a50f0c9d92060f412c6b32069 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 Dec 2020 11:08:07 +0100 Subject: [PATCH 15/17] varify --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index dc2f886979a29..a4ab93ae07edd 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -327,7 +327,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { } final int numDocs = 64; - final String dataStreamName = "logs-tomcat-prod"; + final var dataStreamName = "logs-tomcat-prod"; int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); @@ -336,9 +336,9 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { // Create data stream and ensure that is is auto followed { - try (RestClient leaderClient = buildLeaderClient()) { + try (var leaderClient = buildLeaderClient()) { for (int i = 0; i < numDocs; i++) { - Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); @@ -356,12 +356,12 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { // Rollover in leader cluster and ensure second backing index is replicated: { - try (RestClient leaderClient = buildLeaderClient()) { - Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + try (var leaderClient = buildLeaderClient()) { + var rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); - Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); @@ -377,8 +377,8 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { // Try rollover in follow cluster { - Request rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover"); - Exception e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); + var rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover"); + var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " + "because it is a replicated data stream")); verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); @@ -389,18 +389,18 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { unfollow(backingIndexName(dataStreamName, 1)); // Try again - Request rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover"); + var rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover"); e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest2)); assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " + "because it is a replicated data stream")); verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); // Promote local data stream - Request promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName); + var promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName); assertOK(client().performRequest(promoteRequest)); // Try again and now the rollover should be successful because local data stream is now : - Request rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover"); + var rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(client().performRequest(rolloverRequest3)); verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)); @@ -427,7 +427,7 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception { } final int numDocs = 64; - final String aliasName = "log-tomcat-prod"; + final var aliasName = "log-tomcat-prod"; int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); @@ -436,13 +436,13 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception { // Create leader index and write alias: { - try (RestClient leaderClient = buildLeaderClient()) { - Request createFirstIndexRequest = new Request("PUT", "/" + aliasName + "-000001"); + try (var leaderClient = buildLeaderClient()) { + var createFirstIndexRequest = new Request("PUT", "/" + aliasName + "-000001"); createFirstIndexRequest.setJsonEntity("{\"aliases\": {\"" + aliasName + "\":{\"is_write_index\":true}}}"); leaderClient.performRequest(createFirstIndexRequest); for (int i = 0; i < numDocs; i++) { - Request indexRequest = new Request("POST", "/" + aliasName + "/_doc"); + var indexRequest = new Request("POST", "/" + aliasName + "/_doc"); indexRequest.addParameter("refresh", "true"); indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); @@ -460,12 +460,12 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception { // Rollover in leader cluster and ensure second backing index is replicated: { - try (RestClient leaderClient = buildLeaderClient()) { - Request rolloverRequest = new Request("POST", "/" + aliasName + "/_rollover"); + try (var leaderClient = buildLeaderClient()) { + var rolloverRequest = new Request("POST", "/" + aliasName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); verifyAlias(leaderClient, aliasName, true, aliasName + "-000002", aliasName + "-000001"); - Request indexRequest = new Request("POST", "/" + aliasName + "/_doc"); + var indexRequest = new Request("POST", "/" + aliasName + "/_doc"); indexRequest.addParameter("refresh", "true"); indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); @@ -482,8 +482,8 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception { // Try rollover in follow cluster, this should fail, because is_write_index property of an alias isn't // replicated to follow cluster. { - Request rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover"); - Exception e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); + var rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover"); + var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1)); assertThat(e.getMessage(), containsString("rollover target [" + aliasName + "] does not point to a write index")); verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001"); } @@ -497,8 +497,8 @@ private static void verifyAlias(RestClient client, String aliasName, boolean checkWriteIndex, String... otherIndices) throws IOException { - Request getAliasRequest = new Request("GET", "/_alias/" + aliasName); - Map responseBody = toMap(client.performRequest(getAliasRequest)); + var getAliasRequest = new Request("GET", "/_alias/" + aliasName); + var responseBody = toMap(client.performRequest(getAliasRequest)); if (checkWriteIndex) { assertThat(ObjectPath.eval(otherIndices[0] + ".aliases." + aliasName + ".is_write_index", responseBody), is(true)); } @@ -518,11 +518,11 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { // Create auto follow pattern in follow cluster createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster"); // Create auto follow pattern in leader cluster: - try (RestClient leaderClient = buildLeaderClient()) { + try (var leaderClient = buildLeaderClient()) { initialNumberOfSuccessfulFollowedIndicesInLeaderCluster = getNumberOfSuccessfulFollowedIndices(leaderClient); // First add remote cluster to leader cluster: - Request request = new Request("PUT", "/_cluster/settings"); - try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + var request = new Request("PUT", "/_cluster/settings"); + try (var bodyBuilder = JsonXContent.contentBuilder()) { bodyBuilder.startObject(); { bodyBuilder.startObject("persistent"); @@ -534,11 +534,11 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { bodyBuilder.startObject("follower_cluster"); { bodyBuilder.startArray("seeds"); - Request nodesInfoRequest = new Request("GET", "/_nodes/_local"); - Map nodesInfoResponse = toMap(client().performRequest(nodesInfoRequest)); - Map node = (Map) ((Map) nodesInfoResponse.get("nodes")).values().iterator().next(); - Map transportMetrics = (Map) node.get("transport"); - String address = (String) transportMetrics.get("publish_address"); + var nodesInfoRequest = new Request("GET", "/_nodes/_local"); + var nodesInfoResponse = toMap(client().performRequest(nodesInfoRequest)); + var node = (Map) ((Map) nodesInfoResponse.get("nodes")).values().iterator().next(); + var transportMetrics = (Map) node.get("transport"); + var address = (String) transportMetrics.get("publish_address"); bodyBuilder.value(address); bodyBuilder.endArray(); } @@ -558,11 +558,11 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster"); } - int numDocs = 128; - String leaderDataStreamName = "logs-http-eu"; + var numDocs = 128; + var leaderDataStreamName = "logs-http-eu"; // Create data stream in leader cluster and ensure it is followed in follow cluster { - try (RestClient leaderClient = buildLeaderClient()) { + try (var leaderClient = buildLeaderClient()) { for (int i = 0; i < numDocs; i++) { Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); @@ -579,16 +579,16 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { verifyDocuments(client(), leaderDataStreamName, numDocs); }); } - String followerDataStreamName = "logs-http-na"; + var followerDataStreamName = "logs-http-na"; { for (int i = 0; i < numDocs; i++) { - Request indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc"); + var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(client().performRequest(indexRequest)); } verifyDocuments(client(), followerDataStreamName, numDocs); - try (RestClient leaderClient = buildLeaderClient()) { + try (var leaderClient = buildLeaderClient()) { assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(leaderClient), equalTo(initialNumberOfSuccessfulFollowedIndicesInLeaderCluster + 1)); @@ -606,16 +606,16 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { // See all eu and na logs in leader and follower cluster: verifyDocuments(client(), "logs-http*", numDocs * 2); - try (RestClient leaderClient = buildLeaderClient()) { + try (var leaderClient = buildLeaderClient()) { verifyDocuments(leaderClient, "logs-http*", numDocs * 2); } int moreDocs = 48; // Index more docs into leader cluster { - try (RestClient leaderClient = buildLeaderClient()) { + try (var leaderClient = buildLeaderClient()) { for (int i = 0; i < moreDocs; i++) { - Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc"); + var indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); @@ -629,13 +629,13 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { // Index more docs into follower cluster { for (int i = 0; i < moreDocs; i++) { - Request indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc"); + var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(client().performRequest(indexRequest)); } verifyDocuments(client(), followerDataStreamName, numDocs + moreDocs); - try (RestClient leaderClient = buildLeaderClient()) { + try (var leaderClient = buildLeaderClient()) { assertBusy(() -> { verifyDocuments(leaderClient, followerDataStreamName, numDocs + moreDocs); }); From 9c7a0715e3cf348000265bc2d16215d01c3d3a27 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 Dec 2020 15:13:33 +0100 Subject: [PATCH 16/17] mark promote ds api as non operator api --- .../org/elasticsearch/xpack/security/operator/Constants.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 2d3038f51a7b7..965be69866e23 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -324,6 +324,7 @@ public class Constants { "indices:admin/data_stream/delete", "indices:admin/data_stream/get", "indices:admin/data_stream/migrate", + "indices:admin/data_stream/promote", "indices:admin/delete", "indices:admin/flush", "indices:admin/flush[s]", From a80f77706c4d89c57af2cd2c7a59af148de8434c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 8 Dec 2020 07:38:19 +0100 Subject: [PATCH 17/17] fixed typo --- .../rest-api-spec/api/indices.promote_data_stream.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json index 90c22b51957e1..4f423a78bbc5d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/indices.promote_data_stream.json @@ -2,7 +2,7 @@ "indices.promote_data_stream":{ "documentation":{ "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", - "description":"Promotes a data stream from a replicate data stream managed by CCR to a regular data stream" + "description":"Promotes a data stream from a replicated data stream managed by CCR to a regular data stream" }, "stability":"stable", "url":{