Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;

import java.util.Objects;

public interface MappingUpdatePerformer {

/**
Expand All @@ -39,6 +34,6 @@ public interface MappingUpdatePerformer {
* retried on the primary due to the mappings not being present yet, or a different exception if
* updating the mappings on the master failed.
*/
void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception;
void verifyMappings(Mapping update, ShardId shardId) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbersService;
Expand Down Expand Up @@ -150,8 +151,9 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index

private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest deleteRequest,
final BulkItemRequest bulkItemRequest,
final IndexShard primary) throws IOException {
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
final IndexShard primary,
final MappingUpdatePerformer mappingUpdater) throws Exception {
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater);
if (deleteResult.hasFailure()) {
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
} else {
Expand Down Expand Up @@ -241,7 +243,7 @@ static Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSha
requestIndex, updateHelper, nowInMillisSupplier, mappingUpdater);
break;
case DELETE:
responseHolder = executeDeleteRequest((DeleteRequest) itemRequest, request.items()[requestIndex], primary);
responseHolder = executeDeleteRequest((DeleteRequest) itemRequest, request.items()[requestIndex], primary, mappingUpdater);
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
Expand Down Expand Up @@ -303,7 +305,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
break;
case DELETED:
DeleteRequest deleteRequest = translate.action();
result = executeDeleteRequestOnPrimary(deleteRequest, primary);
result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater);
break;
case NOOP:
primary.noopUpdate(updateRequest.type());
Expand Down Expand Up @@ -609,7 +611,7 @@ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, Ind
if (mappingUpdateNeeded) {
try {
operation = prepareIndexOperationOnPrimary(request, primary);
mappingUpdater.verifyMappings(operation, primary.shardId());
mappingUpdater.verifyMappings(operation.parsedDoc().dynamicMappingsUpdate(), primary.shardId());
} catch (MapperParsingException | IllegalStateException e) {
// there was an error in parsing the document that was not because
// of pending mapping updates, so return a failure for the result
Expand All @@ -623,12 +625,52 @@ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, Ind
return primary.index(operation);
}

private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
final MappingUpdatePerformer mappingUpdater) throws Exception {
boolean mappingUpdateNeeded = false;
if (primary.indexSettings().isSingleType()) {
// When there is a single type, the unique identifier is only composed of the _id,
// so there is no way to differenciate foo#1 from bar#1. This is especially an issue
// if a user first deletes foo#1 and then indexes bar#1: since we do not encode the
// _type in the uid it might look like we are reindexing the same document, which
// would fail if bar#1 is indexed with a lower version than foo#1 was deleted with.
// In order to work around this issue, we make deletions create types. This way, we
// fail if index and delete operations do not use the same type.
try {
Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
if (update != null) {
mappingUpdateNeeded = true;
mappingUpdater.updateMappings(update, primary.shardId(), request.type());
}
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.DeleteResult(e, request.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
}
}
if (mappingUpdateNeeded) {
Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
mappingUpdater.verifyMappings(update, primary.shardId());
}
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
return primary.delete(delete);
}

private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request, IndexShard replica) throws IOException {
private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request,
IndexShard replica) throws Exception {
if (replica.indexSettings().isSingleType()) {
// We need to wait for the replica to have the mappings
Mapping update;
try {
update = replica.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.DeleteResult(e, request.version(), primaryResponse.getSeqNo(), false);
}
if (update != null) {
final ShardId shardId = replica.shardId();
throw new RetryOnReplicaException(shardId,
"Mappings are not available on the replica yet, triggered update: " + update);
}
}

final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
final long version = primaryResponse.getVersion();
assert versionType.validateVersionForWrites(version);
Expand All @@ -654,9 +696,9 @@ public void updateMappings(final Mapping update, final ShardId shardId,
}
}

