From 9600cab7f591bc8981e319ea69494ea349984e43 Mon Sep 17 00:00:00 2001
From: edwardvaneechoud
Date: Fri, 22 Aug 2025 17:39:33 +0200
Subject: [PATCH 1/2] Adding test file for print_tree

---
 .../flowfile/print_tree/test_print_tree.py   | 115 ++++++++++++++++++
 1 file changed, 115 insertions(+)
 create mode 100644 flowfile_core/tests/flowfile/print_tree/test_print_tree.py

diff --git a/flowfile_core/tests/flowfile/print_tree/test_print_tree.py b/flowfile_core/tests/flowfile/print_tree/test_print_tree.py
new file mode 100644
index 00000000..a03f609f
--- /dev/null
+++ b/flowfile_core/tests/flowfile/print_tree/test_print_tree.py
@@ -0,0 +1,115 @@
+from flowfile_core.flowfile.flow_graph import FlowGraph, add_connection
+from flowfile_core.schemas import input_schema, transform_schema, schemas
+
+try:
+    import os
+    from tests.flowfile_core_test_utils import (is_docker_available, ensure_password_is_available)
+    from tests.utils import ensure_cloud_storage_connection_is_available_and_get_connection, get_cloud_connection
+except ModuleNotFoundError:
+    import os
+    import sys
+    sys.path.append(os.path.dirname(os.path.abspath("flowfile_core/tests/flowfile_core_test_utils.py")))
+    sys.path.append(os.path.dirname(os.path.abspath("flowfile_core/tests/utils.py")))
+    # noinspection PyUnresolvedReferences
+    from flowfile_core_test_utils import (is_docker_available, ensure_password_is_available)
+    from tests.utils import ensure_cloud_storage_connection_is_available_and_get_connection, get_cloud_connection
+
+
+def create_flow_settings(flow_id: int = 1) -> schemas.FlowSettings:
+    """Create basic flow settings for tests"""
+    return schemas.FlowSettings(
+        flow_id=flow_id,
+        execution_mode="Performance",
+        execution_location="local",
+        path="/tmp/test_flow"
+    )
+
+
+def create_basic_flow(flow_id: int = 1, name: str = "test_flow") -> FlowGraph:
+    """Create a basic flow graph for testing"""
+    return FlowGraph(flow_settings=create_flow_settings(flow_id), name=name)
+
+
+def test_graph_tree(capsys):
+    """Test the FlowGraph's print_tree method."""
+    flow = create_basic_flow()
+
+    manual_input_data_1 = [
+        [1, 2, 3],
+        ["North", "South", "East"],
+        [10, 5, 8],
+        [150, 300, 200]
+    ]
+
+    manual_input_data_2 = [
+        [4, 5, 6],
+        ["West", "North", "South"],
+        [15, 12, 7],
+        [100, 200, 250]
+    ]
+
+    manual_input_data_3 = [
+        [7, 8, 9],
+        ["East", "West", "North"],
+        [10, 6, 3],
+        [180, 220, 350]
+    ]
+
+    manual_input_data_4 = [
+        [10, 11, 12],
+        ["South", "East", "West"],
+        [9, 4, 7],
+        [280, 260, 230]
+    ]
+
+    for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)):
+        data = input_schema.NodeManualInput(
+            flow_id=1,
+            node_id=j,
+            raw_data_format=input_schema.RawData(
+                columns=[
+                    input_schema.MinimalFieldInfo(name="id", data_type="Integer"),
+                    input_schema.MinimalFieldInfo(name="region", data_type="String"),
+                    input_schema.MinimalFieldInfo(name="quantity", data_type="Integer"),
+                    input_schema.MinimalFieldInfo(name="price", data_type="Integer")
+                ],
+                data=i
+            )
+        )
+        flow.add_manual_input(data)
+
+    # Add union node
+    union_node = input_schema.NodeUnion(
+        flow_id=1,
+        node_id=5,
+        depending_on_ids=[1, 2, 3, 4],
+        union_input=transform_schema.UnionInput(mode="relaxed")
+    )
+    flow.add_union(union_node)
+    for i in range(1, 5):
+        connection = input_schema.NodeConnection.create_from_simple_input(i, 5, 'main')
+        add_connection(flow, connection)
+
+    # Add group by node
+    groupby_node = input_schema.NodeGroupBy(
+        flow_id=1,
+        node_id=6,
+        depending_on_id=1,
groupby_input=transform_schema.GroupByInput( + agg_cols=[ + transform_schema.AggColl("region", "groupby"), + transform_schema.AggColl("quantity", "sum", "total_quantity"), + transform_schema.AggColl("price", "mean", "avg_price"), + transform_schema.AggColl("quantity", "count", "num_transactions") + ] + ) + ) + flow.add_group_by(groupby_node) + add_connection(flow, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) + + flow.print_tree() + stdout = capsys.readouterr().out + + tree_elements = ["(id=", ">", "1.", "Execution Order", "Flow Graph Visualization", "="] + for element in tree_elements: + assert element in stdout From 2c132fa01288d2256f2b6ac63c494b7532b50760 Mon Sep 17 00:00:00 2001 From: edwardvaneechoud Date: Fri, 22 Aug 2025 17:43:41 +0200 Subject: [PATCH 2/2] Small refactor that adds types to functions --- .../flowfile_core/flowfile/flow_graph.py | 45 ++-- .../flowfile/graph_tree/__init__.py | 0 .../flowfile/graph_tree/graph_tree.py | 250 ++++++++++++++++++ .../flowfile/graph_tree/models.py | 15 ++ .../flowfile_core/flowfile/util/graph_tree.py | 233 ---------------- .../flowfile/print_tree/test_print_tree.py | 2 +- .../tests/flowfile/test_code_generator.py | 99 ------- 7 files changed, 287 insertions(+), 357 deletions(-) create mode 100644 flowfile_core/flowfile_core/flowfile/graph_tree/__init__.py create mode 100644 flowfile_core/flowfile_core/flowfile/graph_tree/graph_tree.py create mode 100644 flowfile_core/flowfile_core/flowfile/graph_tree/models.py delete mode 100644 flowfile_core/flowfile_core/flowfile/util/graph_tree.py diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 8883e467..8c6a915b 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -2,7 +2,6 @@ import pickle import polars as pl import fastexcel -import re from fastapi.exceptions import HTTPException from time import time from functools import partial @@ -11,7 +10,6 @@ from copy import deepcopy from pyarrow.parquet import ParquetFile from flowfile_core.configs import logger -from flowfile_core.configs.settings import OFFLOAD_TO_WORKER from flowfile_core.configs.flow_logger import FlowLogger from flowfile_core.flowfile.sources.external_sources.factory import data_source_factory from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import cast_str_to_polars_type, FlowfileColumn @@ -34,7 +32,10 @@ from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.flowfile.util.execution_orderer import compute_execution_plan -from flowfile_core.flowfile.util.graph_tree import add_undrawn_nodes, build_flow_paths, build_node_info, calculate_depth, define_node_connections, draw_merged_paths, draw_standalone_paths, group_nodes_by_depth, trace_path +from flowfile_core.flowfile.graph_tree.graph_tree import (add_un_drawn_nodes, build_flow_paths, + build_node_info, calculate_depth, + define_node_connections, draw_merged_paths, + draw_standalone_paths, group_nodes_by_depth) from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (ExternalDatabaseFetcher, ExternalDatabaseWriter, @@ -317,43 +318,37 @@ def __repr__(self): def print_tree(self): """Print flow_graph as a visual tree structure, showing the DAG 
relationships with ASCII art."""
         if not self._node_db:
-            print("Empty flow graph")
+            self.flow_logger.info("Empty flow graph")
             return
 
-        # Build node information
         node_info = build_node_info(self.nodes)
 
         # Calculate depths for all nodes
         for node_id in node_info:
             calculate_depth(node_id, node_info)
-
+
         # Group nodes by depth
         depth_groups, max_depth = group_nodes_by_depth(node_info)
 
         # Sort nodes within each depth group
         for depth in depth_groups:
             depth_groups[depth].sort()
-
+
         # Create the main flow visualization
-        lines = []
-        lines.append("=" * 80)
-        lines.append("Flow Graph Visualization")
-        lines.append("=" * 80)
-        lines.append("")
-
+        lines = ["=" * 80, "Flow Graph Visualization", "=" * 80, ""]
+
         # Track which nodes connect to what
         merge_points = define_node_connections(node_info)
-
+
         # Build the flow paths
-        paths = build_flow_paths(node_info,self._flow_starts, merge_points)
-
+
         # Find the maximum label length for each depth level
         max_label_length = {}
         for depth in range(max_depth + 1):
             if depth in depth_groups:
-                max_len = max(len(node_info[nid]['label']) for nid in depth_groups[depth])
+                max_len = max(len(node_info[nid].label) for nid in depth_groups[depth])
                 max_label_length[depth] = max_len
-
+
         # Draw the paths
         drawn_nodes = set()
         merge_drawn = set()
@@ -362,9 +357,9 @@
         paths_by_merge = {}
         standalone_paths = []
 
-        #Build flow paths
+        # Build flow paths
         paths = build_flow_paths(node_info, self._flow_starts, merge_points)
-
+
         # Define paths to merge and standalone paths
         for path in paths:
             if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1:
@@ -376,19 +371,21 @@
                 standalone_paths.append(path)
 
         # Draw merged paths
-        draw_merged_paths(node_info, merge_points, paths_by_merge,merge_drawn, drawn_nodes, lines)
+        draw_merged_paths(node_info, merge_points, paths_by_merge, merge_drawn, drawn_nodes, lines)
 
         # Draw standalone paths
         draw_standalone_paths(drawn_nodes, standalone_paths, lines, node_info)
 
         # Add undrawn nodes
-        add_undrawn_nodes(drawn_nodes, node_info, lines)
+        add_un_drawn_nodes(drawn_nodes, node_info, lines)
 
         try:
-            skip_nodes, ordered_nodes = compute_execution_plan(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes())
+            skip_nodes, ordered_nodes = compute_execution_plan(
+                nodes=self.nodes,
+                flow_starts=self._flow_starts + self.get_implicit_starter_nodes())
             if ordered_nodes:
                 for i, node in enumerate(ordered_nodes, 1):
-                    lines.append(f" {i:3d}. {node_info[node.node_id]['label']}")
+                    lines.append(f" {i:3d}. {node_info[node.node_id].label}")
{node_info[node.node_id].label}") except Exception as e: lines.append(f" Could not determine execution order: {e}") diff --git a/flowfile_core/flowfile_core/flowfile/graph_tree/__init__.py b/flowfile_core/flowfile_core/flowfile/graph_tree/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/flowfile_core/flowfile_core/flowfile/graph_tree/graph_tree.py b/flowfile_core/flowfile_core/flowfile/graph_tree/graph_tree.py new file mode 100644 index 00000000..b0b50d24 --- /dev/null +++ b/flowfile_core/flowfile_core/flowfile/graph_tree/graph_tree.py @@ -0,0 +1,250 @@ +from pydantic import BaseModel + +from flowfile_core.flowfile.flow_node.flow_node import FlowNode + +from flowfile_core.flowfile.graph_tree.models import BranchInfo, InputInfo + + +def calculate_depth(node_id: int, node_info: dict[int, BranchInfo], visited: set = None) -> int: + """Calculates the depth of each node.""" + + if visited is None: + visited = set() + if node_id in visited: + return node_info[node_id].depth + visited.add(node_id) + + max_input_depth = -1 + inputs = node_info[node_id].inputs + + for main_id in inputs.main: + max_input_depth = max(max_input_depth, calculate_depth(main_id, node_info, visited)) + if inputs.left: + max_input_depth = max(max_input_depth, calculate_depth(inputs.left, node_info, visited)) + if inputs.right: + max_input_depth = max(max_input_depth, calculate_depth(inputs.right, node_info, visited)) + + node_info[node_id].depth = max_input_depth + 1 + return node_info[node_id].depth + + +# Trace paths from each root +def trace_path(node_id: int, node_info: dict[int, BranchInfo], merge_points: dict[int, list[int]], + current_path: list[int] | None = None): + """Define the trace of each node path""" + if current_path is None: + current_path = [] + + current_path = current_path + [node_id] + outputs = node_info[node_id].outputs + + if not outputs: + # End of path + return [current_path] + + # If this node has multiple outputs or connects to a merge point, branch + all_paths = [] + for output_id in outputs: + if output_id in merge_points and len(merge_points[output_id]) > 1: + # This is a merge point, end this path here + all_paths.append(current_path + [output_id]) + else: + # Continue the path + all_paths.extend(trace_path(output_id, node_info, merge_points, current_path)) + return all_paths + + +def build_node_info(nodes: list[FlowNode]) -> dict[int, BranchInfo]: + """Builds node information used to construct the graph tree.""" + + node_info = {} + for node in nodes: + node_id = node.node_id + + # Get node label + operation = node.node_type.replace("_", " ").title() if node.node_type else "Unknown" + label = f"{operation} (id={node_id})" + if hasattr(node, 'setting_input') and hasattr(node.setting_input, 'description'): + if node.setting_input.description: + desc = node.setting_input.description + if len(desc) > 20: # Truncate long descriptions + desc = desc[:17] + "..." 
+ label = f"{operation} ({node_id}): {desc}" + + # Get inputs and outputs + inputs = InputInfo( + main=[n.node_id for n in (node.node_inputs.main_inputs or [])], + left=node.node_inputs.left_input.node_id if node.node_inputs.left_input else None, + right=node.node_inputs.right_input.node_id if node.node_inputs.right_input else None + ) + outputs = [n.node_id for n in node.leads_to_nodes] + + node_info[node_id] = BranchInfo( + label=label, + short_label=f"{operation} ({node_id})", + inputs=inputs, + outputs=outputs, + depth=0 + ) + + return node_info + + +def group_nodes_by_depth(node_info: dict[int, BranchInfo]) -> tuple[dict[int, list[int]], int]: + """Groups each node by depth""" + depth_groups = {} + max_depth = 0 + for node_id, info in node_info.items(): + depth = info.depth + max_depth = max(max_depth, depth) + if depth not in depth_groups: + depth_groups[depth] = [] + depth_groups[depth].append(node_id) + + return depth_groups, max_depth + + +def define_node_connections(node_info: dict[int, BranchInfo]) -> dict[int, list[int]]: + """Defines node connections to merge""" + merge_points = {} # target_id -> list of source_ids + for node_id, info in node_info.items(): + for output_id in info.outputs: + if output_id not in merge_points: + merge_points[output_id] = [] + merge_points[output_id].append(node_id) + + return merge_points + + +def build_flow_paths(node_info: dict[int, BranchInfo], flow_starts: list[FlowNode], + merge_points: dict[int, list[int]]): + """Build the flow paths to be drawn""" + + + # Find all root nodes (no inputs) + root_nodes = [nid for nid, info in node_info.items() + if not info.inputs.main and not info.inputs.left and not info.inputs.right] + + if not root_nodes and flow_starts: + root_nodes = [n.node_id for n in flow_starts] + paths = [] # List of paths through the graph + + # Get all paths + for root_id in root_nodes: + paths.extend(trace_path(root_id, node_info, merge_points)) + + return paths + + +def group_paths(paths:list, merge_points:dict): + """Groups each node path.""" + paths_by_merge = {} + standalone_paths = [] + + for path in paths: + if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1: + merge_id = path[-1] + if merge_id not in paths_by_merge: + paths_by_merge[merge_id] = [] + paths_by_merge[merge_id].append(path) + else: + standalone_paths.append(path) + return paths_by_merge, standalone_paths + + +def draw_merged_paths(node_info: dict[int, BranchInfo], + merge_points: dict[int, list[int]], + paths_by_merge: dict[int, list[list[int]]], + merge_drawn: set, + drawn_nodes: set, + lines: list[str]): + """Draws paths for each node that merges.""" + for merge_id, merge_paths in paths_by_merge.items(): + if merge_id in merge_drawn: + continue + merge_info = node_info[merge_id] + sources = merge_points[merge_id] + + # Draw each source path leading to the merge + for i, source_id in enumerate(sources): + # Find the path containing this source + source_path = None + for path in merge_paths: + if source_id in path: + + source_path = path[:path.index(source_id) + 1] + break + + if source_path: + # Build the line for this path + line_parts = [] + for j, nid in enumerate(source_path): + if j == 0: + line_parts.append(node_info[nid].label) + else: + line_parts.append(f" ──> {node_info[nid].short_label}") + + # Add the merge arrow + if i == 0: + # First source + line = "".join(line_parts) + " ─────┐" + lines.append(line) + elif i == len(sources) - 1: + # Last source + line = "".join(line_parts) + " ─────┴──> " + merge_info.label + 
+                    lines.append(line)
+
+                    # Continue with the rest of the path after merge
+                    remaining = node_info[merge_id].outputs
+                    while remaining:
+                        next_id = remaining[0]
+                        lines[-1] += f" ──> {node_info[next_id].label}"
+                        remaining = node_info[next_id].outputs
+                        drawn_nodes.add(next_id)
+                else:
+                    # Middle sources
+                    line = "".join(line_parts) + " ─────┤"
+                    lines.append(line)
+
+                for nid in source_path:
+                    drawn_nodes.add(nid)
+
+        drawn_nodes.add(merge_id)
+        merge_drawn.add(merge_id)
+        lines.append("")  # Add spacing between merge groups
+    return paths_by_merge
+
+
+def draw_standalone_paths(drawn_nodes: set[int], standalone_paths: list[list[int]], lines: list[str],
+                          node_info: dict[int, BranchInfo]):
+    """Draws paths that do not merge."""
+    # Draw standalone paths
+    for path in standalone_paths:
+        if all(nid in drawn_nodes for nid in path):
+            continue
+
+        line_parts = []
+        for i, node_id in enumerate(path):
+            if node_id not in drawn_nodes:
+                if i == 0:
+                    line_parts.append(node_info[node_id].label)
+                else:
+                    line_parts.append(f" ──> {node_info[node_id].short_label}")
+                drawn_nodes.add(node_id)
+
+        if line_parts:
+            lines.append("".join(line_parts))
+
+
+def add_un_drawn_nodes(drawn_nodes: set[int], node_info: dict[int, BranchInfo], lines: list[str]):
+    """Adds any isolated nodes that remain undrawn."""
+    # Add any remaining undrawn nodes
+    for node_id in node_info:
+        if node_id not in drawn_nodes:
+            lines.append(node_info[node_id].label + " (isolated)")
+
+    lines.append("")
+    lines.append("=" * 80)
+    lines.append("Execution Order")
+    lines.append("=" * 80)
diff --git a/flowfile_core/flowfile_core/flowfile/graph_tree/models.py b/flowfile_core/flowfile_core/flowfile/graph_tree/models.py
new file mode 100644
index 00000000..fb818170
--- /dev/null
+++ b/flowfile_core/flowfile_core/flowfile/graph_tree/models.py
@@ -0,0 +1,15 @@
+from pydantic import BaseModel
+
+
+class InputInfo(BaseModel):
+    main: list[int]
+    right: int | None = None
+    left: int | None = None
+
+
+class BranchInfo(BaseModel):
+    label: str
+    short_label: str
+    inputs: InputInfo
+    outputs: list[int]
+    depth: int
diff --git a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py
deleted file mode 100644
index 9231ff0c..00000000
--- a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py
+++ /dev/null
@@ -1,233 +0,0 @@
-
-# Calculate depth for each node
-def calculate_depth(node_id, node_info, visited=None):
-    """Calculates the depth of each node."""
-
-    if visited is None:
-        visited = set()
-    if node_id in visited:
-        return node_info[node_id]['depth']
-    visited.add(node_id)
-
-    max_input_depth = -1
-    inputs = node_info[node_id]['inputs']
-
-    for main_id in inputs['main']:
-        max_input_depth = max(max_input_depth, calculate_depth(main_id, node_info, visited))
-    if inputs['left']:
-        max_input_depth = max(max_input_depth, calculate_depth(inputs['left'], node_info, visited))
-    if inputs['right']:
-        max_input_depth = max(max_input_depth, calculate_depth(inputs['right'], node_info, visited))
-
-    node_info[node_id]['depth'] = max_input_depth + 1
-    return node_info[node_id]['depth']
-
-
-# Trace paths from each root
-def trace_path(node_id, node_info, merge_points, current_path=None):
-    """Define the trace of each node path"""
-    if current_path is None:
-        current_path = []
-
-    current_path = current_path + [node_id]
-    outputs = node_info[node_id]['outputs']
-
-    if not outputs:
-        # End of path
-        return [current_path]
-
-    # If this node has multiple outputs or connects to a merge point, branch
-    all_paths
= [] - for output_id in outputs: - if output_id in merge_points and len(merge_points[output_id]) > 1: - # This is a merge point, end this path here - all_paths.append(current_path + [output_id]) - else: - # Continue the path - all_paths.extend(trace_path(output_id, node_info, merge_points, current_path)) - - return all_paths - -def build_node_info(nodes): - """Builds node information used to construct the graph tree.""" - - node_info = {} - - for node in nodes: - node_id = node.node_id - - # Get node label - operation = node.node_type.replace("_", " ").title() if node.node_type else "Unknown" - label = f"{operation} (id={node_id})" - if hasattr(node, 'setting_input') and hasattr(node.setting_input, 'description'): - if node.setting_input.description: - desc = node.setting_input.description - if len(desc) > 20: # Truncate long descriptions - desc = desc[:17] + "..." - label = f"{operation} ({node_id}): {desc}" - - # Get inputs and outputs - inputs = { - 'main': [n.node_id for n in (node.node_inputs.main_inputs or [])], - 'left': node.node_inputs.left_input.node_id if node.node_inputs.left_input else None, - 'right': node.node_inputs.right_input.node_id if node.node_inputs.right_input else None - } - outputs = [n.node_id for n in node.leads_to_nodes] - - node_info[node_id] = { - 'label': label, - 'short_label': f"{operation} ({node_id})", - 'inputs': inputs, - 'outputs': outputs, - 'depth': 0 - } - - return node_info - -def group_nodes_by_depth(node_info:dict): - """Groups each node by depth""" - - depth_groups = {} - max_depth = 0 - for node_id, info in node_info.items(): - depth = info['depth'] - max_depth = max(max_depth, depth) - if depth not in depth_groups: - depth_groups[depth] = [] - depth_groups[depth].append(node_id) - - return depth_groups, max_depth - -def define_node_connections(node_info:dict): - """Defines node connections to merge""" - - merge_points = {} # target_id -> list of source_ids - for node_id, info in node_info.items(): - for output_id in info['outputs']: - if output_id not in merge_points: - merge_points[output_id] = [] - merge_points[output_id].append(node_id) - - return merge_points - -def build_flow_paths(node_info:dict, flow_starts, merge_points:dict): - """Build the flow paths to be drawn""" - paths = [] # List of paths through the graph - visited_in_paths = set() - - # Find all root nodes (no inputs) - root_nodes = [nid for nid, info in node_info.items() - if not info['inputs']['main'] and not info['inputs']['left'] and not info['inputs']['right']] - - if not root_nodes and flow_starts: - root_nodes = [n.node_id for n in flow_starts] - - # Get all paths - for root_id in root_nodes: - paths.extend(trace_path(root_id, node_info, merge_points)) - - return paths - -def group_paths(paths:list, merge_points:dict): - """Groups each node path.""" - paths_by_merge = {} - standalone_paths = [] - - for path in paths: - if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1: - merge_id = path[-1] - if merge_id not in paths_by_merge: - paths_by_merge[merge_id] = [] - paths_by_merge[merge_id].append(path) - else: - standalone_paths.append(path) - return paths_by_merge, standalone_paths - -def draw_merged_paths(node_info:dict, merge_points:dict, paths_by_merge:dict, merge_drawn:set, drawn_nodes:set, lines:str): - """Draws paths for each node that merges.""" - for merge_id, merge_paths in paths_by_merge.items(): - if merge_id in merge_drawn: - continue - - merge_info = node_info[merge_id] - sources = merge_points[merge_id] - - # Draw each source 
path leading to the merge - for i, source_id in enumerate(sources): - # Find the path containing this source - source_path = None - for path in merge_paths: - if source_id in path: - source_path = path[:path.index(source_id) + 1] - break - - if source_path: - # Build the line for this path - line_parts = [] - for j, nid in enumerate(source_path): - if j == 0: - line_parts.append(node_info[nid]['label']) - else: - line_parts.append(f" ──> {node_info[nid]['short_label']}") - - # Add the merge arrow - if i == 0: - # First source - line = "".join(line_parts) + " ─────┐" - lines.append(line) - elif i == len(sources) - 1: - # Last source - line = "".join(line_parts) + " ─────┴──> " + merge_info['label'] - lines.append(line) - - # Continue with the rest of the path after merge - remaining = node_info[merge_id]['outputs'] - while remaining: - next_id = remaining[0] - lines[-1] += f" ──> {node_info[next_id]['label']}" - remaining = node_info[next_id]['outputs'] - drawn_nodes.add(next_id) - else: - # Middle sources - line = "".join(line_parts) + " ─────┤" - lines.append(line) - - for nid in source_path: - drawn_nodes.add(nid) - - drawn_nodes.add(merge_id) - merge_drawn.add(merge_id) - lines.append("") # Add spacing between merge groups - return paths_by_merge - -def draw_standalone_paths(drawn_nodes:set, standalone_paths:list, lines:str, node_info:dict): - """ Draws paths that do not merge.""" - # Draw standalone paths - for path in standalone_paths: - if all(nid in drawn_nodes for nid in path): - continue - - line_parts = [] - for i, node_id in enumerate(path): - if node_id not in drawn_nodes: - if i == 0: - line_parts.append(node_info[node_id]['label']) - else: - line_parts.append(f" ──> {node_info[node_id]['short_label']}") - drawn_nodes.add(node_id) - - if line_parts: - lines.append("".join(line_parts)) - -def add_undrawn_nodes(drawn_nodes:set, node_info:dict, lines:str): - """Adds isolated nodes if exists.""" - # Add any remaining undrawn nodes - for node_id in node_info: - if node_id not in drawn_nodes: - lines.append(node_info[node_id]['label'] + " (isolated)") - - lines.append("") - lines.append("=" * 80) - lines.append("Execution Order") - lines.append("=" * 80) - diff --git a/flowfile_core/tests/flowfile/print_tree/test_print_tree.py b/flowfile_core/tests/flowfile/print_tree/test_print_tree.py index a03f609f..614e7d92 100644 --- a/flowfile_core/tests/flowfile/print_tree/test_print_tree.py +++ b/flowfile_core/tests/flowfile/print_tree/test_print_tree.py @@ -62,7 +62,7 @@ def test_graph_tree(capsys): [280, 260, 230] ] - for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)): + for i, j in zip([manual_input_data_1, manual_input_data_2, manual_input_data_3, manual_input_data_4], range(1, 5)): data = input_schema.NodeManualInput( flow_id=1, node_id=j, diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index b4274aa3..3177447e 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2379,105 +2379,6 @@ def test_aggregation_functions(): expected_df = flow.get_node(2).get_resulting_data().data_frame assert_frame_equal(result_df, expected_df, check_row_order=False) -def test_graph_tree(capsys): - """ Test made to the FlowGraph's print_tree method""" - - flow = create_basic_flow() - - manual_input_data_1 = [ - [1, 2, 3], - ["North", "South", "East"], - [10, 5, 8], - [150, 300, 200] - ] - - manual_input_data_2 
= [ - [4, 5, 6], - ["West", "North", "South"], - [15, 12, 7], - [100, 200, 250] - ] - - manual_input_data_3 = [ - [7, 8, 9], - ["East", "West", "North"], - [10, 6, 3], - [180, 220, 350] - ] - - manual_input_data_4 = [ - [10, 11, 12], - ["South", "East", "West"], - [9, 4, 7], - [280, 260, 230] - ] - - for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)): - data = input_schema.NodeManualInput( - flow_id=1, - node_id=j, - raw_data_format=input_schema.RawData( - columns=[ - input_schema.MinimalFieldInfo(name="id", data_type="Integer"), - input_schema.MinimalFieldInfo(name="region", data_type="String"), - input_schema.MinimalFieldInfo(name="quantity", data_type="Integer"), - input_schema.MinimalFieldInfo(name="price", data_type="Integer") - ], - data=i - ) - ) - flow.add_manual_input(data) - - # Add union node - union_node = input_schema.NodeUnion( - flow_id=1, - node_id=5, - depending_on_ids=[1, 2, 3, 4], - union_input=transform_schema.UnionInput(mode="relaxed") - ) - flow.add_union(union_node) - for i in range(1, 5): - connection = input_schema.NodeConnection.create_from_simple_input(i, 5, 'main') - add_connection(flow, connection) - - # Add group by node - groupby_node = input_schema.NodeGroupBy( - flow_id=1, - node_id=6, - depending_on_id=1, - groupby_input=transform_schema.GroupByInput( - agg_cols=[ - transform_schema.AggColl("region", "groupby"), - transform_schema.AggColl("quantity", "sum", "total_quantity"), - transform_schema.AggColl("price", "mean", "avg_price"), - transform_schema.AggColl("quantity", "count", "num_transactions") - ] - ) - ) - flow.add_group_by(groupby_node) - add_connection(flow, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) - - code = export_flow_to_polars(flow) - - verify_code_contains(code, - "df_1 = pl.LazyFrame(", - "df_2 = pl.LazyFrame(", - "df_3 = pl.LazyFrame(", - "df_4 = pl.LazyFrame(", - "df_5 = pl.concat(", - "df_6 = df_5.group_by(", - ) - verify_if_execute(code) - result_df = get_result_from_generated_code(code) - expected_df = flow.get_node(6).get_resulting_data().data_frame - assert_frame_equal(result_df, expected_df, check_row_order=False) - - flow.print_tree() - stdout = capsys.readouterr().out - - tree_elements = ["(id=", ">", "1.", "Execution Order", "Flow Graph Visualization", "="] - for element in tree_elements: - assert element in stdout def test_flow_with_disconnected_nodes(): """Test a flow where some nodes might not be connected properly"""
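
A quick illustration of how the refactored helpers compose. This is a minimal sketch, not part of the patch: it hand-builds the new BranchInfo/InputInfo pydantic models for a hypothetical three-node chain (1 -> 2 -> 3) instead of deriving them from real FlowNode objects via build_node_info, and it assumes flowfile_core is importable.

from flowfile_core.flowfile.graph_tree.graph_tree import calculate_depth, group_nodes_by_depth
from flowfile_core.flowfile.graph_tree.models import BranchInfo, InputInfo

# Hypothetical three-node chain 1 -> 2 -> 3, built by hand for illustration.
node_info = {
    1: BranchInfo(label="Manual Input (id=1)", short_label="Manual Input (1)",
                  inputs=InputInfo(main=[]), outputs=[2], depth=0),
    2: BranchInfo(label="Filter (id=2)", short_label="Filter (2)",
                  inputs=InputInfo(main=[1]), outputs=[3], depth=0),
    3: BranchInfo(label="Output (id=3)", short_label="Output (3)",
                  inputs=InputInfo(main=[2]), outputs=[], depth=0),
}

# Depth is one more than the deepest input, so the chain yields depths 0, 1, 2.
for node_id in node_info:
    calculate_depth(node_id, node_info)

depth_groups, max_depth = group_nodes_by_depth(node_info)
assert depth_groups == {0: [1], 1: [2], 2: [3]} and max_depth == 2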
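The merge-point machinery can be sketched the same way. Below, a hypothetical diamond fan-in (nodes 1 and 2 both feed node 3) shows how define_node_connections records the fan-in, how trace_path (via build_flow_paths) ends each branch at the merge point, and how group_paths buckets the branches for draw_merged_paths; again a sketch under the same assumptions, not part of the patch.

from flowfile_core.flowfile.graph_tree.graph_tree import (
    build_flow_paths, define_node_connections, group_paths)
from flowfile_core.flowfile.graph_tree.models import BranchInfo, InputInfo

# Hypothetical fan-in: 1 and 2 both feed 3 (as the four manual inputs feed the union in the test).
node_info = {
    1: BranchInfo(label="Manual Input (id=1)", short_label="Manual Input (1)",
                  inputs=InputInfo(main=[]), outputs=[3], depth=0),
    2: BranchInfo(label="Manual Input (id=2)", short_label="Manual Input (2)",
                  inputs=InputInfo(main=[]), outputs=[3], depth=0),
    3: BranchInfo(label="Union (id=3)", short_label="Union (3)",
                  inputs=InputInfo(main=[1, 2]), outputs=[], depth=0),
}

# merge_points maps each target to its sources; node 3 has two, so it is a merge.
merge_points = define_node_connections(node_info)
assert merge_points == {3: [1, 2]}

# Each branch is traced from its root and stops at the merge point.
paths = build_flow_paths(node_info, [], merge_points)
assert paths == [[1, 3], [2, 3]]

# Both paths end in the same merge, so neither is standalone.
paths_by_merge, standalone = group_paths(paths, merge_points)
assert paths_by_merge == {3: [[1, 3], [2, 3]]} and standalone == []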