Skip to content

Commit c6cdf77

Browse files
committed
Mappings: simplify dynamic mappings updates.
While dynamic mappings updates are using the same code path as updates from the API when applied on a data node since #10593, they were still using a different code path on the master node. This commit makes dynamic updates processed the same way as updates from the API, which also seems to do a better way at acknowledgements (I could not reproduce the ConcurrentDynamicTemplateTests failure anymore). It also adds more checks, like for instance that indexing on replicas should not trigger dynamic mapping updates since they should have been handled on the primary before. Close #10720
1 parent dbeb4aa commit c6cdf77

File tree

11 files changed

+201
-423
lines changed

11 files changed

+201
-423
lines changed

src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
5353
import org.elasticsearch.index.engine.Engine;
5454
import org.elasticsearch.index.engine.VersionConflictEngineException;
55+
import org.elasticsearch.index.mapper.MapperService;
5556
import org.elasticsearch.index.mapper.Mapping;
5657
import org.elasticsearch.index.mapper.SourceToParse;
5758
import org.elasticsearch.index.shard.IndexShard;
@@ -352,23 +353,6 @@ <T extends ActionWriteResponse> T response() {
352353

353354
}
354355

355-
private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
356-
// HACK: Rivers seem to have something specific that triggers potential
357-
// deadlocks when doing concurrent indexing. So for now they keep the
358-
// old behaviour of updating mappings locally first and then
359-
// asynchronously notifying the master
360-
// this can go away when rivers are removed
361-
final String indexName = indexService.index().name();
362-
final String indexUUID = indexService.indexUUID();
363-
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
364-
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
365-
mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
366-
} else {
367-
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
368-
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
369-
}
370-
}
371-
372356
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
373357
IndexShard indexShard, IndexService indexService, boolean processed) throws Throwable {
374358

@@ -392,20 +376,54 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
392376
Engine.IndexingOperation op;
393377
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
394378
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
395-
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
396-
applyMappingUpdate(indexService, indexRequest.type(), index.parsedDoc().dynamicMappingsUpdate());
379+
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
380+
if (update != null) {
381+
final String indexName = indexService.index().name();
382+
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
383+
// With rivers, we have a chicken and egg problem if indexing
384+
// the _meta document triggers a mapping update. Because we would
385+
// like to validate the mapping update first, but on the other
386+
// hand putting the mapping would start the river, which expects
387+
// to find a _meta document
388+
// So we have no choice but to index first and send mappings afterwards
389+
MapperService mapperService = indexService.mapperService();
390+
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
391+
indexShard.index(index);
392+
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
393+
} else {
394+
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
395+
indexShard.index(index);
396+
}
397+
} else {
398+
indexShard.index(index);
397399
}
398-
indexShard.index(index);
399400
version = index.version();
400401
op = index;
401402
created = index.created();
402403
} else {
403404
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
404405
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
405-
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
406-
applyMappingUpdate(indexService, indexRequest.type(), create.parsedDoc().dynamicMappingsUpdate());
406+
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
407+
if (update != null) {
408+
final String indexName = indexService.index().name();
409+
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
410+
// With rivers, we have a chicken and egg problem if indexing
411+
// the _meta document triggers a mapping update. Because we would
412+
// like to validate the mapping update first, but on the other
413+
// hand putting the mapping would start the river, which expects
414+
// to find a _meta document
415+
// So we have no choice but to index first and send mappings afterwards
416+
MapperService mapperService = indexService.mapperService();
417+
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
418+
indexShard.create(create);
419+
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
420+
} else {
421+
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
422+
indexShard.create(create);
423+
}
424+
} else {
425+
indexShard.create(create);
407426
}
408-
indexShard.create(create);
409427
version = create.version();
410428
op = create;
411429
created = true;
@@ -528,8 +546,9 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe
528546

529547

