From 75126f7c18f10c994f478e42fbea4339f419ee2e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 Mar 2022 13:28:19 +0100 Subject: [PATCH 1/6] TSDB: Truncate timestamps for routing purposes only Truncate timestamp to second precision for resolving data stream to backing index only. Also add support for parsing timestamps in date nanos format. Closes #83517 --- .../datastreams/TsdbDataStreamRestIT.java | 90 +++++++++++++++++++ .../DataStreamIndexSettingsProvider.java | 9 +- .../cluster/metadata/IndexAbstraction.java | 11 ++- 3 files changed, 102 insertions(+), 8 deletions(-) diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java index d6b81f4877b5b..411d8ddb6fda4 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java @@ -243,6 +243,92 @@ public void testTsdbDataStreams() throws Exception { assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex)); } + public void testTsdbDataStreamsNanos() throws Exception { + // Create a template + var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1"); + putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE.replace("date", "date_nanos")); + assertOK(client().performRequest(putComposableIndexTemplateRequest)); + + var bulkRequest = new Request("POST", "/k8s/_bulk"); + bulkRequest.setJsonEntity( + """ + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}} + """ + .replace("$now", formatInstantNanos(Instant.now())) + ); + bulkRequest.addParameter("refresh", "true"); + assertOK(client().performRequest(bulkRequest)); + + var getDataStreamsRequest = new Request("GET", "/_data_stream"); + var response = client().performRequest(getDataStreamsRequest); + assertOK(response); + var dataStreams = entityAsMap(response); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s")); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("1")); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1)); + String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name"); + assertThat(firstBackingIndex, backingIndexEqualTo("k8s", 1)); + + var indices = getIndex(firstBackingIndex); + var escapedBackingIndex = firstBackingIndex.replace(".", "\\."); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s")); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series")); + String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"); + assertThat(startTimeFirstBackingIndex, notNullValue()); + String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); + assertThat(endTimeFirstBackingIndex, notNullValue()); + + var rolloverRequest = new Request("POST", "/k8s/_rollover"); + assertOK(client().performRequest(rolloverRequest)); + + response = client().performRequest(getDataStreamsRequest); + assertOK(response); + dataStreams = entityAsMap(response); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s")); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(2)); + String secondBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.1.index_name"); + assertThat(secondBackingIndex, backingIndexEqualTo("k8s", 2)); + + indices = getIndex(secondBackingIndex); + escapedBackingIndex = secondBackingIndex.replace(".", "\\."); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s")); + String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"); + assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex)); + String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); + assertThat(endTimeSecondBackingIndex, notNullValue()); + + var indexRequest = new Request("POST", "/k8s/_doc"); + Instant time = parseInstant(startTimeFirstBackingIndex); + indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time))); + response = client().performRequest(indexRequest); + assertOK(response); + assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex)); + + indexRequest = new Request("POST", "/k8s/_doc"); + time = parseInstant(endTimeSecondBackingIndex).minusMillis(1); + indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time))); + response = client().performRequest(indexRequest); + assertOK(response); + assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex)); + } + public void testSimulateTsdbDataStreamTemplate() throws Exception { var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1"); putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE); @@ -420,6 +506,10 @@ static String formatInstant(Instant instant) { return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); } + static String formatInstantNanos(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME_NANOS.getName()).format(instant); + } + static Instant parseInstant(String input) { return Instant.from(DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).parse(input)); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java index 531dcc1ac9a19..4fcf032ec74da 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java @@ -19,6 +19,7 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Locale; public class DataStreamIndexSettingsProvider implements IndexSettingProvider { @@ -59,8 +60,8 @@ public Settings getAdditionalIndexSettings( final Instant start; final Instant end; if (dataStream == null || migrating) { - start = resolvedAt.minusMillis(lookAheadTime.getMillis()); - end = resolvedAt.plusMillis(lookAheadTime.getMillis()); + start = resolvedAt.minusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS); + end = resolvedAt.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS); } else { IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex()); if (currentLatestBackingIndex.getSettings().hasValue(IndexSettings.TIME_SERIES_END_TIME.getKey()) == false) { @@ -75,9 +76,9 @@ public Settings getAdditionalIndexSettings( } start = IndexSettings.TIME_SERIES_END_TIME.get(currentLatestBackingIndex.getSettings()); if (start.isAfter(resolvedAt)) { - end = start.plusMillis(lookAheadTime.getMillis()); + end = start.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS); } else { - end = resolvedAt.plusMillis(lookAheadTime.getMillis()); + end = resolvedAt.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS); } } assert start.isBefore(end) : "data stream backing index's start time is not before end time"; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 877b666646fef..354c396808c43 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.DataStream.TimestampField; import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; @@ -22,6 +23,7 @@ import org.elasticsearch.xcontent.XContentParserConfiguration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -338,6 +340,9 @@ class DataStream implements IndexAbstraction { false ); + public static final DateFormatter TIMESTAMP_FORMATTER = + DateFormatter.forPattern("strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis"); + private final org.elasticsearch.cluster.metadata.DataStream dataStream; private final List referencedByDataStreamAliases; @@ -376,15 +381,12 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) { } Instant timestamp; - final var formatter = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; + final var formatter = TIMESTAMP_FORMATTER; XContent xContent = request.getContentType().xContent(); try (XContentParser parser = xContent.createParser(TS_EXTRACT_CONFIG, request.source().streamInput())) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser); switch (parser.nextToken()) { - // TODO: deal with nanos too here. - // (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level, - // so we can use it here) case VALUE_STRING -> timestamp = DateFormatters.from(formatter.parse(parser.text()), formatter.locale()).toInstant(); case VALUE_NUMBER -> timestamp = Instant.ofEpochMilli(parser.longValue()); default -> throw new ParsingException( @@ -401,6 +403,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) { } catch (Exception e) { throw new IllegalArgumentException("Error extracting data stream timestamp field: " + e.getMessage(), e); } + timestamp = timestamp.truncatedTo(ChronoUnit.SECONDS); Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata); if (result == null) { String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp); From 1227ac1336297c41fbebdfbfd624a1e99169d2db Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 Mar 2022 13:51:31 +0100 Subject: [PATCH 2/6] adjust tests --- .../datastreams/DataStreamIndexSettingsProviderTests.java | 6 +++--- .../datastreams/MetadataDataStreamRolloverServiceTests.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java index b936b9d9a5341..aacd97ecb20ef 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java @@ -32,7 +32,7 @@ public void testGetAdditionalIndexSettings() { Metadata metadata = Metadata.EMPTY_METADATA; String dataStreamName = "logs-app1"; - Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); TimeValue lookAheadTime = TimeValue.timeValueHours(2); // default Settings settings = Settings.EMPTY; var provider = new DataStreamIndexSettingsProvider(); @@ -54,7 +54,7 @@ public void testGetAdditionalIndexSettingsLookAheadTime() { Metadata metadata = Metadata.EMPTY_METADATA; String dataStreamName = "logs-app1"; - Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); TimeValue lookAheadTime = TimeValue.timeValueMinutes(30); Settings settings = builder().put("index.mode", "time_series").put("index.look_ahead_time", lookAheadTime.getStringRep()).build(); var provider = new DataStreamIndexSettingsProvider(); @@ -76,7 +76,7 @@ public void testGetAdditionalIndexSettingsDataStreamAlreadyCreated() { String dataStreamName = "logs-app1"; TimeValue lookAheadTime = TimeValue.timeValueHours(2); - Instant sixHoursAgo = Instant.now().minus(6, ChronoUnit.HOURS).truncatedTo(ChronoUnit.MILLIS); + Instant sixHoursAgo = Instant.now().minus(6, ChronoUnit.HOURS).truncatedTo(ChronoUnit.SECONDS); Instant currentEnd = sixHoursAgo.plusMillis(lookAheadTime.getMillis()); Metadata metadata = DataStreamTestHelper.getClusterStateWithDataStream( dataStreamName, diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java index d7d2652481534..2c9a35a2f6fb5 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java @@ -147,7 +147,7 @@ public void testRolloverClusterStateForDataStream() throws Exception { } public void testRolloverAndMigrateDataStream() throws Exception { - Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); String dataStreamName = "logs-my-app"; IndexMode dsIndexMode = randomBoolean() ? null : IndexMode.STANDARD; final DataStream dataStream = new DataStream( @@ -233,7 +233,7 @@ public void testRolloverAndMigrateDataStream() throws Exception { } public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExistingDataStreams() throws Exception { - Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); String dataStreamName = "logs-my-app"; final DataStream dataStream = new DataStream( dataStreamName, From 1c24235b0dd4698d0e74259934d1b4ca6ebd38eb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 Mar 2022 14:27:29 +0100 Subject: [PATCH 3/6] spotless --- .../org/elasticsearch/cluster/metadata/IndexAbstraction.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 354c396808c43..b8be723035666 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -340,8 +340,9 @@ class DataStream implements IndexAbstraction { false ); - public static final DateFormatter TIMESTAMP_FORMATTER = - DateFormatter.forPattern("strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis"); + public static final DateFormatter TIMESTAMP_FORMATTER = DateFormatter.forPattern( + "strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis" + ); private final org.elasticsearch.cluster.metadata.DataStream dataStream; private final List referencedByDataStreamAliases; From 56b829f8459deebdb60db44e67906d4190dc40f9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 14 Mar 2022 14:47:49 +0100 Subject: [PATCH 4/6] reuse multi string --- .../datastreams/TsdbDataStreamRestIT.java | 64 +++++++------------ 1 file changed, 22 insertions(+), 42 deletions(-) diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java index a87996c1b0572..b717a9c36bd32 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java @@ -159,6 +159,26 @@ public class TsdbDataStreamRestIT extends ESRestTestCase { } """; + private static final String BULK = + """ + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}} + {"create": {}} + {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}} + """; + public void testTsdbDataStreams() throws Exception { // Create a template var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1"); @@ -166,27 +186,7 @@ public void testTsdbDataStreams() throws Exception { assertOK(client().performRequest(putComposableIndexTemplateRequest)); var bulkRequest = new Request("POST", "/k8s/_bulk"); - bulkRequest.setJsonEntity( - """ - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}} - """ - .replace("$now", formatInstant(Instant.now())) - ); + bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now()))); bulkRequest.addParameter("refresh", "true"); assertOK(client().performRequest(bulkRequest)); @@ -252,27 +252,7 @@ public void testTsdbDataStreamsNanos() throws Exception { assertOK(client().performRequest(putComposableIndexTemplateRequest)); var bulkRequest = new Request("POST", "/k8s/_bulk"); - bulkRequest.setJsonEntity( - """ - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}} - {"create": {}} - {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}} - """ - .replace("$now", formatInstantNanos(Instant.now())) - ); + bulkRequest.setJsonEntity(BULK.replace("$now", formatInstantNanos(Instant.now()))); bulkRequest.addParameter("refresh", "true"); assertOK(client().performRequest(bulkRequest)); From 9520a3243bd520e27d83f8513d773f265509811f Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 15 Mar 2022 14:49:42 +0100 Subject: [PATCH 5/6] added unit tests --- .../DataStreamGetWriteIndexTests.java | 320 ++++++++++++++++++ .../cluster/metadata/DataStream.java | 2 + .../cluster/metadata/IndexAbstraction.java | 2 - .../metadata/DataStreamTestHelper.java | 15 +- 4 files changed, 336 insertions(+), 3 deletions(-) create mode 100644 modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java new file mode 100644 index 0000000000000..67640d652412c --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java @@ -0,0 +1,320 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.Version; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.rollover.Condition; +import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition; +import org.elasticsearch.action.admin.indices.rollover.MetadataRolloverService; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.DateFormatters; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettingProviders; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MapperBuilderContext; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.mapper.RootObjectMapper; +import org.elasticsearch.indices.EmptySystemIndices; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.ShardLimitValidator; +import org.elasticsearch.script.ScriptCompiler; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DataStreamGetWriteIndexTests extends ESTestCase { + + public static final DateFormatter MILLIS_FORMATTER = DateFormatter.forPattern("strict_date_optional_time"); + public static final DateFormatter NANOS_FORMATTER = DateFormatter.forPattern("strict_date_optional_time_nanos"); + + private ThreadPool testThreadPool; + private MetadataRolloverService rolloverService; + private MetadataCreateDataStreamService createDataStreamService; + + public void testPickingBackingIndicesPredefinedDates() throws Exception { + Instant time = DateFormatters.from(MILLIS_FORMATTER.parse("2022-03-15T08:29:36.547Z")).toInstant(); + + var state = createInitialState(); + state = createDataStream(state, "logs-myapp", time); + IndexMetadata backingIndex = state.getMetadata().index(".ds-logs-myapp-2022.03.15-000001"); + assertThat(backingIndex, notNullValue()); + // Ensure truncate to seconds: + assertThat(backingIndex.getSettings().get("index.time_series.start_time"), equalTo("2022-03-15T06:29:36.000Z")); + assertThat(backingIndex.getSettings().get("index.time_series.end_time"), equalTo("2022-03-15T10:29:36.000Z")); + + // advance time and rollover: + time = time.plusSeconds(80 * 60); + var result = rolloverOver(state, "logs-myapp", time); + state = result.clusterState(); + + backingIndex = state.getMetadata().index(".ds-logs-myapp-2022.03.15-000002"); + assertThat(backingIndex, notNullValue()); + assertThat(backingIndex.getSettings().get("index.time_series.start_time"), equalTo("2022-03-15T10:29:36.000Z")); + assertThat(backingIndex.getSettings().get("index.time_series.end_time"), equalTo("2022-03-15T12:29:36.000Z")); + + // first backing index: + { + long start = MILLIS_FORMATTER.parseMillis("2022-03-15T06:29:36.000Z"); + long end = MILLIS_FORMATTER.parseMillis("2022-03-15T10:29:36.000Z") - 1; + for (int i = 0; i < 256; i++) { + String timestamp = MILLIS_FORMATTER.formatMillis(randomLongBetween(start, end)); + var writeIndex = getWriteIndex(state, "logs-myapp", timestamp); + assertThat(writeIndex.getName(), equalTo(".ds-logs-myapp-2022.03.15-000001")); + } + } + + // Borderline: + { + var writeIndex = getWriteIndex(state, "logs-myapp", "2022-03-15T10:29:35.999Z"); + assertThat(writeIndex.getName(), equalTo(".ds-logs-myapp-2022.03.15-000001")); + } + + // Second backing index: + { + long start = MILLIS_FORMATTER.parseMillis("2022-03-15T10:29:36.000Z"); + long end = MILLIS_FORMATTER.parseMillis("2022-03-15T12:29:36.000Z") - 1; + for (int i = 0; i < 256; i++) { + String timestamp = MILLIS_FORMATTER.formatMillis(randomLongBetween(start, end)); + var writeIndex = getWriteIndex(state, "logs-myapp", timestamp); + assertThat(writeIndex.getName(), equalTo(".ds-logs-myapp-2022.03.15-000002")); + } + } + + // Borderline (again): + { + var writeIndex = getWriteIndex(state, "logs-myapp", "2022-03-15T12:29:35.999Z"); + assertThat(writeIndex.getName(), equalTo(".ds-logs-myapp-2022.03.15-000002")); + } + + // Outside the valid temporal ranges: + { + var finalState = state; + var e = expectThrows(IllegalArgumentException.class, () -> getWriteIndex(finalState, "logs-myapp", "2022-03-15T12:29:36.000Z")); + assertThat( + e.getMessage(), + equalTo( + "the document timestamp [2022-03-15T12:29:36.000Z] is outside of ranges of currently writable indices [" + + "[2022-03-15T06:29:36.000Z,2022-03-15T10:29:36.000Z][2022-03-15T10:29:36.000Z,2022-03-15T12:29:36.000Z]]" + ) + ); + } + } + + public void testPickingBackingIndicesNanoTimestamp() throws Exception { + Instant time = DateFormatters.from(NANOS_FORMATTER.parse("2022-03-15T08:29:36.123456789Z")).toInstant(); + + var state = createInitialState(); + state = createDataStream(state, "logs-myapp", time); + IndexMetadata backingIndex = state.getMetadata().index(".ds-logs-myapp-2022.03.15-000001"); + assertThat(backingIndex, notNullValue()); + // Ensure truncate to seconds and millis format: + assertThat(backingIndex.getSettings().get("index.time_series.start_time"), equalTo("2022-03-15T06:29:36.000Z")); + assertThat(backingIndex.getSettings().get("index.time_series.end_time"), equalTo("2022-03-15T10:29:36.000Z")); + + // advance time and rollover: + time = time.plusSeconds(80 * 60); + var result = rolloverOver(state, "logs-myapp", time); + state = result.clusterState(); + + backingIndex = state.getMetadata().index(".ds-logs-myapp-2022.03.15-000002"); + assertThat(backingIndex, notNullValue()); + assertThat(backingIndex.getSettings().get("index.time_series.start_time"), equalTo("2022-03-15T10:29:36.000Z")); + assertThat(backingIndex.getSettings().get("index.time_series.end_time"), equalTo("2022-03-15T12:29:36.000Z")); + + // first backing index: + { + long start = NANOS_FORMATTER.parseMillis("2022-03-15T06:29:36.000000000Z"); + long end = NANOS_FORMATTER.parseMillis("2022-03-15T10:29:36.000000000Z") - 1; + for (int i = 0; i < 256; i++) { + String timestamp = NANOS_FORMATTER.formatMillis(randomLongBetween(start, end)); + var writeIndex = getWriteIndex(state, "logs-myapp", timestamp); + assertThat(writeIndex.getName(), equalTo(".ds-logs-myapp-2022.03.15-000001")); + } + } + + // Borderline: + { + var writeIndex = getWriteIndex(state, "logs-myapp", "2022-03-15T10:29:35.999999999Z"); + assertThat(writeIndex.getName(), equalTo(".ds-logs-myapp-2022.03.15-000001")); + } + + // Second backing index: + { + long start = NANOS_FORMATTER.parseMillis("2022-03-15T10:29:36.000000000Z"); + long end = NANOS_FORMATTER.parseMillis("2022-03-15T12:29:36.000000000Z") - 1; + for (int i = 0; i < 256; i++) { + String timestamp = NANOS_FORMATTER.formatMillis(randomLongBetween(start, end)); + var writeIndex = getWriteIndex(state, "logs-myapp", timestamp); + assertThat(writeIndex.getName(), equalTo(".ds-logs-myapp-2022.03.15-000002")); + } + } + + // Borderline (again): + { + var writeIndex = getWriteIndex(state, "logs-myapp", "2022-03-15T12:29:35.999999999Z"); + assertThat(writeIndex.getName(), equalTo(".ds-logs-myapp-2022.03.15-000002")); + } + } + + @Before + public void setup() throws Exception { + testThreadPool = new TestThreadPool(getTestName()); + ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool); + + IndicesService indicesService; + { + DateFieldMapper dateFieldMapper = new DateFieldMapper.Builder( + "@timestamp", + DateFieldMapper.Resolution.MILLISECONDS, + null, + ScriptCompiler.NONE, + false, + Version.CURRENT + ).build(MapperBuilderContext.ROOT); + RootObjectMapper.Builder root = new RootObjectMapper.Builder("_doc"); + root.add( + new DateFieldMapper.Builder( + "@timestamp", + DateFieldMapper.Resolution.MILLISECONDS, + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER, + ScriptCompiler.NONE, + true, + Version.CURRENT + ) + ); + MetadataFieldMapper dtfm = DataStreamTestHelper.getDataStreamTimestampFieldMapper(); + Mapping mapping = new Mapping( + root.build(MapperBuilderContext.ROOT), + new MetadataFieldMapper[] { dtfm }, + Collections.emptyMap() + ); + MappingLookup mappingLookup = MappingLookup.fromMappers(mapping, List.of(dtfm, dateFieldMapper), List.of(), List.of()); + indicesService = DataStreamTestHelper.mockIndicesServices(mappingLookup); + } + + MetadataCreateIndexService createIndexService; + { + Environment env = mock(Environment.class); + when(env.sharedDataFile()).thenReturn(null); + AllocationService allocationService = mock(AllocationService.class); + when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]); + ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService); + createIndexService = new MetadataCreateIndexService( + Settings.EMPTY, + clusterService, + indicesService, + allocationService, + shardLimitValidator, + env, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + testThreadPool, + null, + EmptySystemIndices.INSTANCE, + false, + new IndexSettingProviders(Set.of(new DataStreamIndexSettingsProvider())) + ); + } + { + MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService( + clusterService, + indicesService, + null, + xContentRegistry() + ); + rolloverService = new MetadataRolloverService( + testThreadPool, + createIndexService, + indexAliasesService, + EmptySystemIndices.INSTANCE + ); + } + + createDataStreamService = new MetadataCreateDataStreamService(testThreadPool, clusterService, createIndexService); + } + + @After + public void cleanup() { + testThreadPool.shutdownNow(); + } + + private ClusterState createInitialState() { + ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of("logs-*")) + .template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES)) + .build(); + Metadata.Builder builder = Metadata.builder(); + builder.put("template", template); + return ClusterState.builder(ClusterState.EMPTY_STATE).metadata(builder).build(); + } + + private ClusterState createDataStream(ClusterState state, String name, Instant time) throws Exception { + var request = new MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest( + name, + time.toEpochMilli(), + null, + TimeValue.ZERO, + TimeValue.ZERO, + false + ); + return createDataStreamService.createDataStream(request, state); + } + + private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state, String name, Instant time) throws Exception { + MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); + List> metConditions = Collections.singletonList(condition); + CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_"); + return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false); + } + + private Index getWriteIndex(ClusterState state, String name, String timestamp) { + var ia = state.getMetadata().getIndicesLookup().get(name); + assertThat(ia, notNullValue()); + IndexRequest indexRequest = new IndexRequest(name); + indexRequest.opType(DocWriteRequest.OpType.CREATE); + indexRequest.source(Map.of("@timestamp", timestamp)); + return ia.getWriteIndex(indexRequest, state.getMetadata()); + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index d8827f3c24d99..076d87b399d8f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -167,6 +168,7 @@ public Index getWriteIndex() { * an end time that is less than the provided timestamp. Otherwise null is returned. */ public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) { + timestamp = timestamp.truncatedTo(ChronoUnit.SECONDS); for (int i = indices.size() - 1; i >= 0; i--) { Index index = indices.get(i); IndexMetadata im = metadata.index(index); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index b8be723035666..275d14619b9c4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -23,7 +23,6 @@ import org.elasticsearch.xcontent.XContentParserConfiguration; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -404,7 +403,6 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) { } catch (Exception e) { throw new IllegalArgumentException("Error extracting data stream timestamp field: " + e.getMessage(), e); } - timestamp = timestamp.truncatedTo(ChronoUnit.SECONDS); Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata); if (result == null) { String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 70ebe1a6b08a9..95502988eaea4 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperBuilderContext; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -35,6 +36,7 @@ import org.elasticsearch.index.mapper.MappingParserContext; import org.elasticsearch.index.mapper.MetadataFieldMapper; import org.elasticsearch.index.mapper.RootObjectMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.IndicesService; @@ -486,7 +488,7 @@ public static MetadataRolloverService getMetadataRolloverService( return new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, EmptySystemIndices.INSTANCE); } - private static MetadataFieldMapper getDataStreamTimestampFieldMapper() { + public static MetadataFieldMapper getDataStreamTimestampFieldMapper() { Map fieldsMapping = new HashMap<>(); fieldsMapping.put("type", DataStreamTimestampFieldMapper.NAME); fieldsMapping.put("enabled", true); @@ -508,6 +510,17 @@ public static IndicesService mockIndicesServices(MappingLookup mappingLookup) th IndexMetadata indexMetadata = (IndexMetadata) invocationOnMock.getArguments()[0]; when(indexService.index()).thenReturn(indexMetadata.getIndex()); MapperService mapperService = mock(MapperService.class); + + RootObjectMapper root = new RootObjectMapper.Builder(MapperService.SINGLE_MAPPING_NAME).build(MapperBuilderContext.ROOT); + Mapping mapping = new Mapping(root, new MetadataFieldMapper[0], null); + DocumentMapper documentMapper = mock(DocumentMapper.class); + when(documentMapper.mapping()).thenReturn(mapping); + when(documentMapper.mappingSource()).thenReturn(mapping.toCompressedXContent()); + RoutingFieldMapper routingFieldMapper = mock(RoutingFieldMapper.class); + when(routingFieldMapper.required()).thenReturn(false); + when(documentMapper.routingFieldMapper()).thenReturn(routingFieldMapper); + + when(mapperService.documentMapper()).thenReturn(documentMapper); when(indexService.mapperService()).thenReturn(mapperService); when(mapperService.mappingLookup()).thenReturn(mappingLookup); when(indexService.getIndexEventListener()).thenReturn(new IndexEventListener() { From f8d06693759c66b88e34e7f352ba434726a8dae5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 15 Mar 2022 15:12:51 +0100 Subject: [PATCH 6/6] undo unintended change --- .../java/org/elasticsearch/cluster/metadata/DataStream.java | 2 -- .../org/elasticsearch/cluster/metadata/IndexAbstraction.java | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 076d87b399d8f..d8827f3c24d99 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -168,7 +167,6 @@ public Index getWriteIndex() { * an end time that is less than the provided timestamp. Otherwise null is returned. */ public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) { - timestamp = timestamp.truncatedTo(ChronoUnit.SECONDS); for (int i = indices.size() - 1; i >= 0; i--) { Index index = indices.get(i); IndexMetadata im = metadata.index(index); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 275d14619b9c4..b8be723035666 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -23,6 +23,7 @@ import org.elasticsearch.xcontent.XContentParserConfiguration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -403,6 +404,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) { } catch (Exception e) { throw new IllegalArgumentException("Error extracting data stream timestamp field: " + e.getMessage(), e); } + timestamp = timestamp.truncatedTo(ChronoUnit.SECONDS); Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata); if (result == null) { String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp);