Skip to content

Conversation

@hub-cap
Copy link
Contributor

@hub-cap hub-cap commented Jul 16, 2019

If a pipeline that refrences the policy exists, we should not allow the
policy to be deleted. The user will need to remove the processor from
the pipeline before deleting the policy. This commit adds a check to
ensure that the policy cannot be deleted if it is referenced by any
pipeline in the system.

If a pipeline that refrences the policy exists, we should not allow the
policy to be deleted. The user will need to remove the processor from
the pipeline before deleting the policy. This commit adds a check to
ensure that the policy cannot be deleted if it is referenced by any
pipeline in the system.
@hub-cap hub-cap added the :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP label Jul 16, 2019
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-core-features

@hub-cap
Copy link
Contributor Author

hub-cap commented Jul 17, 2019

@elasticmachine run elasticsearch-ci/1
@elasticmachine run elasticsearch-ci/bwc
@elasticmachine run elasticsearch-ci/default-distro

Copy link
Member

@jbaiera jbaiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great, though I think we have a minor issue with how Processors can be nested (and thus hidden) within other Processors.

if (enrichProcessor.getPolicyName().equals(request.getName())) {
listener.onFailure(
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it [{}]",
RestStatus.INTERNAL_SERVER_ERROR, request.getName(), pipeline.getId()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be BAD_REQUEST instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think CONFLICT - 409

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (PipelineConfiguration pc: pipelines) {
Pipeline pipeline = ingestService.getPipeline(pc.getId());

for (Processor processor: pipeline.getProcessors()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a little bit of a conundrum in that the processors returned here could be complex processors that contain processors within them. For instance: The ForEachProcessor could contain an ExactMatchProcessor that is called on each value in field.

A quick fix to get more coverage would be to use

Suggested change
for (Processor processor: pipeline.getProcessors()) {
for (Processor processor: pipeline.flattenAllProcessors()) {

But that only flattens processors from CompoundProcessors which things like ForEachProcessor are not subclassed... Maybe we need to introduce an interface that allows for flattening all sub-processors when a processor contains any?

Copy link
Contributor

@jakelandis jakelandis Jul 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ForEachProcessor and ConditionalProcessor are wrapping processor, meaning that they wrap other processor implementations. In both cases the .getProcessor() should return the processor that is wrapped. (I think those are the only 2 wrappers) ... so flattenAllProcessors should work for those.

However, if a wrapper processor wraps another wrapper e.g. if on a ForEachProcessor will result in ConditionalProcessor wrapping a ForEachProcessor which would miss the real processor. I think if add

if (processor instanceof ConditionalProcessor) {
                    processor = ((ConditionalProcessor) processor).getProcessor()

in the loop it handle that edge case.

Other then that edge case, i think @jbaiera suggestion to use flattenAllProcessors() will work. Also, it may make sense to add ^^ to flattenAllProcessors() since a conditional processor wrapping the real processor is an implementation detail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ug... I just took a look deeper look and getProcessor() is not called by the flattenAllProcessors() ... so your loop would need to also take into account ForEachProcessor

if (processor instanceof ConditionalProcessor) {
                    processor = ((ConditionalProcessor) processor).getProcessor()
if (processor instanceof ForEachProcessor) {
                    processor = ((ForEachProcessor) processor).getProcessor()

We may want to consider adding in a interface like WrappedProcessor that exposes getProcessor() and possibly handle the unwrapping inside flattenAllProcessors(boolean unWrap)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all of this conversation was handled in 4ec68ec


public abstract class AbstractEnrichProcessor extends AbstractProcessor {

protected final String policyName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: protected -> private

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (Processor processor: pipeline.getProcessors()) {
if (processor instanceof AbstractEnrichProcessor) {
AbstractEnrichProcessor enrichProcessor = (AbstractEnrichProcessor) processor;
if (enrichProcessor.getPolicyName().equals(request.getName())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we treat the policy names case-insensitive (e.g. equalsIgnoreCase)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not do this anywhere else best of my knowledge, so PUT /policyA and PUT /policya will create 2... its probably a good idea to change this tho

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it [{}]",
RestStatus.INTERNAL_SERVER_ERROR, request.getName(), pipeline.getId()));
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we collect all pipelines that may have a reference, so the user doesn't need to fix one, get the error, fix one, repeat. ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[1] shows the list used to glean all of the pipelines that could be in use to display

[1] https://github.com/elastic/elasticsearch/pull/44438/files#diff-a37fe0a3791b5e0c8215af76056efd9eR70

for (PipelineConfiguration pc: pipelines) {
Pipeline pipeline = ingestService.getPipeline(pc.getId());

for (Processor processor: pipeline.getProcessors()) {
Copy link
Contributor

@jakelandis jakelandis Jul 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ForEachProcessor and ConditionalProcessor are wrapping processor, meaning that they wrap other processor implementations. In both cases the .getProcessor() should return the processor that is wrapped. (I think those are the only 2 wrappers) ... so flattenAllProcessors should work for those.

However, if a wrapper processor wraps another wrapper e.g. if on a ForEachProcessor will result in ConditionalProcessor wrapping a ForEachProcessor which would miss the real processor. I think if add

if (processor instanceof ConditionalProcessor) {
                    processor = ((ConditionalProcessor) processor).getProcessor()

in the loop it handle that edge case.

Other then that edge case, i think @jbaiera suggestion to use flattenAllProcessors() will work. Also, it may make sense to add ^^ to flattenAllProcessors() since a conditional processor wrapping the real processor is an implementation detail.

if (enrichProcessor.getPolicyName().equals(request.getName())) {
listener.onFailure(
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it [{}]",
RestStatus.INTERNAL_SERVER_ERROR, request.getName(), pipeline.getId()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think CONFLICT - 409

if (pipelinesWithProcessors.isEmpty() == false) {
listener.onFailure(
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to collect and join (comma seperated) these pipelines, and likely adjust the corresponding test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, this is already resolving it to a list (which is why there is no [] in the second arg in the string). I just for fun, added another processor to the test and got this error.

{"error":{"root_cause":[{"type":"status_exception","reason":"Could not delete policy [my_policy] because a pipeline is referencing it [my_pipeline, another_pipeline]"}],"type":"status
_exception","reason":"Could not delete policy [my_policy] because a pipeline is referencing it [my_pipeline, another_pipeline]"},"status":409}                                            

Copy link
Contributor

@jakelandis jakelandis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@jbaiera jbaiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@hub-cap hub-cap merged commit 9e22fd4 into elastic:enrich Aug 14, 2019
@hub-cap hub-cap deleted the enrich_pipeline_check branch August 14, 2019 18:43
hub-cap added a commit that referenced this pull request Aug 14, 2019
If a pipeline that refrences the policy exists, we should not allow the
policy to be deleted. The user will need to remove the processor from
the pipeline before deleting the policy. This commit adds a check to
ensure that the policy cannot be deleted if it is referenced by any
pipeline in the system.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants