diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 31260a2f348c6..739d94fbb545a 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -12,11 +12,14 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; @@ -151,6 +154,18 @@ public interface DocWriteRequest extends IndicesRequest, Accountable { */ int route(IndexRouting indexRouting); + /** + * Resolves the write index that should receive this request + * based on the provided index abstraction. + * + * @param ia The provided index abstraction + * @param metadata The metadata instance used to resolve the write index. + * @return the write index that should receive this request + */ + default Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) { + return ia.getWriteIndex(); + } + /** * Requested operation type to perform on the document */ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 10263f622cbf4..ee40476e94b79 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -77,7 +77,6 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -509,19 +508,23 @@ protected void doRun() { if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) { continue; } - if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices)) { + if (addFailureIfIndexCannotBeCreated(docWriteRequest, i)) { continue; } - Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest); + IndexAbstraction ia = null; + boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE; try { + ia = concreteIndices.resolveIfAbsent(docWriteRequest); + if (ia.isDataStreamRelated() && includeDataStreams == false) { + throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); + } // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether // an operation is allowed in index into a data stream, but this isn't done when resolve call is cached, so // the validation needs to be performed here too. - IndexAbstraction indexAbstraction = clusterState.getMetadata().getIndicesLookup().get(concreteIndex.getName()); - if (indexAbstraction.getParentDataStream() != null && + if (ia.getParentDataStream() != null && // avoid valid cases when directly indexing into a backing index // (for example when directly indexing into .ds-logs-foobar-000001) - concreteIndex.getName().equals(docWriteRequest.index()) == false + ia.getName().equals(docWriteRequest.index()) == false && docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) { throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); } @@ -531,6 +534,10 @@ protected void doRun() { docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); docWriteRequest.process(); + final Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata); + if (addFailureIfIndexIsClosed(docWriteRequest, concreteIndex, i, metadata)) { + continue; + } IndexRouting indexRouting = concreteIndices.routing(concreteIndex); int shardId = docWriteRequest.route(indexRouting); List shardRequests = requestsByShard.computeIfAbsent( @@ -538,8 +545,9 @@ protected void doRun() { shard -> new ArrayList<>() ); shardRequests.add(new BulkItemRequest(i, docWriteRequest)); - } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e); + } catch (ElasticsearchParseException | IllegalArgumentException | IndexNotFoundException | RoutingMissingException e) { + String name = ia != null ? ia.getName() : docWriteRequest.index(); + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e); BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure); responses.set(i, bulkItemResponse); // make sure the request gets never processed again @@ -676,20 +684,21 @@ private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest re return false; } - private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, int idx, final ConcreteIndices concreteIndices) { + private boolean addFailureIfIndexIsClosed(DocWriteRequest request, Index concreteIndex, int idx, final Metadata metadata) { + IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex); + if (indexMetadata.getState() == IndexMetadata.State.CLOSE) { + addFailure(request, idx, new IndexClosedException(concreteIndex)); + return true; + } + return false; + } + + private boolean addFailureIfIndexCannotBeCreated(DocWriteRequest request, int idx) { IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index()); if (cannotCreate != null) { addFailure(request, idx, cannotCreate); return true; } - try { - assert request.indicesOptions().forbidClosedIndices() : "only open indices can be resolved"; - Index concreteIndex = concreteIndices.resolveIfAbsent(request); - assert concreteIndex != null; - } catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) { - addFailure(request, idx, ex); - return true; - } return false; } @@ -717,7 +726,7 @@ void executeBulk( private static class ConcreteIndices { private final ClusterState state; private final IndexNameExpressionResolver indexNameExpressionResolver; - private final Map indices = new HashMap<>(); + private final Map indexAbstractions = new HashMap<>(); private final Map routings = new HashMap<>(); ConcreteIndices(ClusterState state, IndexNameExpressionResolver indexNameExpressionResolver) { @@ -725,28 +734,11 @@ private static class ConcreteIndices { this.indexNameExpressionResolver = indexNameExpressionResolver; } - Index resolveIfAbsent(DocWriteRequest request) { - Index concreteIndex = indices.get(request.index()); - if (concreteIndex == null) { - boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE; - try { - concreteIndex = indexNameExpressionResolver.concreteWriteIndex( - state, - request.indicesOptions(), - request.indices()[0], - false, - includeDataStreams - ); - } catch (IndexNotFoundException e) { - if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) { - throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); - } else { - throw e; - } - } - indices.put(request.index(), concreteIndex); - } - return concreteIndex; + IndexAbstraction resolveIfAbsent(DocWriteRequest request) { + return indexAbstractions.computeIfAbsent( + request.index(), + key -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request) + ); } IndexRouting routing(Index index) { diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 84410ae870be0..0f3b7d755976c 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -18,6 +18,8 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.client.internal.Requests; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -28,6 +30,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; @@ -718,6 +721,11 @@ public boolean isRequireAlias() { return requireAlias; } + @Override + public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) { + return ia.getWriteIndex(this, metadata); + } + @Override public int route(IndexRouting indexRouting) { assert id != null : "route must be called after process"; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index dd56212398926..647e050fba1f7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -22,6 +22,8 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -29,6 +31,7 @@ import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -38,6 +41,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -171,6 +175,38 @@ public Index getWriteIndex() { return indices.get(indices.size() - 1); } + /** + * @param timestamp The timestamp used to select a backing index based on its start and end time. + * @param metadata The metadata that is used to fetch the start and end times for backing indices of this data stream. + * @return a backing index with a start time that is greater or equal to the provided timestamp and + * an end time that is less than the provided timestamp. Otherwise null is returned. + */ + public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) { + for (int i = indices.size() - 1; i >= 0; i--) { + Index index = indices.get(i); + IndexMetadata im = metadata.index(index); + + // TODO: make start and end time fields in IndexMetadata class. + // (this to avoid the overhead that occurs when reading a setting) + Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings()); + Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); + // Check should be in sync with DataStreamTimestampFieldMapper#validateTimestamp(...) method + if (timestamp.compareTo(start) >= 0 && timestamp.compareTo(end) < 0) { + return index; + } + } + return null; + } + + public boolean isTimeSeries(Function indices) { + return isTimeSeries(indices.apply(getWriteIndex())); + } + + public boolean isTimeSeries(IndexMetadata indexMetadata) { + IndexMode indexMode = IndexSettings.MODE.get(indexMetadata.getSettings()); + return indexMode == IndexMode.TIME_SERIES; + } + @Nullable public Map getMetadata() { return metadata; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 27eac5cb2ba8f..cb9489b7745c1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -7,16 +7,30 @@ */ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; - +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.xcontent.XContent; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; + +import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + /** * An index abstraction is a reference to one or more concrete indices. * An index abstraction has a unique name and encapsulates all the {@link IndexMetadata} instances it is pointing to. @@ -51,6 +65,10 @@ public interface IndexAbstraction { @Nullable Index getWriteIndex(); + default Index getWriteIndex(IndexRequest request, Metadata metadata) { + return getWriteIndex(); + } + /** * @return the data stream to which this index belongs or null if this is not a concrete index or * if it is a concrete index that does not belong to a data stream. @@ -384,6 +402,11 @@ public int hashCode() { class DataStream implements IndexAbstraction { + public static final XContentParserConfiguration TS_EXTRACT_CONFIG = XContentParserConfiguration.EMPTY.withFiltering( + Set.of("@timestamp"), + null + ); + private final org.elasticsearch.cluster.metadata.DataStream dataStream; private final List referencedByDataStreamAliases; @@ -411,6 +434,76 @@ public Index getWriteIndex() { return dataStream.getWriteIndex(); } + @Override + public Index getWriteIndex(IndexRequest request, Metadata metadata) { + if (request.opType() != DocWriteRequest.OpType.CREATE) { + return getWriteIndex(); + } + + if (getType() != IndexAbstraction.Type.DATA_STREAM) { + return getWriteIndex(); + } + + if (dataStream.isTimeSeries(metadata::index) == false) { + return getWriteIndex(); + } + + Instant timestamp; + XContent xContent = request.getContentType().xContent(); + try (XContentParser parser = xContent.createParser(TS_EXTRACT_CONFIG, request.source().streamInput())) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser); + switch (parser.nextToken()) { + case VALUE_STRING: + // TODO: deal with nanos too here. + // (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level, + // so we can use it here) + timestamp = Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text())); + break; + case VALUE_NUMBER: + timestamp = Instant.ofEpochMilli(parser.longValue()); + break; + default: + throw new ParsingException( + parser.getTokenLocation(), + String.format( + Locale.ROOT, + "Failed to parse object: expecting token of type [%s] or [%s] but found [%s]", + XContentParser.Token.VALUE_STRING, + XContentParser.Token.VALUE_NUMBER, + parser.currentToken() + ) + ); + } + } catch (IOException e) { + throw new IllegalArgumentException("Error extracting timestamp: " + e.getMessage(), e); + } + Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata); + if (result == null) { + String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp); + String writeableIndicesString = dataStream.getIndices() + .stream() + .map(metadata::index) + .map(IndexMetadata::getSettings) + .map( + settings -> "[" + + settings.get(IndexSettings.TIME_SERIES_START_TIME.getKey()) + + "," + + settings.get(IndexSettings.TIME_SERIES_END_TIME.getKey()) + + "]" + ) + .collect(Collectors.joining()); + throw new IllegalArgumentException( + "the document timestamp [" + + timestampAsString + + "] is outside of ranges of currently writable indices [" + + writeableIndicesString + + "]" + ); + } + return result; + } + @Override public DataStream getParentDataStream() { // a data stream cannot have a parent data stream diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index 3179ecb5a5fe5..7ca854d5a2a0f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -11,6 +11,7 @@ import org.apache.lucene.util.automaton.Automaton; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; @@ -205,6 +206,59 @@ public List dataStreamNames(ClusterState state, IndicesOptions options, .collect(Collectors.toList()); } + /** + * Returns {@link IndexAbstraction} instance for the provided write request. This instance isn't fully resolved, + * meaning that {@link IndexAbstraction#getWriteIndex()} should be invoked in order to get concrete write index. + * + * @param state The cluster state + * @param request The provided write request + * @return {@link IndexAbstraction} instance for the provided write request + */ + public IndexAbstraction resolveWriteIndexAbstraction(ClusterState state, DocWriteRequest request) { + boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE && request.includeDataStreams(); + Context context = new Context( + state, + request.indicesOptions(), + false, + false, + includeDataStreams, + true, + getSystemIndexAccessLevel(), + getSystemIndexAccessPredicate(), + getNetNewSystemIndexPredicate() + ); + + List expressions = List.of(request.index()); + for (ExpressionResolver expressionResolver : expressionResolvers) { + expressions = expressionResolver.resolve(context, expressions); + } + + if (expressions.size() == 1) { + IndexAbstraction ia = state.metadata().getIndicesLookup().get(expressions.get(0)); + if (ia == null) { + throw new IndexNotFoundException(expressions.get(0)); + } + if (ia.getType() == IndexAbstraction.Type.ALIAS) { + Index writeIndex = ia.getWriteIndex(); + if (writeIndex == null) { + throw new IllegalArgumentException( + "no write index is defined for alias [" + + ia.getName() + + "]." + + " The write index may be explicitly disabled using is_write_index=false or the alias points to multiple" + + " indices without one being designated as a write index" + ); + } + } + checkSystemIndexAccess(context, Set.of(ia.getWriteIndex())); + return ia; + } else { + throw new IllegalArgumentException( + "unable to return a single target as the provided expression and options got resolved to multiple targets" + ); + } + } + /** * Translates the provided index expression into actual concrete indices, properly deduplicated. * @@ -403,11 +457,11 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } throw infe; } - checkSystemIndexAccess(context, concreteIndices, indexExpressions); + checkSystemIndexAccess(context, concreteIndices); return concreteIndices.toArray(Index.EMPTY_ARRAY); } - private void checkSystemIndexAccess(Context context, Set concreteIndices, String[] originalPatterns) { + private void checkSystemIndexAccess(Context context, Set concreteIndices) { final Metadata metadata = context.getState().metadata(); final Predicate systemIndexAccessPredicate = context.getSystemIndexAccessPredicate().negate(); final List systemIndicesThatShouldNotBeAccessed = concreteIndices.stream() diff --git a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index feceaaf700a69..f314843603fc0 100644 --- a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -12,10 +12,16 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; @@ -26,8 +32,11 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -37,6 +46,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -272,4 +282,122 @@ public void testRejectsEmptyStringPipeline() { assertThat(validate, notNullValue()); assertThat(validate.getMessage(), containsString("pipeline cannot be an empty string")); } + + public void testGetConcreteWriteIndex() { + Instant currentTime = Instant.now(); + Instant start1 = currentTime.minus(6, ChronoUnit.HOURS); + Instant end1 = currentTime.minus(2, ChronoUnit.HOURS); + Instant start2 = currentTime.minus(2, ChronoUnit.HOURS); + Instant end2 = currentTime.plus(2, ChronoUnit.HOURS); + + String tsdbDataStream = "logs_my-app_prod"; + var clusterState = DataStreamTestHelper.getClusterStateWithDataStream( + tsdbDataStream, + List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2)) + ); + var metadata = clusterState.getMetadata(); + + String source = """ + { + "@timestamp": $time + }"""; + { + // Not a create request => resolve to the latest backing index + IndexRequest request = new IndexRequest(tsdbDataStream); + request.source(renderSource(source, start1), XContentType.JSON); + + var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata); + assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(1))); + } + { + // Target is a regular index => resolve to this index only + String indexName = metadata.getIndices().keySet().iterator().next(); + IndexRequest request = new IndexRequest(indexName); + request.source(renderSource(source, randomFrom(start1, end1, start2, end2)), XContentType.JSON); + + var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(indexName), metadata); + assertThat(result.getName(), equalTo(indexName)); + } + { + String regularDataStream = "logs_another-app_prod"; + var backingIndex1 = DataStreamTestHelper.createBackingIndex(regularDataStream, 1).build(); + var backingIndex2 = DataStreamTestHelper.createBackingIndex(regularDataStream, 2).build(); + var metadata2 = Metadata.builder(metadata) + .put(backingIndex1, true) + .put(backingIndex2, true) + .put( + new DataStream( + regularDataStream, + new DataStream.TimestampField("@timestamp"), + List.of(backingIndex1.getIndex(), backingIndex2.getIndex()), + 2, + null + ) + ) + .build(); + // Target is a regular data stream => always resolve to the latest backing index + IndexRequest request = new IndexRequest(regularDataStream); + request.source(renderSource(source, randomFrom(start1, end1, start2, end2)), XContentType.JSON); + + var result = request.getConcreteWriteIndex(metadata2.getIndicesLookup().get(regularDataStream), metadata2); + assertThat(result.getName(), equalTo(backingIndex2.getIndex().getName())); + } + { + // provided timestamp resolves to the first backing index + IndexRequest request = new IndexRequest(tsdbDataStream); + request.opType(DocWriteRequest.OpType.CREATE); + request.source(renderSource(source, start1), XContentType.JSON); + + var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata); + assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(0))); + } + { + // provided timestamp as millis since epoch resolves to the first backing index + IndexRequest request = new IndexRequest(tsdbDataStream); + request.opType(DocWriteRequest.OpType.CREATE); + request.source(source.replace("$time", "" + start1.toEpochMilli()), XContentType.JSON); + + var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata); + assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(0))); + } + { + // provided timestamp resolves to the latest backing index + IndexRequest request = new IndexRequest(tsdbDataStream); + request.opType(DocWriteRequest.OpType.CREATE); + request.source(renderSource(source, start2), XContentType.JSON); + + var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata); + assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(1))); + } + { + // provided timestamp resolves to no index => fail with an exception + IndexRequest request = new IndexRequest(tsdbDataStream); + request.opType(DocWriteRequest.OpType.CREATE); + request.source(renderSource(source, end2), XContentType.JSON); + + var e = expectThrows( + IllegalArgumentException.class, + () -> request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata) + ); + assertThat( + e.getMessage(), + equalTo( + "the document timestamp [$time] is outside of ranges of currently writable indices [[$start1,$end1][$start2,$end2]]" + .replace("$time", formatInstant(end2)) + .replace("$start1", formatInstant(start1)) + .replace("$end1", formatInstant(end1)) + .replace("$start2", formatInstant(start2)) + .replace("$end2", formatInstant(end2)) + ) + ); + } + } + + static String renderSource(String sourceTemplate, Instant instant) { + return sourceTemplate.replace("$time", "\"" + formatInstant(instant) + "\""); + } + + static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 933690a9e6b1f..43f223ed3ea54 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.Writeable; @@ -17,6 +18,8 @@ import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -480,4 +483,37 @@ public void testSnapshotWithAllBackingIndicesRemoved() { postSnapshotDataStream.snapshot(preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())) ); } + + public void testSelectTimeSeriesWriteIndex() { + Instant currentTime = Instant.now(); + + Instant start1 = currentTime.minus(6, ChronoUnit.HOURS); + Instant end1 = currentTime.minus(2, ChronoUnit.HOURS); + Instant start2 = currentTime.minus(2, ChronoUnit.HOURS); + Instant end2 = currentTime.plus(2, ChronoUnit.HOURS); + + String dataStreamName = "logs_my-app_prod"; + ClusterState clusterState = DataStreamTestHelper.getClusterStateWithDataStream( + dataStreamName, + List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2)) + ); + + DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName); + Index result = dataStream.selectTimeSeriesWriteIndex(currentTime, clusterState.getMetadata()); + assertThat(result, equalTo(dataStream.getIndices().get(1))); + assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 2, start2.toEpochMilli()))); + + result = dataStream.selectTimeSeriesWriteIndex(currentTime.minus(2, ChronoUnit.HOURS), clusterState.getMetadata()); + assertThat(result, equalTo(dataStream.getIndices().get(1))); + assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 2, start2.toEpochMilli()))); + + result = dataStream.selectTimeSeriesWriteIndex(currentTime.minus(3, ChronoUnit.HOURS), clusterState.getMetadata()); + assertThat(result, equalTo(dataStream.getIndices().get(0))); + assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1, start1.toEpochMilli()))); + + result = dataStream.selectTimeSeriesWriteIndex(currentTime.minus(6, ChronoUnit.HOURS), clusterState.getMetadata()); + assertThat(result, equalTo(dataStream.getIndices().get(0))); + assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1, start1.toEpochMilli()))); + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index 9efc486b8a7d3..18c6244548e9a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; @@ -72,6 +73,8 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class IndexNameExpressionResolverTests extends ESTestCase { @@ -2953,6 +2956,88 @@ public void testRemoteIndex() { } } + public void testResolveWriteIndexAbstraction() { + ClusterState state = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("logs-foobar", 1)), + List.of("my-index") + ); + state = ClusterState.builder(state) + .metadata( + Metadata.builder(state.getMetadata()) + .put(IndexMetadata.builder(state.getMetadata().index("my-index")).putAlias(new AliasMetadata.Builder("my-alias"))) + .build() + ) + .build(); + DocWriteRequest request = new IndexRequest("logs-foobar"); + IndexAbstraction result = indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request); + assertThat(result.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); + assertThat(result.getName(), equalTo("logs-foobar")); + + request = new IndexRequest("my-index"); + result = indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request); + assertThat(result.getName(), equalTo("my-index")); + assertThat(result.getType(), equalTo(IndexAbstraction.Type.CONCRETE_INDEX)); + + request = new IndexRequest("my-alias"); + result = indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request); + assertThat(result.getName(), equalTo("my-alias")); + assertThat(result.getType(), equalTo(IndexAbstraction.Type.ALIAS)); + } + + public void testResolveWriteIndexAbstractionNoWriteIndexForAlias() { + ClusterState state1 = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("logs-foobar", 1)), + List.of("my-index", "my-index2") + ); + ClusterState state2 = ClusterState.builder(state1) + .metadata( + Metadata.builder(state1.getMetadata()) + .put(IndexMetadata.builder(state1.getMetadata().index("my-index")).putAlias(new AliasMetadata.Builder("my-alias"))) + .put(IndexMetadata.builder(state1.getMetadata().index("my-index2")).putAlias(new AliasMetadata.Builder("my-alias"))) + .build() + ) + .build(); + + DocWriteRequest request = new IndexRequest("my-alias"); + var e = expectThrows( + IllegalArgumentException.class, + () -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state2, request) + ); + assertThat( + e.getMessage(), + equalTo( + "no write index is defined for alias [my-alias]. The write index may be explicitly disabled using is_write_index=false" + + " or the alias points to multiple indices without one being designated as a write index" + ) + ); + } + + public void testResolveWriteIndexAbstractionMissing() { + ClusterState state = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("logs-foobar", 1)), + List.of("my-index") + ); + DocWriteRequest request = new IndexRequest("logs-my-index"); + expectThrows(IndexNotFoundException.class, () -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request)); + } + + public void testResolveWriteIndexAbstractionMultipleMatches() { + ClusterState state = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(), List.of("logs-foo", "logs-bar")); + DocWriteRequest request = mock(DocWriteRequest.class); + when(request.index()).thenReturn("logs-*"); + when(request.indicesOptions()).thenReturn(IndicesOptions.lenientExpandOpen()); + when(request.opType()).thenReturn(DocWriteRequest.OpType.INDEX); + when(request.includeDataStreams()).thenReturn(true); + var e = expectThrows( + IllegalArgumentException.class, + () -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request) + ); + assertThat( + e.getMessage(), + equalTo("unable to return a single target as the provided expression and options got resolved to multiple targets") + ); + } + private ClusterState systemIndexTestClusterState() { Metadata.Builder mdBuilder = Metadata.builder() .put(indexBuilder(".ml-meta", SystemIndexDescriptor.DEFAULT_SETTINGS).state(State.OPEN).system(true)) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index de3729f964474..9dd2bccd20be0 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -24,6 +24,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettingProvider; import org.elasticsearch.index.IndexSettingProviders; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MapperBuilderContext; @@ -46,6 +47,7 @@ import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -281,6 +283,35 @@ public static ClusterState getClusterStateWithDataStreams( return ClusterState.builder(new ClusterName("_name")).metadata(builder).build(); } + public static ClusterState getClusterStateWithDataStream(String dataStream, List> timeSlices) { + Metadata.Builder builder = Metadata.builder(); + + List backingIndices = new ArrayList<>(); + int generation = 1; + for (Tuple tuple : timeSlices) { + Instant start = tuple.v1(); + Instant end = tuple.v2(); + Settings settings = Settings.builder() + .put("index.mode", "time_series") + .put("index.routing_path", "uid") + .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(start)) + .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(end)) + .build(); + var im = createIndexMetadata(getDefaultBackingIndexName(dataStream, generation, start.toEpochMilli()), true, settings, 0); + builder.put(im, true); + backingIndices.add(im); + generation++; + } + DataStream ds = new DataStream( + dataStream, + createTimestampField("@timestamp"), + backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()) + ); + builder.put(ds); + + return ClusterState.builder(new ClusterName("_name")).metadata(builder).build(); + } + private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) { Settings.Builder b = Settings.builder() .put(settings) diff --git a/x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java b/x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java index f83a8220c96b4..c1c033890965d 100644 --- a/x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java +++ b/x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java @@ -83,14 +83,30 @@ public class TsdbDataStreamRestIT extends ESRestTestCase { "data_stream": {} }"""; + private static final String DOC = """ + { + "@timestamp": "$time", + "metricset": "pod", + "k8s": { + "pod": { + "name": "dog", + "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", + "ip": "10.10.55.3", + "network": { + "tx": 1434595272, + "rx": 530605511 + } + } + } + } + """; + public void testTsdbDataStreams() throws Exception { // Create a template var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1"); putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE); assertOK(client().performRequest(putComposableIndexTemplateRequest)); - Instant now = Instant.now(); - String nowAsString = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(now); var bulkRequest = new Request("POST", "/k8s/_bulk"); bulkRequest.setJsonEntity( """ @@ -111,7 +127,7 @@ public void testTsdbDataStreams() throws Exception { {"create": {}} {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}} """ - .replace("$now", nowAsString) + .replace("$now", formatInstant(Instant.now())) ); bulkRequest.addParameter("refresh", "true"); assertOK(client().performRequest(bulkRequest)); @@ -132,9 +148,10 @@ public void testTsdbDataStreams() throws Exception { var escapedBackingIndex = firstBackingIndex.replace(".", "\\."); assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s")); assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series")); - assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), notNullValue()); - String endTime = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); - assertThat(endTime, notNullValue()); + String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"); + assertThat(startTimeFirstBackingIndex, notNullValue()); + String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); + assertThat(endTimeFirstBackingIndex, notNullValue()); var rolloverRequest = new Request("POST", "/k8s/_rollover"); assertOK(client().performRequest(rolloverRequest)); @@ -150,8 +167,24 @@ public void testTsdbDataStreams() throws Exception { indices = getIndex(secondBackingIndex); escapedBackingIndex = secondBackingIndex.replace(".", "\\."); assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s")); - assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), equalTo(endTime)); - assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), notNullValue()); + String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"); + assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex)); + String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); + assertThat(endTimeSecondBackingIndex, notNullValue()); + + var indexRequest = new Request("POST", "/k8s/_doc"); + Instant time = parseInstant(startTimeFirstBackingIndex); + indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time))); + response = client().performRequest(indexRequest); + assertOK(response); + assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex)); + + indexRequest = new Request("POST", "/k8s/_doc"); + time = parseInstant(endTimeSecondBackingIndex).minusMillis(1); + indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time))); + response = client().performRequest(indexRequest); + assertOK(response); + assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex)); } public void testSimulateTsdbDataStreamTemplate() throws Exception { @@ -220,4 +253,12 @@ private static Instant getEndTime(Map getIndexResponse) throws IOException return Instant.from(DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).parse(val)); } + static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } + + static Instant parseInstant(String input) { + return Instant.from(DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).parse(input)); + } + }