From 1e3983c9751926e525eecdaa0a6e3db0ffc76cda Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 1 Nov 2019 16:25:09 +0000 Subject: [PATCH 1/6] Add preflight check to dynamic mapping updates Today if the primary discovers that an indexing request needs a mapping update then it will send it to the master for validation and processing. If, however, the put-mapping request is invalid then the master still processes it as a (no-op) cluster state update. When there are a large number of indexing operations that result in invalid mapping updates this can overwhelm the master. However, the primary already has a reasonably up-to-date mapping against which it can check the (approximate) validity of the put-mapping request before sending it to the master. For instance it is not possible to remove fields in a mapping update, so if the primary detects that a mapping update will exceed the fields limit then it can reject it itself and avoid bothering the master. This commit adds a pre-flight check to the mapping update path so that the primary can discard obviously-invalid put-mapping requests itself. Fixes #35564 --- .../action/bulk/TransportShardBulkAction.java | 13 ++++++ .../index/mapper/MapperService.java | 13 +++++- .../index/mapper/DynamicMappingIT.java | 42 +++++++++++++++++++ .../index/mapper/MapperServiceTests.java | 31 ++++++++++---- 4 files changed, 89 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 5531a1e9b3792..264595dd3a06f 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -49,10 +49,12 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; @@ -263,6 +265,17 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()); } if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + + try { + primary.mapperService().merge("_doc", + new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS), + MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); + } catch (Exception e) { + logger.info("required mapping update failed during pre-flight check", e); + onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); + return true; + } + mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), MapperService.SINGLE_MAPPING_NAME, new ActionListener<>() { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 0f3e762c6a8cc..d26d8c2ca489b 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -82,6 +82,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable { * The reason why a mapping is being merged. */ public enum MergeReason { + /** + * Pre-flight check before sending a mapping update to the master + */ + MAPPING_UPDATE_PREFLIGHT, /** * Create or update a mapping. */ @@ -306,6 +310,7 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge private synchronized Map internalMerge(IndexMetaData indexMetaData, MergeReason reason, boolean onlyUpdateIfNeeded) { + assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT; Map map = new LinkedHashMap<>(); MappingMetaData mappingMetaData = indexMetaData.mapping(); if (mappingMetaData != null) { @@ -415,7 +420,7 @@ private synchronized Map internalMerge(DocumentMapper ma ContextMapping.validateContextPaths(indexSettings.getIndexVersionCreated(), fieldMappers, fieldTypes::get); - if (reason == MergeReason.MAPPING_UPDATE) { + if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { // this check will only be performed on the master node when there is // a call to the update mapping API. For all other cases like // the master node restoring mappings from disk or data nodes @@ -430,7 +435,7 @@ private synchronized Map internalMerge(DocumentMapper ma results.put(newMapper.type(), newMapper); } - if (reason == MergeReason.MAPPING_UPDATE) { + if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { // this check will only be performed on the master node when there is // a call to the update mapping API. For all other cases like // the master node restoring mappings from disk or data nodes @@ -453,6 +458,10 @@ private synchronized Map internalMerge(DocumentMapper ma // make structures immutable results = Collections.unmodifiableMap(results); + if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { + return results; + } + // only need to immutably rewrap these if the previous reference was changed. // if not then they are already implicitly immutable. if (fullPathObjectMappers != this.fullPathObjectMappers) { diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java b/server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java index 40f2682e1d322..e27f19e254b49 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java @@ -21,7 +21,13 @@ import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; @@ -34,6 +40,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING; + public class DynamicMappingIT extends ESIntegTestCase { @Override @@ -116,4 +124,38 @@ public void run() { assertTrue(client().prepareGet("index", Integer.toString(i)).get().isExists()); } } + + public void testPreflightCheckAvoidsMaster() throws InterruptedException { + createIndex("index", Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 2).build()); + ensureGreen("index"); + client().prepareIndex("index").setId("1").setSource("field1", "value1").get(); + + final CountDownLatch masterBlockedLatch = new CountDownLatch(1); + final CountDownLatch indexingCompletedLatch = new CountDownLatch(1); + + internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()).submitStateUpdateTask("block-state-updates", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + masterBlockedLatch.countDown(); + indexingCompletedLatch.await(); + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError("unexpected", e); + } + }); + + masterBlockedLatch.await(); + final IndexRequestBuilder indexRequestBuilder = client().prepareIndex("index").setId("2").setSource("field2", "value2"); + try { + assertThat( + expectThrows(IllegalArgumentException.class, () -> indexRequestBuilder.get(TimeValue.timeValueSeconds(10))).getMessage(), + Matchers.containsString("Limit of total fields [2] in index [index] has been exceeded")); + } finally { + indexingCompletedLatch.countDown(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java b/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java index 82db028b6706b..e47bcc3816e74 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java @@ -55,6 +55,8 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class MapperServiceTests extends ESSingleNodeTestCase { @@ -99,6 +101,15 @@ public void testTypeValidation() { MapperService.validateTypeName("_doc"); // no exception } + public void testPreflightUpdateDoesNotChangeMapping() throws Throwable { + final MapperService mapperService = createIndex("test1").mapperService(); + final CompressedXContent mapping = createMappingSpecifyingNumberOfFields(1); + mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE_PREFLIGHT); + assertThat("field was not created by preflight check", mapperService.fullName("field0"), nullValue()); + mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE); + assertThat("field was not created by mapping update", mapperService.fullName("field0"), notNullValue()); + } + /** * Test that we can have at least the number of fields in new mappings that are defined by "index.mapping.total_fields.limit". * Any additional field should trigger an IllegalArgumentException. @@ -113,7 +124,7 @@ public void testTotalFieldsLimit() throws Throwable { // adding one more field should trigger exception IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { createIndex("test2", settings).mapperService().merge("type", - createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), MergeReason.MAPPING_UPDATE); + createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), updateOrPreflight()); }); assertTrue(e.getMessage(), e.getMessage().contains("Limit of total fields [" + totalFieldsLimit + "] in index [test2] has been exceeded")); @@ -149,7 +160,7 @@ public void testMappingDepthExceedsLimit() throws Throwable { indexService2.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> indexService1.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE)); + () -> indexService1.mapperService().merge("type", objectMapping, updateOrPreflight())); assertThat(e.getMessage(), containsString("Limit of mapping depth [1] in index [test1] has been exceeded")); } @@ -197,7 +208,7 @@ public void testIndexSortWithNestedFields() throws IOException { .endObject().endObject())); invalidNestedException = expectThrows(IllegalArgumentException.class, () -> indexService.mapperService().merge("t", nestedFieldMapping, - MergeReason.MAPPING_UPDATE)); + updateOrPreflight())); assertThat(invalidNestedException.getMessage(), containsString("cannot have nested fields when index sort is activated")); } @@ -233,7 +244,7 @@ public void testFieldAliasWithMismatchedNestedScope() throws Throwable { .endObject())); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE)); + () -> mapperService.merge("type", mappingUpdate, updateOrPreflight())); assertThat(e.getMessage(), containsString("Invalid [path] value [nested.field] for field alias [alias]")); } @@ -261,7 +272,7 @@ public void testTotalFieldsLimitWithFieldAlias() throws Throwable { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { createIndex("test2", Settings.builder().put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfNonAliasFields).build()) - .mapperService().merge("type", new CompressedXContent(mapping), MergeReason.MAPPING_UPDATE); + .mapperService().merge("type", new CompressedXContent(mapping), updateOrPreflight()); }); assertEquals("Limit of total fields [" + numberOfNonAliasFields + "] in index [test2] has been exceeded", e.getMessage()); } @@ -294,7 +305,7 @@ public void testFieldNameLengthLimit() throws Throwable { .endObject())); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { - mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE); + mapperService.merge("type", mappingUpdate, updateOrPreflight()); }); assertEquals("Field name [" + testString + "] in index [test1] is too long. " + @@ -319,7 +330,7 @@ public void testObjectNameLengthLimit() throws Throwable { .endObject().endObject())); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { - mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE); + mapperService.merge("type", mapping, updateOrPreflight()); }); assertEquals("Field name [" + testString + "] in index [test1] is too long. " + @@ -348,7 +359,7 @@ public void testAliasFieldNameLengthLimit() throws Throwable { .endObject().endObject())); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { - mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE); + mapperService.merge("type", mapping, updateOrPreflight()); }); assertEquals("Field name [" + testString + "] in index [test1] is too long. " + @@ -439,6 +450,10 @@ private boolean assertSameContainedFilters(TokenFilterFactory[] originalTokenFil return true; } + private static MergeReason updateOrPreflight() { + return randomFrom(MergeReason.MAPPING_UPDATE, MergeReason.MAPPING_UPDATE_PREFLIGHT); + } + public static final class ReloadableFilterPlugin extends Plugin implements AnalysisPlugin { @Override From 148206d857d834e56b1e27862e2d2bce51a805c0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 1 Nov 2019 17:36:57 +0000 Subject: [PATCH 2/6] Allow tests to override mapping validation --- .../action/bulk/MappingUpdatePerformer.java | 6 ++++ .../action/bulk/TransportShardBulkAction.java | 23 ++++++++---- .../bulk/TransportShardBulkActionTests.java | 36 +++++++++++++++---- .../ESIndexLevelReplicationTestCase.java | 11 +++++- 4 files changed, 62 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java index 5a38f0f43e070..65cf2465f5388 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java @@ -23,6 +23,8 @@ import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.shard.ShardId; +import java.io.IOException; + public interface MappingUpdatePerformer { /** @@ -30,4 +32,8 @@ public interface MappingUpdatePerformer { */ void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener); + /** + * Validate the mapping update locally. Throws an unchecked exception of some kind if the mapping update is invalid. + */ + void validateMappings(Mapping update, String type) throws IOException; } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 264595dd3a06f..c58af1479d640 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -62,6 +62,7 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -121,10 +122,20 @@ protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard prim ActionListener> listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, - (update, shardId, type, mappingListener) -> { - assert update != null; - assert shardId != null; - mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener); + new MappingUpdatePerformer() { + @Override + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener mappingListener) { + assert update != null; + assert shardId != null; + mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener); + } + + @Override + public void validateMappings(Mapping update, String type) throws IOException { + primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME, + new CompressedXContent(update, XContentType.JSON, ToXContent.EMPTY_PARAMS), + MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); + } }, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -267,9 +278,7 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { try { - primary.mapperService().merge("_doc", - new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS), - MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); + mappingUpdater.validateMappings(result.getRequiredMappingUpdate(), MapperService.SINGLE_MAPPING_NAME); } catch (Exception e) { logger.info("required mapping update failed during pre-flight check", e); onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 5d0affeb02b3c..69988430b0d73 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -248,11 +248,18 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type, listener) -> { - // There should indeed be a mapping update - assertNotNull(update); - updateCalled.incrementAndGet(); - listener.onResponse(null); + new MappingUpdatePerformer() { + @Override + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + // There should indeed be a mapping update + assertNotNull(update); + updateCalled.incrementAndGet(); + listener.onResponse(null); + } + + @Override + public void validateMappings(Mapping update, String type) { + } }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertTrue(context.isInitial()); assertTrue(context.hasMoreOperationsToExecute()); @@ -266,7 +273,16 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { .thenReturn(success); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type, listener) -> fail("should not have had to update the mappings"), listener -> {}, + new MappingUpdatePerformer() { + @Override + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + fail("should not have had to update the mappings"); + } + + @Override + public void validateMappings(Mapping update, String type) { + } + }, listener -> {}, ASSERTING_DONE_LISTENER); @@ -856,6 +872,10 @@ public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { listener.onResponse(null); } + + @Override + public void validateMappings(Mapping update, String type) { + } } /** Always throw the given exception */ @@ -870,5 +890,9 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer { public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { listener.onFailure(e); } + + @Override + public void validateMappings(Mapping update, String type) { + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 7a757f5d2ab66..b3391a1a8055e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -69,6 +69,7 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; @@ -776,7 +777,15 @@ private void executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest requ final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request); try (Releasable ignored = permitAcquiredFuture.actionGet()) { - MappingUpdatePerformer noopMappingUpdater = (update, shardId, type, listener1) -> {}; + MappingUpdatePerformer noopMappingUpdater = new MappingUpdatePerformer() { + @Override + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener1) { + } + + @Override + public void validateMappings(Mapping update, String type) { + } + }; TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, null, ActionTestUtils.assertNoFailureListener(result -> { TransportWriteActionTestHelper.performPostWriteActions(primary, request, From 006a8bf9d495ea2acf8cb08ea2b07104065eab82 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 3 Nov 2019 18:31:45 +0000 Subject: [PATCH 3/6] Revert "Allow tests to override mapping validation" This reverts commit 148206d857d834e56b1e27862e2d2bce51a805c0. --- .../action/bulk/MappingUpdatePerformer.java | 6 ---- .../action/bulk/TransportShardBulkAction.java | 23 ++++-------- .../bulk/TransportShardBulkActionTests.java | 36 ++++--------------- .../ESIndexLevelReplicationTestCase.java | 11 +----- 4 files changed, 14 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java index 65cf2465f5388..5a38f0f43e070 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java @@ -23,8 +23,6 @@ import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.shard.ShardId; -import java.io.IOException; - public interface MappingUpdatePerformer { /** @@ -32,8 +30,4 @@ public interface MappingUpdatePerformer { */ void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener); - /** - * Validate the mapping update locally. Throws an unchecked exception of some kind if the mapping update is invalid. - */ - void validateMappings(Mapping update, String type) throws IOException; } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index c58af1479d640..264595dd3a06f 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -62,7 +62,6 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -122,20 +121,10 @@ protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard prim ActionListener> listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, - new MappingUpdatePerformer() { - @Override - public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener mappingListener) { - assert update != null; - assert shardId != null; - mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener); - } - - @Override - public void validateMappings(Mapping update, String type) throws IOException { - primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME, - new CompressedXContent(update, XContentType.JSON, ToXContent.EMPTY_PARAMS), - MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); - } + (update, shardId, type, mappingListener) -> { + assert update != null; + assert shardId != null; + mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener); }, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -278,7 +267,9 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { try { - mappingUpdater.validateMappings(result.getRequiredMappingUpdate(), MapperService.SINGLE_MAPPING_NAME); + primary.mapperService().merge("_doc", + new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS), + MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); } catch (Exception e) { logger.info("required mapping update failed during pre-flight check", e); onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 69988430b0d73..5d0affeb02b3c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -248,18 +248,11 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - new MappingUpdatePerformer() { - @Override - public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { - // There should indeed be a mapping update - assertNotNull(update); - updateCalled.incrementAndGet(); - listener.onResponse(null); - } - - @Override - public void validateMappings(Mapping update, String type) { - } + (update, shardId, type, listener) -> { + // There should indeed be a mapping update + assertNotNull(update); + updateCalled.incrementAndGet(); + listener.onResponse(null); }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertTrue(context.isInitial()); assertTrue(context.hasMoreOperationsToExecute()); @@ -273,16 +266,7 @@ public void validateMappings(Mapping update, String type) { .thenReturn(success); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - new MappingUpdatePerformer() { - @Override - public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { - fail("should not have had to update the mappings"); - } - - @Override - public void validateMappings(Mapping update, String type) { - } - }, listener -> {}, + (update, shardId, type, listener) -> fail("should not have had to update the mappings"), listener -> {}, ASSERTING_DONE_LISTENER); @@ -872,10 +856,6 @@ public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { listener.onResponse(null); } - - @Override - public void validateMappings(Mapping update, String type) { - } } /** Always throw the given exception */ @@ -890,9 +870,5 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer { public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { listener.onFailure(e); } - - @Override - public void validateMappings(Mapping update, String type) { - } } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index b3391a1a8055e..7a757f5d2ab66 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -69,7 +69,6 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; -import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; @@ -777,15 +776,7 @@ private void executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest requ final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request); try (Releasable ignored = permitAcquiredFuture.actionGet()) { - MappingUpdatePerformer noopMappingUpdater = new MappingUpdatePerformer() { - @Override - public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener1) { - } - - @Override - public void validateMappings(Mapping update, String type) { - } - }; + MappingUpdatePerformer noopMappingUpdater = (update, shardId, type, listener1) -> {}; TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, null, ActionTestUtils.assertNoFailureListener(result -> { TransportWriteActionTestHelper.performPostWriteActions(primary, request, From 4b660992da1a601743c5815ce9f876f7313f4b3c Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 3 Nov 2019 18:30:58 +0000 Subject: [PATCH 4/6] Fix up TransportShardBulkActionTests using mocks instead --- .../action/bulk/TransportShardBulkActionTests.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 5d0affeb02b3c..f63f34ed78959 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -44,8 +44,10 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; @@ -233,7 +235,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { new BulkShardRequest(shardId, RefreshPolicy.NONE, items); Engine.IndexResult mappingUpdate = - new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap())); + new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap())); Translog.Location resultLocation = new Translog.Location(42, 42, 42); Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); @@ -241,6 +243,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { when(shard.shardId()).thenReturn(shardId); when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) .thenReturn(mappingUpdate); + when(shard.mapperService()).thenReturn(mock(MapperService.class)); randomlySetIgnoredPrimaryResponse(items[0]); @@ -761,7 +764,7 @@ public void testRetries() throws Exception { "I'm conflicted <(;_;)>"); Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0); Engine.IndexResult mappingUpdate = - new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap())); + new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap())); Translog.Location resultLocation = new Translog.Location(42, 42, 42); Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); @@ -778,6 +781,7 @@ public void testRetries() throws Exception { }); when(shard.indexSettings()).thenReturn(indexSettings); when(shard.shardId()).thenReturn(shardId); + when(shard.mapperService()).thenReturn(mock(MapperService.class)); UpdateHelper updateHelper = mock(UpdateHelper.class); when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( From 82e60107970e4b45698d49fec1607f6f8f49b955 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 4 Nov 2019 15:56:00 +0000 Subject: [PATCH 5/6] Use constant --- .../org/elasticsearch/action/bulk/TransportShardBulkAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 264595dd3a06f..41ccf8500b343 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -267,7 +267,7 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { try { - primary.mapperService().merge("_doc", + primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME, new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS), MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); } catch (Exception e) { From 891e56a80e9721ee2d828b90cb67e2abaadc41eb Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 4 Nov 2019 11:44:22 -0500 Subject: [PATCH 6/6] Add shard ID to message --- .../org/elasticsearch/action/bulk/TransportShardBulkAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 41ccf8500b343..78d6000bec93c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -271,7 +271,7 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS), MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); } catch (Exception e) { - logger.info("required mapping update failed during pre-flight check", e); + logger.info(() -> new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e); onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); return true; }