15 changes: 15 additions & 0 deletions server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
Original file line number Diff line number Diff line change
@@ -12,11 +12,14 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;

@@ -151,6 +154,18 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
*/
int route(IndexRouting indexRouting);

/**
* Resolves the write index that should receive this request
* based on the provided index abstraction.
*
* @param ia The provided index abstraction
* @param metadata The metadata instance used to resolve the write index.
* @return the write index that should receive this request
*/
default Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
Member

Can you add javadocs for this?

Member Author

Added via: efdd833

return ia.getWriteIndex();
}

/**
* Requested operation type to perform on the document
*/
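To make the new hook concrete, here is a minimal sketch of how a caller could resolve the concrete write index through this method, assuming only the types and signatures visible in the diff above; the helper class and its method name are illustrative and not part of this PR.

```java
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.index.Index;

final class WriteIndexResolutionSketch {

    // Hypothetical helper: resolve the concrete write index for a single document write.
    // The interface default simply returns ia.getWriteIndex(); subclasses such as IndexRequest
    // (later in this diff) can override it to use request-specific information.
    static Index resolveWriteIndex(DocWriteRequest<?> request, Metadata metadata) {
        IndexAbstraction ia = metadata.getIndicesLookup().get(request.index());
        if (ia == null) {
            throw new IllegalArgumentException("no such index, alias or data stream [" + request.index() + "]");
        }
        return request.getConcreteWriteIndex(ia, metadata);
    }
}
```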
Original file line number Diff line number Diff line change
@@ -77,7 +77,6 @@
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

@@ -509,19 +508,23 @@ protected void doRun() {
if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices)) {
if (addFailureIfIndexCannotBeCreated(docWriteRequest, i)) {
continue;
}
Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
IndexAbstraction ia = null;
boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE;
try {
ia = concreteIndices.resolveIfAbsent(docWriteRequest);
if (ia.isDataStreamRelated() && includeDataStreams == false) {
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
}
// The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
// an operation is allowed to index into a data stream, but this isn't done when the resolve call is
// cached, so the validation needs to be performed here too.
IndexAbstraction indexAbstraction = clusterState.getMetadata().getIndicesLookup().get(concreteIndex.getName());
if (indexAbstraction.getParentDataStream() != null &&
if (ia.getParentDataStream() != null &&
// avoid valid cases when directly indexing into a backing index
// (for example when directly indexing into .ds-logs-foobar-000001)
concreteIndex.getName().equals(docWriteRequest.index()) == false
ia.getName().equals(docWriteRequest.index()) == false
&& docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) {
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
}
@@ -531,15 +534,20 @@ protected void doRun() {
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
docWriteRequest.process();

final Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata);
if (addFailureIfIndexIsClosed(docWriteRequest, concreteIndex, i, metadata)) {
continue;
}
IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
int shardId = docWriteRequest.route(indexRouting);
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
new ShardId(concreteIndex, shardId),
shard -> new ArrayList<>()
);
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e);
} catch (ElasticsearchParseException | IllegalArgumentException | IndexNotFoundException | RoutingMissingException e) {
String name = ia != null ? ia.getName() : docWriteRequest.index();
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
@@ -676,20 +684,21 @@ private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest<?> re
return false;
}

private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int idx, final ConcreteIndices concreteIndices) {
private boolean addFailureIfIndexIsClosed(DocWriteRequest<?> request, Index concreteIndex, int idx, final Metadata metadata) {
IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex);
if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
addFailure(request, idx, new IndexClosedException(concreteIndex));
return true;
}
return false;
}

private boolean addFailureIfIndexCannotBeCreated(DocWriteRequest<?> request, int idx) {
IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index());
if (cannotCreate != null) {
addFailure(request, idx, cannotCreate);
return true;
}
try {
assert request.indicesOptions().forbidClosedIndices() : "only open indices can be resolved";
Index concreteIndex = concreteIndices.resolveIfAbsent(request);
assert concreteIndex != null;
} catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
addFailure(request, idx, ex);
return true;
}
return false;
}

@@ -717,36 +726,19 @@ void executeBulk(
private static class ConcreteIndices {
private final ClusterState state;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Map<String, Index> indices = new HashMap<>();
private final Map<String, IndexAbstraction> indexAbstractions = new HashMap<>();
private final Map<Index, IndexRouting> routings = new HashMap<>();

ConcreteIndices(ClusterState state, IndexNameExpressionResolver indexNameExpressionResolver) {
this.state = state;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}

Index resolveIfAbsent(DocWriteRequest<?> request) {
Index concreteIndex = indices.get(request.index());
if (concreteIndex == null) {
boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE;
try {
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(
state,
request.indicesOptions(),
request.indices()[0],
false,
includeDataStreams
);
} catch (IndexNotFoundException e) {
if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
} else {
throw e;
}
}
indices.put(request.index(), concreteIndex);
}
return concreteIndex;
IndexAbstraction resolveIfAbsent(DocWriteRequest<?> request) {
return indexAbstractions.computeIfAbsent(
request.index(),
key -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request)
);
}

IndexRouting routing(Index index) {
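The rewritten ConcreteIndices helper now memoizes the resolved IndexAbstraction per target name via Map#computeIfAbsent, so resolution (including the validation done inside IndexNameExpressionResolver) runs only once per distinct index, alias, or data stream name in a bulk request; that is why doRun above repeats the data-stream op_type check for cache hits. A self-contained sketch of the same caching shape, using plain Java types instead of Elasticsearch ones:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative stand-in for the per-bulk cache: the resolver runs once per distinct
// target name, and later items against the same name reuse the cached value.
final class PerBulkResolutionCache<T> {

    private final Map<String, T> cache = new HashMap<>();
    private final Function<String, T> resolver; // e.g. name -> resolveWriteIndexAbstraction(state, request)

    PerBulkResolutionCache(Function<String, T> resolver) {
        this.resolver = resolver;
    }

    T resolveIfAbsent(String targetName) {
        // mirrors ConcreteIndices#resolveIfAbsent in the hunk above
        return cache.computeIfAbsent(targetName, resolver);
    }

    public static void main(String[] args) {
        PerBulkResolutionCache<String> cache = new PerBulkResolutionCache<>(name -> {
            System.out.println("resolving " + name);
            return name + "-000001";
        });
        cache.resolveIfAbsent("logs-foo"); // prints "resolving logs-foo"
        cache.resolveIfAbsent("logs-foo"); // cache hit, resolver is not invoked again
    }
}
```

If the resolver throws, computeIfAbsent records nothing, so a later item targeting the same name triggers resolution (and fails) again, which matches the per-item failure handling in the bulk path.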
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
@@ -28,6 +30,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
@@ -718,6 +721,11 @@ public boolean isRequireAlias() {
return requireAlias;
}

@Override
public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
Contributor

It feels like a bit too much logic for a data class, which the IndexRequest essentially is. I wonder if it makes more sense as part of IndexAbstraction instead.

Member Author

Implemented via: f97d889

return ia.getWriteIndex(this, metadata);
}

@Override
public int route(IndexRouting indexRouting) {
assert id != null : "route must be called after process";
Original file line number Diff line number Diff line change
@@ -22,13 +22,16 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -38,6 +41,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

@@ -171,6 +175,38 @@ public Index getWriteIndex() {
return indices.get(indices.size() - 1);
}

/**
* @param timestamp The timestamp used to select a backing index based on its start and end time.
* @param metadata The metadata that is used to fetch the start and end times for backing indices of this data stream.
* @return the backing index whose start time is less than or equal to the provided timestamp and
* whose end time is greater than the provided timestamp. Otherwise <code>null</code> is returned.
*/
public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) {
for (int i = indices.size() - 1; i >= 0; i--) {
Index index = indices.get(i);
IndexMetadata im = metadata.index(index);

// TODO: make start and end time fields in IndexMetadata class.
// (this is to avoid the overhead that occurs when reading a setting)
Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
// This check should be kept in sync with the DataStreamTimestampFieldMapper#validateTimestamp(...) method
if (timestamp.compareTo(start) >= 0 && timestamp.compareTo(end) < 0) {
return index;
}
}
return null;
}

public boolean isTimeSeries(Function<Index, IndexMetadata> indices) {
return isTimeSeries(indices.apply(getWriteIndex()));
}

public boolean isTimeSeries(IndexMetadata indexMetadata) {
IndexMode indexMode = IndexSettings.MODE.get(indexMetadata.getSettings());
return indexMode == IndexMode.TIME_SERIES;
}

@Nullable
public Map<String, Object> getMetadata() {
return metadata;
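For readers less familiar with time-series data streams: selectTimeSeriesWriteIndex above walks the backing indices from newest to oldest and returns the first one whose half-open [start, end) range contains the document timestamp. A standalone sketch of that comparison, using plain java.time types and made-up index names (the real method reads the ranges from the index settings shown above):

```java
import java.time.Instant;
import java.util.List;

// Illustrative model: each backing index owns a half-open [start, end) time range,
// and the newest range that contains the timestamp wins.
record BackingIndexRange(String indexName, Instant start, Instant end) {}

final class TimeSeriesWriteIndexSketch {

    // Same comparison as DataStream#selectTimeSeriesWriteIndex: start <= timestamp < end,
    // scanning from the newest backing index backwards.
    static String selectWriteIndex(List<BackingIndexRange> ranges, Instant timestamp) {
        for (int i = ranges.size() - 1; i >= 0; i--) {
            BackingIndexRange range = ranges.get(i);
            if (timestamp.compareTo(range.start()) >= 0 && timestamp.compareTo(range.end()) < 0) {
                return range.indexName();
            }
        }
        return null; // no writable backing index covers this timestamp
    }

    public static void main(String[] args) {
        List<BackingIndexRange> ranges = List.of(
            new BackingIndexRange(".ds-tsdb-000001", Instant.parse("2022-01-01T00:00:00Z"), Instant.parse("2022-01-02T00:00:00Z")),
            new BackingIndexRange(".ds-tsdb-000002", Instant.parse("2022-01-02T00:00:00Z"), Instant.parse("2022-01-03T00:00:00Z"))
        );
        // prints .ds-tsdb-000002: the timestamp falls into the newer index's [start, end) window
        System.out.println(selectWriteIndex(ranges, Instant.parse("2022-01-02T06:00:00Z")));
    }
}
```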
Original file line number Diff line number Diff line change
@@ -7,16 +7,30 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xcontent.XContent;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* An index abstraction is a reference to one or more concrete indices.
* An index abstraction has a unique name and encapsulates all the {@link IndexMetadata} instances it is pointing to.
@@ -51,6 +65,10 @@ public interface IndexAbstraction {
@Nullable
Index getWriteIndex();

default Index getWriteIndex(IndexRequest request, Metadata metadata) {
return getWriteIndex();
}

/**
* @return the data stream to which this index belongs or <code>null</code> if this is not a concrete index or
* if it is a concrete index that does not belong to a data stream.
@@ -384,6 +402,11 @@ public int hashCode() {

class DataStream implements IndexAbstraction {

public static final XContentParserConfiguration TS_EXTRACT_CONFIG = XContentParserConfiguration.EMPTY.withFiltering(
Set.of("@timestamp"),
null
);

private final org.elasticsearch.cluster.metadata.DataStream dataStream;
private final List<String> referencedByDataStreamAliases;

@@ -411,6 +434,76 @@ public Index getWriteIndex() {
return dataStream.getWriteIndex();
}

@Override
public Index getWriteIndex(IndexRequest request, Metadata metadata) {
if (request.opType() != DocWriteRequest.OpType.CREATE) {
return getWriteIndex();
}

if (getType() != IndexAbstraction.Type.DATA_STREAM) {
return getWriteIndex();
}

if (dataStream.isTimeSeries(metadata::index) == false) {
return getWriteIndex();
}

Instant timestamp;
XContent xContent = request.getContentType().xContent();
try (XContentParser parser = xContent.createParser(TS_EXTRACT_CONFIG, request.source().streamInput())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
switch (parser.nextToken()) {
case VALUE_STRING:
// TODO: deal with nanos too here.
// (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level,
// so we can use it here)
timestamp = Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text()));
break;
case VALUE_NUMBER:
timestamp = Instant.ofEpochMilli(parser.longValue());
break;
default:
throw new ParsingException(
parser.getTokenLocation(),
String.format(
Locale.ROOT,
"Failed to parse object: expecting token of type [%s] or [%s] but found [%s]",
XContentParser.Token.VALUE_STRING,
XContentParser.Token.VALUE_NUMBER,
parser.currentToken()
)
);
}
} catch (IOException e) {
throw new IllegalArgumentException("Error extracting timestamp: " + e.getMessage(), e);
}
Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata);
if (result == null) {
String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp);
String writeableIndicesString = dataStream.getIndices()
.stream()
.map(metadata::index)
.map(IndexMetadata::getSettings)
.map(
settings -> "["
+ settings.get(IndexSettings.TIME_SERIES_START_TIME.getKey())
+ ","
+ settings.get(IndexSettings.TIME_SERIES_END_TIME.getKey())
+ "]"
)
.collect(Collectors.joining());
throw new IllegalArgumentException(
"the document timestamp ["
+ timestampAsString
+ "] is outside of ranges of currently writable indices ["
+ writeableIndicesString
+ "]"
);
}
return result;
}

@Override
public DataStream getParentDataStream() {
// a data stream cannot have a parent data stream
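The timestamp extraction in getWriteIndex(IndexRequest, Metadata) above accepts either a date string or an epoch-millis number for the @timestamp field. A simplified, self-contained sketch of those two branches; the real code uses DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER and a filtered XContent parser, whereas this sketch substitutes ISO-8601 parsing and plain Java values:

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

// Simplified stand-in for the VALUE_STRING / VALUE_NUMBER branches above.
final class TimestampExtractionSketch {

    static Instant parseTimestamp(Object rawTimestampField) {
        if (rawTimestampField instanceof String s) {
            // string form, e.g. "2022-01-02T06:00:00.000Z"
            return Instant.from(DateTimeFormatter.ISO_INSTANT.parse(s));
        }
        if (rawTimestampField instanceof Number n) {
            // numeric form, interpreted as milliseconds since the epoch
            return Instant.ofEpochMilli(n.longValue());
        }
        throw new IllegalArgumentException("expected a string or a number for @timestamp, got: " + rawTimestampField);
    }

    public static void main(String[] args) {
        // both calls print 2022-01-02T06:00:00Z
        System.out.println(parseTimestamp("2022-01-02T06:00:00.000Z"));
        System.out.println(parseTimestamp(1641103200000L));
    }
}
```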