Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,34 +159,34 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
}
""";

private static final String BULK =
"""
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}
""";

public void testTsdbDataStreams() throws Exception {
// Create a template
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
assertOK(client().performRequest(putComposableIndexTemplateRequest));

var bulkRequest = new Request("POST", "/k8s/_bulk");
bulkRequest.setJsonEntity(
"""
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}
"""
.replace("$now", formatInstant(Instant.now()))
);
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
bulkRequest.addParameter("refresh", "true");
assertOK(client().performRequest(bulkRequest));

Expand Down Expand Up @@ -245,6 +245,72 @@ public void testTsdbDataStreams() throws Exception {
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
}

public void testTsdbDataStreamsNanos() throws Exception {
// Create a template
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE.replace("date", "date_nanos"));
assertOK(client().performRequest(putComposableIndexTemplateRequest));

var bulkRequest = new Request("POST", "/k8s/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstantNanos(Instant.now())));
bulkRequest.addParameter("refresh", "true");
assertOK(client().performRequest(bulkRequest));

var getDataStreamsRequest = new Request("GET", "/_data_stream");
var response = client().performRequest(getDataStreamsRequest);
assertOK(response);
var dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("1"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
assertThat(firstBackingIndex, backingIndexEqualTo("k8s", 1));

var indices = getIndex(firstBackingIndex);
var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeFirstBackingIndex, notNullValue());
String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeFirstBackingIndex, notNullValue());

var rolloverRequest = new Request("POST", "/k8s/_rollover");
assertOK(client().performRequest(rolloverRequest));

response = client().performRequest(getDataStreamsRequest);
assertOK(response);
dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(2));
String secondBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.1.index_name");
assertThat(secondBackingIndex, backingIndexEqualTo("k8s", 2));

indices = getIndex(secondBackingIndex);
escapedBackingIndex = secondBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex));
String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeSecondBackingIndex, notNullValue());

var indexRequest = new Request("POST", "/k8s/_doc");
Instant time = parseInstant(startTimeFirstBackingIndex);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));

indexRequest = new Request("POST", "/k8s/_doc");
time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
}

public void testSimulateTsdbDataStreamTemplate() throws Exception {
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
Expand Down Expand Up @@ -429,6 +495,10 @@ static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

static String formatInstantNanos(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME_NANOS.getName()).format(instant);
}

static Instant parseInstant(String input) {
return Instant.from(DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).parse(input));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.index.mapper.DateFieldMapper;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
Expand Down Expand Up @@ -59,8 +60,8 @@ public Settings getAdditionalIndexSettings(
final Instant start;
final Instant end;
if (dataStream == null || migrating) {
start = resolvedAt.minusMillis(lookAheadTime.getMillis());
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
start = resolvedAt.minusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
end = resolvedAt.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
} else {
IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex());
if (currentLatestBackingIndex.getSettings().hasValue(IndexSettings.TIME_SERIES_END_TIME.getKey()) == false) {
Expand All @@ -75,9 +76,9 @@ public Settings getAdditionalIndexSettings(
}
start = IndexSettings.TIME_SERIES_END_TIME.get(currentLatestBackingIndex.getSettings());
if (start.isAfter(resolvedAt)) {
end = start.plusMillis(lookAheadTime.getMillis());
end = start.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
} else {
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
end = resolvedAt.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
}
}
assert start.isBefore(end) : "data stream backing index's start time is not before end time";
Expand Down
Loading