530548
@Override
531-
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
532-
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
549+
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception {
550+
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
551+
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
533552
final BulkShardRequest request = shardRequest.request;
534553
for (int i = 0; i < request.items().length; i++) {
535554
BulkItemRequest item = request.items()[i];
@@ -544,11 +563,29 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
544563

545564
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
546565
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
566+
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
567+
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
568+
// mappings updates on the _river are not validated synchronously so we can't
569+
// assume they are here when indexing on a replica
570+
indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
571+
} else {
572+
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
573+
}
574+
}
547575
indexShard.index(index);
548576
} else {
549577
Engine.Create create = indexShard.prepareCreate(sourceToParse,
550578
indexRequest.version(), indexRequest.versionType(),
551579
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
580+
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
581+
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
582+
// mappings updates on the _river are not validated synchronously so we can't
583+
// assume they are here when indexing on a replica
584+
indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
585+
} else {
586+
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
587+
}
588+
}
552589
indexShard.create(create);
553590
}
554591
} catch (Throwable e) {

src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 65 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.index;
2121

22+
import org.elasticsearch.ElasticsearchIllegalStateException;
2223
import org.elasticsearch.ExceptionsHelper;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.RoutingMissingException;
@@ -42,6 +43,7 @@
4243
import org.elasticsearch.common.settings.Settings;
4344
import org.elasticsearch.index.IndexService;
4445
import org.elasticsearch.index.engine.Engine;
46+
import org.elasticsearch.index.mapper.MapperService;
4547
import org.elasticsearch.index.mapper.Mapping;
4648
import org.elasticsearch.index.mapper.SourceToParse;
4749
import org.elasticsearch.index.shard.IndexShard;
@@ -51,6 +53,8 @@
5153
import org.elasticsearch.threadpool.ThreadPool;
5254
import org.elasticsearch.transport.TransportService;
5355

56+
import java.io.IOException;
57+
5458
/**
5559
* Performs the index operation.
5660
* <p/>
@@ -167,23 +171,6 @@ protected ShardIterator shards(ClusterState clusterState, InternalRequest reques
167171
.indexShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
168172
}
169173

170-
private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
171-
// HACK: Rivers seem to have something specific that triggers potential
172-
// deadlocks when doing concurrent indexing. So for now they keep the
173-
// old behaviour of updating mappings locally first and then
174-
// asynchronously notifying the master
175-
// this can go away when rivers are removed
176-
final String indexName = indexService.index().name();
177-
final String indexUUID = indexService.indexUUID();
178-
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
179-
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
180-
mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
181-
} else {
182-
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
183-
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
184-
}
185-
}
186-
187174
@Override
188175
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
189176
final IndexRequest request = shardRequest.request;
@@ -206,19 +193,53 @@ protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterStat
206193

207194
if (request.opType() == IndexRequest.OpType.INDEX) {
208195
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
209-
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
210-
applyMappingUpdate(indexService, request.type(), index.parsedDoc().dynamicMappingsUpdate());
196+
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
197+
if (update != null) {
198+
final String indexName = indexService.index().name();
199+
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
200+
// With rivers, we have a chicken and egg problem if indexing
201+
// the _meta document triggers a mapping update. Because we would
202+
// like to validate the mapping update first, but on the other
203+
// hand putting the mapping would start the river, which expects
204+
// to find a _meta document
205+
// So we have no choice but to index first and send mappings afterwards
206+
MapperService mapperService = indexService.mapperService();
207+
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
208+
indexShard.index(index);
209+
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
210+
} else {
211+
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
212+
indexShard.index(index);
213+
}
214+
} else {
215+
indexShard.index(index);
211216
}
212-
indexShard.index(index);
213217
version = index.version();
214218
created = index.created();
215219
} else {
216220
Engine.Create create = indexShard.prepareCreate(sourceToParse,
217221
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
218-
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
219-
applyMappingUpdate(indexService, request.type(), create.parsedDoc().dynamicMappingsUpdate());
222+
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
223+
if (update != null) {
224+
final String indexName = indexService.index().name();
225+
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
226+
// With rivers, we have a chicken and egg problem if indexing
227+
// the _meta document triggers a mapping update. Because we would
228+
// like to validate the mapping update first, but on the other
229+
// hand putting the mapping would start the river, which expects
230+
// to find a _meta document
231+
// So we have no choice but to index first and send mappings afterwards
232+
MapperService mapperService = indexService.mapperService();
233+
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
234+
indexShard.create(create);
235+
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
236+
} else {
237+
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
238+
indexShard.create(create);
239+
}
240+
} else {
241+
indexShard.create(create);
220242
}
221-
indexShard.create(create);
222243
version = create.version();
223244
created = true;
224245
}
@@ -239,17 +260,36 @@ protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterStat
239260
}
240261

241262
@Override
242-
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
243-
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
263+
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws IOException {
264+
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
265+
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
244266
IndexRequest request = shardRequest.request;
245267
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
246268
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
247269
if (request.opType() == IndexRequest.OpType.INDEX) {
248270
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
271+
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
272+
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
273+
// mappings updates on the _river are not validated synchronously so we can't
274+
// assume they are here when indexing on a replica
275+
indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
276+
} else {
277+
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
278+
}
279+
}
249280
indexShard.index(index);
250281
} else {
251282
Engine.Create create = indexShard.prepareCreate(sourceToParse,
252283
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
284+
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
285+
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
286+
// mappings updates on the _river are not validated synchronously so we can't
287+
// assume they are here when indexing on a replica
288+
indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
289+
} else {
290+
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
291+
}
292+
}
253293
indexShard.create(create);
254294
}
255295
if (request.refresh()) {

src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
117117
*/
118118
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
119119

120-
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
120+
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception;
121121

122122
protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;
123123

0 commit comments

Comments
 (0)