Skip to content

Commit 8fc213f

Browse files
INGEST: Move all Pipeline State into IngestService (#32617)
* INGEST: Move all Pipeline State into IngestService * Moves all pipeline state into the ingest service * Retains the existing pipeline store and pipeline execution service as inner classes to make the review easier, they should be flattened out in the next step * All tests for these classes were copied (and adapted) to the ingest service tests * This is a refactoring step to enable a clean implementation of a pipeline processor (See #32473)
1 parent 6d62d67 commit 8fc213f

19 files changed

+1437
-1480
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ private long relativeTime() {
521521
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
522522
long ingestStartTimeInNanos = System.nanoTime();
523523
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
524-
ingestService.getPipelineExecutionService().executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
524+
ingestService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
525525
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
526526
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
527527
bulkRequestModifier.markCurrentItemAsFailed(exception);

server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,23 @@
2727
import org.elasticsearch.cluster.block.ClusterBlockException;
2828
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2929
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
30-
import org.elasticsearch.cluster.service.ClusterService;
3130
import org.elasticsearch.common.inject.Inject;
3231
import org.elasticsearch.common.settings.Settings;
33-
import org.elasticsearch.ingest.PipelineStore;
34-
import org.elasticsearch.node.NodeService;
32+
import org.elasticsearch.ingest.IngestService;
3533
import org.elasticsearch.threadpool.ThreadPool;
3634
import org.elasticsearch.transport.TransportService;
3735

3836
public class DeletePipelineTransportAction extends TransportMasterNodeAction<DeletePipelineRequest, AcknowledgedResponse> {
3937

40-
private final PipelineStore pipelineStore;
41-
private final ClusterService clusterService;
38+
private final IngestService ingestService;
4239

4340
@Inject
44-
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
41+
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, IngestService ingestService,
4542
TransportService transportService, ActionFilters actionFilters,
46-
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
47-
super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
48-
this.clusterService = clusterService;
49-
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
43+
IndexNameExpressionResolver indexNameExpressionResolver) {
44+
super(settings, DeletePipelineAction.NAME, transportService, ingestService.getClusterService(),
45+
threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
46+
this.ingestService = ingestService;
5047
}
5148

5249
@Override
@@ -60,8 +57,9 @@ protected AcknowledgedResponse newResponse() {
6057
}
6158

6259
@Override
63-
protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) throws Exception {
64-
pipelineStore.delete(clusterService, request, listener);
60+
protected void masterOperation(DeletePipelineRequest request, ClusterState state,
61+
ActionListener<AcknowledgedResponse> listener) throws Exception {
62+
ingestService.delete(request, listener);
6563
}
6664

6765
@Override

server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,17 @@
2929
import org.elasticsearch.cluster.service.ClusterService;
3030
import org.elasticsearch.common.inject.Inject;
3131
import org.elasticsearch.common.settings.Settings;
32-
import org.elasticsearch.ingest.PipelineStore;
33-
import org.elasticsearch.node.NodeService;
32+
import org.elasticsearch.ingest.IngestService;
3433
import org.elasticsearch.threadpool.ThreadPool;
3534
import org.elasticsearch.transport.TransportService;
3635

3736
public class GetPipelineTransportAction extends TransportMasterNodeReadAction<GetPipelineRequest, GetPipelineResponse> {
38-
39-
private final PipelineStore pipelineStore;
40-
37+
4138
@Inject
4239
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
4340
TransportService transportService, ActionFilters actionFilters,
44-
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
41+
IndexNameExpressionResolver indexNameExpressionResolver) {
4542
super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, GetPipelineRequest::new, indexNameExpressionResolver);
46-
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
4743
}
4844

4945
@Override
@@ -58,7 +54,7 @@ protected GetPipelineResponse newResponse() {
5854

5955
@Override
6056
protected void masterOperation(GetPipelineRequest request, ClusterState state, ActionListener<GetPipelineResponse> listener) throws Exception {
61-
listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(state, request.getIds())));
57+
listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds())));
6258
}
6359

6460
@Override

server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,10 @@
3232
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3333
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3434
import org.elasticsearch.cluster.node.DiscoveryNode;
35-
import org.elasticsearch.cluster.service.ClusterService;
3635
import org.elasticsearch.common.inject.Inject;
3736
import org.elasticsearch.common.settings.Settings;
38-
import org.elasticsearch.ingest.PipelineStore;
37+
import org.elasticsearch.ingest.IngestService;
3938
import org.elasticsearch.ingest.IngestInfo;
40-
import org.elasticsearch.node.NodeService;
4139
import org.elasticsearch.threadpool.ThreadPool;
4240
import org.elasticsearch.transport.TransportService;
4341

