diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java
index cda2b36a3ecff..66cddd43e6583 100644
--- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java
@@ -19,7 +19,9 @@
 
 package org.elasticsearch.ingest.common;
 
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.IngestDocument.MetaData;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.RandomDocumentPicks;
 import org.elasticsearch.ingest.TestTemplateService;
@@ -122,10 +124,10 @@ public void testConvertScalarToList() throws Exception {
         }
     }
 
-    public void testAppendMetadata() throws Exception {
-        //here any metadata field value becomes a list, which won't make sense in most of the cases,
+    public void testAppendMetadataExceptVersion() throws Exception {
+        // here any metadata field value becomes a list, which won't make sense in most of the cases,
         // but support for append is streamlined like for set so we test it
-        IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
+        MetaData randomMetaData = randomFrom(MetaData.INDEX, MetaData.TYPE, MetaData.ID, MetaData.ROUTING, MetaData.PARENT);
         List<Object> values = new ArrayList<>();
         Processor appendProcessor;
         if (randomBoolean()) {
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java
index 6736594613954..d052ce0cd44c3 100644
--- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java
@@ -38,7 +38,7 @@ public void testJodaPattern() throws Exception {
             "events-", "y", "yyyyMMdd"
         );
 
-        IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
+        IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
             Collections.singletonMap("_field", "2016-04-25T12:24:20.101Z"));
         processor.execute(document);
         assertThat(document.getSourceAndMetadata().get("_index"), equalTo(""));
@@ -48,7 +48,7 @@ public void testTAI64N()throws Exception {
         Function<String, DateTime> function = DateFormat.Tai64n.getFunction(null, DateTimeZone.UTC, null);
         DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
             DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
-        IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
+        IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
             Collections.singletonMap("_field", (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024"));
         dateProcessor.execute(document);
         assertThat(document.getSourceAndMetadata().get("_index"), equalTo(""));
@@ -58,12 +58,12 @@ public void testUnixMs()throws Exception {
         Function<String, DateTime> function = DateFormat.UnixMs.getFunction(null, DateTimeZone.UTC, null);
         DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
             DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
-        IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
+        IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
             Collections.singletonMap("_field", "1000500"));
         dateProcessor.execute(document);
         assertThat(document.getSourceAndMetadata().get("_index"), equalTo(""));
 
-        document = new IngestDocument("_index", "_type", "_id", null, null,
+        document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
             Collections.singletonMap("_field", 1000500L));
         dateProcessor.execute(document);
         assertThat(document.getSourceAndMetadata().get("_index"), equalTo(""));
@@ -73,7 +73,7 @@ public void testUnix()throws Exception {
         Function<String, DateTime> function = DateFormat.Unix.getFunction(null, DateTimeZone.UTC, null);
         DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
             DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
-        IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
+        IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
             Collections.singletonMap("_field", "1000.5"));
         dateProcessor.execute(document);
         assertThat(document.getSourceAndMetadata().get("_index"), equalTo(""));
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java
index c043102ef5d3a..95c25bedb6280 100644
--- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java
@@ -45,7 +45,7 @@ public void testExecute() throws Exception {
         values.add("bar");
         values.add("baz");
         IngestDocument ingestDocument = new IngestDocument(
-            "_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
+            "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
         );
 
         ForEachProcessor processor = new ForEachProcessor(
@@ -61,7 +61,7 @@ public void testExecute() throws Exception {
 
     public void testExecuteWithFailure() throws Exception {
         IngestDocument ingestDocument = new IngestDocument(
-            "_index", "_type", "_id", null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c"))
+            "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c"))
         );
 
         TestProcessor testProcessor = new TestProcessor(id -> {
@@ -101,7 +101,7 @@ public void testMetaDataAvailable() throws Exception {
         values.add(new HashMap<>());
         values.add(new HashMap<>());
         IngestDocument ingestDocument = new IngestDocument(
-            "_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
+            "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
         );
 
         TestProcessor innerProcessor = new TestProcessor(id -> {
@@ -132,7 +132,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
         document.put("values", values);
         document.put("flat_values", new ArrayList<>());
         document.put("other", "value");
-        IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, document);
+        IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, document);
 
         ForEachProcessor processor = new ForEachProcessor(
             "_tag", "values", new SetProcessor("_tag",
@@ -171,7 +171,7 @@ public String getTag() {
             values.add("");
         }
         IngestDocument ingestDocument = new IngestDocument(
-            "_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
+            "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
         );
 
         ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
@@ -190,7 +190,7 @@ public void testModifyFieldsOutsideArray() throws Exception {
         values.add(1);
         values.add(null);
         IngestDocument ingestDocument = new IngestDocument(
-            "_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
+            "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
         );
 
         TemplateScript.Factory template = new TestTemplateService.MockTemplateScript.Factory("errors");
@@ -220,7 +220,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws
         source.put("_value", "new_value");
         source.put("values", values);
         IngestDocument ingestDocument = new IngestDocument(
-            "_index", "_type", "_id", null, null, source
+            "_index", "_type", "_id", null, null, null, null, source
        );
 
         TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
@@ -251,7 +251,7 @@ public void testNestedForEach() throws Exception {
         values.add(value);
 
         IngestDocument ingestDocument = new IngestDocument(
-            "_index", "_type", "_id", null, null, Collections.singletonMap("values1", values)
+            "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values1", values)
         );
 
         TestProcessor testProcessor = new TestProcessor(
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java
index 5f0759b5b2157..6fec977e6c268 100644
--- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java
@@ -19,7 +19,9 @@
 
 package org.elasticsearch.ingest.common;
 
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.IngestDocument.MetaData;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.RandomDocumentPicks;
 import org.elasticsearch.ingest.TestTemplateService;
@@ -99,14 +101,30 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception {
         assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue));
     }
 
-    public void testSetMetadata() throws Exception {
-        IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
+    public void testSetMetadataExceptVersion() throws Exception {
+        MetaData randomMetaData = randomFrom(MetaData.INDEX, MetaData.TYPE, MetaData.ID, MetaData.ROUTING, MetaData.PARENT);
         Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value", true);
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
         processor.execute(ingestDocument);
         assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value"));
     }
 
+    public void testSetMetadataVersion() throws Exception {
+        long version = randomNonNegativeLong();
+        Processor processor = createSetProcessor(MetaData.VERSION.getFieldName(), version, true);
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue(MetaData.VERSION.getFieldName(), Long.class), Matchers.equalTo(version));
+    }
+
+    public void testSetMetadataVersionType() throws Exception {
+        String versionType = randomFrom("internal", "external", "external_gte");
+        Processor processor = createSetProcessor(MetaData.VERSION_TYPE.getFieldName(), versionType, true);
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue(MetaData.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
+    }
+
     private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled) {
         return new SetProcessor(randomAlphaOfLength(10), new TestTemplateService.MockTemplateScript.Factory(fieldName),
                 ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled);
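The two new SetProcessorTests cases above are the behavioural contract for this feature: a set processor may now target _version and _version_type like any other metadata field. A minimal standalone sketch of that contract follows, written against the same test helpers the class above uses; the class name, tag, and literal value are illustrative and not part of the patch.

    package org.elasticsearch.ingest.common;

    import org.elasticsearch.ingest.IngestDocument;
    import org.elasticsearch.ingest.Processor;
    import org.elasticsearch.ingest.RandomDocumentPicks;
    import org.elasticsearch.ingest.TestTemplateService;
    import org.elasticsearch.ingest.ValueSource;
    import org.elasticsearch.test.ESTestCase;

    import static org.hamcrest.Matchers.equalTo;

    // Hypothetical companion test, not part of this patch: it exercises the same behaviour
    // testSetMetadataVersion pins down, with a fixed value instead of a random one.
    public class SetVersionSketchTests extends ESTestCase {

        public void testSetProcessorCanWriteVersionMetadata() throws Exception {
            // build a set processor whose target field is "_version"
            Processor processor = new SetProcessor("tag",
                new TestTemplateService.MockTemplateScript.Factory(IngestDocument.MetaData.VERSION.getFieldName()),
                ValueSource.wrap(42L, TestTemplateService.instance()), true);
            IngestDocument document = RandomDocumentPicks.randomIngestDocument(random());
            processor.execute(document);
            // the version travels as an ordinary metadata field until the index request is rebuilt
            assertThat(document.getFieldValue(IngestDocument.MetaData.VERSION.getFieldName(), Long.class), equalTo(42L));
        }
    }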
diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml
new file mode 100644
index 0000000000000..10c80c8e30525
--- /dev/null
+++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml
@@ -0,0 +1,76 @@
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "my_pipeline"
+        ignore: 404
+
+---
+"Test set document version & version type":
+  - do:
+      cluster.health:
+        wait_for_status: green
+
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline1"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "set" : {
+                  "field" : "_version",
+                  "value": 1
+                }
+              },
+              {
+                "set" : {
+                  "field" : "_version_type",
+                  "value": "internal"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline2"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "set" : {
+                  "field" : "_version",
+                  "value": 1
+                }
+              },
+              {
+                "set" : {
+                  "field" : "_version_type",
+                  "value": "external"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      catch: conflict
+      index:
+        index: test
+        type: test
+        id: 1
+        pipeline: "my_pipeline1"
+        body: {}
+
+  - do:
+      index:
+        index: test
+        type: test
+        id: 1
+        pipeline: "my_pipeline2"
+        body: {}
+  - match: { _version: 1 }
diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java
index cbe798666129a..16dbbc6f8cbab 100644
--- a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java
+++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java
@@ -33,7 +33,7 @@ public class IngestDocumentMustacheIT extends AbstractScriptTestCase {
     public void testAccessMetaDataViaTemplate() {
         Map<String, Object> document = new HashMap<>();
         document.put("foo", "bar");
-        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
+        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
         ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{foo}}", scriptService));
         assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 bar"));
 
@@ -48,7 +48,7 @@ public void testAccessMapMetaDataViaTemplate() {
         innerObject.put("baz", "hello baz");
         innerObject.put("qux", Collections.singletonMap("fubar", "hello qux and fubar"));
         document.put("foo", innerObject);
-        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
+        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
         ingestDocument.setFieldValue(compile("field1"),
             ValueSource.wrap("1 {{foo.bar}} {{foo.baz}} {{foo.qux.fubar}}", scriptService));
         assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 hello bar hello baz hello qux and fubar"));
@@ -67,7 +67,7 @@ public void testAccessListMetaDataViaTemplate() {
         list.add(value);
         list.add(null);
         document.put("list2", list);
-        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
+        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
         ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{list1.0}} {{list2.0}}", scriptService));
         assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 foo {field=value}"));
     }
@@ -77,7 +77,7 @@ public void testAccessIngestMetadataViaTemplate() {
         Map<String, Object> ingestMap = new HashMap<>();
         ingestMap.put("timestamp", "bogus_timestamp");
         document.put("_ingest", ingestMap);
-        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
+        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
         ingestDocument.setFieldValue(compile("ingest_timestamp"),
             ValueSource.wrap("{{_ingest.timestamp}} and {{_source._ingest.timestamp}}", scriptService));
         assertThat(ingestDocument.getFieldValue("ingest_timestamp", String.class),
diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java
index a8c7afcec6fee..a80b693851fc1 100644
--- a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java
+++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java
@@ -64,7 +64,7 @@ public void testValueSourceWithTemplates() {
     }
 
     public void testAccessSourceViaTemplate() {
-        IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, new HashMap<>());
+        IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, null, null, new HashMap<>());
         assertThat(ingestDocument.hasField("marvel"), is(false));
         ingestDocument.setFieldValue(compile("{{_index}}"), ValueSource.wrap("{{_index}}", scriptService));
         assertThat(ingestDocument.getFieldValue("marvel", String.class), equalTo("marvel"));
diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java
index 39d7f96e8e6f6..170f0bc8518cf 100644
--- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java
@@ -28,6 +28,7 @@
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Pipeline;
@@ -195,8 +196,17 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
             String routing = ConfigurationUtils.readOptionalStringOrIntProperty(null, null,
                     dataMap, MetaData.ROUTING.getFieldName());
             String parent = ConfigurationUtils.readOptionalStringOrIntProperty(null, null,
                     dataMap, MetaData.PARENT.getFieldName());
+            Long version = null;
+            if (dataMap.containsKey(MetaData.VERSION.getFieldName())) {
+                version = (Long) ConfigurationUtils.readObject(null, null, dataMap, MetaData.VERSION.getFieldName());
+            }
+            VersionType versionType = null;
+            if (dataMap.containsKey(MetaData.VERSION_TYPE.getFieldName())) {
+                versionType = VersionType.fromString(ConfigurationUtils.readStringProperty(null, null, dataMap,
+                        MetaData.VERSION_TYPE.getFieldName()));
+            }
             IngestDocument ingestDocument =
-                    new IngestDocument(index, type, id, routing, parent, document);
+                    new IngestDocument(index, type, id, routing, parent, version, versionType, document);
             ingestDocumentList.add(ingestDocument);
         }
         return ingestDocumentList;
     }
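The parseDocs() change above treats both new fields as optional: _version must already be a Long in the supplied simulate document, and _version_type is resolved through VersionType.fromString(). A small standalone restatement of that rule follows for readers who want to try it outside the simulate API; the class and variable names are made up for the sketch and are not part of the patch.

    import org.elasticsearch.index.VersionType;

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: mirrors the optional-field handling added to parseDocs() without
    // going through ConfigurationUtils.
    public class SimulateDocVersionParsingSketch {
        public static void main(String[] args) {
            Map<String, Object> dataMap = new HashMap<>();
            dataMap.put("_version", 5L);                 // must already be a long value
            dataMap.put("_version_type", "external");    // parsed via VersionType.fromString()

            Long version = dataMap.containsKey("_version") ? (Long) dataMap.get("_version") : null;
            VersionType versionType = dataMap.containsKey("_version_type")
                ? VersionType.fromString((String) dataMap.get("_version_type"))
                : null;

            // both stay null when the simulate document omits them
            System.out.println(version + " / " + versionType); // 5 / EXTERNAL
        }
    }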
diff --git a/server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java b/server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java
index 64365407f2379..87168cb7a9bba 100644
--- a/server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java
+++ b/server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java
@@ -68,10 +68,10 @@ IngestDocument getIngestDocument() {
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject("doc");
-        Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
-        for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {
+        Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
+        for (Map.Entry<IngestDocument.MetaData, Object> metadata : metadataMap.entrySet()) {
             if (metadata.getValue() != null) {
-                builder.field(metadata.getKey().getFieldName(), metadata.getValue());
+                builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString());
             }
         }
         builder.field("_source", ingestDocument.getSourceAndMetadata());
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
index b35ac567f9e25..89e945780c8f5 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -20,14 +20,14 @@
 package org.elasticsearch.ingest;
 
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.IndexFieldMapper;
 import org.elasticsearch.index.mapper.ParentFieldMapper;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.TypeFieldMapper;
-import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.index.mapper.VersionFieldMapper;
 import org.elasticsearch.script.TemplateScript;
 
 import java.time.ZoneOffset;
@@ -56,7 +56,8 @@ public final class IngestDocument {
     private final Map<String, Object> sourceAndMetadata;
     private final Map<String, Object> ingestMetadata;
 
-    public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source) {
+    public IngestDocument(String index, String type, String id, String routing, String parent,
+                          Long version, VersionType versionType, Map<String, Object> source) {
         this.sourceAndMetadata = new HashMap<>();
         this.sourceAndMetadata.putAll(source);
         this.sourceAndMetadata.put(MetaData.INDEX.getFieldName(), index);
@@ -68,6 +69,12 @@ public IngestDocument(String index, String type, String id, String routing, Stri
         if (parent != null) {
             this.sourceAndMetadata.put(MetaData.PARENT.getFieldName(), parent);
         }
+        if (version != null) {
+            sourceAndMetadata.put(MetaData.VERSION.getFieldName(), version);
+        }
+        if (versionType != null) {
+            sourceAndMetadata.put(MetaData.VERSION_TYPE.getFieldName(), VersionType.toString(versionType));
+        }
 
         this.ingestMetadata = new HashMap<>();
         this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
@@ -559,10 +566,10 @@ private Map<String, Object> createTemplateModel() {
      * one time operation that extracts the metadata fields from the ingest document and returns them.
      * Metadata fields that used to be accessible as ordinary top level fields will be removed as part of this call.
      */
-    public Map<MetaData, String> extractMetadata() {
-        Map<MetaData, String> metadataMap = new EnumMap<>(MetaData.class);
+    public Map<MetaData, Object> extractMetadata() {
+        Map<MetaData, Object> metadataMap = new EnumMap<>(MetaData.class);
         for (MetaData metaData : MetaData.values()) {
-            metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), String.class));
+            metadataMap.put(metaData, sourceAndMetadata.remove(metaData.getFieldName()));
         }
         return metadataMap;
     }
@@ -649,7 +656,9 @@ public enum MetaData {
         TYPE(TypeFieldMapper.NAME),
         ID(IdFieldMapper.NAME),
         ROUTING(RoutingFieldMapper.NAME),
-        PARENT(ParentFieldMapper.NAME);
+        PARENT(ParentFieldMapper.NAME),
+        VERSION(VersionFieldMapper.NAME),
+        VERSION_TYPE("_version_type");
 
         private final String fieldName;
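This is the heart of the change: the IngestDocument constructor gains nullable version and versionType parameters, both stored in sourceAndMetadata under the new MetaData keys, and extractMetadata() now returns Object values so a Long version survives the round trip instead of being coerced to a String. A short usage sketch based only on the methods visible in these hunks; the class name and literal values are arbitrary examples, not part of the patch.

    import org.elasticsearch.index.VersionType;
    import org.elasticsearch.ingest.IngestDocument;

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: shows the new 8-argument constructor and how the version metadata
    // is exposed as ordinary top-level fields.
    public class VersionMetadataSketch {
        public static void main(String[] args) {
            Map<String, Object> source = new HashMap<>();
            source.put("foo", "bar");

            // version and versionType may both be null when the request does not carry them
            IngestDocument document = new IngestDocument("index", "type", "id", null, null,
                3L, VersionType.EXTERNAL, source);

            // processors can read and overwrite the metadata like any other field
            Long version = document.getFieldValue("_version", Long.class);
            String versionType = document.getFieldValue("_version_type", String.class);
            document.setFieldValue("_version", 4L);   // same effect as a set processor on "_version"
            System.out.println(version + " " + versionType); // 3 external
        }
    }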
diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java
index cec622f4a2587..31bedd4ee1777 100644
--- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java
@@ -28,6 +28,7 @@
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Collections;
@@ -164,18 +165,24 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws E
             String id = indexRequest.id();
             String routing = indexRequest.routing();
             String parent = indexRequest.parent();
+            Long version = indexRequest.version();
+            VersionType versionType = indexRequest.versionType();
             Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
-            IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, sourceAsMap);
+            IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, version, versionType, sourceAsMap);
             pipeline.execute(ingestDocument);
 
-            Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
+            Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
             //it's fine to set all metadata fields all the time, as ingest document holds their starting values
             //before ingestion, which might also get modified during ingestion.
-            indexRequest.index(metadataMap.get(IngestDocument.MetaData.INDEX));
-            indexRequest.type(metadataMap.get(IngestDocument.MetaData.TYPE));
-            indexRequest.id(metadataMap.get(IngestDocument.MetaData.ID));
-            indexRequest.routing(metadataMap.get(IngestDocument.MetaData.ROUTING));
-            indexRequest.parent(metadataMap.get(IngestDocument.MetaData.PARENT));
+            indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
+            indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE));
+            indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID));
+            indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING));
+            indexRequest.parent((String) metadataMap.get(IngestDocument.MetaData.PARENT));
+            indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
+            if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
+                indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
+            }
             indexRequest.source(ingestDocument.getSourceAndMetadata());
         } catch (Exception e) {
             totalStats.ingestFailed();
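After the pipeline runs, innerExecute() copies every metadata value back onto the IndexRequest. Note the asymmetry: _version is always copied (so a processor that removed it entirely would make the Number cast fail), while _version_type is only applied when present. A condensed sketch of that round trip follows, using only the accessors visible in the hunk above; the request values are arbitrary examples.

    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.index.VersionType;
    import org.elasticsearch.ingest.IngestDocument;

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: a straight-line version of the request -> ingest document -> request flow.
    public class MetadataRoundTripSketch {
        public static void main(String[] args) {
            Map<String, Object> sourceMap = new HashMap<>();
            sourceMap.put("foo", "bar");
            IndexRequest indexRequest = new IndexRequest("index", "type", "id")
                .source(sourceMap).version(7L).versionType(VersionType.EXTERNAL);

            // build the ingest document from the request, as innerExecute() now does
            IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(),
                indexRequest.id(), indexRequest.routing(), indexRequest.parent(),
                indexRequest.version(), indexRequest.versionType(), indexRequest.sourceAsMap());

            // ... pipeline.execute(ingestDocument) would run here and may rewrite any metadata field ...

            // copy the (possibly updated) metadata back onto the request
            Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
            indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
            indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
            if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
                indexRequest.versionType(VersionType.fromString(
                    (String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
            }
            indexRequest.source(ingestDocument.getSourceAndMetadata());
        }
    }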
diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java
index 8b4ce79f3ecc4..00815807eee8a 100644
--- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java
+++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.CompoundProcessor;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Pipeline;
@@ -44,6 +45,8 @@
 import static org.elasticsearch.ingest.IngestDocument.MetaData.PARENT;
 import static org.elasticsearch.ingest.IngestDocument.MetaData.ROUTING;
 import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE;
+import static org.elasticsearch.ingest.IngestDocument.MetaData.VERSION;
+import static org.elasticsearch.ingest.IngestDocument.MetaData.VERSION_TYPE;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
@@ -98,7 +101,7 @@ public void testParseUsingPipelineStore() throws Exception {
         Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
         for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
             Map<String, Object> expectedDocument = expectedDocsIterator.next();
-            Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
+            Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
             assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
             assertThat(metadataMap.get(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName())));
             assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName())));
@@ -120,17 +123,28 @@ public void testParseWithProvidedPipeline() throws Exception {
         for (int i = 0; i < numDocs; i++) {
             Map<String, Object> doc = new HashMap<>();
             Map<String, Object> expectedDoc = new HashMap<>();
-            List<IngestDocument.MetaData> fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, PARENT);
+            List<IngestDocument.MetaData> fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, PARENT, VERSION, VERSION_TYPE);
             for(IngestDocument.MetaData field : fields) {
-                if(randomBoolean()) {
-                    String value = randomAlphaOfLengthBetween(1, 10);
+                if (field == VERSION) {
+                    Long value = randomLong();
                     doc.put(field.getFieldName(), value);
                     expectedDoc.put(field.getFieldName(), value);
-                }
-                else {
-                    Integer value = randomIntBetween(1, 1000000);
+                } else if (field == VERSION_TYPE) {
+                    String value = VersionType.toString(
+                        randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE)
+                    );
                     doc.put(field.getFieldName(), value);
-                    expectedDoc.put(field.getFieldName(), String.valueOf(value));
+                    expectedDoc.put(field.getFieldName(), value);
+                } else {
+                    if (randomBoolean()) {
+                        String value = randomAlphaOfLengthBetween(1, 10);
+                        doc.put(field.getFieldName(), value);
+                        expectedDoc.put(field.getFieldName(), value);
+                    } else {
+                        Integer value = randomIntBetween(1, 1000000);
+                        doc.put(field.getFieldName(), value);
+                        expectedDoc.put(field.getFieldName(), String.valueOf(value));
+                    }
                 }
             }
             String fieldName = randomAlphaOfLengthBetween(1, 10);
@@ -175,12 +189,14 @@ public void testParseWithProvidedPipeline() throws Exception {
         Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
         for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
             Map<String, Object> expectedDocument = expectedDocsIterator.next();
-            Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
+            Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
             assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
             assertThat(metadataMap.get(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName())));
             assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName())));
             assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName())));
             assertThat(metadataMap.get(PARENT), equalTo(expectedDocument.get(PARENT.getFieldName())));
+            assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName())));
+            assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName())));
             assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE)));
         }
 
diff --git a/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java b/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
index c5f67e235061e..b04c7dfcd84f8 100644
--- a/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
+++ b/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
@@ -133,13 +133,13 @@ public void testToXContent() throws IOException {
         Map<String, Object> toXContentSource = (Map<String, Object>) toXContentDoc.get("_source");
         Map<String, Object> toXContentIngestMetadata = (Map<String, Object>) toXContentDoc.get("_ingest");
 
-        Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
-        for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {
+        Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
+        for (Map.Entry<IngestDocument.MetaData, Object> metadata : metadataMap.entrySet()) {
             String fieldName = metadata.getKey().getFieldName();
             if (metadata.getValue() == null) {
                 assertThat(toXContentDoc.containsKey(fieldName), is(false));
             } else {
-                assertThat(toXContentDoc.get(fieldName), equalTo(metadata.getValue()));
+                assertThat(toXContentDoc.get(fieldName), equalTo(metadata.getValue().toString()));
             }
         }
 
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
index 654927b19f2fb..dbbc8e443c076 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
@@ -127,7 +127,7 @@ public void testSimulate() throws Exception {
         source.put("foo", "bar");
         source.put("fail", false);
         source.put("processed", true);
-        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, source);
+        IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source);
         assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
         assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
     }
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
index 9df2a38c6f14b..04285b3432e12 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
@@ -76,7 +76,7 @@ public void setTestIngestDocument() {
         list.add(null);
         document.put("list", list);
 
-        ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
+        ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
     }
 
     public void testSimpleGetFieldValue() {
diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java
index 44c8e78bef703..3247761a548f0 100644
--- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.CustomTypeSafeMatcher;
@@ -157,10 +158,18 @@ public void testExecuteEmptyPipeline() throws Exception {
     public void testExecutePropagateAllMetaDataUpdates() throws Exception {
         CompoundProcessor processor = mock(CompoundProcessor.class);
         when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
+        long newVersion = randomLong();
+        String versionType = randomFrom("internal", "external", "external_gt", "external_gte");
         doAnswer((InvocationOnMock invocationOnMock) -> {
             IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
             for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
-                ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName());
+                if (metaData == IngestDocument.MetaData.VERSION) {
+                    ingestDocument.setFieldValue(metaData.getFieldName(), newVersion);
+                } else if (metaData == IngestDocument.MetaData.VERSION_TYPE) {
+                    ingestDocument.setFieldValue(metaData.getFieldName(), versionType);
+                } else {
+                    ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName());
+                }
             }
             return null;
         }).when(processor).execute(any());
@@ -175,12 +184,13 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception {
         verify(processor).execute(any());
         verify(failureHandler, never()).accept(any());
         verify(completionHandler, times(1)).accept(true);
-
         assertThat(indexRequest.index(), equalTo("update_index"));
         assertThat(indexRequest.type(), equalTo("update_type"));
         assertThat(indexRequest.id(), equalTo("update_id"));
         assertThat(indexRequest.routing(), equalTo("update_routing"));
         assertThat(indexRequest.parent(), equalTo("update_parent"));
+        assertThat(indexRequest.version(), equalTo(newVersion));
+        assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType)));
     }
 
     public void testExecuteFailure() throws Exception {
@@ -188,13 +198,15 @@ public void testExecuteFailure() throws Exception {
         when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
         IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
-        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
+        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
+            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
         @SuppressWarnings("unchecked")
         Consumer<Exception> failureHandler = mock(Consumer.class);
         @SuppressWarnings("unchecked")
         Consumer<Boolean> completionHandler = mock(Consumer.class);
         executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
+        verify(processor).execute(eqID("_index", "_type", "_id",
+            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
         verify(failureHandler, times(1)).accept(any(RuntimeException.class));
         verify(completionHandler, never()).accept(anyBoolean());
     }
@@ -207,7 +219,8 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
         CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
             Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id")
+            .source(Collections.emptyMap()).setPipeline("_id");
         doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
         @SuppressWarnings("unchecked")
         Consumer<Exception> failureHandler = mock(Consumer.class);
@@ -225,14 +238,17 @@ public void testExecuteFailureWithOnFailure() throws Exception {
             Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
         IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
-        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
-        doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
+        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
+            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(),
+            indexRequest.versionType(), Collections.emptyMap()));
         @SuppressWarnings("unchecked")
         Consumer<Exception> failureHandler = mock(Consumer.class);
         @SuppressWarnings("unchecked")
         Consumer<Boolean> completionHandler = mock(Consumer.class);
         executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
+        verify(processor).execute(eqID("_index", "_type", "_id",
+            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
         verify(failureHandler, times(1)).accept(any(RuntimeException.class));
         verify(completionHandler, never()).accept(anyBoolean());
     }
@@ -246,15 +262,19 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
             Collections.singletonList(onFailureOnFailureProcessor))));
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
         IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
-        doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
-        doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
-        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
+        doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id",
+            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id",
+            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
+            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
         @SuppressWarnings("unchecked")
        Consumer<Exception> failureHandler = mock(Consumer.class);
         @SuppressWarnings("unchecked")
         Consumer<Boolean> completionHandler = mock(Consumer.class);
         executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
+        verify(processor).execute(eqID("_index", "_type", "_id",
+            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
         verify(failureHandler, times(1)).accept(any(RuntimeException.class));
         verify(completionHandler, never()).accept(anyBoolean());
     }
@@ -380,12 +400,20 @@ private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {
         return argThat(new IngestDocumentMatcher(index, type, id, source));
     }
 
+    private IngestDocument eqID(String index, String type, String id, Long version, VersionType versionType, Map<String, Object> source) {
+        return argThat(new IngestDocumentMatcher(index, type, id, version, versionType, source));
+    }
+
     private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {
 
         private final IngestDocument ingestDocument;
 
         IngestDocumentMatcher(String index, String type, String id, Map<String, Object> source) {
-            this.ingestDocument = new IngestDocument(index, type, id, null, null, source);
+            this.ingestDocument = new IngestDocument(index, type, id, null, null, null, null, source);
+        }
+
+        IngestDocumentMatcher(String index, String type, String id, Long version, VersionType versionType, Map<String, Object> source) {
+            this.ingestDocument = new IngestDocument(index, type, id, null, null, version, versionType, source);
         }
 
         @Override
diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/RandomDocumentPicks.java b/test/framework/src/main/java/org/elasticsearch/ingest/RandomDocumentPicks.java
index 0def04a79eaf1..cad7b388430bb 100644
--- a/test/framework/src/main/java/org/elasticsearch/ingest/RandomDocumentPicks.java
+++ b/test/framework/src/main/java/org/elasticsearch/ingest/RandomDocumentPicks.java
@@ -22,6 +22,7 @@
 import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+import org.elasticsearch.index.VersionType;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -138,6 +139,9 @@ public static IngestDocument randomIngestDocument(Random random, Map<String, Object> source) {
 
     public static Map<String, Object> randomSource(Random random) {
@@ -219,6 +223,11 @@ public static String randomString(Random random) {
         return RandomStrings.randomUnicodeOfCodepointLengthBetween(random, 1, 10);
     }
 
+    private static Long randomNonNegtiveLong(Random random) {
+        long randomLong = random.nextLong();
+        return randomLong == Long.MIN_VALUE ? 0 : Math.abs(randomLong);
+    }
+
     private static void addRandomFields(Random random, Map<String, Object> parentNode, int currentDepth) {
         if (currentDepth > 5) {
             return;
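The randomNonNegtiveLong() helper above special-cases Long.MIN_VALUE because Math.abs() cannot represent its positive counterpart within the long range and would return a negative value. A standalone illustration follows; the class and method names are the sketch's own, not part of the patch.

    import java.util.Random;

    // Illustrative only: shows why the helper maps Long.MIN_VALUE to 0 instead of calling Math.abs() on it.
    public class NonNegativeLongSketch {

        static long randomNonNegativeLong(Random random) {
            long value = random.nextLong();
            // Math.abs(Long.MIN_VALUE) overflows and stays negative, so handle it explicitly
            return value == Long.MIN_VALUE ? 0 : Math.abs(value);
        }

        public static void main(String[] args) {
            System.out.println(Math.abs(Long.MIN_VALUE));           // prints -9223372036854775808
            System.out.println(randomNonNegativeLong(new Random())); // always >= 0
        }
    }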