Skip to content

Commit 8d2058c

Browse files
authored
parallel exec output should be merged into one WorkflowModel (#848)
* parallel exec output should be merged into one WorkflowModel Signed-off-by: Dmitrii Tikhomirov <[email protected]> * better combined processsing Signed-off-by: Dmitrii Tikhomirov <[email protected]> * result can be AgenticScope and Map<String, Object> Signed-off-by: Dmitrii Tikhomirov <[email protected]> --------- Signed-off-by: Dmitrii Tikhomirov <[email protected]>
1 parent fd9f440 commit 8d2058c

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor;
2525
import java.time.OffsetDateTime;
2626
import java.util.Map;
27+
import java.util.stream.Collectors;
2728

2829
class AgenticModelFactory implements WorkflowModelFactory {
2930

@@ -60,10 +61,11 @@ public WorkflowModel fromAny(WorkflowModel prev, Object obj) {
6061

6162
@Override
6263
public WorkflowModel combine(Map<String, WorkflowModel> workflowVariables) {
63-
// TODO: create a new agenticScope object in the AgenticScopeRegistryAssessor per branch
64-
// TODO: Since we share the same agenticScope object, both branches are updating the same
65-
// instance, so for now we return the first key.
66-
return workflowVariables.values().iterator().next();
64+
Map<String, Object> combinedState =
65+
workflowVariables.entrySet().stream()
66+
.map(e -> Map.entry(e.getKey(), e.getValue().asJavaObject()))
67+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
68+
return newAgenticModel(combinedState);
6769
}
6870

6971
@Override

experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,15 @@ public void testParallel() throws ExecutionException, InterruptedException {
174174
Map<String, String> topic = new HashMap<>();
175175
topic.put("style", "sci-fi");
176176

177+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
178+
Map<String, Object> result =
179+
app.workflowDefinition(workflow).instance(topic).start().get().asMap().orElseThrow();
180+
181+
assertEquals("Fake conflict response", result.get("setting").toString());
182+
assertEquals("Fake hero response", result.get("hero").toString());
183+
assertEquals("Fake setting response", result.get("conflict").toString());
184+
}
185+
177186
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
178187
AgenticScope result =
179188
app.workflowDefinition(workflow)
@@ -183,9 +192,9 @@ public void testParallel() throws ExecutionException, InterruptedException {
183192
.as(AgenticScope.class)
184193
.orElseThrow();
185194

186-
assertEquals("Fake conflict response", result.readState("setting"));
187-
assertEquals("Fake hero response", result.readState("hero"));
188-
assertEquals("Fake setting response", result.readState("conflict"));
195+
assertEquals("Fake conflict response", result.readState("setting").toString());
196+
assertEquals("Fake hero response", result.readState("hero").toString());
197+
assertEquals("Fake setting response", result.readState("conflict").toString());
189198
}
190199
}
191200

@@ -223,6 +232,14 @@ public void testSeqAndThenParallel() throws ExecutionException, InterruptedExcep
223232
Map<String, String> topic = new HashMap<>();
224233
topic.put("fact", "alien");
225234

235+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
236+
Map<String, Object> result =
237+
app.workflowDefinition(workflow).instance(topic).start().get().asMap().orElseThrow();
238+
239+
assertEquals(cultureTraits, result.get("culture"));
240+
assertEquals(technologyTraits, result.get("technology"));
241+
}
242+
226243
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
227244
AgenticScope result =
228245
app.workflowDefinition(workflow)

0 commit comments

Comments
 (0)