Skip to content

Commit bcfa96d

Browse files
committed
Continue registering pipelines after one pipeline parse failure.
Ingest has been failing to apply existing pipelines from cluster-state into the in-memory representation that are no longer valid. One example of this is a pipeline with a script processor. If a cluster starts up with scripting disabled, these pipelines will not be loaded. Even though GETing a pipeline worked, indexing operations claimed that this pipeline did not exist. This is because one gets information from cluster-state and the other is from an in-memory data-structure. Now, two things happen 1. suppress the exceptions until after other successfull pipelines are loaded 2. replace failed pipelines with a placeholder pipeline called `Pipeline.EMPTY` If the pipeline execution service encounters `Pipeline.EMPTY`, it is known that something went wrong at the time of pipeline creation and an exception was thrown to the user at some point at start-up. closes #28269.
1 parent 50d8a25 commit bcfa96d

File tree

6 files changed

+80
-6
lines changed

6 files changed

+80
-6
lines changed

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Collection;
3434
import java.util.Collections;
3535
import java.util.Map;
36+
import java.util.function.Consumer;
3637
import java.util.function.Function;
3738

3839
import static org.hamcrest.Matchers.equalTo;
@@ -64,6 +65,57 @@ protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
6465
}
6566
}
6667

68+
public void testScriptDisabled() throws Exception {
69+
String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10);
70+
String pipelineIdWithScript = pipelineIdWithoutScript + "_script";
71+
internalCluster().startNode();
72+
73+
BytesReference pipelineWithScript = new BytesArray("{\n" +
74+
" \"processors\" : [\n" +
75+
" {\"script\" : {\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"my_script\"}}\n" +
76+
" ]\n" +
77+
"}");
78+
BytesReference pipelineWithoutScript = new BytesArray("{\n" +
79+
" \"processors\" : [\n" +
80+
" {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" +
81+
" ]\n" +
82+
"}");
83+
84+
Consumer<String> checkPipelineExists = (id) -> assertThat(client().admin().cluster().prepareGetPipeline(id)
85+
.get().pipelines().get(0).getId(), equalTo(id));
86+
87+
client().admin().cluster().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get();
88+
client().admin().cluster().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get();
89+
90+
checkPipelineExists.accept(pipelineIdWithScript);
91+
checkPipelineExists.accept(pipelineIdWithoutScript);
92+
93+
internalCluster().stopCurrentMasterNode();
94+
internalCluster().startNode(Settings.builder().put("script.allowed_types", "none"));
95+
96+
checkPipelineExists.accept(pipelineIdWithoutScript);
97+
checkPipelineExists.accept(pipelineIdWithScript);
98+
99+
client().prepareIndex("index", "doc", "1")
100+
.setSource("x", 0)
101+
.setPipeline(pipelineIdWithoutScript)
102+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
103+
.get();
104+
105+
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
106+
() -> client().prepareIndex("index", "doc", "2")
107+
.setSource("x", 0)
108+
.setPipeline(pipelineIdWithScript)
109+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
110+
.get());
111+
assertThat(exception.getMessage(),
112+
equalTo("pipeline with id [" + pipelineIdWithScript + "] was not parsed successfully, check logs at start-up for exceptions"));
113+
114+
Map<String, Object> source = client().prepareGet("index", "doc", "1").get().getSource();
115+
assertThat(source.get("x"), equalTo(0));
116+
assertThat(source.get("y"), equalTo(0));
117+
}
118+
67119
public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception {
68120
internalCluster().startNode();
69121

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,13 @@
2525
import java.util.List;
2626
import java.util.Map;
2727

28-
import org.elasticsearch.common.settings.ClusterSettings;
29-
import org.elasticsearch.common.settings.Setting;
3028
import org.elasticsearch.common.settings.Settings;
3129
import org.elasticsearch.env.Environment;
3230
import org.elasticsearch.index.analysis.AnalysisRegistry;
3331
import org.elasticsearch.plugins.IngestPlugin;
3432
import org.elasticsearch.script.ScriptService;
3533
import org.elasticsearch.threadpool.ThreadPool;
3634

37-
import static org.elasticsearch.common.settings.Setting.Property;
38-
3935
/**
4036
* Holder class for several ingest related services.
4137
*/

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
*/
3333
public final class Pipeline {
3434

35+
static final Pipeline EMPTY = new Pipeline("_empty", null, null, new CompoundProcessor());
3536
static final String DESCRIPTION_KEY = "description";
3637
static final String PROCESSORS_KEY = "processors";
3738
static final String VERSION_KEY = "version";

server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ private Pipeline getPipeline(String pipelineId) {
192192
Pipeline pipeline = store.get(pipelineId);
193193
if (pipeline == null) {
194194
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
195+
} else if (pipeline == Pipeline.EMPTY) {
196+
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] was not parsed successfully," +
197+
" check logs at start-up for exceptions");
195198
}
196199
return pipeline;
197200
}

server/src/main/java/org/elasticsearch/ingest/PipelineStore.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,20 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
8181
}
8282

8383
Map<String, Pipeline> pipelines = new HashMap<>();
84+
ArrayList<ElasticsearchParseException> exceptions = new ArrayList<>();
8485
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
8586
try {
8687
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
8788
} catch (ElasticsearchParseException e) {
88-
throw e;
89+
pipelines.put(pipeline.getId(), Pipeline.EMPTY);
90+
exceptions.add(e);
8991
} catch (Exception e) {
90-
throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e);
92+
pipelines.put(pipeline.getId(), Pipeline.EMPTY);
93+
exceptions.add(new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e));
9194
}
9295
}
9396
this.pipelines = Collections.unmodifiableMap(pipelines);
97+
ExceptionsHelper.rethrowAndSuppress(exceptions);
9498
}
9599

96100
/**

server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,24 @@ public void testExecuteIndexPipelineDoesNotExist() {
9191
verify(completionHandler, never()).accept(anyBoolean());
9292
}
9393

94+
public void testExecuteIndexPipelineExistsButFailedParsing() {
95+
when(store.get("_id")).thenReturn(Pipeline.EMPTY);
96+
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
97+
@SuppressWarnings("unchecked")
98+
Consumer<Exception> failureHandler = mock(Consumer.class);
99+
@SuppressWarnings("unchecked")
100+
Consumer<Boolean> completionHandler = mock(Consumer.class);
101+
try {
102+
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
103+
fail("IllegalArgumentException expected");
104+
} catch (IllegalArgumentException e) {
105+
assertThat(e.getMessage(),
106+
equalTo("pipeline with id [_id] was not parsed successfully, check logs at start-up for exceptions"));
107+
}
108+
verify(failureHandler, never()).accept(any(Exception.class));
109+
verify(completionHandler, never()).accept(anyBoolean());
110+
}
111+
94112
public void testExecuteBulkPipelineDoesNotExist() {
95113
CompoundProcessor processor = mock(CompoundProcessor.class);
96114
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));

0 commit comments

Comments
 (0)