public void verifyMappings(final Engine.Index operation,
public void verifyMappings(Mapping update,
final ShardId shardId) throws Exception {
if (operation.parsedDoc().dynamicMappingsUpdate() != null) {
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -93,10 +91,13 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId,
new String[]{request.type()}, request.nowInMillis, request.filteringAlias());
SearchContext context = searchService.createSearchContext(shardSearchLocalRequest, SearchService.NO_TIMEOUT, null);
Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id()));
Engine.GetResult result = null;
try {
result = context.indexShard().get(new Engine.Get(false, uidTerm));
Term uidTerm = context.mapperService().createUidTerm(request.type(), request.id());
if (uidTerm == null) {
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
}
result = context.indexShard().get(new Engine.Get(false, request.type(), request.id(), uidTerm));
if (!result.exists()) {
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;

Expand All @@ -51,6 +50,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff

/** terms enum for uid field */
final String uidField;
private final TermsEnum termsEnum;

/** Reused for iteration (when the term exists) */
Expand All @@ -62,13 +62,14 @@ final class PerThreadIDVersionAndSeqNoLookup {
/**
* Initialize lookup for the provided segment
*/
PerThreadIDVersionAndSeqNoLookup(LeafReader reader) throws IOException {
PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
this.uidField = uidField;
Fields fields = reader.fields();
Terms terms = fields.terms(UidFieldMapper.NAME);
termsEnum = terms.iterator();
if (termsEnum == null) {
throw new IllegalArgumentException("reader misses the [" + UidFieldMapper.NAME + "] field");
Terms terms = fields.terms(uidField);
if (terms == null) {
throw new IllegalArgumentException("reader misses the [" + uidField + "] field");
}
termsEnum = terms.iterator();
if (reader.getNumericDocValues(VersionFieldMapper.NAME) == null) {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.UidFieldMapper;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;

import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
Expand All @@ -47,7 +47,7 @@ public final class VersionsAndSeqNoResolver {
}
};

private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader) throws IOException {
private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader, String uidField) throws IOException {
IndexReader.CacheHelper cacheHelper = reader.getCoreCacheHelper();
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.get(cacheHelper.getKey());
if (ctl == null) {
Expand All @@ -65,8 +65,11 @@ private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader

PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get();
if (lookupState == null) {
lookupState = new PerThreadIDVersionAndSeqNoLookup(reader);
lookupState = new PerThreadIDVersionAndSeqNoLookup(reader, uidField);
ctl.set(lookupState);
} else if (Objects.equals(lookupState.uidField, uidField) == false) {
throw new AssertionError("Index does not consistently use the same uid field: ["
+ uidField + "] != [" + lookupState.uidField + "]");
}

return lookupState;
Expand Down Expand Up @@ -109,7 +112,6 @@ public static class DocIdAndSeqNo {
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field();
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
Expand All @@ -119,7 +121,7 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
Expand All @@ -135,7 +137,6 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
* </ul>
*/
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field();
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
Expand All @@ -145,7 +146,7 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
Expand All @@ -157,9 +158,9 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
/**
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
*/
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo) throws IOException {
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
LeafReader leaf = docIdAndSeqNo.context.reader();
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, uidField);
long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId, leaf);
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.AllFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.node.Node;

import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -192,7 +192,10 @@ public final class IndexSettings {
* The maximum number of slices allowed in a scroll request.
*/
private volatile int maxSlicesPerScroll;

/**
* Whether the index is required to have at most one type.
*/
private final boolean singleType;

/**
* Returns the default search field for this index.
Expand Down Expand Up @@ -280,6 +283,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this);
singleType = scopedSettings.get(MapperService.INDEX_MAPPING_SINGLE_TYPE_SETTING);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
Expand Down Expand Up @@ -391,6 +395,11 @@ public IndexMetaData getIndexMetaData() {
*/
public int getNumberOfReplicas() { return settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, null); }

/**
* Returns whether the index enforces at most one type.
*/
public boolean isSingleType() { return singleType; }

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,7 @@ protected final GetResult getFromSearcher(Get get, Function<String, Searcher> se
if (docIdAndVersion != null) {
if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
Releasables.close(searcher);
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(docIdAndVersion.version, get.version()));
}
}
Expand Down Expand Up @@ -1028,7 +1027,6 @@ public static class Index extends Operation {
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert uid.bytes().equals(doc.uid()) : "term uid " + uid + " doesn't match doc uid " + doc.uid();
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
Expand Down Expand Up @@ -1199,18 +1197,29 @@ public int estimatedSizeInBytes() {
public static class Get {
private final boolean realtime;
private final Term uid;
private final String type, id;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;

public Get(boolean realtime, Term uid) {
public Get(boolean realtime, String type, String id, Term uid) {
this.realtime = realtime;
this.type = type;
this.id = id;
this.uid = uid;
}

public boolean realtime() {
return this.realtime;
}

public String type() {
return type;
}

public String id() {
return id;
}

public Term uid() {
return uid;
}
Expand Down
Loading