Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptEngine;
Expand All @@ -33,6 +37,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -64,6 +69,66 @@ protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
}
}

public void testScriptDisabled() throws Exception {
String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10);
String pipelineIdWithScript = pipelineIdWithoutScript + "_script";
internalCluster().startNode();

BytesReference pipelineWithScript = new BytesArray("{\n" +
" \"processors\" : [\n" +
" {\"script\" : {\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"my_script\"}}\n" +
" ]\n" +
"}");
BytesReference pipelineWithoutScript = new BytesArray("{\n" +
" \"processors\" : [\n" +
" {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" +
" ]\n" +
"}");

Consumer<String> checkPipelineExists = (id) -> assertThat(client().admin().cluster().prepareGetPipeline(id)
.get().pipelines().get(0).getId(), equalTo(id));

client().admin().cluster().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get();
client().admin().cluster().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get();

checkPipelineExists.accept(pipelineIdWithScript);
checkPipelineExists.accept(pipelineIdWithoutScript);


internalCluster().stopCurrentMasterNode();
internalCluster().startNode(Settings.builder().put("script.allowed_types", "none"));

checkPipelineExists.accept(pipelineIdWithoutScript);
checkPipelineExists.accept(pipelineIdWithScript);

client().prepareIndex("index", "doc", "1")
.setSource("x", 0)
.setPipeline(pipelineIdWithoutScript)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();

ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> client().prepareIndex("index", "doc", "2")
.setSource("x", 0)
.setPipeline(pipelineIdWithScript)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get());
assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type")));
assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown")));
assertThat(exception.getRootCause().getMessage(),
equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " +
"[ElasticsearchParseException[Error updating pipeline with id [" + pipelineIdWithScript + "]]; " +
"nested: ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " +
"nested: IllegalArgumentException[cannot execute [inline] scripts];; " +
"ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " +
"nested: IllegalArgumentException[cannot execute [inline] scripts];; java.lang.IllegalArgumentException: " +
"cannot execute [inline] scripts]"));

Map<String, Object> source = client().prepareGet("index", "doc", "1").get().getSource();
assertThat(source.get("x"), equalTo(0));
assertThat(source.get("y"), equalTo(0));
}

public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception {
internalCluster().startNode();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,13 @@
import java.util.List;
import java.util.Map;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.common.settings.Setting.Property;

/**
* Holder class for several ingest related services.
*/
Expand Down
29 changes: 27 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/PipelineStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,41 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
}

Map<String, Pipeline> pipelines = new HashMap<>();
List<ElasticsearchParseException> exceptions = new ArrayList<>();
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
} catch (ElasticsearchParseException e) {
throw e;
pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e));
exceptions.add(e);
} catch (Exception e) {
throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e);
ElasticsearchParseException parseException = new ElasticsearchParseException(
"Error updating pipeline with id [" + pipeline.getId() + "]", e);
pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException));
exceptions.add(parseException);
}
}
this.pipelines = Collections.unmodifiableMap(pipelines);
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

private Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
Processor failureProcessor = new AbstractProcessor(tag) {
@Override
public void execute(IngestDocument ingestDocument) {
throw new IllegalStateException(errorMessage);
}

@Override
public String getType() {
return type;
}
};
String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded";
return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
}

/**
Expand Down
15 changes: 11 additions & 4 deletions server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,12 @@
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.action.support.replication.TransportReplicationActionTests;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Arrays;
Expand Down Expand Up @@ -130,6 +126,10 @@ public void testSimulate() throws Exception {
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source);
assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());

// cleanup
WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithIngestFailures() throws Exception {
Expand Down Expand Up @@ -172,6 +172,10 @@ public void testBulkWithIngestFailures() throws Exception {
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
}
}

// cleanup
WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithUpsert() throws Exception {
Expand Down Expand Up @@ -271,5 +275,8 @@ public void testPutWithPipelineFactoryError() throws Exception {
assertNotNull(ex);
assertThat(ex.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}

GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id").get();
assertFalse(response.isFound());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class IngestProcessorNotInstalledOnAllNodesIT extends ESIntegTestCase {
Expand Down Expand Up @@ -104,7 +103,11 @@ public void testFailStartNode() throws Exception {
installPlugin = false;
String node2 = internalCluster().startNode();
pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipelineStore().get("_id");
assertThat(pipeline, nullValue());

assertNotNull(pipeline);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, " +
"because pipeline with id [_id] could not be loaded"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -92,6 +93,32 @@ public void testExecuteIndexPipelineDoesNotExist() {
verify(completionHandler, never()).accept(anyBoolean());
}

public void testExecuteIndexPipelineExistsButFailedParsing() {
when(store.get("_id")).thenReturn(new Pipeline("_id", "stub", null,
new CompoundProcessor(new AbstractProcessor("mock") {
@Override
public void execute(IngestDocument ingestDocument) {
throw new IllegalStateException("error");
}

@Override
public String getType() {
return null;
}
})));
SetOnce<Boolean> failed = new SetOnce<>();
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
Consumer<Exception> failureHandler = (e) -> {
assertThat(e.getCause().getClass(), equalTo(IllegalArgumentException.class));
assertThat(e.getCause().getCause().getClass(), equalTo(IllegalStateException.class));
assertThat(e.getCause().getCause().getMessage(), equalTo("error"));
failed.set(true);
};
Consumer<Boolean> completionHandler = (e) -> failed.set(false);
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
assertTrue(failed.get());
}

public void testExecuteBulkPipelineDoesNotExist() {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -165,7 +164,13 @@ public void testPutWithErrorResponse() {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
}
pipeline = store.get(id);
assertThat(pipeline, nullValue());
assertNotNull(pipeline);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" +
" id [_id] could not be loaded"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertNull(pipeline.getProcessors().get(0).getTag());
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown"));
}

public void testDelete() {
Expand Down