@@ -46,19 +44,19 @@
4644

4745
public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPipelineRequest, AcknowledgedResponse> {
4846

49-
private final PipelineStore pipelineStore;
50-
private final ClusterService clusterService;
47+
private final IngestService ingestService;
5148
private final NodeClient client;
5249

5350
@Inject
54-
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
55-
TransportService transportService, ActionFilters actionFilters,
56-
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService,
57-
NodeClient client) {
58-
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
59-
this.clusterService = clusterService;
51+
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
52+
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
53+
IngestService ingestService, NodeClient client) {
54+
super(
55+
settings, PutPipelineAction.NAME, transportService, ingestService.getClusterService(),
56+
threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new
57+
);
6058
this.client = client;
61-
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
59+
this.ingestService = ingestService;
6260
}
6361

6462
@Override
@@ -84,7 +82,7 @@ public void onResponse(NodesInfoResponse nodeInfos) {
8482
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
8583
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
8684
}
87-
pipelineStore.put(clusterService, ingestInfos, request, listener);
85+
ingestService.putPipeline(ingestInfos, request, listener);
8886
} catch (Exception e) {
8987
onFailure(e);
9088
}

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.elasticsearch.index.VersionType;
3333
import org.elasticsearch.ingest.ConfigurationUtils;
3434
import org.elasticsearch.ingest.IngestDocument;
35+
import org.elasticsearch.ingest.IngestService;
3536
import org.elasticsearch.ingest.Pipeline;
36-
import org.elasticsearch.ingest.PipelineStore;
3737

3838
import java.io.IOException;
3939
import java.util.ArrayList;
@@ -164,24 +164,23 @@ public boolean isVerbose() {
164164
}
165165
}
166166

167-
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
168167
static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
169168

170-
static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) {
169+
static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, IngestService ingestService) {
171170
if (pipelineId == null) {
172171
throw new IllegalArgumentException("param [pipeline] is null");
173172
}
174-
Pipeline pipeline = pipelineStore.get(pipelineId);
173+
Pipeline pipeline = ingestService.getPipeline(pipelineId);
175174
if (pipeline == null) {
176175
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
177176
}
178177
List<IngestDocument> ingestDocumentList = parseDocs(config);
179178
return new Parsed(pipeline, ingestDocumentList, verbose);
180179
}
181180

182-
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
181+
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService pipelineStore) throws Exception {
183182
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
184-
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
183+
Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
185184
List<IngestDocument> ingestDocumentList = parseDocs(config);
186185
return new Parsed(pipeline, ingestDocumentList, verbose);
187186
}

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import org.elasticsearch.common.io.stream.Writeable;
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.xcontent.XContentHelper;
29-
import org.elasticsearch.ingest.PipelineStore;
30-
import org.elasticsearch.node.NodeService;
29+
import org.elasticsearch.ingest.IngestService;
3130
import org.elasticsearch.tasks.Task;
3231
import org.elasticsearch.threadpool.ThreadPool;
3332
import org.elasticsearch.transport.TransportService;
@@ -36,15 +35,15 @@
3635

3736
public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {
3837

39-
private final PipelineStore pipelineStore;
38+
private final IngestService ingestService;
4039
private final SimulateExecutionService executionService;
4140

4241
@Inject
4342
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
44-
ActionFilters actionFilters, NodeService nodeService) {
43+
ActionFilters actionFilters, IngestService ingestService) {
4544
super(settings, SimulatePipelineAction.NAME, transportService, actionFilters,
4645
(Writeable.Reader<SimulatePipelineRequest>) SimulatePipelineRequest::new);
47-
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
46+
this.ingestService = ingestService;
4847
this.executionService = new SimulateExecutionService(threadPool);
4948
}
5049

@@ -55,9 +54,9 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
5554
final SimulatePipelineRequest.Parsed simulateRequest;
5655
try {
5756
if (request.getId() != null) {
58-
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);
57+
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService);
5958
} else {
60-
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
59+
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService);
6160
}
6261
} catch (Exception e) {
6362
listener.onFailure(e);

0 commit comments

Comments
 (0)