Skip to content

Commit a5f17fc

Browse files
authored
Add preflight check to dynamic mapping updates (#48817)
Today if the primary discovers that an indexing request needs a mapping update then it will send it to the master for validation and processing. If, however, the put-mapping request is invalid then the master still processes it as a (no-op) cluster state update. When there are a large number of indexing operations that result in invalid mapping updates this can overwhelm the master. However, the primary already has a reasonably up-to-date mapping against which it can check the (approximate) validity of the put-mapping request before sending it to the master. For instance it is not possible to remove fields in a mapping update, so if the primary detects that a mapping update will exceed the fields limit then it can reject it itself and avoid bothering the master. This commit adds a pre-flight check to the mapping update path so that the primary can discard obviously-invalid put-mapping requests itself. Fixes #35564
1 parent dd47cf4 commit a5f17fc

File tree

5 files changed

+95
-12
lines changed

5 files changed

+95
-12
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,12 @@
4949
import org.elasticsearch.cluster.service.ClusterService;
5050
import org.elasticsearch.common.bytes.BytesReference;
5151
import org.elasticsearch.common.collect.Tuple;
52+
import org.elasticsearch.common.compress.CompressedXContent;
5253
import org.elasticsearch.common.inject.Inject;
5354
import org.elasticsearch.common.io.stream.StreamInput;
5455
import org.elasticsearch.common.settings.Settings;
5556
import org.elasticsearch.common.unit.TimeValue;
57+
import org.elasticsearch.common.xcontent.ToXContent;
5658
import org.elasticsearch.common.xcontent.XContentHelper;
5759
import org.elasticsearch.common.xcontent.XContentType;
5860
import org.elasticsearch.index.engine.Engine;
@@ -263,6 +265,17 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
263265
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
264266
}
265267
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
268+
269+
try {
270+
primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME,
271+
new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
272+
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
273+
} catch (Exception e) {
274+
logger.info(() -> new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e);
275+
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
276+
return true;
277+
}
278+
266279
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
267280
MapperService.SINGLE_MAPPING_NAME,
268281
new ActionListener<>() {

server/src/main/java/org/elasticsearch/index/mapper/MapperService.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
8282
* The reason why a mapping is being merged.
8383
*/
8484
public enum MergeReason {
85+
/**
86+
* Pre-flight check before sending a mapping update to the master
87+
*/
88+
MAPPING_UPDATE_PREFLIGHT,
8589
/**
8690
* Create or update a mapping.
8791
*/
@@ -306,6 +310,7 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge
306310

307311
private synchronized Map<String, DocumentMapper> internalMerge(IndexMetaData indexMetaData,
308312
MergeReason reason, boolean onlyUpdateIfNeeded) {
313+
assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT;
309314
Map<String, CompressedXContent> map = new LinkedHashMap<>();
310315
MappingMetaData mappingMetaData = indexMetaData.mapping();
311316
if (mappingMetaData != null) {
@@ -415,7 +420,7 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma
415420

416421
ContextMapping.validateContextPaths(indexSettings.getIndexVersionCreated(), fieldMappers, fieldTypes::get);
417422

418-
if (reason == MergeReason.MAPPING_UPDATE) {
423+
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
419424
// this check will only be performed on the master node when there is
420425
// a call to the update mapping API. For all other cases like
421426
// the master node restoring mappings from disk or data nodes
@@ -430,7 +435,7 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma
430435
results.put(newMapper.type(), newMapper);
431436
}
432437

433-
if (reason == MergeReason.MAPPING_UPDATE) {
438+
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
434439
// this check will only be performed on the master node when there is
435440
// a call to the update mapping API. For all other cases like
436441
// the master node restoring mappings from disk or data nodes
@@ -453,6 +458,10 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma
453458
// make structures immutable
454459
results = Collections.unmodifiableMap(results);
455460

461+
if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
462+
return results;
463+
}
464+
456465
// only need to immutably rewrap these if the previous reference was changed.
457466
// if not then they are already implicitly immutable.
458467
if (fullPathObjectMappers != this.fullPathObjectMappers) {

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@
4444
import org.elasticsearch.index.VersionType;
4545
import org.elasticsearch.index.engine.Engine;
4646
import org.elasticsearch.index.engine.VersionConflictEngineException;
47+
import org.elasticsearch.index.mapper.MapperService;
4748
import org.elasticsearch.index.mapper.Mapping;
4849
import org.elasticsearch.index.mapper.MetadataFieldMapper;
50+
import org.elasticsearch.index.mapper.RootObjectMapper;
4951
import org.elasticsearch.index.shard.IndexShard;
5052
import org.elasticsearch.index.shard.IndexShardTestCase;
5153
import org.elasticsearch.index.shard.ShardId;
@@ -233,14 +235,15 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
233235
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
234236

235237
Engine.IndexResult mappingUpdate =
236-
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
238+
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
237239
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
238240
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
239241

240242
IndexShard shard = mock(IndexShard.class);
241243
when(shard.shardId()).thenReturn(shardId);
242244
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
243245
.thenReturn(mappingUpdate);
246+
when(shard.mapperService()).thenReturn(mock(MapperService.class));
244247

245248
randomlySetIgnoredPrimaryResponse(items[0]);
246249

@@ -761,7 +764,7 @@ public void testRetries() throws Exception {
761764
"I'm conflicted <(;_;)>");
762765
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);
763766
Engine.IndexResult mappingUpdate =
764-
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
767+
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
765768
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
766769
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
767770

@@ -778,6 +781,7 @@ public void testRetries() throws Exception {
778781
});
779782
when(shard.indexSettings()).thenReturn(indexSettings);
780783
when(shard.shardId()).thenReturn(shardId);
784+
when(shard.mapperService()).thenReturn(mock(MapperService.class));
781785

782786
UpdateHelper updateHelper = mock(UpdateHelper.class);
783787
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(

server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@
2121
import org.elasticsearch.action.DocWriteResponse;
2222
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
2323
import org.elasticsearch.action.bulk.BulkResponse;
24+
import org.elasticsearch.action.index.IndexRequestBuilder;
25+
import org.elasticsearch.cluster.ClusterState;
26+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2427
import org.elasticsearch.cluster.metadata.MappingMetaData;
28+
import org.elasticsearch.cluster.service.ClusterService;
29+
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.common.unit.TimeValue;
2531
import org.elasticsearch.plugins.Plugin;
2632
import org.elasticsearch.test.ESIntegTestCase;
2733
import org.elasticsearch.test.InternalSettingsPlugin;
@@ -34,6 +40,8 @@
3440
import java.util.concurrent.CountDownLatch;
3541
import java.util.concurrent.atomic.AtomicReference;
3642

43+
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
44+
3745
public class DynamicMappingIT extends ESIntegTestCase {
3846

3947
@Override
@@ -116,4 +124,38 @@ public void run() {
116124
assertTrue(client().prepareGet("index", Integer.toString(i)).get().isExists());
117125
}
118126
}
127+
128+
public void testPreflightCheckAvoidsMaster() throws InterruptedException {
129+
createIndex("index", Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 2).build());
130+
ensureGreen("index");
131+
client().prepareIndex("index").setId("1").setSource("field1", "value1").get();
132+
133+
final CountDownLatch masterBlockedLatch = new CountDownLatch(1);
134+
final CountDownLatch indexingCompletedLatch = new CountDownLatch(1);
135+
136+
internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()).submitStateUpdateTask("block-state-updates",
137+
new ClusterStateUpdateTask() {
138+
@Override
139+
public ClusterState execute(ClusterState currentState) throws Exception {
140+
masterBlockedLatch.countDown();
141+
indexingCompletedLatch.await();
142+
return currentState;
143+
}
144+
145+
@Override
146+
public void onFailure(String source, Exception e) {
147+
throw new AssertionError("unexpected", e);
148+
}
149+
});
150+
151+
masterBlockedLatch.await();
152+
final IndexRequestBuilder indexRequestBuilder = client().prepareIndex("index").setId("2").setSource("field2", "value2");
153+
try {
154+
assertThat(
155+
expectThrows(IllegalArgumentException.class, () -> indexRequestBuilder.get(TimeValue.timeValueSeconds(10))).getMessage(),
156+
Matchers.containsString("Limit of total fields [2] in index [index] has been exceeded"));
157+
} finally {
158+
indexingCompletedLatch.countDown();
159+
}
160+
}
119161
}

server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555

5656
import static org.hamcrest.CoreMatchers.containsString;
5757
import static org.hamcrest.Matchers.instanceOf;
58+
import static org.hamcrest.Matchers.notNullValue;
59+
import static org.hamcrest.Matchers.nullValue;
5860

5961
public class MapperServiceTests extends ESSingleNodeTestCase {
6062

@@ -99,6 +101,15 @@ public void testTypeValidation() {
99101
MapperService.validateTypeName("_doc"); // no exception
100102
}
101103

104+
public void testPreflightUpdateDoesNotChangeMapping() throws Throwable {
105+
final MapperService mapperService = createIndex("test1").mapperService();
106+
final CompressedXContent mapping = createMappingSpecifyingNumberOfFields(1);
107+
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE_PREFLIGHT);
108+
assertThat("field was not created by preflight check", mapperService.fullName("field0"), nullValue());
109+
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
110+
assertThat("field was not created by mapping update", mapperService.fullName("field0"), notNullValue());
111+
}
112+
102113
/**
103114
* Test that we can have at least the number of fields in new mappings that are defined by "index.mapping.total_fields.limit".
104115
* Any additional field should trigger an IllegalArgumentException.
@@ -113,7 +124,7 @@ public void testTotalFieldsLimit() throws Throwable {
113124
// adding one more field should trigger exception
114125
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
115126
createIndex("test2", settings).mapperService().merge("type",
116-
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), MergeReason.MAPPING_UPDATE);
127+
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), updateOrPreflight());
117128
});
118129
assertTrue(e.getMessage(),
119130
e.getMessage().contains("Limit of total fields [" + totalFieldsLimit + "] in index [test2] has been exceeded"));
@@ -149,7 +160,7 @@ public void testMappingDepthExceedsLimit() throws Throwable {
149160
indexService2.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE);
150161

151162
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
152-
() -> indexService1.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE));
163+
() -> indexService1.mapperService().merge("type", objectMapping, updateOrPreflight()));
153164
assertThat(e.getMessage(), containsString("Limit of mapping depth [1] in index [test1] has been exceeded"));
154165
}
155166

@@ -197,7 +208,7 @@ public void testIndexSortWithNestedFields() throws IOException {
197208
.endObject().endObject()));
198209
invalidNestedException = expectThrows(IllegalArgumentException.class,
199210
() -> indexService.mapperService().merge("t", nestedFieldMapping,
200-
MergeReason.MAPPING_UPDATE));
211+
updateOrPreflight()));
201212
assertThat(invalidNestedException.getMessage(),
202213
containsString("cannot have nested fields when index sort is activated"));
203214
}
@@ -233,7 +244,7 @@ public void testFieldAliasWithMismatchedNestedScope() throws Throwable {
233244
.endObject()));
234245

235246
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
236-
() -> mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE));
247+
() -> mapperService.merge("type", mappingUpdate, updateOrPreflight()));
237248
assertThat(e.getMessage(), containsString("Invalid [path] value [nested.field] for field alias [alias]"));
238249
}
239250

@@ -261,7 +272,7 @@ public void testTotalFieldsLimitWithFieldAlias() throws Throwable {
261272
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
262273
createIndex("test2",
263274
Settings.builder().put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfNonAliasFields).build())
264-
.mapperService().merge("type", new CompressedXContent(mapping), MergeReason.MAPPING_UPDATE);
275+
.mapperService().merge("type", new CompressedXContent(mapping), updateOrPreflight());
265276
});
266277
assertEquals("Limit of total fields [" + numberOfNonAliasFields + "] in index [test2] has been exceeded", e.getMessage());
267278
}
@@ -294,7 +305,7 @@ public void testFieldNameLengthLimit() throws Throwable {
294305
.endObject()));
295306

296307
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
297-
mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE);
308+
mapperService.merge("type", mappingUpdate, updateOrPreflight());
298309
});
299310

300311
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -319,7 +330,7 @@ public void testObjectNameLengthLimit() throws Throwable {
319330
.endObject().endObject()));
320331

321332
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
322-
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
333+
mapperService.merge("type", mapping, updateOrPreflight());
323334
});
324335

325336
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -348,7 +359,7 @@ public void testAliasFieldNameLengthLimit() throws Throwable {
348359
.endObject().endObject()));
349360

350361
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
351-
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
362+
mapperService.merge("type", mapping, updateOrPreflight());
352363
});
353364

354365
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -439,6 +450,10 @@ private boolean assertSameContainedFilters(TokenFilterFactory[] originalTokenFil
439450
return true;
440451
}
441452

453+
private static MergeReason updateOrPreflight() {
454+
return randomFrom(MergeReason.MAPPING_UPDATE, MergeReason.MAPPING_UPDATE_PREFLIGHT);
455+
}
456+
442457
public static final class ReloadableFilterPlugin extends Plugin implements AnalysisPlugin {
443458

444459
@Override

0 commit comments

Comments
 (0)