diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 090b6797fe500..6b1249d82e2c6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.metadata.MetadataMappingService; import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; -import org.elasticsearch.cluster.metadata.RollupMetadata; import org.elasticsearch.cluster.routing.DelayedAllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; @@ -65,7 +64,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksNodeService; import org.elasticsearch.plugins.ClusterPlugin; -import org.elasticsearch.rollup.RollupV2; import org.elasticsearch.script.ScriptMetadata; import org.elasticsearch.snapshots.SnapshotsInfoService; import org.elasticsearch.tasks.Task; @@ -132,9 +130,6 @@ public static List getNamedWriteables() { ComposableIndexTemplateMetadata::readDiffFrom); registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom); - if (RollupV2.isEnabled()) { - registerMetadataCustom(entries, RollupMetadata.TYPE, RollupMetadata::new, RollupMetadata::readDiffFrom); - } // Task Status (not Diffable) entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); return entries; @@ -159,10 +154,6 @@ public static List getNamedXWriteables() { ComposableIndexTemplateMetadata::fromXContent)); entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(DataStreamMetadata.TYPE), DataStreamMetadata::fromXContent)); - if (RollupV2.isEnabled()) { - entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(RollupMetadata.TYPE), - RollupMetadata::fromXContent)); - } return entries; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RollupGroup.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RollupGroup.java index b60d6fd96f492..c72be8fc5e8fa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RollupGroup.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RollupGroup.java @@ -22,113 +22,79 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; /** - * Object representing information about rollup-v2 indices and their respective original-indexes. These objects - * also include information about their capabilities, like which date-intervals and date-timezones they are configured - * with. Used by {@link RollupMetadata}. + * Object representing a group of rollup-v2 indices that have been computed on their respective original-indexes. + * Used by {@link RollupMetadata}. The rollup group is based on a map with the rollup-index name as a key and + * its rollup information object as value. * - * The information in this class will be used to decide which index within the group will be chosen - * for a specific aggregation. 
For example, if there are two indices with different intervals (`1h`, `1d`) and - * a date-histogram aggregation request is sent for daily intervals, then the index with the associated `1d` interval - * will be chosen. */ public class RollupGroup extends AbstractDiffable implements ToXContentObject { private static final ParseField GROUP_FIELD = new ParseField("group"); - private static final ParseField DATE_INTERVAL_FIELD = new ParseField("interval"); - private static final ParseField DATE_TIMEZONE_FIELD = new ParseField("timezone"); - // the list of indices part of this rollup group - private List group; - // a map from index-name to the date interval used in the associated index - private Map dateInterval; - // a map from index-name to timezone used in the associated index - private Map dateTimezone; + /** a map from rollup-index name to its rollup configuration */ + private final Map group; @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("rollup_group", false, - a -> new RollupGroup((List) a[0], (Map) a[1], (Map) a[2])); + a -> new RollupGroup((Map) a[0])); static { - PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), GROUP_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { - Map intervalMap = new HashMap<>(); - - while (p.nextToken() != XContentParser.Token.END_OBJECT) { - String name = p.currentName(); - p.nextToken(); - String expression = p.text(); - intervalMap.put(name, new DateHistogramInterval(expression)); - } - return intervalMap; - }, DATE_INTERVAL_FIELD); - PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { - Map zoneMap = new HashMap<>(); + Map rollupGroups = new HashMap<>(); while (p.nextToken() != XContentParser.Token.END_OBJECT) { String name = p.currentName(); - p.nextToken(); - String timezone = p.text(); - zoneMap.put(name, WriteableZoneId.of(timezone)); + rollupGroups.put(name, RollupIndexMetadata.parse(p)); } - return zoneMap; - }, DATE_TIMEZONE_FIELD); + return rollupGroups; + }, GROUP_FIELD); } - public RollupGroup(List group, Map dateInterval, Map dateTimezone) { + public RollupGroup(Map group) { this.group = group; - this.dateInterval = dateInterval; - this.dateTimezone = dateTimezone; } public RollupGroup() { - this.group = new ArrayList<>(); - this.dateInterval = new HashMap<>(); - this.dateTimezone = new HashMap<>(); + this.group = new HashMap<>(); } public RollupGroup(StreamInput in) throws IOException { - this.group = in.readStringList(); - this.dateInterval = in.readMap(StreamInput::readString, DateHistogramInterval::new); - this.dateTimezone = in.readMap(StreamInput::readString, WriteableZoneId::new); + this.group = in.readMap(StreamInput::readString, RollupIndexMetadata::new); } - public static RollupGroup fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } - public void add(String name, DateHistogramInterval interval, WriteableZoneId timezone) { - group.add(name); - dateInterval.put(name, interval); - dateTimezone.put(name, timezone); + public void add(String name, RollupIndexMetadata rollupIndexMetadata) { + group.put(name, rollupIndexMetadata); } public void remove(String name) { group.remove(name); - dateInterval.remove(name); - dateTimezone.remove(name); } public boolean contains(String name) { - return group.contains(name); + return group.containsKey(name); } public DateHistogramInterval getDateInterval(String name) { - return dateInterval.get(name); + 
RollupIndexMetadata rollupIndex = group.get(name); + return rollupIndex != null ? rollupIndex.getDateInterval() : null; } public WriteableZoneId getDateTimezone(String name) { - return dateTimezone.get(name); + RollupIndexMetadata rollupIndex = group.get(name); + return rollupIndex != null ? rollupIndex.getDateTimezone() : null; } - public List getIndices() { - return group; + public Set getIndices() { + return group.keySet(); } static Diff readDiffFrom(StreamInput in) throws IOException { @@ -146,9 +112,7 @@ public String toString() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeStringCollection(group); - out.writeMap(dateInterval, StreamOutput::writeString, (stream, val) -> val.writeTo(stream)); - out.writeMap(dateTimezone, StreamOutput::writeString, (stream, val) -> val.writeTo(stream)); + out.writeMap(group, StreamOutput::writeString, (stream, val) -> val.writeTo(stream)); } @Override @@ -156,8 +120,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder .startObject() .field(GROUP_FIELD.getPreferredName(), group) - .field(DATE_INTERVAL_FIELD.getPreferredName(), dateInterval) - .field(DATE_TIMEZONE_FIELD.getPreferredName(), dateTimezone) .endObject(); } @@ -166,13 +128,11 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RollupGroup that = (RollupGroup) o; - return group.equals(that.group) && - dateInterval.equals(that.dateInterval) && - dateTimezone.equals(that.dateTimezone); + return Objects.equals(group, that.group); } @Override public int hashCode() { - return Objects.hash(group, dateInterval, dateTimezone); + return Objects.hash(group); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RollupIndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RollupIndexMetadata.java new file mode 100644 index 0000000000000..b9e4460c4db2e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RollupIndexMetadata.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.time.WriteableZoneId; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Metadata about rollup indices that includes information about their capabilities, + * such as which date-intervals and date-timezones they are configured with and which metric aggregations + * they support. + * + * The information in this class will be used to decide which index within the {@link RollupGroup} will be chosen + * for a specific aggregation. For example, if there are two indices with different intervals (`1h`, `1d`) and + * a date-histogram aggregation request is sent for daily intervals, then the index with the associated `1d` interval + * will be chosen. + */ +public class RollupIndexMetadata extends AbstractDiffable implements ToXContentObject { + + private static final ParseField DATE_INTERVAL_FIELD = new ParseField("interval"); + private static final ParseField DATE_TIMEZONE_FIELD = new ParseField("timezone"); + private static final ParseField METRICS_FIELD = new ParseField("metrics"); + + // the date interval used for rolling up data + private final DateHistogramInterval dateInterval; + // the timezone used for the date_histogram + private final WriteableZoneId dateTimezone; + // a map from field name to metrics supported by the rollup for this field + private final Map> metrics; + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("rollup_index", false, + a -> new RollupIndexMetadata((DateHistogramInterval) a[0], (WriteableZoneId) a[1], (Map>) a[2])); + + static { + PARSER.declareField(optionalConstructorArg(), + p -> new DateHistogramInterval(p.text()), DATE_INTERVAL_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareField(optionalConstructorArg(), + p -> WriteableZoneId.of(p.text()), DATE_TIMEZONE_FIELD, ObjectParser.ValueType.STRING); + + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map> metricsMap = new HashMap<>(); + while (p.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = p.currentName(); + p.nextToken(); + List metricObj = p.list(); + List metrics = new ArrayList<>(metricObj.size()); + for (Object o: metricObj) { + metrics.add((String) o); + } + metricsMap.put(fieldName, metrics); + } + return metricsMap; + }, METRICS_FIELD); + } + + public RollupIndexMetadata(DateHistogramInterval dateInterval, + WriteableZoneId dateTimezone, + Map> metrics) { + this.dateInterval = dateInterval; + this.dateTimezone = dateTimezone; + this.metrics = metrics; + } + + public RollupIndexMetadata(StreamInput in) throws IOException { + this.dateInterval = new DateHistogramInterval(in); +
this.dateTimezone = new WriteableZoneId(in); + this.metrics = in.readMapOfLists(StreamInput::readString, StreamInput::readString); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + dateInterval.writeTo(out); + dateTimezone.writeTo(out); + out.writeMapOfLists(metrics, StreamOutput::writeString, StreamOutput::writeString); + } + + public static RollupIndexMetadata parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + + static RollupIndexMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public DateHistogramInterval getDateInterval() { + return dateInterval; + } + + public WriteableZoneId getDateTimezone() { + return dateTimezone; + } + + public Map> getMetrics() { + return metrics; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder + .startObject() + .field(DATE_INTERVAL_FIELD.getPreferredName(), dateInterval) + .field(DATE_TIMEZONE_FIELD.getPreferredName(), dateTimezone) + .field(METRICS_FIELD.getPreferredName(), metrics) + .endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RollupIndexMetadata that = (RollupIndexMetadata) o; + return dateInterval.equals(that.dateInterval) && + dateTimezone.equals(that.dateTimezone) && + Objects.equals(this.metrics, that.metrics); + } + + @Override + public int hashCode() { + return Objects.hash(dateInterval, dateTimezone, metrics); + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RollupMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RollupMetadata.java index 2e4db7ae7f355..fc4e2c45712a4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RollupMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RollupMetadata.java @@ -8,20 +8,17 @@ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.Diff; -import org.elasticsearch.cluster.DiffableUtils; -import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; -import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -29,7 +26,7 @@ /** * Custom {@link Metadata} implementation for storing a map of {@link RollupGroup}s and their names. 
*/ -public class RollupMetadata implements Metadata.Custom { +public class RollupMetadata extends AbstractDiffable implements ToXContentObject { public static final String TYPE = "rollup"; public static final String SOURCE_INDEX_NAME_META_FIELD = "source_index"; private static final ParseField ROLLUP = new ParseField("rollup"); @@ -49,6 +46,7 @@ public class RollupMetadata implements Metadata.Custom { }, ROLLUP); } + /** a map with the name of the original index and the group of rollups as a value */ private final Map rollupIndices; public RollupMetadata(Map rollupIndices) { @@ -67,30 +65,6 @@ public boolean contains(String index) { return this.rollupIndices.containsKey(index); } - @Override - public Diff diff(Metadata.Custom before) { - return new RollupMetadata.RollupMetadataDiff((RollupMetadata) before, this); - } - - public static NamedDiff readDiffFrom(StreamInput in) throws IOException { - return new RollupMetadataDiff(in); - } - - @Override - public EnumSet context() { - return Metadata.ALL_CONTEXTS; - } - - @Override - public String getWriteableName() { - return TYPE; - } - - @Override - public Version getMinimalSupportedVersion() { - return Version.V_7_11_0; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.rollupIndices, StreamOutput::writeString, (stream, val) -> val.writeTo(stream)); @@ -102,12 +76,10 @@ public static RollupMetadata fromXContent(XContentParser parser) throws IOExcept @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(ROLLUP.getPreferredName()); - for (Map.Entry rollup : rollupIndices.entrySet()) { - builder.field(rollup.getKey(), rollup.getValue()); - } - builder.endObject(); - return builder; + return builder + .startObject() + .field(ROLLUP.getPreferredName(), rollupIndices) + .endObject(); } public static Builder builder() { @@ -149,33 +121,4 @@ public RollupMetadata build() { return new RollupMetadata(rollupIndices); } } - - static class RollupMetadataDiff implements NamedDiff { - - final Diff> rollupIndicesDiff; - - RollupMetadataDiff(RollupMetadata before, RollupMetadata after) { - this.rollupIndicesDiff = DiffableUtils.diff(before.rollupIndices, after.rollupIndices, DiffableUtils.getStringKeySerializer()); - } - - RollupMetadataDiff(StreamInput in) throws IOException { - this.rollupIndicesDiff = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), - RollupGroup::new, RollupGroup::readDiffFrom); - } - - @Override - public Metadata.Custom apply(Metadata.Custom part) { - return new RollupMetadata(rollupIndicesDiff.apply(((RollupMetadata) part).rollupIndices)); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - rollupIndicesDiff.writeTo(out); - } - - @Override - public String getWriteableName() { - return TYPE; - } - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/RollupGroupTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/RollupGroupTests.java index 1b757d421ab96..8e3b452d2afec 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/RollupGroupTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/RollupGroupTests.java @@ -8,16 +8,11 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.time.WriteableZoneId; import org.elasticsearch.common.xcontent.XContentParser; -import 
org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; -import java.time.ZoneOffset; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; public class RollupGroupTests extends AbstractSerializingTestCase { @@ -38,17 +33,10 @@ protected RollupGroup createTestInstance() { } static RollupGroup randomInstance() { - List group = new ArrayList<>(); + Map group = new HashMap<>(); for (int i = 0; i < randomIntBetween(1, 5); i++) { - group.add(randomAlphaOfLength(5 + i)); + group.put(randomAlphaOfLength(5 + i), RollupIndexMetadataTests.randomInstance()); } - Map dateInterval = new HashMap<>(); - Map dateTimezone = new HashMap<>(); - for (String index : group) { - DateHistogramInterval interval = randomFrom(DateHistogramInterval.MINUTE, DateHistogramInterval.HOUR); - dateInterval.put(index, interval); - dateTimezone.put(index, WriteableZoneId.of(randomFrom(ZoneOffset.getAvailableZoneIds()))); - } - return new RollupGroup(group, dateInterval, dateTimezone); + return new RollupGroup(group); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/RollupIndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/RollupIndexMetadataTests.java new file mode 100644 index 0000000000000..01a6c965699c8 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/RollupIndexMetadataTests.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.time.WriteableZoneId; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RollupIndexMetadataTests extends AbstractSerializingTestCase { + + @Override + protected RollupIndexMetadata doParseInstance(XContentParser parser) throws IOException { + return RollupIndexMetadata.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return RollupIndexMetadata::new; + } + + @Override + protected RollupIndexMetadata createTestInstance() { + return randomInstance(); + } + + static RollupIndexMetadata randomInstance() { + DateHistogramInterval interval = randomFrom(DateHistogramInterval.MINUTE, DateHistogramInterval.HOUR); + WriteableZoneId dateTimezone = WriteableZoneId.of(randomFrom(ZoneOffset.getAvailableZoneIds())); + Map> metrics = new HashMap<>(); + for (int i = 0; i < randomIntBetween(1, 5); i++) { + metrics.put(randomAlphaOfLength(5 + i), + randomList(5, () -> randomFrom("min", "max", "sum", "value_count", "avg"))); + } + return new RollupIndexMetadata(interval, dateTimezone, metrics); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/RollupMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/RollupMetadataTests.java index 14afbc4441cbd..311ce3ce78db1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/RollupMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/RollupMetadataTests.java @@ -8,8 +8,9 @@ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.test.AbstractNamedWriteableTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; import java.util.Collections; @@ -17,9 +18,9 @@ import java.util.Map; /** - * Tests for testing {@link RollupMetadata}, the Rollup V2 cluster metadata + * Tests for testing {@link RollupMetadata}, the data stream metadata for Rollup V2. 
*/ -public class RollupMetadataTests extends AbstractNamedWriteableTestCase { +public class RollupMetadataTests extends AbstractSerializingTestCase { @Override protected RollupMetadata createTestInstance() { @@ -34,18 +35,12 @@ protected RollupMetadata createTestInstance() { } @Override - protected RollupMetadata mutateInstance(RollupMetadata instance) throws IOException { - return randomValueOtherThan(instance, this::createTestInstance); + protected RollupMetadata doParseInstance(XContentParser parser) throws IOException { + return RollupMetadata.fromXContent(parser); } @Override - protected NamedWriteableRegistry getNamedWriteableRegistry() { - return new NamedWriteableRegistry(Collections.singletonList(new NamedWriteableRegistry.Entry(RollupMetadata.class, - RollupMetadata.TYPE, RollupMetadata::new))); - } - - @Override - protected Class categoryClass() { - return RollupMetadata.class; + protected Writeable.Reader instanceReader() { + return RollupMetadata::new; } } diff --git a/x-pack/plugin/rollup/build.gradle b/x-pack/plugin/rollup/build.gradle index 4838b642b3acd..461475cd3b862 100644 --- a/x-pack/plugin/rollup/build.gradle +++ b/x-pack/plugin/rollup/build.gradle @@ -15,6 +15,7 @@ dependencies { compileOnly project(path: xpackModule('core'), configuration: 'default') compileOnly project(path: xpackModule('analytics'), configuration: 'default') compileOnly project(path: xpackModule('mapper-aggregate-metric'), configuration: 'default') + compileOnly project(path: xpackModule('data-streams'), configuration: 'default') testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts') } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java index eb82c6447237e..e1cdbf4349bce 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RollupGroup; +import org.elasticsearch.cluster.metadata.RollupIndexMetadata; import org.elasticsearch.cluster.metadata.RollupMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -43,12 +44,12 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.rollup.RollupActionConfig; +import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig; import org.elasticsearch.xpack.core.rollup.RollupActionGroupConfig; import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction; +import org.elasticsearch.xpack.core.rollup.action.RollupAction; import org.elasticsearch.xpack.core.rollup.job.HistogramGroupConfig; import org.elasticsearch.xpack.core.rollup.job.MetricConfig; -import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig; -import org.elasticsearch.xpack.core.rollup.action.RollupAction; import java.io.IOException; import java.util.ArrayList; @@ -210,48 +211,59 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS @Override public ClusterState execute(ClusterState currentState) { String rollupIndexName = request.getRollupIndex(); - IndexMetadata 
rollupIndexMetadata = currentState.getMetadata().index(rollupIndexName); - Index rollupIndex = rollupIndexMetadata.getIndex(); // TODO(talevy): find better spot to get the original index name // extract created rollup index original index name to be used as metadata key String originalIndexName = request.getSourceIndex(); - Map idxMetadata = currentState.getMetadata().index(originalIndexName) - .getCustomData(RollupMetadata.TYPE); - String rollupGroupKeyName = (idxMetadata == null) ? - originalIndexName : idxMetadata.get(RollupMetadata.SOURCE_INDEX_NAME_META_FIELD); + IndexAbstraction originalIndex = currentState.getMetadata().getIndicesLookup().get(originalIndexName); + IndexMetadata rollupIndexMetadata = currentState.getMetadata().index(rollupIndexName); + Index rollupIndex = rollupIndexMetadata.getIndex(); + + // Add metadata to rollup index metadata. In the rollup index metadata we only add the name + // of the source index. + Map idxMetadata = currentState.getMetadata().index(originalIndexName).getCustomData(RollupMetadata.TYPE); + String rollupGroupKeyName = idxMetadata != null ? + idxMetadata.get(RollupMetadata.SOURCE_INDEX_NAME_META_FIELD) : originalIndexName; Map rollupIndexRollupMetadata = new HashMap<>(); rollupIndexRollupMetadata.put(RollupMetadata.SOURCE_INDEX_NAME_META_FIELD, rollupGroupKeyName); - final RollupMetadata rollupMetadata = currentState.metadata().custom(RollupMetadata.TYPE); - final Map rollupGroups; - if (rollupMetadata == null) { - rollupGroups = new HashMap<>(); - } else { - rollupGroups = new HashMap<>(rollupMetadata.rollupGroups()); - } - RollupActionDateHistogramGroupConfig dateConfig = request.getRollupConfig().getGroupConfig().getDateHistogram(); - WriteableZoneId rollupDateZoneId = WriteableZoneId.of(dateConfig.getTimeZone()); - if (rollupGroups.containsKey(rollupGroupKeyName)) { - RollupGroup group = rollupGroups.get(rollupGroupKeyName); - group.add(rollupIndexName, dateConfig.getInterval(), rollupDateZoneId); - } else { - RollupGroup group = new RollupGroup(); - group.add(rollupIndexName, dateConfig.getInterval(), rollupDateZoneId); - rollupGroups.put(rollupGroupKeyName, group); - } - // add rolled up index to backing datastream if rolling up a backing index of a datastream - IndexAbstraction originalIndex = currentState.getMetadata().getIndicesLookup().get(originalIndexName); - DataStream dataStream = null; + + Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()) + .put(IndexMetadata.builder(rollupIndexMetadata).putCustom(RollupMetadata.TYPE, rollupIndexRollupMetadata)); + if (originalIndex.getParentDataStream() != null) { + // If rolling up a backing index of a datastream, add rolled up index to backing datastream DataStream originalDataStream = originalIndex.getParentDataStream().getDataStream(); + + Map dsMetadata = originalDataStream.getMetadata() != null + ? originalDataStream.getMetadata() : new HashMap<>(); + final RollupMetadata rollupMetadata = dsMetadata.containsKey(RollupMetadata.TYPE) ? + (RollupMetadata) dsMetadata.get(RollupMetadata.TYPE) : null; + final Map rollupGroups = rollupMetadata != null ? 
+ new HashMap<>(rollupMetadata.rollupGroups()) : new HashMap<>(); + + RollupActionConfig rollupConfig = request.getRollupConfig(); + RollupActionDateHistogramGroupConfig dateConfig = rollupConfig.getGroupConfig().getDateHistogram(); + WriteableZoneId rollupDateZoneId = WriteableZoneId.of(dateConfig.getTimeZone()); + Map> metricsConfig = new HashMap<>(); + for (MetricConfig mconfig: rollupConfig.getMetricsConfig()) { + metricsConfig.put(mconfig.getField(), mconfig.getMetrics()); + } + RollupIndexMetadata rollupInfo = new RollupIndexMetadata(dateConfig.getInterval(), rollupDateZoneId, metricsConfig); + + if (rollupGroups.containsKey(rollupGroupKeyName)) { + RollupGroup group = rollupGroups.get(rollupGroupKeyName); + group.add(rollupIndexName, rollupInfo); + } else { + RollupGroup group = new RollupGroup(); + group.add(rollupIndexName, rollupInfo); + rollupGroups.put(rollupGroupKeyName, group); + } + dsMetadata.put(RollupMetadata.TYPE, new RollupMetadata(rollupGroups)); + List backingIndices = new ArrayList<>(originalDataStream.getIndices()); backingIndices.add(rollupIndex); - dataStream = new DataStream(originalDataStream.getName(), originalDataStream.getTimeStampField(), - backingIndices, originalDataStream.getGeneration(), null); - } - Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()) - .put(IndexMetadata.builder(rollupIndexMetadata).putCustom(RollupMetadata.TYPE, rollupIndexRollupMetadata)) - .putCustom(RollupMetadata.TYPE, new RollupMetadata(rollupGroups)); - if (dataStream != null) { + + DataStream dataStream = new DataStream(originalDataStream.getName(), originalDataStream.getTimeStampField(), + backingIndices, originalDataStream.getGeneration(), dsMetadata); metadataBuilder.put(dataStream); } return ClusterState.builder(currentState).metadata(metadataBuilder.build()).build(); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index d87b01ed635d9..fe131518b4cfc 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -9,12 +9,20 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction; +import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import 
org.elasticsearch.common.xcontent.XContentBuilder; @@ -37,6 +45,8 @@ import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; import org.elasticsearch.xpack.analytics.AnalyticsPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.action.CreateDataStreamAction; +import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupActionConfig; import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig; @@ -45,7 +55,9 @@ import org.elasticsearch.xpack.core.rollup.job.HistogramGroupConfig; import org.elasticsearch.xpack.core.rollup.job.MetricConfig; import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; +import org.elasticsearch.xpack.datastreams.DataStreamsPlugin; import org.elasticsearch.xpack.rollup.Rollup; +import org.junit.After; import org.junit.Before; import java.io.IOException; @@ -53,8 +65,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Set; +import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; @@ -68,9 +83,13 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase { private long startTime; private int docCount; + private String timestampFieldName = "@timestamp"; + private final Set createdDataStreams = new HashSet<>(); + @Override protected Collection> getPlugins() { - return List.of(LocalStateCompositeXPackPlugin.class, Rollup.class, AnalyticsPlugin.class, AggregateMetricMapperPlugin.class); + return List.of(LocalStateCompositeXPackPlugin.class, Rollup.class, AnalyticsPlugin.class, + AggregateMetricMapperPlugin.class, DataStreamsPlugin.class); } @Before @@ -89,6 +108,18 @@ public void setup() { "categorical_1", "type=keyword").get(); } + @Override + @After + public void tearDown() throws Exception { + if (createdDataStreams.isEmpty() == false) { + for (String createdDataStream : createdDataStreams) { + deleteDataStream(createdDataStream); + } + createdDataStreams.clear(); + } + super.tearDown(); + } + public void testCannotRollupToExistingIndex() throws Exception { RollupActionDateHistogramGroupConfig dateHistogramGroupConfig = randomRollupActionDateHistogramGroupConfig("date_1"); SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder().startObject() @@ -226,6 +257,25 @@ public void testAvgMetric() throws IOException { assertRollupIndex(config); } + public void testRollupDatastream() throws Exception { + RollupActionDateHistogramGroupConfig dateHistogramGroupConfig = randomRollupActionDateHistogramGroupConfig(timestampFieldName); + String dataStreamName = createDataStream(false); + + SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder().startObject() + .field(timestampFieldName, randomDateForInterval(dateHistogramGroupConfig.getInterval())) + .field("numeric_1", randomDouble()) + .endObject(); + RollupActionConfig config = new RollupActionConfig( + new RollupActionGroupConfig(dateHistogramGroupConfig, null, null), + Collections.singletonList(new MetricConfig("numeric_1", Collections.singletonList("value_count")))); + bulkIndex(dataStreamName, sourceSupplier); + + String oldIndexName = rollover(dataStreamName).getOldIndex(); + String rollupIndexName = 
".rollup-" + oldIndexName; + rollup(oldIndexName, rollupIndexName, config); +// assertRollupIndex(config); + } + private RollupActionDateHistogramGroupConfig randomRollupActionDateHistogramGroupConfig(String field) { RollupActionDateHistogramGroupConfig randomConfig = ConfigTestHelpers.randomRollupActionDateHistogramGroupConfig(random()); if (randomConfig instanceof RollupActionDateHistogramGroupConfig.FixedInterval) { @@ -244,10 +294,14 @@ private String randomDateForInterval(DateHistogramInterval interval) { } private void bulkIndex(SourceSupplier sourceSupplier) throws IOException { + bulkIndex(index, sourceSupplier); + } + + private void bulkIndex(String indexName, SourceSupplier sourceSupplier) throws IOException { BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (int i = 0; i < docCount; i++) { - IndexRequest indexRequest = new IndexRequest(index); + IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE); XContentBuilder source = sourceSupplier.get(); indexRequest.source(source); bulkRequestBuilder.add(indexRequest); @@ -256,15 +310,25 @@ private void bulkIndex(SourceSupplier sourceSupplier) throws IOException { if (bulkResponse.hasFailures()) { fail("Failed to index data: " + bulkResponse.buildFailureMessage()); } - assertHitCount(client().prepareSearch(index).setSize(0).get(), docCount); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), docCount); } private void rollup(RollupActionConfig config) { + rollup(index, rollupIndex,config); + } + + private void rollup(String sourceIndex, String rollupIndex, RollupActionConfig config) { AcknowledgedResponse rollupResponse = client().execute(RollupAction.INSTANCE, - new RollupAction.Request(index, rollupIndex, config)).actionGet(); + new RollupAction.Request(sourceIndex, rollupIndex, config)).actionGet(); assertTrue(rollupResponse.isAcknowledged()); } + private RolloverResponse rollover(String dataStreamName) throws ExecutionException, InterruptedException { + RolloverResponse response = client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).get(); + assertTrue(response.isAcknowledged()); + return response; + } + private void assertRollupIndex(RollupActionConfig config) { // TODO(talevy): assert mapping // TODO(talevy): assert settings @@ -359,5 +423,49 @@ private CompositeAggregationBuilder buildCompositeAggs(String name, RollupAction public interface SourceSupplier { XContentBuilder get() throws IOException; } + + private String createDataStream(boolean hidden) throws Exception { + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault()); + Template idxTemplate = new Template( + null, + new CompressedXContent("{\"properties\":{\"" + timestampFieldName + "\":{\"type\":\"date\"},\"data\":{\"type\":\"keyword\"}}}"), + null + ); + ComposableIndexTemplate template = new ComposableIndexTemplate( + List.of(dataStreamName + "*"), + idxTemplate, + null, + null, + null, + null, + new ComposableIndexTemplate.DataStreamTemplate(hidden), + null + ); + assertTrue( + client().execute( + PutComposableIndexTemplateAction.INSTANCE, + new PutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template) + ).actionGet().isAcknowledged() + ); + assertTrue( + client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).get().isAcknowledged() + ); + createdDataStreams.add(dataStreamName); + 
return dataStreamName; + } + + private void deleteDataStream(String dataStreamName) throws InterruptedException, java.util.concurrent.ExecutionException { + assertTrue( + client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { dataStreamName })) + .get() + .isAcknowledged() + ); + assertTrue( + client().execute( + DeleteComposableIndexTemplateAction.INSTANCE, + new DeleteComposableIndexTemplateAction.Request(dataStreamName + "_template") + ).actionGet().isAcknowledged() + ); + } }
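
Note (not part of the patch): with this change, RollupGroup keys a single map by rollup-index name, and each entry is a RollupIndexMetadata carrying the date_histogram interval, the timezone, and the per-field metrics of that rollup index. The following is a minimal, illustrative sketch of how the two classes fit together; the rollup index name ".rollup-logs-000001-1h" and the field/metric choices are invented for the example.

import org.elasticsearch.cluster.metadata.RollupGroup;
import org.elasticsearch.cluster.metadata.RollupIndexMetadata;
import org.elasticsearch.common.time.WriteableZoneId;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

import java.util.List;
import java.util.Map;

public class RollupGroupExample {
    public static void main(String[] args) {
        // Per-index rollup information: the interval and timezone the rollup was
        // computed with, plus the metrics kept for each rolled-up field.
        RollupIndexMetadata hourlyRollup = new RollupIndexMetadata(
            DateHistogramInterval.HOUR,
            WriteableZoneId.of("UTC"),
            Map.of("numeric_1", List.of("value_count", "max")));

        // The group is now a single map from rollup-index name to RollupIndexMetadata,
        // replacing the previous parallel list/maps for group, interval and timezone.
        RollupGroup group = new RollupGroup();
        group.add(".rollup-logs-000001-1h", hourlyRollup);

        // Lookups for interval and timezone go through the same map.
        assert group.contains(".rollup-logs-000001-1h");
        assert DateHistogramInterval.HOUR.equals(group.getDateInterval(".rollup-logs-000001-1h"));
    }
}

Serialized through the toXContent methods above, such a group would come out roughly as {"group":{".rollup-logs-000001-1h":{"interval":"1h","timezone":"UTC","metrics":{"numeric_1":["value_count","max"]}}}}.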
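
A second sketch, also not part of the patch: after this change the RollupMetadata bookkeeping is no longer a cluster-state custom but is stored by TransportRollupAction inside the owning data stream's metadata map under the "rollup" key (RollupMetadata.TYPE). A hypothetical helper for reading it back might look like the following; the class and method names are invented for illustration, and it assumes DataStream.getMetadata() returns the Map<String, Object> populated in the cluster-state update above.

import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.RollupGroup;
import org.elasticsearch.cluster.metadata.RollupMetadata;

import java.util.Map;

public final class RollupLookupExample {

    private RollupLookupExample() {}

    // Returns the group of rollup indices computed for the given source index, or null
    // if the data stream does not carry any rollup bookkeeping yet.
    public static RollupGroup rollupGroupFor(DataStream dataStream, String sourceIndexName) {
        Map<String, Object> dsMetadata = dataStream.getMetadata();
        if (dsMetadata == null || dsMetadata.containsKey(RollupMetadata.TYPE) == false) {
            return null;
        }
        RollupMetadata rollupMetadata = (RollupMetadata) dsMetadata.get(RollupMetadata.TYPE);
        return rollupMetadata.contains(sourceIndexName)
            ? rollupMetadata.rollupGroups().get(sourceIndexName)
            : null;
    }
}

The null checks mirror the ones in TransportRollupAction, since a data stream that has never been rolled up has either no metadata map or no "rollup" entry in it.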