From 399ad9d1be89a767d8eddf2f2a7a022adbf9f426 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Mon, 25 Nov 2024 21:39:15 +0100 Subject: [PATCH] [Fix_#474] For task implementation Signed-off-by: Francisco Javier Tirado Sarti --- .../serverlessworkflow/impl/ContextAware.java | 22 +++++++ .../impl/DefaultWorkflowPosition.java | 14 ++++- .../impl/DefaultWorkflowPositionFactory.java | 32 ++++++++++ .../serverlessworkflow/impl/TaskContext.java | 38 ++++++++++-- .../impl/WorkflowApplication.java | 15 ++++- .../impl/WorkflowContext.java | 43 +------------- .../impl/WorkflowDefinition.java | 13 ++++- .../impl/WorkflowFilter.java | 3 +- .../impl/WorkflowInstance.java | 18 +++--- .../impl/WorkflowPosition.java | 2 + .../impl/WorkflowPositionFactory.java | 20 +++++++ .../impl/WorkflowUtils.java | 32 +++++----- .../impl/executors/AbstractTaskExecutor.java | 15 ++--- .../executors/DefaultTaskExecutorFactory.java | 2 + .../impl/executors/DoExecutor.java | 2 +- .../impl/executors/ForExecutor.java | 58 +++++++++++++++++++ .../impl/executors/SetExecutor.java | 16 +++-- .../impl/executors/SwitchExecutor.java | 6 +- .../impl/executors/TaskExecutor.java | 8 ++- .../impl/expressions/Expression.java | 3 +- .../impl/expressions/ExpressionUtils.java | 8 +-- .../impl/expressions/JQExpression.java | 15 ++++- .../impl/WorkflowDefinitionTest.java | 19 ++++-- impl/core/src/test/resources/for-collect.yaml | 17 ++++++ impl/core/src/test/resources/for-sum.yaml | 19 ++++++ .../impl/executors/HttpExecutor.java | 11 ++-- 26 files changed, 332 insertions(+), 119 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/ContextAware.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPositionFactory.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java create mode 100644 impl/core/src/test/resources/for-collect.yaml create mode 100644 impl/core/src/test/resources/for-sum.yaml diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/ContextAware.java b/impl/core/src/main/java/io/serverlessworkflow/impl/ContextAware.java new file mode 100644 index 00000000..a58dc348 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/ContextAware.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.Map; + +public interface ContextAware { + Map variables(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPosition.java index 2e51f6a6..54f993b1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPosition.java @@ -17,7 +17,19 @@ public class DefaultWorkflowPosition implements WorkflowPosition { - private StringBuilder sb = new StringBuilder(""); + private StringBuilder sb; + + DefaultWorkflowPosition() { + this.sb = new StringBuilder(""); + } + + private DefaultWorkflowPosition(WorkflowPosition position) { + this.sb = new StringBuilder(position.toString()); + } + + public DefaultWorkflowPosition copy() { + return new DefaultWorkflowPosition(this); + } @Override public WorkflowPosition addIndex(int index) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPositionFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPositionFactory.java new file mode 100644 index 00000000..00b0085c --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPositionFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +class DefaultWorkflowPositionFactory implements WorkflowPositionFactory { + + private static WorkflowPositionFactory instance = new DefaultWorkflowPositionFactory(); + + public static WorkflowPositionFactory get() { + return instance; + } + + private DefaultWorkflowPositionFactory() {} + + @Override + public WorkflowPosition buildPosition() { + return new DefaultWorkflowPosition(); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 91f6aa61..c23de49f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -19,28 +19,48 @@ import io.serverlessworkflow.api.types.FlowDirective; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.TaskBase; +import java.util.HashMap; +import java.util.Map; -public class TaskContext { +public class TaskContext implements ContextAware { private final JsonNode rawInput; private final T task; + private final WorkflowPosition position; private JsonNode input; private JsonNode output; private JsonNode rawOutput; private FlowDirective flowDirective; + private Map contextVariables; - public TaskContext(JsonNode rawInput, T task) { - this.rawInput = rawInput; + public TaskContext(JsonNode input, WorkflowPosition position) { + this.rawInput = input; + this.position = position; + this.task = null; + this.contextVariables = new HashMap<>(); + init(); + } + + public TaskContext(JsonNode input, TaskContext taskContext, T task) { + this.rawInput = input; + this.position = taskContext.position.copy(); + this.task = task; + this.flowDirective = task.getThen(); + this.contextVariables = new HashMap<>(taskContext.variables()); + init(); + } + + private void init() { this.input = rawInput; this.rawOutput = rawInput; this.output = rawInput; - this.task = task; - this.flowDirective = task.getThen(); } public void input(JsonNode input) { this.input = input; + this.rawOutput = input; + this.output = input; } public JsonNode input() { @@ -81,4 +101,12 @@ public FlowDirective flowDirective() { ? new FlowDirective().withFlowDirectiveEnum(FlowDirectiveEnum.CONTINUE) : flowDirective; } + + public Map variables() { + return contextVariables; + } + + public WorkflowPosition position() { + return position; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 02d807f3..d9da16b9 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -25,7 +25,6 @@ import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory; import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory; import io.serverlessworkflow.impl.resources.ResourceLoaderFactory; - import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -40,17 +39,20 @@ public class WorkflowApplication implements AutoCloseable { private final SchemaValidatorFactory schemaValidatorFactory; private final Collection listeners; private final Map definitions; + private final WorkflowPositionFactory positionFactory; public WorkflowApplication( TaskExecutorFactory taskFactory, ExpressionFactory exprFactory, ResourceLoaderFactory resourceLoaderFactory, SchemaValidatorFactory schemaValidatorFactory, + WorkflowPositionFactory positionFactory, Collection listeners) { this.taskFactory = taskFactory; this.exprFactory = exprFactory; this.resourceLoaderFactory = resourceLoaderFactory; this.schemaValidatorFactory = schemaValidatorFactory; + this.positionFactory = positionFactory; this.listeners = listeners; this.definitions = new ConcurrentHashMap<>(); } @@ -85,6 +87,7 @@ public static class Builder { private Collection listeners; private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get(); private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get(); + private WorkflowPositionFactory positionFactory = DefaultWorkflowPositionFactory.get(); private Builder() {} @@ -111,6 +114,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) { return this; } + public Builder withPositionFactory(WorkflowPositionFactory positionFactory) { + this.positionFactory = positionFactory; + return this; + } + public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) { this.schemaValidatorFactory = factory; return this; @@ -122,6 +130,7 @@ public WorkflowApplication build() { exprFactory, resourceLoaderFactory, schemaValidatorFactory, + positionFactory, listeners == null ? Collections.emptySet() : Collections.unmodifiableCollection(listeners)); @@ -146,4 +155,8 @@ public void close() throws Exception { } definitions.clear(); } + + public WorkflowPositionFactory positionFactory() { + return positionFactory; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java index 4f0f0f16..d5c1f428 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java @@ -19,49 +19,16 @@ import io.serverlessworkflow.impl.json.JsonUtils; public class WorkflowContext { - private final WorkflowPosition position; private final WorkflowDefinition definition; private final JsonNode input; - private JsonNode current; private JsonNode context; - private WorkflowContext( - WorkflowPosition position, WorkflowDefinition definition, JsonNode input) { - this.position = position; + WorkflowContext(WorkflowDefinition definition, JsonNode input) { this.definition = definition; this.input = input; - this.current = input.deepCopy(); this.context = JsonUtils.mapper().createObjectNode(); } - public static Builder builder(WorkflowDefinition definition, JsonNode input) { - return new Builder(definition, input); - } - - public static class Builder { - private WorkflowPosition position = new DefaultWorkflowPosition(); - private WorkflowDefinition definition; - private JsonNode input; - - private Builder(WorkflowDefinition definition, JsonNode input) { - this.definition = definition; - this.input = input; - } - - public Builder position(WorkflowPosition position) { - this.position = position; - return this; - } - - public WorkflowContext build() { - return new WorkflowContext(position, definition, input); - } - } - - public WorkflowPosition position() { - return position; - } - public JsonNode context() { return context; } @@ -74,14 +41,6 @@ public JsonNode rawInput() { return input; } - public void current(JsonNode output) { - this.current = output; - } - - public JsonNode current() { - return current; - } - public WorkflowDefinition definition() { return definition; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index f593cea3..3a76ff1f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -28,7 +28,6 @@ import io.serverlessworkflow.impl.jsonschema.SchemaValidator; import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory; import io.serverlessworkflow.impl.resources.ResourceLoader; - import java.nio.file.Path; import java.util.Collection; import java.util.Map; @@ -47,6 +46,7 @@ public class WorkflowDefinition implements AutoCloseable { private final ExpressionFactory exprFactory; private final ResourceLoader resourceLoader; private final SchemaValidatorFactory schemaValidatorFactory; + private final WorkflowPositionFactory positionFactory; private final Map> taskExecutors = new ConcurrentHashMap<>(); @@ -56,12 +56,14 @@ private WorkflowDefinition( TaskExecutorFactory taskFactory, ResourceLoader resourceLoader, ExpressionFactory exprFactory, - SchemaValidatorFactory schemaValidatorFactory) { + SchemaValidatorFactory schemaValidatorFactory, + WorkflowPositionFactory positionFactory) { this.workflow = workflow; this.listeners = listeners; this.taskFactory = taskFactory; this.exprFactory = exprFactory; this.schemaValidatorFactory = schemaValidatorFactory; + this.positionFactory = positionFactory; this.resourceLoader = resourceLoader; if (workflow.getInput() != null) { Input input = workflow.getInput(); @@ -90,7 +92,8 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, application.taskFactory(), application.resourceLoaderFactory().getResourceLoader(path), application.expressionFactory(), - application.validatorFactory()); + application.validatorFactory(), + application.positionFactory()); } public WorkflowInstance execute(Object input) { @@ -142,6 +145,10 @@ public ResourceLoader resourceLoader() { return resourceLoader; } + public WorkflowPositionFactory positionFactory() { + return positionFactory; + } + @Override public void close() { // TODO close resourcers hold for uncompleted process instances, if any diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java index 7fde97ba..7d25df48 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java @@ -16,9 +16,8 @@ package io.serverlessworkflow.impl; import com.fasterxml.jackson.databind.JsonNode; -import java.util.Optional; @FunctionalInterface public interface WorkflowFilter { - JsonNode apply(WorkflowContext workflow, Optional> task, JsonNode node); + JsonNode apply(WorkflowContext workflow, TaskContext task, JsonNode node); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index bd2f94b8..1361c43f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -18,24 +18,26 @@ import static io.serverlessworkflow.impl.json.JsonUtils.toJavaValue; import com.fasterxml.jackson.databind.JsonNode; -import java.util.Optional; public class WorkflowInstance { private WorkflowState state; private WorkflowContext context; + private TaskContext taskContext; WorkflowInstance(WorkflowDefinition definition, JsonNode input) { definition.inputSchemaValidator().ifPresent(v -> v.validate(input)); - context = WorkflowContext.builder(definition, input).build(); + context = new WorkflowContext(definition, input); + taskContext = new TaskContext<>(input, definition.positionFactory().buildPosition()); definition .inputFilter() - .ifPresent(f -> context.current(f.apply(context, Optional.empty(), context.current()))); + .ifPresent(f -> taskContext.input(f.apply(context, taskContext, input))); state = WorkflowState.STARTED; - WorkflowUtils.processTaskList(definition.workflow().getDo(), context); + taskContext.rawOutput( + WorkflowUtils.processTaskList(definition.workflow().getDo(), context, taskContext)); definition .outputFilter() - .ifPresent(f -> context.current(f.apply(context, Optional.empty(), context.current()))); - definition.outputSchemaValidator().ifPresent(v -> v.validate(context.current())); + .ifPresent(f -> taskContext.output(f.apply(context, taskContext, taskContext.rawOutput()))); + definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output())); } public WorkflowState state() { @@ -43,10 +45,10 @@ public WorkflowState state() { } public Object output() { - return toJavaValue(context.current()); + return toJavaValue(taskContext.output()); } public Object outputAsJsonNode() { - return context.current(); + return taskContext.output(); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java index c43d4b2f..cf63844a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java @@ -24,4 +24,6 @@ public interface WorkflowPosition { WorkflowPosition addIndex(int index); WorkflowPosition back(); + + WorkflowPosition copy(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java new file mode 100644 index 00000000..e93a4c33 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +public interface WorkflowPositionFactory { + WorkflowPosition buildPosition(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 259a0e6a..54abd7e7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -35,7 +35,6 @@ import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory; import io.serverlessworkflow.impl.resources.ResourceLoader; import io.serverlessworkflow.impl.resources.StaticResource; - import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; @@ -128,32 +127,33 @@ private static TaskItem findTaskByName(ListIterator iter, String taskN throw new IllegalArgumentException("Cannot find task with name " + taskName); } - public static void processTaskList(List tasks, WorkflowContext context) { - context.position().addProperty("do"); + public static JsonNode processTaskList( + List tasks, WorkflowContext context, TaskContext parentTask) { + parentTask.position().addProperty("do"); + TaskContext currentContext = parentTask; if (!tasks.isEmpty()) { ListIterator iter = tasks.listIterator(); TaskItem nextTask = iter.next(); while (nextTask != null) { TaskItem task = nextTask; - context.position().addIndex(iter.nextIndex()).addProperty(task.getName()); + parentTask.position().addIndex(iter.nextIndex()).addProperty(task.getName()); context .definition() .listeners() - .forEach(l -> l.onTaskStarted(context.position(), task.getTask())); - TaskContext taskContext = + .forEach(l -> l.onTaskStarted(parentTask.position(), task.getTask())); + currentContext = context .definition() .taskExecutors() .computeIfAbsent( - context.position().jsonPointer(), + parentTask.position().jsonPointer(), k -> context .definition() .taskFactory() .getTaskExecutor(task.getTask(), context.definition())) - .apply(context, context.current()); - context.current(taskContext.output()); - FlowDirective flowDirective = taskContext.flowDirective(); + .apply(context, parentTask, currentContext.output()); + FlowDirective flowDirective = currentContext.flowDirective(); if (flowDirective.getFlowDirectiveEnum() != null) { switch (flowDirective.getFlowDirectiveEnum()) { case CONTINUE: @@ -170,15 +170,21 @@ public static void processTaskList(List tasks, WorkflowContext context context .definition() .listeners() - .forEach(l -> l.onTaskEnded(context.position(), task.getTask())); - context.position().back(); + .forEach(l -> l.onTaskEnded(parentTask.position(), task.getTask())); + parentTask.position().back(); } } - context.position().back(); + parentTask.position().back(); + return currentContext.output(); } public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) { + assert str != null; Expression expression = exprFactory.getExpression(str); return expression::eval; } + + public static Optional optionalFilter(ExpressionFactory exprFactory, String str) { + return str != null ? Optional.of(buildWorkflowFilter(exprFactory, str)) : Optional.empty(); + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 56e55120..086c7c51 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -83,23 +83,20 @@ private void buildContextProcessors(WorkflowDefinition definition) { } @Override - public TaskContext apply(WorkflowContext workflowContext, JsonNode rawInput) { - TaskContext taskContext = new TaskContext<>(rawInput, task); + public TaskContext apply( + WorkflowContext workflowContext, TaskContext parentContext, JsonNode input) { + TaskContext taskContext = new TaskContext<>(input, parentContext, task); inputSchemaValidator.ifPresent(s -> s.validate(taskContext.rawInput())); inputProcessor.ifPresent( - p -> - taskContext.input( - p.apply(workflowContext, Optional.of(taskContext), taskContext.rawInput()))); + p -> taskContext.input(p.apply(workflowContext, taskContext, taskContext.rawInput()))); internalExecute(workflowContext, taskContext); outputProcessor.ifPresent( - p -> - taskContext.output( - p.apply(workflowContext, Optional.of(taskContext), taskContext.rawOutput()))); + p -> taskContext.output(p.apply(workflowContext, taskContext, taskContext.rawOutput()))); outputSchemaValidator.ifPresent(s -> s.validate(taskContext.output())); contextProcessor.ifPresent( p -> workflowContext.context( - p.apply(workflowContext, Optional.of(taskContext), workflowContext.context()))); + p.apply(workflowContext, taskContext, workflowContext.context()))); contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); return taskContext; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index b5ac0119..45a988a7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -65,6 +65,8 @@ public TaskExecutor getTaskExecutor( return new DoExecutor(task.getDoTask(), definition); } else if (task.getSetTask() != null) { return new SetExecutor(task.getSetTask(), definition); + } else if (task.getForTask() != null) { + return new ForExecutor(task.getForTask(), definition); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java index 30f35f95..df364c14 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java @@ -29,6 +29,6 @@ protected DoExecutor(DoTask task, WorkflowDefinition definition) { @Override protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - WorkflowUtils.processTaskList(task.getDo(), workflow); + taskContext.rawOutput(WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext)); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java new file mode 100644 index 00000000..511575f3 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.ForTask; +import io.serverlessworkflow.api.types.ForTaskConfiguration; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowUtils; +import java.util.Iterator; +import java.util.Optional; + +public class ForExecutor extends AbstractTaskExecutor { + + private final WorkflowFilter collectionExpr; + private final Optional whileExpr; + + protected ForExecutor(ForTask task, WorkflowDefinition definition) { + super(task, definition); + ForTaskConfiguration forConfig = task.getFor(); + this.collectionExpr = + WorkflowUtils.buildWorkflowFilter(definition.expressionFactory(), forConfig.getIn()); + this.whileExpr = WorkflowUtils.optionalFilter(definition.expressionFactory(), task.getWhile()); + } + + @Override + protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { + Iterator iter = + collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(); + int i = 0; + while (iter.hasNext() + && whileExpr + .map(w -> w.apply(workflow, taskContext, taskContext.rawOutput())) + .map(n -> n.asBoolean(true)) + .orElse(true)) { + JsonNode item = iter.next(); + taskContext.variables().put(task.getFor().getEach(), item); + taskContext.variables().put(task.getFor().getAt(), i++); + taskContext.rawOutput(WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext)); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java index b9d9db86..0f0d999e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java @@ -15,25 +15,33 @@ */ package io.serverlessworkflow.impl.executors; -import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.SetTask; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.expressions.ExpressionUtils; import io.serverlessworkflow.impl.json.JsonUtils; import io.serverlessworkflow.impl.json.MergeUtils; +import java.util.Map; public class SetExecutor extends AbstractTaskExecutor { - private JsonNode toBeSet; + private Map toBeSet; protected SetExecutor(SetTask task, WorkflowDefinition definition) { super(task, definition); - this.toBeSet = JsonUtils.fromValue(task.getSet().getAdditionalProperties()); + this.toBeSet = + ExpressionUtils.buildExpressionMap( + task.getSet().getAdditionalProperties(), definition.expressionFactory()); } @Override protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - taskContext.rawOutput(MergeUtils.merge(toBeSet, taskContext.input())); + taskContext.rawOutput( + MergeUtils.merge( + JsonUtils.fromValue( + ExpressionUtils.evaluateExpressionMap( + toBeSet, workflow, taskContext, taskContext.input())), + taskContext.input())); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java index f081dfa4..dee0cee7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java @@ -26,7 +26,6 @@ import io.serverlessworkflow.impl.WorkflowUtils; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; public class SwitchExecutor extends AbstractTaskExecutor { @@ -52,10 +51,7 @@ protected SwitchExecutor(SwitchTask task, WorkflowDefinition definition) { @Override protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { for (Entry entry : workflowFilters.entrySet()) { - if (entry - .getValue() - .apply(workflow, Optional.of(taskContext), taskContext.input()) - .asBoolean()) { + if (entry.getValue().apply(workflow, taskContext, taskContext.input()).asBoolean()) { taskContext.flowDirective(entry.getKey().getThen()); return; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java index 62228199..b4b66a9a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java @@ -19,7 +19,9 @@ import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import java.util.function.BiFunction; -public interface TaskExecutor - extends BiFunction> {} +@FunctionalInterface +public interface TaskExecutor { + TaskContext apply( + WorkflowContext workflowContext, TaskContext parentContext, JsonNode input); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/Expression.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/Expression.java index f9d799ec..42566c77 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/Expression.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/Expression.java @@ -18,8 +18,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import java.util.Optional; public interface Expression { - JsonNode eval(WorkflowContext workflowContext, Optional> context, JsonNode node); + JsonNode eval(WorkflowContext workflowContext, TaskContext context, JsonNode node); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/ExpressionUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/ExpressionUtils.java index 72b4e90c..7f776322 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/ExpressionUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/ExpressionUtils.java @@ -20,7 +20,6 @@ import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.json.JsonUtils; import java.util.Map; -import java.util.Optional; public class ExpressionUtils { @@ -35,10 +34,7 @@ public static Map buildExpressionMap( } public static Map evaluateExpressionMap( - Map origMap, - WorkflowContext workflow, - Optional> task, - JsonNode n) { + Map origMap, WorkflowContext workflow, TaskContext task, JsonNode n) { return new ProxyMap( origMap, o -> @@ -54,7 +50,7 @@ public static Object buildExpressionObject(Object obj, ExpressionFactory factory } public static Object evaluateExpressionObject( - Object obj, WorkflowContext workflow, Optional> task, JsonNode node) { + Object obj, WorkflowContext workflow, TaskContext task, JsonNode node) { return obj instanceof Map ? ExpressionUtils.evaluateExpressionMap((Map) obj, workflow, task, node) : obj; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java index 7b008fcb..ae6c784f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import io.serverlessworkflow.impl.ContextAware; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.json.JsonUtils; @@ -177,10 +178,10 @@ public JsonNode getResult() { } @Override - public JsonNode eval(WorkflowContext workflow, Optional> task, JsonNode node) { + public JsonNode eval(WorkflowContext workflow, TaskContext task, JsonNode node) { TypedOutput output = output(JsonNode.class); try { - internalExpr.apply(this.scope.get(), node, output); + internalExpr.apply(createScope(workflow, task), node, output); return output.getResult(); } catch (JsonQueryException e) { throw new IllegalArgumentException( @@ -188,6 +189,16 @@ public JsonNode eval(WorkflowContext workflow, Optional> task, Js } } + private Scope createScope(WorkflowContext workflow, TaskContext task) { + return createScope(scope.get(), task); + } + + private Scope createScope(Scope parentScope, ContextAware context) { + Scope childScope = Scope.newChildScope(parentScope); + context.variables().forEach((k, v) -> childScope.setValue(k, JsonUtils.fromValue(v))); + return childScope; + } + private void checkFunctionCall(net.thisptr.jackson.jq.Expression toCheck) throws JsonQueryException { if (toCheck instanceof FunctionCall) { diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index f6c3455a..39adcf9d 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import java.util.stream.Stream; import org.assertj.core.api.Condition; @@ -45,10 +46,6 @@ void testWorkflowExecution(String fileName, Object input, Condition cond } private static Stream provideParameters() { - Map petInput = Map.of("petId", 10); - Condition petCondition = - new Condition<>( - o -> ((Map) o).containsKey("photoUrls"), "callHttpCondition"); return Stream.of( Arguments.of( "switch-then-string.yaml", @@ -82,6 +79,18 @@ private static Stream provideParameters() { o.equals( Map.of( "orderType", "unknown", "log", "warn", "message", "something's wrong")), - "switch-unknown"))); + "switch-unknown")), + Arguments.of( + "for-sum.yaml", + Map.of("input", Arrays.asList(1, 2, 3)), + new Condition(o -> o.equals(6), "for-sum")), + Arguments.of( + "for-collect.yaml", + Map.of("input", Arrays.asList(1, 2, 3)), + new Condition( + o -> + o.equals( + Map.of("input", Arrays.asList(1, 2, 3), "output", Arrays.asList(2, 4, 6))), + "for-collect"))); } } diff --git a/impl/core/src/test/resources/for-collect.yaml b/impl/core/src/test/resources/for-collect.yaml new file mode 100644 index 00000000..7bcc48c2 --- /dev/null +++ b/impl/core/src/test/resources/for-collect.yaml @@ -0,0 +1,17 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: for-collect-example + version: '0.1.0' +do: + - sumAll: + for: + each: number + in: .input + at: index + input: + from: '{input: .input, output: []}' + do: + - sumIndex: + output: + as: .output+=[$number+$index+1] \ No newline at end of file diff --git a/impl/core/src/test/resources/for-sum.yaml b/impl/core/src/test/resources/for-sum.yaml new file mode 100644 index 00000000..e0fe106b --- /dev/null +++ b/impl/core/src/test/resources/for-sum.yaml @@ -0,0 +1,19 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: for-sum-example + version: '0.1.0' +do: + - initCounter: + set: + counter: 0 + - sumAll: + for: + each: number + in: .input + do: + - accumulate: + output: + as: .counter+=$number + output: + as: .counter \ No newline at end of file diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java index 684acd47..5ecd27af 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java @@ -38,7 +38,6 @@ import jakarta.ws.rs.client.WebTarget; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; public class HttpExecutor implements CallableTask { @@ -82,8 +81,7 @@ public void init(CallHTTP task, WorkflowDefinition definition) { (request, workflow, context, node) -> request.post( Entity.json( - ExpressionUtils.evaluateExpressionObject( - body, workflow, Optional.of(context), node)), + ExpressionUtils.evaluateExpressionObject(body, workflow, context, node)), JsonNode.class); break; case HttpMethod.GET: @@ -97,12 +95,11 @@ public JsonNode apply( WorkflowContext workflow, TaskContext taskContext, JsonNode input) { WebTarget target = targetSupplier.apply(workflow, taskContext, input); for (Entry entry : - ExpressionUtils.evaluateExpressionMap(queryMap, workflow, Optional.of(taskContext), input) - .entrySet()) { + ExpressionUtils.evaluateExpressionMap(queryMap, workflow, taskContext, input).entrySet()) { target = target.queryParam(entry.getKey(), entry.getValue()); } Builder request = target.request(); - ExpressionUtils.evaluateExpressionMap(headersMap, workflow, Optional.of(taskContext), input) + ExpressionUtils.evaluateExpressionMap(headersMap, workflow, taskContext, input) .forEach(request::header); return requestFunction.apply(request, workflow, taskContext, input); } @@ -153,7 +150,7 @@ public ExpressionURISupplier(Expression expr) { @Override public WebTarget apply(WorkflowContext workflow, TaskContext task, JsonNode node) { - return client.target(expr.eval(workflow, Optional.of(task), node).asText()); + return client.target(expr.eval(workflow, task, node).asText()); } } }