Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions src/strands/multiagent/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,41 +469,32 @@ async def _execute_graph(self) -> None:
ready_nodes.clear()

# Execute current batch of ready nodes concurrently
tasks = [
asyncio.create_task(self._execute_node(node))
for node in current_batch
if node not in self.state.completed_nodes
]
tasks = [asyncio.create_task(self._execute_node(node)) for node in current_batch]

for task in tasks:
await task

# Find newly ready nodes after batch execution
ready_nodes.extend(self._find_newly_ready_nodes())
# We add all nodes in current batch as completed batch,
# because a failure would throw exception and code would not make it here
ready_nodes.extend(self._find_newly_ready_nodes(current_batch))

def _find_newly_ready_nodes(self) -> list["GraphNode"]:
def _find_newly_ready_nodes(self, completed_batch: list["GraphNode"]) -> list["GraphNode"]:
"""Find nodes that became ready after the last execution."""
newly_ready = []
for _node_id, node in self.nodes.items():
if (
node not in self.state.completed_nodes
and node not in self.state.failed_nodes
and self._is_node_ready_with_conditions(node)
):
if self._is_node_ready_with_conditions(node, completed_batch):
newly_ready.append(node)
return newly_ready

def _is_node_ready_with_conditions(self, node: GraphNode) -> bool:
def _is_node_ready_with_conditions(self, node: GraphNode, completed_batch: list["GraphNode"]) -> bool:
"""Check if a node is ready considering conditional edges."""
# Get incoming edges to this node
incoming_edges = [edge for edge in self.edges if edge.to_node == node]

if not incoming_edges:
return node in self.entry_points

# Check if at least one incoming edge condition is satisfied
for edge in incoming_edges:
if edge.from_node in self.state.completed_nodes:
if edge.from_node in completed_batch:
if edge.should_traverse(self.state):
logger.debug(
"from=<%s>, to=<%s> | edge ready via satisfied condition", edge.from_node.node_id, node.node_id
Expand Down
Loading
Loading